Load Documentation
- class ecopipeline.load.AlarmLoader
Bases: Loader
Loader subclass for writing alarm data to the alarm and alarm_inst MySQL tables.
Overrides Loader table-existence checks and table-creation logic so that the paired alarm/alarm_inst tables are always created and validated together.
Methods
check_table_exists(cursor, table_name, dbname): Check whether a table exists in the database.
create_new_table(cursor, table_name, ...[, ...]): Create a new table in the MySQL database.
report_data_loss(config[, site_name]): Log a DATA_LOSS_COP event in the site_events table.
- check_table_exists(cursor: MySQLCursor, table_name: str, dbname: str) -> bool
Return True only when both the alarm table and its _inst companion exist.
Both tables must be present before loading can proceed; if either is missing they should both be (re)created together.
- Parameters:
- cursor : mysql.connector.cursor.MySQLCursor
An active database cursor.
- table_name : str
Base name of the alarm table (e.g. 'alarm').
- dbname : str
Name of the database to search within.
- Returns:
- bool
True if both table_name and {table_name}_inst exist; False otherwise.
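The paired check can be sketched as follows. This is a minimal illustration rather than the library's implementation: `table_count` and `paired_tables_exist` are hypothetical helpers, and the set of table names stands in for the database catalog that a real cursor would query.

```python
def table_count(existing_tables, table_name):
    """Stand-in for a catalog query: how many tables are named table_name."""
    return sum(1 for name in existing_tables if name == table_name)


def paired_tables_exist(existing_tables, table_name):
    """Return True only when table_name and its _inst companion both exist."""
    base_found = table_count(existing_tables, table_name) > 0
    inst_found = table_count(existing_tables, f"{table_name}_inst") > 0
    return base_found and inst_found


# Only one of the two tables exists, so both should be (re)created together.
catalog = {"alarm", "minute"}
needs_creation = not paired_tables_exist(catalog, "alarm")
```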
- create_new_table(cursor: MySQLCursor, table_name: str, table_column_names: list = None, table_column_types: list = None, primary_key: str = 'time_pt', has_primary_key: bool = True) -> bool
Create both the alarm table and its _inst companion table.
Uses CREATE TABLE IF NOT EXISTS so the method is safe to call even when only one of the two tables is missing.
- Parameters:
- cursor : mysql.connector.cursor.MySQLCursor
An active database cursor.
- table_name : str
Base name for the alarm table. The instance table is derived as {table_name}_inst.
- table_column_names : list, optional
Ignored; the alarm schema is fixed. Retained for interface compatibility with the parent class.
- table_column_types : list, optional
Ignored; the alarm schema is fixed. Retained for interface compatibility with the parent class.
- primary_key : str, optional
Ignored; the alarm schema uses a fixed auto-increment id. Retained for interface compatibility with the parent class.
- has_primary_key : bool, optional
Ignored; the alarm schema always defines a primary key. Retained for interface compatibility with the parent class.
- Returns:
- bool
Always returns True after executing the DDL statements.
- load_database(config: ConfigManager, alarm_df: DataFrame, table_name: str, dbname: str, auto_log_data_loss: bool = False, primary_key: str = 'time_pt', site_name: str = None) -> bool
Load alarm data into the alarm and alarm_inst tables.
For each alarm instance in the DataFrame the method:
- Checks whether a matching alarm record (same site_name, alarm_type, variable_name) already exists within a three-day gap tolerance.
- Creates a new alarm record if none is found, or extends the date range of the nearest existing record.
- Inserts alarm instances into {table_name}_inst using certainty-based overlap resolution:
  - Higher certainty new alarm: the existing instance is split around the new one so each segment retains the highest available certainty.
  - Lower certainty new alarm: only the non-overlapping portions of the new alarm are inserted.
  - Same certainty: the existing instance is extended to encompass both time periods.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- alarm_df : pd.DataFrame
DataFrame of alarms to load. Required columns: start_time_pt, end_time_pt, alarm_type, variable_name. Optional column: certainty (defaults to 3 if absent). Certainty scale: 3 = high, 2 = medium, 1 = low.
- table_name : str
Base name of the alarm table. The companion instance table is derived as {table_name}_inst.
- dbname : str
Name of the MySQL database.
- auto_log_data_loss : bool, optional
Unused in this subclass; retained for interface compatibility. Defaults to False.
- primary_key : str, optional
Unused in this subclass; retained for interface compatibility. Defaults to 'time_pt'.
- site_name : str, optional
Site name to associate alarms with. Defaults to config.get_site_name().
- Returns:
- bool
True if all alarms were loaded successfully; False if an exception occurred (transaction is rolled back).
- Raises:
- Exception
If alarm_df is missing any of the required columns.
- Exception
If an alarm ID cannot be retrieved after insertion.
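The three certainty rules can be illustrated with a small interval sketch. It is not the library's code: `Instance`, `outside`, `resolve` and `day` are hypothetical names, it handles a single existing instance against a single new one, and it assumes that adjacent segments may share a boundary timestamp.

```python
from datetime import datetime
from typing import List, NamedTuple


class Instance(NamedTuple):
    start: datetime
    end: datetime
    certainty: int


def outside(outer: Instance, inner: Instance) -> List[Instance]:
    """Return the parts of `outer` that fall outside `inner` (assumes overlap)."""
    pieces = []
    if outer.start < inner.start:
        pieces.append(Instance(outer.start, inner.start, outer.certainty))
    if outer.end > inner.end:
        pieces.append(Instance(inner.end, outer.end, outer.certainty))
    return pieces


def resolve(existing: Instance, new: Instance) -> List[Instance]:
    """Combine one existing instance with one new instance."""
    if new.start > existing.end or new.end < existing.start:
        return sorted([existing, new])  # no overlap: keep both as they are
    if new.certainty > existing.certainty:
        # Higher certainty: split the existing instance around the new one.
        return sorted(outside(existing, new) + [new])
    if new.certainty < existing.certainty:
        # Lower certainty: keep only the non-overlapping parts of the new one.
        return sorted(outside(new, existing) + [existing])
    # Same certainty: one instance spanning both periods.
    return [Instance(min(existing.start, new.start),
                     max(existing.end, new.end),
                     existing.certainty)]


def day(n: int) -> datetime:
    """Convenience constructor for example timestamps."""
    return datetime(2024, 1, n)


# A low-certainty alarm from day 1 to day 10 meets a high-certainty one on days 4 to 6.
split = resolve(Instance(day(1), day(10), 1), Instance(day(4), day(6), 3))
# Two medium-certainty alarms that overlap are merged into one.
merged = resolve(Instance(day(1), day(5), 2), Instance(day(3), day(8), 2))
```

In this sketch `split` holds three segments (low, high, low) and `merged` holds a single instance covering day 1 through day 8.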
- class ecopipeline.load.Loader
Bases: object
Base class for loading pandas DataFrames into a MySQL database.
Provides UPSERT-based loading, table creation, column management, and data-loss reporting utilities used by all concrete loader subclasses.
- Attributes:
- data_map : dict
Mapping from pandas dtype name strings to MySQL column type strings.
Methods
check_table_exists(cursor, table_name, dbname): Check whether a table exists in the database.
create_new_table(cursor, table_name, ...[, ...]): Create a new table in the MySQL database.
report_data_loss(config[, site_name]): Log a DATA_LOSS_COP event in the site_events table.
- check_table_exists(cursor: MySQLCursor, table_name: str, dbname: str) -> int
Check whether a table exists in the database.
- Parameters:
- cursor : mysql.connector.cursor.MySQLCursor
An active database cursor.
- table_name : str
Name of the table to check.
- dbname : str
Name of the database to search within.
- Returns:
- int
The count of tables matching table_name in dbname. Evaluates to True when non-zero, so it can be used directly as a boolean.
- create_new_columns(cursor: MySQLCursor, table_name: str, new_columns: list, data_types: str)
Add new columns to an existing database table.
Issues one ALTER TABLE … ADD COLUMN statement per column. Stops and returns False on the first database error.
- Parameters:
- cursor : mysql.connector.cursor.MySQLCursor
An active database cursor.
- table_name : str
Name of the table to alter.
- new_columns : list
Ordered list of column names to add.
- data_types : str
Ordered list of MySQL type strings corresponding to new_columns.
- Returns:
- bool
True if all columns were added successfully; False if a database error occurred.
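The stop-on-first-error behaviour can be sketched like this. `FakeCursor` and `add_columns` are hypothetical stand-ins so the example runs without a database; the exact SQL text is an assumption, and a real implementation would catch the connector's error type (for example mysql.connector.Error) rather than RuntimeError.

```python
class FakeCursor:
    """Minimal stand-in for a database cursor that can simulate a failure."""

    def __init__(self, fail_on=None):
        self.executed = []
        self.fail_on = fail_on

    def execute(self, statement):
        if self.fail_on is not None and self.fail_on in statement:
            raise RuntimeError("simulated database error")
        self.executed.append(statement)


def add_columns(cursor, table_name, new_columns, data_types):
    """Issue one ALTER TABLE statement per column; stop at the first error."""
    for column, data_type in zip(new_columns, data_types):
        statement = f"ALTER TABLE `{table_name}` ADD COLUMN `{column}` {data_type};"
        try:
            cursor.execute(statement)
        except RuntimeError:
            return False  # later columns are not attempted
    return True


cursor = FakeCursor(fail_on="`flow`")
ok = add_columns(cursor, "minute", ["temp", "flow", "power"], ["float", "float", "float"])
```

Here the second column fails, so `ok` is False and only the first statement was executed.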
- create_new_table(cursor: MySQLCursor, table_name: str, table_column_names: list, table_column_types: list, primary_key: str = 'time_pt', has_primary_key: bool = True) -> bool
Create a new table in the MySQL database.
- Parameters:
- cursor : mysql.connector.cursor.MySQLCursor
An active database cursor.
- table_name : str
Name of the table to create.
- table_column_names : list
Ordered list of column names (excluding the primary-key column).
- table_column_types : list
Ordered list of MySQL type strings corresponding to table_column_names. Must be the same length as table_column_names.
- primary_key : str, optional
Name of the primary-key column. Defaults to 'time_pt'.
- has_primary_key : bool, optional
If False, the primary_key column is added as a plain column rather than a PRIMARY KEY. Defaults to True.
- Returns:
- bool
True if the table was successfully created.
- Raises:
- Exception
If table_column_names and table_column_types are different lengths.
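As a rough sketch of how these arguments could translate into DDL, the helper below builds a CREATE TABLE statement as a string. `build_create_table` is hypothetical, and the DATETIME type for the primary-key column and the error message are assumptions; the statement the library actually issues may differ.

```python
def build_create_table(table_name, column_names, column_types,
                       primary_key="time_pt", has_primary_key=True,
                       primary_key_type="DATETIME"):
    """Build a CREATE TABLE statement from parallel name and type lists."""
    if len(column_names) != len(column_types):
        raise Exception("Column name and column type lists are different lengths.")
    # The key column comes first; it is a plain column when has_primary_key is False.
    key_column = f"`{primary_key}` {primary_key_type}"
    if has_primary_key:
        key_column += " PRIMARY KEY"
    columns = [key_column]
    columns += [f"`{name}` {ctype}" for name, ctype in zip(column_names, column_types)]
    return f"CREATE TABLE `{table_name}` ({', '.join(columns)});"


with_key = build_create_table("minute", ["temp"], ["float"])
without_key = build_create_table("minute", ["temp"], ["float"], has_primary_key=False)
```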
- find_missing_columns(cursor: MySQLCursor, dataframe: DataFrame, dbname: str, table_name: str)
Identify DataFrame columns that are absent from the database table.
If communication with the database fails, empty lists are returned so that the caller can continue without adding any columns.
- Parameters:
- cursor : mysql.connector.cursor.MySQLCursor
An active database cursor.
- dataframe : pd.DataFrame
The DataFrame whose columns are compared against the table schema.
- dbname : str
Name of the database that contains table_name.
- table_name : str
Name of the table to inspect.
- Returns:
- list
Column names present in dataframe but absent from the table.
- list
Corresponding MySQL type strings for each missing column.
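The comparison amounts to a set difference plus a dtype lookup. The sketch below avoids pandas and a live database: `missing_columns` is a hypothetical helper, the column-to-dtype dict stands in for the DataFrame, and the contents of `DATA_MAP` are assumed for illustration (the real Loader.data_map may map types differently).

```python
# Hypothetical dtype-name to MySQL-type mapping, for illustration only.
DATA_MAP = {
    "int64": "float",
    "float64": "float",
    "object": "varchar(25)",
    "bool": "boolean",
}


def missing_columns(frame_dtypes, table_columns):
    """Return (names, types) for columns in the frame but not in the table."""
    existing = set(table_columns)
    names = [col for col in frame_dtypes if col not in existing]
    types = [DATA_MAP[frame_dtypes[col]] for col in names]
    return names, types


names, types = missing_columns(
    {"temp": "float64", "status": "object", "flow": "float64"},
    ["time_pt", "temp"],
)
```

With pandas, a comparable dtype dict could be produced with `{col: str(dtype) for col, dtype in dataframe.dtypes.items()}`.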
- load_database(config: ConfigManager, dataframe: DataFrame, table_name: str, dbname: str, auto_log_data_loss: bool = False, primary_key: str = 'time_pt')
Load a pandas DataFrame into a MySQL table using an UPSERT strategy.
Existing rows are updated rather than replaced; NULL values in the incoming DataFrame will not overwrite existing non-NULL values in the database.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- dataframe : pd.DataFrame
The pandas DataFrame to be written into the MySQL server. The index must be the primary-key column (e.g. a datetime index named time_pt).
- table_name : str
Name of the destination table in the database.
- dbname : str
Name of the MySQL database that contains table_name.
- auto_log_data_loss : bool, optional
If True, a DATA_LOSS_COP event is recorded when no data exists in the DataFrame for the last three days, or when an exception occurs. Defaults to False.
- primary_key : str, optional
Column name used as the primary key in the destination table. Defaults to 'time_pt'.
- Returns:
- bool
True if the data were successfully written; False otherwise.
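One common way to get this behaviour in MySQL is INSERT ... ON DUPLICATE KEY UPDATE with IFNULL, so that an incoming NULL falls back to the stored value. The sketch below only builds such a statement; `build_upsert` is a hypothetical helper and the SQL is an assumption about how an UPSERT of this kind can be written, not a copy of what the library executes.

```python
def build_upsert(table_name, primary_key, columns):
    """Build a parameterized UPSERT that keeps stored values when the new value is NULL."""
    all_columns = [primary_key] + list(columns)
    column_list = ", ".join(f"`{col}`" for col in all_columns)
    placeholders = ", ".join(["%s"] * len(all_columns))
    # IFNULL(new, old): use the incoming value unless it is NULL.
    updates = ", ".join(
        f"`{col}` = IFNULL(VALUES(`{col}`), `{col}`)" for col in columns
    )
    return (
        f"INSERT INTO `{table_name}` ({column_list}) VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {updates};"
    )


statement = build_upsert("minute", "time_pt", ["temp", "flow"])
```

The resulting string would typically be passed to a cursor's executemany() together with one tuple per DataFrame row.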
- report_data_loss(config: ConfigManager, site_name: str = None)
Log a DATA_LOSS_COP event in the site_events table.
Records that COP calculations have been affected by a data loss condition. If an open DATA_LOSS_COP event already exists for the given site, no duplicate is inserted.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- site_name : str, optional
Name of the site to associate the event with. Defaults to the site name returned by config.get_site_name().
- Returns:
- bool
True if the event was logged (or already existed); False if the site_events table does not exist.
- ecopipeline.load.central_load_function(config: ConfigManager, df: DataFrame, hourly_df: DataFrame, daily_df: DataFrame, alarm_df: DataFrame)
Dispatch all pipeline DataFrames to their respective database tables.
Loads minute, hourly, and daily sensor data using Loader, and alarm data using AlarmLoader. Each DataFrame is only written when it is non-None and non-empty.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- df : pd.DataFrame
Minute-resolution sensor data. May be None.
- hourly_df : pd.DataFrame
Hourly-resolution sensor data. May be None.
- daily_df : pd.DataFrame
Daily-resolution sensor data. May be None.
- alarm_df : pd.DataFrame
Alarm records produced by the transform stage. May be None.
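The non-None, non-empty guard can be sketched as below. `should_load` and `plan_loads` are hypothetical helpers, the labels "minute", "hour", "day" and "alarm" are illustrative, and SimpleNamespace objects with an `empty` attribute stand in for DataFrames so the example runs without pandas.

```python
from types import SimpleNamespace


def should_load(frame):
    """A frame is written only when it exists and has content."""
    return frame is not None and not frame.empty


def plan_loads(df, hourly_df, daily_df, alarm_df):
    """Return the labels of the frames that would be dispatched to a loader."""
    candidates = {"minute": df, "hour": hourly_df, "day": daily_df, "alarm": alarm_df}
    return [label for label, frame in candidates.items() if should_load(frame)]


has_rows = SimpleNamespace(empty=False)
no_rows = SimpleNamespace(empty=True)
planned = plan_loads(has_rows, None, no_rows, has_rows)
```

In this run only the minute and alarm frames qualify; the missing hourly frame and the empty daily frame are skipped.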
- ecopipeline.load.check_table_exists(cursor: MySQLCursor, table_name: str, dbname: str) -> int
Check whether a table exists in the database.
- Parameters:
- cursor : mysql.connector.cursor.MySQLCursor
An active database cursor.
- table_name : str
Name of the table to check.
- dbname : str
Name of the database to search within.
- Returns:
- int
The count of tables matching table_name in dbname. Evaluates to True when non-zero, so it can be used directly as a boolean.
- ecopipeline.load.create_new_table(cursor: MySQLCursor, table_name: str, table_column_names: list, table_column_types: list, primary_key: str = 'time_pt', has_primary_key: bool = True) -> bool
Create a new table in the MySQL database.
- Parameters:
- cursor : mysql.connector.cursor.MySQLCursor
An active database cursor.
- table_name : str
Name of the table to create.
- table_column_names : list
Ordered list of column names (excluding the primary-key column).
- table_column_types : list
Ordered list of MySQL type strings corresponding to table_column_names. Must be the same length as table_column_names.
- primary_key : str, optional
Name of the primary-key column. Defaults to 'time_pt'.
- has_primary_key : bool, optional
If False, the primary_key column is added as a plain column rather than a PRIMARY KEY. Defaults to True.
- Returns:
- bool
True if the table was successfully created.
- Raises:
- Exception
If table_column_names and table_column_types are different lengths.
- ecopipeline.load.load_alarms(config: ConfigManager, alarm_df: DataFrame, site_name: str = None) -> bool
Load alarm data into the alarm and alarm_inst tables.
Processes the output of central_alarm_df_creator(). For each alarm instance in the DataFrame the function:
- Checks whether a matching alarm record (same site_name, alarm_type, variable_name) already exists within a three-day gap tolerance.
- Creates a new alarm record if none is found, or extends the date range of the nearest existing record.
- Inserts alarm instances into alarm_inst using certainty-based overlap resolution:
  - Higher certainty new alarm: the existing instance is split around the new one so each segment retains the highest available certainty.
  - Lower certainty new alarm: only the non-overlapping portions of the new alarm are inserted.
  - Same certainty: the existing instance is extended to encompass both time periods.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- alarm_df : pd.DataFrame
DataFrame output from central_alarm_df_creator(). Required columns: start_time_pt, end_time_pt, alarm_type, variable_name. Optional column: certainty (defaults to 3 if absent). Certainty scale: 3 = high, 2 = medium, 1 = low.
- site_name : str, optional
Name of the site to associate alarms with. Defaults to config.get_site_name().
- Returns:
- bool
True if all alarms were loaded successfully; False if an exception occurred (transaction is rolled back).
- Raises:
- Exception
If alarm_df is missing any of the required columns.
- Exception
If an alarm ID cannot be retrieved after insertion.
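The three-day gap tolerance in the first two steps can be pictured with the sketch below. It is an interpretation, not the library's logic: `find_match` and `extend` are hypothetical helpers, records are plain (start, end) tuples already filtered to one site_name, alarm_type and variable_name, and "nearest" is taken to mean the smallest time gap.

```python
from datetime import datetime, timedelta

GAP_TOLERANCE = timedelta(days=3)


def find_match(records, start, end):
    """Return the index of the nearest record within the gap tolerance, or None."""
    candidates = []
    for index, (rec_start, rec_end) in enumerate(records):
        within_tolerance = (start <= rec_end + GAP_TOLERANCE
                            and end >= rec_start - GAP_TOLERANCE)
        if within_tolerance:
            # Gap is zero when the periods overlap.
            gap = max(rec_start - end, start - rec_end, timedelta(0))
            candidates.append((gap, index))
    return min(candidates)[1] if candidates else None


def extend(record, start, end):
    """Widen a record's date range so it also covers the new alarm."""
    return (min(record[0], start), max(record[1], end))


def day(n):
    return datetime(2024, 1, n)


records = [(day(1), day(2)), (day(10), day(12))]
match = find_match(records, day(4), day(5))      # two days after the first record
no_match = find_match(records, day(20), day(21))  # more than three days from any record
extended = extend(records[match], day(4), day(5))
```

A None result corresponds to the "create a new alarm record" branch; otherwise the matched record's range is extended.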
- ecopipeline.load.load_data_statistics(config: ConfigManager, daily_stats_df: DataFrame, config_daily_indicator: str = 'day', custom_table_name: str = None)
Write daily data-quality statistics to the database.
The destination table is named {daily_table_name}_stats unless custom_table_name is provided.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- daily_stats_df : pd.DataFrame
DataFrame produced by create_data_statistics_df() in ecopipeline.transform.
- config_daily_indicator : str, optional
Key used to look up the daily table name in config.ini. Defaults to 'day'.
- custom_table_name : str, optional
Overrides the auto-generated {daily_table_name}_stats destination table name. When provided, config_daily_indicator is only used to supply config_info; it no longer determines the table name. Defaults to None.
- Returns:
- bool
True if the data were successfully written; False otherwise.
- ecopipeline.load.load_event_table(config: ConfigManager, event_df: DataFrame, site_name: str = None)
Load event records into the site_events MySQL table.
Uses an UPSERT strategy so that existing automatically-uploaded rows can be updated while manually modified rows are left unchanged. If the DataFrame contains an alarm_type column the call is transparently redirected to load_alarms().
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- event_df : pd.DataFrame
DataFrame of events to load. The index must be start_time_pt. Required columns: end_time_pt, event_type, event_detail. Optional column: variable_name.
- site_name : str, optional
Name of the site to associate events with. Defaults to config.get_site_name().
- Returns:
- bool
True if the data were successfully written; False if the table could not be created.
- Raises:
- Exception
If event_df is missing any of the required columns (end_time_pt, event_type, event_detail).
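The column handling described above can be sketched as a small routing function. `route_event_frame` is hypothetical, it works on a list of column names rather than a DataFrame, and the order of the two checks (redirect first, then validation) is an assumption.

```python
REQUIRED_EVENT_COLUMNS = ("end_time_pt", "event_type", "event_detail")


def route_event_frame(columns):
    """Decide where a frame with these columns would go, or raise if it is invalid."""
    if "alarm_type" in columns:
        return "load_alarms"  # alarm-shaped frames are redirected
    missing = [col for col in REQUIRED_EVENT_COLUMNS if col not in columns]
    if missing:
        raise Exception(f"event_df is missing required columns: {missing}")
    return "site_events"


event_target = route_event_frame(["end_time_pt", "event_type", "event_detail"])
alarm_target = route_event_frame(["end_time_pt", "alarm_type", "variable_name"])
```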
- ecopipeline.load.load_overwrite_database(config: ConfigManager, dataframe: DataFrame, config_info: dict, data_type: str, primary_key: str = 'time_pt', table_name: str = None, auto_log_data_loss: bool = False, config_key: str = 'minute')
Load a pandas DataFrame into a MySQL table using an UPSERT strategy.
Existing rows are updated rather than replaced; NULL values in the incoming DataFrame will not overwrite existing non-NULL values in the database.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- dataframe : pd.DataFrame
The DataFrame to be written into the MySQL server. The index must be the primary-key column (e.g. a datetime index named time_pt).
- config_info : dict
Configuration dictionary for the upload, obtainable via get_login_info(). Must contain a 'database' key and a nested dict keyed by data_type with a 'table_name' entry (used when table_name is None).
- data_type : str
Key within config_info that identifies the target table section.
- primary_key : str, optional
Column name used as the primary key. Defaults to 'time_pt'.
- table_name : str, optional
Overrides the table name derived from config_info[data_type]. Defaults to None.
- auto_log_data_loss : bool, optional
If True, a DATA_LOSS_COP event is recorded when no data exists in the DataFrame for the last three days, or when an exception occurs. Defaults to False.
- config_key : str, optional
Key in config.ini that points to the minute-table data for the site; also used as the site name when reporting data loss. Defaults to 'minute'.
- Returns:
- bool
True if the data were successfully written; False otherwise.
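The shape of config_info and the table_name override can be illustrated as follows. `resolve_target` is a hypothetical helper and the database and table names are made up; only the 'database' key and the nested 'table_name' entry come from the parameter description above.

```python
def resolve_target(config_info, data_type, table_name=None):
    """Return (database, table) using table_name when given, else config_info."""
    database = config_info["database"]
    if table_name is None:
        table_name = config_info[data_type]["table_name"]
    return database, table_name


# Example dictionary with the documented structure; the values are illustrative.
config_info = {
    "database": "example_db",
    "minute": {"table_name": "example_site_minute"},
}

default_target = resolve_target(config_info, "minute")
override_target = resolve_target(config_info, "minute", table_name="custom_table")
```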
- ecopipeline.load.report_data_loss(config: ConfigManager, site_name: str = None)
Log a DATA_LOSS_COP event in the site_events table.
Records that COP calculations have been affected by a data loss condition. If an open DATA_LOSS_COP event already exists for the given site, no duplicate is inserted.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- site_name : str, optional
Name of the site to associate the event with. Defaults to the site name returned by config.get_site_name().
- Returns:
- bool
True if the event was logged (or already existed); False if the site_events table does not exist.