Extract Documentation¶
Extract layer for the ecopipeline data pipeline.
This package exposes all classes and functions needed to ingest raw data from
files (CSV, JSON, Excel) and remote APIs into pandas.DataFrame objects
ready for downstream transform and load steps.
Classes¶
- FileProcessor
Abstract base class for file-based data processors.
- APIExtractor
Abstract base class for API-based data extractors.
- CSVProcessor
Generic CSV file processor.
- JSONProcessor
Generic JSON file processor.
- ModbusCSVProcessor
CSV processor for Modbus-format files.
- DentCSVProcessor
CSV processor for Dent meter files.
- FlowCSVProcessor
CSV processor for flow-meter CSV files.
- MSACSVProcessor
CSV processor for MSA-format files.
- EGaugeCSVProcessor
CSV processor for eGauge meter CSV exports.
- SmallPlanetCSVProcessor
CSV processor for Small Planet Controls files.
- ThingsBoard
API extractor for the ThingsBoard IoT platform.
- Skycentrics
API extractor for the Skycentrics solar-monitoring API.
- FieldManager
API extractor for the FieldPop / Field Manager API.
- LiCOR
API extractor for the LI-COR Cloud sensor API.
- class ecopipeline.extract.APIExtractor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, create_csv: bool = True, csv_prefix: str = '')¶
Bases: object
Methods
get_raw_data
raw_data_to_df
- get_raw_data() DataFrame¶
- raw_data_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) DataFrame¶
- class ecopipeline.extract.CSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, raw_time_column: str = 'DateTime', time_column_format: str = '%Y/%m/%d %H:%M:%S', filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3)¶
Bases: FileProcessor
FileProcessor for generic CSV files with a named timestamp column.
Reads .csv files and converts the column identified by raw_time_column into a time_pt datetime index using time_column_format.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- start_time : datetime, optional
Earliest filename-encoded timestamp to include.
- end_time : datetime, optional
Latest filename-encoded timestamp to include (exclusive).
- raw_time_column : str, optional
Name of the column containing timestamp strings. Defaults to 'DateTime'.
- time_column_format : str, optional
datetime.strptime() format used to parse raw_time_column. Defaults to '%Y/%m/%d %H:%M:%S'.
- filename_date_format : str, optional
datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.
- file_prefix : str, optional
Only process files whose names begin with this prefix. Defaults to an empty string.
- data_sub_dir : str, optional
Sub-directory under the configured data directory containing the files. Defaults to an empty string.
- date_string_start_idx : int, optional
Start index (from the end) of the date substring in the filename. Defaults to -17.
- date_string_end_idx : int, optional
End index (from the end) of the date substring in the filename. Defaults to -3.
Methods
extract_files(config) Collect the full paths of files that match the processor's criteria.
extract_new(filenames) Filter a list of filenames to those whose encoded date falls in range.
get_raw_data() Return the concatenated raw DataFrame produced during initialisation.
raw_files_to_df(filenames) Concatenate multiple raw files into a single DataFrame.
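As an illustration of the conversion described above, here is a minimal standalone sketch using plain pandas rather than the ecopipeline classes; the helper name and sample data are invented, but the default column name and format string match the documented defaults.

```python
from io import StringIO

import pandas as pd

def csv_to_time_indexed_df(csv_text, raw_time_column="DateTime",
                           time_column_format="%Y/%m/%d %H:%M:%S"):
    """Read CSV text and move raw_time_column into a 'time_pt'
    datetime index, mirroring the documented CSVProcessor defaults."""
    df = pd.read_csv(StringIO(csv_text))
    df["time_pt"] = pd.to_datetime(df[raw_time_column], format=time_column_format)
    return df.drop(columns=[raw_time_column]).set_index("time_pt")

raw = "DateTime,power\n2024/05/01 00:00:00,1.5\n2024/05/01 00:01:00,2.0\n"
df = csv_to_time_indexed_df(raw)
```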
- class ecopipeline.extract.DentCSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3)¶
Bases: FileProcessor
FileProcessor for DENT CSV files.
DENT files have 12 header rows to skip. The first data column is a throwaway index, and columns 2 and 3 are date and time strings that are combined into a time_pt datetime index. The index is floored to the minute and duplicate timestamps are averaged.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- start_time : datetime, optional
Earliest filename-encoded timestamp to include.
- end_time : datetime, optional
Latest filename-encoded timestamp to include (exclusive).
- filename_date_format : str, optional
datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.
- file_prefix : str, optional
Only process files whose names begin with this prefix. Defaults to an empty string.
- data_sub_dir : str, optional
Sub-directory under the configured data directory containing the files. Defaults to an empty string.
- date_string_start_idx : int, optional
Start index (from the end) of the date substring in the filename. Defaults to -17.
- date_string_end_idx : int, optional
End index (from the end) of the date substring in the filename. Defaults to -3.
Methods
extract_files(config) Collect the full paths of files that match the processor's criteria.
extract_new(filenames) Filter a list of filenames to those whose encoded date falls in range.
get_raw_data() Return the concatenated raw DataFrame produced during initialisation.
raw_files_to_df(filenames) Concatenate multiple raw files into a single DataFrame.
- class ecopipeline.extract.EGaugeCSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, time_zone: str = 'US/Pacific')¶
Bases: FileProcessor
FileProcessor for eGauge CSV files.
eGauge files contain a 'Date & Time' Unix-epoch column (seconds). Column names are prefixed with the filename stem so data from multiple devices can be merged without collisions. After concatenation the cumulative register values are differenced to produce per-interval deltas. The index is floored to the minute and duplicate timestamps are averaged.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- start_time : datetime, optional
Earliest filename-encoded timestamp to include.
- end_time : datetime, optional
Latest filename-encoded timestamp to include (exclusive).
- filename_date_format : str, optional
datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.
- file_prefix : str, optional
Only process files whose names begin with this prefix. Defaults to an empty string.
- data_sub_dir : str, optional
Sub-directory under the configured data directory containing the files. Defaults to an empty string.
- date_string_start_idx : int, optional
Start index (from the end) of the date substring in the filename. Defaults to -17.
- date_string_end_idx : int, optional
End index (from the end) of the date substring in the filename. Defaults to -3.
- time_zone : str, optional
Timezone name used to convert UTC epoch timestamps to local time. Defaults to 'US/Pacific'.
Methods
extract_files(config) Collect the full paths of files that match the processor's criteria.
extract_new(filenames) Filter a list of filenames to those whose encoded date falls in range.
get_raw_data() Return the concatenated raw DataFrame produced during initialisation.
raw_files_to_df(filenames) Concatenate files and difference cumulative register values.
- raw_files_to_df(filenames: list[str]) DataFrame¶
Concatenate files and difference cumulative register values.
Calls the parent implementation to concatenate all files, then computes row-wise differences so cumulative register values are converted into per-interval deltas. The first row is set to NaN because its delta cannot be computed.
- Parameters:
- filenames : list of str
Absolute file paths to read and concatenate.
- Returns:
- pd.DataFrame
Differenced DataFrame of interval deltas. Returns an empty DataFrame when no files could be read.
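The differencing step can be sketched with plain pandas; the register values below are made up for illustration.

```python
import pandas as pd

# Cumulative register readings from two files, already concatenated.
cumulative = pd.DataFrame(
    {"egauge1_kWh": [100.0, 102.5, 106.0]},
    index=pd.to_datetime(
        ["2024-05-01 00:00", "2024-05-01 00:01", "2024-05-01 00:02"]
    ),
)

# Row-wise difference turns cumulative registers into per-interval
# deltas; the first row becomes NaN because it has no predecessor.
deltas = cumulative.diff()
```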
- class ecopipeline.extract.FieldManager(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, create_csv: bool = True, csv_prefix: str = '')¶
Bases: APIExtractor
APIExtractor for the FieldPop / Field Manager API.
Recursively splits the time range in half whenever the server returns a 500 'log size too large' response, down to a minimum of 30-minute windows.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline, including the FieldPop API token and device ID.
- start_time : datetime, optional
The start of the data extraction window. Defaults to datetime(2000, 1, 1, 0, 0, 0) if not provided.
- end_time : datetime, optional
The end of the data extraction window. Defaults to datetime.now() if not provided.
- create_csv : bool, optional
If True, writes the raw DataFrame to a CSV file in the configured data directory after a successful pull. Default is True.
- csv_prefix : str, optional
A string prefix prepended to the generated CSV filename. Default is an empty string.
Methods
raw_data_to_df(config[, startTime, endTime]) Fetch sensor data from the FieldPop API and return it as a DataFrame.
get_raw_data
- raw_data_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) DataFrame¶
Fetch sensor data from the FieldPop API and return it as a DataFrame.
Queries the fieldpop-api/deviceDataLog endpoint for the given time range. If the server responds with a 500 error indicating that the log size is too large, the method recursively bisects the time window until each sub-window is no smaller than 30 minutes.
- Parameters:
- config : ConfigManager
The ConfigManager object used to retrieve the FieldPop API token (get_fm_token) and device ID (get_fm_device_id).
- startTime : datetime, optional
Start of the query window. Defaults to datetime(2000, 1, 1, 0, 0, 0) if not provided.
- endTime : datetime, optional
End of the query window. Defaults to datetime.now() if not provided.
- Returns:
- pd.DataFrame
A DataFrame indexed by time_pt (UTC timestamps converted to datetime) with one column per sensor, values aggregated by mean when multiple readings share the same timestamp. Returns an empty DataFrame if the request fails or an error occurs.
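The bisection strategy can be sketched in isolation. Everything here is illustrative, not the library's code: fetch_with_bisection and fake_fetch are invented names, and OverflowError merely stands in for the API's 500 'log size too large' response.

```python
from datetime import datetime, timedelta

MIN_WINDOW = timedelta(minutes=30)

def fetch_with_bisection(fetch, start, end):
    """Recursively halve [start, end) whenever fetch() signals the
    window is too large, stopping at 30-minute sub-windows."""
    try:
        return fetch(start, end)
    except OverflowError:  # stand-in for the 'log size too large' error
        if end - start <= MIN_WINDOW:
            raise  # cannot split any further
        mid = start + (end - start) / 2
        return (fetch_with_bisection(fetch, start, mid)
                + fetch_with_bisection(fetch, mid, end))

# A fake fetch that only tolerates windows of at most one hour.
def fake_fetch(start, end):
    if end - start > timedelta(hours=1):
        raise OverflowError("log size too large")
    return [(start, end)]

windows = fetch_with_bisection(
    fake_fetch, datetime(2024, 5, 1, 0), datetime(2024, 5, 1, 4)
)
```

The four-hour request above is split twice, ending as four one-hour pulls.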
- class ecopipeline.extract.FileProcessor(config: ConfigManager, extension: str, start_time: datetime = None, end_time: datetime = None, raw_time_column: str = 'DateTime', time_column_format: str = '%Y/%m/%d %H:%M:%S', filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, round_time_index: bool = False)¶
Bases: object
Base class for reading raw data files into a pandas DataFrame.
On instantiation the processor collects matching files from the configured data directory, optionally filters them by date range, and concatenates them into a single DataFrame accessible via get_raw_data().
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- extension : str
File extension of raw data files (e.g. ".csv", ".gz").
- start_time : datetime, optional
Earliest timestamp (inclusive) used to filter files by the date encoded in their filename. Uses local time matching the data index.
- end_time : datetime, optional
Latest timestamp (exclusive) used to filter files by the date encoded in their filename. Uses local time matching the data index.
- raw_time_column : str, optional
Name of the column in the raw file that contains timestamp strings. Defaults to 'DateTime'.
- time_column_format : str, optional
datetime.strptime() format string used to parse raw_time_column. Defaults to '%Y/%m/%d %H:%M:%S'.
- filename_date_format : str, optional
datetime.strftime() format string used to convert start_time and end_time to integers for filename comparison. Defaults to '%Y%m%d%H%M%S'.
- file_prefix : str, optional
Only files whose names begin with this prefix are processed. Defaults to an empty string (all files).
- data_sub_dir : str, optional
Sub-directory appended to the configured data directory when locating files. For example, if files live in 'path/to/data/DENT/' and the configured data directory is 'path/to/data/', pass 'DENT/'. Defaults to an empty string.
- date_string_start_idx : int, optional
Start index (from the end) of the date substring within each filename. Defaults to -17.
- date_string_end_idx : int, optional
End index (from the end) of the date substring within each filename. Defaults to -3.
- round_time_index : bool, optional
If True, floor the datetime index to the minute and average any duplicate timestamps after concatenation. Defaults to False.
Methods
extract_files(extension, config[, ...]) Take a file extension and subdirectory and return a list of paths to files of that type in the data directory.
extract_new(startTime, filenames[, decihex, ...]) Filter the filenames to only those equal to or newer than the date specified by startTime.
- extract_files(config: ConfigManager) list[str]¶
Collect the full paths of files that match the processor’s criteria.
Scans the configured data directory (plus any data_sub_dir) for files whose names end with the configured extension and begin with the configured prefix. When start_time is set the list is further filtered by extract_new().
- Parameters:
- config : ConfigManager
The ConfigManager object that provides the base data directory path.
- Returns:
- list of str
Absolute file paths for all files that satisfy the criteria.
- extract_new(filenames: list[str]) list[str]¶
Filter a list of filenames to those whose encoded date falls in range.
The date is extracted from each filename using date_string_start_idx and date_string_end_idx, then compared as an integer against the integer representations of start_time and end_time (formatted with filename_date_format).
- Parameters:
- filenames : list of str
Candidate file paths to filter.
- Returns:
- list of str
Only the file paths whose encoded date satisfies start_time <= date < end_time (the end_time bound is omitted when end_time is None).
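A standalone sketch of this filter; the helper name and filenames are illustrative. Note that the default indices of -17 and -3 suit names ending in a 14-digit date stamp followed by a three-character extension such as '.gz'.

```python
from datetime import datetime

def filter_by_filename_date(filenames, start_time, end_time=None,
                            date_fmt="%Y%m%d%H%M%S",
                            start_idx=-17, end_idx=-3):
    """Keep files whose filename-encoded date lies in
    [start_time, end_time), comparing the dates as integers."""
    lo = int(start_time.strftime(date_fmt))
    hi = int(end_time.strftime(date_fmt)) if end_time else None
    kept = []
    for name in filenames:
        stamp = int(name[start_idx:end_idx])  # e.g. '20240501000000'
        if stamp >= lo and (hi is None or stamp < hi):
            kept.append(name)
    return kept

files = ["site_20240430000000.gz",
         "site_20240501000000.gz",
         "site_20240502000000.gz"]
kept = filter_by_filename_date(files, datetime(2024, 5, 1), datetime(2024, 5, 2))
```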
- get_raw_data() DataFrame¶
Return the concatenated raw DataFrame produced during initialisation.
- Returns:
- pd.DataFrame
DataFrame containing all raw data read from the matching files. Returns an empty DataFrame when no files were found or all reads failed.
- raw_files_to_df(filenames: list[str]) DataFrame¶
Concatenate multiple raw files into a single DataFrame.
Calls _read_file_into_df() for each path in filenames, ignores files that are not found or raise read errors, and concatenates the results. When round_time_index is enabled the index is floored to the minute and duplicate timestamps are averaged.
- Parameters:
- filenames : list of str
Absolute file paths to read and concatenate.
- Returns:
- pd.DataFrame
Concatenated DataFrame for all successfully read files. Returns an empty DataFrame when no files could be read.
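The round_time_index behaviour can be reproduced with plain pandas (sample readings are invented):

```python
import pandas as pd

df = pd.DataFrame(
    {"temp": [20.0, 22.0, 25.0]},
    index=pd.to_datetime(
        ["2024-05-01 00:00:10", "2024-05-01 00:00:50", "2024-05-01 00:01:30"]
    ),
)

# Floor each timestamp to the minute, then average rows that now
# share the same timestamp.
df.index = df.index.floor("min")
df = df.groupby(df.index).mean()
```

The two readings within minute 00:00 collapse to their mean of 21.0.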
- class ecopipeline.extract.FlowCSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3)¶
Bases: FileProcessor
FileProcessor for flow meter CSV files.
Flow meter files have 6 header rows to skip. The timestamp is reconstructed from individual Year, Month, Day, Hour, Minute, and Second columns into a time_pt datetime index. The index is floored to the minute and duplicate timestamps are averaged.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- start_time : datetime, optional
Earliest filename-encoded timestamp to include.
- end_time : datetime, optional
Latest filename-encoded timestamp to include (exclusive).
- filename_date_format : str, optional
datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.
- file_prefix : str, optional
Only process files whose names begin with this prefix. Defaults to an empty string.
- data_sub_dir : str, optional
Sub-directory under the configured data directory containing the files. Defaults to an empty string.
- date_string_start_idx : int, optional
Start index (from the end) of the date substring in the filename. Defaults to -17.
- date_string_end_idx : int, optional
End index (from the end) of the date substring in the filename. Defaults to -3.
Methods
extract_files(config) Collect the full paths of files that match the processor's criteria.
extract_new(filenames) Filter a list of filenames to those whose encoded date falls in range.
get_raw_data() Return the concatenated raw DataFrame produced during initialisation.
raw_files_to_df(filenames) Concatenate multiple raw files into a single DataFrame.
- class ecopipeline.extract.JSONProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, raw_time_column: str = 'time', time_column_format: str = '%Y/%m/%d %H:%M:%S', filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, zip_files: bool = True, time_zone: str = 'US/Pacific')¶
Bases: FileProcessor
Methods
extract_files(config) Collect the full paths of files that match the processor's criteria.
extract_new(filenames) Filter a list of filenames to those whose encoded date falls in range.
get_raw_data() Return the concatenated raw DataFrame produced during initialisation.
raw_files_to_df(filenames) Concatenate multiple raw files into a single DataFrame.
- class ecopipeline.extract.LiCOR(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, create_csv: bool = True, csv_prefix: str = '')¶
Bases: APIExtractor
APIExtractor for the LI-COR Cloud API.
Queries sensor data for the configured device between start_time and end_time. Returns a DataFrame indexed by UTC timestamp with sensor serial numbers as columns.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline, including the LI-COR API token (config.api_token) and the device serial number (config.api_device_id).
- start_time : datetime, optional
The start of the data extraction window. Defaults to 28 hours before end_time if not provided.
- end_time : datetime, optional
The end of the data extraction window. Defaults to datetime.now() if not provided.
- create_csv : bool, optional
If True, writes the raw DataFrame to a CSV file in the configured data directory after a successful pull. Default is True.
- csv_prefix : str, optional
A string prefix prepended to the generated CSV filename. Default is an empty string.
Methods
raw_data_to_df(config[, startTime, endTime]) Fetch sensor data from the LI-COR Cloud API and return it as a DataFrame.
get_raw_data
- raw_data_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) DataFrame¶
Fetch sensor data from the LI-COR Cloud API and return it as a DataFrame.
Calls the /v2/data endpoint, iterates over each sensor returned in the response, and assembles a wide-format DataFrame keyed by millisecond-precision UTC timestamps.
- Parameters:
- config : ConfigManager
The ConfigManager object used to retrieve config.api_token for the Bearer authorisation header and config.api_device_id for the device serial number query parameter.
- startTime : datetime, optional
Start of the query window. Defaults to 28 hours before endTime if not provided.
- endTime : datetime, optional
End of the query window. Defaults to datetime.now() if not provided.
- Returns:
- pd.DataFrame
A DataFrame indexed by UTC datetime (millisecond precision) with one column per sensor serial number. Non-numeric values are coerced to None via _get_float_value(). Returns an empty DataFrame if the request fails or an error occurs.
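The timestamp and value handling can be sketched with plain pandas. The sensor serial, sample readings, and the to_float_or_none helper (standing in for the documented _get_float_value) are all illustrative assumptions.

```python
import pandas as pd

def to_float_or_none(value):
    """Coerce a reading to float, returning None for non-numeric
    values (a stand-in for _get_float_value)."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

# Millisecond-precision UTC epochs keyed to one column per sensor serial.
records = {"TS-001": {1714521600000: "21.5", 1714521660000: "n/a"}}
frame = pd.DataFrame(
    {serial: {pd.to_datetime(ts, unit="ms", utc=True): to_float_or_none(v)
              for ts, v in readings.items()}
     for serial, readings in records.items()}
).sort_index()
```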
- class ecopipeline.extract.MSACSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, mb_prefix: bool = False, time_zone: str = 'US/Pacific')¶
Bases: FileProcessor
FileProcessor for MSA CSV files.
MSA files contain a 'DateEpoch(secs)' Unix-epoch column that is converted from UTC to the configured timezone. When mb_prefix is True, all data columns are prefixed with the filename stem so that data from multiple devices can be merged without column collisions. The index is floored to the minute and duplicate timestamps are averaged.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- start_time : datetime, optional
Earliest filename-encoded timestamp to include.
- end_time : datetime, optional
Latest filename-encoded timestamp to include (exclusive).
- filename_date_format : str, optional
datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.
- file_prefix : str, optional
Only process files whose names begin with this prefix. Defaults to an empty string.
- data_sub_dir : str, optional
Sub-directory under the configured data directory containing the files. Defaults to an empty string.
- date_string_start_idx : int, optional
Start index (from the end) of the date substring in the filename. Defaults to -17.
- date_string_end_idx : int, optional
End index (from the end) of the date substring in the filename. Defaults to -3.
- mb_prefix : bool, optional
If True, prefix every column name with the filename stem. Defaults to False.
- time_zone : str, optional
Timezone name used to convert UTC epoch timestamps to local time. Defaults to 'US/Pacific'.
Methods
extract_files(config) Collect the full paths of files that match the processor's criteria.
extract_new(filenames) Filter a list of filenames to those whose encoded date falls in range.
get_raw_data() Return the concatenated raw DataFrame produced during initialisation.
raw_files_to_df(filenames) Concatenate multiple raw files into a single DataFrame.
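The epoch conversion can be sketched with plain pandas. The sample epochs are arbitrary, and stripping the timezone info after conversion is an assumption made so the example index is naive.

```python
import pandas as pd

df = pd.DataFrame({"DateEpoch(secs)": [1714521600, 1714521660],
                   "co2": [450, 455]})

# Epoch seconds are UTC; convert to the configured local timezone.
# Dropping the tz info afterwards is an assumption for this sketch.
df["time_pt"] = (
    pd.to_datetime(df["DateEpoch(secs)"], unit="s", utc=True)
    .dt.tz_convert("US/Pacific")
    .dt.tz_localize(None)
)
df = df.drop(columns=["DateEpoch(secs)"]).set_index("time_pt")
```

1714521600 is 2024-05-01 00:00:00 UTC, which lands at 17:00 the previous day in US/Pacific (PDT).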
- class ecopipeline.extract.ModbusCSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, raw_time_column: str = 'time(UTC)', filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3)¶
Bases: FileProcessor
FileProcessor for Modbus CSV files (e.g. Acquisuite).
Reads standard CSV files whose first column is a time(UTC) timestamp string. All data columns are prefixed with the filename stem so that data from multiple Modbus devices can be merged without column name collisions. The index is floored to the minute and duplicate timestamps are averaged after concatenation.
Acquisuite filenames encode the file start time as a hexadecimal Unix timestamp, so extract_new() overrides the default date-substring comparison with a hex-to-datetime conversion.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline.
- start_time : datetime, optional
Earliest local timestamp to include (compared against the hex date embedded in the filename).
- end_time : datetime, optional
Latest local timestamp to include, exclusive (compared against the hex date embedded in the filename).
- raw_time_column : str, optional
Name of the timestamp column in each CSV file. Defaults to 'time(UTC)'.
- filename_date_format : str, optional
datetime.strftime() format used when comparing non-hex filenames. Defaults to '%Y%m%d%H%M%S'.
- file_prefix : str, optional
Only process files whose names begin with this prefix. Defaults to an empty string.
- data_sub_dir : str, optional
Sub-directory under the configured data directory containing the files. Defaults to an empty string.
- date_string_start_idx : int, optional
Start index (from the end) of the date substring in the filename. Defaults to -17.
- date_string_end_idx : int, optional
End index (from the end) of the date substring in the filename. Defaults to -3.
Methods
extract_files(config) Collect the full paths of files that match the processor's criteria.
extract_new(filenames) Filter filenames using the hexadecimal Unix timestamp in the filename.
get_raw_data() Return the concatenated raw DataFrame produced during initialisation.
raw_files_to_df(filenames) Concatenate multiple raw files into a single DataFrame.
- extract_new(filenames: list[str]) list[str]¶
Filter filenames using the hexadecimal Unix timestamp in the filename.
Acquisuite filenames embed the file start time as a hexadecimal number between the first . and the first _ in the basename. This method decodes those hex timestamps to UTC datetimes, strips timezone awareness, and keeps only the files whose decoded time falls within [start_time, end_time).
- Parameters:
- filenames : list of str
Candidate file paths to filter.
- Returns:
- list of str
Only the file paths whose decoded hex timestamp satisfies start_time < local_time and, when end_time is set, local_time < end_time.
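A standalone sketch of the decoding rule; the helper name and the example filename are invented for illustration.

```python
import os
from datetime import datetime, timezone

def decode_acquisuite_timestamp(path):
    """Decode the hexadecimal Unix timestamp embedded between the
    first '.' and the first '_' in the basename, returning a naive
    UTC datetime."""
    base = os.path.basename(path)
    hex_part = base.split(".", 1)[1].split("_", 1)[0]
    ts = int(hex_part, 16)
    return datetime.fromtimestamp(ts, tz=timezone.utc).replace(tzinfo=None)

# Hypothetical Acquisuite-style filename: 0x66316a80 = 1714514560,
# i.e. 2024-04-30 22:02:40 UTC.
when = decode_acquisuite_timestamp("data/mb-001.66316a80_1.csv")
```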
- class ecopipeline.extract.Skycentrics(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, create_csv: bool = True, csv_prefix: str = '', time_zone: str = 'US/Pacific')¶
Bases: APIExtractor
APIExtractor for the Skycentrics API.
Pulls data day-by-day between start_time and end_time, normalises the JSON sensor records into a pivot table, and rounds :59:59 timestamps up to the next minute.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline, including the Skycentrics API token (config.get_skycentrics_token) and device ID (config.api_device_id).
- start_time : datetime, optional
The start of the data extraction window. Defaults to one day before end_time if not provided.
- end_time : datetime, optional
The end of the data extraction window. Defaults to datetime.utcnow() if not provided.
- create_csv : bool, optional
If True, writes the raw DataFrame to a CSV file in the configured data directory after a successful pull. Default is True.
- csv_prefix : str, optional
A string prefix prepended to the generated CSV filename. Default is an empty string.
- time_zone : str, optional
The timezone string used to localise timestamps after converting from UTC. Default is 'US/Pacific'.
Methods
raw_data_to_df(config[, startTime, endTime]) Fetch sensor data from the Skycentrics API and return it as a DataFrame.
get_raw_data
- raw_data_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) DataFrame¶
Fetch sensor data from the Skycentrics API and return it as a DataFrame.
Iterates day-by-day from startTime to endTime, issuing one authenticated GET request per day. Each gzip-compressed JSON response is decompressed, normalised with pandas.json_normalize(), and pivoted so that sensor IDs become columns. Timestamps that fall at :59:59 are nudged forward by one second to align with the top of the next minute.
- Parameters:
- config : ConfigManager
The ConfigManager object used to retrieve the per-request Skycentrics HMAC token via config.get_skycentrics_token and the device ID via config.api_device_id.
- startTime : datetime, optional
Start of the query window (UTC). Defaults to one day before endTime if not provided.
- endTime : datetime, optional
End of the query window (UTC). Defaults to datetime.utcnow() if not provided.
- Returns:
- pd.DataFrame
A DataFrame indexed by time_pt (timezone-aware timestamps converted to self.time_zone) with one column per sensor ID. Returns an empty DataFrame if no data is retrieved for the requested time frame.
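The nudge can be reproduced with plain pandas. Matching on both minute == 59 and second == 59 is an assumed reading of the ':59:59' rule; the sample data is invented.

```python
import pandas as pd

idx = pd.to_datetime(["2024-05-01 00:59:59",
                      "2024-05-01 01:00:00",
                      "2024-05-01 01:59:59"])
s = pd.Series([1.0, 2.0, 3.0], index=idx)

# Nudge any timestamp landing on :59:59 forward by one second so it
# aligns with the top of the next minute.
mask = (s.index.minute == 59) & (s.index.second == 59)
s.index = s.index.where(~mask, s.index + pd.Timedelta(seconds=1))
```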
- class ecopipeline.extract.SmallPlanetCSVProcessor(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, site: str = '', system: str = '', time_zone: str = 'US/Pacific')¶
Bases: FileProcessor
FileProcessor for Small Planet Controls CSV files.
Similar to MSACSVProcessor but applies variable-name mapping from Variable_Names.csv at read time: column names are first prefixed with the filename stem, then renamed through the alias-to-true-name mapping, and finally any columns that still carry an alias name or are absent from the true-name list are dropped. Rows containing only NaN values are also removed. The index is floored to the minute and duplicate timestamps are averaged.
- Parameters:
- config : ConfigManager
The ConfigManager object that holds configuration data for the pipeline. Must provide a path to Variable_Names.csv via get_var_names_path().
- start_time : datetime, optional
Earliest filename-encoded timestamp to include.
- end_time : datetime, optional
Latest filename-encoded timestamp to include (exclusive).
- filename_date_format : str, optional
datetime.strftime() format for filename date comparison. Defaults to '%Y%m%d%H%M%S'.
- file_prefix : str, optional
Only process files whose names begin with this prefix. Defaults to an empty string.
- data_sub_dir : str, optional
Sub-directory under the configured data directory containing the files. Defaults to an empty string.
- date_string_start_idx : int, optional
Start index (from the end) of the date substring in the filename. Defaults to -17.
- date_string_end_idx : int, optional
End index (from the end) of the date substring in the filename. Defaults to -3.
- site : str, optional
If non-empty, only variable-name rows whose site column matches this value are used for column mapping. Defaults to an empty string.
- system : str, optional
If non-empty, only variable-name rows whose system column contains this substring are used for column mapping. Defaults to an empty string.
- time_zone : str, optional
Timezone name used to convert UTC epoch timestamps to local time. Defaults to 'US/Pacific'.
- Raises:
- Exception
If the Variable_Names.csv file cannot be found at the path returned by config.get_var_names_path().
Methods
extract_files(config) Collect the full paths of files that match the processor's criteria.
extract_new(filenames) Filter a list of filenames to those whose encoded date falls in range.
get_raw_data() Return the concatenated raw DataFrame produced during initialisation.
raw_files_to_df(filenames) Concatenate multiple raw files into a single DataFrame.
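The mapping-and-drop behaviour can be sketched with plain pandas; the alias and true names below are invented for illustration and do not come from a real Variable_Names.csv.

```python
import pandas as pd

# Hypothetical alias-to-true-name mapping as it might appear in
# Variable_Names.csv after filtering by site/system.
alias_to_true = {"dev1_T1": "supply_temp_F", "dev1_F1": "flow_gpm"}

df = pd.DataFrame({"dev1_T1": [120.0], "dev1_F1": [3.2], "dev1_X9": [0.0]})

# Rename through the mapping, then drop any column that is not a
# recognised true name (dev1_X9 has no alias entry and is removed).
df = df.rename(columns=alias_to_true)
df = df[[c for c in df.columns if c in set(alias_to_true.values())]]
```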
- class ecopipeline.extract.ThingsBoard(config: ConfigManager, start_time: datetime = None, end_time: datetime = None, create_csv: bool = True)¶
Bases: APIExtractor
Methods
get_raw_data
raw_data_to_df
- raw_data_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) DataFrame¶
- ecopipeline.extract.central_extract_function(config: ConfigManager, process_type: str, start_time: datetime = None, end_time: datetime = None, use_defaults: bool = True, raw_time_column: str = 'DateTime', time_column_format: str = '%Y/%m/%d %H:%M:%S', filename_date_format: str = '%Y%m%d%H%M%S', file_prefix: str = '', data_sub_dir: str = '', date_string_start_idx: int = -17, date_string_end_idx: int = -3, time_zone: str = 'America/Los_Angeles', site: str = '', system: str = '') [pd.DataFrame, pd.DataFrame]¶
Primary entry point for the extract stage of the data pipeline.
Dispatches to the appropriate file-based or API-based extractor based on process_type, reads raw sensor data into a DataFrame, and then fetches matching outdoor air temperature (OAT) weather data from Open Meteo for the same time window. When start_time is None the function queries the database for the timestamp of the last available minute record and extracts only data newer than that point (normal run mode). When start_time is provided explicitly the function performs a reprocess run, reading from already-saved CSV files for API-based process types instead of hitting the remote API again.
- Parameters:
- config : ConfigManager
Configuration object that stores database credentials, directory paths, API tokens, and site metadata used throughout the pipeline.
- process_type : str
Identifier for the extraction method. Accepted values are "csv", "csv_mb", "csv_dent", "csv_flow", "csv_msa", "csv_egauge", "csv_small_planet", "json", "api_tb", "api_skycentrics", "api_fm", and "api_licor".
- start_time : datetime, optional
Inclusive start of the extraction window in local time. If None the start time is derived from get_last_full_day_from_db().
- end_time : datetime, optional
Exclusive end of the extraction window in local time. If None data is extracted up to the most recent available record.
- use_defaults : bool, optional
When True (default) the function overwrites raw_time_column and time_column_format with the standard defaults for process_type via _get_time_indicator_defaults().
- raw_time_column : str, optional
Name of the timestamp column in raw data files. Default is 'DateTime'. Ignored when use_defaults is True.
- time_column_format : str, optional
strptime-compatible format string for parsing raw_time_column. Default is '%Y/%m/%d %H:%M:%S'. Ignored when use_defaults is True.
- filename_date_format : str, optional
strftime-compatible format string used to parse dates embedded in raw data filenames. Default is "%Y%m%d%H%M%S".
- file_prefix : str, optional
Optional prefix used to filter which files in the data directory are ingested. Default is "".
- data_sub_dir : str, optional
Sub-directory path appended to the configured data directory when locating raw data files (e.g. "DENT/"). Default is "".
- date_string_start_idx : int, optional
Character index marking the start of the date substring within each filename. Default is -17.
- date_string_end_idx : int, optional
Character index marking the end of the date substring within each filename. Default is -3.
- time_zone : str, optional
IANA time-zone name used to localise timestamps in the returned DataFrames. Default is "America/Los_Angeles".
- site : str, optional
Site identifier passed to SmallPlanetCSVProcessor to filter data to a single site when a file contains data for multiple sites. Default is "".
- system : str, optional
System identifier passed to SmallPlanetCSVProcessor to filter data to a single system. Default is "".
- Returns:
- pd.DataFrame
Raw sensor data indexed by timestamp. Empty if no data was found for the requested time window.
- pd.DataFrame
Hourly outdoor air temperature data from Open Meteo indexed by timestamp, covering the same time window as the raw data. Empty if the raw DataFrame is empty or if the weather request fails.
- Raises:
- Exception
If
process_type is not one of the recognised extraction method strings.
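The process_type dispatch and error behaviour described above can be sketched in plain Python. The handler names and lookup table below are illustrative stand-ins, not the actual ecopipeline internals:

```python
import pandas as pd

def _csv_handler(**kwargs) -> pd.DataFrame:
    # Placeholder for a real file-based extractor such as CSVProcessor.
    return pd.DataFrame({"PowerIn_W": [100.0]})

# Map each accepted process_type string to its handler (abridged).
_DISPATCH = {
    "csv": _csv_handler,
    "csv_mb": _csv_handler,
    # ... "json", "api_tb", etc. would map to their own handlers
}

def dispatch_extract(process_type: str, **kwargs) -> pd.DataFrame:
    # Unrecognised process_type strings raise, as documented above.
    if process_type not in _DISPATCH:
        raise Exception(f"Unsupported process_type: {process_type}")
    return _DISPATCH[process_type](**kwargs)
```
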
- ecopipeline.extract.csv_to_df(csv_filenames: List[str], mb_prefix: bool = False, round_time_index: bool = True, create_time_pt_idx: bool = False, original_time_columns: str = 'DateTime', time_format: str = '%Y/%m/%d %H:%M:%S') DataFrame¶
Function takes a list of csv filenames and reads all files into a singular dataframe. Use this for AcquiSuite data.
Deprecated: use central_extract_function() instead.
- Parameters:
- csv_filenames: List[str]
List of filenames to be processed into a single dataframe
- mb_prefix: bool
A boolean that signifies whether the data is in Modbus form. If set to True, the Modbus prefix will be prepended to each raw variable name.
- round_time_index: bool
A boolean that signifies whether the dataframe timestamp indexes should be rounded down to the nearest minute. Should be set to False if there is no column in the dataframe called 'time(UTC)' to index on. Defaults to True.
- create_time_pt_idx: bool
Set to True if there is a time column in the CSV that you wish to convert to a 'time_pt' index. Defaults to False.
- original_time_columns: str
The name of the time column in the raw data files. Defaults to 'DateTime'. Only used if create_time_pt_idx is True.
- Returns:
- pd.DataFrame:
Pandas Dataframe containing data from all files with column headers the same as the variable names in the files (with prepended modbus prefix if mb_prefix = True)
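The mb_prefix renaming behaviour can be sketched as a simple column rename. In the real function the prefix is derived from the source device/file; the helper name and prefix string here are illustrative:

```python
import pandas as pd

def prepend_modbus_prefix(df: pd.DataFrame, prefix: str) -> pd.DataFrame:
    # Prepend a device-derived prefix to every raw variable name,
    # as csv_to_df does when mb_prefix=True.
    return df.rename(columns={c: f"{prefix}_{c}" for c in df.columns})
```

For example, a frame with columns Power and Energy and prefix "mb001" comes back with columns mb001_Power and mb001_Energy.
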
- ecopipeline.extract.dent_csv_to_df(csv_filenames: List[str], round_time_index: bool = True) DataFrame¶
Function takes a list of csv filenames and reads all files into a singular dataframe. Use this for Dent meter data.
Deprecated: use central_extract_function() instead.
- Parameters:
- csv_filenames: List[str]
List of filenames to be processed into a single dataframe
- round_time_index: bool
A boolean that signifies whether the dataframe timestamp indexes should be rounded down to the nearest minute. Should be set to False if there is no column in the dataframe called 'time(UTC)' to index on. Defaults to True.
- Returns:
- pd.DataFrame:
Pandas Dataframe containing data from all files with column headers the same as the variable names in the files.
- ecopipeline.extract.egauge_csv_to_df(csv_filenames: List[str]) DataFrame¶
Function takes a list of csv filenames and reads all files into a singular dataframe. Use this for eGauge data.
Deprecated: use central_extract_function() instead.
This data will have variable names equal to the variable_name column in Variable_Names.csv, so you will not need to use the rename_sensors function afterwards.
- Parameters:
- csv_filenamesList[str]
List of filenames
- Returns:
- pd.DataFrame:
Pandas Dataframe containing data from all files
- ecopipeline.extract.excel_to_csv(excel_folder_path: str, csv_folder_path: str, excel_date_format: str = '%m/%d/%Y %I:%M:%S %p')¶
Combines all Excel files (.xlsx or .xls) in a folder into a single CSV file sorted by timestamp. The output CSV is named after the earliest timestamp found across all files.
Expects each Excel file to contain a ‘Time stamp’ column in the format excel_date_format. All files are concatenated, sorted by ‘Time stamp’, and written to a single CSV in csv_folder_path.
- Parameters:
- excel_folder_pathstr
Path to the folder containing the Excel files to combine.
- csv_folder_pathstr
Path to the folder where the output CSV file will be written. The output filename is derived from the earliest timestamp (e.g., ‘20220101010000.csv’).
- excel_date_formatstr
Expected format of the 'Time stamp' column in the Excel files.
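The concatenate-sort-name pattern used by excel_to_csv can be sketched with in-memory frames (in the real function, pd.read_excel supplies the frames; combine_frames_to_csv_name is a hypothetical helper name):

```python
import pandas as pd

def combine_frames_to_csv_name(frames, date_format="%m/%d/%Y %I:%M:%S %p"):
    # Concatenate all files, parse and sort the 'Time stamp' column,
    # and derive the output CSV name from the earliest timestamp.
    combined = pd.concat(frames, ignore_index=True)
    combined["Time stamp"] = pd.to_datetime(combined["Time stamp"],
                                            format=date_format)
    combined = combined.sort_values("Time stamp")
    out_name = combined["Time stamp"].min().strftime("%Y%m%d%H%M%S") + ".csv"
    return combined, out_name
```

A 'Time stamp' of 01/01/2022 01:00:00 AM yields the output name 20220101010000.csv, matching the example above.
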
- ecopipeline.extract.extract_files(extension: str, config: ConfigManager, data_sub_dir: str = '', file_prefix: str = '') List[str]¶
Function takes in a file extension and subdirectory and returns a list of paths to files of that type in the directory.
Deprecated: use central_extract_function() instead.
- Parameters:
- extensionstr
File extension of raw data files as string (e.g. “.csv”, “.gz”, …)
- configecopipeline.ConfigManager
The ConfigManager object that holds configuration data for the pipeline
- data_sub_dirstr
Defaults to an empty string. If the files being accessed are in a subdirectory of the configured data directory, use this parameter to point there. E.g. if the data files you want to extract are in "path/to/data/DENT/" and your configured data directory is "path/to/data/", put "DENT/" as the data_sub_dir.
- file_prefixstr
File name prefix of raw data file if only file names with a certain prefix should be processed.
- Returns:
- List[str]:
List of filenames
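The extension-and-prefix filtering that extract_files performs can be sketched with the standard library (list_data_files is an illustrative stand-in, not the real implementation, which reads its directory from the ConfigManager):

```python
from pathlib import Path
from typing import List

def list_data_files(data_dir: str, extension: str,
                    file_prefix: str = "") -> List[str]:
    # Return sorted paths of files in data_dir that match the
    # extension and optional filename prefix.
    return sorted(
        str(p) for p in Path(data_dir).iterdir()
        if p.name.endswith(extension) and p.name.startswith(file_prefix)
    )
```
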
- ecopipeline.extract.extract_new(startTime: datetime, filenames: List[str], decihex=False, timeZone: str = None, endTime: datetime = None, dateStringStartIdx: int = -17, dateStringEndIdx: int = -3, dateFormat: str = '%Y%m%d%H%M%S', epochFormat: bool = False) List[str]¶
Function filters the filenames to only those dated equal to or newer than the specified startTime.
Deprecated: use central_extract_function() instead.
If filenames use the decihex format, the function can still handle it. Note that for some projects, files are dropped at irregular intervals, so data cannot be filtered by exact date.
Currently, this function expects file names to be in one of three formats:
- default (decihex = False): characters [-17, -3) of the filename string are the file's date in the form "%Y%m%d%H%M%S".
- decihex (decihex = True): each filename contains, between a '.' and a '_' character, a hexadecimal value equal to the number of seconds since January 1, 1970, representing the timestamp of the data in the file.
- custom: the same as the default format, but with a custom date format given by the dateFormat parameter; the date is expected at characters [dateStringStartIdx, dateStringEndIdx) of the filename.
- Parameters:
- startTime: datetime
The point in time for which we want to start the data extraction from. This is local time from the data’s index.
- filenames: List[str]
List of filenames to be filtered by those equal to or newer than startTime
- decihex: bool
Defaults to False. Set to True if filenames contain the date of the data in decihex format.
- timeZone: str
The timezone for the indexes in the output dataframe as a string. Must be a string recognized as a time zone by the pandas tz_localize() function https://pandas.pydata.org/docs/reference/api/pandas.Series.tz_localize.html. Defaults to None.
- dateStringStartIdx: int
The character index in each filename where the date substring starts. Default is -17 (meaning 17 characters from the end of the filename string).
- dateStringEndIdx: int
The character index in each filename where the date substring ends. Default is -3 (meaning 3 characters from the end of the filename string).
- Returns:
- List[str]:
Filtered list of filenames
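The default-format filtering above can be sketched as follows (filter_new_files is a hypothetical helper name mirroring extract_new's default mode, not the real implementation):

```python
from datetime import datetime
from typing import List

def filter_new_files(start_time: datetime, filenames: List[str],
                     date_fmt: str = "%Y%m%d%H%M%S",
                     start_idx: int = -17, end_idx: int = -3) -> List[str]:
    # Keep only files whose embedded date (characters [start_idx, end_idx)
    # of the name) is at or after start_time.
    kept = []
    for name in filenames:
        stamp = datetime.strptime(name[start_idx:end_idx], date_fmt)
        if stamp >= start_time:
            kept.append(name)
    return kept
```

For a name like sensor_20220101010000.gz, characters [-17, -3) are exactly the 14-digit date 20220101010000.
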
- ecopipeline.extract.flow_csv_to_df(csv_filenames: List[str], round_time_index: bool = True) DataFrame¶
Function takes a list of csv filenames and reads all files into a singular dataframe. Use this for flow-meter data.
Deprecated: use central_extract_function() instead.
- Parameters:
- csv_filenames: List[str]
List of filenames to be processed into a single dataframe
- round_time_index: bool
A boolean that signifies whether the dataframe timestamp indexes should be rounded down to the nearest minute. Should be set to False if there is no column in the dataframe called 'time(UTC)' to index on. Defaults to True.
- Returns:
- pd.DataFrame:
Pandas Dataframe containing data from all files with column headers the same as the variable names in the files.
- ecopipeline.extract.fm_api_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None, create_csv: bool = True) DataFrame¶
Function connects to the Field Manager API to pull data and returns a dataframe.
Deprecated: use central_extract_function() instead.
- Parameters:
- configecopipeline.ConfigManager
The ConfigManager object that holds configuration data for the pipeline. The config manager must contain information to connect to the api, i.e. the api user name and password as well as the device id for the device the data is being pulled from.
- startTime: datetime
The point in time for which we want to start the data extraction from. This is local time from the data’s index.
- endTime: datetime
The point in time for which we want to end the data extraction. This is local time from the data’s index.
- create_csvbool
If True, save CSV files as data is processed so the API need not be relied upon for reprocessing.
- Returns:
- pd.DataFrame:
Pandas Dataframe containing data from the API pull with column headers the same as the variable names in the data from the pull
- ecopipeline.extract.get_OAT_open_meteo(lat: float, long: float, start_date: datetime, end_date: datetime = None, time_zone: str = 'America/Los_Angeles', use_noaa_names: bool = True) DataFrame¶
- ecopipeline.extract.get_db_row_from_time(time: datetime, config: ConfigManager) DataFrame¶
Extracts a row from the applicable minute table in the database for the given datetime, or returns an empty dataframe if none exists.
- Parameters:
- timedatetime
The time index to get the row from
- configecopipeline.ConfigManager
The ConfigManager object that holds configuration data for the pipeline
- Returns:
- pd.DataFrame:
Pandas Dataframe containing the row or empty if no row exists for the timestamp
- ecopipeline.extract.get_last_full_day_from_db(config: ConfigManager, table_identifier: str = 'minute') datetime¶
Retrieve the timestamp of the last fully recorded minute from the database.
Queries the table identified by
table_identifierin the pipeline database and returns the most recent timestamp. If the most recent row does not fall exactly at 23:59, the function assumes the day is incomplete and rolls back to the previous day at 23:59:00. If the table is empty or the query fails, a default datetime of 2000-01-09 23:59:00 US/Pacific is returned.- Parameters:
- configConfigManager
Configuration object that provides database connection details and table metadata for the pipeline.
- table_identifierstr, optional
Key used to look up the target table name in the pipeline config. Default is
"minute".
- Returns:
- datetime
End of the last fully populated day found in the database, or a default past datetime if no data is available.
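The 23:59 rollback rule described above can be sketched in a few lines (last_full_day is a hypothetical helper that captures only the date logic; the real function also performs the database query):

```python
from datetime import datetime, timedelta

def last_full_day(latest: datetime) -> datetime:
    # If the newest row falls exactly at 23:59, that day is complete;
    # otherwise roll back to the previous day at 23:59:00.
    if latest.hour == 23 and latest.minute == 59:
        return latest.replace(second=0, microsecond=0)
    prev = latest - timedelta(days=1)
    return prev.replace(hour=23, minute=59, second=0, microsecond=0)
```
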
- ecopipeline.extract.get_noaa_data(station_names: List[str], config: ConfigManager, station_ids: dict = {}) dict¶
Function will take in a list of station names and will return a dictionary where the key is the station name and the value is a dataframe with the parsed weather data.
- Parameters:
- station_namesList[str]
List of Station Names
- configecopipeline.ConfigManager
The ConfigManager object that holds configuration data for the pipeline
- Returns:
- dict:
Dictionary with key as Station Name and Value as DF of Parsed Weather Data
- ecopipeline.extract.get_sub_dirs(dir: str) List[str]¶
Function takes in a directory and returns a list of the paths to all immediate subfolders in that directory. This is used when multiple sites are being run in the same pipeline.
- Parameters:
- dirstr
Directory as a string.
- Returns:
- List[str]:
List of paths to subfolders.
- ecopipeline.extract.json_to_df(json_filenames: List[str], time_zone: str = 'US/Pacific') DataFrame¶
Function takes a list of gz/json filenames and reads all files into a singular dataframe.
Deprecated: use central_extract_function() instead.
- Parameters:
- json_filenames: List[str]
List of filenames to be processed into a single dataframe
- time_zone: str
The timezone for the indexes in the output dataframe as a string. Must be a string recognized as a time zone by the pandas tz_localize() function https://pandas.pydata.org/docs/reference/api/pandas.Series.tz_localize.html. Defaults to 'US/Pacific'.
- Returns:
- pd.DataFrame:
Pandas Dataframe containing data from all files with column headers the same as the variable names in the files
- ecopipeline.extract.licor_cloud_api_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None, create_csv: bool = True) DataFrame¶
Connects to the LI-COR Cloud API to pull sensor data and returns a dataframe.
Deprecated: use central_extract_function() instead.
The function queries the LI-COR Cloud API for sensor data within the specified time range. Each sensor’s data is returned as a separate column in the dataframe, indexed by timestamp.
- Parameters:
- configecopipeline.ConfigManager
The ConfigManager object that holds configuration data for the pipeline. The config manager must contain the api_token and api_device_id (device serial number) for authentication with the LI-COR Cloud API.
- startTimedatetime
The start time for data extraction. If None, defaults to 28 hours before endTime.
- endTimedatetime
The end time for data extraction. If None, defaults to the current time.
- create_csvbool
If True, saves the extracted data to a CSV file in the data directory (default True).
- Returns:
- pd.DataFrame:
Pandas DataFrame with sensor serial numbers as column headers and timestamps as the index. The index is in UTC and may need to be converted to the appropriate timezone. Returns an empty DataFrame if the API call fails.
- ecopipeline.extract.msa_to_df(csv_filenames: List[str], mb_prefix: bool = False, time_zone: str = 'US/Pacific') DataFrame¶
Function takes a list of csv filenames and reads all files into a singular dataframe. Use this for MSA data.
Deprecated: use central_extract_function() instead.
- Parameters:
- csv_filenamesList[str]
List of filenames
- mb_prefixbool
Signifies whether the data is in Modbus form. If set to True, the Modbus prefix will be prepended to each raw variable name.
- time_zone str
The local timezone. Defaults to 'US/Pacific'.
- Returns:
- pd.DataFrame:
Pandas Dataframe containing data from all files
- ecopipeline.extract.pull_egauge_data(config: ConfigManager, eGauge_ids: list, eGauge_usr: str, eGauge_pw: str, num_days: int = 2)¶
Download raw eGauge data files from eGauge devices via wget.
Deprecated: use central_extract_function() instead.
- ecopipeline.extract.remove_char_sequence_from_csv_header(csv_filenames: List[str], header_sequences_to_remove: List[str] = [])¶
Function to remove special characters that can’t be processed by the pandas pd.read_csv function from CSV headers.
- Parameters:
- csv_filenames: List[str]
List of CSV filenames whose headers should be cleaned.
- header_sequences_to_remove: List[str]
List of special character sequences to remove from column headers
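The core header-cleaning step can be sketched as plain string replacement (clean_header_line is an illustrative helper; the real function rewrites the files in place before they reach pd.read_csv):

```python
from typing import List

def clean_header_line(header: str, sequences_to_remove: List[str]) -> str:
    # Strip each unwanted character sequence from a raw CSV header line.
    for seq in sequences_to_remove:
        header = header.replace(seq, "")
    return header
```
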
- ecopipeline.extract.skycentrics_api_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None, create_csv: bool = True, time_zone: str = 'US/Pacific')¶
Function connects to the Skycentrics API to pull data and returns a dataframe.
Deprecated: use central_extract_function() instead.
- Parameters:
- configecopipeline.ConfigManager
The ConfigManager object that holds configuration data for the pipeline. The config manager must contain information to connect to the api, i.e. the api user name and password as well as the device id for the device the data is being pulled from.
- startTime: datetime
The point in time for which we want to start the data extraction from. This is local time from the data’s index.
- endTime: datetime
The point in time for which we want to end the data extraction. This is local time from the data’s index.
- create_csvbool
If True, save CSV files as data is processed so the API need not be relied upon for reprocessing.
- time_zone: str
The timezone for the indexes in the output dataframe as a string. Must be a string recognized as a time zone by the pandas tz_localize() function https://pandas.pydata.org/docs/reference/api/pandas.Series.tz_localize.html. Defaults to 'US/Pacific'.
- Returns:
- pd.DataFrame:
Pandas Dataframe containing data from the API pull with column headers the same as the variable names in the data from the pull
- ecopipeline.extract.small_planet_control_to_df(config: ConfigManager, csv_filenames: List[str], site: str = '', system: str = '') DataFrame¶
Function takes a list of csv filenames and reads all files into a singular dataframe. Use this for small planet control data.
Deprecated: use central_extract_function() instead.
This data will have variable names equal to the variable_name column in Variable_Names.csv, so you will not need to use the rename_sensors function afterwards.
- Parameters:
- configecopipeline.ConfigManager
The ConfigManager object that holds configuration data for the pipeline. Among other things, this object will point to a file called Variable_Names.csv in the input folder of the pipeline (e.g. “full/path/to/pipeline/input/Variable_Names.csv”). The csv this points to should have at least 2 columns called “variable_alias” (the raw name to be changed from) and “variable_name” (the name to be changed to). All columns without a corresponding variable_name will be dropped from the dataframe.
- csv_filenamesList[str]
List of filenames
- site: str
If the pipeline is processing data for a particular site with a dataframe that contains data from multiple sites that need to be processed separately, fill in this optional variable to drop data from all other sites in the returned dataframe. Appropriate variables in your Variable_Names.csv must have a matching substring to this variable in a column called “site”.
- system: str
If the pipeline is processing data for a particular system with a dataframe that contains data from multiple systems that need to be processed separately, fill in this optional variable to drop data from all other systems in the returned dataframe. Appropriate variables in your Variable_Names.csv must have a matching string to this variable in a column called “system”.
- Returns:
- pd.DataFrame:
Pandas Dataframe containing data from all files
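The Variable_Names.csv renaming step described above can be sketched with pandas (rename_and_drop is a hypothetical helper name; the real function also applies the site/system filtering):

```python
import pandas as pd

def rename_and_drop(df: pd.DataFrame,
                    variable_names: pd.DataFrame) -> pd.DataFrame:
    # Rename columns from variable_alias to variable_name and drop
    # any column that has no mapping.
    mapping = dict(zip(variable_names["variable_alias"],
                       variable_names["variable_name"]))
    kept = df[[c for c in df.columns if c in mapping]]
    return kept.rename(columns=mapping)
```
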
- ecopipeline.extract.tb_api_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None, create_csv: bool = True, query_hours: float = 1, sensor_keys: list = [], seperate_keys: bool = False, device_id_overwrite: str = None, csv_prefix: str = '')¶
Function connects to the ThingsBoard API to pull data and returns a dataframe.
Deprecated: use central_extract_function() instead.
- Parameters:
- configecopipeline.ConfigManager
The ConfigManager object that holds configuration data for the pipeline. The config manager must contain information to connect to the api, i.e. the api user name and password as well as the device id for the device the data is being pulled from.
- startTime: datetime
The point in time for which we want to start the data extraction from. This is local time from the data’s index.
- endTime: datetime
The point in time for which we want to end the data extraction. This is local time from the data’s index.
- create_csvbool
If True, save CSV files as data is processed so the API need not be relied upon for reprocessing.
- query_hoursfloat
number of hours to query at a time from ThingsBoard API
- device_id_overwritestr
Overwrites device ID for API pull
- csv_prefixstr
Prefix to add to the CSV file name.
- Returns:
- pd.DataFrame:
Pandas Dataframe containing data from the API pull with column headers the same as the variable names in the data from the pull. The index is returned in UTC and must be converted to the appropriate timezone afterwards.
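The UTC-to-local conversion mentioned above is a standard pandas tz_convert call (the frame and column name here are illustrative, not actual ThingsBoard output):

```python
import pandas as pd

# Build a UTC-indexed frame like the one tb_api_to_df returns,
# then convert its index to the site's local timezone.
idx = pd.to_datetime(["2024-01-01 08:00", "2024-01-01 08:01"]).tz_localize("UTC")
df = pd.DataFrame({"sensor_1": [1.0, 2.0]}, index=idx)
df_local = df.tz_convert("America/Los_Angeles")
```

08:00 UTC on 1 January lands at 00:00 Pacific Standard Time.
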