syslog = init_console_logging(__name__, logging.DEBUG, timestamp=False)
Support functions
Added /home/runner/work/corebridge/corebridge to PATH
build_historic_args
build_historic_args (data:pandas.core.frame.DataFrame, history:dict|list)
Create a timeseries DataFrame from historic data defined in history
.
Type | Details | |
---|---|---|
data | DataFrame | The input time-series DataFrame. |
history | dict | list | Historic data definition, each item in the list is a dictionary with a startDate key to set the start of a section of historic data in the result and a column-value pair for each of the columns in the |
Returns | dict | Historic data in dictionary format where keys are column names and values are the historic values as numpy array. |
test_data = set_time_index_zone(timeseries_dataframe_from_datadict(
    [
        {"time": "2023-05-04T10:04:49", "value": 16.72},
        {"time": "2023-05-04T10:44:53", "value": 16.55},
        {"time": "2023-05-04T10:24:51", "value": 16.65}
    ], ['datetimeMeasure', 'time'], 'records'), 'UTC').sort_index()
test_data
value | |
---|---|
time | |
2023-05-04 10:04:49+00:00 | 16.72 |
2023-05-04 10:24:51+00:00 | 16.65 |
2023-05-04 10:44:53+00:00 | 16.55 |
history_arg = [
    dict(justANumber=1.0),
    dict(startDate="2023-05-04T10:25:00+00:00", justANumber=2.0)
]
build_historic_args(test_data, history_arg)
{'justANumber': array([1., 1., 2.])}
assert len(test_data) == len(build_historic_args(test_data,history_arg)['justANumber']), "build_historic_args failed to build historic data"
Class AICoreModule
AICoreModule
AICoreModule (processor:Callable, save_dir:str, assets_dir:str, *args, **kwargs)
Initialize self. See help(type(self)) for accurate signature.
Type | Details | |
---|---|---|
processor | Callable | data processing function |
save_dir | str | path where the module can keep files |
assets_dir | str | |
args | VAR_POSITIONAL | |
kwargs | VAR_KEYWORD |
Exported source
class AICoreModule(AICoreModuleBase):
    """AICore module that wraps a data-processing function.

    Binds a processor callable to the AICore request/response cycle;
    storage locations are forwarded to `AICoreModuleBase`.
    """

    def __init__(self,
            processor: typing.Callable,  # data processing function
            save_dir: str,               # path where the module can keep files
            assets_dir: str,             # path for module assets
            *args, **kwargs):
        super().__init__(save_dir, assets_dir, *args, **kwargs)
        self._init_processor(processor)
AICoreModule.call_processor
AICoreModule.call_processor (calldata, **callargs)
Exported source
# TODO: Refactor into Processor classes to handle different function types
@patch
def _init_processor(
        self: AICoreModule,
        processor: typing.Callable):
    """Initializes processor related variables on self."""
    self.processor = processor
    self.processor_signature = inspect.signature(self.processor)
    self.processor_params = dict(self.processor_signature.parameters)
    # A 'return' annotation, if present, is not a call parameter.
    self.return_param = self.processor_params.pop('return', None)

    # First parameter is assumed to carry the time-series data, the rest
    # are call parameters ...
    self.data_param, *self.call_params = list(self.processor_params.keys())

    # ... unless it is not annotated as a DataFrame or ndarray: then the
    # processor takes no data and every parameter is a call parameter.
    if not (
        self.processor_params[self.data_param].annotation == pd.DataFrame
        or self.processor_params[self.data_param].annotation == np.ndarray
    ):
        self.data_param = None
        self.call_params = list(self.processor_params.keys())
Exported source
# can be overloaded
@patch
def call_processor(self: AICoreModule, calldata, **callargs):
    """Call the processor; pass `calldata` only when a data parameter was detected."""
    if self.data_param:
        return self.processor(calldata, **callargs)
    else:
        return self.processor(**callargs)
infer()
This method, called by the AICore, is responsible for processing the data and parameters request recieved by AICore. Infer takes a data
parameter which contains the contents of the data key in the request body. Additionally an optional list of files that were send with the request - these are currently ignored - and finally the contents of the kwargs key in the request body.
AICoreModule.infer
AICoreModule.infer (data:dict, *_, **kwargs)
Infer the data using the processor function.
Exported source
@patch
def infer(self: AICoreModule, data: dict, *_, **kwargs):
    """Infer the data using the processor function.

    Called by AICore with the contents of the request body's `data` key and
    the request `kwargs`. Returns a dict with a `msg` list of log lines and
    a `data` list holding the processed result; on failure `data` is empty
    and `msg` carries the traceback.
    """
    msg = [
        f"Startup time: {self.init_time.isoformat()}",
        f"Corebridge version: {self.aicorebridge_version}",
    ]
    try:
        t00 = time.perf_counter_ns()

        kwargs["data"] = data
        msg += [
            f"{self.processor.__name__}({self.processor_signature})",
            f"Data: {type(data)} length: {len(data)}",
            f"kwargs {list(kwargs.keys())}",
        ]

        # Pickup params, pop those that are not intended for the processor
        lastSeen = kwargs.pop('lastSeen', False)
        recordformat = kwargs.pop('format', "records").lower()
        timezone = kwargs.get('timezone', 'UTC')
        msg.append(f"lastSeen: {lastSeen}, recordformat: {recordformat}, timezone: {timezone}")

        # Sampler settings fall back to the module's init kwargs
        samplerPeriod = kwargs.pop('samplerPeriod', self.init_kwargs.get('samplerPeriod', 'h'))
        samplerMethod = kwargs.pop('samplerMethod', self.init_kwargs.get('samplerMethod', None))
        reverse_result = kwargs.pop('reversed', False)  # renamed local: don't shadow builtin `reversed`

        calldata = self.get_call_data(
            data,
            recordformat=recordformat,
            timezone=timezone)

        history = build_historic_args(calldata, kwargs.pop('history', {}))
        callargs = self.get_callargs(kwargs, history)

        t02 = time.perf_counter_ns()
        calculated_result = self.call_processor(
            calldata, **callargs
        )
        t03 = time.perf_counter_ns()
        msg.append(f"Processing time: {(t03-t02)/1e6:.1f} ms")
        msg.append(f"Preparation time: {(t02-t00)/1e6:.1f} ms")

        # dict / list results are returned as-is, without resampling
        if isinstance(calculated_result, dict):
            msg.append(f"return-data dictionary keys: {calculated_result.keys()}")
            return {
                'msg': msg,
                'data': [calculated_result]
            }
        elif isinstance(calculated_result, list):
            msg.append(f"return-data list length: {len(calculated_result)}")
            return {
                'msg': msg,
                'data': calculated_result
            }

        try:
            # Try to interpret the result as time-series data
            result = timeseries_dataframe(
                calculated_result,
                timezone=timezone)
            msg.append(f"result shape: {result.shape}")

            if samplerMethod:
                msg.append(f"Sampler: {samplerMethod}, period: {samplerPeriod}")
                result = timeseries_dataframe_resample(result, samplerPeriod, samplerMethod)

            msg.append(f"return-data shape: {result.shape}")

            if reverse_result:
                result = result[::-1]

            return {
                'msg': msg,
                'data': timeseries_dataframe_to_datadict(
                    result if not lastSeen else result[-1:],
                    recordformat=recordformat,
                    timezone=timezone,
                    popNaN=True)
            }

        # tries dataframe return
        except Exception as err:
            # Not time-series data: fall back to a plain records dump
            msg.append(f"No timeseries data, error={err}")

            df = pd.DataFrame(calculated_result)
            # Numeric column labels are prefixed so they serialize as strings
            df.columns = [f"value_{str(c)}" if isinstance(c, int) else str(c) for c in list(df.columns)]
            return {
                'msg': msg,
                'data': df.reset_index().to_dict(orient='records')
            }

    # function try-catch
    except Exception as err:
        msg.append(''.join(traceback.format_exception(None, err, err.__traceback__)))
        return {
            'msg': msg,
            'data': []
        }
get_callargs
Exported source
# Specialized types for initializing annotated parameters
# Add types by adding a tuple with the type name and a builder function
annotated_arg_builders = {
    str(B[0]): B[1] for B in [
        # ndarray builder preserves the input's dtype
        (numpy.ndarray, lambda X: numpy.array(X, dtype=X.dtype))
    ]
}
annotated_arg_builders
{"<class 'numpy.ndarray'>": <function __main__.<lambda>(X)>}
AICoreModule.init_annotated_param
AICoreModule.init_annotated_param (param_name, value)
*Initialize argument for the processor call
param_name: name of the parameter to be initialized value: value of the parameter read from infer data to be used for initialization*
Exported source
@patch
def init_annotated_param(self: AICoreModule, param_name, value):
    """
    Initialize argument for the processor call.

    param_name: name of the parameter to be initialized
    value: value of the parameter read from infer data to be used for initialization

    Returns the converted value, or None when every conversion attempt
    raises TypeError (the failure is logged).
    """
    annotation = self.processor_signature.parameters[param_name].annotation

    # Try each member of a union annotation in turn, using a specialized
    # builder from annotated_arg_builders when one is registered.
    for T in typing.get_args(annotation):
        try:
            builder = annotated_arg_builders.get(str(T), lambda X: T(X))
            return builder(value)
        except TypeError:
            # NOTE(review): ValueError (e.g. float("abc")) is not caught here,
            # matching the original behavior — confirm that is intended.
            continue

    # Non-union annotation (or all union members failed): call it directly.
    try:
        return annotation(value)
    except TypeError as err:
        syslog.exception(f"Exception {str(err)} in fallback conversion to {annotation} of {type(value)}")
AICoreModule.get_callargs
AICoreModule.get_callargs (kwargs, history)
Get arguments for the processor call
Exported source
@patch
def get_callargs(self: AICoreModule, kwargs, history):
    "Get arguments for the processor call"

    # Remove null / None values
    kwargs = {k: v for k, v in kwargs.items() if v is not None}

    def _lookup(K):
        # Lookup order (matches the original nested .get chain): history,
        # kwargs, init_kwargs — first under the parameter's own name, then
        # under its camelCase alias — falling back to the signature default.
        for name in (K, snake_case_to_camel_case(K)):
            for source in (history, kwargs, self.init_kwargs):
                if name in source:
                    return source[name]
        return self.processor_signature.parameters[K].default

    return {
        K: self.init_annotated_param(K, _lookup(K))
        for K in self.call_params
    }
def processor_function(data:pd.DataFrame, just_a_number:float|np.ndarray):
return just_a_number * data
test_module = AICoreModule(processor_function, os.path.join(os.getcwd(), 'cache'), os.path.join(os.getcwd(), 'cache'))
assert 'just_a_number' in test_module.get_callargs(
    {'justANumber': 2},
    {}
), "get_callargs failed to translate camel-case processor argument to snake-case kwargs argument"
INFO 32084 corebridge.core core.py 343 Init AICoreModule, version 0.4.4, assets dir /home/fenke/repos/corebridge/nbs/cache, save dir /home/fenke/repos/corebridge/nbs/cache
get_call_data
AICoreModule.get_call_data
AICoreModule.get_call_data (data:dict|list, recordformat='records', timezone='UTC')
Convert data to the processor signature
Exported source
@patch
def get_call_data(
        self: AICoreModule,
        data: dict | list,
        recordformat='records',
        timezone='UTC'):
    "Convert data to the processor signature"

    # Processor without a data parameter gets no call data.
    if not self.data_param:
        return None

    df = set_time_index_zone(timeseries_dataframe_from_datadict(
        data, ['datetimeMeasure', 'time'], recordformat), timezone)
    df.sort_index(inplace=True)

    if self.processor_params[self.data_param].annotation == pd.DataFrame:
        return df

    # Processor expects ndarray/recarray input: convert the datetime index
    # to seconds since the Unix epoch (hoisted out of both branches).
    df.index = (df.index - datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)) / datetime.timedelta(seconds=1)
    if len(df.columns) > 1:
        return df.to_records(index=True)
    return df.reset_index().to_numpy()
test_data
value | |
---|---|
time | |
2023-05-04 10:04:49+00:00 | 16.72 |
2023-05-04 10:24:51+00:00 | 16.65 |
2023-05-04 10:44:53+00:00 | 16.55 |
timeseries_dataframe_to_datadict(test_data)
[{'time': '2023-05-04T10:04:49Z', 'value': 16.72},
{'time': '2023-05-04T10:24:51Z', 'value': 16.65},
{'time': '2023-05-04T10:44:53Z', 'value': 16.55}]
calldata = test_module.get_call_data(timeseries_dataframe_to_datadict(test_data))
calldata
value | |
---|---|
time | |
2023-05-04 10:04:49+00:00 | 16.72 |
2023-05-04 10:24:51+00:00 | 16.65 |
2023-05-04 10:44:53+00:00 | 16.55 |
history = build_historic_args(calldata, history_arg)
history
{'justANumber': array([2., 2., 2.])}
calldata
value | |
---|---|
time | |
2023-05-04 10:04:49+00:00 | 16.72 |
2023-05-04 10:24:51+00:00 | 16.65 |
2023-05-04 10:44:53+00:00 | 16.55 |
print(test_module.get_callargs(calldata, history))
{'just_a_number': array([2., 2., 2.])}
'justANumber']) np.array(history[
array([2., 2., 2.])
history
{'justANumber': array([2., 2., 2.])}
test_module.init_annotated_param('just_a_number',
12.34
)
12.34
'just_a_number'].annotation test_module.processor_signature.parameters[
float | numpy.ndarray
'justANumber']) np.array(history[
array([2., 2., 2.])
str(numpy.ndarray)](history['justANumber']) annotated_arg_builders[
array([2., 2., 2.])
assert True, 'stop'
Tests
import os, pandas as pd, numpy as np
def test_function(data:pd.DataFrame, anumber:float|np.ndarray=0):
return data * anumber
def test_simple_function(anumber:float, another:float):
return [another * anumber]
class TestAICoreModule(AICoreModule):
    """AICoreModule bound to `test_function`, with no assets directory."""
    def __init__(self, save_dir, *args, **kwargs):
        super().__init__(test_function, save_dir, None, *args, **kwargs)
class SimpleAICoreModule(AICoreModule):
    """AICoreModule bound to `test_simple_function`, with no assets directory."""
    def __init__(self, save_dir, *args, **kwargs):
        super().__init__(test_simple_function, save_dir, None, *args, **kwargs)
save_dir = os.path.join(os.getcwd(), 'cache')
test_module = TestAICoreModule(os.path.join(os.getcwd(), 'cache'), 1, 2, num_1=3, num_2=4)
assert test_module.init_args == (1, 2)
assert test_module.init_kwargs['num_1'] == 3
assert test_module.init_kwargs['num_2'] == 4
assert test_module.init_kwargs['save_dir'] == save_dir
INFO 32084 corebridge.core core.py 343 Init TestAICoreModule, version 0.4.4, assets dir None, save dir /home/fenke/repos/corebridge/nbs/cache
test_data = [
    dict(datetimeMeasure='2020-04-01T00:01:11.123Z', value=1.1),
    dict(datetimeMeasure='2020-04-02T00:20:00Z', value=2.3),
]
result = test_module.infer(test_data, timezone='Europe/Amsterdam', anumber=2)
print("Test Data\n", json.dumps(test_data, indent=2))
print("Result Message\n", json.dumps(result['msg'], indent=2))
print("Result Data\n", json.dumps(result['data'], indent=2))
Test Data
[
{
"datetimeMeasure": "2020-04-01T00:01:11.123Z",
"value": 1.1
},
{
"datetimeMeasure": "2020-04-02T00:20:00Z",
"value": 2.3
}
]
Result Message
[
"Startup time: 2025-06-13T13:00:51.564529+00:00",
"Corebridge version: 0.4.4",
"test_function((data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0))",
"Data: <class 'list'> length: 2",
"kwargs ['timezone', 'anumber', 'data']",
"lastSeen: False, recordformat: records, timezone: Europe/Amsterdam",
"Processing time: 0.2 ms",
"Preparation time: 19.7 ms",
"result shape: (2, 1)",
"return-data shape: (2, 1)"
]
Result Data
[
{
"time": "2020-04-01T02:01:11.123+02:00",
"value": 2.2
},
{
"time": "2020-04-02T02:20:00.000+02:00",
"value": 4.6
}
]
'data'].annotation test_module.processor_signature.parameters[
pandas.core.frame.DataFrame
= test_module.processor_signature.parameters['anumber'].annotation
annotation print(typing.get_args(annotation))
(<class 'float'>, <class 'numpy.ndarray'>)
for T in typing.get_args(annotation):
print(T(0))
0.0
[]
Simple module
simple_module = SimpleAICoreModule(save_dir, 1, 2, num_1=3, num_2=4)
assert simple_module.init_args == (1, 2)
assert simple_module.init_kwargs['num_1'] == 3
assert simple_module.init_kwargs['num_2'] == 4
assert simple_module.init_kwargs['save_dir'] == save_dir
INFO 32084 corebridge.core core.py 343 Init SimpleAICoreModule, version 0.4.4, assets dir None, save dir /home/fenke/repos/corebridge/nbs/cache
not simple_module.data_param
True
simple_module.call_params
['anumber', 'another']
result = simple_module.infer([], timezone='Europe/Amsterdam', anumber=2, another=11)
print("Result Message\n", json.dumps(result['msg'], indent=2))
print("Result Data\n", json.dumps(result['data'], indent=2))
Result Message
[
"Startup time: 2025-06-13T13:00:51.632266+00:00",
"Corebridge version: 0.4.4",
"test_simple_function((anumber: float, another: float))",
"Data: <class 'list'> length: 0",
"kwargs ['timezone', 'anumber', 'another', 'data']",
"lastSeen: False, recordformat: records, timezone: Europe/Amsterdam",
"Processing time: 0.0 ms",
"Preparation time: 0.1 ms",
"return-data list length: 1"
]
Result Data
[
22.0
]
Tests with library module
import corebridge.core
from corebridge.aicorebridge import AICoreModule
DEBUG 32084 corebridge.aicorebridge aicorebridge.py 31 Loading corebridge.aicorebridge 0.4.4 from /home/fenke/repos/corebridge/corebridge/aicorebridge.py
class TestAICoreModule(AICoreModule):
    """AICoreModule bound to `test_function` (library `AICoreModule` import)."""
    def __init__(self, save_dir, *args, **kwargs):
        super().__init__(test_function, save_dir, None, *args, **kwargs)
= TestAICoreModule(os.path.join(os.getcwd(), 'cache'), 1, 2, num_1=3, num_2=4)
test_module
assert test_module.init_args == (1, 2)
assert test_module.init_kwargs['num_1'] == 3
assert test_module.init_kwargs['num_2'] == 4
assert test_module.init_kwargs['save_dir'] == save_dir
INFO 32084 corebridge.core core.py 343 Init TestAICoreModule, version 0.4.4, assets dir None, save dir /home/fenke/repos/corebridge/nbs/cache
test_data = [
    dict(datetimeMeasure='2020-04-01T00:01:11.123Z', value=1.1),
    dict(datetimeMeasure='2020-04-02T00:20:00Z', value=2.3),
]
result = test_module.infer(test_data, timezone='Europe/Amsterdam', anumber=2)
print("Test Data\n", json.dumps(test_data, indent=2))
print("Result Message\n", json.dumps(result['msg'], indent=2))
print("Result Data\n", json.dumps(result['data'], indent=2))
Test Data
[
{
"datetimeMeasure": "2020-04-01T00:01:11.123Z",
"value": 1.1
},
{
"datetimeMeasure": "2020-04-02T00:20:00Z",
"value": 2.3
}
]
Result Message
[
"Startup time: 2025-06-13T13:00:51.711594+00:00",
"Corebridge version: 0.4.4",
"test_function((data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0))",
"Data: <class 'list'> length: 2",
"kwargs ['timezone', 'anumber', 'data']",
"lastSeen: False, recordformat: records, timezone: Europe/Amsterdam",
"Processing time: 0.1 ms",
"Preparation time: 2.3 ms",
"result shape: (2, 1)",
"return-data shape: (2, 1)"
]
Result Data
[
{
"time": "2020-04-01T02:01:11.123+02:00",
"value": 2.2
},
{
"time": "2020-04-02T02:20:00.000+02:00",
"value": 4.6
}
]
test_module.__dict__
{'init_time': datetime.datetime(2025, 6, 13, 13, 0, 51, 711594, tzinfo=datetime.timezone.utc),
'aicorebridge_version': '0.4.4',
'init_args': (1, 2),
'init_kwargs': {'num_1': 3,
'num_2': 4,
'assets_dir': None,
'save_dir': '/home/fenke/repos/corebridge/nbs/cache'},
'processor': <function __main__.test_function(data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0)>,
'processor_signature': <Signature (data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0)>,
'processor_params': {'data': <Parameter "data: pandas.core.frame.DataFrame">,
'anumber': <Parameter "anumber: float | numpy.ndarray = 0">},
'return_param': None,
'data_param': 'data',
'call_params': ['anumber']}
class TestAICoreModule(AICoreModule):
    """AICoreModule bound to `test_function` (re-defined for the final test run)."""
    def __init__(self, save_dir, *args, **kwargs):
        super().__init__(test_function, save_dir, None, *args, **kwargs)
save_dir = os.path.join(os.getcwd(), 'cache')
test_module = TestAICoreModule(os.path.join(os.getcwd(), 'cache'), 1, 2, num_1=3, num_2=4)

assert test_module.init_args == (1, 2)
assert test_module.init_kwargs['num_1'] == 3
assert test_module.init_kwargs['num_2'] == 4
assert test_module.init_kwargs['save_dir'] == save_dir

test_data = [
    dict(datetimeMeasure='2020-04-01T00:01:11.123Z', value=1.1),
    dict(datetimeMeasure='2020-04-02T00:20:00Z', value=2.3),
]
INFO 32084 corebridge.core core.py 343 Init TestAICoreModule, version 0.4.4, assets dir None, save dir /home/fenke/repos/corebridge/nbs/cache
Test Data
[
{
"datetimeMeasure": "2020-04-01T00:01:11.123Z",
"value": 1.1
},
{
"datetimeMeasure": "2020-04-02T00:20:00Z",
"value": 2.3
}
]
Result Message
[
"Startup time: 2025-06-13T13:00:51.748111+00:00",
"Corebridge version: 0.4.4",
"test_function((data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0))",
"Data: <class 'list'> length: 2",
"kwargs ['timezone', 'anumber', 'data']",
"lastSeen: False, recordformat: records, timezone: UTC",
"Processing time: 0.2 ms",
"Preparation time: 2.2 ms",
"result shape: (2, 1)",
"return-data shape: (2, 1)"
]
Result Data
[
{
"time": "2020-04-01T00:01:11Z",
"value": 2.2
},
{
"time": "2020-04-02T00:20:00Z",
"value": 4.6
}
]
result = test_module.infer(test_data, timezone='UTC', anumber=2)
print("Test Data\n", json.dumps(test_data, indent=2))
print("Result Message\n", json.dumps(result['msg'], indent=2))
print("Result Data\n", json.dumps(result['data'], indent=2))
Test Data
[
{
"datetimeMeasure": "2020-04-01T00:01:11.123Z",
"value": 1.1
},
{
"datetimeMeasure": "2020-04-02T00:20:00Z",
"value": 2.3
}
]
Result Message
[
"Startup time: 2025-06-13T13:00:51.748111+00:00",
"Corebridge version: 0.4.4",
"test_function((data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0))",
"Data: <class 'list'> length: 2",
"kwargs ['timezone', 'anumber', 'data']",
"lastSeen: False, recordformat: records, timezone: UTC",
"Processing time: 0.2 ms",
"Preparation time: 2.2 ms",
"result shape: (2, 1)",
"return-data shape: (2, 1)"
]
Result Data
[
{
"time": "2020-04-01T00:01:11Z",
"value": 2.2
},
{
"time": "2020-04-02T00:20:00Z",
"value": 4.6
}
]