Extract Documentation

ecopipeline.extract.csv_to_df(csv_filenames: List[str], mb_prefix: bool = False, round_time_index: bool = True, create_time_pt_idx: bool = False, original_time_columns: str = 'DateTime', time_format: str = '%Y/%m/%d %H:%M:%S') → DataFrame

Function takes a list of csv filenames and reads all files into a single dataframe. Use this for AcquiSuite data.

Parameters:
csv_filenames: List[str]

List of filenames to be processed into a single dataframe

mb_prefix: bool

A boolean that signifies whether the data is in modbus form. If set to True, the modbus prefix will be prepended to each raw variable name.

round_time_index: bool

A boolean that signifies whether the dataframe timestamp indexes should be rounded down to the nearest minute. Should be set to False if there is no column in the dataframe called ‘time(UTC)’ to index on. Defaults to True.

create_time_pt_idx: bool

Set to True if there is a time column in the csv that you wish to convert to a ‘time_pt’ index; False otherwise. Defaults to False.

original_time_columns: str

The name of the time column in the raw data files. Defaults to ‘DateTime’. Only used if create_time_pt_idx is True.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files with column headers the same as the variable names in the files (with prepended modbus prefix if mb_prefix = True)
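The combining step can be sketched with the standard library alone. The helper below is a hypothetical illustration, not the library's implementation: it merges several in-memory csv texts into one list of row dicts and, when `mb_prefix` is set, prepends a prefix derived from each file's name (the real function derives the modbus prefix from the raw AcquiSuite files).

```python
import csv
import io

def combine_csv_texts(named_texts, mb_prefix=False):
    """Merge csv contents into one list of row dicts.

    named_texts: list of (file_stem, csv_text) pairs standing in
    for real files on disk. When mb_prefix is True, each column
    header is prefixed with the file stem, mimicking how a modbus
    prefix keeps identically named sensors from colliding.
    """
    rows = []
    for stem, text in named_texts:
        for row in csv.DictReader(io.StringIO(text)):
            if mb_prefix:
                row = {f"{stem}_{key}": val for key, val in row.items()}
            rows.append(row)
    return rows
```

With the prefix enabled, two files that both report a `temp` column produce distinct `mb1_temp` and `mb2_temp` columns instead of clobbering each other.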

ecopipeline.extract.dent_csv_to_df(csv_filenames: List[str], round_time_index: bool = True) → DataFrame

Function takes a list of csv filenames and reads all files into a single dataframe. Use this for DENT meter data.

Parameters:
csv_filenames: List[str]

List of filenames to be processed into a single dataframe

round_time_index: bool

A boolean that signifies whether the dataframe timestamp indexes should be rounded down to the nearest minute. Should be set to False if there is no column in the dataframe called ‘time(UTC)’ to index on. Defaults to True.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files with column headers the same as the variable names in the files

ecopipeline.extract.egauge_csv_to_df(csv_filenames: List[str]) → DataFrame

Function takes a list of csv filenames and reads all files into a single dataframe. Use this for small planet control data. The returned data will have variable names equal to the variable_name column in Variable_Names.csv, so you will not need to use the rename_sensors function afterwards.

Parameters:
csv_filenames: List[str]

List of filenames

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files

ecopipeline.extract.extract_files(extension: str, config: ConfigManager, data_sub_dir: str = '', file_prefix: str = '') → List[str]

Function takes in a file extension and subdirectory and returns a list of paths to files of that type in the directory.

Parameters:
extension: str

File extension of raw data files as a string (e.g. “.csv”, “.gz”, …)

config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline

data_sub_dir: str

Defaults to an empty string. If the files being accessed are in a subdirectory of the configured data directory, use this parameter to point there. E.g. if the data files you want to extract are in “path/to/data/DENT/” and your configured data directory is “path/to/data/”, put “DENT/” as the data_sub_dir

file_prefix: str

File name prefix of raw data files if only file names with a certain prefix should be processed.

Returns:
List[str]:

List of filenames
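A minimal sketch of the directory scan, assuming the configured data directory is available as a plain path (the hypothetical `data_dir` parameter stands in for the ConfigManager lookup):

```python
from pathlib import Path

def list_raw_files(data_dir, extension=".csv", data_sub_dir="", file_prefix=""):
    """Return sorted paths of files in data_dir/data_sub_dir that
    match the given extension and filename prefix."""
    directory = Path(data_dir) / data_sub_dir
    return sorted(
        str(p) for p in directory.iterdir()
        if p.is_file() and p.name.startswith(file_prefix) and p.name.endswith(extension)
    )
```

Sorting keeps the returned list deterministic, which matters when the filenames carry timestamps and downstream code filters them chronologically.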

ecopipeline.extract.extract_new(startTime: datetime, filenames: List[str], decihex=False, timeZone: str = None, endTime: datetime = None, dateStringStartIdx: int = -17, dateStringEndIdx: int = -3, dateFormat: str = '%Y%m%d%H%M%S', epochFormat: bool = False) → List[str]

Function filters the filenames to only those equal to or newer than the date specified by startTime. If the filenames encode dates in deciheximal, the function can still handle it. Note that for some projects, files are dropped at irregular intervals, so data cannot be filtered by exact date.

Currently, this function expects file names to be in one of three formats:

  1. default (set decihex = False) assumes file names are formatted such that characters [-17,-3] of the filename string are the file’s date in the form “%Y%m%d%H%M%S”

  2. deciheximal (set decihex = True) assumes file names are formatted such that there is a deciheximal (hexadecimal) value between a ‘.’ and ‘_’ character in each filename string, equal to the number of seconds since January 1, 1970, that represents the timestamp of the data in the file

  3. custom is the same as the default format but uses a custom date format from the dateFormat parameter and expects the date to be characters [dateStringStartIdx,dateStringEndIdx]

Parameters:
startTime: datetime

The point in time for which we want to start the data extraction from. This is local time from the data’s index.

filenames: List[str]

List of filenames to be filtered by those equal to or newer than startTime

decihex: bool

Defaults to False. Set to True if filenames contain date of data in deciheximal format

timeZone: str

The timezone for the indexes in the output dataframe as a string. Must be a string recognized as a time zone by the pandas tz_localize() function https://pandas.pydata.org/docs/reference/api/pandas.Series.tz_localize.html Defaults to None

dateStringStartIdx: int

The character index in each filename where the date begins. Default is -17 (meaning 17 characters from the end of the filename string)

dateStringEndIdx: int

The character index in each filename where the date ends. Default is -3 (meaning 3 characters from the end of the filename string)

Returns:
List[str]:

Filtered list of filenames
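For the default format, the filtering reduces to slicing the date out of each filename and comparing it to startTime. The sketch below is a hypothetical illustration covering only the non-deciheximal path, with simplified parameter names:

```python
from datetime import datetime

def filter_filenames(start_time, filenames,
                     date_start_idx=-17, date_end_idx=-3,
                     date_format="%Y%m%d%H%M%S"):
    """Keep only filenames whose embedded date is >= start_time.

    Characters [date_start_idx:date_end_idx] of each filename are
    parsed with date_format, matching the documented default layout
    (a 14-character date followed by a 3-character extension).
    """
    kept = []
    for name in filenames:
        stamp = datetime.strptime(name[date_start_idx:date_end_idx], date_format)
        if stamp >= start_time:
            kept.append(name)
    return kept
```

For a name like “site_20240301000000.gz”, the slice [-17:-3] isolates “20240301000000”, which strptime turns into a comparable datetime.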

ecopipeline.extract.flow_csv_to_df(csv_filenames: List[str], round_time_index: bool = True) → DataFrame

Function takes a list of csv filenames and reads all files into a single dataframe. Use this for flow data.

Parameters:
csv_filenames: List[str]

List of filenames to be processed into a single dataframe

round_time_index: bool

A boolean that signifies whether the dataframe timestamp indexes should be rounded down to the nearest minute. Should be set to False if there is no column in the dataframe called ‘time(UTC)’ to index on. Defaults to True.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files with column headers the same as the variable names in the files

ecopipeline.extract.fm_api_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None) → DataFrame

Function connects to the Field Manager API to pull data and returns a dataframe.

Parameters:
config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline. The config manager must contain the information needed to connect to the API, i.e. the API username and password as well as the device id for the device the data is being pulled from.

startTime: datetime

The point in time for which we want to start the data extraction from. This is local time from the data’s index.

endTime: datetime

The point in time for which we want to end the data extraction. This is local time from the data’s index.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from the API pull with column headers the same as the variable names in the data from the pull

ecopipeline.extract.get_db_row_from_time(time: datetime, config: ConfigManager) → DataFrame

Extracts a row from the applicable minute table in the database for the given datetime or returns empty dataframe if none exists

Parameters:
time: datetime

The time index to get the row from

config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline

Returns:
pd.DataFrame:

Pandas Dataframe containing the row or empty if no row exists for the timestamp

ecopipeline.extract.get_last_full_day_from_db(config: ConfigManager, table_identifier: str = 'minute') → datetime

Function retrieves the last line from the database with the most recent datetime in local time.

Parameters:
config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline

table_identifier: str

Table identifier in config.ini with minute data. Default: “minute”

Returns:
datetime:

End of the last full day populated in the database, or a default past time if no data is found
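The “end of last full day” arithmetic can be illustrated independently of the database. This is a hypothetical sketch that assumes midnight at the start of the most recent timestamp’s day marks the end of the last fully populated day, with an invented fallback constant for the empty-table case:

```python
from datetime import datetime

# Hypothetical fallback used when no rows exist; the real default may differ.
DEFAULT_PAST_TIME = datetime(2000, 1, 1)

def end_of_last_full_day(latest_timestamp):
    """Truncate the most recent datetime to midnight of its own day;
    fall back to a default past time when the table held no rows."""
    if latest_timestamp is None:
        return DEFAULT_PAST_TIME
    return latest_timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
```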

ecopipeline.extract.get_noaa_data(station_names: List[str], config: ConfigManager, station_ids: dict = {}) → dict

Function will take in a list of station names and will return a dictionary where the key is the station name and the value is a dataframe with the parsed weather data.

Parameters:
station_names: List[str]

List of Station Names

config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline

Returns:
dict:

Dictionary with key as Station Name and Value as DF of Parsed Weather Data

ecopipeline.extract.get_sub_dirs(dir: str) → List[str]

Function takes in a directory and returns a list of the paths to all immediate subfolders in that directory. This is used when multiple sites are being run in the same pipeline.

Parameters:
dir: str

Directory as a string.

Returns:
List[str]:

List of paths to subfolders.
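The multi-site layout this supports can be sketched with pathlib; this hypothetical helper returns one path per site folder:

```python
from pathlib import Path

def immediate_sub_dirs(directory):
    """Return paths of all immediate subfolders of directory,
    e.g. one per site when several sites share a pipeline."""
    return sorted(str(p) for p in Path(directory).iterdir() if p.is_dir())
```

Files sitting next to the site folders (e.g. a stray readme) are skipped by the `is_dir()` check.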

ecopipeline.extract.json_to_df(json_filenames: List[str], time_zone: str = 'US/Pacific') → DataFrame

Function takes a list of gz/json filenames and reads all files into a single dataframe.

Parameters:
json_filenames: List[str]

List of filenames to be processed into a single dataframe

time_zone: str

The timezone for the indexes in the output dataframe as a string. Must be a string recognized as a time zone by the pandas tz_localize() function https://pandas.pydata.org/docs/reference/api/pandas.Series.tz_localize.html Defaults to ‘US/Pacific’

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files with column headers the same as the variable names in the files

ecopipeline.extract.msa_to_df(csv_filenames: List[str], mb_prefix: bool = False, time_zone: str = 'US/Pacific') → DataFrame

Function takes a list of csv filenames and reads all files into a single dataframe. Use this for MSA data.

Parameters:
csv_filenames: List[str]

List of filenames

mb_prefix: bool

A boolean that signifies whether the data is in modbus form. If set to True, the modbus prefix will be prepended to each raw variable name.

time_zone: str

The local timezone. Defaults to ‘US/Pacific’.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files

ecopipeline.extract.pull_egauge_data(config: ConfigManager, eGauge_ids: list, eGauge_usr: str, eGauge_pw: str, num_days: int = 1)
ecopipeline.extract.remove_char_sequence_from_csv_header(csv_filenames: List[str], header_sequences_to_remove: List[str] = [])

Function to remove special characters that can’t be processed by pandas pd.read_csv function from csv headers

Parameters:
csv_filenames: List[str]

List of csv filenames whose headers should be cleaned

header_sequences_to_remove: List[str]

List of special character sequences to remove from column headers
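The header rewrite can be sketched on an in-memory csv string (the real function edits the files on disk). This hypothetical helper touches only the first line, leaving data rows untouched:

```python
def strip_header_sequences(csv_text, sequences_to_remove):
    """Remove each unwanted character sequence from the first
    (header) line of a csv string; data rows pass through as-is."""
    header, sep, body = csv_text.partition("\n")
    for seq in sequences_to_remove:
        header = header.replace(seq, "")
    return header + sep + body
```

For example, stripping a degree sign from a “temp (°F)” column yields a header pandas can parse without encoding surprises.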

ecopipeline.extract.small_planet_control_to_df(config: ConfigManager, csv_filenames: List[str], site: str = '', system: str = '') → DataFrame

Function takes a list of csv filenames and reads all files into a single dataframe. Use this for small planet control data. The returned data will have variable names equal to the variable_name column in Variable_Names.csv, so you will not need to use the rename_sensors function afterwards.

Parameters:
config: ecopipeline.ConfigManager

The ConfigManager object that holds configuration data for the pipeline. Among other things, this object will point to a file called Variable_Names.csv in the input folder of the pipeline (e.g. “full/path/to/pipeline/input/Variable_Names.csv”). The csv this points to should have at least 2 columns called “variable_alias” (the raw name to be changed from) and “variable_name” (the name to be changed to). All columns without a corresponding variable_name will be dropped from the dataframe.

csv_filenames: List[str]

List of filenames

site: str

If the pipeline is processing data for a particular site with a dataframe that contains data from multiple sites that need to be processed separately, fill in this optional variable to drop data from all other sites in the returned dataframe. Appropriate variables in your Variable_Names.csv must have a substring matching this variable in a column called “site”.

system: str

If the pipeline is processing data for a particular system with a dataframe that contains data from multiple systems that need to be processed separately, fill in this optional variable to drop data from all other systems in the returned dataframe. Appropriate variables in your Variable_Names.csv must have a string matching this variable in a column called “system”.

Returns:
pd.DataFrame:

Pandas Dataframe containing data from all files
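The rename-and-drop behavior driven by Variable_Names.csv can be sketched with plain dicts, where the hypothetical `alias_to_name` mapping stands in for the parsed variable_alias → variable_name columns:

```python
def rename_and_filter(rows, alias_to_name):
    """Rename each column via alias_to_name and drop every column
    that has no configured variable_name, mirroring how columns
    without a mapping are removed from the dataframe."""
    return [
        {alias_to_name[key]: val for key, val in row.items() if key in alias_to_name}
        for row in rows
    ]
```

Columns absent from the mapping (e.g. a raw diagnostic channel with no variable_name) simply disappear from the output rows.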

ecopipeline.extract.tb_api_to_df(config: ConfigManager, startTime: datetime = None, endTime: datetime = None)