# Sample timeseries: three records listed out of time order;
# sort_index() below returns them in chronological order.
test_data = set_time_index_zone(
    timeseries_dataframe_from_datadict(
        [
            {"time": "2023-05-04T10:04:49", "value": 16.72},
            {"time": "2023-05-04T10:44:53", "value": 16.55},
            {"time": "2023-05-04T10:24:51", "value": 16.65},
        ],
        ['datetimeMeasure', 'time'],
        'records',
    ),
    'UTC',
).sort_index()
Support functions
build_historic_args
build_historic_args (data:pandas.core.frame.DataFrame, history:dict|list)
Create a dictionary of historic values, aligned with the rows of `data`, from the historic data defined in `history`.

| | Type | Details |
|---|---|---|
| data | DataFrame | The input time-series DataFrame. |
| history | dict \| list | Historic data definition. Each item in the list is a dictionary with a `startDate` key that sets the start of a section of historic data in the result (the first item may omit it, as in the example below), and a column-value pair for each historic column (e.g. `anumber=2.0`). |
| **Returns** | **dict** | **Historic data in dictionary format, where keys are column names and values are the historic values as a numpy array.** |
# Two history sections: the first (no startDate) holds from the start of the
# data, the second sets anumber to 2.0 from 2023-05-04 10:25 UTC onwards.
history_arg = [
    dict(anumber=1.0),
    dict(startDate="2023-05-04T10:25:00+00:00", anumber=2.0),
]
build_historic_args(test_data, history_arg)
{'anumber': array([1., 1., 2.])}
Class AICoreModule
AICoreModule
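AICoreModule (processor:Callable, save_dir:str, assets_dir:str, *args, **kwargs)
Wrap the data processing function `processor` so the AICore can call it through `infer()`.

| | Type | Details |
|---|---|---|
| processor | Callable | data processing function |
| save_dir | str | path where the module can keep files |
| assets_dir | str | assets directory, passed on to the base class |
| args | VAR_POSITIONAL | extra positional arguments, passed on to the base class (kept in `init_args`) |
| kwargs | VAR_KEYWORD | extra keyword arguments, passed on to the base class (kept in `init_kwargs`) |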
Exported source
class AICoreModule(AICoreModuleBase):
    """Serve a data processing function through the AICore.

    The processor is inspected by ``_init_processor``; ``save_dir``,
    ``assets_dir`` and any extra arguments go to ``AICoreModuleBase``.
    """

    def __init__(
        self,
        processor: typing.Callable,  # data processing function
        save_dir: str,               # path where the module can keep files
        assets_dir: str,             # passed on to the base class
        *args,
        **kwargs,
    ):
        super().__init__(save_dir, assets_dir, *args, **kwargs)
        self._init_processor(processor)
AICoreModule.call_processor
AICoreModule.call_processor (calldata, **callargs)
Exported source
@patch
def _init_processor(
        self:AICoreModule,
        processor:typing.Callable):
    """Initializes processor related variables on self.

    Sets:
      processor, processor_signature -- the callable and its inspect.Signature
      processor_params -- dict of parameter name -> inspect.Parameter
      return_param     -- NOTE(review): always None; Signature.parameters never
                          contains 'return' (a keyword, so no parameter can be
                          named that). The return annotation is available as
                          processor_signature.return_annotation.
      data_param       -- name of the first parameter when it is annotated as
                          pd.DataFrame or np.ndarray, otherwise None
      call_params      -- names of the parameters filled from the call kwargs
    """
    self.processor = processor
    self.processor_signature = inspect.signature(self.processor)
    self.processor_params = dict(self.processor_signature.parameters)
    self.return_param = self.processor_params.pop('return', None)

    param_names = list(self.processor_params.keys())
    # The first parameter receives the request data only when it is annotated
    # with one of the supported container types.
    if param_names and self.processor_params[param_names[0]].annotation in (pd.DataFrame, np.ndarray):
        self.data_param, *self.call_params = param_names
    else:
        # No parameters at all (previously a ValueError while unpacking an
        # empty list) or the first parameter is not a data container:
        # every parameter is a call parameter.
        self.data_param = None
        self.call_params = param_names
Exported source
# can be overloaded
@patch
def call_processor(self:AICoreModule, calldata, **callargs):
    """Run the processor.

    calldata is passed as the first positional argument only when the
    processor declares a data parameter (self.data_param is truthy);
    callargs are always passed as keyword arguments.
    """
    leading = (calldata,) if self.data_param else ()
    return self.processor(*leading, **callargs)
infer()
This method, called by the AICore, processes the data and parameters of a request received by the AICore. `infer` takes a `data` parameter, which contains the contents of the `data` key in the request body, an optional list of files sent with the request (currently ignored), and finally the contents of the `kwargs` key in the request body.
AICoreModule.infer
AICoreModule.infer (data:dict, *_, **kwargs)
Exported source
@patch
def infer(self:AICoreModule, data:dict, *_, **kwargs):
    """Process the data and parameters of one AICore request.

    data: contents of the `data` key in the request body
    _: further positional arguments (files sent with the request), ignored
    kwargs: contents of the `kwargs` key in the request body. `lastSeen`,
        `format`, `samplerPeriod`, `samplerMethod`, `reversed` and `history`
        are consumed here; `timezone` is read but left in place; the rest
        are candidates for the processor arguments (see get_callargs).

    Returns {'msg': [messages], 'data': [records]}. When the processor result
    is not a timeseries the records come from a plain DataFrame; on any other
    error 'msg' is a single string and 'data' is an empty list.
    """
    # Created before the try so the error handler below never hits an unbound
    # name, even when building the first messages fails.
    msg = []
    try:
        msg.extend([
            f"Startup time: {self.init_time.isoformat()}",
            f"Corebridge version: {self.aicorebridge_version}",
            f"{self.processor.__name__}({self.processor_signature})",
            #f"init_args: {self.init_args}, init_kwargs: {self.init_kwargs}",
        ])

        # Pickup params, pop those that are not intended for the processor.
        # A null format/timezone in the request falls back to the default
        # (previously None.lower() failed, or None was passed on as timezone).
        lastSeen = kwargs.pop('lastSeen', False)
        recordformat = (kwargs.pop('format', None) or "records").lower()
        # get, not pop: timezone stays in kwargs for get_callargs
        timezone = kwargs.get('timezone') or 'UTC'
        msg.append(f"lastSeen: {lastSeen}, recordformat: {recordformat}, timezone: {timezone}")

        samplerPeriod = kwargs.pop('samplerPeriod', self.init_kwargs.get('samplerPeriod', 'h'))
        samplerMethod = kwargs.pop('samplerMethod', self.init_kwargs.get('samplerMethod', None))
        # not named `reversed`, so the builtin is not shadowed
        reverse_result = kwargs.pop('reversed', False)

        calldata = self.get_call_data(
            data,
            recordformat=recordformat,
            timezone=timezone)

        history = build_historic_args(calldata, kwargs.pop('history', {}))

        callargs = self.get_callargs(kwargs, history)
        for arg, val in callargs.items():
            msg.append(f"{arg}: {val}")

        calculated_result = self.call_processor(calldata, **callargs)

        try:
            result = timeseries_dataframe(calculated_result, timezone=timezone)
            msg.append(f"result shape: {result.shape}")

            if samplerMethod:
                msg.append(f"Sampler: {samplerMethod}, period: {samplerPeriod}")
                result = timeseries_dataframe_resample(result, samplerPeriod, samplerMethod)

            msg.append(f"return-data shape: {result.shape}")
            if reverse_result:
                result = result[::-1]

            return {
                'msg': msg,
                'data': timeseries_dataframe_to_datadict(
                    result if not lastSeen else result[-1:],
                    recordformat=recordformat,
                    timezone=timezone,
                    popNaN=True)
            }

        # tries dataframe return
        except Exception as err:
            msg.append(f"No timeseries data, error={err}")

            df = pd.DataFrame(calculated_result)
            # integer column labels (e.g. from a list result) become value_<n>
            df.columns = [f"value_{str(c)}" if isinstance(c, int) else str(c) for c in list(df.columns)]
            return {
                'msg': msg,
                'data': df.reset_index().to_dict(orient='records')
            }

    # function try-catch
    except Exception as err:
        msg.append(''.join(traceback.format_exception(None, err, err.__traceback__)))
        syslog.exception(f"Exception {str(err)} in infer()")
        # NOTE(review): the collected `msg` list (including the traceback just
        # appended) is not returned; callers only receive this summary string.
        return {
            'msg': f"Unexpected {err=}, {type(err)=}",
            'data': []
        }
get_callargs
AICoreModule.init_annotated_param
AICoreModule.init_annotated_param (param_name, value)
Initialize an argument for the processor call.
`param_name`: name of the parameter to be initialized; `value`: value of the parameter read from the infer data, used for the initialization.
Exported source
# Specialized types for initializing annotated parameters.
# Add types by adding a (type, builder function) tuple to the list below;
# keys are str(type) so they can be matched against the member types of a
# parameter annotation (see init_annotated_param).
annotated_arg_builders = {
    str(B[0]): B[1] for B in [
        # Copy into a new array, keeping the dtype when the value already has
        # one (e.g. an ndarray). Plain sequences such as lists get numpy's
        # inferred dtype instead of failing on a missing `.dtype` attribute.
        (numpy.ndarray, lambda X: numpy.array(X, dtype=getattr(X, 'dtype', None))),
    ]
}
Exported source
@patch
def init_annotated_param(self:AICoreModule, param_name, value):
    """
    Initialize argument for the processor call
    param_name: name of the parameter to be initialized
    value: value of the parameter read from infer data to be used for initialization

    Conversion follows the parameter's annotation:
      * each member type of the annotation (e.g. float and np.ndarray for
        ``float|np.ndarray``) is tried in order, using the builder from
        annotated_arg_builders when one is registered for that type; the
        first successful conversion is returned;
      * otherwise the annotation itself is called on the value;
      * an unannotated parameter receives the value unchanged.
    When every conversion fails the error is logged and None is returned.
    """
    annotation = self.processor_signature.parameters[param_name].annotation

    # Unannotated parameter: there is nothing to convert to. Previously
    # inspect.Parameter.empty(value) raised, so the argument became None.
    if annotation is inspect.Parameter.empty:
        return value

    # try to convert value to one of the types in the annotation
    for T in typing.get_args(annotation):
        builder = annotated_arg_builders.get(str(T), T)
        try:
            return builder(value)
        except (TypeError, ValueError, AttributeError):
            # e.g. float('abc') raises ValueError: try the next member type
            continue
    try:
        return annotation(value)
    except (TypeError, ValueError) as err:
        syslog.exception(f"Exception {str(err)} in fallback conversion to {annotation} of {type(value)}")
AICoreModule.get_callargs
AICoreModule.get_callargs (kwargs, history)
Get arguments for the processor call
Exported source
@patch
def get_callargs(self:AICoreModule, kwargs, history):
    "Get arguments for the processor call"
    # Request arguments that are explicitly None are ignored
    supplied = {name: val for name, val in kwargs.items() if val is not None}

    callargs = {}
    for name in self.call_params:
        # Precedence: history > init_kwargs > request kwargs > signature default.
        # NOTE(review): constructor kwargs override request kwargs here, while
        # infer() lets request kwargs override init_kwargs for samplerPeriod /
        # samplerMethod -- confirm this ordering is intended.
        fallback = supplied.get(name, self.processor_signature.parameters[name].default)
        fallback = self.init_kwargs.get(name, fallback)
        callargs[name] = self.init_annotated_param(name, history.get(name, fallback))
    return callargs
def processor_function(data:pd.DataFrame, anumber:float|np.ndarray):
    """Example processor: scale the input data by anumber."""
    return anumber * data

test_module = AICoreModule(
    processor_function,
    os.path.join(os.getcwd(), 'cache'),  # save_dir
    os.path.join(os.getcwd(), 'cache'),  # assets_dir
)
get_call_data
AICoreModule.get_call_data
AICoreModule.get_call_data (data:dict, recordformat='records', timezone='UTC')
Convert data to the processor signature
Exported source
@patch
def get_call_data(
        self:AICoreModule,
        data:dict,
        recordformat='records',
        timezone='UTC'):
    "Convert data to the processor signature"
    # Processors without a data parameter get no call data
    if not self.data_param:
        return None

    frame = set_time_index_zone(
        timeseries_dataframe_from_datadict(data, ['datetimeMeasure', 'time'], recordformat),
        timezone,
    )
    frame.sort_index(inplace=True)

    if self.processor_params[self.data_param].annotation == pd.DataFrame:
        return frame

    # Array processors: the time index becomes seconds since the Unix epoch
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    frame.index = (frame.index - epoch) / datetime.timedelta(seconds=1)
    if len(frame.columns) > 1:
        return frame.to_records(index=True)
    return frame.reset_index().to_numpy()
# Convert record data to the processor's call data (a DataFrame here)
calldata = test_module.get_call_data(test_data.reset_index().to_records())
calldata
# ->                            index  value
#    time
#    2023-05-04 10:04:49+00:00      0  16.72
#    2023-05-04 10:24:51+00:00      1  16.65
#    2023-05-04 10:44:53+00:00      2  16.55

history = build_historic_args(calldata, history_arg)
history
# -> {'anumber': array([2., 2., 2.])}

# get_callargs expects the request kwargs as its first argument; pass an
# empty dict (this call previously passed the calldata DataFrame instead).
print(test_module.get_callargs({}, history))
# -> {'anumber': array([2., 2., 2.])}

np.array(history['anumber'])
# -> array([2., 2., 2.])

test_module.init_annotated_param('anumber', history['anumber'])
# -> array([2., 2., 2.])

test_module.init_annotated_param('anumber', 1.5)
# -> 1.5

test_module.processor_signature.parameters['anumber'].annotation
# -> float | numpy.ndarray

np.array(history['anumber'])
# -> array([2., 2., 2.])

annotated_arg_builders[str(numpy.ndarray)](history['anumber'])
# -> array([2., 2., 2.])
Tests
import os, addroot, pandas as pd, numpy as np
import json, typing  # used by the test cells below (json.dumps, typing.get_args)

def test_function(data:pd.DataFrame, anumber:float|np.ndarray=0):
    """Test processor with a DataFrame data parameter: scales data by anumber."""
    return data * anumber

def test_simple_function(anumber:float, another:float):
    """Test processor without a data parameter: returns [another * anumber]."""
    return [another * anumber]

class TestAICoreModule(AICoreModule):
    """AICoreModule serving test_function, with assets_dir set to None."""
    def __init__(self, save_dir, *args, **kwargs):
        super().__init__(test_function, save_dir, None, *args, **kwargs)

# (a stray merge-conflict marker `<<<<<<< HEAD` was removed here)
class SimpleAICoreModule(AICoreModule):
    """AICoreModule serving test_simple_function, with assets_dir set to None."""
    def __init__(self, save_dir, *args, **kwargs):
        super().__init__(test_simple_function, save_dir, None, *args, **kwargs)
save_dir = os.path.join(os.getcwd(), 'cache')
test_module = TestAICoreModule(save_dir, 1, 2, num_1=3, num_2=4)

# Extra positional/keyword arguments are kept in init_args / init_kwargs
assert test_module.init_args == (1, 2)
assert test_module.init_kwargs['num_1'] == 3
assert test_module.init_kwargs['num_2'] == 4
assert test_module.init_kwargs['save_dir'] == save_dir

test_data = [
    dict(datetimeMeasure='2020-04-01T00:01:11.123Z', value=1.1),
    dict(datetimeMeasure='2020-04-02T00:20:00Z', value=2.3),
]
result = test_module.infer(test_data, timezone='Europe/Amsterdam', anumber=2)

for title, payload in (("Test Data\n", test_data),
                       ("Result Message\n", result['msg']),
                       ("Result Data\n", result['data'])):
    print(title, json.dumps(payload, indent=2))
Normalized, Europe/Amsterdam
Test Data
[
{
"datetimeMeasure": "2020-04-01T00:01:11.123Z",
"value": 1.1
},
{
"datetimeMeasure": "2020-04-02T00:20:00Z",
"value": 2.3
}
]
Result Message
[
"Startup time: 2025-02-07T12:51:34.319178+00:00",
"Corebridge version: 0.4.0",
"test_function((data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0))",
"lastSeen: False, recordformat: records, timezone: Europe/Amsterdam",
"anumber: 2.0",
"result shape: (2, 1)",
"return-data shape: (2, 1)"
]
Result Data
[
{
"time": "2020-04-01T02:01:11.123+02:00",
"value": 2.2
},
{
"time": "2020-04-02T02:20:00.000+02:00",
"value": 4.6
}
]
test_module.processor_signature.parameters['data'].annotation
# -> pandas.core.frame.DataFrame

annotation = test_module.processor_signature.parameters['anumber'].annotation
print(typing.get_args(annotation))
# -> (<class 'float'>, <class 'numpy.ndarray'>)

# Each member type of the union can be called with a value
for T in typing.get_args(annotation):
    print(T(0))
# -> 0.0
# -> []
Simple module
simple_module = SimpleAICoreModule(save_dir, 1, 2, num_1=3, num_2=4)

assert simple_module.init_args == (1, 2)
assert simple_module.init_kwargs['num_1'] == 3
assert simple_module.init_kwargs['num_2'] == 4
assert simple_module.init_kwargs['save_dir'] == save_dir

# test_simple_function has no DataFrame/ndarray parameter, so there is no
# data parameter and every parameter is a call parameter
not simple_module.data_param
# -> True
simple_module.call_params
# -> ['anumber', 'another']

result = simple_module.infer([], timezone='Europe/Amsterdam', anumber=2, another=11)
for title, payload in (("Result Message\n", result['msg']),
                       ("Result Data\n", result['data'])):
    print(title, json.dumps(payload, indent=2))
Result Message
[
"Startup time: 2025-02-07T12:51:34.381787+00:00",
"Corebridge version: 0.4.0",
"test_simple_function((anumber: float, another: float))",
"lastSeen: False, recordformat: records, timezone: Europe/Amsterdam",
"anumber: 2.0",
"another: 11.0",
"No timeseries data, error='list' object has no attribute 'dtype'"
]
Result Data
[
{
"index": 0,
"value_0": 22.0
}
]
Tests with library module
import corebridge.core
from corebridge.aicorebridge import AICoreModule

class TestAICoreModule(AICoreModule):
    """TestAICoreModule rebuilt on the library's AICoreModule."""
    def __init__(self, save_dir, *args, **kwargs):
        super().__init__(test_function, save_dir, None, *args, **kwargs)

# Defined here so the assertions below do not rely on save_dir from an
# earlier section.
save_dir = os.path.join(os.getcwd(), 'cache')
test_module = TestAICoreModule(save_dir, 1, 2, num_1=3, num_2=4)

assert test_module.init_args == (1, 2)
assert test_module.init_kwargs['num_1'] == 3
assert test_module.init_kwargs['num_2'] == 4
assert test_module.init_kwargs['save_dir'] == save_dir

test_data = [
    dict(datetimeMeasure='2020-04-01T00:01:11.123Z', value=1.1),
    dict(datetimeMeasure='2020-04-02T00:20:00Z', value=2.3),
]
result = test_module.infer(test_data, timezone='Europe/Amsterdam', anumber=2)
print("Test Data\n", json.dumps(test_data, indent=2))
print("Result Message\n", json.dumps(result['msg'], indent=2))
print("Result Data\n", json.dumps(result['data'], indent=2))
Normalized, Europe/Amsterdam
Test Data
[
{
"datetimeMeasure": "2020-04-01T00:01:11.123Z",
"value": 1.1
},
{
"datetimeMeasure": "2020-04-02T00:20:00Z",
"value": 2.3
}
]
Result Message
[
"Startup time: 2025-02-07T12:51:34.421358+00:00",
"Corebridge version: 0.4.0",
"test_function((data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0))",
"init_args: (1, 2), init_kwargs: {'num_1': 3, 'num_2': 4, 'assets_dir': None, 'save_dir': '/home/fenke/repos/corebridge/nbs/cache'}",
"lastSeen: False, recordformat: records, timezone: Europe/Amsterdam",
"anumber: 2.0",
"result shape: (2, 1)",
"return-data shape: (2, 1)"
]
Result Data
[
{
"time": "2020-04-01T02:01:11.123+02:00",
"value": 2.2
},
{
"time": "2020-04-02T02:20:00.000+02:00",
"value": 4.6
}
]
# Inspect the instance attributes (init_* bookkeeping and the processor details)
test_module.__dict__
{'init_time': datetime.datetime(2025, 2, 7, 12, 51, 34, 421358, tzinfo=datetime.timezone.utc),
'aicorebridge_version': '0.4.0',
'init_args': (1, 2),
'init_kwargs': {'num_1': 3,
'num_2': 4,
'assets_dir': None,
'save_dir': '/home/fenke/repos/corebridge/nbs/cache'},
'processor': <function __main__.test_function(data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0)>,
'processor_signature': <Signature (data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0)>,
'processor_params': {'data': <Parameter "data: pandas.core.frame.DataFrame">,
'anumber': <Parameter "anumber: float | numpy.ndarray = 0">},
'return_param': None,
'data_param': 'data',
'call_params': ['anumber']}
class TestAICoreModule(AICoreModule):
    """AICoreModule serving test_function, with assets_dir set to None."""
    def __init__(self, save_dir, *args, **kwargs):
        super().__init__(test_function, save_dir, None, *args, **kwargs)

save_dir = os.path.join(os.getcwd(), 'cache')
test_module = TestAICoreModule(save_dir, 1, 2, num_1=3, num_2=4)

assert test_module.init_args == (1, 2)
assert test_module.init_kwargs['num_1'] == 3
assert test_module.init_kwargs['num_2'] == 4
assert test_module.init_kwargs['save_dir'] == save_dir

# Same input as before, but results are returned in UTC
test_data = [
    dict(datetimeMeasure='2020-04-01T00:01:11.123Z', value=1.1),
    dict(datetimeMeasure='2020-04-02T00:20:00Z', value=2.3),
]
result = test_module.infer(test_data, timezone='UTC', anumber=2)

for title, payload in (("Test Data\n", test_data),
                       ("Result Message\n", result['msg']),
                       ("Result Data\n", result['data'])):
    print(title, json.dumps(payload, indent=2))
Normalized, UTC
Test Data
[
{
"datetimeMeasure": "2020-04-01T00:01:11.123Z",
"value": 1.1
},
{
"datetimeMeasure": "2020-04-02T00:20:00Z",
"value": 2.3
}
]
Result Message
[
"Startup time: 2025-02-07T12:51:34.443805+00:00",
"Corebridge version: 0.4.0",
"test_function((data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0))",
"init_args: (1, 2), init_kwargs: {'num_1': 3, 'num_2': 4, 'assets_dir': None, 'save_dir': '/home/fenke/repos/corebridge/nbs/cache'}",
"lastSeen: False, recordformat: records, timezone: UTC",
"anumber: 2.0",
"result shape: (2, 1)",
"return-data shape: (2, 1)"
]
Result Data
[
{
"time": "2020-04-01T00:01:11Z",
"value": 2.2
},
{
"time": "2020-04-02T00:20:00Z",
"value": 4.6
}
]