Set up a non-blocking stream handler for sending logging output to the console.
Exported source
```python
def init_console_logging(name=None, level=logging.INFO, timestamp=True):
    '''Setup non-blocking stream handler for sending logging to the console.'''

    # Only if no handlers defined.
    if not logging.getLogger(name).handlers:

        logger = logging.getLogger()
        logger.setLevel(level)

        console = logging.StreamHandler()
        console.setLevel(level)

        # set a format which is simpler for console use
        if timestamp:
            formatter = logging.Formatter(
                "%(asctime)s %(levelname)s\t%(process)d\t%(name)s\t%(filename)s\t%(lineno)d\t%(message)s",
                datefmt='%Y-%m-%dT%H:%M:%S%z')
        else:
            formatter = logging.Formatter(
                "%(levelname)s\t%(process)d\t%(name)s\t%(filename)s\t%(lineno)d\t%(message)s")

        # tell the handler to use this format
        console.setFormatter(formatter)

        # add the handler to the root logger
        logger.addHandler(console)

        return logger
    else:
        logging.getLogger(name).info(f'There already is a logger installed for {name}.')
```
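For reference, a minimal usage sketch; the logger name and level chosen here are illustrative:

```python
import logging

# Install the console handler once, typically at application startup.
logger = init_console_logging(__name__, level=logging.DEBUG)

# Subsequent records include timestamp, level, process id and source location.
logging.getLogger(__name__).info('console logging initialised')
```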
Timeseries dataframes
Timeseries data is a cornerstone of our data manipulation, and most processing operates on it.
Converts Pandas dataframes and series, NumPy arrays and recarrays, or a dictionary of individual timeseries into a Pandas dataframe with a single datetime index. For all arrays, dataframes and series it is assumed that the first column contains the timestamps.
Convert various tabular data formats to a timeseries DataFrame.

Args:

- data (Union[pd.DataFrame, pd.Series, dict, np.ndarray, np.recarray]): The input data to be converted.
- timezone (str, optional): The timezone to set for the index of the DataFrame. Defaults to 'UTC'.
- columnnames (Optional[List[str]]): The column names to use for the DataFrame. Defaults to None.

Returns:

- pd.DataFrame: The converted timeseries DataFrame with the index set to the specified timezone.
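A hedged usage sketch; the converter's exported name is assumed to be `timeseries_dataframe` here, and the data values are made up:

```python
import pandas as pd

# Two individual timeseries sharing a datetime index.
idx = pd.to_datetime(['2024-01-01T00:00:00Z', '2024-01-01T00:01:00Z'])
data = {
    'temperature': pd.Series([20.5, 20.7], index=idx),
    'humidity': pd.Series([55.0, 54.8], index=idx),
}

# Assumed call signature, following the Args listed above.
df = timeseries_dataframe(data, timezone='UTC')
assert isinstance(df.index, pd.DatetimeIndex)
```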
Converts a data dict into a pandas DataFrame based on the specified record format.

Parameters:

- data: A dictionary containing the data to convert.
- timecolumns: A list of column names to be treated as time columns.
- recordformat: A string specifying the format of the data records ('records', 'table', 'split', 'index', 'tight').

Returns:

- df: A pandas DataFrame with a DatetimeIndex representing the converted data.
Exported source
```python
def timeseries_dataframe_from_datadict(
        data: dict,
        timecolumns=None,
        recordformat='records'):
    """
    Converts a data dict into a pandas DataFrame based on the specified record format.

    Parameters:
    - data: A dictionary containing the data to convert.
    - timecolumns: A list of column names to be treated as time columns.
    - recordformat: A string specifying the format of the data records
      ('records', 'table', 'split', 'index', 'tight').

    Returns:
    - df: A pandas DataFrame with a DatetimeIndex representing the converted data.
    """

    orient = recordformat.lower()
    assert orient in ['records', 'table', 'split', 'index', 'tight']
    assert timecolumns, 'No time columns specified'

    if orient == 'records':
        # data is a structured ndarray, sequence of tuples or dicts, or DataFrame
        df = pd.DataFrame.from_records(data)
        time_columns_in_df = [C for C in df.columns if C in timecolumns]
        if not time_columns_in_df:
            #syslog.error(f"No column in records {df.columns} matches specification in time columns {timecolumns}, assuming first column is time")
            time_column = df.columns[0]
        else:
            time_column = time_columns_in_df[0]

    elif orient == 'table':
        # data is in pandas table format
        time_column = data['schema']['primaryKey'][0]
        df = pd.DataFrame.from_dict(data['data']).set_index(data['schema']['primaryKey'])
        df.index.name = 'time'

    else:
        # data is formatted according to 'orient' parameter (pandas)
        df = pd.DataFrame.from_dict(data, orient=orient)
        time_column = df.index.name

    df.columns = list(df.columns)
    df[time_column] = pd.to_datetime(df[time_column], utc=True, format='ISO8601')
    df.set_index(time_column, inplace=True)
    df.index = pd.DatetimeIndex(df.index).round('ms')
    df.index.name = 'time'

    return df
```
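For illustration, a records-format payload converts as follows (the payload values are made up):

```python
payload = [
    {'time': '2024-01-01T00:00:00Z', 'value': 1.0},
    {'time': '2024-01-01T00:01:00Z', 'value': 2.0},
]

df = timeseries_dataframe_from_datadict(payload, timecolumns=['time'], recordformat='records')

# The result has a UTC DatetimeIndex named 'time', rounded to milliseconds.
print(df.index)
```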
Convert a timeseries DataFrame or Series into a dictionary representation.

Args:

- data (Union[pd.DataFrame, pd.Series, dict]): The input data to be converted. It can be a pandas DataFrame, Series, or a dictionary.
- recordformat (str, optional): The format of the output records. Defaults to 'records'.
- timezone (str, optional): The timezone to use for the DataFrame index. Defaults to 'UTC'.
- popNaN (bool, optional): Whether to remove NaN values from the output dictionary. Defaults to False.

Returns:

- Union[dict, list]: The converted dictionary representation of the input data. If popNaN is True, it returns a dictionary with NaN values removed. Otherwise, it returns a dictionary or a list of dictionaries depending on the recordformat parameter.
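The record formats follow pandas' dict/JSON orientations; the sketch below shows the 'records' orientation and NaN removal in plain pandas, not the package function itself:

```python
import math
import pandas as pd

idx = pd.to_datetime(['2024-01-01T00:00:00Z', '2024-01-01T00:01:00Z']).rename('time')
df = pd.DataFrame({'value': [1.0, float('nan')]}, index=idx)

# 'records' orientation: one dict per row.
records = df.reset_index().to_dict(orient='records')

# NaN removal comparable to the popNaN option described above.
records = [
    {k: v for k, v in r.items() if not (isinstance(v, float) and math.isnan(v))}
    for r in records
]
```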
Resamples a time-series DataFrame on the specified period and method.

Parameters:

- df (pd.DataFrame): The input time-series DataFrame.
- period (str): The resampling period.
- method (str): The resampling method. Can be a string of multiple methods separated by ';'.
- method_args (dict, optional): Additional arguments for the resampling method.
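For reference, the underlying pandas resampling with multiple aggregation methods looks roughly like this (plain pandas, not the wrapper's exact behaviour):

```python
import numpy as np
import pandas as pd

idx = pd.date_range('2024-01-01', periods=120, freq='min', tz='UTC')
df = pd.DataFrame({'value': np.random.rand(120)}, index=idx)

# A method string such as 'mean;max' would correspond to aggregating with both methods.
resampled = df.resample('1h').agg(['mean', 'max'])
```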
Initialize self. See help(type(self)) for accurate signature.
|  | Type | Details |
|---|---|---|
| save_dir | str | path where the module can keep files |
| assets_dir | str | path to support files (scripts, metadata, etc) |
| args | VAR_POSITIONAL |  |
| kwargs | VAR_KEYWORD |  |
Exported source
```python
@patch
def __init__(self: AICoreModuleBase,
             save_dir: str,    # path where the module can keep files
             assets_dir: str,  # path to support files (scripts, metadata, etc)
             *args, **kwargs):

    self.init_time = datetime.datetime.now(datetime.UTC)
    self.aicorebridge_version = __version__

    self.init_args = args
    self.init_kwargs = dict(
        **kwargs,
        assets_dir=assets_dir,
        save_dir=save_dir
    )
```
```python
save_dir = os.path.join(os.getcwd(), 'cache')
test_module = AICoreModuleBase(os.path.join(os.getcwd(), 'cache'), None, 1, 2, num_1=3, num_2=4)

assert test_module.init_args == (1, 2), "init_args should be (1, 2)"
assert test_module.init_kwargs['num_1'] == 3, "init_kwargs['num_1'] should be 3"
assert test_module.init_kwargs['num_2'] == 4, "init_kwargs['num_2'] should be 4"
assert test_module.init_kwargs['save_dir'] == save_dir, f"init_kwargs['save_dir'] should be {save_dir}"
```
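A hedged sketch of how a concrete module might build on this base class; the subclass and its `threshold` parameter are illustrative, not part of the package:

```python
class MyModule(AICoreModuleBase):
    def __init__(self, save_dir: str, assets_dir: str, threshold: float = 0.5, **kwargs):
        # The patched base __init__ records init_time, init_args and init_kwargs.
        super().__init__(save_dir, assets_dir, threshold=threshold, **kwargs)
        self.threshold = threshold

module = MyModule(os.path.join(os.getcwd(), 'cache'), None, threshold=0.8)
assert module.init_kwargs['threshold'] == 0.8
```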