Extract Documentation

Extract layer for the ecopipeline data pipeline.

This package exposes all classes and functions needed to ingest raw data from files (CSV, JSON, Excel) and remote APIs into pandas.DataFrame objects ready for downstream transform and load steps.

Classes

FileProcessor

Abstract base class for file-based data processors.

APIExtractor

Abstract base class for API-based data extractors.

CSVProcessor

Generic CSV file processor.

JSONProcessor

Generic JSON file processor.

ModbusCSVProcessor

CSV processor for Modbus-format files.

DentCSVProcessor

CSV processor for Dent meter files.

FlowCSVProcessor

CSV processor for flow-meter CSV files.

MSACSVProcessor

CSV processor for MSA-format files.

EGaugeCSVProcessor

CSV processor for eGauge meter CSV exports.

SmallPlanetCSVProcessor

CSV processor for Small Planet Controls files.

ThingsBoard

API extractor for the ThingsBoard IoT platform.

Skycentrics

API extractor for the Skycentrics solar-monitoring API.

FieldManager

API extractor for the FieldPop / Field Manager API.

LiCOR

API extractor for the LI-COR Cloud sensor API.

class ecopipeline.extract.APIExtractor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, create_csv: bool = True, csv_prefix: str = '')

Bases: object

Methods

get_raw_data

raw_data_to_df

get_raw_data() → DataFrame

raw_data_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) → DataFrame
class ecopipeline.extract.CSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, raw_time_column: str = 'DateTime', time_column_format: str = '%Y/%m/%d %H:%M:%S', filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3)

Bases: FileProcessor

FileProcessor for generic CSV files with a named timestamp column.

Reads .csv files and converts the column identified by raw_time_column into a time_pt datetime index using time_column_format.
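A minimal sketch of the timestamp handling described above, using the documented defaults; the sample CSV content is invented:

```python
import io

import pandas as pd

# Illustrative raw file content; the column name and format below are the
# documented CSVProcessor defaults.
raw = io.StringIO(
    "DateTime,temp_f\n"
    "2024/01/01 00:00:00,68.2\n"
    "2024/01/01 00:01:00,68.4\n"
)
df = pd.read_csv(raw)
# Convert raw_time_column into a 'time_pt' datetime index.
df["time_pt"] = pd.to_datetime(df["DateTime"], format="%Y/%m/%d %H:%M:%S")
df = df.drop(columns=["DateTime"]).set_index("time_pt")
```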

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline.

start_time : datetime, optional

Earliest filename-encoded timestamp to include.

end_time : datetime, optional

Latest filename-encoded timestamp to include (exclusive).

raw_time_column : str, optional

Name of the column containing timestamp strings. Defaults to 'DateTime'.

time_column_format : str, optional

datetime.strptime() format used to parse raw_time_column. Defaults to '%Y/%m/%d %H:%M:%S'.

filename_date_format : str, optional

datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.

file_prefix : str, optional

Only process files whose names begin with this prefix. Defaults to an empty string.

data_sub_dir : str, optional

Sub-directory under the configured data directory containing the files. Defaults to an empty string.

date_string_start_idx : int, optional

Start index (from the end) of the date substring in the filename. Defaults to -17.

date_string_end_idx : int, optional

End index (from the end) of the date substring in the filename. Defaults to -3.

Methods

extract_files(config)

Collect the full paths of files that match the processor's criteria.

extract_new(filenames)

Filter a list of filenames to those whose encoded date falls in range.

get_raw_data()

Return the concatenated raw DataFrame produced during initialisation.

raw_files_to_df(filenames)

Concatenate multiple raw files into a single DataFrame.

class ecopipeline.extract.DentCSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3)

Bases: FileProcessor

FileProcessor for DENT CSV files.

DENT files have 12 header rows to skip. The first data column is a throwaway index, and columns 2 and 3 are date and time strings that are combined into a time_pt datetime index. The index is floored to the minute and duplicate timestamps are averaged.
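The steps above can be sketched with pandas; the sample rows, column positions, and date format here are illustrative, not the exact DENT layout:

```python
import io

import pandas as pd

# DENT files carry 12 header rows that are skipped.
header = "\n" * 12
body = (
    "0,01/01/2024,00:00:30,1.5\n"
    "1,01/01/2024,00:00:45,2.5\n"
)
df = pd.read_csv(io.StringIO(header + body), header=None, skiprows=12)
# Column 0 is a throwaway index; columns 1 and 2 hold date and time strings
# that are combined into a 'time_pt' datetime index.
df["time_pt"] = pd.to_datetime(df[1] + " " + df[2], format="%m/%d/%Y %H:%M:%S")
df = df.drop(columns=[0, 1, 2]).set_index("time_pt")
# Floor to the minute and average duplicate timestamps.
df = df.groupby(df.index.floor("min")).mean()
```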

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline.

start_time : datetime, optional

Earliest filename-encoded timestamp to include.

end_time : datetime, optional

Latest filename-encoded timestamp to include (exclusive).

filename_date_format : str, optional

datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.

file_prefix : str, optional

Only process files whose names begin with this prefix. Defaults to an empty string.

data_sub_dir : str, optional

Sub-directory under the configured data directory containing the files. Defaults to an empty string.

date_string_start_idx : int, optional

Start index (from the end) of the date substring in the filename. Defaults to -17.

date_string_end_idx : int, optional

End index (from the end) of the date substring in the filename. Defaults to -3.

Methods

extract_files(config)

Collect the full paths of files that match the processor's criteria.

extract_new(filenames)

Filter a list of filenames to those whose encoded date falls in range.

get_raw_data()

Return the concatenated raw DataFrame produced during initialisation.

raw_files_to_df(filenames)

Concatenate multiple raw files into a single DataFrame.

class ecopipeline.extract.EGaugeCSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, time_zone: str = 'US/Pacific')

Bases: FileProcessor

FileProcessor for eGauge CSV files.

eGauge files contain a 'Date & Time' Unix-epoch column (seconds). Column names are prefixed with the filename stem so data from multiple devices can be merged without collisions. After concatenation the cumulative register values are differenced to produce per-interval deltas. The index is floored to the minute and duplicate timestamps are averaged.

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline.

start_time : datetime, optional

Earliest filename-encoded timestamp to include.

end_time : datetime, optional

Latest filename-encoded timestamp to include (exclusive).

filename_date_format : str, optional

datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.

file_prefix : str, optional

Only process files whose names begin with this prefix. Defaults to an empty string.

data_sub_dir : str, optional

Sub-directory under the configured data directory containing the files. Defaults to an empty string.

date_string_start_idx : int, optional

Start index (from the end) of the date substring in the filename. Defaults to -17.

date_string_end_idx : int, optional

End index (from the end) of the date substring in the filename. Defaults to -3.

time_zone : str, optional

Timezone name used to convert UTC epoch timestamps to local time. Defaults to 'US/Pacific'.

Methods

extract_files(config)

Collect the full paths of files that match the processor's criteria.

extract_new(filenames)

Filter a list of filenames to those whose encoded date falls in range.

get_raw_data()

Return the concatenated raw DataFrame produced during initialisation.

raw_files_to_df(filenames)

Concatenate files and difference cumulative register values.

raw_files_to_df(filenames: list[str]) → DataFrame

Concatenate files and difference cumulative register values.

Calls the parent implementation to concatenate all files, then computes row-wise differences so cumulative register values are converted into per-interval deltas. The first row is set to NaN because its delta cannot be computed.
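The differencing step can be sketched as follows; the register values and column name are made up:

```python
import pandas as pd

# Cumulative register readings, one column per device register.
cumulative = pd.DataFrame(
    {"device1_kWh": [100.0, 102.5, 106.0]},
    index=pd.to_datetime(
        ["2024-01-01 00:00", "2024-01-01 00:01", "2024-01-01 00:02"]
    ),
)
# Row-wise differences turn cumulative totals into per-interval deltas;
# the first row becomes NaN because it has no prior reading.
deltas = cumulative.diff()
```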

Parameters:
filenames : list of str

Absolute file paths to read and concatenate.

Returns:
pd.DataFrame

Differenced DataFrame of interval deltas. Returns an empty DataFrame when no files could be read.

class ecopipeline.extract.FieldManager(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, create_csv: bool = True, csv_prefix: str = '')

Bases: APIExtractor

APIExtractor for the FieldPop / Field Manager API.

Recursively splits the time range in half whenever the server returns a 500 ‘log size too large’ response, down to a minimum of 30-minute windows.

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline, including the FieldPop API token and device ID.

start_time : datetime, optional

The start of the data extraction window. Defaults to datetime(2000, 1, 1, 0, 0, 0) if not provided.

end_time : datetime, optional

The end of the data extraction window. Defaults to datetime.now() if not provided.

create_csv : bool, optional

If True, writes the raw DataFrame to a CSV file in the configured data directory after a successful pull. Default is True.

csv_prefix : str, optional

A string prefix prepended to the generated CSV filename. Default is an empty string.

Methods

raw_data_to_df(config[, startTime, endTime])

Fetch sensor data from the FieldPop API and return it as a DataFrame.

get_raw_data

raw_data_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) → DataFrame

Fetch sensor data from the FieldPop API and return it as a DataFrame.

Queries the fieldpop-api/deviceDataLog endpoint for the given time range. If the server responds with a 500 error indicating that the log size is too large, the method recursively bisects the time window until each sub-window is no smaller than 30 minutes.
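The bisection strategy can be sketched as below. `pull_range` and `fake_fetch` are illustrative names, not the package's internals; `RuntimeError` stands in for the HTTP 500 'log size too large' response:

```python
from datetime import datetime, timedelta

MIN_WINDOW = timedelta(minutes=30)  # documented floor for sub-windows

def pull_range(fetch, start, end):
    """Fetch [start, end); recursively bisect when the server rejects it."""
    try:
        return [fetch(start, end)]
    except RuntimeError:
        if end - start <= MIN_WINDOW:
            return []  # cannot split below the 30-minute floor
        mid = start + (end - start) / 2
        return pull_range(fetch, start, mid) + pull_range(fetch, mid, end)

# Demo: a fake endpoint that rejects any window longer than one hour.
def fake_fetch(s, e):
    if e - s > timedelta(hours=1):
        raise RuntimeError("log size too large")
    return (s, e)

chunks = pull_range(fake_fetch, datetime(2024, 1, 1, 0), datetime(2024, 1, 1, 4))
```

A four-hour request splits twice, yielding four one-hour pulls.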

Parameters:
config : ConfigManager

The ConfigManager object used to retrieve the FieldPop API token (get_fm_token) and device ID (get_fm_device_id).

startTime : datetime, optional

Start of the query window. Defaults to datetime(2000, 1, 1, 0, 0, 0) if not provided.

endTime : datetime, optional

End of the query window. Defaults to datetime.now() if not provided.

Returns:
pd.DataFrame

A DataFrame indexed by time_pt (UTC timestamps converted to datetime) with one column per sensor, values aggregated by mean when multiple readings share the same timestamp. Returns an empty DataFrame if the request fails or an error occurs.

class ecopipeline.extract.FileProcessor(config: ConfigManager, extension: str, start_time: datetime = None, end_time: datetime = None, raw_time_column: str = 'DateTime', time_column_format: str = '%Y/%m/%d %H:%M:%S', filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, round_time_index: bool = False)

Bases: object

Base class for reading raw data files into a pandas DataFrame.

On instantiation the processor collects matching files from the configured data directory, optionally filters them by date range, and concatenates them into a single DataFrame accessible via get_raw_data().

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline.

extension : str

File extension of raw data files (e.g. ".csv", ".gz").

start_time : datetime, optional

Earliest timestamp (inclusive) used to filter files by the date encoded in their filename. Uses local time matching the data index.

end_time : datetime, optional

Latest timestamp (exclusive) used to filter files by the date encoded in their filename. Uses local time matching the data index.

raw_time_column : str, optional

Name of the column in the raw file that contains timestamp strings. Defaults to 'DateTime'.

time_column_format : str, optional

datetime.strptime() format string used to parse raw_time_column. Defaults to '%Y/%m/%d %H:%M:%S'.

filename_date_format : str, optional

datetime.strftime() format string used to convert start_time and end_time to integers for filename comparison. Defaults to '%Y%m%d%H%M%S'.

file_prefix : str, optional

Only files whose names begin with this prefix are processed. Defaults to an empty string (all files).

data_sub_dir : str, optional

Sub-directory appended to the configured data directory when locating files. For example, if files live in 'path/to/data/DENT/' and the configured data directory is 'path/to/data/', pass 'DENT/'. Defaults to an empty string.

date_string_start_idx : int, optional

Start index (from the end) of the date substring within each filename. Defaults to -17.

date_string_end_idx : int, optional

End index (from the end) of the date substring within each filename. Defaults to -3.

round_time_index : bool, optional

If True, floor the datetime index to the minute and average any duplicate timestamps after concatenation. Defaults to False.

Methods

extract_files(config)

Collect the full paths of files that match the processor's criteria.

extract_new(filenames)

Filter a list of filenames to those whose encoded date falls in range.

get_raw_data()

Return the concatenated raw DataFrame produced during initialisation.

raw_files_to_df(filenames)

Concatenate multiple raw files into a single DataFrame.

extract_files(config: ConfigManager) → list[str]

Collect the full paths of files that match the processor’s criteria.

Scans the configured data directory (plus any data_sub_dir) for files whose names end with the configured extension and begin with the configured prefix. When start_time is set the list is further filtered by extract_new().

Parameters:
config : ConfigManager

The ConfigManager object that provides the base data directory path.

Returns:
list of str

Absolute file paths for all files that satisfy the criteria.

extract_new(filenames: list[str]) → list[str]

Filter a list of filenames to those whose encoded date falls in range.

The date is extracted from each filename using date_string_start_idx and date_string_end_idx, then compared as an integer against the integer representations of start_time and end_time (formatted with filename_date_format).
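The comparison can be sketched as below, using the defaults (-17, -3, '%Y%m%d%H%M%S'). The filename is invented; note that these defaults suit a 14-digit date followed by a 3-character suffix such as '.gz':

```python
from datetime import datetime

filename = "site_a_20240115083000.gz"
# Slice the 14-digit date substring out of the filename using the
# default negative indices.
date_str = filename[-17:-3]
# Format the window bounds with filename_date_format and compare as integers.
start_int = int(datetime(2024, 1, 1).strftime("%Y%m%d%H%M%S"))
end_int = int(datetime(2024, 2, 1).strftime("%Y%m%d%H%M%S"))
in_range = start_int <= int(date_str) < end_int
```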

Parameters:
filenames : list of str

Candidate file paths to filter.

Returns:
list of str

Only the file paths whose encoded date satisfies start_time <= date < end_time (end_time bound is omitted when end_time is None).

get_raw_data() → DataFrame

Return the concatenated raw DataFrame produced during initialisation.

Returns:
pd.DataFrame

DataFrame containing all raw data read from the matching files. Returns an empty DataFrame when no files were found or all reads failed.

raw_files_to_df(filenames: list[str]) → DataFrame

Concatenate multiple raw files into a single DataFrame.

Calls _read_file_into_df() for each path in filenames, ignores files that are not found or raise read errors, and concatenates the results. When round_time_index is enabled the index is floored to the minute and duplicate timestamps are averaged.
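The round_time_index behaviour can be sketched as: floor each timestamp to the minute, then average readings that land on the same minute. The sample values are invented:

```python
import pandas as pd

df = pd.DataFrame(
    {"power_kw": [1.0, 3.0, 5.0]},
    index=pd.to_datetime(
        ["2024-01-01 00:00:10", "2024-01-01 00:00:50", "2024-01-01 00:01:05"]
    ),
)
# Floor to the minute, then average duplicates within each minute.
rounded = df.groupby(df.index.floor("min")).mean()
```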

Parameters:
filenames : list of str

Absolute file paths to read and concatenate.

Returns:
pd.DataFrame

Concatenated DataFrame for all successfully read files. Returns an empty DataFrame when no files could be read.

class ecopipeline.extract.FlowCSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3)

Bases: FileProcessor

FileProcessor for flow meter CSV files.

Flow meter files have 6 header rows to skip. The timestamp is reconstructed from individual Year, Month, Day, Hour, Minute, and Second columns into a time_pt datetime index. The index is floored to the minute and duplicate timestamps are averaged.
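The timestamp reconstruction can be sketched with pandas; the sample frame is invented:

```python
import pandas as pd

df = pd.DataFrame(
    {"Year": [2024], "Month": [1], "Day": [2],
     "Hour": [3], "Minute": [4], "Second": [5], "gpm": [12.5]}
)
# Assemble a single datetime from the component columns
# (pd.to_datetime expects lowercase unit names).
parts = df[["Year", "Month", "Day", "Hour", "Minute", "Second"]]
df["time_pt"] = pd.to_datetime(parts.rename(columns=str.lower))
df = df.set_index("time_pt")[["gpm"]]
```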

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline.

start_time : datetime, optional

Earliest filename-encoded timestamp to include.

end_time : datetime, optional

Latest filename-encoded timestamp to include (exclusive).

filename_date_format : str, optional

datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.

file_prefix : str, optional

Only process files whose names begin with this prefix. Defaults to an empty string.

data_sub_dir : str, optional

Sub-directory under the configured data directory containing the files. Defaults to an empty string.

date_string_start_idx : int, optional

Start index (from the end) of the date substring in the filename. Defaults to -17.

date_string_end_idx : int, optional

End index (from the end) of the date substring in the filename. Defaults to -3.

Methods

extract_files(config)

Collect the full paths of files that match the processor's criteria.

extract_new(filenames)

Filter a list of filenames to those whose encoded date falls in range.

get_raw_data()

Return the concatenated raw DataFrame produced during initialisation.

raw_files_to_df(filenames)

Concatenate multiple raw files into a single DataFrame.

class ecopipeline.extract.JSONProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, raw_time_column: str = 'time', time_column_format: str = '%Y/%m/%d %H:%M:%S', filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, zip_files: bool = True, time_zone: str = 'US/Pacific')

Bases: FileProcessor

Methods

extract_files(config)

Collect the full paths of files that match the processor's criteria.

extract_new(filenames)

Filter a list of filenames to those whose encoded date falls in range.

get_raw_data()

Return the concatenated raw DataFrame produced during initialisation.

raw_files_to_df(filenames)

Concatenate multiple raw files into a single DataFrame.

class ecopipeline.extract.LiCOR(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, create_csv: bool = True, csv_prefix: str = '')

Bases: APIExtractor

APIExtractor for the LI-COR Cloud API.

Queries sensor data for the configured device between start_time and end_time. Returns a DataFrame indexed by UTC timestamp with sensor serial numbers as columns.

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline, including the LI-COR API token (config.api_token) and the device serial number (config.api_device_id).

start_time : datetime, optional

The start of the data extraction window. Defaults to 28 hours before end_time if not provided.

end_time : datetime, optional

The end of the data extraction window. Defaults to datetime.now() if not provided.

create_csv : bool, optional

If True, writes the raw DataFrame to a CSV file in the configured data directory after a successful pull. Default is True.

csv_prefix : str, optional

A string prefix prepended to the generated CSV filename. Default is an empty string.

Methods

raw_data_to_df(config[, startTime, endTime])

Fetch sensor data from the LI-COR Cloud API and return it as a DataFrame.

get_raw_data

raw_data_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) → DataFrame

Fetch sensor data from the LI-COR Cloud API and return it as a DataFrame.

Calls the /v2/data endpoint, iterates over each sensor returned in the response, and assembles a wide-format DataFrame keyed by millisecond-precision UTC timestamps.
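The wide-format assembly can be sketched as below. The response structure and serial numbers are assumptions for illustration, not the actual LI-COR payload:

```python
import pandas as pd

response = {
    "sensors": [
        {"serial": "TG10-01", "data": [{"ts": 1704067200000, "value": 21.5}]},
        {"serial": "TG10-02", "data": [{"ts": 1704067200000, "value": 22.0}]},
    ]
}
# One Series per sensor, keyed by millisecond-precision UTC timestamps.
columns = {
    s["serial"]: pd.Series(
        {pd.to_datetime(d["ts"], unit="ms", utc=True): d["value"]
         for d in s["data"]}
    )
    for s in response["sensors"]
}
# Align the per-sensor Series on their timestamp index into a wide frame.
wide = pd.DataFrame(columns)
```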

Parameters:
config : ConfigManager

The ConfigManager object used to retrieve config.api_token for the Bearer authorisation header and config.api_device_id for the device serial number query parameter.

startTime : datetime, optional

Start of the query window. Defaults to 28 hours before endTime if not provided.

endTime : datetime, optional

End of the query window. Defaults to datetime.now() if not provided.

Returns:
pd.DataFrame

A DataFrame indexed by UTC datetime (millisecond precision) with one column per sensor serial number. Non-numeric values are coerced to None via _get_float_value(). Returns an empty DataFrame if the request fails or an error occurs.

class ecopipeline.extract.MSACSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, mb_prefix: bool = False, time_zone: str = 'US/Pacific')

Bases: FileProcessor

FileProcessor for MSA CSV files.

MSA files contain a 'DateEpoch(secs)' Unix-epoch column that is converted from UTC to the configured timezone. When mb_prefix is True, all data columns are prefixed with the filename stem so that data from multiple devices can be merged without column collisions. The index is floored to the minute and duplicate timestamps are averaged.
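The epoch conversion can be sketched as below; the sample values are invented (1704096000 s is 2024-01-01 08:00 UTC, i.e. 2024-01-01 00:00 US/Pacific):

```python
import pandas as pd

epochs = pd.Series([1704096000, 1704096060])  # 'DateEpoch(secs)' samples
time_pt = (
    pd.to_datetime(epochs, unit="s", utc=True)  # epoch seconds are UTC
    .dt.tz_convert("US/Pacific")                # shift to configured timezone
    .dt.tz_localize(None)                       # keep naive local wall time
)
```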

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline.

start_time : datetime, optional

Earliest filename-encoded timestamp to include.

end_time : datetime, optional

Latest filename-encoded timestamp to include (exclusive).

filename_date_format : str, optional

datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.

file_prefix : str, optional

Only process files whose names begin with this prefix. Defaults to an empty string.

data_sub_dir : str, optional

Sub-directory under the configured data directory containing the files. Defaults to an empty string.

date_string_start_idx : int, optional

Start index (from the end) of the date substring in the filename. Defaults to -17.

date_string_end_idx : int, optional

End index (from the end) of the date substring in the filename. Defaults to -3.

mb_prefix : bool, optional

If True, prefix every column name with the filename stem. Defaults to False.

time_zone : str, optional

Timezone name used to convert UTC epoch timestamps to local time. Defaults to 'US/Pacific'.

Methods

extract_files(config)

Collect the full paths of files that match the processor's criteria.

extract_new(filenames)

Filter a list of filenames to those whose encoded date falls in range.

get_raw_data()

Return the concatenated raw DataFrame produced during initialisation.

raw_files_to_df(filenames)

Concatenate multiple raw files into a single DataFrame.

class ecopipeline.extract.ModbusCSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, raw_time_column: str = 'time(UTC)', filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3)

Bases: FileProcessor

FileProcessor for Modbus CSV files (e.g. Acquisuite).

Reads standard CSV files whose first column is a time(UTC) timestamp string. All data columns are prefixed with the filename stem so that data from multiple Modbus devices can be merged without column name collisions. The index is floored to the minute and duplicate timestamps are averaged after concatenation.

Acquisuite filenames encode the file start time as a hexadecimal Unix timestamp in the filename, so extract_new() overrides the default date-substring comparison with a hex-to-datetime conversion.

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline.

start_time : datetime, optional

Earliest local timestamp to include (compared against the hex date embedded in the filename).

end_time : datetime, optional

Latest local timestamp to include, exclusive (compared against the hex date embedded in the filename).

raw_time_column : str, optional

Name of the timestamp column in each CSV file. Defaults to 'time(UTC)'.

filename_date_format : str, optional

datetime.strftime() format used when comparing non-hex filenames. Defaults to '%Y%m%d%H%M%S'.

file_prefix : str, optional

Only process files whose names begin with this prefix. Defaults to an empty string.

data_sub_dir : str, optional

Sub-directory under the configured data directory containing the files. Defaults to an empty string.

date_string_start_idx : int, optional

Start index (from the end) of the date substring in the filename. Defaults to -17.

date_string_end_idx : int, optional

End index (from the end) of the date substring in the filename. Defaults to -3.

Methods

extract_files(config)

Collect the full paths of files that match the processor's criteria.

extract_new(filenames)

Filter filenames using the hexadecimal Unix timestamp in the filename.

get_raw_data()

Return the concatenated raw DataFrame produced during initialisation.

raw_files_to_df(filenames)

Concatenate multiple raw files into a single DataFrame.

extract_new(filenames: list[str]) → list[str]

Filter filenames using the hexadecimal Unix timestamp in the filename.

Acquisuite filenames embed the file start time as a hexadecimal number between the first . and the first _ in the basename. This method decodes those hex timestamps to UTC datetimes, strips timezone awareness, and keeps only the files whose decoded time falls within [start_time, end_time).
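The decoding can be sketched as below. The path is invented; 0x6592008E is 1704067214 seconds, i.e. 2024-01-01 00:00:14 UTC:

```python
import os
from datetime import datetime, timezone

path = "/data/modbus/mb-001.6592008E_device.csv"
base = os.path.basename(path)
# The hex timestamp sits between the first '.' and the first '_'.
hex_part = base.split(".")[1].split("_")[0]
# Decode to a UTC datetime, then strip timezone awareness.
decoded = datetime.fromtimestamp(int(hex_part, 16), tz=timezone.utc)
local_time = decoded.replace(tzinfo=None)
```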

Parameters:
filenames : list of str

Candidate file paths to filter.

Returns:
list of str

Only the file paths whose decoded hex timestamp satisfies start_time <= local_time and, when end_time is set, local_time < end_time.

class ecopipeline.extract.Skycentrics(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, create_csv: bool = True, csv_prefix: str = '', time_zone: str = 'US/Pacific')

Bases: APIExtractor

APIExtractor for the Skycentrics API.

Pulls data day-by-day between start_time and end_time, normalises the JSON sensor records into a pivot table, and rounds 59:59 timestamps up to the next minute.

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline, including the Skycentrics API token (config.get_skycentrics_token) and device ID (config.api_device_id).

start_time : datetime, optional

The start of the data extraction window. Defaults to one day before end_time if not provided.

end_time : datetime, optional

The end of the data extraction window. Defaults to datetime.utcnow() if not provided.

create_csv : bool, optional

If True, writes the raw DataFrame to a CSV file in the configured data directory after a successful pull. Default is True.

csv_prefix : str, optional

A string prefix prepended to the generated CSV filename. Default is an empty string.

time_zone : str, optional

The timezone string used to localise timestamps after converting from UTC. Default is 'US/Pacific'.

Methods

raw_data_to_df(config[, startTime, endTime])

Fetch sensor data from the Skycentrics API and return it as a DataFrame.

get_raw_data

raw_data_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) → DataFrame

Fetch sensor data from the Skycentrics API and return it as a DataFrame.

Iterates day-by-day from startTime to endTime, issuing one authenticated GET request per day. Each gzip-compressed JSON response is decompressed, normalised with pandas.json_normalize(), and pivoted so that sensor IDs become columns. Timestamps that fall at 59 minutes 59 seconds past the hour are nudged forward by one second to align with the top of the next minute.
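The timestamp adjustment can be sketched as below; the sample stamps are invented:

```python
import pandas as pd

ts = pd.Series(pd.to_datetime(["2024-01-01 00:59:59", "2024-01-01 01:15:00"]))
# Readings stamped one second before the minute boundary are nudged forward.
mask = (ts.dt.minute == 59) & (ts.dt.second == 59)
ts.loc[mask] += pd.Timedelta(seconds=1)
```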

Parameters:
configConfigManager

The ConfigManager object used to retrieve the per-request Skycentrics HMAC token via config.get_skycentrics_token and the device ID via config.api_device_id.

startTimedatetime, optional

Start of the query window (UTC). Defaults to one day before endTime if not provided.

endTimedatetime, optional

End of the query window (UTC). Defaults to datetime.utcnow() if not provided.

Returns:
pd.DataFrame

A DataFrame indexed by time_pt (timezone-aware timestamps converted to self.time_zone) with one column per sensor ID. Returns an empty DataFrame if no data is retrieved for the requested time frame.

class ecopipeline.extract.SmallPlanetCSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, site: str = '', system: str = '', time_zone: str = 'US/Pacific')

Bases: FileProcessor

FileProcessor for Small Planet Controls CSV files.

Similar to MSACSVProcessor but applies variable-name mapping from Variable_Names.csv at read time: column names are first prefixed with the filename stem, then renamed through the alias-to-true-name mapping, and finally any columns that still carry an alias name or are absent from the true-name list are dropped. Only-NaN rows are also removed. The index is floored to the minute and duplicate timestamps are averaged.
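The mapping step can be sketched as below. The alias and true names are invented; the real mapping comes from Variable_Names.csv:

```python
import pandas as pd

# Columns already prefixed with the filename stem ('dev1_' here).
df = pd.DataFrame({"dev1_T1": [70.0], "dev1_T2": [71.0], "dev1_spare": [0.0]})
alias_to_true = {"dev1_T1": "supply_temp_f", "dev1_T2": "return_temp_f"}
# Rename aliases to true names.
df = df.rename(columns=alias_to_true)
# Drop any column that still carries an alias (unmapped) name.
df = df[[c for c in df.columns if c in alias_to_true.values()]]
```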

Parameters:
config : ConfigManager

The ConfigManager object that holds configuration data for the pipeline. Must provide a path to Variable_Names.csv via get_var_names_path().

start_time : datetime, optional

Earliest filename-encoded timestamp to include.

end_time : datetime, optional

Latest filename-encoded timestamp to include (exclusive).

filename_date_format : str, optional

datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.

file_prefix : str, optional

Only process files whose names begin with this prefix. Defaults to an empty string.

data_sub_dir : str, optional

Sub-directory under the configured data directory containing the files. Defaults to an empty string.

date_string_start_idx : int, optional

Start index (from the end) of the date substring in the filename. Defaults to -17.

date_string_end_idx : int, optional

End index (from the end) of the date substring in the filename. Defaults to -3.

site : str, optional

If non-empty, only variable-name rows whose site column matches this value are used for column mapping. Defaults to an empty string.

system : str, optional

If non-empty, only variable-name rows whose system column contains this substring are used for column mapping. Defaults to an empty string.

time_zone : str, optional

Timezone name used to convert UTC epoch timestamps to local time. Defaults to 'US/Pacific'.

Raises:
Exception

If the Variable_Names.csv file cannot be found at the path returned by config.get_var_names_path().

Methods

extract_files(config)

Collect the full paths of files that match the processor's criteria.

extract_new(filenames)

Filter a list of filenames to those whose encoded date falls in range.

get_raw_data()

Return the concatenated raw DataFrame produced during initialisation.

raw_files_to_df(filenames)

Concatenate multiple raw files into a single DataFrame.

class ecopipeline.extract.ThingsBoard(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, create_csv: bool = True)

Bases: APIExtractor

Methods

get_raw_data

raw_data_to_df

raw_data_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) → DataFrame

ecopipeline.extract.central_extract_function(config: ConfigManager, process_type: str, start_time: datetime = None, end_time: datetime = None, use_defaults: bool = True, raw_time_column: str = 'DateTime', time_column_format: str = '%Y/%m/%d %H:%M:%S', filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, time_zone: str = 'America/Los_Angeles', site: str = '', system: str = '') → [DataFrame, DataFrame]

Primary entry point for the extract stage of the data pipeline.

Dispatches to the appropriate file-based or API-based extractor based on process_type, reads raw sensor data into a DataFrame, and then fetches matching outdoor air temperature (OAT) weather data from Open Meteo for the same time window. When start_time is None the function queries the database for the timestamp of the last available minute record and extracts only data newer than that point (normal run mode). When start_time is provided explicitly the function performs a reprocess run, reading from already-saved CSV files for API-based process types instead of hitting the remote API again.
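The dispatch idea can be sketched as a simple lookup over the accepted process_type values; the mapping below is an illustration, not the package's actual implementation:

```python
# Maps each accepted process_type to the extractor class it selects.
PROCESS_TYPES = {
    "csv": "CSVProcessor",
    "csv_mb": "ModbusCSVProcessor",
    "csv_dent": "DentCSVProcessor",
    "csv_flow": "FlowCSVProcessor",
    "csv_msa": "MSACSVProcessor",
    "csv_egauge": "EGaugeCSVProcessor",
    "csv_small_planet": "SmallPlanetCSVProcessor",
    "json": "JSONProcessor",
    "api_tb": "ThingsBoard",
    "api_skycentrics": "Skycentrics",
    "api_fm": "FieldManager",
    "api_licor": "LiCOR",
}

def resolve_processor(process_type: str) -> str:
    """Return the extractor name for a process_type, rejecting unknown values."""
    if process_type not in PROCESS_TYPES:
        raise ValueError(f"unknown process_type: {process_type!r}")
    return PROCESS_TYPES[process_type]
```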

Parameters:
config : ConfigManager

Configuration object that stores database credentials, directory paths, API tokens, and site metadata used throughout the pipeline.

process_type : str

Identifier for the extraction method. Accepted values are "csv", "csv_mb", "csv_dent", "csv_flow", "csv_msa", "csv_egauge", "csv_small_planet", "json", "api_tb", "api_skycentrics", "api_fm", and "api_licor".

start_time : datetime, optional

Inclusive start of the extraction window in local time. If None the start time is derived from get_last_full_day_from_db().

end_time : datetime, optional

Exclusive end of the extraction window in local time. If None data is extracted up to the most recent available record.

use_defaults : bool, optional

When True (default) the function overwrites raw_time_column and time_column_format with the standard defaults for process_type via _get_time_indicator_defaults().

raw_time_column : str, optional

Name of the timestamp column in raw data files. Default is 'DateTime'. Ignored when use_defaults is True.

time_column_format : str, optional

strptime-compatible format string for parsing raw_time_column. Default is '%Y/%m/%d %H:%M:%S'. Ignored when use_defaults is True.

filename_date_format : str, optional

strftime-compatible format string used to parse dates embedded in raw data filenames. Default is "%Y%m%d%H%M%S".

file_prefix : str, optional

Optional prefix used to filter which files in the data directory are ingested. Default is "".

data_sub_dir : str, optional

Sub-directory path appended to the configured data directory when locating raw data files (e.g. "DENT/"). Default is "".

date_string_start_idx : int, optional

Character index marking the start of the date substring within each filename. Default is -17.

date_string_end_idx : int, optional

Character index marking the end of the date substring within each filename. Default is -3.

time_zone : str, optional

IANA time-zone name used to localise timestamps in the returned DataFrames (e.g. "America/Los_Angeles"). Default is "America/Los_Angeles".

site : str, optional

Site identifier passed to SmallPlanetCSVProcessor to filter data to a single site when a file contains data for multiple sites. Default is "".

system : str, optional

System identifier passed to SmallPlanetCSVProcessor to filter data to a single system. Default is "".

Returns:
pd.DataFrame

Raw sensor data indexed by timestamp. Empty if no data was found for the requested time window.

pd.DataFrame

Hourly outdoor air temperature data from Open Meteo indexed by timestamp, covering the same time window as the raw data. Empty if the raw DataFrame is empty or if the weather request fails.

Raises:
Exception

If process_type is not one of the recognised extraction method strings.
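The dispatch behaviour described above can be sketched as follows. This is an illustration inferred from the documented accepted values, not the library's actual implementation; the helper name classify_process_type is hypothetical.

```python
# Sketch of the dispatch idea behind central_extract_function: each
# process_type string selects either a file-based or an API-based extractor.
FILE_PROCESS_TYPES = {"csv", "csv_mb", "csv_dent", "csv_flow", "csv_msa",
                      "csv_egauge", "csv_small_planet", "json"}
API_PROCESS_TYPES = {"api_tb", "api_skycentrics", "api_fm", "api_licor"}

def classify_process_type(process_type: str) -> str:
    """Return 'file' or 'api', or raise for unrecognised types."""
    if process_type in FILE_PROCESS_TYPES:
        return "file"
    if process_type in API_PROCESS_TYPES:
        return "api"
    # Mirrors the documented Raises behaviour for unknown process types.
    raise Exception(f"Unrecognised process_type: {process_type}")
```

In a reprocess run (start_time given explicitly), the API branches read from previously saved CSV files instead of querying the remote service again.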

ecopipeline.extract.csv_to_df(csv_filenames: List[str], mb_prefix: bool = False, round_time_index: bool = True, create_time_pt_idx: bool = False, original_time_columns: str = 'DateTime', time_format: str = '%Y/%m/%d %H:%M:%S') DataFrame

Function takes a list of csv filenames and reads all files into a single DataFrame. Use this for AcquiSuite data.

Deprecated: use central_extract_function() instead.

Parameters:
csv_filenames: List[str]

List of filenames to be processed into a single dataframe

mb_prefix: bool

A boolean indicating whether the data is in Modbus form. If set to True, a Modbus prefix is prepended to each raw variable name.

round_time_index: bool

A boolean indicating whether the DataFrame timestamp indexes should be rounded down to the nearest minute. Should be set to False if there is no column in the DataFrame called 'time(UTC)' to index on. Defaults to True.

create_time_pt_idx: bool

Set to True if there is a time column in the csv that you wish to convert to a 'time_pt' index. Defaults to False.

original_time_columns: str

The name of the time column in the raw data files. Defaults to 'DateTime'. Only used if create_time_pt_idx is True.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files with column headers the same as the variable names in the files (with prepended modbus prefix if mb_prefix = True)

ecopipeline.extract.dent_csv_to_df(csv_filenames: List[str], round_time_index: bool = True) DataFrame

Function takes a list of csv filenames and reads all files into a single DataFrame. Use this for Dent meter data.

Deprecated: use central_extract_function() instead.

Parameters:
csv_filenames: List[str]

List of filenames to be processed into a single dataframe

round_time_index: bool

A boolean indicating whether the DataFrame timestamp indexes should be rounded down to the nearest minute. Should be set to False if there is no column in the DataFrame called 'time(UTC)' to index on. Defaults to True.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files with column headers the same as the variable names in the files

ecopipeline.extract.egauge_csv_to_df(csv_filenames: List[str]) DataFrame

Function takes a list of csv filenames and reads all files into a single DataFrame. Use this for eGauge meter data.

Deprecated: use central_extract_function() instead.

The returned data will have variable names matching the variable_name column in Variable_Names.csv, so you will not need to use the rename_sensors function afterwards.

Parameters:
csv_filenamesList[str]

List of filenames

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files

ecopipeline.extract.excel_to_csv(excel_folder_path: str, csv_folder_path: str, excel_date_format: str = '%m/%d/%Y %I:%M:%S %p')

Combines all Excel files (.xlsx or .xls) in a folder into a single CSV file sorted by timestamp. The output CSV is named after the earliest timestamp found across all files.

Expects each Excel file to contain a ‘Time stamp’ column in the format excel_date_format. All files are concatenated, sorted by ‘Time stamp’, and written to a single CSV in csv_folder_path.

Parameters:
excel_folder_path: str

Path to the folder containing the Excel files to combine.

csv_folder_path: str

Path to the folder where the output CSV file will be written. The output filename is derived from the earliest timestamp (e.g., '20220101010000.csv').

excel_date_format: str

Expected format of the 'Time stamp' column in each Excel file.
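The combining step can be sketched in pandas. This is an illustrative reimplementation of the documented behaviour (concatenate, sort by 'Time stamp', name the output after the earliest timestamp), not the library's code; the helper name combine_and_name is hypothetical, and file I/O is replaced by in-memory frames.

```python
import pandas as pd

def combine_and_name(frames, excel_date_format="%m/%d/%Y %I:%M:%S %p"):
    """Concatenate frames, sort by 'Time stamp', and derive the output
    CSV filename from the earliest timestamp, as excel_to_csv does."""
    combined = pd.concat(frames, ignore_index=True)
    combined["Time stamp"] = pd.to_datetime(combined["Time stamp"],
                                            format=excel_date_format)
    combined = combined.sort_values("Time stamp").reset_index(drop=True)
    earliest = combined["Time stamp"].iloc[0]
    filename = earliest.strftime("%Y%m%d%H%M%S") + ".csv"
    return combined, filename

# Two out-of-order "files"; the output name comes from the earlier one.
frames = [
    pd.DataFrame({"Time stamp": ["01/02/2022 01:00:00 AM"], "kW": [1.2]}),
    pd.DataFrame({"Time stamp": ["01/01/2022 01:00:00 AM"], "kW": [0.9]}),
]
combined, name = combine_and_name(frames)
```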

ecopipeline.extract.extract_files(extension: str, config: ConfigManager, data_sub_dir: str = '', file_prefix: str = '') List[str]

Function takes in a file extension and sub-directory and returns a list of paths to files of that type in the configured data directory.

Deprecated: use central_extract_function() instead.

Parameters:
extension: str

File extension of raw data files as a string (e.g. ".csv", ".gz", …)

config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline.

data_sub_dir: str

Defaults to an empty string. If the files being accessed are in a sub-directory of the configured data directory, use this parameter to point there. For example, if the data files you want to extract are in "path/to/data/DENT/" and your configured data directory is "path/to/data/", put "DENT/" as the data_sub_dir.

file_prefix: str

File name prefix of raw data files, if only file names with a certain prefix should be processed.

Returns:
List[str]:

List of filenames

ecopipeline.extract.extract_new(startTime: datetime, filenames: List[str], decihex=False, timeZone: str = None, endTime: datetime = None, dateStringStartIdx: int = -17, dateStringEndIdx: int = -3, dateFormat: str = '%Y%m%d%H%M%S', epochFormat: bool = False) List[str]

Function filters the filenames to only those with dates equal to or newer than the specified startTime.

Deprecated: use central_extract_function() instead.

If filename timestamps are hexadecimal, the function can still handle them. Note that for some projects, files are dropped at irregular intervals, so data cannot be filtered by exact date.

Currently, this function expects file names to be in one of three formats:

  1. default (set decihex = False): assumes characters [-17,-3] of each filename string are the file's date in the form "%Y%m%d%H%M%S".

  2. hexadecimal (set decihex = True): assumes each filename contains, between a '.' and a '_' character, a hexadecimal value equal to the number of seconds since January 1, 1970, representing the timestamp of the data in the file.

  3. custom: the same as the default format but uses a custom date format via the dateFormat parameter and expects the date to be at characters [dateStringStartIdx, dateStringEndIdx].

Parameters:
startTime: datetime

The point in time at which to start the data extraction. This is local time from the data's index.

filenames: List[str]

List of filenames to be filtered to those equal to or newer than startTime.

decihex: bool

Defaults to False. Set to True if filenames contain the date of the data in hexadecimal format.

timeZone: str

The timezone for the indexes in the output dataframe as a string. Must be a string recognized as a time zone by the pandas tz_localize() function (https://pandas.pydata.org/docs/reference/api/pandas.Series.tz_localize.html). Defaults to None.

dateStringStartIdx: int

The character index in each filename where the embedded date starts. Default is -17 (meaning 17 characters from the end of the filename string).

dateStringEndIdx: int

The character index in each filename where the embedded date ends. Default is -3 (meaning 3 characters from the end of the filename string).

Returns:
List[str]:

Filtered list of filenames
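The default-format filtering (format 1 above) can be sketched as follows. This is a simplified illustration, not the library's implementation; the helper name filter_new and the sample filenames are hypothetical.

```python
from datetime import datetime
from typing import List

def filter_new(start_time: datetime, filenames: List[str],
               date_format: str = "%Y%m%d%H%M%S",
               start_idx: int = -17, end_idx: int = -3) -> List[str]:
    """Keep filenames whose embedded date (characters [start_idx, end_idx))
    falls at or after start_time, mirroring extract_new's default format."""
    kept = []
    for name in filenames:
        file_time = datetime.strptime(name[start_idx:end_idx], date_format)
        if file_time >= start_time:
            kept.append(name)
    return kept

# With the default [-17,-3] indices, a 14-digit date followed by a
# 3-character suffix such as ".gz" lines up exactly.
files = ["site_20240101000000.gz", "site_20240301000000.gz"]
recent = filter_new(datetime(2024, 2, 1), files)
```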

ecopipeline.extract.flow_csv_to_df(csv_filenames: List[str], round_time_index: bool = True) DataFrame

Function takes a list of csv filenames and reads all files into a single DataFrame. Use this for flow-meter data.

Deprecated: use central_extract_function() instead.

Parameters:
csv_filenames: List[str]

List of filenames to be processed into a single dataframe

round_time_index: bool

A boolean indicating whether the DataFrame timestamp indexes should be rounded down to the nearest minute. Should be set to False if there is no column in the DataFrame called 'time(UTC)' to index on. Defaults to True.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files with column headers the same as the variable names in the files

ecopipeline.extract.fm_api_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None, create_csv: bool = True) DataFrame

Function connects to the Field Manager API to pull data and returns a DataFrame.

Deprecated: use central_extract_function() instead.

Parameters:
config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline. The config manager must contain the information needed to connect to the API, i.e. the API user name and password as well as the device id for the device the data is being pulled from.

startTime: datetime

The point in time at which to start the data extraction. This is local time from the data's index.

endTime: datetime

The point in time at which to end the data extraction. This is local time from the data's index.

create_csv: bool

If True, create CSV files as you process so the API need not be relied upon for reprocessing.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from the API pull with column headers the same as the variable names in the data from the pull

ecopipeline.extract.get_OAT_open_meteo(lat: float, long: float, start_date: datetime, end_date: datetime = None, time_zone: str = 'America/Los_Angeles', use_noaa_names: bool = True) DataFrame
ecopipeline.extract.get_db_row_from_time(time: datetime, config: ConfigManager) DataFrame

Extracts a row from the applicable minute table in the database for the given datetime, or returns an empty DataFrame if none exists.

Parameters:
time: datetime

The time index to get the row from.

config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline.

Returns:
pd.DataFrame:

Pandas Dataframe containing the row or empty if no row exists for the timestamp

ecopipeline.extract.get_last_full_day_from_db(config: ConfigManager, table_identifier: str = 'minute') datetime

Retrieve the timestamp of the last fully recorded minute from the database.

Queries the table identified by table_identifier in the pipeline database and returns the most recent timestamp. If the most recent row does not fall exactly at 23:59, the function assumes the day is incomplete and rolls back to the previous day at 23:59:00. If the table is empty or the query fails, a default datetime of 2000-01-09 23:59:00 US/Pacific is returned.

Parameters:
config: ConfigManager

Configuration object that provides database connection details and table metadata for the pipeline.

table_identifier: str, optional

Key used to look up the target table name in the pipeline config. Default is "minute".

Returns:
datetime

End of the last fully populated day found in the database, or a default past datetime if no data is available.
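The rollback rule can be sketched as follows. This is an illustration of the documented behaviour only, without the database query; the helper name last_full_day is hypothetical.

```python
from datetime import datetime, timedelta

def last_full_day(latest: datetime) -> datetime:
    """If the latest recorded minute falls exactly at 23:59, the day is
    complete; otherwise roll back to the previous day at 23:59:00."""
    if latest.hour == 23 and latest.minute == 59:
        return latest.replace(second=0, microsecond=0)
    previous = latest - timedelta(days=1)
    return previous.replace(hour=23, minute=59, second=0, microsecond=0)

# Mid-day record: the current day is incomplete, so roll back a day.
rolled = last_full_day(datetime(2024, 5, 2, 14, 30))
# Record at exactly 23:59: the day counts as fully populated.
complete = last_full_day(datetime(2024, 5, 2, 23, 59))
```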

ecopipeline.extract.get_noaa_data(station_names: List[str], config: ConfigManager, station_ids: dict = {}) dict

Function will take in a list of station names and will return a dictionary where the key is the station name and the value is a dataframe with the parsed weather data.

Parameters:
station_names: List[str]

List of station names.

config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline.

Returns:
dict:

Dictionary with key as Station Name and Value as DF of Parsed Weather Data

ecopipeline.extract.get_sub_dirs(dir: str) List[str]

Function takes in a directory and returns a list of the paths to all immediate subfolders in that directory. This is used when multiple sites are being run in the same pipeline.

Parameters:
dir: str

Directory as a string.

Returns:
List[str]:

List of paths to subfolders.
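The behaviour can be sketched with the standard library; an illustrative equivalent, not the package's code, with the helper name sub_dirs and the demo layout both hypothetical.

```python
import os
import tempfile
from typing import List

def sub_dirs(directory: str) -> List[str]:
    """Return paths to all immediate subfolders of directory."""
    return [entry.path for entry in os.scandir(directory) if entry.is_dir()]

# Demonstration on a throwaway layout: one site subfolder, one loose file.
root = tempfile.mkdtemp()
os.mkdir(os.path.join(root, "siteA"))
open(os.path.join(root, "notes.txt"), "w").close()
folders = sub_dirs(root)  # only the subfolder is returned
```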

ecopipeline.extract.json_to_df(json_filenames: List[str], time_zone: str = 'US/Pacific') DataFrame

Function takes a list of gz/json filenames and reads all files into a single DataFrame.

Deprecated: use central_extract_function() instead.

Parameters:
json_filenames: List[str]

List of filenames to be processed into a single dataframe

time_zone: str

The timezone for the indexes in the output dataframe as a string. Must be a string recognized as a time zone by the pandas tz_localize() function (https://pandas.pydata.org/docs/reference/api/pandas.Series.tz_localize.html). Defaults to 'US/Pacific'.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files with column headers the same as the variable names in the files

ecopipeline.extract.licor_cloud_api_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None, create_csv: bool = True) DataFrame

Connects to the LI-COR Cloud API to pull sensor data and returns a dataframe.

Deprecated: use central_extract_function() instead.

The function queries the LI-COR Cloud API for sensor data within the specified time range. Each sensor’s data is returned as a separate column in the dataframe, indexed by timestamp.

Parameters:
config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline. The config manager must contain the api_token and api_device_id (device serial number) for authentication with the LI-COR Cloud API.

startTime: datetime

The start time for data extraction. If None, defaults to 28 hours before endTime.

endTime: datetime

The end time for data extraction. If None, defaults to the current time.

create_csv: bool

If True, saves the extracted data to a CSV file in the data directory (default True).

Returns:
pd.DataFrame:

Pandas DataFrame with sensor serial numbers as column headers and timestamps as the index. The index is in UTC and may need to be converted to the appropriate timezone. Returns an empty DataFrame if the API call fails.

ecopipeline.extract.msa_to_df(csv_filenames: List[str], mb_prefix: bool = False, time_zone: str = 'US/Pacific') DataFrame

Function takes a list of csv filenames and reads all files into a single DataFrame. Use this for MSA data.

Deprecated: use central_extract_function() instead.

Parameters:
csv_filenamesList[str]

List of filenames

mb_prefix: bool

Indicates whether the data is in Modbus form. If set to True, a Modbus prefix is prepended to each raw variable name.

time_zone: str

Local timezone; default is 'US/Pacific'.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files

ecopipeline.extract.pull_egauge_data(config: ConfigManager, eGauge_ids: list, eGauge_usr: str, eGauge_pw: str, num_days: int = 2)

Download raw eGauge data files from eGauge devices via wget.

Deprecated: use central_extract_function() instead.

ecopipeline.extract.remove_char_sequence_from_csv_header(csv_filenames: List[str], header_sequences_to_remove: List[str] = [])

Function to remove special character sequences that cannot be processed by the pandas read_csv function from CSV headers.

Parameters:
csv_filenames: List[str]

List of csv filenames whose headers should be cleaned

header_sequences_to_remove: List[str]

List of special character sequences to remove from column headers
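The header-cleaning idea can be sketched as follows: an in-memory illustration of stripping unwanted sequences from a header line, not the library's file-rewriting implementation. The helper name clean_header and the sample header are hypothetical.

```python
from typing import List

def clean_header(header_line: str, sequences: List[str]) -> str:
    """Remove each unwanted character sequence from a CSV header line."""
    for seq in sequences:
        header_line = header_line.replace(seq, "")
    return header_line

# Example: strip a UTF-8 byte-order mark that would otherwise end up
# embedded in the first column name when the file is read with read_csv.
cleaned = clean_header("\ufeffDateTime,kW (avg)", ["\ufeff"])
```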

ecopipeline.extract.skycentrics_api_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None, create_csv: bool = True, time_zone: str = 'US/Pacific')

Function connects to the Skycentrics API to pull data and returns a DataFrame.

Deprecated: use central_extract_function() instead.

Parameters:
config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline. The config manager must contain the information needed to connect to the API, i.e. the API user name and password as well as the device id for the device the data is being pulled from.

startTime: datetime

The point in time at which to start the data extraction. This is local time from the data's index.

endTime: datetime

The point in time at which to end the data extraction. This is local time from the data's index.

create_csv: bool

If True, create CSV files as you process so the API need not be relied upon for reprocessing.

time_zone: str

The timezone for the indexes in the output dataframe as a string. Must be a string recognized as a time zone by the pandas tz_localize() function (https://pandas.pydata.org/docs/reference/api/pandas.Series.tz_localize.html). Defaults to 'US/Pacific'.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from the API pull with column headers the same as the variable names in the data from the pull

ecopipeline.extract.small_planet_control_to_df(config: ConfigManager, csv_filenames: List[str], site: str = '', system: str = '') DataFrame

Function takes a list of csv filenames and reads all files into a single DataFrame. Use this for Small Planet Controls data.

Deprecated: use central_extract_function() instead.

The returned data will have variable names matching the variable_name column in Variable_Names.csv, so you will not need to use the rename_sensors function afterwards.

Parameters:
config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline. Among other things, this object will point to a file called Variable_Names.csv in the input folder of the pipeline (e.g. "full/path/to/pipeline/input/Variable_Names.csv"). The csv this points to should have at least 2 columns called "variable_alias" (the raw name to be changed from) and "variable_name" (the name to be changed to). All columns without a corresponding variable_name will be dropped from the dataframe.

csv_filenames: List[str]

List of filenames

site: str

If the pipeline is processing data for a particular site from a dataframe that contains data from multiple sites that need to be processed separately, fill in this optional variable to drop data from all other sites in the returned dataframe. Appropriate variables in your Variable_Names.csv must have a substring matching this variable in a column called "site".

system: str

If the pipeline is processing data for a particular system from a dataframe that contains data from multiple systems that need to be processed separately, fill in this optional variable to drop data from all other systems in the returned dataframe. Appropriate variables in your Variable_Names.csv must have a string matching this variable in a column called "system".

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files

ecopipeline.extract.tb_api_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None, create_csv: bool = True, query_hours: float = 1, sensor_keys: list = [], seperate_keys: bool = False, device_id_overwrite: str = None, csv_prefix: str = '')

Function connects to the ThingsBoard API to pull data and returns a DataFrame.

Deprecated: use central_extract_function() instead.

Parameters:
config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline. The config manager must contain the information needed to connect to the API, i.e. the API user name and password as well as the device id for the device the data is being pulled from.

startTime: datetime

The point in time at which to start the data extraction. This is local time from the data's index.

endTime: datetime

The point in time at which to end the data extraction. This is local time from the data's index.

create_csv: bool

If True, create CSV files as you process so the API need not be relied upon for reprocessing.

query_hours: float

Number of hours to query at a time from the ThingsBoard API.

device_id_overwrite: str

Overwrites the device ID for the API pull.

csv_prefix: str

Prefix to add to the CSV file name.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from the API pull with column headers the same as the variable names in the data from the pull. The index is returned in UTC and needs to be converted to the appropriate timezone afterwards.