Load Documentation

class ecopipeline.load.AlarmLoader

Bases: Loader

Loader subclass for writing alarm data to the alarm and alarm_inst MySQL tables.

Overrides Loader table-existence checks and table-creation logic so that the paired alarm / alarm_inst tables are always created and validated together.

Methods

check_table_exists(cursor, table_name, dbname)

Check whether a table exists in the database.

create_new_table(cursor, table_name, ...[, ...])

Create a new table in the MySQL database.

report_data_loss(config[, site_name])

Log a DATA_LOSS_COP event in the site_events table.

check_table_exists(cursor: MySQLCursor, table_name: str, dbname: str) → bool

Return True only when both the alarm table and its _inst companion exist.

Both tables must be present before loading can proceed; if either is missing they should both be (re)created together.
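A minimal sketch of the paired check, assuming the underlying single-table lookup counts matches in information_schema (the helper name and query here are illustrative, not the actual implementation):

```python
from unittest.mock import MagicMock

def single_table_exists(cursor, table_name, dbname):
    # Illustrative stand-in for the base Loader check: count matching
    # rows in information_schema.tables.
    cursor.execute(
        "SELECT COUNT(*) FROM information_schema.tables "
        "WHERE table_schema = %s AND table_name = %s",
        (dbname, table_name),
    )
    return cursor.fetchone()[0] > 0

def paired_tables_exist(cursor, table_name, dbname):
    # Both the alarm table and its _inst companion must be present.
    return (single_table_exists(cursor, table_name, dbname)
            and single_table_exists(cursor, f"{table_name}_inst", dbname))

cursor = MagicMock()
cursor.fetchone.side_effect = [(1,), (0,)]  # alarm exists, alarm_inst does not
print(paired_tables_exist(cursor, "alarm", "mydb"))  # False -> recreate both
```

Because the two tables are only valid as a pair, a single missing companion is treated the same as both tables missing.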

Parameters:

cursor : mysql.connector.cursor.MySQLCursor
    An active database cursor.

table_name : str
    Base name of the alarm table (e.g. 'alarm').

dbname : str
    Name of the database to search within.

Returns:

bool
    True if both table_name and {table_name}_inst exist; False otherwise.

create_new_table(cursor: MySQLCursor, table_name: str, table_column_names: list = None, table_column_types: list = None, primary_key: str = 'time_pt', has_primary_key: bool = True) → bool

Create both the alarm table and its _inst companion table.

Uses CREATE TABLE IF NOT EXISTS so the method is safe to call even when only one of the two tables is missing.
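The safety property comes from IF NOT EXISTS: re-running the DDL when one or both tables already exist is a no-op. A sketch of what the paired statements could look like (this column set is hypothetical, not the actual fixed alarm schema):

```python
def alarm_ddl(table_name):
    # Hypothetical schema for illustration; the real fixed schema may differ.
    base = (
        f"CREATE TABLE IF NOT EXISTS {table_name} ("
        "id INT AUTO_INCREMENT PRIMARY KEY, "
        "site_name VARCHAR(64), alarm_type VARCHAR(64), "
        "variable_name VARCHAR(64), start_date DATE, end_date DATE)"
    )
    inst = (
        f"CREATE TABLE IF NOT EXISTS {table_name}_inst ("
        "id INT AUTO_INCREMENT PRIMARY KEY, alarm_id INT, "
        "start_time_pt DATETIME, end_time_pt DATETIME, certainty INT)"
    )
    return base, inst

for stmt in alarm_ddl("alarm"):
    assert "IF NOT EXISTS" in stmt  # safe to re-run when a table exists
```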

Parameters:

cursor : mysql.connector.cursor.MySQLCursor
    An active database cursor.

table_name : str
    Base name for the alarm table. The instance table is derived as {table_name}_inst.

table_column_names : list, optional
    Ignored; the alarm schema is fixed. Retained for interface compatibility with the parent class.

table_column_types : list, optional
    Ignored; the alarm schema is fixed. Retained for interface compatibility with the parent class.

primary_key : str, optional
    Ignored; the alarm schema uses a fixed auto-increment id. Retained for interface compatibility with the parent class.

has_primary_key : bool, optional
    Ignored; the alarm schema always defines a primary key. Retained for interface compatibility with the parent class.

Returns:

bool
    Always True after executing the DDL statements.

load_database(config: ConfigManager, alarm_df: DataFrame, table_name: str, dbname: str, auto_log_data_loss: bool = False, primary_key: str = 'time_pt', site_name: str = None) → bool

Load alarm data into the alarm and alarm_inst tables.

For each alarm instance in the DataFrame the method:

  1. Checks whether a matching alarm record (same site_name, alarm_type, variable_name) already exists within a three-day gap tolerance.

  2. Creates a new alarm record if none is found, or extends the date range of the nearest existing record.

  3. Inserts alarm instances into {table_name}_inst using certainty-based overlap resolution:

    • Higher certainty new alarm: the existing instance is split around the new one so each segment retains the highest available certainty.

    • Lower certainty new alarm: only the non-overlapping portions of the new alarm are inserted.

    • Same certainty: the existing instance is extended to encompass both time periods.
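The three overlap rules above can be sketched in pure Python over (start, end, certainty) tuples; this is an illustrative model of the resolution logic, not the library's implementation:

```python
def resolve_overlap(existing, new):
    """Sketch of certainty-based overlap resolution.

    Each interval is (start, end, certainty); higher certainty wins
    wherever the two overlap. Assumes the intervals do overlap.
    """
    es, ee, ec = existing
    ns, ne, nc = new
    if nc > ec:
        # New alarm wins: split the existing instance around it.
        segments = [(ns, ne, nc)]
        if es < ns:
            segments.insert(0, (es, ns, ec))
        if ee > ne:
            segments.append((ne, ee, ec))
        return segments
    if nc < ec:
        # Existing wins: keep only the non-overlapping parts of the new alarm.
        segments = [(es, ee, ec)]
        if ns < es:
            segments.insert(0, (ns, es, nc))
        if ne > ee:
            segments.append((ee, ne, nc))
        return segments
    # Same certainty: merge into one instance spanning both.
    return [(min(es, ns), max(ee, ne), ec)]

print(resolve_overlap((0, 10, 2), (5, 15, 3)))
# -> [(0, 5, 2), (5, 15, 3)]
```

Each resulting segment retains the highest certainty available for its time range, which is the invariant the three bullet rules preserve.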

Parameters:

config : ConfigManager
    The ConfigManager object that holds configuration data for the pipeline.

alarm_df : pd.DataFrame
    DataFrame of alarms to load. Required columns: start_time_pt, end_time_pt, alarm_type, variable_name. Optional column: certainty (defaults to 3 if absent). Certainty scale: 3 = high, 2 = medium, 1 = low.

table_name : str
    Base name of the alarm table. The companion instance table is derived as {table_name}_inst.

dbname : str
    Name of the MySQL database.

auto_log_data_loss : bool, optional
    Unused in this subclass; retained for interface compatibility. Defaults to False.

primary_key : str, optional
    Unused in this subclass; retained for interface compatibility. Defaults to 'time_pt'.

site_name : str, optional
    Site name to associate alarms with. Defaults to config.get_site_name().

Returns:

bool
    True if all alarms were loaded successfully; False if an exception occurred (the transaction is rolled back).

Raises:

Exception
    If alarm_df is missing any of the required columns.

Exception
    If an alarm ID cannot be retrieved after insertion.

class ecopipeline.load.Loader

Bases: object

Base class for loading pandas DataFrames into a MySQL database.

Provides UPSERT-based loading, table creation, column management, and data-loss reporting utilities used by all concrete loader subclasses.

Attributes:

data_map : dict
    Mapping from pandas dtype name strings to MySQL column type strings.
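An illustrative mapping of this shape is shown below; the real data_map is defined on Loader and may differ in its exact entries:

```python
# Illustrative dtype-name-to-MySQL-type mapping (not the actual data_map).
data_map = {
    "float64": "FLOAT",
    "int64": "INT",
    "bool": "BOOLEAN",
    "datetime64[ns]": "DATETIME",
    "object": "VARCHAR(255)",
}

# Column dtypes as pandas would report them via str(df.dtypes[col]):
column_dtypes = {"power_kw": "float64", "online": "bool"}
mysql_types = [data_map[dt] for dt in column_dtypes.values()]
print(mysql_types)  # ['FLOAT', 'BOOLEAN']
```

A mapping like this lets the loader derive CREATE TABLE and ALTER TABLE column types directly from a DataFrame's dtypes.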

Methods

check_table_exists(cursor, table_name, dbname)

Check whether a table exists in the database.

create_new_table(cursor, table_name, ...[, ...])

Create a new table in the MySQL database.

report_data_loss(config[, site_name])

Log a DATA_LOSS_COP event in the site_events table.

check_table_exists(cursor: MySQLCursor, table_name: str, dbname: str) → int

Check whether a table exists in the database.

Parameters:

cursor : mysql.connector.cursor.MySQLCursor
    An active database cursor.

table_name : str
    Name of the table to check.

dbname : str
    Name of the database to search within.

Returns:

int
    The count of tables matching table_name in dbname. Evaluates to True when non-zero, so it can be used directly as a boolean.

create_new_columns(cursor: MySQLCursor, table_name: str, new_columns: list, data_types: str)

Add new columns to an existing database table.

Issues one ALTER TABLE ADD COLUMN statement per column. Stops and returns False on the first database error.

Parameters:

cursor : mysql.connector.cursor.MySQLCursor
    An active database cursor.

table_name : str
    Name of the table to alter.

new_columns : list
    Ordered list of column names to add.

data_types : list
    Ordered list of MySQL type strings corresponding to new_columns.

Returns:

bool
    True if all columns were added successfully; False if a database error occurred.

create_new_table(cursor: MySQLCursor, table_name: str, table_column_names: list, table_column_types: list, primary_key: str = 'time_pt', has_primary_key: bool = True) → bool

Create a new table in the MySQL database.

Parameters:

cursor : mysql.connector.cursor.MySQLCursor
    An active database cursor.

table_name : str
    Name of the table to create.

table_column_names : list
    Ordered list of column names (excluding the primary-key column).

table_column_types : list
    Ordered list of MySQL type strings corresponding to table_column_names. Must be the same length as table_column_names.

primary_key : str, optional
    Name of the primary-key column. Defaults to 'time_pt'.

has_primary_key : bool, optional
    If False, the primary_key column is added as a plain column rather than a PRIMARY KEY. Defaults to True.

Returns:

bool
    True if the table was successfully created.

Raises:

Exception
    If table_column_names and table_column_types are different lengths.

find_missing_columns(cursor: MySQLCursor, dataframe: DataFrame, dbname: str, table_name: str)

Identify DataFrame columns that are absent from the database table.

If communication with the database fails, empty lists are returned so that the caller can continue without adding any columns.
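A sketch of this comparison, assuming the schema is read from information_schema and a dtype-to-type mapping is available (the function signature and query here are illustrative):

```python
from unittest.mock import MagicMock

def find_missing_columns(cursor, df_columns, df_dtypes, dbname, table_name, data_map):
    # Illustrative sketch: compare DataFrame columns against the table schema.
    try:
        cursor.execute(
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_schema = %s AND table_name = %s",
            (dbname, table_name),
        )
        existing = {row[0] for row in cursor.fetchall()}
    except Exception:
        # On a communication failure, return empty lists so the caller
        # proceeds without adding any columns.
        return [], []
    missing = [c for c in df_columns if c not in existing]
    types = [data_map.get(df_dtypes[c], "VARCHAR(255)") for c in missing]
    return missing, types

cursor = MagicMock()
cursor.fetchall.return_value = [("time_pt",), ("power_kw",)]
cols, types = find_missing_columns(
    cursor, ["power_kw", "temp_c"], {"temp_c": "float64"},
    "mydb", "minute_data", {"float64": "FLOAT"},
)
print(cols, types)  # ['temp_c'] ['FLOAT']
```

The two returned lists line up index-for-index, which is the shape create_new_columns() expects.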

Parameters:

cursor : mysql.connector.cursor.MySQLCursor
    An active database cursor.

dataframe : pd.DataFrame
    The DataFrame whose columns are compared against the table schema.

dbname : str
    Name of the database that contains table_name.

table_name : str
    Name of the table to inspect.

Returns:

list
    Column names present in dataframe but absent from the table.

list
    Corresponding MySQL type strings for each missing column.

load_database(config: ConfigManager, dataframe: DataFrame, table_name: str, dbname: str, auto_log_data_loss: bool = False, primary_key: str = 'time_pt')

Load a pandas DataFrame into a MySQL table using an UPSERT strategy.

Existing rows are updated rather than replaced; NULL values in the incoming DataFrame will not overwrite existing non-NULL values in the database.
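One common way to get this behavior in MySQL is INSERT ... ON DUPLICATE KEY UPDATE with COALESCE, so an incoming NULL falls back to the stored value. This is a hedged sketch of what such a generated statement could look like, not the library's actual SQL:

```python
def build_upsert(table_name, primary_key, columns):
    # Sketch of an UPSERT where NULLs in the new row do not clobber
    # existing non-NULL values: COALESCE keeps the old value on NULL.
    cols = [primary_key] + columns
    placeholders = ", ".join(["%s"] * len(cols))
    updates = ", ".join(
        f"{c} = COALESCE(VALUES({c}), {c})" for c in columns
    )
    return (
        f"INSERT INTO {table_name} ({', '.join(cols)}) "
        f"VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {updates}"
    )

sql = build_upsert("minute_data", "time_pt", ["power_kw", "temp_c"])
print(sql)
```

With this pattern a re-run of the pipeline over an already-loaded window updates rows in place rather than duplicating or blanking them.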

Parameters:

config : ConfigManager
    The ConfigManager object that holds configuration data for the pipeline.

dataframe : pd.DataFrame
    The pandas DataFrame to be written into the MySQL server. The index must be the primary-key column (e.g. a datetime index named time_pt).

table_name : str
    Name of the destination table in the database.

dbname : str
    Name of the MySQL database that contains table_name.

auto_log_data_loss : bool, optional
    If True, a DATA_LOSS_COP event is recorded when no data exists in the DataFrame for the last three days, or when an exception occurs. Defaults to False.

primary_key : str, optional
    Column name used as the primary key in the destination table. Defaults to 'time_pt'.

Returns:

bool
    True if the data were successfully written; False otherwise.

report_data_loss(config: ConfigManager, site_name: str = None)

Log a DATA_LOSS_COP event in the site_events table.

Records that COP calculations have been affected by a data loss condition. If an open DATA_LOSS_COP event already exists for the given site, no duplicate is inserted.

Parameters:

config : ConfigManager
    The ConfigManager object that holds configuration data for the pipeline.

site_name : str, optional
    Name of the site to associate the event with. Defaults to the site name returned by config.get_site_name().

Returns:

bool
    True if the event was logged (or already existed); False if the site_events table does not exist.

ecopipeline.load.central_load_function(config: ConfigManager, df: DataFrame, hourly_df: DataFrame, daily_df: DataFrame, alarm_df: DataFrame)

Dispatch all pipeline DataFrames to their respective database tables.

Loads minute, hourly, and daily sensor data using Loader, and alarm data using AlarmLoader. Each DataFrame is only written when it is non-None and non-empty.
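The non-None, non-empty guard can be sketched as follows, with plain lists standing in for DataFrames and callables standing in for the loaders (names here are illustrative):

```python
def central_load_sketch(frames, loaders):
    # Illustrative dispatch: write each frame only when it is
    # non-None and non-empty.
    written = []
    for name, df in frames.items():
        if df is not None and len(df) > 0:
            loaders[name](df)
            written.append(name)
    return written

loaded = []
frames = {"minute": [1, 2], "hourly": [], "daily": None, "alarm": [3]}
loaders = {k: (lambda df, k=k: loaded.append(k)) for k in frames}
print(central_load_sketch(frames, loaders))  # ['minute', 'alarm']
```

Empty and absent frames are silently skipped, so a pipeline stage that produced no output does not fail the load step.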

Parameters:

config : ConfigManager
    The ConfigManager object that holds configuration data for the pipeline.

df : pd.DataFrame
    Minute-resolution sensor data. May be None.

hourly_df : pd.DataFrame
    Hourly-resolution sensor data. May be None.

daily_df : pd.DataFrame
    Daily-resolution sensor data. May be None.

alarm_df : pd.DataFrame
    Alarm records produced by the transform stage. May be None.

ecopipeline.load.check_table_exists(cursor: MySQLCursor, table_name: str, dbname: str) → int

Check whether a table exists in the database.

Parameters:

cursor : mysql.connector.cursor.MySQLCursor
    An active database cursor.

table_name : str
    Name of the table to check.

dbname : str
    Name of the database to search within.

Returns:

int
    The count of tables matching table_name in dbname. Evaluates to True when non-zero, so it can be used directly as a boolean.

ecopipeline.load.create_new_table(cursor: MySQLCursor, table_name: str, table_column_names: list, table_column_types: list, primary_key: str = 'time_pt', has_primary_key: bool = True) → bool

Create a new table in the MySQL database.

Parameters:

cursor : mysql.connector.cursor.MySQLCursor
    An active database cursor.

table_name : str
    Name of the table to create.

table_column_names : list
    Ordered list of column names (excluding the primary-key column).

table_column_types : list
    Ordered list of MySQL type strings corresponding to table_column_names. Must be the same length as table_column_names.

primary_key : str, optional
    Name of the primary-key column. Defaults to 'time_pt'.

has_primary_key : bool, optional
    If False, the primary_key column is added as a plain column rather than a PRIMARY KEY. Defaults to True.

Returns:

bool
    True if the table was successfully created.

Raises:

Exception
    If table_column_names and table_column_types are different lengths.

ecopipeline.load.load_alarms(config: ConfigManager, alarm_df: DataFrame, site_name: str = None) → bool

Load alarm data into the alarm and alarm_inst tables.

Processes the output of central_alarm_df_creator(). For each alarm instance in the DataFrame the function:

  1. Checks whether a matching alarm record (same site_name, alarm_type, variable_name) already exists within a three-day gap tolerance.

  2. Creates a new alarm record if none is found, or extends the date range of the nearest existing record.

  3. Inserts alarm instances into alarm_inst using certainty-based overlap resolution:

    • Higher certainty new alarm: the existing instance is split around the new one so each segment retains the highest available certainty.

    • Lower certainty new alarm: only the non-overlapping portions of the new alarm are inserted.

    • Same certainty: the existing instance is extended to encompass both time periods.
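A minimal alarm_df satisfying the documented column contract could be built like this (the row values are hypothetical, and the final call requires a live database and config, so it is shown commented out):

```python
from datetime import datetime
import pandas as pd

# Hypothetical alarm rows; column names follow the documented contract.
alarm_df = pd.DataFrame({
    "start_time_pt": [datetime(2024, 6, 1, 8, 0)],
    "end_time_pt":   [datetime(2024, 6, 1, 9, 30)],
    "alarm_type":    ["SENSOR_FLATLINE"],
    "variable_name": ["power_kw"],
    # 'certainty' omitted on purpose: it defaults to 3 (high).
})

required = {"start_time_pt", "end_time_pt", "alarm_type", "variable_name"}
assert required.issubset(alarm_df.columns)
# With a configured pipeline this frame would then be loaded via:
# load_alarms(config, alarm_df, site_name="site_a")
```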

Parameters:

config : ConfigManager
    The ConfigManager object that holds configuration data for the pipeline.

alarm_df : pd.DataFrame
    DataFrame output from central_alarm_df_creator(). Required columns: start_time_pt, end_time_pt, alarm_type, variable_name. Optional column: certainty (defaults to 3 if absent). Certainty scale: 3 = high, 2 = medium, 1 = low.

site_name : str, optional
    Name of the site to associate alarms with. Defaults to config.get_site_name().

Returns:

bool
    True if all alarms were loaded successfully; False if an exception occurred (the transaction is rolled back).

Raises:

Exception
    If alarm_df is missing any of the required columns.

Exception
    If an alarm ID cannot be retrieved after insertion.

ecopipeline.load.load_data_statistics(config: ConfigManager, daily_stats_df: DataFrame, config_daily_indicator: str = 'day', custom_table_name: str = None)

Write daily data-quality statistics to the database.

The destination table is named {daily_table_name}_stats unless custom_table_name is provided.
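The naming rule reduces to a one-liner; this sketch mirrors the documented behavior:

```python
def stats_table_name(daily_table_name, custom_table_name=None):
    # Destination is '<daily_table_name>_stats' unless overridden.
    return custom_table_name or f"{daily_table_name}_stats"

print(stats_table_name("daily_data"))           # daily_data_stats
print(stats_table_name("daily_data", "my_qc"))  # my_qc
```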

Parameters:

config : ConfigManager
    The ConfigManager object that holds configuration data for the pipeline.

daily_stats_df : pd.DataFrame
    DataFrame produced by create_data_statistics_df() in ecopipeline.transform.

config_daily_indicator : str, optional
    Key used to look up the daily table name in config.ini. Defaults to 'day'.

custom_table_name : str, optional
    Overrides the auto-generated {daily_table_name}_stats destination table name. When provided, config_daily_indicator is only used to supply config_info; it no longer determines the table name. Defaults to None.

Returns:

bool
    True if the data were successfully written; False otherwise.

ecopipeline.load.load_event_table(config: ConfigManager, event_df: DataFrame, site_name: str = None)

Load event records into the site_events MySQL table.

Uses an UPSERT strategy so that existing automatically-uploaded rows can be updated while manually modified rows are left unchanged. If the DataFrame contains an alarm_type column the call is transparently redirected to load_alarms().
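The routing and the required-column check can be sketched as follows; this models the documented front-door behavior rather than the actual implementation:

```python
REQUIRED = {"end_time_pt", "event_type", "event_detail"}

def route_event_df(columns):
    # Sketch of load_event_table's routing: alarm-shaped frames are
    # redirected to load_alarms(); others must carry the required columns.
    cols = set(columns)
    if "alarm_type" in cols:
        return "load_alarms"
    missing = REQUIRED - cols
    if missing:
        raise Exception(f"event_df missing required columns: {sorted(missing)}")
    return "site_events"

print(route_event_df(["end_time_pt", "event_type", "event_detail"]))  # site_events
print(route_event_df(["alarm_type", "start_time_pt"]))                # load_alarms
```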

Parameters:

config : ConfigManager
    The ConfigManager object that holds configuration data for the pipeline.

event_df : pd.DataFrame
    DataFrame of events to load. The index must be start_time_pt. Required columns: end_time_pt, event_type, event_detail. Optional column: variable_name.

site_name : str, optional
    Name of the site to associate events with. Defaults to config.get_site_name().

Returns:

bool
    True if the data were successfully written; False if the table could not be created.

Raises:

Exception
    If event_df is missing any of the required columns (end_time_pt, event_type, event_detail).

ecopipeline.load.load_overwrite_database(config: ConfigManager, dataframe: DataFrame, config_info: dict, data_type: str, primary_key: str = 'time_pt', table_name: str = None, auto_log_data_loss: bool = False, config_key: str = 'minute')

Load a pandas DataFrame into a MySQL table using an UPSERT strategy.

Existing rows are updated rather than replaced; NULL values in the incoming DataFrame will not overwrite existing non-NULL values in the database.

Parameters:

config : ConfigManager
    The ConfigManager object that holds configuration data for the pipeline.

dataframe : pd.DataFrame
    The DataFrame to be written into the MySQL server. The index must be the primary-key column (e.g. a datetime index named time_pt).

config_info : dict
    Configuration dictionary for the upload, obtainable via get_login_info(). Must contain a 'database' key and a nested dict keyed by data_type with a 'table_name' entry (used when table_name is None).

data_type : str
    Key within config_info that identifies the target table section.

primary_key : str, optional
    Column name used as the primary key. Defaults to 'time_pt'.

table_name : str, optional
    Overrides the table name derived from config_info[data_type]. Defaults to None.

auto_log_data_loss : bool, optional
    If True, a DATA_LOSS_COP event is recorded when no data exists in the DataFrame for the last three days, or when an exception occurs. Defaults to False.

config_key : str, optional
    Key in config.ini that points to the minute-table data for the site; also used as the site name when reporting data loss. Defaults to 'minute'.

Returns:

bool
    True if the data were successfully written; False otherwise.

ecopipeline.load.report_data_loss(config: ConfigManager, site_name: str = None)

Log a DATA_LOSS_COP event in the site_events table.

Records that COP calculations have been affected by a data loss condition. If an open DATA_LOSS_COP event already exists for the given site, no duplicate is inserted.

Parameters:

config : ConfigManager
    The ConfigManager object that holds configuration data for the pipeline.

site_name : str, optional
    Name of the site to associate the event with. Defaults to the site name returned by config.get_site_name().

Returns:

bool
    True if the event was logged (or already existed); False if the site_events table does not exist.