from nbdev.showdoc import *
import logging
#import addroot
Some things to set up first.
Notebooks use nbdev utilities, and addroot
makes importing from the repo directory more convenient.
Exported source
import os, logging, json, hashlib
import typing,fcntl, subprocess
import traceback
import pandas as pd, numpy as np, rdata
from functools import reduce
from collections import namedtuple
from fastcore.basics import patch_to, patch
import corebridge
from corebridge.core import *
# Module-level console logger at DEBUG level; timestamps are suppressed
# because notebook output already orders the messages.
syslog = init_console_logging(__name__, logging.DEBUG, timestamp=False)
AICore uses an assets
dir from which we can read files, like scripts, and a save
dir where modules can write and read files.
# Resolve the AICore assets folder (read-only scripts) and the save folder
# (read/write working files) relative to the notebook's working directory.
assets_dir = os.path.join(os.path.abspath(os.getcwd()), '..', 'corebridge', 'assets', 'rscript')
save_dir = os.path.join(os.path.abspath(os.getcwd()), '..', 'corebridge', 'saves', 'rscript')
get_save_path
get_save_path (datafile_name:str, save_dir:str)
Exported source
def get_asset_path(script_name, assets_dir:str):
    """Return the full path of a script file inside the assets folder."""
    return os.path.join(assets_dir, script_name)

def get_rscript_libpath(save_dir:str):
    """Return the path of the R user-library folder inside the save folder."""
    return os.path.join(save_dir, 'libs')

def get_save_path(datafile_name:str, save_dir:str):
    """Return the full path of a data file inside the save folder."""
    return os.path.join(save_dir, datafile_name)
get_rscript_libpath
get_rscript_libpath (save_dir:str)
get_asset_path
get_asset_path (script_name, assets_dir:str)
Running R code
Scripts written in R can be run from a Python program using subprocess
and Rscript
.
Rscript
A script can be run from the commandline with
Rscript ascript.R
subprocess
Python’s subprocess
module has the tools to execute external programs like Rscript
'Rscript',get_asset_path('hello.R', assets_dir)], capture_output=True).stdout.decode('UTF-8') subprocess.run([
'[1] "hello world"\n'
Example: sapflow prediction scripts
Data_preparation.R
Libraries
- lubridate
- stringr
- zoo
Input
Data/Meta_data.csv
Data/Sapflux_Tilia_train.csv
Data/Weather_Tilia_train.csv
Data/Weather_Tilia_pred.csv
Output
Modelling_data.RData
Prediction_data.RData
'Data_preparation.R'] = {
data_file_flow["in": [
"Data/Meta_data.csv",
"Data/Sapflux_Tilia_train.csv",
"Data/Weather_Tilia_train.csv",
"Data/Weather_Tilia_pred.csv"
],"out": [
"Modelling_data.RData",
"Prediction_data.RData"
],'libs':['lubridate', 'stringr', 'zoo']
}
'Data_preparation.R'] = {
data_file_flow["in": {
"Meta_data": "Data/Meta_data.csv",
"Sapflux_Tilia_train": "Data/Sapflux_Tilia_train.csv",
"Weather_Tilia_train": "Data/Weather_Tilia_train.csv",
"Weather_Tilia_pred": "Data/Weather_Tilia_pred.csv"
},"out": {
"Modelling_data": "Modelling_data.RData",
"Prediction_data": "Prediction_data.RData"
},"libs": [
"lubridate",
"stringr",
"zoo"
] }
Prediction_part1.R
Libraries
- lubridate
- stringr
- mgcv
Input
Modelling_data.RData
Output
Fitted_models.RData
Weights.RData
'Prediction_part1.R'] = {
data_file_flow["in": [
"Modelling_data.RData"
],"out": [
"Fitted_models.RData",
"Weights.RData"
],'libs':['lubridate', 'stringr', 'mgcv']
}
'Prediction_part1.R'] = {
data_file_flow["in": {
"Modelling_data": "Modelling_data.RData"
},"out": {
"Fitted_models": "Fitted_models.RData",
"Weights": "Weights.RData"
},"libs": [
"lubridate",
"stringr",
"mgcv"
],
}
Prediction_part2.R
Libraries
- lubridate
- stringr
- mgcv
Input
Fitted_models.RData
Weights.RData
Modelling_data.RData
Prediction_data.RData
Output
Predicted_sapflux.RData
'Prediction_part2.R'] = {
data_file_flow["in":[
'Fitted_models.RData',
'Weights.RData',
'Modelling_data.RData',
'Prediction_data.RData'
],"out":[
'Predicted_sapflux.RData'
],'libs':['lubridate', 'stringr', 'mgcv']
}
'Prediction_part2.R'] = {
data_file_flow["in": {
"Fitted_models": "Fitted_models.RData",
"Weights": "Weights.RData",
"Modelling_data": "Modelling_data.RData",
"Prediction_data": "Prediction_data.RData"
},"out": {
"Predicted_sapflux": "Predicted_sapflux.RData"
},"libs": [
"lubridate",
"stringr",
"mgcv"
], }
Prediction_part3.R
Libraries
- lubridate
- stringr
Input
Predicted_sapflux.RData
Output
Predicted_water_usage.RData
'Prediction_part3.R'] = {
data_file_flow['in':['Predicted_sapflux.RData'],
'out':['Predicted_water_usage.RData'],
'libs':['lubridate', 'stringr']
}
'Prediction_part3.R'] = {
data_file_flow["in": {
"Predicted_sapflux": "Predicted_sapflux.RData"
},"out": {
"Predicted_water_usage": "Predicted_water_usage.RData"
},"libs": [
"lubridate",
"stringr"
], }
{
D:Ffor P in data_file_flow.values()
for D,F in P['in'].items()
}
{'Meta_data': 'Data/Meta_data.csv',
'Sapflux_Tilia_train': 'Data/Sapflux_Tilia_train.csv',
'Weather_Tilia_train': 'Data/Weather_Tilia_train.csv',
'Weather_Tilia_pred': 'Data/Weather_Tilia_pred.csv',
'Modelling_data': 'Modelling_data.RData',
'Fitted_models': 'Fitted_models.RData',
'Weights': 'Weights.RData',
'Prediction_data': 'Prediction_data.RData',
'Predicted_sapflux': 'Predicted_sapflux.RData'}
= list(set([
data_files
ffor P in data_file_flow.values()
for D,F in P.items()
if D in ['in','out']
for f in list(F.values())
]))
display(Markdown(tabulate(
['/')[-1]]+[
[F.split('in' if F in P['in'].values() else 'out' if F in P['out'].values() else '--'
for S,P in data_file_flow.items()
] for F in data_files
],=['data-file / script'] + [I['name'] for I in data_file_flow.values()],
headers='github'
tablefmt )))
data-file / script | Data_preparation.R | Prediction_part1.R | Prediction_part2.R | Prediction_part3.R |
---|---|---|---|---|
Weather_Tilia_pred.csv | in | – | – | – |
Meta_data.csv | in | – | – | – |
Weights.RData | – | out | in | – |
Predicted_water_usage.RData | – | – | – | out |
Prediction_data.RData | out | – | in | – |
Weather_Tilia_train.csv | in | – | – | – |
Modelling_data.RData | out | in | in | – |
Predicted_sapflux.RData | – | – | out | in |
Fitted_models.RData | – | out | in | – |
Sapflux_Tilia_train.csv | in | – | – | – |
data_files
['Data/Weather_Tilia_pred.csv',
'Data/Meta_data.csv',
'Weights.RData',
'Predicted_water_usage.RData',
'Prediction_data.RData',
'Data/Weather_Tilia_train.csv',
'Modelling_data.RData',
'Predicted_sapflux.RData',
'Fitted_models.RData',
'Data/Sapflux_Tilia_train.csv']
for k, i in data_file_flow.items():
print(f"data_file_flow['{i['name']}'] = ", json.dumps(i, indent=2), '\n')
data_file_flow['Data_preparation.R'] = {
"in": {
"Meta_data": "Data/Meta_data.csv",
"Sapflux_Tilia_train": "Data/Sapflux_Tilia_train.csv",
"Weather_Tilia_train": "Data/Weather_Tilia_train.csv",
"Weather_Tilia_pred": "Data/Weather_Tilia_pred.csv"
},
"out": {
"Modelling_data": "Modelling_data.RData",
"Prediction_data": "Prediction_data.RData"
},
"libs": [
"lubridate",
"stringr",
"zoo"
],
"name": "Data_preparation.R"
}
data_file_flow['Prediction_part1.R'] = {
"in": {
"Modelling_data": "Modelling_data.RData"
},
"out": {
"Fitted_models": "Fitted_models.RData",
"Weights": "Weights.RData"
},
"libs": [
"lubridate",
"stringr",
"mgcv"
],
"name": "Prediction_part1.R"
}
data_file_flow['Prediction_part2.R'] = {
"in": {
"Fitted_models": "Fitted_models.RData",
"Weights": "Weights.RData",
"Modelling_data": "Modelling_data.RData",
"Prediction_data": "Prediction_data.RData"
},
"out": {
"Predicted_sapflux": "Predicted_sapflux.RData"
},
"libs": [
"lubridate",
"stringr",
"mgcv"
],
"name": "Prediction_part2.R"
}
data_file_flow['Prediction_part3.R'] = {
"in": {
"Predicted_sapflux": "Predicted_sapflux.RData"
},
"out": {
"Predicted_water_usage": "Predicted_water_usage.RData"
},
"libs": [
"lubridate",
"stringr"
],
"name": "Prediction_part3.R"
}
Import R libraries
Importing libraries can be done with
Rscript -e 'install.packages("drat", repos="https://cloud.r-project.org")'
print(subprocess.run(['Rscript','--version', ], capture_output=True).stdout.decode('UTF-8'))
Rscript (R) version 4.2.2 (2022-10-31)
= subprocess.run(['Rscript','--version', ], capture_output=True)
rversion print(rversion.stdout.decode('UTF-8'))
Rscript (R) version 4.2.2 (2022-10-31)
User library folder
get_rscript_env
get_rscript_env (libfolder:str)
Exported source
def get_rscript_env(libfolder:str):
    """Return a copy of the process environment for running Rscript.

    When R_LIBS_USER is already present in the environment it is left
    untouched; otherwise it is set to libfolder so R loads and installs
    packages there.
    """
    env = dict(**os.environ)
    if not os.environ.get('R_LIBS_USER'):
        env['R_LIBS_USER'] = str(libfolder)
    return env
assert get_rscript_env(get_rscript_libpath(save_dir)).get('R_LIBS_USER') == get_rscript_libpath(save_dir), 'rscript environment not set as expected'
Used libraries
list(set([L for V in data_file_flow.values() for L in V['libs']]))
['lubridate', 'mgcv', 'zoo', 'stringr']
= subprocess.run(['Rscript','-e', "library(lubridate)"], capture_output=True)
run_script_result print(run_script_result.stderr.decode('UTF-8'), run_script_result.returncode)
Error in library(lubridate) : there is no package called ‘lubridate’
Execution halted
1
for L in list(set([L for V in data_file_flow.values() for L in V['libs']]))] [os.path.exists(os.path.join(get_rscript_libpath(save_dir), L))
[True, False, True, True]
for L in list(set([L for V in data_file_flow.values() for L in V['libs']]))] [os.path.join(get_rscript_libpath(save_dir), L)
['/home/fenke/repos/corebridge/nbs/saves/rscript/libs/lubridate',
'/home/fenke/repos/corebridge/nbs/saves/rscript/libs/mgcv',
'/home/fenke/repos/corebridge/nbs/saves/rscript/libs/zoo',
'/home/fenke/repos/corebridge/nbs/saves/rscript/libs/stringr']
check_rscript_lib
check_rscript_lib (lib:str, libfolder:str)
Checks if a R package is installed in libfolder
Type | Details | |
---|---|---|
lib | str | name of the package |
libfolder | str | path to the library folder |
Returns | bool | True if the package is installed, False otherwise |
Exported source
def check_rscript_libs(libs:list, libfolder:str):
    """Quick check if for all the R packages in libs a folder exists in libfolder"""
    return all([os.path.exists(os.path.join(libfolder, L)) for L in libs])

def check_rscript_lib(lib:str, libfolder:str) -> bool:
    """Checks if a R package is installed in libfolder

    Parameters
    ----------
    lib : str
        name of the package
    libfolder : str
        path to the library folder

    Returns
    -------
    bool
        True if the package is installed, False otherwise
    """
    # Try to attach the package with Rscript; a non-zero exit code means
    # library() failed, i.e. the package is not usable from libfolder.
    run_script_result = subprocess.run(
        ['Rscript', '-e', f"library({lib})"],
        env=get_rscript_env(libfolder),
        capture_output=True)
    if run_script_result.returncode != 0:
        print('STDERR\n', run_script_result.stderr.decode('UTF-8'))
        print('STDOUT\n', run_script_result.stdout.decode('UTF-8'))
    return run_script_result.returncode == 0
check_rscript_libs
check_rscript_libs (libs:list, libfolder:str)
Quick check if for all the R packages in libs a folder exists in libfolder
list(set([L for V in data_file_flow.values() for L in V['libs']])), get_rscript_libpath(save_dir)) check_rscript_libs(
False
'mgcv', get_rscript_libpath(save_dir)) check_rscript_lib(
True
'zoo', get_rscript_libpath(save_dir)) check_rscript_lib(
True
Installing libraries
install_R_package_wait
install_R_package_wait (pkg:str|list, workdir:str, repo='https://cloud.r-project.org')
Checks and if necessary installs an R package
Type | Default | Details | |
---|---|---|---|
pkg | str | list | name(s) of the package(s) | |
workdir | str | ||
repo | str | https://cloud.r-project.org |
'generics', 'timechange', 'rlang'], save_dir) install_R_package_wait([
DEBUG 1975 __main__ 1929856308.py 18 Using libfolder /home/fenke/repos/corebridge/nbs/saves/rscript/libs for packages
DEBUG 1975 __main__ 1929856308.py 22 Using libfolder /home/fenke/repos/corebridge/nbs/saves/rscript/libs for R_LIBS_USER
Installing package generics, testing attach ...
Attach successful. Library generics appears to have been installed
Installing package timechange, testing attach ...
Attach successful. Library timechange appears to have been installed
Installing package rlang, testing attach ...
Attach successful. Library rlang appears to have been installed
# install_R_package_wait(
# ['zoo'],
# libfolder=get_rscript_libpath(save_dir))
# install_R_package_wait(
# sorted(list(set([L for V in data_file_flow.values() for L in V['libs']]))),
# libfolder=get_rscript_libpath(save_dir))
Running the scripts
Installing scripts
unpack_assets
unpack_assets (assets_dir:str, save_dir:str)
Unpack the assets folder to the save_dir
Exported source
def unpack_assets(assets_dir:str, save_dir:str):
    """
    Unpack the assets folder to the save_dir

    Starts `unzip` asynchronously on every *.zip archive in assets_dir
    (-u update existing files, -n never overwrite newer ones; unzip expands
    the wildcard itself). Returns the Popen object so the caller can poll()
    it and read its stdout/stderr pipes.
    """
    unpack_result = subprocess.Popen(
        ['unzip', '-un', '-d', save_dir, os.path.join(assets_dir, '*.zip')],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    return unpack_result
= unpack_assets(assets_dir, save_dir) unpack_result
unpack_result.args
['unzip',
'-un',
'-d',
'/home/fenke/repos/corebridge/nbs/saves/rscript',
'/home/fenke/repos/corebridge/nbs/assets/rscript/*.zip']
unpack_result.poll()
0
print(unpack_result.stdout.read().decode('UTF-8'))
Archive: /home/fenke/repos/corebridge/nbs/assets/rscript/SapflowPredictionScripts.zip
Archive: /home/fenke/repos/corebridge/nbs/assets/rscript/SapflowPredictionData.zip
Checksum calculation
Each script has its own set of input files and should be run to update its output when either its inputs have changed or its expected output does not exist.
We can check for filechanges using a hashing algorithm, for instance MD5 or SHA-256. These are available either in Python or from the commandline.
Lets look at the commandline version of MD5, on linux this is md5sum
, with the input file for the preparation stage:
print(json.dumps(data_file_flow[list(data_file_flow.keys())[0]]['in'], indent=3))
{
"Meta_data": "Data/Meta_data.csv",
"Sapflux_Tilia_train": "Data/Sapflux_Tilia_train.csv",
"Weather_Tilia_train": "Data/Weather_Tilia_train.csv",
"Weather_Tilia_pred": "Data/Weather_Tilia_pred.csv"
}
md5sum will output hashes to stdout, which subprocess.run
captures for us
= 0
flow_object_index = list(data_file_flow[flow_object_index]['in'].values())
input_files
print(json.dumps(input_files, indent=3))
[
"Data/Meta_data.csv",
"Data/Sapflux_Tilia_train.csv",
"Data/Weather_Tilia_train.csv",
"Data/Weather_Tilia_pred.csv"
]
= subprocess.run(
md5_encode_result 'md5sum','-b']+
[
input_files, =save_dir,
cwd=True)
capture_outputprint(md5_encode_result.stdout.decode('UTF-8'))
4bed61a77505bfd52032591d5c3a6050 *Data/Meta_data.csv
6d705d98caa6618a4a990c3742c16564 *Data/Sapflux_Tilia_train.csv
1232592f9488ce4fbb4ae11ba5be0349 *Data/Weather_Tilia_train.csv
366dac1bf64003d1d08fca6121c036bd *Data/Weather_Tilia_pred.csv
If we want to check the files we run it with the -c
option and a file with the previously calculated checksums
= data_file_flow[flow_object_index]['name']
script_name
= get_save_path(f"input-checksum-{script_name.split('.')[0]}", save_dir)
checksum_file with open(checksum_file, 'wt') as cf:
'UTF-8')) cf.write(md5_encode_result.stdout.decode(
= subprocess.run(
md5_check_result 'md5sum', '-c', checksum_file],
[=save_dir,
cwd=True)
capture_outputprint(md5_check_result.stdout.decode('UTF-8'))
print(f"Run returned code {md5_check_result.returncode}")
if md5_check_result.returncode:
print(md5_check_result.stderr.decode('UTF-8'))
Data/Meta_data.csv: OK
Data/Sapflux_Tilia_train.csv: OK
Data/Weather_Tilia_train.csv: OK
Data/Weather_Tilia_pred.csv: OK
Run returned code 0
Had there been a change to a file it would have looked like
= subprocess.run(
md5_check_result 'md5sum', '-c', checksum_file+'-modified'],
[=save_dir,
cwd=True)
capture_outputprint(md5_check_result.stdout.decode('UTF-8'))
print(f"Run returned code {md5_check_result.returncode}")
Run returned code 1
We don’t really need specifics, only the return code will do for our purpose.
Checking files
Generating names
calc_hash_from_data_files
calc_hash_from_data_files (flow_object:dict, save_dir:str)
Calculate hash from the contents of the input files for a given flow object
Exported source
# Number of bytes hashed per read; bounds memory use for large files.
read_chunk_size = 1024 * 32

def calc_hash_from_flowobject(flow_object:dict)->str:
    '''Calculate a unique hash for a given flow object'''
    return hashlib.md5(repr(flow_object).encode('UTF-8')).hexdigest()

def calc_hash_from_files(files:list, save_dir:str)->str:
    '''Calculate hash from the contents of the given files.

    Files are read relative to save_dir; files that do not exist are
    skipped, so the digest only reflects the files that are present.
    '''
    hashobj = hashlib.md5()

    # iterate over files
    for data_file in files:
        full_name = os.path.join(save_dir, data_file)
        if not os.path.isfile(full_name):
            continue
        with open(full_name, 'rb') as f:
            # loop till the end of the file
            while True:
                # read only read_chunk_size bytes at a time
                chunk = f.read(read_chunk_size)
                if not chunk:
                    break
                hashobj.update(chunk)

    return hashobj.hexdigest()

def calc_hash_from_input_files(flow_object:dict, save_dir:str)->str:
    '''Calculate hash from the contents of the input files for a given flow object'''
    return calc_hash_from_files(list(flow_object['in'].values()), save_dir)

def calc_hash_from_data_files(flow_object:dict, save_dir:str)->str:
    '''Calculate hash from the contents of the input and output files for a given flow object'''
    return calc_hash_from_files(list(flow_object['in'].values()) + list(flow_object['out'].values()), save_dir)
calc_hash_from_input_files
calc_hash_from_input_files (flow_object:dict, save_dir:str)
Calculate hash from the contents of the input files for a given flow object
calc_hash_from_files
calc_hash_from_files (files:list, save_dir:str)
Calculate hash from the contents of the input files
calc_hash_from_flowobject
calc_hash_from_flowobject (flow_object:dict)
Calculate a unique hash for a given flow object
calc_hash_from_flowobject(data_file_flow[flow_object_index])
'5d532037f7dda7e7fad290ddef53f69d'
calc_hash_from_input_files(data_file_flow[flow_object_index], save_dir)
'32095cd16a83a2c63f1ab51a58ed96c9'
calc_hash_from_data_files(data_file_flow[flow_object_index], save_dir)
'65e6d36ac0c8aadb1fb4ee48d4ff88f3'
Inputs
check_script_inputs
check_script_inputs (flow_object:dict, workdir:str)
Check if the input files for a script are up-to-date, returns True if up-to-date.
Exported source
def check_script_inputs(flow_object:dict, workdir:str)->bool:
    """
    Check if the input files for a script are up-to-date, returns True if up-to-date.

    Runs `md5sum -c` in workdir against the checksum file previously written
    by generate_checksum_file; exit code 0 means no input file has changed.
    """
    checksum_filename = f"input-checksum-{calc_hash_from_flowobject(flow_object)}"
    md5_check_result = subprocess.run(
        ['md5sum', '-c', checksum_filename],
        cwd=workdir,
        capture_output=True)
    syslog.debug(f"Checksum check result for Flow object: {flow_object['name']}: {md5_check_result.returncode}, checksum file: {checksum_filename}")

    return int(md5_check_result.returncode) == 0
1], save_dir) check_script_inputs(data_file_flow[
DEBUG 1975 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
True
Outputs
The output is easily checked for existence with isfile
.
check_script_output
check_script_output (flow_object:dict, workdir:str)
Check if the output files for a script exist, returns True if they all exist.
Exported source
def check_script_output(flow_object:dict, workdir:str)->bool:
    """
    Check if the output files for a script exist, returns True if they all exist.
    """
    # flow_object['out'] maps logical names to file names; check each file.
    files_exist = [
        os.path.isfile(get_save_path(F, workdir))
        for F in flow_object['out'].values()
    ]
    syslog.debug(f"Output files for Flow object: {flow_object['name']}: {list(zip(flow_object['out'], files_exist))}")
    return all(files_exist)
0], save_dir) check_script_output(data_file_flow[
DEBUG 1975 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
True
Generating the checksum file
generate_checksum_file
generate_checksum_file (flow_object:dict, workdir:str)
Generates the checksum file for a given flow object
Exported source
def generate_checksum_file(flow_object:dict, workdir:str)->bool:
    """Generates the checksum file for a given flow object

    Writes md5 checksums of the flow object's input files to
    input-checksum-<flowobject-hash> in workdir, then verifies them.
    Returns True when md5sum succeeded and the inputs check out.
    """
    input_files = list(flow_object['in'].values())
    md5_encode_result = subprocess.run(
        ['md5sum', '-b'] + input_files,
        cwd=workdir,
        capture_output=True)

    checksum_filename = f"input-checksum-{calc_hash_from_flowobject(flow_object)}"
    syslog.debug(f"Checksum file for Flow object: {flow_object['name']} created return {md5_encode_result.returncode}, checksum file: {checksum_filename}")
    with open(os.path.join(workdir, checksum_filename), 'wt') as cf:
        cf.write(md5_encode_result.stdout.decode('UTF-8'))

    return md5_encode_result.returncode == 0 and check_script_inputs(flow_object, workdir)
0], save_dir) generate_checksum_file(data_file_flow[
DEBUG 1975 __main__ 3722079908.py 14 Checksum file for Flow object: Data_preparation.R created return 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 1975 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
True
Running once
We don’t need and don’t want to run the script more than once. This is not a problem when a script has finished and updated the checksum file, but we also want to prevent near-simultaneous runs in a multiprocessing environment.
We’ll use file locking from fcntl directly
# Use fcntl file locking to prevent multiple processes from running the same code at the same time.
# see https://docs.python.org/3/library/fcntl.html#fcntl.flock
# Create a filename based on input-file contents
= get_save_path(f"lock-{calc_hash_from_input_files(data_file_flow[0], save_dir)}", save_dir) lock_file
Now we use fcntl.flock
with flags fcntl.LOCK_EX | fcntl.LOCK_NB
to lock the file for exclusive access, while an exception is thrown if it’s already locked.
with open(lock_file, 'wt') as cf:
| fcntl.LOCK_NB)
fcntl.flock(cf, fcntl.LOCK_EX with open(lock_file, 'wt') as cf2:
try:
| fcntl.LOCK_NB)
fcntl.flock(cf2, fcntl.LOCK_EX except BlockingIOError as locked_error:
print(locked_error)
[Errno 11] Resource temporarily unavailable
The locks are removed when the file is closed, how convenient
with open(lock_file, 'wt') as cf:
| fcntl.LOCK_NB)
fcntl.flock(cf, fcntl.LOCK_EX with open(lock_file, 'wt') as cf:
| fcntl.LOCK_NB) fcntl.flock(cf, fcntl.LOCK_EX
Putting it together
Synchronous
We need to run a script when either any of it’s inputs have changed or any of it’s outputs do not exist. Return True if a follow-up script should be executed, False if nothing changed or executing the script failed.
run_rscript_wait
run_rscript_wait (flow_object, assets_dir:str, save_dir:str)
Run a script in R args: flow_object: dict of flow object returns: bool: True if a follow-up script might need to be run, False if not
Exported source
def run_rscript_wait(flow_object, assets_dir:str, save_dir:str):
    """ Run a script in R
    args:
        flow_object: dict of flow object
        assets_dir: folder containing the R scripts
        save_dir: working folder for data files and lock files
    returns:
        bool: True if a follow-up script might need to be run, False if not
    """
    syslog.debug(f"Running script {flow_object['name']}")

    # If the outputs exist and the inputs are unchanged there is nothing to
    # do for this script, but a follow-up script may still have work.
    if check_script_output(flow_object, save_dir) and check_script_inputs(flow_object, save_dir):
        return True

    # Create the lock file
    lock_file = get_save_path(f"lock-{calc_hash_from_flowobject(flow_object)}", save_dir)
    with open(lock_file, 'wt') as cf:
        try:
            syslog.debug(f"Locking {lock_file}")
            # Get exclusive lock on the file, is released on file close
            fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)

            # run the script
            run_script_result = subprocess.run(
                ['Rscript', '--vanilla', get_asset_path(flow_object['name'], assets_dir)],
                cwd=save_dir,
                capture_output=True
            )

            # check the return code; on failure keep stdout/stderr in the lock file
            if run_script_result.returncode:
                cf.write(f"Run returned code {run_script_result.returncode}\n")
                cf.write(f"STDOUT------------\n{run_script_result.stdout.decode('UTF-8')}\n")
                cf.write(f"STDERR------------\n{run_script_result.stderr.decode('UTF-8')}\n")
                return False

        except BlockingIOError as locked_error:
            # Another process holds the lock and is presumably running the script
            syslog.debug(locked_error)
            return False

    # check the output and generate the checksum file
    return check_script_output(flow_object, save_dir) and generate_checksum_file(flow_object, save_dir)
#run_rscript_wait(data_file_flow[0], assets_dir, save_dir)
#run_rscript_wait(data_file_flow[1], assets_dir, save_dir)
#run_rscript_wait(data_file_flow[2], assets_dir, save_dir)
def clear_results(flow_object:dict, save_dir:str):
    """Clear the results of a given flow object"""
    # flow_object['out'] maps logical names to file names; iterate the
    # file names (.values()) — iterating the dict itself yields the logical
    # keys, which never exist on disk, so nothing would ever be removed.
    for fname in flow_object['out'].values():
        try:
            os.remove(get_save_path(fname, save_dir))
        except FileNotFoundError:
            # Missing output files are fine; there is nothing to clear.
            pass
# [clear_results(O, save_dir) for O in data_file_flow.values()]
Asynchronous
In the API we can not wait for a script to finish and we’ll use Popen instead. This means we’ll have to keep track of the process.
run_rscript_nowait
run_rscript_nowait (flow_object, workdir:str, libfolder:str=None, pkg_repo:str='https://cloud.r-project.org')
Run a script in R args: flow_object: dict of flow object workdir: working directory pkg_repo: CRAN package repository returns: RScriptProcess: Popen container object for the script
Exported source
# Container for one asynchronous script run: the flow object, the open lock
# file (holds the fcntl lock while open), the stdout/stderr file objects,
# the Popen argument list and the Popen handle itself.
RScriptProcess = namedtuple('RScriptProcess', ['flow_object', 'lock_file', 'stdout', 'stderr', 'popen_args', 'popen'])

#### Asynchronous RScript processing ------------------------------------------------

def run_rscript_nowait(
        flow_object,
        workdir:str,
        libfolder:str=None,
        pkg_repo:str='https://cloud.r-project.org') -> RScriptProcess:
    """ Run a script in R
    args:
        flow_object: dict of flow object
        workdir: working directory
        libfolder: R user-library folder; defaults to <workdir>/libs
        pkg_repo: CRAN package repository
    returns:
        RScriptProcess: Popen container object for the script
    """

    syslog.debug(f"Starting rscript for {flow_object['name']}")

    # lockfile -------------------------------------------------------------------
    os.makedirs(os.path.abspath(os.path.join(workdir, 'temp')), exist_ok=True)

    def get_temp_path(lname):
        # lock/stdout/stderr files live in <workdir>/temp
        return os.path.abspath(os.path.join(workdir, 'temp', lname))

    lock_name = 'run_flow_object-' + calc_hash_from_flowobject(flow_object)

    # lock maintenance: inspect a previous run of this same flow object
    if run_rscript_nowait.lock_objects.get(lock_name):
        lock_object = run_rscript_nowait.lock_objects[lock_name]
        if not lock_object.lock_file.closed:
            syslog.debug(f"Lockfile is open for {flow_object['name']} ({lock_name})")
            # If the lockfile is open, check if the process is still running
            if lock_object.popen is None:
                syslog.debug(f"No process running for {flow_object['name']} ({lock_name})")
            elif lock_object.popen.poll() is None:
                syslog.debug(f"Script is still running for {flow_object['name']} ({lock_name})")
                return lock_object
            else:
                syslog.debug(f"Script has finished for {flow_object['name']} ({lock_name}), returned {lock_object.popen.returncode}")
                # since poll returned not-None the script has finished so close the lockfile
                lock_object.lock_file.close()
                lock_object.stdout.close()
                lock_object.stderr.close()
                if lock_object.popen.returncode != 0:
                    syslog.error(f"Script failed for {flow_object['name']} ({lock_name}), returned {lock_object.popen.returncode}")
                    syslog.error(f"Args were: {lock_object.popen_args}")
                    with open(lock_object.stdout.name, 'rb') as so:
                        syslog.error(f"STDOUT\n{so.read().decode('UTF-8')}")
                    with open(lock_object.stderr.name, 'rb') as se:
                        syslog.error(f"STDERR\n{se.read().decode('UTF-8')}")
                else:
                    syslog.debug(f"Script was successful for {flow_object['name']} ({lock_name})")
                    generate_checksum_file(flow_object, os.path.abspath(workdir))
                #os.remove(lock_object.stdout.name)
                #os.remove(lock_object.stderr.name)

    # Check if output exists and inputs have not changed; nothing to start then
    if check_script_output(flow_object, os.path.abspath(workdir)) and check_script_inputs(flow_object, os.path.abspath(workdir)):
        syslog.debug(f"Output and inputs are up-to-date for {flow_object['name']}")
        return run_rscript_nowait.lock_objects.get(lock_name)

    # Without all input files present the script cannot run yet
    if not all([os.path.exists(get_save_path(fname, os.path.abspath(workdir))) for fname in flow_object['in'].values()]):
        syslog.debug(f"Inputs missing for {flow_object['name']}")
        return run_rscript_nowait.lock_objects.get(lock_name)

    # Create the lock file -----------------------------------------------------------
    syslog.debug(f"Preparing to run scripts for {flow_object['name']}, creating lockfile ({lock_name})")
    cf = open(get_temp_path(f"lock-{lock_name}"), 'wt')
    try:
        # Set lock on lockfile; raises BlockingIOError when another
        # process already holds it
        fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)

        so = open(get_temp_path(f"stdout-{lock_name}"), 'wt')
        se = open(get_temp_path(f"stderr-{lock_name}"), 'wt')

        # check libs
        if not libfolder:
            libfolder = os.path.abspath(os.path.join(workdir, 'libs'))
        os.makedirs(libfolder, exist_ok=True)
        syslog.debug(f"Using libfolder {libfolder} for packages")

        env = dict(os.environ)
        env['R_LIBS_USER'] = libfolder
        syslog.debug(F"Using libfolder {env['R_LIBS_USER']} for R_LIBS_USER")

        if not check_rscript_libs(flow_object['libs'], libfolder):
            for pkg_i in flow_object['libs']:
                syslog.debug(f"Checking lib {pkg_i} for {flow_object['name']} ({lock_name})")
                if not check_rscript_lib(pkg_i, libfolder):
                    # Install the first missing package asynchronously and
                    # return; a later call picks up where we left off.
                    syslog.debug(f"Starting installation of {pkg_i} for {flow_object['name']} ({lock_name})")
                    popen_args = [
                        'Rscript', '-e',
                        f"install.packages('{pkg_i}', repos='{pkg_repo}', lib='{libfolder}', dependencies=TRUE)",
                    ]
                    run_script_install = subprocess.Popen(
                        popen_args,
                        cwd=os.path.abspath(workdir),
                        stdout=so,
                        stderr=se,
                        encoding='UTF-8',
                        env=env,
                    )
                    run_rscript_nowait.lock_objects[lock_name] = RScriptProcess(flow_object, cf, so, se, popen_args, run_script_install)
                    return run_rscript_nowait.lock_objects.get(lock_name)

        syslog.debug(f"Libs are up-to-date, starting script for {flow_object['name']} ({lock_name})")
        # run the script
        popen_args = ['Rscript', flow_object['name']]
        popen_run = subprocess.Popen(
            popen_args,
            cwd=os.path.abspath(workdir),
            stdout=so,
            stderr=se,
            encoding='UTF-8',
            env=env,
        )

        run_rscript_nowait.lock_objects[lock_name] = RScriptProcess(flow_object, cf, so, se, popen_args, popen_run)

    except BlockingIOError as locked_error:
        # Another process holds the lock; leave any registered lock object alone
        cf.close()
        #syslog.error(f"{flow_object['name']} is locked, cannot run", exc_info=locked_error)

    syslog.debug(f"Done with {flow_object['name']}.")

    return run_rscript_nowait.lock_objects.get(lock_name)

# Registry of RScriptProcess objects, keyed by 'run_flow_object-<hash>'.
run_rscript_nowait.lock_objects = {}
release_script_lock
release_script_lock (flow_object, save_dir)
Exported source
def release_script_lock(flow_object, save_dir):
    """Close (and thereby release) the lock file of a finished flow object.

    run_rscript_nowait keys its registry by
    'run_flow_object-' + calc_hash_from_flowobject(...), so look the
    process up with that same key — looking up by flow_object['name']
    would always miss and then crash on the None result.
    """
    lock_name = 'run_flow_object-' + calc_hash_from_flowobject(flow_object)
    process = run_rscript_nowait.lock_objects.get(lock_name)
    # Only close when a process was started and has finished (poll() not None);
    # closing the file drops the fcntl lock.
    if process is not None and process.popen and process.popen.poll() is not None:
        syslog.debug(f"Closing lockfile {process.lock_file.name}")
        process.lock_file.close()
# Demo cell: kick off every flow script without waiting for completion.
# run_rscript_nowait decides per flow object whether a run is needed.
for flow_object in data_file_flow.values():
    syslog.info(f"{flow_object['name']} --------------------")
    startresult = run_rscript_nowait(flow_object, workdir=save_dir)
    #print(f"Args: {startresult.popen_args if startresult else None}")
INFO 1975 __main__ 2844065539.py 2 Data_preparation.R --------------------
DEBUG 1975 __main__ 319997563.py 22 Starting rscript for Data_preparation.R
DEBUG 1975 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 1975 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 1975 __main__ 319997563.py 67 Output and inputs are up-to-date for Data_preparation.R
INFO 1975 __main__ 2844065539.py 2 Prediction_part1.R --------------------
DEBUG 1975 __main__ 319997563.py 22 Starting rscript for Prediction_part1.R
DEBUG 1975 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 1975 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 1975 __main__ 319997563.py 67 Output and inputs are up-to-date for Prediction_part1.R
INFO 1975 __main__ 2844065539.py 2 Prediction_part2.R --------------------
DEBUG 1975 __main__ 319997563.py 22 Starting rscript for Prediction_part2.R
DEBUG 1975 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 1975 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 1975 __main__ 319997563.py 67 Output and inputs are up-to-date for Prediction_part2.R
INFO 1975 __main__ 2844065539.py 2 Prediction_part3.R --------------------
DEBUG 1975 __main__ 319997563.py 22 Starting rscript for Prediction_part3.R
DEBUG 1975 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 1975 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG 1975 __main__ 319997563.py 67 Output and inputs are up-to-date for Prediction_part3.R
# Demo cell: report poll status for every tracked Rscript subprocess.
# poll() is None while running, otherwise the process return code.
for name, process in run_rscript_nowait.lock_objects.items():
    if process.popen:
        print(f"process {name} for {process.flow_object['name']} is done? {process.popen.poll()}")
        print(f"args: {process.popen_args}")
# Demo cell: verify outputs and input checksums per flow object; on a
# mismatch, show which output files exist and re-run md5sum for diagnosis.
for flow_object in data_file_flow.values():
    syslog.info(f"Checking {flow_object['name']}")
    if check_script_output(flow_object, save_dir) and check_script_inputs(flow_object, save_dir):
        syslog.info(f"Output and inputs are up-to-date for {flow_object['name']}")
    else:
        # Which declared output files are actually present on disk?
        print([
            (F, os.path.isfile(get_save_path(F, save_dir))) for F in flow_object['out']
        ])
        # Checksum file name is derived from a hash of the flow object itself.
        checksum_file = get_save_path(f"input-checksum-{calc_hash_from_flowobject(flow_object)}", save_dir)
        md5_check_result = subprocess.run(
            ['md5sum', '-c', checksum_file],
            cwd=save_dir,
            capture_output=True)
        print(md5_check_result.returncode, md5_check_result.stderr.decode('UTF-8'))
INFO 1975 __main__ 1138821752.py 2 Checking Data_preparation.R
DEBUG 1975 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 1975 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
INFO 1975 __main__ 1138821752.py 4 Output and inputs are up-to-date for Data_preparation.R
INFO 1975 __main__ 1138821752.py 2 Checking Prediction_part1.R
DEBUG 1975 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 1975 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
INFO 1975 __main__ 1138821752.py 4 Output and inputs are up-to-date for Prediction_part1.R
INFO 1975 __main__ 1138821752.py 2 Checking Prediction_part2.R
DEBUG 1975 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 1975 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
INFO 1975 __main__ 1138821752.py 4 Output and inputs are up-to-date for Prediction_part2.R
INFO 1975 __main__ 1138821752.py 2 Checking Prediction_part3.R
DEBUG 1975 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 1975 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO 1975 __main__ 1138821752.py 4 Output and inputs are up-to-date for Prediction_part3.R
# Demo cell: close the lock files of all subprocesses that have terminated
# (poll() not None) so their locks are released.
for process in run_rscript_nowait.lock_objects.values():
    if process.popen and process.popen.poll() is not None:
        print(f"Closing lockfile {process.lock_file.name}")
        process.lock_file.close()
AICore module class
Init
AICoreRScriptModule
AICoreRScriptModule (flow_mapping:dict, save_dir:str, assets_dir:str, cran_repo:str='https://cloud.r-project.org', *args, **kwargs)
Initialize self. See help(type(self)) for accurate signature.
Type | Default | Details | |
---|---|---|---|
flow_mapping | dict | scripts flow map | |
save_dir | str | path where the module can keep files | |
assets_dir | str | path to support files (scripts, metadata, etc) | |
cran_repo | str | https://cloud.r-project.org | CRAN repo |
args | VAR_POSITIONAL | ||
kwargs | VAR_KEYWORD |
Exported source
class AICoreRScriptModule(AICoreModuleBase):
    """AICore module that drives a set of R scripts as a data flow.

    On construction it unpacks script assets into a working directory,
    starts any out-of-date flow scripts via ``run_rscript_nowait`` and
    tracks their process records in ``self.flow_results``.
    """

    def __init__(self,
            flow_mapping:dict, # scripts flow map
            save_dir:str, # path where the module can keep files
            assets_dir:str, # path to support files (scripts, metadata, etc)
            cran_repo:str='https://cloud.r-project.org', # CRAN repo
            *args, **kwargs):

        super().__init__(save_dir, assets_dir, *args, **kwargs)
        self.corebridge_version = corebridge.__version__
        self.flow_mapping = flow_mapping
        self.cran_repo = cran_repo
        # Map data tag -> relative file name, collected from every flow
        # object's 'in' section (outputs of one script appear here when they
        # are inputs of a later script).
        self.data_files_map = {
            D:F
            for P in self.flow_mapping.values()
            for D,F in P['in'].items()
        }
        # Diagnostic: report presence of each mapped data file on disk.
        for N in self.data_files_map.keys():
            print(N, os.path.isfile(self.get_save_path(self.data_files_map.get(N))), self.get_save_path(self.data_files_map.get(N)))

        # list assets
        print('Assets:\n', subprocess.run(['ls', '-la', assets_dir], capture_output=True).stdout.decode('UTF-8'))
        # Copy/unpack assets (scripts, zipped data) into the working directory.
        self.unpack_result = unpack_assets(assets_dir, self.get_rscript_workdir())
        # list working directory
        #print('Working directory:\n', subprocess.run(['ls', '-l', '*.R', save_dir], capture_output=True).stdout.decode('UTF-8'))

        # Start (or skip, when up-to-date) each flow script; keep the
        # RScriptProcess record per script name.
        self.flow_results = {
            flow_object['name']:run_rscript_nowait(
                flow_object,
                workdir=self.get_rscript_workdir(),
                libfolder=self.get_rscript_libpath(),
                pkg_repo=self.cran_repo
            )
            for flow_object in self.flow_mapping.values()
        }
        self.update_flow()
        syslog.info(f"RScriptModule initialized with {len(flow_mapping)} flow objects.")

    # def get_asset_path(self,script_name):
    #     return os.path.abspath(os.path.join(self.init_kwargs['assets_dir'], script_name))

    def get_rscript_libpath(self):
        # Private R library folder under the module's save dir.
        return os.path.abspath(os.path.join(self.init_kwargs['save_dir'], 'libs'))

    def get_rscript_workdir(self):
        # Working directory where scripts run and data files live.
        return os.path.abspath(os.path.join(self.init_kwargs['save_dir'], 'workdir'))

    def get_save_path(self, datafile_name:str):
        # Absolute path of a data file inside the working directory.
        return os.path.abspath(os.path.join(self.init_kwargs['save_dir'], 'workdir', datafile_name))

    def get_flow_status(self):
        """Return one status line per flow entry that has a live process record."""
        return [
            f"process {name} for {process.flow_object['name']} pollstatus: {process.popen.poll()}, args: {process.popen_args}"
            for name, process in self.flow_results.items()
            if process and process.popen
        ]
Run flow scripts to update data
AICoreRScriptModule.update_flow
AICoreRScriptModule.update_flow ()
Exported source
@patch
def update_flow(self:AICoreRScriptModule):
    """Re-run flow scripts whose outputs or input checksums are out-of-date.

    Walks the flow mapping in order; for each stale entry it either waits
    (returns early when the previous run is still in progress — later
    scripts depend on its output) or starts a fresh run.

    Returns:
        list[str]: status lines from ``get_flow_status()``.
    """
    workdir = self.get_rscript_workdir()
    libfolder = self.get_rscript_libpath()

    for flow_object in self.flow_mapping.values():
        name = flow_object['name']
        # Evaluate each check exactly once per iteration: the original
        # called both helpers twice (in the debug f-string and in the
        # condition), doubling file hashing / md5sum work.
        output_ok = check_script_output(flow_object, os.path.abspath(workdir))
        inputs_ok = check_script_inputs(flow_object, os.path.abspath(workdir))
        syslog.debug(f"Update {name}, output: {output_ok}, inputs: {inputs_ok}")

        if not output_ok or not inputs_ok:
            # .get() instead of [] so a missing record doesn't raise KeyError.
            process = self.flow_results.get(name)
            if process:
                if process.popen.poll() is None:
                    syslog.debug(f"Process is still running: {name}, args: {process.popen_args}")
                    # Abort this sweep; downstream scripts need this output.
                    return self.get_flow_status()
                syslog.debug(f"Process finished: {name}, args: {process.popen_args}, returncode: {process.popen.poll()}")

            syslog.debug(f"Updating for {name}, starting at {workdir}")
            self.flow_results[name] = run_rscript_nowait(
                flow_object,
                workdir=workdir,
                libfolder=libfolder,
                pkg_repo=self.cran_repo
            )

    syslog.info(f"RScriptModule flow update complete.")
    return self.get_flow_status()
Converting RData
?rdata.read_rda
Signature:
rdata.read_rda(
file_or_path: 'AcceptableFile | os.PathLike[Any] | Traversable | str',
*,
expand_altrep: 'bool' = True,
altrep_constructor_dict: 'AltRepConstructorMap' = mappingproxy({b'deferred_string': <function deferred_string_constructor at 0x7f22ea064360>, b'compact_intseq': <function compact_intseq_constructor at 0x7f22ea0644a0>, b'compact_realseq': <function compact_realseq_constructor at 0x7f22ea064540>, b'wrap_real': <function wrap_constructor at 0x7f22ea0645e0>, b'wrap_string': <function wrap_constructor at 0x7f22ea0645e0>, b'wrap_logical': <function wrap_constructor at 0x7f22ea0645e0>, b'wrap_integer': <function wrap_constructor at 0x7f22ea0645e0>, b'wrap_complex': <function wrap_constructor at 0x7f22ea0645e0>, b'wrap_raw': <function wrap_constructor at 0x7f22ea0645e0>}),
constructor_dict: 'ConstructorDict' = mappingproxy({'data.frame': <function dataframe_constructor at 0x7f22ea066ac0>, 'factor': <function factor_constructor at 0x7f22ea066c00>, 'ordered': <function ordered_constructor at 0x7f22ea066ca0>, 'ts': <function ts_constructor at 0x7f22ea066d40>, 'srcref': <function srcref_constructor at 0x7f22ea066de0>, 'srcfile': <function srcfile_constructor at 0x7f22ea067240>, 'srcfilecopy': <function srcfilecopy_constructor at 0x7f22ea067560>}),
default_encoding: 'str | None' = None,
force_default_encoding: 'bool' = False,
global_environment: 'MutableMapping[str, Any] | None' = None,
base_environment: 'MutableMapping[str, Any] | None' = None,
) -> 'dict[str, Any]'
Docstring:
Read an RDA or RDATA file, containing an R object.
This is a convenience function that wraps :func:`rdata.parser.parse_file`
and :func:`rdata.parser.convert`, as it is the common use case.
Args:
file_or_path: File in the RDA format.
expand_altrep: Whether to translate ALTREPs to normal objects.
altrep_constructor_dict: Dictionary mapping each ALTREP to
its constructor.
constructor_dict: Dictionary mapping names of R classes to constructor
functions with the following prototype:
.. code-block :: python
def constructor(obj, attrs):
...
This dictionary can be used to support custom R classes. By
default, the dictionary used is
:data:`~rdata.conversion._conversion.DEFAULT_CLASS_MAP`
which has support for several common classes.
default_encoding: Default encoding used for strings with unknown
encoding. If `None`, the one stored in the file will be used, or
ASCII as a fallback.
force_default_encoding:
Use the default encoding even if the strings specify other
encoding.
global_environment: Global environment to use. By default is an empty
environment.
base_environment: Base environment to use. By default is an empty
environment.
Returns:
Contents of the file converted to a Python object.
See Also:
:func:`read_rds`: Similar function that parses a RDS file.
Examples:
Parse one of the included examples, containing a dataframe
>>> import rdata
>>>
>>> data = rdata.read_rda(
... rdata.TESTDATA_PATH / "test_dataframe.rda"
... )
>>> data
{'test_dataframe': class value
1 a 1
2 b 2
3 b 3}
File: ~/repos/corebridge/.devenv-corebridge/lib64/python3.11/site-packages/rdata/_read.py
Type: function
= AICoreRScriptModule(data_file_flow, save_dir, assets_dir) module
--------------------------------------------------------------------------- NameError Traceback (most recent call last) Cell In[81], line 1 ----> 1 module = AICoreRScriptModule(data_file_flow, save_dir, assets_dir) Cell In[78], line 10, in AICoreRScriptModule.__init__(self, flow_mapping, save_dir, assets_dir, cran_repo, *args, **kwargs) 3 def __init__(self, 4 flow_mapping:dict, # scripts flow map 5 save_dir:str, # path where the module can keep files 6 assets_dir:str, # path to support files (scripts, metadata, etc) 7 cran_repo:str='https://cloud.r-project.org', # CRAN repo 8 *args, **kwargs): ---> 10 super().__init__(save_dir, assets_dir, *args, **kwargs) 12 self.corebridge_version = __version__ 14 self.flow_mapping = flow_mapping File ~/repos/corebridge/corebridge/core.py:311, in __init__(self, save_dir, assets_dir, *args, **kwargs) 304 @patch 305 def __init__(self:AICoreModuleBase, 306 save_dir:str, # path where the module can keep files 307 assets_dir:str, # path to support files (scripts, metadata, etc) 308 *args, **kwargs): 310 self.init_time = datetime.datetime.now(datetime.UTC) --> 311 self.corebridge_version = __version__ 313 self.init_args = args 314 self.init_kwargs = dict( 315 **kwargs, 316 assets_dir=assets_dir, 317 save_dir=save_dir 318 ) NameError: name '__version__' is not defined
/ "test_dataframe.rda" rdata.TESTDATA_PATH
Path('/home/fenke/repos/corebridge/.devenv-corebridge/lib64/python3.11/site-packages/rdata/tests/data/test_dataframe.rda')
= rdata.read_rda(rdata.TESTDATA_PATH / "test_dataframe.rda")
converted converted
{'test_dataframe': class value
1 a 1
2 b 2
3 b 3}
if os.path.exists(module.get_save_path(module.data_files_map.get('Predicted_sapflux'))):
= rdata.read_rda(module.get_save_path(module.data_files_map.get('Predicted_sapflux')))
converted converted
if os.path.exists(module.get_save_path(module.data_files_map.get('Predicted_sapflux'))):
list(converted['ensemble_pred'].keys())
def print_structure(data, indent):
    """Recursively print the nesting structure of R-derived data.

    Dicts print each key and recurse with an indent of '++> '; lists and
    numpy arrays print their length and recurse into the first element
    with '--> '; anything else prints its type.

    Args:
        data: nested dict / list / np.ndarray / scalar structure.
        indent: string prefix accumulated so far.
    """
    if not isinstance(data, dict):
        if isinstance(data, (list, np.ndarray)):
            print(f"\n{indent}data:{len(data)} items.")
            # Guard: the original indexed data[0] unconditionally and
            # raised IndexError on an empty sequence.
            if len(data):
                print_structure(data[0], indent + '--> ')
        else:
            print(f"\n{indent}type: {type(data)}")
        return
    for key in data.keys():
        print(f"\n{indent}{key}")
        print_structure(data[key], indent+'++> ')
if os.path.exists(module.get_save_path(module.data_files_map.get('Predicted_sapflux'))):
' ') print_structure(converted,
ensemble_pred
++> ensemble
++> ++> data:10 items.
++> ++> --> equ
++> ++> --> ++> data:168 items.
++> ++> --> ++> --> type: <class 'numpy.float64'>
++> ++> --> mse
++> ++> --> ++> data:168 items.
++> ++> --> ++> --> type: <class 'numpy.float64'>
++> error
++> ++> data:10 items.
++> ++> --> equ
++> ++> --> ++> data:168 items.
++> ++> --> ++> --> type: <class 'numpy.float64'>
++> ++> --> mse
++> ++> --> ++> data:168 items.
++> ++> --> ++> --> type: <class 'numpy.float64'>
++> typical
++> ++> equ
++> ++> ++> pred
++> ++> ++> ++> data:168 items.
++> ++> ++> ++> --> type: <class 'numpy.float64'>
++> ++> ++> se
++> ++> ++> ++> data:168 items.
++> ++> ++> ++> --> type: <class 'numpy.float64'>
++> ++> mse
++> ++> ++> pred
++> ++> ++> ++> data:168 items.
++> ++> ++> ++> --> type: <class 'numpy.float64'>
++> ++> ++> se
++> ++> ++> ++> data:168 items.
++> ++> ++> ++> --> type: <class 'numpy.float64'>
++> time
++> ++> data:168 items.
++> ++> --> type: <class 'numpy.str_'>
recursive_flatten_nested_data
recursive_flatten_nested_data (data:dict, column_prefix:str='', camel_case=False)
Exported source
def snake_case_to_camel_case(snake_case:str) -> str:
    """Convert a snake_case name to camelCase.

    The first underscore-separated segment is kept verbatim; every later
    segment is ``str.capitalize``-d (first letter upper, rest lower) and
    the pieces are concatenated without separators.
    """
    first, *rest = snake_case.split('_')
    return first + ''.join(segment.capitalize() for segment in rest)
def recursive_flatten_nested_data(
        data:dict,
        column_prefix:str='',
        camel_case=False) -> dict:
    """Flatten a nested dict/list structure (e.g. from ``rdata.read_rda``)
    into a flat ``{column_name: np.ndarray}`` mapping for ``pd.DataFrame``.

    Dict keys are joined to the running prefix with '_'; list elements get
    1-based numbered segments ``<prefix>_<i>_``; a numpy array terminates
    the recursion and becomes one column. With ``camel_case=True`` the
    names produced at dict levels are converted via
    ``snake_case_to_camel_case`` (list-level prefixes are converted only
    once a deeper dict level is reached, as before).

    NOTE(review): leaves that are neither dict, list nor ndarray fall
    through and yield None, matching the original behavior.
    """
    if isinstance(data, np.ndarray):
        # Leaf: the accumulated prefix is the column name.
        return {column_prefix: data}

    if isinstance(data, list):
        return reduce(
            lambda merged, part: dict(**merged, **part) if merged else part,
            [
                recursive_flatten_nested_data(value, f"{column_prefix}_{i+1}_", camel_case)
                for i, value in enumerate(data)
            ],
            {}
        )

    if isinstance(data, dict):
        # Empty dict: nothing to flatten (the original raised IndexError here).
        if not data:
            return data

        def _column_name(key):
            # Join prefix and key; unlike the original multi-key branch,
            # do not emit a leading '_' when the prefix is still empty.
            joined = column_prefix + '_' + str(key) if column_prefix else str(key)
            return snake_case_to_camel_case(joined) if camel_case else joined

        if len(data) > 1:
            return reduce(
                lambda merged, part: dict(**merged, **part) if merged else part,
                [
                    recursive_flatten_nested_data(value, _column_name(key), camel_case)
                    for key, value in data.items()
                ],
                {}
            )
        # Single-key dicts collapse into their only child.
        key = list(data.keys())[0]
        return recursive_flatten_nested_data(data[key], _column_name(key), camel_case)
snake_case_to_camel_case
snake_case_to_camel_case (snake_case:str)
= recursive_flatten_nested_data(converted, camel_case=True)
flattened = pd.DataFrame(flattened)
df = [k for k,v in df.dtypes.to_dict().items() if 'float' not in str(v)][0]
time_column =True)
df.set_index( pd.DatetimeIndex(df[time_column]), inplace#df.drop('ensemble_predTime', axis=1, inplace=True)
df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 168 entries, 2023-08-01 00:00:01 to 2023-08-07 23:00:01
Data columns (total 45 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 ensemblePredEnsemble1Equ 168 non-null float64
1 ensemblePredEnsemble1Mse 168 non-null float64
2 ensemblePredEnsemble2Equ 168 non-null float64
3 ensemblePredEnsemble2Mse 168 non-null float64
4 ensemblePredEnsemble3Equ 168 non-null float64
5 ensemblePredEnsemble3Mse 168 non-null float64
6 ensemblePredEnsemble4Equ 168 non-null float64
7 ensemblePredEnsemble4Mse 168 non-null float64
8 ensemblePredEnsemble5Equ 168 non-null float64
9 ensemblePredEnsemble5Mse 168 non-null float64
10 ensemblePredEnsemble6Equ 168 non-null float64
11 ensemblePredEnsemble6Mse 168 non-null float64
12 ensemblePredEnsemble7Equ 168 non-null float64
13 ensemblePredEnsemble7Mse 168 non-null float64
14 ensemblePredEnsemble8Equ 0 non-null float64
15 ensemblePredEnsemble8Mse 168 non-null float64
16 ensemblePredEnsemble9Equ 168 non-null float64
17 ensemblePredEnsemble9Mse 168 non-null float64
18 ensemblePredEnsemble10Equ 168 non-null float64
19 ensemblePredEnsemble10Mse 168 non-null float64
20 ensemblePredError1Equ 168 non-null float64
21 ensemblePredError1Mse 168 non-null float64
22 ensemblePredError2Equ 168 non-null float64
23 ensemblePredError2Mse 168 non-null float64
24 ensemblePredError3Equ 168 non-null float64
25 ensemblePredError3Mse 168 non-null float64
26 ensemblePredError4Equ 168 non-null float64
27 ensemblePredError4Mse 168 non-null float64
28 ensemblePredError5Equ 168 non-null float64
29 ensemblePredError5Mse 168 non-null float64
30 ensemblePredError6Equ 168 non-null float64
31 ensemblePredError6Mse 168 non-null float64
32 ensemblePredError7Equ 168 non-null float64
33 ensemblePredError7Mse 168 non-null float64
34 ensemblePredError8Equ 0 non-null float64
35 ensemblePredError8Mse 0 non-null float64
36 ensemblePredError9Equ 168 non-null float64
37 ensemblePredError9Mse 168 non-null float64
38 ensemblePredError10Equ 168 non-null float64
39 ensemblePredError10Mse 168 non-null float64
40 ensemblePredTypicalEquPred 168 non-null float64
41 ensemblePredTypicalEquSe 168 non-null float64
42 ensemblePredTypicalMsePred 168 non-null float64
43 ensemblePredTypicalMseSe 168 non-null float64
44 ensemblePredTime 168 non-null object
dtypes: float64(44), object(1)
memory usage: 60.4+ KB
df
ensemblePredEnsemble1Equ | ensemblePredEnsemble1Mse | ensemblePredEnsemble2Equ | ensemblePredEnsemble2Mse | ensemblePredEnsemble3Equ | ensemblePredEnsemble3Mse | ensemblePredEnsemble4Equ | ensemblePredEnsemble4Mse | ensemblePredEnsemble5Equ | ensemblePredEnsemble5Mse | ... | ensemblePredError8Mse | ensemblePredError9Equ | ensemblePredError9Mse | ensemblePredError10Equ | ensemblePredError10Mse | ensemblePredTypicalEquPred | ensemblePredTypicalEquSe | ensemblePredTypicalMsePred | ensemblePredTypicalMseSe | ensemblePredTime | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
ensemblePredTime | |||||||||||||||||||||
2023-08-01 00:00:01 | 0.549107 | 0.542947 | 0.666708 | 0.658971 | 0.780691 | 0.771425 | 0.660063 | 0.652414 | 0.584461 | 0.577826 | ... | NaN | 0.356676 | 0.356170 | 0.375093 | 0.375200 | 0.617332 | 0.125831 | 0.549231 | 0.126207 | 2023-08-01 00:00:01 |
2023-08-01 01:00:01 | 0.490717 | 0.483454 | 0.561241 | 0.552198 | 0.629595 | 0.618828 | 0.557256 | 0.548314 | 0.511918 | 0.504120 | ... | NaN | 0.510744 | 0.508403 | 0.527951 | 0.526323 | 0.531630 | 0.171059 | 0.471002 | 0.170967 | 2023-08-01 01:00:01 |
2023-08-01 02:00:01 | 0.455916 | 0.448803 | 0.499389 | 0.490743 | 0.541525 | 0.531393 | 0.496933 | 0.488373 | 0.468985 | 0.461412 | ... | NaN | 0.594047 | 0.589900 | 0.606447 | 0.602945 | 0.481136 | 0.191053 | 0.425821 | 0.190360 | 2023-08-01 02:00:01 |
2023-08-01 03:00:01 | 0.424082 | 0.417034 | 0.451503 | 0.443261 | 0.478080 | 0.468680 | 0.449953 | 0.441779 | 0.432326 | 0.424918 | ... | NaN | 0.641129 | 0.635564 | 0.649752 | 0.644735 | 0.439990 | 0.201109 | 0.389024 | 0.199905 | 2023-08-01 03:00:01 |
2023-08-01 04:00:01 | 0.407526 | 0.402109 | 0.425149 | 0.418847 | 0.442231 | 0.435070 | 0.424153 | 0.417901 | 0.412824 | 0.407141 | ... | NaN | 0.622489 | 0.615642 | 0.627102 | 0.620682 | 0.417750 | 0.191545 | 0.370637 | 0.189855 | 2023-08-01 04:00:01 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
2023-08-07 19:00:01 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | ... | NaN | 2.302566 | 2.259919 | 2.302566 | 2.259919 | 3.658327 | 0.690770 | 3.293744 | 0.677976 | 2023-08-07 19:00:01 |
2023-08-07 20:00:01 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | ... | NaN | 1.790125 | 1.785654 | 1.790125 | 1.785654 | 2.298355 | 0.537037 | 2.050966 | 0.535696 | 2023-08-07 20:00:01 |
2023-08-07 21:00:01 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | ... | NaN | 1.489199 | 1.497512 | 1.489199 | 1.497512 | 1.512578 | 0.446760 | 1.340198 | 0.449254 | 2023-08-07 21:00:01 |
2023-08-07 22:00:01 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | ... | NaN | 1.051568 | 1.060695 | 1.051568 | 1.060695 | 1.018082 | 0.315470 | 0.900918 | 0.318209 | 2023-08-07 22:00:01 |
2023-08-07 23:00:01 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | ... | NaN | 0.647845 | 0.656302 | 0.647845 | 0.656302 | 0.667827 | 0.194353 | 0.595083 | 0.196890 | 2023-08-07 23:00:01 |
168 rows × 45 columns
AICoreRScriptModule.infer
AICoreRScriptModule.infer (data:dict, *_, **kwargs)
Infer method for the RScriptModule
Exported source
@patch
def write_uploaded_data(
        self:AICoreRScriptModule,
        df:pd.DataFrame,
        tag:str=None,
        **kwargs):
    """Persist an uploaded dataframe as the CSV data file mapped to *tag*.

    The tag is resolved through ``self.data_files_map`` (falling back to
    the tag itself) and written, index included as a column, in
    '%Y-%m-%d %H:%M:%S' date format.
    """
    target_path = self.get_save_path(self.data_files_map.get(tag, tag))
    syslog.debug(f"Writing {df.shape[0]} rows to {target_path}")
    df.reset_index().to_csv(target_path, index=False, date_format='%Y-%m-%d %H:%M:%S')
@patch
def read_data(self:AICoreRScriptModule, tag:str=None, camel_case=False, **kwargs):
    """Read the RData file mapped to *tag* into a time-indexed DataFrame.

    The .RData contents are flattened into columns via
    ``recursive_flatten_nested_data``; the first non-float column is taken
    as the timestamp column, promoted to a DatetimeIndex named 'time' and
    dropped from the data columns.
    """
    rdata_filename = self.get_save_path(self.data_files_map.get(tag, tag))
    converted = rdata.read_rda(rdata_filename)

    flattened = recursive_flatten_nested_data(converted, camel_case=camel_case)
    df = pd.DataFrame(flattened)
    syslog.debug(f"Read {df.shape[0]} rows from {rdata_filename} for {tag} (camel_case={camel_case})")

    # NOTE(review): assumes exactly one non-float column exists and that it
    # holds parseable timestamps — raises IndexError otherwise; confirm
    # against the R scripts' output schema.
    time_column = [k for k,v in df.dtypes.to_dict().items() if 'float' not in str(v)][0]
    df.set_index( pd.DatetimeIndex(df[time_column]), inplace=True)
    df.index.name = 'time'
    df.drop(time_column, axis=1, inplace=True)

    return df
@patch
def infer(
        self:AICoreRScriptModule,
        data:dict,
        *_,
        **kwargs):
    """Infer method for the RScriptModule.

    Updates the script flow, optionally writes uploaded records to the
    data file mapped to ``writeTag`` and/or reads results from the file
    mapped to ``readTag``.

    Args:
        data: record data to upload (used only when ``writeTag`` is set).
        **kwargs: ``writeTag``, ``readTag``, ``camelCase``, ``lastSeen``,
            ``format`` (record format, default 'records'), ``timezone``
            (default 'UTC', intentionally not popped so it also reaches
            downstream processing), ``reversed``.

    Returns:
        dict with 'msg' (progress/status lines) and 'data' (records, or
        [] when no readTag was given or an exception occurred).
    """
    try:
        msg=[
            f"Startup time: {self.init_time.isoformat()}",
            f"Corebridge version: {self.corebridge_version}",
            f"init_args: {self.init_args}, init_kwargs: {self.init_kwargs}",
        ]

        msg += self.update_flow()

        # Pickup params, pop those that are not intended for the processor
        writeTag = kwargs.pop('writeTag', None)
        readTag = kwargs.pop('readTag', None)
        camelCase = bool(kwargs.pop('camelCase', False))
        msg.append(f"writeTag: {writeTag}, readTag: {readTag}, camelCase: {camelCase}")

        lastSeen = kwargs.pop('lastSeen', False)
        recordformat = kwargs.pop('format', "records").lower()
        timezone = kwargs.get('timezone', 'UTC')
        msg.append(f"lastSeen: {lastSeen}, recordformat: {recordformat}, timezone: {timezone}")

        # Renamed from `reversed`: the original shadowed the builtin.
        reverse_order = kwargs.pop('reversed', False)

        if writeTag:
            df = set_time_index_zone(timeseries_dataframe_from_datadict(
                data, ['datetimeMeasure', 'time'], recordformat), timezone)
            df.sort_index(inplace=True)

            syslog.debug(f"Writing {df.shape[0]} rows to {writeTag}")
            self.write_uploaded_data(df, writeTag)

        if readTag:
            result = self.read_data(readTag, camel_case=camelCase)

            if reverse_order:
                result = result[::-1]

            syslog.debug(f"Read {result.shape[0]} rows from {readTag}")

            return {
                'msg':msg,
                'data': timeseries_dataframe_to_datadict(
                    result if not lastSeen else result[-1:],
                    recordformat=recordformat,
                    timezone=timezone,
                    popNaN=True)
            }

        return {
            'msg':msg + self.get_flow_status(),
            'data': []
        }

    except Exception as err:
        # NOTE(review): the traceback is appended to `msg` but the returned
        # 'msg' is a summary string — the collected lines are only visible
        # through the log; confirm whether callers expect the full list.
        msg.append(''.join(traceback.format_exception(None, err, err.__traceback__)))
        syslog.exception(f"Exception {str(err)} in infer()")
        return {
            'msg': f"Unexpected {err=}, {type(err)=}",
            'data': []
        }
AICoreRScriptModule.read_data
AICoreRScriptModule.read_data (tag:str=None, camel_case=False, **kwargs)
AICoreRScriptModule.write_uploaded_data
AICoreRScriptModule.write_uploaded_data (df:pandas.core.frame.DataFrame, tag:str=None, **kwargs)
= AICoreRScriptModule(data_file_flow, save_dir, assets_dir) module
DEBUG 16824 __main__ 319997563.py 22 Starting rscript for Data_preparation.R
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 319997563.py 67 Output and inputs are up-to-date for Data_preparation.R
DEBUG 16824 __main__ 319997563.py 22 Starting rscript for Prediction_part1.R
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 319997563.py 67 Output and inputs are up-to-date for Prediction_part1.R
DEBUG 16824 __main__ 319997563.py 22 Starting rscript for Prediction_part2.R
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 319997563.py 67 Output and inputs are up-to-date for Prediction_part2.R
DEBUG 16824 __main__ 319997563.py 22 Starting rscript for Prediction_part3.R
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG 16824 __main__ 319997563.py 67 Output and inputs are up-to-date for Prediction_part3.R
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 2480783271.py 10 Update Data_preparation.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part1.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part2.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part3.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO 16824 __main__ 2480783271.py 33 RScriptModule flow update complete.
INFO 16824 __main__ 3112676160.py 42 RScriptModule initialized with 4 flow objects.
Meta_data True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Data/Meta_data.csv
Sapflux_Tilia_train True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Data/Sapflux_Tilia_train.csv
Weather_Tilia_train True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Data/Weather_Tilia_train.csv
Weather_Tilia_pred True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Data/Weather_Tilia_pred.csv
Modelling_data True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Modelling_data.RData
Fitted_models True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Fitted_models.RData
Weights True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Weights.RData
Prediction_data True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Prediction_data.RData
Predicted_sapflux True /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Predicted_sapflux.RData
Assets:
total 2112
drwxr-xr-x 1 fenke users 264 Aug 22 14:42 .
drwxr-xr-x 1 fenke users 14 Jul 3 13:28 ..
-rw-r--r-- 1 fenke users 5790 Jun 20 11:25 Data_preparation.R
-rw-r--r-- 1 fenke users 61 Jul 5 10:31 hello.R
-rw-r--r-- 1 fenke users 6397 Jun 20 11:25 Prediction_part1.R
-rw-r--r-- 1 fenke users 4203 Jun 20 11:25 Prediction_part2.R
-rw-r--r-- 1 fenke users 3175 Jun 20 11:25 Prediction_part3.R
-rw-r--r-- 1 fenke users 2117834 Aug 22 14:41 SapflowPredictionData.zip
-rw-r--r-- 1 fenke users 6502 Aug 21 13:10 SapflowPredictionScripts.zip
module.flow_mapping
{0: {'in': {'Meta_data': 'Data/Meta_data.csv',
'Sapflux_Tilia_train': 'Data/Sapflux_Tilia_train.csv',
'Weather_Tilia_train': 'Data/Weather_Tilia_train.csv',
'Weather_Tilia_pred': 'Data/Weather_Tilia_pred.csv'},
'out': {'Modelling_data': 'Modelling_data.RData',
'Prediction_data': 'Prediction_data.RData'},
'libs': ['lubridate', 'stringr', 'zoo'],
'name': 'Data_preparation.R'},
1: {'in': {'Modelling_data': 'Modelling_data.RData'},
'out': {'Fitted_models': 'Fitted_models.RData', 'Weights': 'Weights.RData'},
'libs': ['lubridate', 'stringr', 'mgcv'],
'name': 'Prediction_part1.R'},
2: {'in': {'Fitted_models': 'Fitted_models.RData',
'Weights': 'Weights.RData',
'Modelling_data': 'Modelling_data.RData',
'Prediction_data': 'Prediction_data.RData'},
'out': {'Predicted_sapflux': 'Predicted_sapflux.RData'},
'libs': ['lubridate', 'stringr', 'mgcv'],
'name': 'Prediction_part2.R'},
3: {'in': {'Predicted_sapflux': 'Predicted_sapflux.RData'},
'out': {'Predicted_water_usage': 'Predicted_water_usage.RData'},
'libs': ['lubridate', 'stringr'],
'name': 'Prediction_part3.R'}}
module.get_flow_status()
[]
test_data = [
{'time': '2020-01-01 00:00:00',
'temperature': 10,
'humidity': 10
},
{'time': '2020-01-01 01:00:00',
'temperature': 11,
'humidity': 11
} ]
module.data_files_map
{'Meta_data': 'Data/Meta_data.csv',
'Sapflux_Tilia_train': 'Data/Sapflux_Tilia_train.csv',
'Weather_Tilia_train': 'Data/Weather_Tilia_train.csv',
'Weather_Tilia_pred': 'Data/Weather_Tilia_pred.csv',
'Modelling_data': 'Modelling_data.RData',
'Fitted_models': 'Fitted_models.RData',
'Weights': 'Weights.RData',
'Prediction_data': 'Prediction_data.RData',
'Predicted_sapflux': 'Predicted_sapflux.RData'}
test_df = pd.DataFrame(test_data)
test_df.index = pd.DatetimeIndex(test_df['time'])
test_df.drop('time', axis=1, inplace=True)
test_df
temperature | humidity | |
---|---|---|
time | ||
2020-01-01 00:00:00 | 10 | 10 |
2020-01-01 01:00:00 | 11 | 11 |
module.write_uploaded_data(test_df, 'upload_data.csv')
DEBUG 16824 __main__ 2136897418.py 11 Writing 2 rows to /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/upload_data.csv
with open(module.get_save_path('upload_data.csv'), 'rt') as cf:
print(cf.read())
time,temperature,humidity
2020-01-01 00:00:00,10,10
2020-01-01 01:00:00,11,11
module.infer(test_data, writeTag='uploaded_data.csv')
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 2480783271.py 10 Update Data_preparation.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part1.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part2.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part3.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO 16824 __main__ 2480783271.py 33 RScriptModule flow update complete.
DEBUG 16824 __main__ 2136897418.py 73 Writing 2 rows to uploaded_data.csv
DEBUG 16824 __main__ 2136897418.py 11 Writing 2 rows to /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/uploaded_data.csv
{'msg': ['Startup time: 2024-09-13T08:10:16.217583+00:00',
"init_args: (), init_kwargs: {'assets_dir': '/home/fenke/repos/corebridge/nbs/assets/rscript', 'save_dir': '/home/fenke/repos/corebridge/nbs/saves/rscript'}",
'writeTag: uploaded_data.csv, readTag: None, camelCase: False',
'lastSeen: False, recordformat: records, timezone: UTC'],
'data': []}
module.get_save_path('uploaded_data.csv')
'/home/fenke/repos/corebridge/nbs/saves/rscript/workdir/uploaded_data.csv'
with open(module.get_save_path('uploaded_data.csv'), 'rt') as cf:
print(cf.read())
time,temperature,humidity
2020-01-01 00:00:00,10,10
2020-01-01 01:00:00,11,11
#converted = rdata.read_rda(module.get_save_path(module.data_files_map.get('Predicted_sapflux')))
module.read_data('Predicted_sapflux', camel_case=True)
DEBUG 16824 __main__ 2136897418.py 23 Read 168 rows from /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Predicted_sapflux.RData for Predicted_sapflux (camel_case=True)
ensemblePredEnsemble1Equ | ensemblePredEnsemble1Mse | ensemblePredEnsemble2Equ | ensemblePredEnsemble2Mse | ensemblePredEnsemble3Equ | ensemblePredEnsemble3Mse | ensemblePredEnsemble4Equ | ensemblePredEnsemble4Mse | ensemblePredEnsemble5Equ | ensemblePredEnsemble5Mse | ... | ensemblePredError8Equ | ensemblePredError8Mse | ensemblePredError9Equ | ensemblePredError9Mse | ensemblePredError10Equ | ensemblePredError10Mse | ensemblePredTypicalEquPred | ensemblePredTypicalEquSe | ensemblePredTypicalMsePred | ensemblePredTypicalMseSe | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
time | |||||||||||||||||||||
2023-08-01 00:00:01 | 0.549107 | 0.542947 | 0.666708 | 0.658971 | 0.780691 | 0.771425 | 0.660063 | 0.652414 | 0.584461 | 0.577826 | ... | NaN | NaN | 0.356676 | 0.356170 | 0.375093 | 0.375200 | 0.617332 | 0.125831 | 0.549231 | 0.126207 |
2023-08-01 01:00:01 | 0.490717 | 0.483454 | 0.561241 | 0.552198 | 0.629595 | 0.618828 | 0.557256 | 0.548314 | 0.511918 | 0.504120 | ... | NaN | NaN | 0.510744 | 0.508403 | 0.527951 | 0.526323 | 0.531630 | 0.171059 | 0.471002 | 0.170967 |
2023-08-01 02:00:01 | 0.455916 | 0.448803 | 0.499389 | 0.490743 | 0.541525 | 0.531393 | 0.496933 | 0.488373 | 0.468985 | 0.461412 | ... | NaN | NaN | 0.594047 | 0.589900 | 0.606447 | 0.602945 | 0.481136 | 0.191053 | 0.425821 | 0.190360 |
2023-08-01 03:00:01 | 0.424082 | 0.417034 | 0.451503 | 0.443261 | 0.478080 | 0.468680 | 0.449953 | 0.441779 | 0.432326 | 0.424918 | ... | NaN | NaN | 0.641129 | 0.635564 | 0.649752 | 0.644735 | 0.439990 | 0.201109 | 0.389024 | 0.199905 |
2023-08-01 04:00:01 | 0.407526 | 0.402109 | 0.425149 | 0.418847 | 0.442231 | 0.435070 | 0.424153 | 0.417901 | 0.412824 | 0.407141 | ... | NaN | NaN | 0.622489 | 0.615642 | 0.627102 | 0.620682 | 0.417750 | 0.191545 | 0.370637 | 0.189855 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
2023-08-07 19:00:01 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | ... | NaN | NaN | 2.302566 | 2.259919 | 2.302566 | 2.259919 | 3.658327 | 0.690770 | 3.293744 | 0.677976 |
2023-08-07 20:00:01 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | ... | NaN | NaN | 1.790125 | 1.785654 | 1.790125 | 1.785654 | 2.298355 | 0.537037 | 2.050966 | 0.535696 |
2023-08-07 21:00:01 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | ... | NaN | NaN | 1.489199 | 1.497512 | 1.489199 | 1.497512 | 1.512578 | 0.446760 | 1.340198 | 0.449254 |
2023-08-07 22:00:01 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | ... | NaN | NaN | 1.051568 | 1.060695 | 1.051568 | 1.060695 | 1.018082 | 0.315470 | 0.900918 | 0.318209 |
2023-08-07 23:00:01 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | ... | NaN | NaN | 0.647845 | 0.656302 | 0.647845 | 0.656302 | 0.667827 | 0.194353 | 0.595083 | 0.196890 |
168 rows × 44 columns
result = module.infer(test_data, writeTag='uploaded_data.csv', readTag='Predicted_sapflux', camelCase=True)
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 2480783271.py 10 Update Data_preparation.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part1.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part2.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part3.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO 16824 __main__ 2480783271.py 33 RScriptModule flow update complete.
DEBUG 16824 __main__ 2136897418.py 73 Writing 2 rows to uploaded_data.csv
DEBUG 16824 __main__ 2136897418.py 11 Writing 2 rows to /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/uploaded_data.csv
DEBUG 16824 __main__ 2136897418.py 23 Read 168 rows from /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Predicted_sapflux.RData for Predicted_sapflux (camel_case=True)
DEBUG 16824 __main__ 2136897418.py 82 Read 168 rows from Predicted_sapflux
result['msg']
['Startup time: 2024-09-13T08:10:16.217583+00:00',
"init_args: (), init_kwargs: {'assets_dir': '/home/fenke/repos/corebridge/nbs/assets/rscript', 'save_dir': '/home/fenke/repos/corebridge/nbs/saves/rscript'}",
'writeTag: uploaded_data.csv, readTag: Predicted_sapflux, camelCase: True',
'lastSeen: False, recordformat: records, timezone: UTC']
timeseries_dataframe_from_datadict(result['data'], ['time'], 'records')
ensemblePredEnsemble1Equ | ensemblePredEnsemble1Mse | ensemblePredEnsemble2Equ | ensemblePredEnsemble2Mse | ensemblePredEnsemble3Equ | ensemblePredEnsemble3Mse | ensemblePredEnsemble4Equ | ensemblePredEnsemble4Mse | ensemblePredEnsemble5Equ | ensemblePredEnsemble5Mse | ... | ensemblePredError7Equ | ensemblePredError7Mse | ensemblePredError9Equ | ensemblePredError9Mse | ensemblePredError10Equ | ensemblePredError10Mse | ensemblePredTypicalEquPred | ensemblePredTypicalEquSe | ensemblePredTypicalMsePred | ensemblePredTypicalMseSe | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
time | |||||||||||||||||||||
2023-08-01 00:00:01+00:00 | 0.549107 | 0.542947 | 0.666708 | 0.658971 | 0.780691 | 0.771425 | 0.660063 | 0.652414 | 0.584461 | 0.577826 | ... | 0.392640 | 0.393263 | 0.356676 | 0.356170 | 0.375093 | 0.375200 | 0.617332 | 0.125831 | 0.549231 | 0.126207 |
2023-08-01 01:00:01+00:00 | 0.490717 | 0.483454 | 0.561241 | 0.552198 | 0.629595 | 0.618828 | 0.557256 | 0.548314 | 0.511918 | 0.504120 | ... | 0.544555 | 0.543532 | 0.510744 | 0.508403 | 0.527951 | 0.526323 | 0.531630 | 0.171059 | 0.471002 | 0.170967 |
2023-08-01 02:00:01+00:00 | 0.455916 | 0.448803 | 0.499389 | 0.490743 | 0.541525 | 0.531393 | 0.496933 | 0.488373 | 0.468985 | 0.461412 | ... | 0.618437 | 0.615487 | 0.594047 | 0.589900 | 0.606447 | 0.602945 | 0.481136 | 0.191053 | 0.425821 | 0.190360 |
2023-08-01 03:00:01+00:00 | 0.424082 | 0.417034 | 0.451503 | 0.443261 | 0.478080 | 0.468680 | 0.449953 | 0.441779 | 0.432326 | 0.424918 | ... | 0.658013 | 0.653465 | 0.641129 | 0.635564 | 0.649752 | 0.644735 | 0.439990 | 0.201109 | 0.389024 | 0.199905 |
2023-08-01 04:00:01+00:00 | 0.407526 | 0.402109 | 0.425149 | 0.418847 | 0.442231 | 0.435070 | 0.424153 | 0.417901 | 0.412824 | 0.407141 | ... | 0.631589 | 0.625537 | 0.622489 | 0.615642 | 0.627102 | 0.620682 | 0.417750 | 0.191545 | 0.370637 | 0.189855 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
2023-08-07 19:00:01+00:00 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | ... | 2.302566 | 2.259919 | 2.302566 | 2.259919 | 2.302566 | 2.259919 | 3.658327 | 0.690770 | 3.293744 | 0.677976 |
2023-08-07 20:00:01+00:00 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | ... | 1.790125 | 1.785654 | 1.790125 | 1.785654 | 1.790125 | 1.785654 | 2.298355 | 0.537037 | 2.050966 | 0.535696 |
2023-08-07 21:00:01+00:00 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | ... | 1.489199 | 1.497512 | 1.489199 | 1.497512 | 1.489199 | 1.497512 | 1.512578 | 0.446760 | 1.340198 | 0.449254 |
2023-08-07 22:00:01+00:00 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | ... | 1.051568 | 1.060695 | 1.051568 | 1.060695 | 1.051568 | 1.060695 | 1.018082 | 0.315470 | 0.900918 | 0.318209 |
2023-08-07 23:00:01+00:00 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | ... | 0.647845 | 0.656302 | 0.647845 | 0.656302 | 0.647845 | 0.656302 | 0.667827 | 0.194353 | 0.595083 | 0.196890 |
168 rows × 41 columns
timeseries_dataframe_from_datadict(module.infer(test_data, camelCase=True, writeTag='uploaded_data.csv', readTag='Predicted_sapflux')['data'], ['time'], 'records')
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 2480783271.py 10 Update Data_preparation.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part1.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part2.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part3.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO 16824 __main__ 2480783271.py 33 RScriptModule flow update complete.
DEBUG 16824 __main__ 2136897418.py 73 Writing 2 rows to uploaded_data.csv
DEBUG 16824 __main__ 2136897418.py 11 Writing 2 rows to /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/uploaded_data.csv
DEBUG 16824 __main__ 2136897418.py 23 Read 168 rows from /home/fenke/repos/corebridge/nbs/saves/rscript/workdir/Predicted_sapflux.RData for Predicted_sapflux (camel_case=True)
DEBUG 16824 __main__ 2136897418.py 82 Read 168 rows from Predicted_sapflux
ensemblePredEnsemble1Equ | ensemblePredEnsemble1Mse | ensemblePredEnsemble2Equ | ensemblePredEnsemble2Mse | ensemblePredEnsemble3Equ | ensemblePredEnsemble3Mse | ensemblePredEnsemble4Equ | ensemblePredEnsemble4Mse | ensemblePredEnsemble5Equ | ensemblePredEnsemble5Mse | ... | ensemblePredError7Equ | ensemblePredError7Mse | ensemblePredError9Equ | ensemblePredError9Mse | ensemblePredError10Equ | ensemblePredError10Mse | ensemblePredTypicalEquPred | ensemblePredTypicalEquSe | ensemblePredTypicalMsePred | ensemblePredTypicalMseSe | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
time | |||||||||||||||||||||
2023-08-01 00:00:01+00:00 | 0.549107 | 0.542947 | 0.666708 | 0.658971 | 0.780691 | 0.771425 | 0.660063 | 0.652414 | 0.584461 | 0.577826 | ... | 0.392640 | 0.393263 | 0.356676 | 0.356170 | 0.375093 | 0.375200 | 0.617332 | 0.125831 | 0.549231 | 0.126207 |
2023-08-01 01:00:01+00:00 | 0.490717 | 0.483454 | 0.561241 | 0.552198 | 0.629595 | 0.618828 | 0.557256 | 0.548314 | 0.511918 | 0.504120 | ... | 0.544555 | 0.543532 | 0.510744 | 0.508403 | 0.527951 | 0.526323 | 0.531630 | 0.171059 | 0.471002 | 0.170967 |
2023-08-01 02:00:01+00:00 | 0.455916 | 0.448803 | 0.499389 | 0.490743 | 0.541525 | 0.531393 | 0.496933 | 0.488373 | 0.468985 | 0.461412 | ... | 0.618437 | 0.615487 | 0.594047 | 0.589900 | 0.606447 | 0.602945 | 0.481136 | 0.191053 | 0.425821 | 0.190360 |
2023-08-01 03:00:01+00:00 | 0.424082 | 0.417034 | 0.451503 | 0.443261 | 0.478080 | 0.468680 | 0.449953 | 0.441779 | 0.432326 | 0.424918 | ... | 0.658013 | 0.653465 | 0.641129 | 0.635564 | 0.649752 | 0.644735 | 0.439990 | 0.201109 | 0.389024 | 0.199905 |
2023-08-01 04:00:01+00:00 | 0.407526 | 0.402109 | 0.425149 | 0.418847 | 0.442231 | 0.435070 | 0.424153 | 0.417901 | 0.412824 | 0.407141 | ... | 0.631589 | 0.625537 | 0.622489 | 0.615642 | 0.627102 | 0.620682 | 0.417750 | 0.191545 | 0.370637 | 0.189855 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
2023-08-07 19:00:01+00:00 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | 3.658327 | 3.659715 | ... | 2.302566 | 2.259919 | 2.302566 | 2.259919 | 2.302566 | 2.259919 | 3.658327 | 0.690770 | 3.293744 | 0.677976 |
2023-08-07 20:00:01+00:00 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | 2.298355 | 2.278851 | ... | 1.790125 | 1.785654 | 1.790125 | 1.785654 | 1.790125 | 1.785654 | 2.298355 | 0.537037 | 2.050966 | 0.535696 |
2023-08-07 21:00:01+00:00 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | 1.512578 | 1.489109 | ... | 1.489199 | 1.497512 | 1.489199 | 1.497512 | 1.489199 | 1.497512 | 1.512578 | 0.446760 | 1.340198 | 0.449254 |
2023-08-07 22:00:01+00:00 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | 1.018082 | 1.001020 | ... | 1.051568 | 1.060695 | 1.051568 | 1.060695 | 1.051568 | 1.060695 | 1.018082 | 0.315470 | 0.900918 | 0.318209 |
2023-08-07 23:00:01+00:00 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | 0.667827 | 0.661204 | ... | 0.647845 | 0.656302 | 0.647845 | 0.656302 | 0.647845 | 0.656302 | 0.667827 | 0.194353 | 0.595083 | 0.196890 |
168 rows × 41 columns
module.update_flow()
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 2480783271.py 10 Update Data_preparation.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part1.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part2.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part3.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO 16824 __main__ 2480783271.py 33 RScriptModule flow update complete.
[]
module.get_flow_status()
[]
module.infer([])
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 2480783271.py 10 Update Data_preparation.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Data_preparation.R: [('Modelling_data', True), ('Prediction_data', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Data_preparation.R: 0, checksum file: input-checksum-5d532037f7dda7e7fad290ddef53f69d
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part1.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part1.R: [('Fitted_models', True), ('Weights', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: input-checksum-33757e2024067c7704d44232a63c870b
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part2.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part2.R: [('Predicted_sapflux', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part2.R: 0, checksum file: input-checksum-d37df0122cd5162bc80b4c25b959e4b0
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
DEBUG 16824 __main__ 2480783271.py 10 Update Prediction_part3.R, output: True, inputs: True
DEBUG 16824 __main__ 3613995005.py 10 Output files for Flow object: Prediction_part3.R: [('Predicted_water_usage', True)]
DEBUG 16824 __main__ 4142405122.py 11 Checksum check result for Flow object: Prediction_part3.R: 0, checksum file: input-checksum-b4e0cb61356d5cc9830bc495b1584e9c
INFO 16824 __main__ 2480783271.py 33 RScriptModule flow update complete.
{'msg': ['Startup time: 2024-09-13T08:10:16.217583+00:00',
"init_args: (), init_kwargs: {'assets_dir': '/home/fenke/repos/corebridge/nbs/assets/rscript', 'save_dir': '/home/fenke/repos/corebridge/nbs/saves/rscript'}",
'writeTag: None, readTag: None, camelCase: False',
'lastSeen: False, recordformat: records, timezone: UTC'],
'data': []}
run_rscript_nowait.lock_objects
{}