AICore-Bridge

Bridge between Stactics AICore framework and Wodan/Conan processor modules

Support functions


source

build_historic_args

 build_historic_args (data:pandas.core.frame.DataFrame, history:dict|list)

Create a timeseries DataFrame from historic data defined in history.

Type Details
data DataFrame The input time-series DataFrame.
history dict | list Historic data definition: each item in the list is a dictionary with an optional startDate key that sets the start of a section of historic data in the result, and a column-value pair for each column whose value applies from that start onward.
Returns dict Historic data in dictionary format where keys are column names and values are the historic values as numpy array.
# Three measurements given out of time order; sort_index() puts them in order.
raw_records = [
    {"time": "2023-05-04T10:04:49", "value": 16.72},
    {"time": "2023-05-04T10:44:53", "value": 16.55},
    {"time": "2023-05-04T10:24:51", "value": 16.65},
]
test_data = set_time_index_zone(
    timeseries_dataframe_from_datadict(raw_records, ['datetimeMeasure', 'time'], 'records'),
    'UTC',
).sort_index()

# First entry has no startDate; the second switches anumber to 2.0 from 10:25 UTC.
history_arg = [
    {"anumber": 1.0},
    {"startDate": "2023-05-04T10:25:00+00:00", "anumber": 2.0},
]
build_historic_args(test_data, history_arg)
{'anumber': array([1., 1., 2.])}

Class AICoreModule


source

AICoreModule

 AICoreModule (processor:Callable, save_dir:str, assets_dir:str, *args,
               **kwargs)

Initialize self. See help(type(self)) for accurate signature.

Type Details
processor Callable data processing function
save_dir str path where the module can keep files
assets_dir str
args VAR_POSITIONAL
kwargs VAR_KEYWORD
Exported source
class AICoreModule(AICoreModuleBase):
    """AICore module that delegates data processing to a plain Python function.

    `save_dir`, `assets_dir` and any extra positional/keyword arguments are
    forwarded to AICoreModuleBase; afterwards `_init_processor` inspects the
    processor's signature to decide how request data and arguments map onto it.
    """

    def __init__(self,
                 processor:typing.Callable, # data processing function
                 save_dir:str, # path where the module can keep files
                 assets_dir:str, # assets directory, forwarded to AICoreModuleBase
                 *args, **kwargs):
        super().__init__(save_dir, assets_dir, *args, **kwargs)
        self._init_processor(processor)

source

AICoreModule.call_processor

 AICoreModule.call_processor (calldata, **callargs)
Exported source
@patch
def _init_processor(
        self:AICoreModule, 
        processor:typing.Callable):
    """Initializes processor related variables on self.

    Stores the processor and its signature. If the first parameter is
    annotated as pd.DataFrame or np.ndarray it becomes `data_param` (it
    receives the converted request data) and the remaining parameters become
    `call_params`; otherwise `data_param` is None and every parameter is a
    call parameter. A processor without parameters gets neither.
    """

    self.processor = processor
    self.processor_signature = inspect.signature(self.processor)
    self.processor_params = dict(self.processor_signature.parameters)
    # Signature.parameters never contains a 'return' entry (the return
    # annotation lives on Signature.return_annotation), so this is always
    # None; kept so the attribute continues to exist.
    self.return_param = self.processor_params.pop('return', None)

    param_names = list(self.processor_params.keys())

    # Unpacking an empty parameter list used to raise ValueError.
    if not param_names:
        self.data_param = None
        self.call_params = []
        return

    first_param = param_names[0]
    if self.processor_params[first_param].annotation in (pd.DataFrame, np.ndarray):
        self.data_param = first_param
        self.call_params = param_names[1:]
    else:
        self.data_param = None
        self.call_params = param_names
Exported source
# can be overloaded
@patch
def call_processor(self:AICoreModule, calldata, **callargs):
    """Invoke the wrapped processor.

    `calldata` is passed as the first positional argument only when the
    processor declares a data parameter (`self.data_param`); the keyword
    arguments are always passed through.
    """
    positional = (calldata,) if self.data_param else ()
    return self.processor(*positional, **callargs)

infer()

This method, called by the AICore, is responsible for processing the data and parameters of a request received by AICore. Infer takes a data parameter, which holds the contents of the data key in the request body. It additionally accepts an optional list of files that were sent with the request (these are currently ignored) and, finally, the contents of the kwargs key in the request body.


source

AICoreModule.infer

 AICoreModule.infer (data:dict, *_, **kwargs)
Exported source
@patch
def infer(self:AICoreModule, data:dict, *_, **kwargs):
    """Handle an AICore inference request and return the processor result.

    data: contents of the `data` key of the request body; converted for the
        processor by `get_call_data`.
    *_: further positional arguments (such as uploaded files) are ignored.
    kwargs: contents of the `kwargs` key of the request body. `lastSeen`,
        `format`, `samplerPeriod`, `samplerMethod`, `reversed` and `history`
        are consumed here; `timezone` is read but left in place; the
        remaining values are resolved into processor arguments by
        `get_callargs`.

    Returns a dict with keys `msg` and `data`. On success `msg` is a list of
    diagnostic strings. `data` holds the serialized result: time-series
    records when `timeseries_dataframe` accepts the result, otherwise plain
    records. On an unexpected error `msg` is a short error string and `data`
    is an empty list.
    """
    # Bound before the try block so the error handler can never hit an
    # unbound name (previously an early failure raised UnboundLocalError).
    msg = []
    try:
        msg.extend([
            f"Startup time: {self.init_time.isoformat()}",
            f"Corebridge version: {self.aicorebridge_version}",
            f"{self.processor.__name__}({self.processor_signature})",
        ])

        # Pickup params, pop those that are not intended for the processor
        lastSeen = kwargs.pop('lastSeen', False)
        # `or` maps an explicit null (or empty) format to the default instead
        # of failing on None.lower()
        recordformat = (kwargs.pop('format', None) or "records").lower()
        timezone = kwargs.get('timezone', 'UTC')
        msg.append(f"lastSeen: {lastSeen}, recordformat: {recordformat}, timezone: {timezone}")

        # Request values override the module's init_kwargs for resampling
        samplerPeriod = kwargs.pop('samplerPeriod', self.init_kwargs.get('samplerPeriod','h'))
        samplerMethod = kwargs.pop('samplerMethod', self.init_kwargs.get('samplerMethod',None))
        # Named reverse_order so the builtin `reversed` is not shadowed
        reverse_order = kwargs.pop('reversed', False)

        calldata = self.get_call_data(
            data, 
            recordformat=recordformat,
            timezone=timezone)

        history = build_historic_args(calldata, kwargs.pop('history', {}))

        callargs = self.get_callargs(kwargs, history)

        msg.extend(f"{arg}: {val}" for arg, val in callargs.items())

        calculated_result = self.call_processor(
            calldata, 
            **callargs
        )

        # First try to return the result as time-series records
        try:
            result = timeseries_dataframe(
                calculated_result, 
                timezone=timezone)
            
            msg.append(f"result shape: {result.shape}")

            if samplerMethod:
                msg.append(f"Sampler: {samplerMethod}, period: {samplerPeriod}")
                result = timeseries_dataframe_resample(result, samplerPeriod, samplerMethod)

            msg.append(f"return-data shape: {result.shape}")

            if reverse_order:
                result = result[::-1]

            # NOTE(review): with both `reversed` and `lastSeen` set, result[-1:]
            # selects what was the first row before reversal — confirm intended.
            return {
                'msg':msg,
                'data': timeseries_dataframe_to_datadict(
                    result if not lastSeen else result[-1:],
                    recordformat=recordformat,
                    timezone=timezone,
                    popNaN=True)
            }
        
        # tries dataframe return
        except Exception as err:
            msg.append(f"No timeseries data, error={err}")

        # Fallback: serialize whatever the processor returned as plain
        # records; integer column labels become "value_<n>".
        df = pd.DataFrame(calculated_result)
        df.columns = [f"value_{str(c)}" if isinstance(c, int) else str(c) for c in list(df.columns)]
        return {
            'msg':msg,
            'data': df.reset_index().to_dict(orient='records')
        }

    # function try-catch
    except Exception as err:
        # The details go to syslog; the response only carries a short
        # description (the collected msg list is not returned on this path).
        syslog.exception(f"Exception {str(err)} in infer()")
        return {
            'msg': f"Unexpected {err=}, {type(err)=}",
            'data': []
        }

get_callargs


source

AICoreModule.init_annotated_param

 AICoreModule.init_annotated_param (param_name, value)

Initialize argument for the processor call.

param_name: name of the parameter to be initialized; value: value of the parameter read from infer data to be used for initialization.

Exported source
# Specialized types for initializing annotated parameters.
# Add types by adding a (type, builder) tuple to the list below; the table is
# keyed by str(type) so it can be looked up with the member types returned by
# typing.get_args(annotation).
annotated_arg_builders = {
    str(arg_type): builder for arg_type, builder in [
        # Keep the dtype of array-like inputs that carry one; for inputs
        # without a dtype (lists, scalars) let numpy infer it instead of
        # raising AttributeError, which callers do not catch.
        (numpy.ndarray, lambda X: numpy.array(X, dtype=getattr(X, 'dtype', None))),
    ]
}
Exported source
@patch
def init_annotated_param(self:AICoreModule, param_name, value):
    """
    Initialize argument for the processor call
    
    param_name: name of the parameter to be initialized
    value: value of the parameter read from infer data to be used for initialization

    For a union annotation (e.g. `float|np.ndarray`) each member type is tried
    in declaration order, using a builder from `annotated_arg_builders` when
    one is registered for that type. If none succeeds, the annotation itself
    is called on the value. Unannotated parameters receive `value` unchanged.
    If that final conversion raises TypeError the error is logged and None is
    returned.
    """

    annotation = self.processor_signature.parameters[param_name].annotation

    # Unannotated parameter: there is no type to convert to. Calling the
    # inspect.Parameter.empty marker raises TypeError, which previously
    # replaced the value with None.
    if annotation is inspect.Parameter.empty:
        return value

    # try to convert value to one of the types in the annotation
    for T in typing.get_args(annotation):
        builder = annotated_arg_builders.get(str(T), T)
        try:
            return builder(value)
        except (TypeError, ValueError):
            # e.g. float([1, 2]) -> TypeError, float('abc') -> ValueError:
            # move on to the next member of the union
            continue
    try:
        return annotation(value)
    except TypeError as err:
        syslog.exception(f"Exception {str(err)} in fallback conversion to {annotation} of {type(value)}")

source

AICoreModule.get_callargs

 AICoreModule.get_callargs (kwargs, history)

Get arguments for the processor call

Exported source
@patch
def get_callargs(self:AICoreModule, kwargs, history):
    """Get arguments for the processor call.

    For each name in `self.call_params` the raw value comes from the first
    source that has it:
    1. `history`
    2. `self.init_kwargs`
    3. the request `kwargs` (entries whose value is None are ignored)
    4. the parameter default from the processor signature

    Every raw value is then converted with `init_annotated_param`.
    """
    # NOTE(review): init_kwargs take precedence over request kwargs here,
    # whereas infer() lets request values override init_kwargs for
    # samplerPeriod/samplerMethod — confirm which order is intended.
    request_args = {name: val for name, val in kwargs.items() if val is not None}

    resolved = {}
    for name in self.call_params:
        if name in history:
            raw = history[name]
        elif name in self.init_kwargs:
            raw = self.init_kwargs[name]
        elif name in request_args:
            raw = request_args[name]
        else:
            raw = self.processor_signature.parameters[name].default
        resolved[name] = self.init_annotated_param(name, raw)
    return resolved
def processor_function(data:pd.DataFrame, anumber:float|np.ndarray):
    return anumber * data

# The same ./cache directory serves as both save_dir and assets_dir.
cache_dir = os.path.join(os.getcwd(), 'cache')
test_module = AICoreModule(processor_function, cache_dir, cache_dir)

get_call_data


source

AICoreModule.get_call_data

 AICoreModule.get_call_data (data:dict, recordformat='records',
                             timezone='UTC')

Convert data to the processor signature

Exported source
@patch
def get_call_data(
        self:AICoreModule, 
        data:dict, 
        recordformat='records', 
        timezone='UTC'):
    """Convert data to the processor signature.

    Returns None when the processor has no data parameter. Otherwise the
    request data is parsed into a time-indexed DataFrame (index taken from
    'datetimeMeasure' or 'time', converted to `timezone`) and sorted by time.
    A DataFrame-annotated processor receives that frame as is. An ndarray
    processor receives the index as seconds since 1970-01-01 UTC: a record
    array when there are several columns, otherwise a plain 2-D array.
    """
    if not self.data_param:
        return None

    frame = set_time_index_zone(
        timeseries_dataframe_from_datadict(data, ['datetimeMeasure', 'time'], recordformat),
        timezone)
    frame.sort_index(inplace=True)

    if self.processor_params[self.data_param].annotation == pd.DataFrame:
        return frame

    # ndarray consumers get the time index as float seconds since the epoch
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    frame.index = (frame.index - epoch) / datetime.timedelta(seconds=1)

    if len(frame.columns) > 1:
        return frame.to_records(index=True)
    return frame.reset_index().to_numpy()
# Convert the example frame the same way infer() converts request data
calldata = test_module.get_call_data(test_data.reset_index().to_records())
calldata
index value
time
2023-05-04 10:04:49+00:00 0 16.72
2023-05-04 10:24:51+00:00 1 16.65
2023-05-04 10:44:53+00:00 2 16.55
# NOTE(review): the output below is [2., 2., 2.], while build_historic_args on
# the same rows and history_arg gave [1., 1., 2.] earlier — confirm which is correct.
history = build_historic_args(calldata,history_arg)
history
{'anumber': array([2., 2., 2.])}
# calldata is passed in the kwargs position; anumber is found in history first,
# so the request kwargs are not consulted for it.
print(test_module.get_callargs(calldata, history))
{'anumber': array([2., 2., 2.])}
np.array(history['anumber'])
array([2., 2., 2.])
# Array value for a float|np.ndarray parameter: float() fails, the ndarray builder succeeds
test_module.init_annotated_param(
    'anumber',
    history['anumber']
)
array([2., 2., 2.])
# Scalar value: the first union member (float) succeeds
test_module.init_annotated_param(
    'anumber',
    1.5
)
1.5
test_module.processor_signature.parameters['anumber'].annotation
float | numpy.ndarray
np.array(history['anumber'])
array([2., 2., 2.])
annotated_arg_builders[str(numpy.ndarray)](history['anumber'])
array([2., 2., 2.])
# NOTE(review): `assert True` can never fail, so this line has no effect
assert True, 'stop'

Tests

import os, addroot, pandas as pd, numpy as np
def test_function(data:pd.DataFrame, anumber:float|np.ndarray=0):
    "Test processor with a data parameter: multiply the data by `anumber`."
    product = data * anumber
    return product
def test_simple_function(anumber:float, another:float):
    "Test processor without a data parameter; returns a one-element list."
    product = another * anumber
    return [product]
class TestAICoreModule(AICoreModule):
    "AICoreModule wired to `test_function`, with no assets directory."
    def __init__(self, save_dir, *args, **kwargs):
        processor, assets_dir = test_function, None
        super().__init__(processor, save_dir, assets_dir, *args, **kwargs)


class SimpleAICoreModule(AICoreModule):
    "AICoreModule wired to `test_simple_function` (no data parameter), with no assets directory."
    def __init__(self, save_dir, *args, **kwargs):
        processor, assets_dir = test_simple_function, None
        super().__init__(processor, save_dir, assets_dir, *args, **kwargs)
save_dir = os.path.join(os.getcwd(), 'cache')
# Extra positional/keyword arguments are expected to appear in init_args / init_kwargs
test_module = TestAICoreModule(os.path.join(os.getcwd(), 'cache'), 1, 2, num_1=3, num_2=4)

assert test_module.init_args == (1, 2)
assert test_module.init_kwargs['num_1'] == 3
assert test_module.init_kwargs['num_2'] == 4
# save_dir is recorded in init_kwargs as well
assert test_module.init_kwargs['save_dir'] == save_dir
# Two measurements with UTC ('Z') timestamps
test_data = [
    dict(datetimeMeasure='2020-04-01T00:01:11.123Z', value=1.1),
    dict(datetimeMeasure='2020-04-02T00:20:00Z', value=2.3),
]
# anumber=2 doubles every value; the result timestamps are rendered in Europe/Amsterdam
result = test_module.infer(test_data, timezone='Europe/Amsterdam', anumber=2)
print("Test Data\n", json.dumps(test_data, indent=2))
print("Result Message\n", json.dumps(result['msg'], indent=2))
print("Result Data\n", json.dumps(result['data'], indent=2))
Normalized, Europe/Amsterdam
Test Data
 [
  {
    "datetimeMeasure": "2020-04-01T00:01:11.123Z",
    "value": 1.1
  },
  {
    "datetimeMeasure": "2020-04-02T00:20:00Z",
    "value": 2.3
  }
]
Result Message
 [
  "Startup time: 2025-02-07T12:51:34.319178+00:00",
  "Corebridge version: 0.4.0",
  "test_function((data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0))",
  "lastSeen: False, recordformat: records, timezone: Europe/Amsterdam",
  "anumber: 2.0",
  "result shape: (2, 1)",
  "return-data shape: (2, 1)"
]
Result Data
 [
  {
    "time": "2020-04-01T02:01:11.123+02:00",
    "value": 2.2
  },
  {
    "time": "2020-04-02T02:20:00.000+02:00",
    "value": 4.6
  }
]
# The DataFrame annotation is what makes `data` the module's data_param
test_module.processor_signature.parameters['data'].annotation
pandas.core.frame.DataFrame
annotation = test_module.processor_signature.parameters['anumber'].annotation
# Union members, tried in this order by init_annotated_param
print(typing.get_args(annotation))
(<class 'float'>, <class 'numpy.ndarray'>)
# What each member type produces from 0: float(0) and np.ndarray(0) (an empty array)
for T in typing.get_args(annotation):
    print(T(0))
0.0
[]

Simple module

simple_module = SimpleAICoreModule(save_dir, 1, 2, num_1=3, num_2=4)

assert simple_module.init_args == (1, 2)
assert simple_module.init_kwargs['num_1'] == 3
assert simple_module.init_kwargs['num_2'] == 4
assert simple_module.init_kwargs['save_dir'] == save_dir
# test_simple_function's first parameter is a float, not a DataFrame/ndarray,
# so the module has no data parameter
not simple_module.data_param
True
# ...and all parameters are call parameters
simple_module.call_params
['anumber', 'another']
# The list result is not time-series data, so infer falls back to plain records
result = simple_module.infer([], timezone='Europe/Amsterdam', anumber=2, another=11)
print("Result Message\n", json.dumps(result['msg'], indent=2))
print("Result Data\n", json.dumps(result['data'], indent=2))
Result Message
 [
  "Startup time: 2025-02-07T12:51:34.381787+00:00",
  "Corebridge version: 0.4.0",
  "test_simple_function((anumber: float, another: float))",
  "lastSeen: False, recordformat: records, timezone: Europe/Amsterdam",
  "anumber: 2.0",
  "another: 11.0",
  "No timeseries data, error='list' object has no attribute 'dtype'"
]
Result Data
 [
  {
    "index": 0,
    "value_0": 22.0
  }
]

Tests with library module

import corebridge.core
from corebridge.aicorebridge import AICoreModule
class TestAICoreModule(AICoreModule):
    "Library AICoreModule wired to `test_function`, with no assets directory."
    def __init__(self, save_dir, *args, **kwargs):
        processor, assets_dir = test_function, None
        super().__init__(processor, save_dir, assets_dir, *args, **kwargs)
        
# Same checks as above, now against the AICoreModule imported from corebridge.aicorebridge
test_module = TestAICoreModule(os.path.join(os.getcwd(), 'cache'), 1, 2, num_1=3, num_2=4)

assert test_module.init_args == (1, 2)
assert test_module.init_kwargs['num_1'] == 3
assert test_module.init_kwargs['num_2'] == 4
assert test_module.init_kwargs['save_dir'] == save_dir
test_data = [
    dict(datetimeMeasure='2020-04-01T00:01:11.123Z', value=1.1),
    dict(datetimeMeasure='2020-04-02T00:20:00Z', value=2.3),
]
# anumber=2 doubles every value; the result timestamps are rendered in Europe/Amsterdam
result = test_module.infer(test_data, timezone='Europe/Amsterdam', anumber=2)
print("Test Data\n", json.dumps(test_data, indent=2))
print("Result Message\n", json.dumps(result['msg'], indent=2))
print("Result Data\n", json.dumps(result['data'], indent=2))
Normalized, Europe/Amsterdam
Test Data
 [
  {
    "datetimeMeasure": "2020-04-01T00:01:11.123Z",
    "value": 1.1
  },
  {
    "datetimeMeasure": "2020-04-02T00:20:00Z",
    "value": 2.3
  }
]
Result Message
 [
  "Startup time: 2025-02-07T12:51:34.421358+00:00",
  "Corebridge version: 0.4.0",
  "test_function((data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0))",
  "init_args: (1, 2), init_kwargs: {'num_1': 3, 'num_2': 4, 'assets_dir': None, 'save_dir': '/home/fenke/repos/corebridge/nbs/cache'}",
  "lastSeen: False, recordformat: records, timezone: Europe/Amsterdam",
  "anumber: 2.0",
  "result shape: (2, 1)",
  "return-data shape: (2, 1)"
]
Result Data
 [
  {
    "time": "2020-04-01T02:01:11.123+02:00",
    "value": 2.2
  },
  {
    "time": "2020-04-02T02:20:00.000+02:00",
    "value": 4.6
  }
]
# Instance state: processor* / return_param / data_param / call_params come from
# _init_processor; the remaining attributes presumably from AICoreModuleBase
test_module.__dict__
{'init_time': datetime.datetime(2025, 2, 7, 12, 51, 34, 421358, tzinfo=datetime.timezone.utc),
 'aicorebridge_version': '0.4.0',
 'init_args': (1, 2),
 'init_kwargs': {'num_1': 3,
  'num_2': 4,
  'assets_dir': None,
  'save_dir': '/home/fenke/repos/corebridge/nbs/cache'},
 'processor': <function __main__.test_function(data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0)>,
 'processor_signature': <Signature (data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0)>,
 'processor_params': {'data': <Parameter "data: pandas.core.frame.DataFrame">,
  'anumber': <Parameter "anumber: float | numpy.ndarray = 0">},
 'return_param': None,
 'data_param': 'data',
 'call_params': ['anumber']}
class TestAICoreModule(AICoreModule):
    "AICoreModule wired to `test_function`, with no assets directory."
    def __init__(self, save_dir, *args, **kwargs):
        processor, assets_dir = test_function, None
        super().__init__(processor, save_dir, assets_dir, *args, **kwargs)

save_dir = os.path.join(os.getcwd(), 'cache')
test_module = TestAICoreModule(os.path.join(os.getcwd(), 'cache'), 1, 2, num_1=3, num_2=4)

assert test_module.init_args == (1, 2)
assert test_module.init_kwargs['num_1'] == 3
assert test_module.init_kwargs['num_2'] == 4
assert test_module.init_kwargs['save_dir'] == save_dir

test_data = [
    dict(datetimeMeasure='2020-04-01T00:01:11.123Z', value=1.1),
    dict(datetimeMeasure='2020-04-02T00:20:00Z', value=2.3),
]
# Same request as above but in UTC, so result timestamps keep the 'Z' suffix
result = test_module.infer(test_data, timezone='UTC', anumber=2)
print("Test Data\n", json.dumps(test_data, indent=2))
print("Result Message\n", json.dumps(result['msg'], indent=2))
print("Result Data\n", json.dumps(result['data'], indent=2))
Normalized, UTC
Test Data
 [
  {
    "datetimeMeasure": "2020-04-01T00:01:11.123Z",
    "value": 1.1
  },
  {
    "datetimeMeasure": "2020-04-02T00:20:00Z",
    "value": 2.3
  }
]
Result Message
 [
  "Startup time: 2025-02-07T12:51:34.443805+00:00",
  "Corebridge version: 0.4.0",
  "test_function((data: pandas.core.frame.DataFrame, anumber: float | numpy.ndarray = 0))",
  "init_args: (1, 2), init_kwargs: {'num_1': 3, 'num_2': 4, 'assets_dir': None, 'save_dir': '/home/fenke/repos/corebridge/nbs/cache'}",
  "lastSeen: False, recordformat: records, timezone: UTC",
  "anumber: 2.0",
  "result shape: (2, 1)",
  "return-data shape: (2, 1)"
]
Result Data
 [
  {
    "time": "2020-04-01T00:01:11Z",
    "value": 2.2
  },
  {
    "time": "2020-04-02T00:20:00Z",
    "value": 4.6
  }
]

References