RScript-Bridge

Bridge between Stactics AICore framework and RScript prediction scripts

Some things to set up first

Notebooks use nbdev thingses and addroot makes importing from the repo-directory more convenient.

from nbdev.showdoc import *
import logging
#import addroot
Exported source
import os, logging, json, hashlib
import typing,fcntl, subprocess
import traceback
import pandas as pd, numpy as np, rdata

from functools import reduce

from collections import namedtuple
from fastcore.basics import patch_to, patch

import corebridge
from corebridge.core import *
syslog = init_console_logging(__name__, logging.DEBUG, timestamp=False)

AICore uses an assets dir from which we can read files, like scripts and a save dir were modules can write and read files.

assets_dir = os.path.join(os.path.abspath(os.getcwd()), '..', 'corebridge', 'assets', 'rscript')
save_dir = os.path.join(os.path.abspath(os.getcwd()), '..', 'corebridge', 'saves', 'rscript')

source

get_save_path

 get_save_path (datafile_name:str, save_dir:str)
Exported source
def get_asset_path(script_name, assets_dir:str): 
    return os.path.join(assets_dir, script_name)
def get_rscript_libpath(save_dir:str):
    return os.path.join(save_dir, 'libs')
def get_save_path(datafile_name:str, save_dir:str): 
    return os.path.join(save_dir, datafile_name)

source

get_rscript_libpath

 get_rscript_libpath (save_dir:str)

source

get_asset_path

 get_asset_path (script_name, assets_dir:str)

Running R code

Scripts written in R can be run from a Python program using subprocess and Rscript.

Rscript

A script can be run from the commandline with

Rscript ascript.R

subproces

Python’s subprocessmodule has the tools to execute external programs like Rscript

subprocess.run(['Rscript',get_asset_path('hello.R', assets_dir)], capture_output=True).stdout.decode('UTF-8')
'[1] "hello world"\n'

Example: sapflow prediction scripts

Data_preparation.R

Libraries

  • lubridate
  • stringer
  • zoo

Input

  • Data/Meta_data.csv
  • Data/Sapflux_Tilia_train.csv
  • Data/Weather_Tilia_train.csv
  • Data/Weather_Tilia_pred.csv

Output

  • Modelling_data.RData
  • Prediction_data.RData
data_file_flow['Data_preparation.R'] = {
      "in": [
         "Data/Meta_data.csv",
         "Data/Sapflux_Tilia_train.csv",
         "Data/Weather_Tilia_train.csv",
         "Data/Weather_Tilia_pred.csv"
      ],
      "out": [
         "Modelling_data.RData",
         "Prediction_data.RData"
      ],
    'libs':['lubridate', 'stringr', 'zoo']
}
data_file_flow['Data_preparation.R'] =  {
  "in": {
    "Meta_data": "Data/Meta_data.csv",
    "Sapflux_Tilia_train": "Data/Sapflux_Tilia_train.csv",
    "Weather_Tilia_train": "Data/Weather_Tilia_train.csv",
    "Weather_Tilia_pred": "Data/Weather_Tilia_pred.csv"
  },
  "out": {
    "Modelling_data": "Modelling_data.RData",
    "Prediction_data": "Prediction_data.RData"
  },
  "libs": [
    "lubridate",
    "stringr",
    "zoo"
  ]
}

Prediction_part1.R

Libraries

  • lubridate
  • stringr
  • mgcv

Input

  • Modelling_data.RData

Output

  • Fitted_models.RData
  • Weights.RData
data_file_flow['Prediction_part1.R'] = {
      "in": [
         "Modelling_data.RData"
      ],
      "out": [
         "Fitted_models.RData",
         "Weights.RData"
      ],
    'libs':['lubridate', 'stringr', 'mgcv']
}
data_file_flow['Prediction_part1.R'] =  {
  "in": {
    "Modelling_data": "Modelling_data.RData"
  },
  "out": {
    "Fitted_models": "Fitted_models.RData",
    "Weights": "Weights.RData"
  },
  "libs": [
    "lubridate",
    "stringr",
    "mgcv"
  ],

}

Prediction_part2.R

Libraries

  • lubridate
  • stringr
  • mgcv

Input

  • Fitted_models.RData
  • Weights.RData
  • Modelling_data.RData
  • Prediction_data.RData

Output

  • Predicted_sapflux.RData
data_file_flow['Prediction_part2.R'] = {
    "in":[
        'Fitted_models.RData',
        'Weights.RData',
        'Modelling_data.RData',
        'Prediction_data.RData'
    ],
    "out":[
        'Predicted_sapflux.RData'
    ],
    'libs':['lubridate', 'stringr', 'mgcv']
}
data_file_flow['Prediction_part2.R'] =  {
  "in": {
    "Fitted_models": "Fitted_models.RData",
    "Weights": "Weights.RData",
    "Modelling_data": "Modelling_data.RData",
    "Prediction_data": "Prediction_data.RData"
  },
  "out": {
    "Predicted_sapflux": "Predicted_sapflux.RData"
  },
  "libs": [
    "lubridate",
    "stringr",
    "mgcv"
  ],
}

Prediction_part3.R

Libraries

  • lubridate
  • stringr

Input

  • Predicted_sapflux.RData

Output

  • Predicted_water_usage.RData
data_file_flow['Prediction_part3.R'] = {
    'in':['Predicted_sapflux.RData'],
    'out':['Predicted_water_usage.RData'],
    'libs':['lubridate', 'stringr']
}
data_file_flow['Prediction_part3.R'] =  {
  "in": {
    "Predicted_sapflux": "Predicted_sapflux.RData"
  },
  "out": {
    "Predicted_water_usage": "Predicted_water_usage.RData"
  },
  "libs": [
    "lubridate",
    "stringr"
  ],
}
{
    D:F
    for P in data_file_flow.values()
    for D,F in P['in'].items()
}
{'Meta_data': 'Data/Meta_data.csv',
 'Sapflux_Tilia_train': 'Data/Sapflux_Tilia_train.csv',
 'Weather_Tilia_train': 'Data/Weather_Tilia_train.csv',
 'Weather_Tilia_pred': 'Data/Weather_Tilia_pred.csv',
 'Modelling_data': 'Modelling_data.RData',
 'Fitted_models': 'Fitted_models.RData',
 'Weights': 'Weights.RData',
 'Prediction_data': 'Prediction_data.RData',
 'Predicted_sapflux': 'Predicted_sapflux.RData'}
data_files = list(set([
    f
    for P in data_file_flow.values()
    for D,F in P.items()
    if D in ['in','out']
    for f in list(F.values())
]))

display(Markdown(tabulate(
    [
        [F.split('/')[-1]]+[
            'in' if F in P['in'].values() else 'out' if F in P['out'].values() else '--' 
            for S,P in data_file_flow.items()
        ] 
        for F in data_files
    ],
    headers=['data-file / script'] + [I['name'] for I in data_file_flow.values()],
    tablefmt='github'
)))
data-file / script Data_preparation.R Prediction_part1.R Prediction_part2.R Prediction_part3.R
Weather_Tilia_pred.csv in
Meta_data.csv in
Weights.RData out in
Predicted_water_usage.RData out
Prediction_data.RData out in
Weather_Tilia_train.csv in
Modelling_data.RData out in in
Predicted_sapflux.RData out in
Fitted_models.RData out in
Sapflux_Tilia_train.csv in
data_files
['Data/Weather_Tilia_pred.csv',
 'Data/Meta_data.csv',
 'Weights.RData',
 'Predicted_water_usage.RData',
 'Prediction_data.RData',
 'Data/Weather_Tilia_train.csv',
 'Modelling_data.RData',
 'Predicted_sapflux.RData',
 'Fitted_models.RData',
 'Data/Sapflux_Tilia_train.csv']
for k, i in data_file_flow.items():
    print(f"data_file_flow['{i['name']}'] = ", json.dumps(i, indent=2), '\n')
data_file_flow['Data_preparation.R'] =  {
  "in": {
    "Meta_data": "Data/Meta_data.csv",
    "Sapflux_Tilia_train": "Data/Sapflux_Tilia_train.csv",
    "Weather_Tilia_train": "Data/Weather_Tilia_train.csv",
    "Weather_Tilia_pred": "Data/Weather_Tilia_pred.csv"
  },
  "out": {
    "Modelling_data": "Modelling_data.RData",
    "Prediction_data": "Prediction_data.RData"
  },
  "libs": [
    "lubridate",
    "stringr",
    "zoo"
  ],
  "name": "Data_preparation.R"
} 

data_file_flow['Prediction_part1.R'] =  {
  "in": {
    "Modelling_data": "Modelling_data.RData"
  },
  "out": {
    "Fitted_models": "Fitted_models.RData",
    "Weights": "Weights.RData"
  },
  "libs": [
    "lubridate",
    "stringr",
    "mgcv"
  ],
  "name": "Prediction_part1.R"
} 

data_file_flow['Prediction_part2.R'] =  {
  "in": {
    "Fitted_models": "Fitted_models.RData",
    "Weights": "Weights.RData",
    "Modelling_data": "Modelling_data.RData",
    "Prediction_data": "Prediction_data.RData"
  },
  "out": {
    "Predicted_sapflux": "Predicted_sapflux.RData"
  },
  "libs": [
    "lubridate",
    "stringr",
    "mgcv"
  ],
  "name": "Prediction_part2.R"
} 

data_file_flow['Prediction_part3.R'] =  {
  "in": {
    "Predicted_sapflux": "Predicted_sapflux.RData"
  },
  "out": {
    "Predicted_water_usage": "Predicted_water_usage.RData"
  },
  "libs": [
    "lubridate",
    "stringr"
  ],
  "name": "Prediction_part3.R"
} 

Import R libraries

Importing libraries can be done with

Rscript -e 'install.packages("drat", repos="https://cloud.r-project.org")'
print(subprocess.run(['Rscript','--version', ], capture_output=True).stdout.decode('UTF-8'))
Rscript (R) version 4.2.2 (2022-10-31)
rversion = subprocess.run(['Rscript','--version', ], capture_output=True)
print(rversion.stdout.decode('UTF-8'))
Rscript (R) version 4.2.2 (2022-10-31)

User library folder


source

get_rscript_env

 get_rscript_env (libfolder:str)
Exported source
def get_rscript_env(libfolder:str):
    if os.environ.get('R_LIBS_USER'):
        return dict(**os.environ)
    else:
        return dict(**os.environ, R_LIBS_USER=str(libfolder))
assert get_rscript_env(get_rscript_libpath(save_dir)).get('R_LIBS_USER') == get_rscript_libpath(save_dir), 'rscript environment not set as expected'

Used libraries

list(set([L for V in data_file_flow.values() for L in V['libs']]))
['lubridate', 'mgcv', 'zoo', 'stringr']
run_script_result = subprocess.run(['Rscript','-e', "library(lubridate)"], capture_output=True)
print(run_script_result.stderr.decode('UTF-8'), run_script_result.returncode)
Error in library(lubridate) : there is no package called ‘lubridate’
Execution halted
 1
[os.path.exists(os.path.join(get_rscript_libpath(save_dir), L)) for L in list(set([L for V in data_file_flow.values() for L in V['libs']]))]
[True, False, True, True]
[os.path.join(get_rscript_libpath(save_dir), L) for L in list(set([L for V in data_file_flow.values() for L in V['libs']]))]
['/home/fenke/repos/corebridge/nbs/saves/rscript/libs/lubridate',
 '/home/fenke/repos/corebridge/nbs/saves/rscript/libs/mgcv',
 '/home/fenke/repos/corebridge/nbs/saves/rscript/libs/zoo',
 '/home/fenke/repos/corebridge/nbs/saves/rscript/libs/stringr']

source

check_rscript_lib

 check_rscript_lib (lib:str, libfolder:str)

Checks if a R package is installed in libfolder

Type Details
lib str name of the package
libfolder str path to the library folder
Returns bool True if the package is installed, False otherwise
Exported source
def check_rscript_libs(libs:list, libfolder:str):
    """Quick check if for all the R packages in libs a folder exists in libfolder"""
    return all([os.path.exists(os.path.join(libfolder, L)) for L in libs])

def check_rscript_lib(lib:str, libfolder:str) -> bool:
    """Checks if a R package is installed in libfolder

    Parameters
    ----------
    lib : str
        name of the package
    libfolder : str
        path to the library folder

    Returns
    -------
    bool
        True if the package is installed, False otherwise
    """

    run_script_result = subprocess.run(['Rscript','-e', f"library({lib})"], env=get_rscript_env(libfolder), capture_output=True)
    if run_script_result.returncode != 0:
        print('STDERR\n', run_script_result.stderr.decode('UTF-8'))
        print('STDOUT\n', run_script_result.stdout.decode('UTF-8'))
    return run_script_result.returncode == 0

source

check_rscript_libs

 check_rscript_libs (libs:list, libfolder:str)

Quick check if for all the R packages in libs a folder exists in libfolder

check_rscript_libs(list(set([L for V in data_file_flow.values() for L in V['libs']])), get_rscript_libpath(save_dir))
False
check_rscript_lib('mgcv', get_rscript_libpath(save_dir))
True
check_rscript_lib('zoo', get_rscript_libpath(save_dir))
True

Installing libraries


source

install_R_package_wait

 install_R_package_wait (pkg:str|list, workdir:str,
                         repo='https://cloud.r-project.org')

Checks and if neccesary installs an R package

Type Default Details
pkg str | list name(s) of the package(s)
workdir str
repo str https://cloud.r-project.org
install_R_package_wait(['generics', 'timechange', 'rlang'], save_dir)
DEBUG   1975    __main__    1929856308.py   18  Using libfolder /home/fenke/repos/corebridge/nbs/saves/rscript/libs for packages
DEBUG   1975    __main__    1929856308.py   22  Using libfolder /home/fenke/repos/corebridge/nbs/saves/rscript/libs for R_LIBS_USER

Installing package generics, testing attach ...
Attach successful. Library generics appears to have been installed

Installing package timechange, testing attach ...
Attach successful. Library timechange appears to have been installed

Installing package rlang, testing attach ...
Attach successful. Library rlang appears to have been installed
# install_R_package_wait(
#     ['zoo'],
#     libfolder=get_rscript_libpath(save_dir))
# install_R_package_wait(
#     sorted(list(set([L for V in data_file_flow.values() for L in V['libs']]))),
#     libfolder=get_rscript_libpath(save_dir))

Running the scripts

Installing scripts


source

unpack_assets

 unpack_assets (assets_dir:str, save_dir:str)

Unpack the assets folder to the save_dir

Exported source
def unpack_assets(assets_dir:str, save_dir:str):
    """
    Unpack the assets folder to the save_dir
    """
    unpack_result = subprocess.Popen(
        ['unzip', '-un', '-d', save_dir, os.path.join(assets_dir, '*.zip')],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    return unpack_result
unpack_result = unpack_assets(assets_dir, save_dir)
unpack_result.args
['unzip',
 '-un',
 '-d',
 '/home/fenke/repos/corebridge/nbs/saves/rscript',
 '/home/fenke/repos/corebridge/nbs/assets/rscript/*.zip']
unpack_result.poll()
0
print(unpack_result.stdout.read().decode('UTF-8'))
Archive:  /home/fenke/repos/corebridge/nbs/assets/rscript/SapflowPredictionScripts.zip

Archive:  /home/fenke/repos/corebridge/nbs/assets/rscript/SapflowPredictionData.zip

Checksum calculation

Each script has it’s own set of input files and should be run to update it’s output when either it’s inputs have changed or it’s expected output does not exist.

We can check for filechanges using a hashing algorithm, for instance MD5 or SHA-256. These are available either in Python or from the commandline.

Lets look at the commandline version of MD5, on linux this is md5sum, with the input file for the preparation stage:

print(json.dumps(data_file_flow[list(data_file_flow.keys())[0]]['in'], indent=3))
{
   "Meta_data": "Data/Meta_data.csv",
   "Sapflux_Tilia_train": "Data/Sapflux_Tilia_train.csv",
   "Weather_Tilia_train": "Data/Weather_Tilia_train.csv",
   "Weather_Tilia_pred": "Data/Weather_Tilia_pred.csv"
}

md5sum will output hashes to stdout, which subprocess.run captures for us

flow_object_index = 0
input_files = list(data_file_flow[flow_object_index]['in'].values())

print(json.dumps(input_files, indent=3))
[
   "Data/Meta_data.csv",
   "Data/Sapflux_Tilia_train.csv",
   "Data/Weather_Tilia_train.csv",
   "Data/Weather_Tilia_pred.csv"
]
md5_encode_result = subprocess.run(
    ['md5sum','-b']+
    input_files, 
    cwd=save_dir,
    capture_output=True)
print(md5_encode_result.stdout.decode('UTF-8'))
4bed61a77505bfd52032591d5c3a6050 *Data/Meta_data.csv
6d705d98caa6618a4a990c3742c16564 *Data/Sapflux_Tilia_train.csv
1232592f9488ce4fbb4ae11ba5be0349 *Data/Weather_Tilia_train.csv
366dac1bf64003d1d08fca6121c036bd *Data/Weather_Tilia_pred.csv

If we want to check the files we run it with the -c option and a file with the previously calculated checksums

script_name = data_file_flow[flow_object_index]['name']

checksum_file = get_save_path(f"input-checksum-{script_name.split('.')[0]}", save_dir)
with open(checksum_file, 'wt') as cf:
    cf.write(md5_encode_result.stdout.decode('UTF-8'))
md5_check_result = subprocess.run(
    ['md5sum', '-c', checksum_file], 
    cwd=save_dir,
    capture_output=True)
print(md5_check_result.stdout.decode('UTF-8'))
print(f"Run returned code {md5_check_result.returncode}")
if md5_check_result.returncode:
    print(md5_check_result.stderr.decode('UTF-8'))
Data/Meta_data.csv: OK
Data/Sapflux_Tilia_train.csv: OK
Data/Weather_Tilia_train.csv: OK
Data/Weather_Tilia_pred.csv: OK

Run returned code 0

Had there been a change to a file it would have looked like

md5_check_result = subprocess.run(
    ['md5sum', '-c', checksum_file+'-modified'], 
    cwd=save_dir,
    capture_output=True)
print(md5_check_result.stdout.decode('UTF-8'))
print(f"Run returned code {md5_check_result.returncode}")

Run returned code 1

We don’t really need specifics, only the return code will do for our purpose.

Checking files

Generating names


source

calc_hash_from_data_files

 calc_hash_from_data_files (flow_object:dict, save_dir:str)

Calculate hash from the contents of the input files for a given flow object

Exported source
read_chunk_size = 1024 * 32
def calc_hash_from_flowobject(flow_object:dict)->str:
    '''Calculate a unique hash for a given flow object'''
    return hashlib.md5(repr(flow_object).encode('UTF-8')).hexdigest()

def calc_hash_from_files(files:list, save_dir:str)->str:
    '''Calculate hash from the contents of the input files'''
    hashobj = hashlib.md5()

    # iterate over files 
    for data_file in files:
        full_name = os.path.join(save_dir, data_file)
        if not os.path.isfile(full_name):
            continue
        
        with open(full_name, 'rb') as f:
            # loop till the end of the file
            while True:
                # read only 1024 bytes at a time
                chunk = f.read(read_chunk_size)
                if not chunk:
                    break
                
                hashobj.update(chunk)
        
    return hashobj.hexdigest()

def calc_hash_from_input_files(flow_object:dict, save_dir:str)->str:
    '''Calculate hash from the contents of the input files for a given flow object'''
    return calc_hash_from_files(list(flow_object['in'].values()), save_dir)

def calc_hash_from_data_files(flow_object:dict, save_dir:str)->str:
    '''Calculate hash from the contents of the input files for a given flow object'''
    return calc_hash_from_files(list(flow_object['in'].values()) + list(flow_object['out'].values()), save_dir)

source

calc_hash_from_input_files

 calc_hash_from_input_files (flow_object:dict, save_dir:str)

Calculate hash from the contents of the input files for a given flow object


source

calc_hash_from_files

 calc_hash_from_files (files:list, save_dir:str)

Calculate hash from the contents of the input files


source

calc_hash_from_flowobject

 calc_hash_from_flowobject (flow_object:dict)

Calculate a unique hash for a given flow object

calc_hash_from_flowobject(data_file_flow[flow_object_index])
'5d532037f7dda7e7fad290ddef53f69d'
calc_hash_from_input_files(data_file_flow[flow_object_index], save_dir)
'32095cd16a83a2c63f1ab51a58ed96c9'
calc_hash_from_data_files(data_file_flow[flow_object_index], save_dir)
'65e6d36ac0c8aadb1fb4ee48d4ff88f3'

Inputs


source

check_script_inputs

 check_script_inputs (flow_object:dict, workdir:str)

Check if the input files for a script are up-to-date, returns True if up-to-date.

Exported source
def check_script_inputs(flow_object:dict, workdir:str)->bool:
    """ 
    Check if the input files for a script are up-to-date, returns True if up-to-date.
    """
    checksum_filename = f"input-checksum-{calc_hash_from_flowobject(flow_object)}"
    md5_check_result = subprocess.run(
        ['md5sum', '-c', checksum_filename], 
        cwd=workdir,
        capture_output=True)
    syslog.debug(f"Checksum check result for Flow object: {flow_object['name']}: {md5_check_result.returncode}, checksum file: {checksum_filename}")
    
    return int(md5_check_result.returncode) == 0
check_script_inputs(data_file_flow[1], save_dir)
DEBUG   1975    __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
True

Outputs

The output is easily checked for existence with isfile.


source

check_script_output

 check_script_output (flow_object:dict, workdir:str)

Check if the output files for a script exist, returns True if they all exist.

Exported source
def check_script_output(flow_object:dict, workdir:str)->bool:
    """ 
    Check if the output files for a script exist, returns True if they all exist.
    """
    files_exist = [
        os.path.isfile(get_save_path(F, workdir)) 
        for F in flow_object['out'].values()
    ]
    syslog.debug(f"Output files for Flow object: {flow_object['name']}: {list(zip(flow_object['out'], files_exist))}")
    return all(files_exist)
check_script_output(data_file_flow[0], save_dir)
DEBUG   1975    __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
True

Generating the checksum file


source

generate_checksum_file

 generate_checksum_file (flow_object:dict, workdir:str)

Generates the checksum file for a given flow object

Exported source
def generate_checksum_file(flow_object:dict, workdir:str)->bool:
    """Generates the checksum file for a given flow object"""

    input_files = list(flow_object['in'].values())
    md5_encode_result = subprocess.run(
        ['md5sum','-b']+
        input_files, 
        cwd=workdir,
        capture_output=True)
    
    checksum_filename = f"input-checksum-{calc_hash_from_flowobject(flow_object)}"
    syslog.debug(f"Checksum file for Flow object: {flow_object['name']} created return {md5_encode_result.returncode}, checksum file: {checksum_filename}")
    with open(os.path.join(workdir, checksum_filename), 'wt') as cf:
        cf.write(md5_encode_result.stdout.decode('UTF-8'))

    return md5_encode_result.returncode == 0 and check_script_inputs(flow_object, workdir)
generate_checksum_file(data_file_flow[0], save_dir)
DEBUG   1975    __main__    3722079908.py   14  Checksum file for Flow object: Data_preparation.R created return 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   1975    __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
True

Running once

We don’t need and don’t want’t to run the script if more then once. This is not a problem when a script has finished and updated the checksum file, but we also want to prevent near simultaneous runs in a multiprocessing environment.

We’ll use file locking from fcntl directly

# Use fcntl file locking to prevent multiple processes from running the same code at the same time.
# see https://docs.python.org/3/library/fcntl.html#fcntl.flock

# Create a filename based on input-file contents
lock_file = get_save_path(f"lock-{calc_hash_from_input_files(data_file_flow[0], save_dir)}", save_dir)

Now we use fcntl.flock with flags fcntl.LOCK_EX | fcntl.LOCK_NB to lock the file for exclusive access, while an exception is thrown if it’s already locked.

with open(lock_file, 'wt') as cf:
    fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)
    with open(lock_file, 'wt') as cf2:
        try:
            fcntl.flock(cf2, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError as locked_error:
            print(locked_error)
[Errno 11] Resource temporarily unavailable

The locks are removed when the file is closed, how convenient

with open(lock_file, 'wt') as cf:
    fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)
with open(lock_file, 'wt') as cf:
    fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)

Putting it together

Synchroneous

We need to run a script when either any of it’s inputs have changed or any of it’s outputs do not exist. Return True if a follow-up script should be executed, False if nothing changed or executing the script failed.


source

run_rscript_wait

 run_rscript_wait (flow_object, assets_dir:str, save_dir:str)

Run a script in R args: flow_object: dict of flow object returns: bool: True if a follow-up script might need to be run, False if not

Exported source
def run_rscript_wait(flow_object, assets_dir:str, save_dir:str):
    """ Run a script in R 
        args:
            flow_object: dict of flow object
        returns:
            bool: True if a follow-up script might need to be run, False if not

    """
    syslog.debug(f"Running script {flow_object['name']}")
    # Check if output exists and inputs have not changed and return False if 
    # output exists and inputs have not changed
    if check_script_output(flow_object, save_dir) and check_script_inputs(flow_object, save_dir):
        return True
    
    # Create the lock file
    lock_file = get_save_path(f"lock-{calc_hash_from_flowobject(flow_object)}", save_dir)
    with open(lock_file, 'wt') as cf:
        try:
            syslog.debug(f"Locking {lock_file}")
            # Get exclusive lock on the file, is released on file close
            fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)

            # run the script
            run_script_result = subprocess.run(
                ['Rscript', '--vanilla', get_asset_path(flow_object['name'], assets_dir)],
                cwd=save_dir,
                capture_output=True
            )
            
            # check the return code
            if run_script_result.returncode:
                cf.write(f"Run returned code {run_script_result.returncode}\n")
                cf.write(f"STDOUT------------\n{run_script_result.stdout.decode('UTF-8')}\n")
                cf.write(f"STDERR------------\n{run_script_result.stderr.decode('UTF-8')}\n")
                return False

        except BlockingIOError as locked_error:
            syslog.debug(locked_error)
            return False

    
    # check the output and generate the checksum file
    return check_script_output(flow_object, save_dir) and generate_checksum_file(flow_object, save_dir)
#run_rscript_wait(data_file_flow[0], assets_dir, save_dir)
#run_rscript_wait(data_file_flow[1], assets_dir, save_dir)
#run_rscript_wait(data_file_flow[2], assets_dir, save_dir)
def clear_results(flow_object :dict, save_dir:str):
    """Clear the results of a given flow object"""
    for fname in flow_object['out']:
        try:
            os.remove(get_save_path(fname, save_dir))
        except FileNotFoundError:
            pass
# [clear_results(O, save_dir) for O in data_file_flow.values()]

Asynchronous

In the API we can not wait for a script to finish and we’ll use Popen instead. This means we’ll have to keep track of the process.


source

run_rscript_nowait

 run_rscript_nowait (flow_object, workdir:str, libfolder:str=None,
                     pkg_repo:str='https://cloud.r-project.org')

Run a script in R args: flow_object: dict of flow object workdir: working directory pkg_repo: CRAN package repository returns: RScriptProcess: Popen container object for the script

Exported source
RScriptProcess = namedtuple('RScriptProcess', ['flow_object', 'lock_file', 'stdout','stderr', 'popen_args', 'popen'])

#### Asynchronous RScript processing ------------------------------------------------

def run_rscript_nowait(
        flow_object, 
        workdir:str, 
        libfolder:str=None,
        pkg_repo:str='https://cloud.r-project.org') -> RScriptProcess:
    
    """ Run a script in R 
        args:
            flow_object: dict of flow object
            workdir: working directory
            pkg_repo: CRAN package repository
        returns:
            RScriptProcess: Popen container object for the script
    """
    
    syslog.debug(f"Starting rscript for {flow_object['name']}")

    # lockfile -------------------------------------------------------------------
    os.makedirs(os.path.abspath(os.path.join(workdir, 'temp')), exist_ok=True)
    def get_temp_path(lname):
        return os.path.abspath(os.path.join(workdir, 'temp', lname))
    
    lock_name = 'run_flow_object-'+calc_hash_from_flowobject(flow_object)

    # lock maintenance
    if run_rscript_nowait.lock_objects.get(lock_name): 
        lock_object = run_rscript_nowait.lock_objects[lock_name]
        if not lock_object.lock_file.closed:
            syslog.debug(f"Lockfile is open for {flow_object['name']} ({lock_name})")
            # If the lockfile is open, check if the process is still running
            
            if lock_object.popen is None:
                syslog.debug(f"No process running for {flow_object['name']} ({lock_name})")
            elif lock_object.popen.poll() is None:
                syslog.debug(f"Script is still running for {flow_object['name']} ({lock_name})")
                return lock_object
            else:
                syslog.debug(f"Script has finished for {flow_object['name']} ({lock_name}), returned {lock_object.popen.returncode}")
                # since poll return not-None the script has finished so close the lockfile
                lock_object.lock_file.close()
                lock_object.stdout.close()
                lock_object.stderr.close()
                if lock_object.popen.returncode != 0:
                    syslog.error(f"Script failed for {flow_object['name']} ({lock_name}), returned {lock_object.popen.returncode}")
                    syslog.error(f"Args were: {lock_object.popen_args}")
                    with open(lock_object.stdout.name, 'rb') as so:
                        syslog.error(f"STDOUT\n{so.read().decode('UTF-8')}")
                    with open(lock_object.stderr.name, 'rb') as se:
                        syslog.error(f"STDERR\n{se.read().decode('UTF-8')}")
                else:
                    syslog.debug(f"Script was successful for {flow_object['name']} ({lock_name})")
                    generate_checksum_file(flow_object, os.path.abspath(workdir))

                #os.remove(lock_object.stdout.name)
                #os.remove(lock_object.stderr.name)


    # Check if output exists and inputs have not changed and return False if 
    # output exists and inputs have not changed
    if check_script_output(flow_object, os.path.abspath(workdir)) and check_script_inputs(flow_object, os.path.abspath(workdir)):
        syslog.debug(f"Output and inputs are up-to-date for {flow_object['name']}")
        return run_rscript_nowait.lock_objects.get(lock_name)

    if not all([os.path.exists(get_save_path(fname, os.path.abspath(workdir))) for fname in flow_object['in'].values()]):
        syslog.debug(f"Inputs missing for {flow_object['name']}")
        return run_rscript_nowait.lock_objects.get(lock_name)
    # Create the lock file -----------------------------------------------------------
    syslog.debug(f"Preparing to run scripts for {flow_object['name']}, creating lockfile ({lock_name})")
    cf = open(get_temp_path(f"lock-{lock_name}"), 'wt')
    
    try:
        # Set lock on lockfile
        fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)

        so = open(get_temp_path(f"stdout-{lock_name}"), 'wt')
        se = open(get_temp_path(f"stderr-{lock_name}"), 'wt')

        # check libs
        if not libfolder:
            libfolder=os.path.abspath(os.path.join(workdir, 'libs'))
            
        os.makedirs(libfolder, exist_ok=True)
        syslog.debug(f"Using libfolder {libfolder} for packages")
        
        env = dict(os.environ)
        env['R_LIBS_USER'] = libfolder
        syslog.debug(F"Using libfolder {env['R_LIBS_USER']} for R_LIBS_USER")
        
        if not check_rscript_libs(flow_object['libs'], libfolder):
            for pkg_i in flow_object['libs']:
                syslog.debug(f"Checking lib {pkg_i} for {flow_object['name']} ({lock_name})")
                if not check_rscript_lib(pkg_i, libfolder):
                    syslog.debug(f"Starting installation of {pkg_i} for {flow_object['name']} ({lock_name})")
                    popen_args = [
                            'Rscript','-e', 
                            f"install.packages('{pkg_i}', repos='{pkg_repo}', lib='{libfolder}', dependencies=TRUE)",
                        ]
                    run_script_install = subprocess.Popen(
                        popen_args, 
                        cwd=os.path.abspath(workdir),
                        stdout=so,
                        stderr=se,
                        encoding='UTF-8',
                        env=env,
                    )
                    run_rscript_nowait.lock_objects[lock_name] =  RScriptProcess(flow_object, cf, so, se, popen_args, run_script_install)
                    return run_rscript_nowait.lock_objects.get(lock_name)
                    
        
        syslog.debug(f"Libs are up-to-date, starting script for {flow_object['name']} ({lock_name})")
        # run the script
        popen_args = ['Rscript', flow_object['name']]
        popen_run = subprocess.Popen(
            popen_args,
            cwd=os.path.abspath(workdir),
            stdout=so,
            stderr=se,
            encoding='UTF-8',
            env=env,
        )

        run_rscript_nowait.lock_objects[lock_name] =  RScriptProcess(flow_object, cf, so, se, popen_args, popen_run)
            
    except BlockingIOError as locked_error:
        cf.close()
        #syslog.error(f"{flow_object['name']} is locked, cannot run", exc_info=locked_error)

    syslog.debug(f"Done with {flow_object['name']}.")

    return run_rscript_nowait.lock_objects.get(lock_name)

run_rscript_nowait.lock_objects = {}

source

release_script_lock

 release_script_lock (flow_object, save_dir)
Exported source
def release_script_lock(flow_object, save_dir):
    process = run_rscript_nowait.lock_objects.get(flow_object['name'])
    if process.popen and process.popen.poll() is not None:
        syslog.debug(f"Closing lockfile {process.lock_file.name}")
        process.lock_file.close()
for flow_object in data_file_flow.values():
    syslog.info(f"{flow_object['name']} --------------------")
    startresult = run_rscript_nowait(flow_object, workdir=save_dir)
    
    #print(f"Args: {startresult.popen_args if startresult else None}")
INFO    1975    __main__    2844065539.py   2   Data_preparation.R --------------------
DEBUG   1975    __main__    319997563.py    22  Starting rscript for Data_preparation.R
DEBUG   1975    __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   1975    __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   1975    __main__    319997563.py    67  Output and inputs are up-to-date for Data_preparation.R
INFO    1975    __main__    2844065539.py   2   Prediction_part1.R --------------------
DEBUG   1975    __main__    319997563.py    22  Starting rscript for Prediction_part1.R
DEBUG   1975    __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   1975    __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   1975    __main__    319997563.py    67  Output and inputs are up-to-date for Prediction_part1.R
INFO    1975    __main__    2844065539.py   2   Prediction_part2.R --------------------
DEBUG   1975    __main__    319997563.py    22  Starting rscript for Prediction_part2.R
DEBUG   1975    __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   1975    __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   1975    __main__    319997563.py    67  Output and inputs are up-to-date for Prediction_part2.R
INFO    1975    __main__    2844065539.py   2   Prediction_part3.R --------------------
DEBUG   1975    __main__    319997563.py    22  Starting rscript for Prediction_part3.R
DEBUG   1975    __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   1975    __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG   1975    __main__    319997563.py    67  Output and inputs are up-to-date for Prediction_part3.R
for name, process in run_rscript_nowait.lock_objects.items():
    if process.popen:
        print(f"process {name} for {process.flow_object['name']} is done? {process.popen.poll()}")
        print(f"args: {process.popen_args}")
for flow_object in data_file_flow.values():
    syslog.info(f"Checking {flow_object['name']}")
    if check_script_output(flow_object, save_dir) and check_script_inputs(flow_object, save_dir):
        syslog.info(f"Output and inputs are up-to-date for {flow_object['name']}")
    else:
        print([
            (F,os.path.isfile(get_save_path(F, save_dir)) )
            for F in flow_object['out']
        ])
        checksum_file = get_save_path(f"input-checksum-{calc_hash_from_flowobject(flow_object)}", save_dir)
        md5_check_result = subprocess.run(
            ['md5sum', '-c', checksum_file], 
            cwd=save_dir,
            capture_output=True)
        
        print(md5_check_result.returncode, md5_check_result.stderr.decode('UTF-8'))
INFO    1975    __main__    1138821752.py   2   Checking Data_preparation.R
DEBUG   1975    __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   1975    __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
INFO    1975    __main__    1138821752.py   4   Output and inputs are up-to-date for Data_preparation.R
INFO    1975    __main__    1138821752.py   2   Checking Prediction_part1.R
DEBUG   1975    __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   1975    __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
INFO    1975    __main__    1138821752.py   4   Output and inputs are up-to-date for Prediction_part1.R
INFO    1975    __main__    1138821752.py   2   Checking Prediction_part2.R
DEBUG   1975    __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   1975    __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
INFO    1975    __main__    1138821752.py   4   Output and inputs are up-to-date for Prediction_part2.R
INFO    1975    __main__    1138821752.py   2   Checking Prediction_part3.R
DEBUG   1975    __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   1975    __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO    1975    __main__    1138821752.py   4   Output and inputs are up-to-date for Prediction_part3.R
for process in run_rscript_nowait.lock_objects.values():
    if process.popen and process.popen.poll() is not None:
        print(f"Closing lockfile {process.lock_file.name}")
        process.lock_file.close()

AICore module class

Init


source

AICoreRScriptModule

 AICoreRScriptModule (flow_mapping:dict, save_dir:str, assets_dir:str,
                      cran_repo:str='https://cloud.r-project.org', *args,
                      **kwargs)

Initialize self. See help(type(self)) for accurate signature.

Type Default Details
flow_mapping dict scripts flow map
save_dir str path where the module can keep files
assets_dir str path to support files (scripts, metadata, etc)
cran_repo str https://cloud.r-project.org CRAN repo
args VAR_POSITIONAL
kwargs VAR_KEYWORD
Exported source
class AICoreRScriptModule(AICoreModuleBase):
    def __init__(self, 
                flow_mapping:dict, # scripts flow map
                save_dir:str, # path where the module can keep files 
                assets_dir:str, # path to support files (scripts, metadata, etc)
                cran_repo:str='https://cloud.r-project.org', # CRAN repo
                *args, **kwargs):
        
        super().__init__(save_dir, assets_dir, *args, **kwargs)
    
        self.corebridge_version = corebridge.__version__

        self.flow_mapping = flow_mapping
        self.cran_repo = cran_repo

        self.data_files_map = {
            D:F
            for P in self.flow_mapping.values()
            for D,F in P['in'].items()
        }

        for N in self.data_files_map.keys():
            print(N, os.path.isfile(self.get_save_path(self.data_files_map.get(N))), self.get_save_path(self.data_files_map.get(N)))
        # list assets
        print('Assets:\n', subprocess.run(['ls', '-la', assets_dir], capture_output=True).stdout.decode('UTF-8'))

        self.unpack_result = unpack_assets(assets_dir, self.get_rscript_workdir())
        # list working directory
        #print('Working directory:\n', subprocess.run(['ls', '-l', '*.R', save_dir], capture_output=True).stdout.decode('UTF-8'))

        self.flow_results = {
            flow_object['name']:run_rscript_nowait(
                flow_object, 
                workdir=self.get_rscript_workdir(),
                libfolder=self.get_rscript_libpath(),
                pkg_repo=self.cran_repo
            )
            for flow_object in self.flow_mapping.values()
        }

        self.update_flow()
        
        syslog.info(f"RScriptModule initialized with {len(flow_mapping)} flow objects.")

    # def get_asset_path(self,script_name): 
    #     return os.path.abspath(os.path.join(self.init_kwargs['assets_dir'], script_name))
    def get_rscript_libpath(self):
        return os.path.abspath(os.path.join(self.init_kwargs['save_dir'], 'libs'))
    def get_rscript_workdir(self):
        return os.path.abspath(os.path.join(self.init_kwargs['save_dir'], 'workdir'))
    def get_save_path(self, datafile_name:str): 
        return os.path.abspath(os.path.join(self.init_kwargs['save_dir'], 'workdir', datafile_name))
    
    def get_flow_status(self):
        return [
            f"process {name} for {process.flow_object['name']} pollstatus: {process.popen.poll()}, args: {process.popen_args}"
            for name, process in self.flow_results.items()
            if process and process.popen
        ]

Run flow scripts to update data


source

AICoreRScriptModule.update_flow

 AICoreRScriptModule.update_flow ()
Exported source
@patch
def update_flow(self:AICoreRScriptModule):
    workdir = self.get_rscript_workdir()
    libfolder = self.get_rscript_libpath()

    for flow_object in self.flow_mapping.values():
        
        syslog.debug(f"Update {flow_object['name']}, output: {check_script_output(flow_object, os.path.abspath(workdir))}, inputs: {check_script_inputs(flow_object, os.path.abspath(workdir))}")
        if (
            not check_script_output(flow_object, os.path.abspath(workdir)) 
            or not check_script_inputs(flow_object, os.path.abspath(workdir))
        ):
            if self.flow_results[flow_object['name']]: 
                process = self.flow_results[flow_object['name']]
                if process.popen.poll() is None:
                    syslog.debug(f"Process is still running: {flow_object['name']}, args: {process.popen_args}")
                    return self.get_flow_status()
                else:
                    syslog.debug(f"Process finished: {flow_object['name']}, args: {process.popen_args}, returncode: {process.popen.poll()}")
                

            syslog.debug(f"Updating for {flow_object['name']}, starting at {workdir}")

            self.flow_results[flow_object['name']] = run_rscript_nowait(
                flow_object, 
                workdir=workdir, 
                libfolder=libfolder,
                pkg_repo=self.cran_repo
            )

    syslog.info(f"RScriptModule flow update complete.")
    return self.get_flow_status()

Converting RData

?rdata.read_rda
Signature:
rdata.read_rda(
    file_or_path: 'AcceptableFile | os.PathLike[Any] | Traversable | str',
    *,
    expand_altrep: 'bool' = True,
    altrep_constructor_dict: 'AltRepConstructorMap' = mappingproxy({b'deferred_string': <function deferred_string_constructor at 0x7f22ea064360>, b'compact_intseq': <function compact_intseq_constructor at 0x7f22ea0644a0>, b'compact_realseq': <function compact_realseq_constructor at 0x7f22ea064540>, b'wrap_real': <function wrap_constructor at 0x7f22ea0645e0>, b'wrap_string': <function wrap_constructor at 0x7f22ea0645e0>, b'wrap_logical': <function wrap_constructor at 0x7f22ea0645e0>, b'wrap_integer': <function wrap_constructor at 0x7f22ea0645e0>, b'wrap_complex': <function wrap_constructor at 0x7f22ea0645e0>, b'wrap_raw': <function wrap_constructor at 0x7f22ea0645e0>}),
    constructor_dict: 'ConstructorDict' = mappingproxy({'data.frame': <function dataframe_constructor at 0x7f22ea066ac0>, 'factor': <function factor_constructor at 0x7f22ea066c00>, 'ordered': <function ordered_constructor at 0x7f22ea066ca0>, 'ts': <function ts_constructor at 0x7f22ea066d40>, 'srcref': <function srcref_constructor at 0x7f22ea066de0>, 'srcfile': <function srcfile_constructor at 0x7f22ea067240>, 'srcfilecopy': <function srcfilecopy_constructor at 0x7f22ea067560>}),
    default_encoding: 'str | None' = None,
    force_default_encoding: 'bool' = False,
    global_environment: 'MutableMapping[str, Any] | None' = None,
    base_environment: 'MutableMapping[str, Any] | None' = None,
) -> 'dict[str, Any]'
Docstring:
Read an RDA or RDATA file, containing an R object.

This is a convenience function that wraps :func:`rdata.parser.parse_file`
and :func:`rdata.parser.convert`, as it is the common use case.

Args:
    file_or_path: File in the RDA format.
    expand_altrep: Whether to translate ALTREPs to normal objects.
    altrep_constructor_dict: Dictionary mapping each ALTREP to
        its constructor.
    constructor_dict: Dictionary mapping names of R classes to constructor
        functions with the following prototype:

        .. code-block :: python

            def constructor(obj, attrs):
                ...

        This dictionary can be used to support custom R classes. By
        default, the dictionary used is
        :data:`~rdata.conversion._conversion.DEFAULT_CLASS_MAP`
        which has support for several common classes.
    default_encoding: Default encoding used for strings with unknown
        encoding. If `None`, the one stored in the file will be used, or
        ASCII as a fallback.
    force_default_encoding:
        Use the default encoding even if the strings specify other
        encoding.
    global_environment: Global environment to use. By default is an empty
        environment.
    base_environment: Base environment to use. By default is an empty
        environment.

Returns:
    Contents of the file converted to a Python object.

See Also:
    :func:`read_rds`: Similar function that parses a RDS file.

Examples:
    Parse one of the included examples, containing a dataframe

    >>> import rdata
    >>>
    >>> data = rdata.read_rda(
    ...              rdata.TESTDATA_PATH / "test_dataframe.rda"
    ... )
    >>> data
    {'test_dataframe':   class  value
    1     a      1
    2     b      2
    3     b      3}
File:      ~/repos/corebridge/.devenv-corebridge/lib64/python3.11/site-packages/rdata/_read.py
Type:      function
module = AICoreRScriptModule(data_file_flow, save_dir, assets_dir)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[81], line 1
----> 1 module = AICoreRScriptModule(data_file_flow, save_dir, assets_dir)

Cell In[78], line 10, in AICoreRScriptModule.__init__(self, flow_mapping, save_dir, assets_dir, cran_repo, *args, **kwargs)
      3 def __init__(self, 
      4             flow_mapping:dict, # scripts flow map
      5             save_dir:str, # path where the module can keep files 
      6             assets_dir:str, # path to support files (scripts, metadata, etc)
      7             cran_repo:str='https://cloud.r-project.org', # CRAN repo
      8             *args, **kwargs):
---> 10     super().__init__(save_dir, assets_dir, *args, **kwargs)
     12     self.corebridge_version = __version__
     14     self.flow_mapping = flow_mapping

File ~/repos/corebridge/corebridge/core.py:311, in __init__(self, save_dir, assets_dir, *args, **kwargs)
    304 @patch
    305 def __init__(self:AICoreModuleBase, 
    306             save_dir:str, # path where the module can keep files 
    307             assets_dir:str, # path to support files (scripts, metadata, etc)
    308             *args, **kwargs):
    310     self.init_time = datetime.datetime.now(datetime.UTC)
--> 311     self.corebridge_version = __version__
    313     self.init_args = args
    314     self.init_kwargs = dict(
    315         **kwargs,
    316         assets_dir=assets_dir,
    317         save_dir=save_dir
    318     )

NameError: name '__version__' is not defined
rdata.TESTDATA_PATH / "test_dataframe.rda"
Path('/home/fenke/repos/corebridge/.devenv-corebridge/lib64/python3.11/site-packages/rdata/tests/data/test_dataframe.rda')
converted = rdata.read_rda(rdata.TESTDATA_PATH / "test_dataframe.rda")
converted
{'test_dataframe':   class  value
 1     a      1
 2     b      2
 3     b      3}
if os.path.exists(module.get_save_path(module.data_files_map.get('Predicted_sapflux'))):

    converted = rdata.read_rda(module.get_save_path(module.data_files_map.get('Predicted_sapflux')))
    converted
if os.path.exists(module.get_save_path(module.data_files_map.get('Predicted_sapflux'))):

    list(converted['ensemble_pred'].keys())
def print_structure(data, indent):
    if not isinstance(data, dict):
        if isinstance(data, list) or isinstance(data, np.ndarray):
            print(f"\n{indent}data:{len(data)} items.")
            print_structure(data[0], indent + '--> ')
        else:
            print(f"\n{indent}type: {type(data)}")

        return
    
    for key in data.keys():
        print(f"\n{indent}{key}")
        print_structure(data[key], indent+'++> ')
if os.path.exists(module.get_save_path(module.data_files_map.get('Predicted_sapflux'))):
    print_structure(converted, '  ')

  ensemble_pred

  ++> ensemble

  ++> ++> data:10 items.

  ++> ++> --> equ

  ++> ++> --> ++> data:168 items.

  ++> ++> --> ++> --> type: <class 'numpy.float64'>

  ++> ++> --> mse

  ++> ++> --> ++> data:168 items.

  ++> ++> --> ++> --> type: <class 'numpy.float64'>

  ++> error

  ++> ++> data:10 items.

  ++> ++> --> equ

  ++> ++> --> ++> data:168 items.

  ++> ++> --> ++> --> type: <class 'numpy.float64'>

  ++> ++> --> mse

  ++> ++> --> ++> data:168 items.

  ++> ++> --> ++> --> type: <class 'numpy.float64'>

  ++> typical

  ++> ++> equ

  ++> ++> ++> pred

  ++> ++> ++> ++> data:168 items.

  ++> ++> ++> ++> --> type: <class 'numpy.float64'>

  ++> ++> ++> se

  ++> ++> ++> ++> data:168 items.

  ++> ++> ++> ++> --> type: <class 'numpy.float64'>

  ++> ++> mse

  ++> ++> ++> pred

  ++> ++> ++> ++> data:168 items.

  ++> ++> ++> ++> --> type: <class 'numpy.float64'>

  ++> ++> ++> se

  ++> ++> ++> ++> data:168 items.

  ++> ++> ++> ++> --> type: <class 'numpy.float64'>

  ++> time

  ++> ++> data:168 items.

  ++> ++> --> type: <class 'numpy.str_'>

source

recursive_flatten_nested_data

 recursive_flatten_nested_data (data:dict, column_prefix:str='',
                                camel_case=False)
Exported source
def snake_case_to_camel_case(snake_case:str) -> str:
    splittext = snake_case.split('_')
    return ''.join([x.capitalize() if n > 0 else x for x,n in zip(splittext, range(len(splittext)))])

def recursive_flatten_nested_data(
        data:dict, 
        column_prefix:str='',
        camel_case=False) -> dict:
    
    if isinstance(data, np.ndarray):
        return {column_prefix:data}
    
    if isinstance(data, list):
        return reduce(
            lambda R, X: dict(**R, **X) if R else X,
            [
                recursive_flatten_nested_data(value, f"{column_prefix}_{i+1}_", camel_case)
                for i, value in enumerate(data)
             
            ],
            {}

        )
    
    if isinstance(data, dict):
        
        #if len(data.keys()) == 0:
        #    return data
        if len(data.keys()) > 1:
            return reduce(
                lambda R, X: dict(**R, **X) if R else X,
                [
                    recursive_flatten_nested_data(
                        value, 
                        snake_case_to_camel_case(column_prefix+'_'+str(key)) if camel_case else column_prefix+'_'+str(key),
                        camel_case
                    )
                    for key, value in data.items()
                ],
                {}
                
            )
        else:
            key = list(data.keys())[0]
            value = data[key]
            if column_prefix:
                column_name = snake_case_to_camel_case(column_prefix+'_'+str(key)) if camel_case else column_prefix+'_'+str(key)
            else:
                column_name = snake_case_to_camel_case(str(key)) if camel_case else str(key)
            return recursive_flatten_nested_data(
                value, column_name, camel_case
            )

source

snake_case_to_camel_case

 snake_case_to_camel_case (snake_case:str)
flattened = recursive_flatten_nested_data(converted, camel_case=True)
df = pd.DataFrame(flattened)
time_column = [k for k,v in df.dtypes.to_dict().items() if 'float' not in str(v)][0]
df.set_index( pd.DatetimeIndex(df[time_column]), inplace=True)
#df.drop('ensemble_predTime', axis=1, inplace=True)
df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 168 entries, 2023-08-01 00:00:01 to 2023-08-07 23:00:01
Data columns (total 45 columns):
 #   Column                      Non-Null Count  Dtype  
---  ------                      --------------  -----  
 0   ensemblePredEnsemble1Equ    168 non-null    float64
 1   ensemblePredEnsemble1Mse    168 non-null    float64
 2   ensemblePredEnsemble2Equ    168 non-null    float64
 3   ensemblePredEnsemble2Mse    168 non-null    float64
 4   ensemblePredEnsemble3Equ    168 non-null    float64
 5   ensemblePredEnsemble3Mse    168 non-null    float64
 6   ensemblePredEnsemble4Equ    168 non-null    float64
 7   ensemblePredEnsemble4Mse    168 non-null    float64
 8   ensemblePredEnsemble5Equ    168 non-null    float64
 9   ensemblePredEnsemble5Mse    168 non-null    float64
 10  ensemblePredEnsemble6Equ    168 non-null    float64
 11  ensemblePredEnsemble6Mse    168 non-null    float64
 12  ensemblePredEnsemble7Equ    168 non-null    float64
 13  ensemblePredEnsemble7Mse    168 non-null    float64
 14  ensemblePredEnsemble8Equ    0 non-null      float64
 15  ensemblePredEnsemble8Mse    168 non-null    float64
 16  ensemblePredEnsemble9Equ    168 non-null    float64
 17  ensemblePredEnsemble9Mse    168 non-null    float64
 18  ensemblePredEnsemble10Equ   168 non-null    float64
 19  ensemblePredEnsemble10Mse   168 non-null    float64
 20  ensemblePredError1Equ       168 non-null    float64
 21  ensemblePredError1Mse       168 non-null    float64
 22  ensemblePredError2Equ       168 non-null    float64
 23  ensemblePredError2Mse       168 non-null    float64
 24  ensemblePredError3Equ       168 non-null    float64
 25  ensemblePredError3Mse       168 non-null    float64
 26  ensemblePredError4Equ       168 non-null    float64
 27  ensemblePredError4Mse       168 non-null    float64
 28  ensemblePredError5Equ       168 non-null    float64
 29  ensemblePredError5Mse       168 non-null    float64
 30  ensemblePredError6Equ       168 non-null    float64
 31  ensemblePredError6Mse       168 non-null    float64
 32  ensemblePredError7Equ       168 non-null    float64
 33  ensemblePredError7Mse       168 non-null    float64
 34  ensemblePredError8Equ       0 non-null      float64
 35  ensemblePredError8Mse       0 non-null      float64
 36  ensemblePredError9Equ       168 non-null    float64
 37  ensemblePredError9Mse       168 non-null    float64
 38  ensemblePredError10Equ      168 non-null    float64
 39  ensemblePredError10Mse      168 non-null    float64
 40  ensemblePredTypicalEquPred  168 non-null    float64
 41  ensemblePredTypicalEquSe    168 non-null    float64
 42  ensemblePredTypicalMsePred  168 non-null    float64
 43  ensemblePredTypicalMseSe    168 non-null    float64
 44  ensemblePredTime            168 non-null    object 
dtypes: float64(44), object(1)
memory usage: 60.4+ KB
df
ensemblePredEnsemble1Equ ensemblePredEnsemble1Mse ensemblePredEnsemble2Equ ensemblePredEnsemble2Mse ensemblePredEnsemble3Equ ensemblePredEnsemble3Mse ensemblePredEnsemble4Equ ensemblePredEnsemble4Mse ensemblePredEnsemble5Equ ensemblePredEnsemble5Mse ... ensemblePredError8Mse ensemblePredError9Equ ensemblePredError9Mse ensemblePredError10Equ ensemblePredError10Mse ensemblePredTypicalEquPred ensemblePredTypicalEquSe ensemblePredTypicalMsePred ensemblePredTypicalMseSe ensemblePredTime
ensemblePredTime
2023-08-01 00:00:01 0.549107 0.542947 0.666708 0.658971 0.780691 0.771425 0.660063 0.652414 0.584461 0.577826 ... NaN 0.356676 0.356170 0.375093 0.375200 0.617332 0.125831 0.549231 0.126207 2023-08-01 00:00:01
2023-08-01 01:00:01 0.490717 0.483454 0.561241 0.552198 0.629595 0.618828 0.557256 0.548314 0.511918 0.504120 ... NaN 0.510744 0.508403 0.527951 0.526323 0.531630 0.171059 0.471002 0.170967 2023-08-01 01:00:01
2023-08-01 02:00:01 0.455916 0.448803 0.499389 0.490743 0.541525 0.531393 0.496933 0.488373 0.468985 0.461412 ... NaN 0.594047 0.589900 0.606447 0.602945 0.481136 0.191053 0.425821 0.190360 2023-08-01 02:00:01
2023-08-01 03:00:01 0.424082 0.417034 0.451503 0.443261 0.478080 0.468680 0.449953 0.441779 0.432326 0.424918 ... NaN 0.641129 0.635564 0.649752 0.644735 0.439990 0.201109 0.389024 0.199905 2023-08-01 03:00:01
2023-08-01 04:00:01 0.407526 0.402109 0.425149 0.418847 0.442231 0.435070 0.424153 0.417901 0.412824 0.407141 ... NaN 0.622489 0.615642 0.627102 0.620682 0.417750 0.191545 0.370637 0.189855 2023-08-01 04:00:01
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
2023-08-07 19:00:01 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 ... NaN 2.302566 2.259919 2.302566 2.259919 3.658327 0.690770 3.293744 0.677976 2023-08-07 19:00:01
2023-08-07 20:00:01 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 ... NaN 1.790125 1.785654 1.790125 1.785654 2.298355 0.537037 2.050966 0.535696 2023-08-07 20:00:01
2023-08-07 21:00:01 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 ... NaN 1.489199 1.497512 1.489199 1.497512 1.512578 0.446760 1.340198 0.449254 2023-08-07 21:00:01
2023-08-07 22:00:01 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 ... NaN 1.051568 1.060695 1.051568 1.060695 1.018082 0.315470 0.900918 0.318209 2023-08-07 22:00:01
2023-08-07 23:00:01 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 ... NaN 0.647845 0.656302 0.647845 0.656302 0.667827 0.194353 0.595083 0.196890 2023-08-07 23:00:01

168 rows × 45 columns


source

AICoreRScriptModule.infer

 AICoreRScriptModule.infer (data:dict, *_, **kwargs)

Infer method for the RScriptModule

Exported source
@patch
def write_uploaded_data(
    self:AICoreRScriptModule, 
    df:pd.DataFrame, 
    tag:str=None,
    **kwargs):

    csv_filename = self.get_save_path(self.data_files_map.get(tag, tag))
    syslog.debug(f"Writing {df.shape[0]} rows to {csv_filename}")

    df.reset_index().to_csv(csv_filename, index=False, date_format='%Y-%m-%d %H:%M:%S')

@patch
def read_data(self:AICoreRScriptModule, tag:str=None, camel_case=False, **kwargs):
    
    rdata_filename = self.get_save_path(self.data_files_map.get(tag, tag))
    converted = rdata.read_rda(rdata_filename)

    flattened = recursive_flatten_nested_data(converted, camel_case=camel_case)
    df = pd.DataFrame(flattened)
    syslog.debug(f"Read {df.shape[0]} rows from {rdata_filename} for {tag} (camel_case={camel_case})")

    time_column = [k for k,v in df.dtypes.to_dict().items() if 'float' not in str(v)][0]
    df.set_index( pd.DatetimeIndex(df[time_column]), inplace=True)
    df.index.name = 'time'
    df.drop(time_column, axis=1, inplace=True)

    return df
                                        


@patch
def infer(
    self:AICoreRScriptModule, 
    data:dict, 
    *_, 
    **kwargs):

    """ 
    Infer method for the RScriptModule
    """

    try:

        msg=[
            f"Startup time: {self.init_time.isoformat()}",
            f"Corebridge version: {self.corebridge_version}",
            f"init_args: {self.init_args}, init_kwargs: {self.init_kwargs}",
        ]
        
        msg += self.update_flow()
        # Pickup params, pop those that are not intended for the processor
        writeTag = kwargs.pop('writeTag', None)
        readTag = kwargs.pop('readTag', None)
        camelCase = bool(kwargs.pop('camelCase', False))
        msg.append(f"writeTag: {writeTag}, readTag: {readTag}, camelCase: {camelCase}")

        lastSeen = kwargs.pop('lastSeen', False)
        recordformat = kwargs.pop('format', "records").lower()
        timezone = kwargs.get('timezone', 'UTC')
        msg.append(f"lastSeen: {lastSeen}, recordformat: {recordformat}, timezone: {timezone}")

        reversed = kwargs.pop('reversed', False)

        if writeTag:

            df = set_time_index_zone(timeseries_dataframe_from_datadict(
            data, ['datetimeMeasure', 'time'], recordformat), timezone)

            df.sort_index(inplace=True)

            syslog.debug(f"Writing {df.shape[0]} rows to {writeTag}")
            self.write_uploaded_data(df, writeTag)

        if readTag:
            result = self.read_data(readTag, camel_case=camelCase)

            if reversed:
                result = result[::-1]

            syslog.debug(f"Read {result.shape[0]} rows from {readTag}")

            return {
                'msg':msg,
                'data': timeseries_dataframe_to_datadict(
                    result if not lastSeen else result[-1:],
                    recordformat=recordformat,
                    timezone=timezone,
                    popNaN=True)
            }
        
        return {
            'msg':msg + self.get_flow_status(),
            'data': []
        }

    except Exception as err:
        msg.append(''.join(traceback.format_exception(None, err, err.__traceback__)))
        syslog.exception(f"Exception {str(err)} in infer()")
        return {
            'msg': f"Unexpected {err=}, {type(err)=}",
            'data': []
        }

source

AICoreRScriptModule.read_data

 AICoreRScriptModule.read_data (tag:str=None, camel_case=False, **kwargs)

source

AICoreRScriptModule.write_uploaded_data

 AICoreRScriptModule.write_uploaded_data (df:pandas.core.frame.DataFrame,
                                          tag:str=None, **kwargs)
module = AICoreRScriptModule(data_file_flow, save_dir, assets_dir)
DEBUG   16824   __main__    319997563.py    22  Starting rscript for Data_preparation.R
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    319997563.py    67  Output and inputs are up-to-date for Data_preparation.R
DEBUG   16824   __main__    319997563.py    22  Starting rscript for Prediction_part1.R
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    319997563.py    67  Output and inputs are up-to-date for Prediction_part1.R
DEBUG   16824   __main__    319997563.py    22  Starting rscript for Prediction_part2.R
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    319997563.py    67  Output and inputs are up-to-date for Prediction_part2.R
DEBUG   16824   __main__    319997563.py    22  Starting rscript for Prediction_part3.R
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG   16824   __main__    319997563.py    67  Output and inputs are up-to-date for Prediction_part3.R
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    2480783271.py   10  Update Data_preparation.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part1.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part2.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part3.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO    16824   __main__    2480783271.py   33  RScriptModule flow update complete.
INFO    16824   __main__    3112676160.py   42  RScriptModule initialized with 4 flow objects.
Meta_data True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Data/Meta_data.csv
Sapflux_Tilia_train True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Data/Sapflux_Tilia_train.csv
Weather_Tilia_train True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Data/Weather_Tilia_train.csv
Weather_Tilia_pred True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Data/Weather_Tilia_pred.csv
Modelling_data True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Modelling_data.RData
Fitted_models True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Fitted_models.RData
Weights True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Weights.RData
Prediction_data True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Prediction_data.RData
Predicted_sapflux True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Predicted_sapflux.RData
Assets:
 total 2112
drwxr-xr-x 1 fenke users     264 Aug 22 14:42 .
drwxr-xr-x 1 fenke users      14 Jul  3 13:28 ..
-rw-r--r-- 1 fenke users    5790 Jun 20 11:25 Data_preparation.R
-rw-r--r-- 1 fenke users      61 Jul  5 10:31 hello.R
-rw-r--r-- 1 fenke users    6397 Jun 20 11:25 Prediction_part1.R
-rw-r--r-- 1 fenke users    4203 Jun 20 11:25 Prediction_part2.R
-rw-r--r-- 1 fenke users    3175 Jun 20 11:25 Prediction_part3.R
-rw-r--r-- 1 fenke users 2117834 Aug 22 14:41 SapflowPredictionData.zip
-rw-r--r-- 1 fenke users    6502 Aug 21 13:10 SapflowPredictionScripts.zip
module.flow_mapping
{0: {'in': {'Meta_data': 'Data/Meta_data.csv',
   'Sapflux_Tilia_train': 'Data/Sapflux_Tilia_train.csv',
   'Weather_Tilia_train': 'Data/Weather_Tilia_train.csv',
   'Weather_Tilia_pred': 'Data/Weather_Tilia_pred.csv'},
  'out': {'Modelling_data': 'Modelling_data.RData',
   'Prediction_data': 'Prediction_data.RData'},
  'libs': ['lubridate', 'stringr', 'zoo'],
  'name': 'Data_preparation.R'},
 1: {'in': {'Modelling_data': 'Modelling_data.RData'},
  'out': {'Fitted_models': 'Fitted_models.RData', 'Weights': 'Weights.RData'},
  'libs': ['lubridate', 'stringr', 'mgcv'],
  'name': 'Prediction_part1.R'},
 2: {'in': {'Fitted_models': 'Fitted_models.RData',
   'Weights': 'Weights.RData',
   'Modelling_data': 'Modelling_data.RData',
   'Prediction_data': 'Prediction_data.RData'},
  'out': {'Predicted_sapflux': 'Predicted_sapflux.RData'},
  'libs': ['lubridate', 'stringr', 'mgcv'],
  'name': 'Prediction_part2.R'},
 3: {'in': {'Predicted_sapflux': 'Predicted_sapflux.RData'},
  'out': {'Predicted_water_usage': 'Predicted_water_usage.RData'},
  'libs': ['lubridate', 'stringr'],
  'name': 'Prediction_part3.R'}}
module.get_flow_status()
[]
test_data   = [
    {
        'time': '2020-01-01 00:00:00',
        'temperature': 10,
        'humidity': 10
    },
    {
        'time': '2020-01-01 01:00:00',
        'temperature': 11,
        'humidity': 11
    }
]
module.data_files_map
{'Meta_data': 'Data/Meta_data.csv',
 'Sapflux_Tilia_train': 'Data/Sapflux_Tilia_train.csv',
 'Weather_Tilia_train': 'Data/Weather_Tilia_train.csv',
 'Weather_Tilia_pred': 'Data/Weather_Tilia_pred.csv',
 'Modelling_data': 'Modelling_data.RData',
 'Fitted_models': 'Fitted_models.RData',
 'Weights': 'Weights.RData',
 'Prediction_data': 'Prediction_data.RData',
 'Predicted_sapflux': 'Predicted_sapflux.RData'}
test_df = pd.DataFrame(test_data)
test_df.index=pd.DatetimeIndex(test_df['time'])
test_df.drop('time', axis=1, inplace=True)
test_df
temperature humidity
time
2020-01-01 00:00:00 10 10
2020-01-01 01:00:00 11 11
module.write_uploaded_data(test_df, 'upload_data.csv')
DEBUG   16824   __main__    2136897418.py   11  Writing 2 rows to /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/upload_data.csv
with open(module.get_save_path('upload_data.csv'), 'rt') as cf:
    print(cf.read())
time,temperature,humidity
2020-01-01 00:00:00,10,10
2020-01-01 01:00:00,11,11
module.infer(test_data, writeTag='uploaded_data.csv')
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    2480783271.py   10  Update Data_preparation.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part1.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part2.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part3.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO    16824   __main__    2480783271.py   33  RScriptModule flow update complete.
DEBUG   16824   __main__    2136897418.py   73  Writing 2 rows to uploaded_data.csv
DEBUG   16824   __main__    2136897418.py   11  Writing 2 rows to /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/uploaded_data.csv
{'msg': ['Startup time: 2024-09-13T08:10:16.217583+00:00',
  "init_args: (), init_kwargs: {'assets_dir': '/home/fenke/repos/corebridge/nbs/assets/rscript', 'save_dir': '/home/fenke/repos/corebridge/nbs/saves/rscript'}",
  'writeTag: uploaded_data.csv, readTag: None, camelCase: False',
  'lastSeen: False, recordformat: records, timezone: UTC'],
 'data': []}
module.get_save_path('uploaded_data.csv')
'/home/fenke/repos/corebridge/nbs/saves/rscript/workdir/uploaded_data.csv'
with open(module.get_save_path('uploaded_data.csv'), 'rt') as cf:
    print(cf.read())
time,temperature,humidity
2020-01-01 00:00:00,10,10
2020-01-01 01:00:00,11,11
#converted = rdata.read_rda(module.get_save_path(module.data_files_map.get('Predicted_sapflux')))
module.read_data('Predicted_sapflux', camel_case=True)
DEBUG   16824   __main__    2136897418.py   23  Read 168 rows from /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Predicted_sapflux.RData for Predicted_sapflux (camel_case=True)
ensemblePredEnsemble1Equ ensemblePredEnsemble1Mse ensemblePredEnsemble2Equ ensemblePredEnsemble2Mse ensemblePredEnsemble3Equ ensemblePredEnsemble3Mse ensemblePredEnsemble4Equ ensemblePredEnsemble4Mse ensemblePredEnsemble5Equ ensemblePredEnsemble5Mse ... ensemblePredError8Equ ensemblePredError8Mse ensemblePredError9Equ ensemblePredError9Mse ensemblePredError10Equ ensemblePredError10Mse ensemblePredTypicalEquPred ensemblePredTypicalEquSe ensemblePredTypicalMsePred ensemblePredTypicalMseSe
time
2023-08-01 00:00:01 0.549107 0.542947 0.666708 0.658971 0.780691 0.771425 0.660063 0.652414 0.584461 0.577826 ... NaN NaN 0.356676 0.356170 0.375093 0.375200 0.617332 0.125831 0.549231 0.126207
2023-08-01 01:00:01 0.490717 0.483454 0.561241 0.552198 0.629595 0.618828 0.557256 0.548314 0.511918 0.504120 ... NaN NaN 0.510744 0.508403 0.527951 0.526323 0.531630 0.171059 0.471002 0.170967
2023-08-01 02:00:01 0.455916 0.448803 0.499389 0.490743 0.541525 0.531393 0.496933 0.488373 0.468985 0.461412 ... NaN NaN 0.594047 0.589900 0.606447 0.602945 0.481136 0.191053 0.425821 0.190360
2023-08-01 03:00:01 0.424082 0.417034 0.451503 0.443261 0.478080 0.468680 0.449953 0.441779 0.432326 0.424918 ... NaN NaN 0.641129 0.635564 0.649752 0.644735 0.439990 0.201109 0.389024 0.199905
2023-08-01 04:00:01 0.407526 0.402109 0.425149 0.418847 0.442231 0.435070 0.424153 0.417901 0.412824 0.407141 ... NaN NaN 0.622489 0.615642 0.627102 0.620682 0.417750 0.191545 0.370637 0.189855
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
2023-08-07 19:00:01 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 ... NaN NaN 2.302566 2.259919 2.302566 2.259919 3.658327 0.690770 3.293744 0.677976
2023-08-07 20:00:01 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 ... NaN NaN 1.790125 1.785654 1.790125 1.785654 2.298355 0.537037 2.050966 0.535696
2023-08-07 21:00:01 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 ... NaN NaN 1.489199 1.497512 1.489199 1.497512 1.512578 0.446760 1.340198 0.449254
2023-08-07 22:00:01 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 ... NaN NaN 1.051568 1.060695 1.051568 1.060695 1.018082 0.315470 0.900918 0.318209
2023-08-07 23:00:01 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 ... NaN NaN 0.647845 0.656302 0.647845 0.656302 0.667827 0.194353 0.595083 0.196890

168 rows × 44 columns

result =module.infer(test_data, writeTag='uploaded_data.csv', readTag='Predicted_sapflux', camelCase=True)
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    2480783271.py   10  Update Data_preparation.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part1.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part2.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part3.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO    16824   __main__    2480783271.py   33  RScriptModule flow update complete.
DEBUG   16824   __main__    2136897418.py   73  Writing 2 rows to uploaded_data.csv
DEBUG   16824   __main__    2136897418.py   11  Writing 2 rows to /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/uploaded_data.csv
DEBUG   16824   __main__    2136897418.py   23  Read 168 rows from /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Predicted_sapflux.RData for Predicted_sapflux (camel_case=True)
DEBUG   16824   __main__    2136897418.py   82  Read 168 rows from Predicted_sapflux
result['msg']
['Startup time: 2024-09-13T08:10:16.217583+00:00',
 "init_args: (), init_kwargs: {'assets_dir': '/home/fenke/repos/corebridge/nbs/assets/rscript', 'save_dir': '/home/fenke/repos/corebridge/nbs/saves/rscript'}",
 'writeTag: uploaded_data.csv, readTag: Predicted_sapflux, camelCase: True',
 'lastSeen: False, recordformat: records, timezone: UTC']
timeseries_dataframe_from_datadict(result['data'], ['time'], 'records')
ensemblePredEnsemble1Equ ensemblePredEnsemble1Mse ensemblePredEnsemble2Equ ensemblePredEnsemble2Mse ensemblePredEnsemble3Equ ensemblePredEnsemble3Mse ensemblePredEnsemble4Equ ensemblePredEnsemble4Mse ensemblePredEnsemble5Equ ensemblePredEnsemble5Mse ... ensemblePredError7Equ ensemblePredError7Mse ensemblePredError9Equ ensemblePredError9Mse ensemblePredError10Equ ensemblePredError10Mse ensemblePredTypicalEquPred ensemblePredTypicalEquSe ensemblePredTypicalMsePred ensemblePredTypicalMseSe
time
2023-08-01 00:00:01+00:00 0.549107 0.542947 0.666708 0.658971 0.780691 0.771425 0.660063 0.652414 0.584461 0.577826 ... 0.392640 0.393263 0.356676 0.356170 0.375093 0.375200 0.617332 0.125831 0.549231 0.126207
2023-08-01 01:00:01+00:00 0.490717 0.483454 0.561241 0.552198 0.629595 0.618828 0.557256 0.548314 0.511918 0.504120 ... 0.544555 0.543532 0.510744 0.508403 0.527951 0.526323 0.531630 0.171059 0.471002 0.170967
2023-08-01 02:00:01+00:00 0.455916 0.448803 0.499389 0.490743 0.541525 0.531393 0.496933 0.488373 0.468985 0.461412 ... 0.618437 0.615487 0.594047 0.589900 0.606447 0.602945 0.481136 0.191053 0.425821 0.190360
2023-08-01 03:00:01+00:00 0.424082 0.417034 0.451503 0.443261 0.478080 0.468680 0.449953 0.441779 0.432326 0.424918 ... 0.658013 0.653465 0.641129 0.635564 0.649752 0.644735 0.439990 0.201109 0.389024 0.199905
2023-08-01 04:00:01+00:00 0.407526 0.402109 0.425149 0.418847 0.442231 0.435070 0.424153 0.417901 0.412824 0.407141 ... 0.631589 0.625537 0.622489 0.615642 0.627102 0.620682 0.417750 0.191545 0.370637 0.189855
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
2023-08-07 19:00:01+00:00 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 ... 2.302566 2.259919 2.302566 2.259919 2.302566 2.259919 3.658327 0.690770 3.293744 0.677976
2023-08-07 20:00:01+00:00 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 ... 1.790125 1.785654 1.790125 1.785654 1.790125 1.785654 2.298355 0.537037 2.050966 0.535696
2023-08-07 21:00:01+00:00 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 ... 1.489199 1.497512 1.489199 1.497512 1.489199 1.497512 1.512578 0.446760 1.340198 0.449254
2023-08-07 22:00:01+00:00 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 ... 1.051568 1.060695 1.051568 1.060695 1.051568 1.060695 1.018082 0.315470 0.900918 0.318209
2023-08-07 23:00:01+00:00 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 ... 0.647845 0.656302 0.647845 0.656302 0.647845 0.656302 0.667827 0.194353 0.595083 0.196890

168 rows × 41 columns

timeseries_dataframe_from_datadict(module.infer(test_data, camelCase=True, writeTag='uploaded_data.csv', readTag='Predicted_sapflux')['data'], ['time'], 'records')
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    2480783271.py   10  Update Data_preparation.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part1.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part2.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part3.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO    16824   __main__    2480783271.py   33  RScriptModule flow update complete.
DEBUG   16824   __main__    2136897418.py   73  Writing 2 rows to uploaded_data.csv
DEBUG   16824   __main__    2136897418.py   11  Writing 2 rows to /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/uploaded_data.csv
DEBUG   16824   __main__    2136897418.py   23  Read 168 rows from /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Predicted_sapflux.RData for Predicted_sapflux (camel_case=True)
DEBUG   16824   __main__    2136897418.py   82  Read 168 rows from Predicted_sapflux
ensemblePredEnsemble1Equ ensemblePredEnsemble1Mse ensemblePredEnsemble2Equ ensemblePredEnsemble2Mse ensemblePredEnsemble3Equ ensemblePredEnsemble3Mse ensemblePredEnsemble4Equ ensemblePredEnsemble4Mse ensemblePredEnsemble5Equ ensemblePredEnsemble5Mse ... ensemblePredError7Equ ensemblePredError7Mse ensemblePredError9Equ ensemblePredError9Mse ensemblePredError10Equ ensemblePredError10Mse ensemblePredTypicalEquPred ensemblePredTypicalEquSe ensemblePredTypicalMsePred ensemblePredTypicalMseSe
time
2023-08-01 00:00:01+00:00 0.549107 0.542947 0.666708 0.658971 0.780691 0.771425 0.660063 0.652414 0.584461 0.577826 ... 0.392640 0.393263 0.356676 0.356170 0.375093 0.375200 0.617332 0.125831 0.549231 0.126207
2023-08-01 01:00:01+00:00 0.490717 0.483454 0.561241 0.552198 0.629595 0.618828 0.557256 0.548314 0.511918 0.504120 ... 0.544555 0.543532 0.510744 0.508403 0.527951 0.526323 0.531630 0.171059 0.471002 0.170967
2023-08-01 02:00:01+00:00 0.455916 0.448803 0.499389 0.490743 0.541525 0.531393 0.496933 0.488373 0.468985 0.461412 ... 0.618437 0.615487 0.594047 0.589900 0.606447 0.602945 0.481136 0.191053 0.425821 0.190360
2023-08-01 03:00:01+00:00 0.424082 0.417034 0.451503 0.443261 0.478080 0.468680 0.449953 0.441779 0.432326 0.424918 ... 0.658013 0.653465 0.641129 0.635564 0.649752 0.644735 0.439990 0.201109 0.389024 0.199905
2023-08-01 04:00:01+00:00 0.407526 0.402109 0.425149 0.418847 0.442231 0.435070 0.424153 0.417901 0.412824 0.407141 ... 0.631589 0.625537 0.622489 0.615642 0.627102 0.620682 0.417750 0.191545 0.370637 0.189855
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
2023-08-07 19:00:01+00:00 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 3.658327 3.659715 ... 2.302566 2.259919 2.302566 2.259919 2.302566 2.259919 3.658327 0.690770 3.293744 0.677976
2023-08-07 20:00:01+00:00 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 2.298355 2.278851 ... 1.790125 1.785654 1.790125 1.785654 1.790125 1.785654 2.298355 0.537037 2.050966 0.535696
2023-08-07 21:00:01+00:00 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 1.512578 1.489109 ... 1.489199 1.497512 1.489199 1.497512 1.489199 1.497512 1.512578 0.446760 1.340198 0.449254
2023-08-07 22:00:01+00:00 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 1.018082 1.001020 ... 1.051568 1.060695 1.051568 1.060695 1.051568 1.060695 1.018082 0.315470 0.900918 0.318209
2023-08-07 23:00:01+00:00 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 0.667827 0.661204 ... 0.647845 0.656302 0.647845 0.656302 0.647845 0.656302 0.667827 0.194353 0.595083 0.196890

168 rows × 41 columns

module.update_flow()
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    2480783271.py   10  Update Data_preparation.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part1.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part2.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part3.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO    16824   __main__    2480783271.py   33  RScriptModule flow update complete.
[]
module.get_flow_status()
[]
module.infer([])
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    2480783271.py   10  Update Data_preparation.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part1.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part2.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG   16824   __main__    2480783271.py   10  Update Prediction_part3.R, output: True, inputs: True
DEBUG   16824   __main__    3613995005.py   10  Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG   16824   __main__    4142405122.py   11  Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO    16824   __main__    2480783271.py   33  RScriptModule flow update complete.
{'msg': ['Startup time: 2024-09-13T08:10:16.217583+00:00',
  "init_args: (), init_kwargs: {'assets_dir': '/home/fenke/repos/corebridge/nbs/assets/rscript', 'save_dir': '/home/fenke/repos/corebridge/nbs/saves/rscript'}",
  'writeTag: None, readTag: None, camelCase: False',
  'lastSeen: False, recordformat: records, timezone: UTC'],
 'data': []}
run_rscript_nowait.lock_objects
{}

References