From 21789896b86dfd5cc6cb362e8733701dc785139c Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Thu, 21 Oct 2021 09:46:55 -0400 Subject: [PATCH 01/19] Draft IOBase Class --- HSP2IO/base.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 HSP2IO/base.py diff --git a/HSP2IO/base.py b/HSP2IO/base.py new file mode 100644 index 00000000..154ee633 --- /dev/null +++ b/HSP2IO/base.py @@ -0,0 +1,52 @@ +from abc import ABC, abstractmethod +from typing import Tuple, Dict, Any, List +from collections import defaultdict +import pandas as pd +import numba.typed +import numpy as np + +from pandas.core.frame import Pandas + +UCITuple = [defaultdict(dict), defaultdict(list), defaultdict(list), + defaultdict(list), defaultdict(dict), dict, int] +TimeSeriesDict = Dict[np.types.unicode_type,np.types.float64] + +class IOBase(ABC): + + @abstractmethod + def read_uci(self) -> UCITuple: + """""" + + @abstractmethod + def write_uci(self, UCITuple) -> None: + """""" + + @abstractmethod + def read_timeseries(self, ext_sourcesdd:List[Pandas], siminfo:Dict[str,Any]) -> TimeSeriesDict: + """""" + + @abstractmethod + #ts type hint in incorrect + def write_timeseries(self, ts:TimeSeriesDict, siminfo:Dict[str,Any], saveall:bool, + operation:str, segment:str, activity:str) -> None: + """""" + + ### Potentially need to add get_flows method as well + +class IOHDF(IOBase): + + def __init__(self): + pass + + def read_uci(self) -> UCITuple: + pass + + def write_uci(self, UCITuple) -> None: + pass + + def read_timeseries(self, ext_sourcesdd: List[Pandas], siminfo: Dict[str, Any]) -> TimeSeriesDict: + pass + + def write_timeseries(self, ts:TimeSeriesDict, siminfo:Dict[str,Any], saveall:bool, + operation:str, segment:str, activity:str) -> None: + pass \ No newline at end of file From 8691be8d2b80aeef2df1e1dceb4f53d3e88e52f9 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Wed, 17 Nov 2021 09:59:16 -0500 Subject: [PATCH 02/19] Switch to Protocol based approach. A Protocol approach to IO abstraction will provide more flexibility for this application over the Abstract Base Class (ABC) approach I previously mapped out. Some of the IO abstractions will only support a subset of the operations (e.g. a UCI abstraction will not include read and write methods for timeseries). The protocol approach allows us to more easily implement these types of IO abstractions while still defining what the interface typing should be.
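To illustrate the difference, here is a minimal sketch (not part of this patch; the names are illustrative and simply mirror the protocols introduced below) of how structural typing works. A class only has to implement the methods of the protocols it actually supports, and a type checker accepts it wherever that protocol is expected, without inheriting from a shared base class:

    from typing import Protocol

    class ReadableUCI(Protocol):
        def read_uci(self) -> dict: ...

    class UCIReader:
        # Implements only the read side; no subclassing and no stub
        # write_uci/read_timeseries/write_timeseries methods required.
        def read_uci(self) -> dict:
            return {}

    def load_uci(source: ReadableUCI) -> dict:
        # Any object with a matching read_uci signature satisfies the protocol.
        return source.read_uci()

    uci = load_uci(UCIReader())

Under the ABC approach, UCIReader would have had to subclass IOBase and stub out the timeseries methods it cannot support; with Protocols it simply omits them.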
--- HSP2IO/base.py | 52 --------------------------------------------- HSP2IO/protocols.py | 29 +++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 52 deletions(-) delete mode 100644 HSP2IO/base.py create mode 100644 HSP2IO/protocols.py diff --git a/HSP2IO/base.py b/HSP2IO/base.py deleted file mode 100644 index 154ee633..00000000 --- a/HSP2IO/base.py +++ /dev/null @@ -1,52 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Tuple, Dict, Any, List -from collections import defaultdict -import pandas as pd -import numba.typed -import numpy as np - -from pandas.core.frame import Pandas - -UCITuple = [defaultdict(dict), defaultdict(list), defaultdict(list), - defaultdict(list), defaultdict(dict), dict, int] -TimeSeriesDict = Dict[np.types.unicode_type,np.types.float64] - -class IOBase(ABC): - - @abstractmethod - def read_uci(self) -> UCITuple: - """""" - - @abstractmethod - def write_uci(self, UCITuple) -> None: - """""" - - @abstractmethod - def read_timeseries(self, ext_sourcesdd:List[Pandas], siminfo:Dict[str,Any]) -> TimeSeriesDict: - """""" - - @abstractmethod - #ts type hint in incorrect - def write_timeseries(self, ts:TimeSeriesDict, siminfo:Dict[str,Any], saveall:bool, - operation:str, segment:str, activity:str) -> None: - """""" - - ### Potentially need to add get_flows method as well - -class IOHDF(IOBase): - - def __init__(self): - pass - - def read_uci(self) -> UCITuple: - pass - - def write_uci(self, UCITuple) -> None: - pass - - def read_timeseries(self, ext_sourcesdd: List[Pandas], siminfo: Dict[str, Any]) -> TimeSeriesDict: - pass - - def write_timeseries(self, ts:TimeSeriesDict, siminfo:Dict[str,Any], saveall:bool, - operation:str, segment:str, activity:str) -> None: - pass \ No newline at end of file diff --git a/HSP2IO/protocols.py b/HSP2IO/protocols.py new file mode 100644 index 00000000..8bd041e9 --- /dev/null +++ b/HSP2IO/protocols.py @@ -0,0 +1,29 @@ +from typing import Protocol, Tuple, Dict, Any, List +from collections import defaultdict +import pandas as pd +import numpy as np +from pandas.core.frame import Pandas + +UCITuple = [defaultdict(dict), defaultdict(list), defaultdict(list), + defaultdict(list), defaultdict(dict), dict, int] +TimeSeriesDict = Dict[np.types.unicode_type,np.types.float64] + +class ReadableUCI(Protocol): + def read_uci(self) -> UCITuple: + ... + +class WriteableUCI(Protocol): + def write_uci(self, UCITuple) -> None: + ... + +class ReadableTSStorage(Protocol): + def read_timeseries(self, ext_sourcesdd:List[Pandas], siminfo:Dict[str,Any]) -> TimeSeriesDict: + ... + +class WriteableTimeseries(Protocol): + + def write_timeseries(self, ts:TimeSeriesDict, siminfo:Dict[str,Any], saveall:bool, + operation:str, segment:str, activity:str) -> None: + ... 
+ +### Potentially need to add get_flows method as well \ No newline at end of file From 33881da51157a262333573b2e4be8219317f0fe2 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Fri, 3 Dec 2021 12:44:15 -0500 Subject: [PATCH 03/19] First cut IO abstraction --- HSP2/GENER.py | 5 +- HSP2/main.py | 443 +++++++++++++++++++++----------------------- HSP2/utilities.py | 18 +- HSP2IO/hdf.py | 107 +++++++++++ HSP2IO/protocols.py | 27 ++- 5 files changed, 342 insertions(+), 258 deletions(-) create mode 100644 HSP2IO/hdf.py diff --git a/HSP2/GENER.py b/HSP2/GENER.py index 62716b2f..917e7ab1 100644 --- a/HSP2/GENER.py +++ b/HSP2/GENER.py @@ -1,9 +1,6 @@ -from numba import njit import numpy as np import pandas as pd -from typing import Dict, List - -from HSP2.utilities import get_timeseries +from typing import Dict class Gener(): """ diff --git a/HSP2/main.py b/HSP2/main.py index b4340098..4b01eb4b 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -16,210 +16,226 @@ from HSP2.utilities import transform, versions, get_timeseries, expand_timeseries_names from HSP2.configuration import activities, noop, expand_masslinks +from HSP2IO.protocols import Category, ReadableUCI, WriteableTimeseries, ReadableTimeseries + from typing import List -def main(hdfname, saveall=False, jupyterlab=True): +def main(uci: ReadableUCI, + input_timeseries: ReadableTimeseries, + output_timeseries:WriteableTimeseries, + saveall:bool=False, + jupyterlab:bool=True,) -> None: """Runs main HSP2 program. Parameters ---------- + uci: ReadableUCI + A class implementing ReadableUCI protocol hdfname: str HDF5 (path) filename used for both input and output. saveall: Boolean [optional] Default is False. Saves all calculated data ignoring SAVE tables. + + Return + ------------ + None + """ + hdfname = './' if not os.path.exists(hdfname): raise FileNotFoundError(f'{hdfname} HDF5 File Not Found') - with HDFStore(hdfname, 'a') as store: - msg = messages() - msg(1, f'Processing started for file {hdfname}; saveall={saveall}') - - # read user control, parameters, states, and flags from HDF5 file - opseq, ddlinks, ddmasslinks, ddext_sources, ddgener, uci, siminfo = get_uci(store) - start, stop = siminfo['start'], siminfo['stop'] - - copy_instances = {} - gener_instances = {} - - # main processing loop - msg(1, f'Simulation Start: {start}, Stop: {stop}') - tscat = {} - for _, operation, segment, delt in opseq.itertuples(): - msg(2, f'{operation} {segment} DELT(minutes): {delt}') - siminfo['delt'] = delt - siminfo['tindex'] = date_range(start, stop, freq=Minute(delt))[1:] - siminfo['steps'] = len(siminfo['tindex']) - - if operation == 'COPY': - copy_instances[segment] = activities[operation](store, siminfo, ddext_sources[(operation,segment)]) - elif operation == 'GENER': - try: - gener_instances[segment] = activities[operation](segment, copy_instances, gener_instances, ddlinks, ddgener) - except NotImplementedError as e: - print(f"GENER '{segment}' encountered unsupported feature during initialization and may not function correctly. 
Unsupported feature: '{e}'") - else: - - # now conditionally execute all activity modules for the op, segment - ts = get_timeseries(store,ddext_sources[(operation,segment)],siminfo) - ts = get_gener_timeseries(ts, gener_instances, ddlinks[segment]) - flags = uci[(operation, 'GENERAL', segment)]['ACTIVITY'] + store = uci._store + msg = messages() + msg(1, f'Processing started for file {hdfname}; saveall={saveall}') + + # read user control, parameters, states, and flags from HDF5 file + uci_parameters = uci.read_uci() + opseq, ddlinks, ddmasslinks, ddext_sources, ddgener, uci, siminfo = uci_parameters + + start, stop = siminfo['start'], siminfo['stop'] + + copy_instances = {} + gener_instances = {} + + # main processing loop + msg(1, f'Simulation Start: {start}, Stop: {stop}') + tscat = {} + for _, operation, segment, delt in opseq.itertuples(): + msg(2, f'{operation} {segment} DELT(minutes): {delt}') + siminfo['delt'] = delt + siminfo['tindex'] = date_range(start, stop, freq=Minute(delt))[1:] + siminfo['steps'] = len(siminfo['tindex']) + + if operation == 'COPY': + copy_instances[segment] = activities[operation](store, siminfo, ddext_sources[(operation,segment)]) + elif operation == 'GENER': + try: + gener_instances[segment] = activities[operation](segment, copy_instances, gener_instances, ddlinks, ddgener) + except NotImplementedError as e: + print(f"GENER '{segment}' encountered unsupported feature during initialization and may not function correctly. Unsupported feature: '{e}'") + else: + + # now conditionally execute all activity modules for the op, segment + ts = get_timeseries(input_timeseries,ddext_sources[(operation,segment)],siminfo) + ts = get_gener_timeseries(ts, gener_instances, ddlinks[segment]) + flags = uci[(operation, 'GENERAL', segment)]['ACTIVITY'] + if operation == 'RCHRES': + # Add nutrient adsorption flags: + if flags['NUTRX'] == 1: + flags['TAMFG'] = uci[(operation, 'NUTRX', segment)]['FLAGS']['NH3FG'] + flags['ADNHFG'] = uci[(operation, 'NUTRX', segment)]['FLAGS']['ADNHFG'] + flags['PO4FG'] = uci[(operation, 'NUTRX', segment)]['FLAGS']['PO4FG'] + flags['ADPOFG'] = uci[(operation, 'NUTRX', segment)]['FLAGS']['ADPOFG'] + + get_flows(store, ts, tscat, flags, uci, segment, ddlinks, ddmasslinks, siminfo['steps'], msg) + + for activity, function in activities[operation].items(): + if function == noop: #or not flags[activity]: + continue + + if (activity in flags) and (not flags[activity]): + continue + + msg(3, f'{activity}') + + ui = uci[(operation, activity, segment)] # ui is a dictionary + if operation == 'PERLND' and activity == 'SEDMNT': + # special exception here to make CSNOFG available + ui['PARAMETERS']['CSNOFG'] = uci[(operation, 'PWATER', segment)]['PARAMETERS']['CSNOFG'] + if operation == 'PERLND' and activity == 'PSTEMP': + # special exception here to make AIRTFG available + ui['PARAMETERS']['AIRTFG'] = flags['ATEMP'] + if operation == 'PERLND' and activity == 'PWTGAS': + # special exception here to make CSNOFG available + ui['PARAMETERS']['CSNOFG'] = uci[(operation, 'PWATER', segment)]['PARAMETERS']['CSNOFG'] if operation == 'RCHRES': - # Add nutrient adsorption flags: - if flags['NUTRX'] == 1: - flags['TAMFG'] = uci[(operation, 'NUTRX', segment)]['FLAGS']['NH3FG'] - flags['ADNHFG'] = uci[(operation, 'NUTRX', segment)]['FLAGS']['ADNHFG'] - flags['PO4FG'] = uci[(operation, 'NUTRX', segment)]['FLAGS']['PO4FG'] - flags['ADPOFG'] = uci[(operation, 'NUTRX', segment)]['FLAGS']['ADPOFG'] + if not 'PARAMETERS' in ui: + ui['PARAMETERS'] = {} + 
ui['PARAMETERS']['NEXITS'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['NEXITS'] + if activity == 'ADCALC': + ui['PARAMETERS']['ADFG'] = flags['ADCALC'] + ui['PARAMETERS']['KS'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['KS'] + ui['PARAMETERS']['VOL'] = uci[(operation, 'HYDR', segment)]['STATES']['VOL'] + ui['PARAMETERS']['ROS'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['ROS'] + if activity == 'HTRCH': + ui['PARAMETERS']['ADFG'] = flags['ADCALC'] + ui['advectData'] = uci[(operation, 'ADCALC', segment)]['adcalcData'] + # ui['STATES']['VOL'] = uci[(operation, 'HYDR', segment)]['STATES']['VOL'] + if activity == 'CONS': + ui['advectData'] = uci[(operation, 'ADCALC', segment)]['adcalcData'] + if activity == 'SEDTRN': + ui['PARAMETERS']['ADFG'] = flags['ADCALC'] + ui['advectData'] = uci[(operation, 'ADCALC', segment)]['adcalcData'] + # ui['STATES']['VOL'] = uci[(operation, 'HYDR', segment)]['STATES']['VOL'] + ui['PARAMETERS']['HTFG'] = flags['HTRCH'] + ui['PARAMETERS']['AUX3FG'] = 0 + if flags['HYDR']: + ui['PARAMETERS']['LEN'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LEN'] + ui['PARAMETERS']['DELTH'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['DELTH'] + ui['PARAMETERS']['DB50'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['DB50'] + ui['PARAMETERS']['AUX3FG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['AUX3FG'] + if activity == 'GQUAL': + ui['advectData'] = uci[(operation, 'ADCALC', segment)]['adcalcData'] + ui['PARAMETERS']['HTFG'] = flags['HTRCH'] + ui['PARAMETERS']['SEDFG'] = flags['SEDTRN'] + # ui['PARAMETERS']['REAMFG'] = uci[(operation, 'OXRX', segment)]['PARAMETERS']['REAMFG'] + ui['PARAMETERS']['HYDRFG'] = flags['HYDR'] + if flags['HYDR']: + ui['PARAMETERS']['LKFG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LKFG'] + ui['PARAMETERS']['AUX1FG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['AUX1FG'] + ui['PARAMETERS']['AUX2FG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['AUX2FG'] + ui['PARAMETERS']['LEN'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LEN'] + ui['PARAMETERS']['DELTH'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['DELTH'] + if flags['OXRX']: + ui['PARAMETERS']['LKFG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LKFG'] + ui['PARAMETERS']['CFOREA'] = uci[(operation, 'OXRX', segment)]['PARAMETERS']['CFOREA'] + if flags['SEDTRN']: + ui['PARAMETERS']['SSED1'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED1'] + ui['PARAMETERS']['SSED2'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED2'] + ui['PARAMETERS']['SSED3'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED3'] + if flags['HTRCH']: + ui['PARAMETERS']['CFSAEX'] = uci[(operation, 'HTRCH', segment)]['PARAMETERS']['CFSAEX'] + elif flags['PLANK']: + if 'CFSAEX' in uci[(operation, 'PLANK', segment)]['PARAMETERS']: + ui['PARAMETERS']['CFSAEX'] = uci[(operation, 'PLANK', segment)]['PARAMETERS']['CFSAEX'] - get_flows(store, ts, tscat, flags, uci, segment, ddlinks, ddmasslinks, siminfo['steps'], msg) - - for activity, function in activities[operation].items(): - if function == noop: #or not flags[activity]: - continue - - if (activity in flags) and (not flags[activity]): - continue - - msg(3, f'{activity}') - - ui = uci[(operation, activity, segment)] # ui is a dictionary - if operation == 'PERLND' and activity == 'SEDMNT': - # special exception here to make CSNOFG available - ui['PARAMETERS']['CSNOFG'] = uci[(operation, 'PWATER', segment)]['PARAMETERS']['CSNOFG'] - if operation == 'PERLND' and activity == 'PSTEMP': - # special 
exception here to make AIRTFG available - ui['PARAMETERS']['AIRTFG'] = flags['ATEMP'] - if operation == 'PERLND' and activity == 'PWTGAS': - # special exception here to make CSNOFG available - ui['PARAMETERS']['CSNOFG'] = uci[(operation, 'PWATER', segment)]['PARAMETERS']['CSNOFG'] - if operation == 'RCHRES': - if not 'PARAMETERS' in ui: - ui['PARAMETERS'] = {} - ui['PARAMETERS']['NEXITS'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['NEXITS'] - if activity == 'ADCALC': - ui['PARAMETERS']['ADFG'] = flags['ADCALC'] - ui['PARAMETERS']['KS'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['KS'] - ui['PARAMETERS']['VOL'] = uci[(operation, 'HYDR', segment)]['STATES']['VOL'] - ui['PARAMETERS']['ROS'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['ROS'] - if activity == 'HTRCH': - ui['PARAMETERS']['ADFG'] = flags['ADCALC'] - ui['advectData'] = uci[(operation, 'ADCALC', segment)]['adcalcData'] - # ui['STATES']['VOL'] = uci[(operation, 'HYDR', segment)]['STATES']['VOL'] - if activity == 'CONS': - ui['advectData'] = uci[(operation, 'ADCALC', segment)]['adcalcData'] - if activity == 'SEDTRN': - ui['PARAMETERS']['ADFG'] = flags['ADCALC'] - ui['advectData'] = uci[(operation, 'ADCALC', segment)]['adcalcData'] - # ui['STATES']['VOL'] = uci[(operation, 'HYDR', segment)]['STATES']['VOL'] - ui['PARAMETERS']['HTFG'] = flags['HTRCH'] - ui['PARAMETERS']['AUX3FG'] = 0 - if flags['HYDR']: - ui['PARAMETERS']['LEN'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LEN'] - ui['PARAMETERS']['DELTH'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['DELTH'] - ui['PARAMETERS']['DB50'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['DB50'] - ui['PARAMETERS']['AUX3FG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['AUX3FG'] - if activity == 'GQUAL': - ui['advectData'] = uci[(operation, 'ADCALC', segment)]['adcalcData'] - ui['PARAMETERS']['HTFG'] = flags['HTRCH'] - ui['PARAMETERS']['SEDFG'] = flags['SEDTRN'] - # ui['PARAMETERS']['REAMFG'] = uci[(operation, 'OXRX', segment)]['PARAMETERS']['REAMFG'] - ui['PARAMETERS']['HYDRFG'] = flags['HYDR'] - if flags['HYDR']: - ui['PARAMETERS']['LKFG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LKFG'] - ui['PARAMETERS']['AUX1FG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['AUX1FG'] - ui['PARAMETERS']['AUX2FG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['AUX2FG'] - ui['PARAMETERS']['LEN'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LEN'] - ui['PARAMETERS']['DELTH'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['DELTH'] - if flags['OXRX']: - ui['PARAMETERS']['LKFG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LKFG'] - ui['PARAMETERS']['CFOREA'] = uci[(operation, 'OXRX', segment)]['PARAMETERS']['CFOREA'] - if flags['SEDTRN']: - ui['PARAMETERS']['SSED1'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED1'] - ui['PARAMETERS']['SSED2'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED2'] - ui['PARAMETERS']['SSED3'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED3'] - if flags['HTRCH']: - ui['PARAMETERS']['CFSAEX'] = uci[(operation, 'HTRCH', segment)]['PARAMETERS']['CFSAEX'] - elif flags['PLANK']: - if 'CFSAEX' in uci[(operation, 'PLANK', segment)]['PARAMETERS']: - ui['PARAMETERS']['CFSAEX'] = uci[(operation, 'PLANK', segment)]['PARAMETERS']['CFSAEX'] + if activity == 'RQUAL': + # RQUAL inputs: + ui['advectData'] = uci[(operation, 'ADCALC', segment)]['adcalcData'] + if flags['HYDR']: + ui['PARAMETERS']['LKFG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LKFG'] + + ui['FLAGS']['HTFG'] = flags['HTRCH'] + 
ui['FLAGS']['SEDFG'] = flags['SEDTRN'] + ui['FLAGS']['GQFG'] = flags['GQUAL'] + ui['FLAGS']['OXFG'] = flags['OXFG'] + ui['FLAGS']['NUTFG'] = flags['NUTRX'] + ui['FLAGS']['PLKFG'] = flags['PLANK'] + ui['FLAGS']['PHFG'] = flags['PHCARB'] + if flags['CONS']: + if 'PARAMETERS' in uci[(operation, 'CONS', segment)]: + if 'NCONS' in uci[(operation, 'CONS', segment)]['PARAMETERS']: + ui['PARAMETERS']['NCONS'] = uci[(operation, 'CONS', segment)]['PARAMETERS']['NCONS'] + + # OXRX module inputs: + ui_oxrx = uci[(operation, 'OXRX', segment)] + + if flags['HYDR']: + ui_oxrx['PARAMETERS']['LEN'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LEN'] + ui_oxrx['PARAMETERS']['DELTH'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['DELTH'] - if activity == 'RQUAL': - # RQUAL inputs: - ui['advectData'] = uci[(operation, 'ADCALC', segment)]['adcalcData'] - if flags['HYDR']: - ui['PARAMETERS']['LKFG'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LKFG'] - - ui['FLAGS']['HTFG'] = flags['HTRCH'] - ui['FLAGS']['SEDFG'] = flags['SEDTRN'] - ui['FLAGS']['GQFG'] = flags['GQUAL'] - ui['FLAGS']['OXFG'] = flags['OXFG'] - ui['FLAGS']['NUTFG'] = flags['NUTRX'] - ui['FLAGS']['PLKFG'] = flags['PLANK'] - ui['FLAGS']['PHFG'] = flags['PHCARB'] - if flags['CONS']: - if 'PARAMETERS' in uci[(operation, 'CONS', segment)]: - if 'NCONS' in uci[(operation, 'CONS', segment)]['PARAMETERS']: - ui['PARAMETERS']['NCONS'] = uci[(operation, 'CONS', segment)]['PARAMETERS']['NCONS'] - - # OXRX module inputs: - ui_oxrx = uci[(operation, 'OXRX', segment)] - - if flags['HYDR']: - ui_oxrx['PARAMETERS']['LEN'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['LEN'] - ui_oxrx['PARAMETERS']['DELTH'] = uci[(operation, 'HYDR', segment)]['PARAMETERS']['DELTH'] - - if flags['HTRCH']: - ui_oxrx['PARAMETERS']['ELEV'] = uci[(operation, 'HTRCH', segment)]['PARAMETERS']['ELEV'] - - if flags['SEDTRN']: - ui['PARAMETERS']['SSED1'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED1'] - ui['PARAMETERS']['SSED2'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED2'] - ui['PARAMETERS']['SSED3'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED3'] - - # PLANK module inputs: - if flags['HTRCH']: - ui['PARAMETERS']['CFSAEX'] = uci[(operation, 'HTRCH', segment)]['PARAMETERS']['CFSAEX'] - - # NUTRX, PLANK, PHCARB module inputs: - ui_nutrx = uci[(operation, 'NUTRX', segment)] - ui_plank = uci[(operation, 'PLANK', segment)] - ui_phcarb = uci[(operation, 'PHCARB', segment)] - - ############ calls activity function like snow() ############## - if operation not in ['COPY','GENER']: - if (activity != 'RQUAL'): - errors, errmessages = function(store, siminfo, ui, ts) - else: - errors, errmessages = function(store, siminfo, ui, ui_oxrx, ui_nutrx, ui_plank, ui_phcarb, ts) - ############################################################### - - for errorcnt, errormsg in zip(errors, errmessages): - if errorcnt > 0: - msg(4, f'Error count {errorcnt}: {errormsg}') - if 'SAVE' in ui: - save_timeseries(store,ts,ui['SAVE'],siminfo,saveall,operation,segment,activity,jupyterlab) - - if (activity == 'RQUAL'): - if 'SAVE' in ui_oxrx: save_timeseries(store,ts,ui_oxrx['SAVE'],siminfo,saveall,operation,segment,'OXRX',jupyterlab) - if 'SAVE' in ui_nutrx and flags['NUTRX'] == 1: save_timeseries(store,ts,ui_nutrx['SAVE'],siminfo,saveall,operation,segment,'NUTRX',jupyterlab) - if 'SAVE' in ui_plank and flags['PLANK'] == 1: save_timeseries(store,ts,ui_plank['SAVE'],siminfo,saveall,operation,segment,'PLANK',jupyterlab) - if 'SAVE' in ui_phcarb and flags['PHCARB'] == 1: 
save_timeseries(store,ts,ui_phcarb['SAVE'],siminfo,saveall,operation,segment,'PHCARB',jupyterlab) + if flags['HTRCH']: + ui_oxrx['PARAMETERS']['ELEV'] = uci[(operation, 'HTRCH', segment)]['PARAMETERS']['ELEV'] + + if flags['SEDTRN']: + ui['PARAMETERS']['SSED1'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED1'] + ui['PARAMETERS']['SSED2'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED2'] + ui['PARAMETERS']['SSED3'] = uci[(operation, 'SEDTRN', segment)]['STATES']['SSED3'] + + # PLANK module inputs: + if flags['HTRCH']: + ui['PARAMETERS']['CFSAEX'] = uci[(operation, 'HTRCH', segment)]['PARAMETERS']['CFSAEX'] + + # NUTRX, PLANK, PHCARB module inputs: + ui_nutrx = uci[(operation, 'NUTRX', segment)] + ui_plank = uci[(operation, 'PLANK', segment)] + ui_phcarb = uci[(operation, 'PHCARB', segment)] + + ############ calls activity function like snow() ############## + if operation not in ['COPY','GENER']: + if (activity != 'RQUAL'): + errors, errmessages = function(store, siminfo, ui, ts) + else: + errors, errmessages = function(store, siminfo, ui, ui_oxrx, ui_nutrx, ui_plank, ui_phcarb, ts) + ############################################################### + + for errorcnt, errormsg in zip(errors, errmessages): + if errorcnt > 0: + msg(4, f'Error count {errorcnt}: {errormsg}') + if 'SAVE' in ui: + save_timeseries(output_timeseries,ts,ui['SAVE'],siminfo,saveall,operation,segment,activity,jupyterlab) + + if (activity == 'RQUAL'): + if 'SAVE' in ui_oxrx: save_timeseries(output_timeseries,ts,ui_oxrx['SAVE'],siminfo,saveall,operation,segment,'OXRX',jupyterlab) + if 'SAVE' in ui_nutrx and flags['NUTRX'] == 1: save_timeseries(output_timeseries,ts,ui_nutrx['SAVE'],siminfo,saveall,operation,segment,'NUTRX',jupyterlab) + if 'SAVE' in ui_plank and flags['PLANK'] == 1: save_timeseries(output_timeseries,ts,ui_plank['SAVE'],siminfo,saveall,operation,segment,'PLANK',jupyterlab) + if 'SAVE' in ui_phcarb and flags['PHCARB'] == 1: save_timeseries(output_timeseries,ts,ui_phcarb['SAVE'],siminfo,saveall,operation,segment,'PHCARB',jupyterlab) # before going on to the next operation, save the ts dict for later use tscat[segment] = ts - msglist = msg(1, 'Done', final=True) + msglist = msg(1, 'Done', final=True) - df = DataFrame(msglist, columns=['logfile']) - df.to_hdf(store, 'RUN_INFO/LOGFILE', data_columns=True, format='t') + df = DataFrame(msglist, columns=['logfile']) + df.to_hdf(store, 'RUN_INFO/LOGFILE', data_columns=True, format='t') - if jupyterlab: - df = versions(['jupyterlab', 'notebook']) - df.to_hdf(store, 'RUN_INFO/VERSIONS', data_columns=True, format='t') - print('\n\n', df) + if jupyterlab: + df = versions(['jupyterlab', 'notebook']) + df.to_hdf(store, 'RUN_INFO/VERSIONS', data_columns=True, format='t') + print('\n\n', df) return def messages(): @@ -238,58 +254,7 @@ def msg(indent, message, final=False): return mlist return msg - -def get_uci(store): - # read user control and user data from HDF5 file - uci = defaultdict(dict) - ddlinks = defaultdict(list) - ddmasslinks = defaultdict(list) - ddext_sources = defaultdict(list) - ddgener =defaultdict(dict) - siminfo = {} - opseq = 0 - - for path in store.keys(): # finds ALL data sets into HDF5 file - op, module, *other = path[1:].split(sep='/', maxsplit=3) - s = '_'.join(other) - if op == 'CONTROL': - if module =='GLOBAL': - temp = store[path].to_dict()['Info'] - siminfo['start'] = Timestamp(temp['Start']) - siminfo['stop'] = Timestamp(temp['Stop']) - siminfo['units'] = 1 - if 'Units' in temp: - if int(temp['Units']): - siminfo['units'] = 
int(temp['Units']) - elif module == 'LINKS': - for row in store[path].fillna('').itertuples(): - if row.TVOLNO != '': - ddlinks[f'{row.TVOLNO}'].append(row) - else: - ddlinks[f'{row.TOPFST}'].append(row) - - elif module == 'MASS_LINKS': - for row in store[path].replace('na','').itertuples(): - ddmasslinks[row.MLNO].append(row) - elif module == 'EXT_SOURCES': - for row in store[path].replace('na','').itertuples(): - ddext_sources[(row.TVOL, row.TVOLNO)].append(row) - elif module == 'OP_SEQUENCE': - opseq = store[path] - elif op in {'PERLND', 'IMPLND', 'RCHRES'}: - for id, vdict in store[path].to_dict('index').items(): - uci[(op, module, id)][s] = vdict - elif op == 'GENER': - for row in store[path].itertuples(): - if len(row.OPNID.split()) == 1: - start = int(row.OPNID) - stop = start - else: - start, stop = row.OPNID.split() - for i in range(int(start), int(stop)+1): ddgener[module][f'G{i:03d}'] = row[2] - return opseq, ddlinks, ddmasslinks, ddext_sources, ddgener, uci, siminfo - -def save_timeseries(store, ts, savedict, siminfo, saveall, operation, segment, activity, jupyterlab=True): +def save_timeseries(timeseries:WriteableTimeseries, ts, savedict, siminfo, saveall, operation, segment, activity, jupyterlab=True): # save computed timeseries (at computation DELT) save = {k for k,v in savedict.items() if v or saveall} df = DataFrame(index=siminfo['tindex']) @@ -317,10 +282,18 @@ def save_timeseries(store, ts, savedict, siminfo, saveall, operation, segment, a df = df.astype(float32).sort_index(axis='columns') path = f'RESULTS/{operation}_{segment}/{activity}' if not df.empty: - if jupyterlab: - df.to_hdf(store, path, complib='blosc', complevel=9) # This is the official version - else: - df.to_hdf(store, path, format='t', data_columns=True) # show the columns in HDFView + timeseries.write_timeseries( + data_frame=df, + category = Category.RESULTS, + operation=operation, + segment=segment, + activity=activity, + compress=jupyterlab + ) + #if jupyterlab: + # df.to_hdf(store, path, complib='blosc', complevel=9) # This is the official version + #else: + # df.to_hdf(store, path, format='t', data_columns=True) # show the columns in HDFView else: print('Save DataFrame Empty for', path) return diff --git a/HSP2/utilities.py b/HSP2/utilities.py index 487b110d..4e632ba0 100644 --- a/HSP2/utilities.py +++ b/HSP2/utilities.py @@ -10,6 +10,8 @@ from numba import types from numba.typed import Dict +from HSP2IO.protocols import Category, ReadableTimeseries + flowtype = { # EXTERNAL FLOWS @@ -230,24 +232,16 @@ def versions(import_list=[]): str(datetime.datetime.now())[0:19]]) return pandas.DataFrame(data, index=names, columns=['version']) -def get_timeseries(store, ext_sourcesdd, siminfo): +def get_timeseries(timeseries_inputs:ReadableTimeseries, ext_sourcesdd, siminfo): ''' makes timeseries for the current timestep and trucated to the sim interval''' # explicit creation of Numba dictionary with signatures ts = Dict.empty(key_type=types.unicode_type, value_type=types.float64[:]) for row in ext_sourcesdd: - if row.SVOL == '*': - path = f'TIMESERIES/{row.SVOLNO}' - if path in store: - temp1 = store[path] - else: - print('Get Timeseries ERROR for', path) - continue - else: - temp1 = read_hdf(row.SVOL, path) + data_frame = timeseries_inputs.read_timeseries(category=Category.INPUTS,segment=row.SVOLNO) if row.MFACTOR != 1.0: - temp1 *= row.MFACTOR - t = transform(temp1, row.TMEMN, row.TRAN, siminfo) + data_frame *= row.MFACTOR + t = transform(data_frame, row.TMEMN, row.TRAN, siminfo) # in some cases the subscript 
is irrelevant, like '1' or '1 1', and we can leave it off. # there are other cases where it is needed to distinguish, such as ISED and '1' or '1 1'. diff --git a/HSP2IO/hdf.py b/HSP2IO/hdf.py new file mode 100644 index 00000000..7b04160e --- /dev/null +++ b/HSP2IO/hdf.py @@ -0,0 +1,107 @@ +import pandas as pd +from pandas.io.pytables import read_hdf +from HSP2IO.protocols import UCITuple, Category +from collections import defaultdict +from typing import Union, Any + + + +class HDF5(): + + def __init__(self, file_path:str) -> None: + self.file_path = file_path + self._store = pd.HDFStore(file_path) + None + + def __del__(self): + self._store.close() + + def read_uci(self) -> UCITuple: + """Read UCI related tables + + Parameters: None + + Returns: UCITuple + + """ + uci = defaultdict(dict) + ddlinks = defaultdict(list) + ddmasslinks = defaultdict(list) + ddext_sources = defaultdict(list) + ddgener =defaultdict(dict) + siminfo = {} + opseq = 0 + + for path in self._store.keys(): # finds ALL data sets into HDF5 file + op, module, *other = path[1:].split(sep='/', maxsplit=3) + s = '_'.join(other) + if op == 'CONTROL': + if module =='GLOBAL': + temp = self._store[path].to_dict()['Info'] + siminfo['start'] = pd.Timestamp(temp['Start']) + siminfo['stop'] = pd.Timestamp(temp['Stop']) + siminfo['units'] = 1 + if 'Units' in temp: + if int(temp['Units']): + siminfo['units'] = int(temp['Units']) + elif module == 'LINKS': + for row in self._store[path].fillna('').itertuples(): + if row.TVOLNO != '': + ddlinks[f'{row.TVOLNO}'].append(row) + else: + ddlinks[f'{row.TOPFST}'].append(row) + + elif module == 'MASS_LINKS': + for row in self._store[path].replace('na','').itertuples(): + ddmasslinks[row.MLNO].append(row) + elif module == 'EXT_SOURCES': + for row in self._store[path].replace('na','').itertuples(): + ddext_sources[(row.TVOL, row.TVOLNO)].append(row) + elif module == 'OP_SEQUENCE': + opseq = self._store[path] + elif op in {'PERLND', 'IMPLND', 'RCHRES'}: + for id, vdict in self._store[path].to_dict('index').items(): + uci[(op, module, id)][s] = vdict + elif op == 'GENER': + for row in self._store[path].itertuples(): + if len(row.OPNID.split()) == 1: + start = int(row.OPNID) + stop = start + else: + start, stop = row.OPNID.split() + for i in range(int(start), int(stop)+1): ddgener[module][f'G{i:03d}'] = row[2] + return (opseq, ddlinks, ddmasslinks, ddext_sources, ddgener, uci, siminfo) + + def read_timeseries(self, + category:Category, + operation:Union[str,None]=None, + segment:Union[str,None]=None, + activity:Union[str,None]=None) -> pd.DataFrame: + path = '' + if category == category.INPUTS: + path = f'TIMESERIES/{segment}' + elif category == category.RESULTS: + path = f'RESULTS/{operation}_{segment}/{activity}' + print(f'Reading from: {path}') + return read_hdf(self._store, path) + + def write_timeseries(self, + data_frame:pd.DataFrame, + category: Category, + operation:str, + segment:str, + activity:str, + *args:Any, + **kwargs:Any) -> None: + """Saves timeseries to HDF5""" + path=f'{operation}_{segment}/{activity}' + if category: + path = 'RESULTS/' + path + complevel = None + if 'compress' in kwargs: + if kwargs['compress']: + complevel = 9 + print(f'writing to: {path}') + data_frame.to_hdf(self._store, path, format='t', data_columns=True, complevel=complevel) + #data_frame.to_hdf(self._store, path) + diff --git a/HSP2IO/protocols.py b/HSP2IO/protocols.py index 8bd041e9..da77e1b7 100644 --- a/HSP2IO/protocols.py +++ b/HSP2IO/protocols.py @@ -1,12 +1,17 @@ -from typing import Protocol, Tuple, 
Dict, Any, List +from typing import Protocol, Dict, Any, List, Union from collections import defaultdict import pandas as pd import numpy as np -from pandas.core.frame import Pandas +from enum import Enum + UCITuple = [defaultdict(dict), defaultdict(list), defaultdict(list), defaultdict(list), defaultdict(dict), dict, int] -TimeSeriesDict = Dict[np.types.unicode_type,np.types.float64] +TimeSeriesDict = Dict[str,np.float64] + +class Category(Enum): + RESULTS = 'RESULT' + INPUTS = 'INPUT' class ReadableUCI(Protocol): def read_uci(self) -> UCITuple: @@ -16,14 +21,22 @@ class WriteableUCI(Protocol): def write_uci(self, UCITuple) -> None: ... -class ReadableTSStorage(Protocol): - def read_timeseries(self, ext_sourcesdd:List[Pandas], siminfo:Dict[str,Any]) -> TimeSeriesDict: +class ReadableTimeseries(Protocol): + def read_timeseries(self, + category:Category, + operation:Union[str,None]=None, + segment:Union[str,None]=None, + activity:Union[str,None]=None) -> pd.DataFrame: ... class WriteableTimeseries(Protocol): - def write_timeseries(self, ts:TimeSeriesDict, siminfo:Dict[str,Any], saveall:bool, - operation:str, segment:str, activity:str) -> None: + def write_timeseries(self, + data_frame:pd.DataFrame, + category:Category, + operation:Union[str,None]=None, + segment:Union[str,None]=None, + activity:Union[str,None]=None) -> None: ... ### Potentially need to add get_flows method as well \ No newline at end of file From f2987ac758d0d79b4cc6660359859e2577e605ab Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Fri, 3 Dec 2021 14:45:01 -0500 Subject: [PATCH 04/19] Rename protocols to SupportsXXX --- HSP2/COPY.py | 6 +++-- HSP2/main.py | 62 ++++++++++++++++++++++++++++----------------- HSP2/utilities.py | 4 +-- HSP2IO/hdf.py | 2 -- HSP2IO/protocols.py | 14 ++++++---- 5 files changed, 54 insertions(+), 34 deletions(-) diff --git a/HSP2/COPY.py b/HSP2/COPY.py index f13bf906..f2f99588 100644 --- a/HSP2/COPY.py +++ b/HSP2/COPY.py @@ -2,6 +2,8 @@ import pandas as pd from typing import List, Dict +from HSP2IO.protocols import SupportsReadTS + class Copy(): """ Partial implementation of the COPY module. 
@@ -10,12 +12,12 @@ class Copy(): This functionality is not currently implemented, presently only loading from EXT SOURCES """ - def __init__(self, store: pd.HDFStore, sim_info: Dict, ext_sources: List) -> None: + def __init__(self, io:SupportsReadTS, sim_info: Dict, ext_sources: List) -> None: self._ts = {} self._ts['MEAN'] = {} self._ts['POINT'] = {} - ts = get_timeseries(store, ext_sources, sim_info) + ts = get_timeseries(io, ext_sources, sim_info) for source in ext_sources: themn = source.TMEMN themsb = source.TMEMSB diff --git a/HSP2/main.py b/HSP2/main.py index 4b01eb4b..a52e13f2 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -16,43 +16,59 @@ from HSP2.utilities import transform, versions, get_timeseries, expand_timeseries_names from HSP2.configuration import activities, noop, expand_masslinks -from HSP2IO.protocols import Category, ReadableUCI, WriteableTimeseries, ReadableTimeseries +from HSP2IO.protocols import Category, SupportsReadTS, SupportsWriteTS, SupportsReadUCI -from typing import List +from typing import List, Union -def main(uci: ReadableUCI, - input_timeseries: ReadableTimeseries, - output_timeseries:WriteableTimeseries, +def main( + io_all: Union[SupportsReadUCI, SupportsReadTS, SupportsWriteTS, None] = None, + io_uci: Union[SupportsReadUCI,None]=None, + io_input: Union[SupportsReadTS,None]=None, + io_output: Union[SupportsReadTS,SupportsWriteTS,None]=None, saveall:bool=False, - jupyterlab:bool=True,) -> None: + jupyterlab:bool=True) -> None: """Runs main HSP2 program. Parameters ---------- - uci: ReadableUCI - A class implementing ReadableUCI protocol - hdfname: str - HDF5 (path) filename used for both input and output. - saveall: Boolean - [optional] Default is False. + io_all: SupportsReadUCI, SupportsReadTS, SupportsWriteTS/None + This parameter is intended to allow users with a single file that + combined UCI, Input and Output a short cut to specify a single argument. + io_uci: SupportsReadUCI/None (Default None) + A class implementing SupportReadUCI protocol, io_all used in place of + this parameter if not specified. + io_input: SupportsReadUCI/None (Default None) + A class implementing SupportReadTS protocol, io_all used in place of + this parameter if not specified. This parameter is where the input + timeseries will be read from. + io_output: SupportsReadUCI/None (Default None) + A class implementing SupportReadUCI protocol, io_all used in place of + this parameter if not specified. This parameter is where the output + timeseries will be written to and read from. + saveall: Boolean - [optional] Default is False. Saves all calculated data ignoring SAVE tables. - + jupyterlab: Boolean - [optional] Default is True. + Flag for specific output behavior for jupyter lab. 
Return ------------ None """ + if io_uci is None: io_uci = io_all + if io_input is None: io_input = io_all + if io_output is None: io_output = io_all + + store = io_input._store hdfname = './' if not os.path.exists(hdfname): raise FileNotFoundError(f'{hdfname} HDF5 File Not Found') - store = uci._store msg = messages() msg(1, f'Processing started for file {hdfname}; saveall={saveall}') # read user control, parameters, states, and flags from HDF5 file - uci_parameters = uci.read_uci() + uci_parameters = io_uci.read_uci() opseq, ddlinks, ddmasslinks, ddext_sources, ddgener, uci, siminfo = uci_parameters start, stop = siminfo['start'], siminfo['stop'] @@ -70,7 +86,7 @@ def main(uci: ReadableUCI, siminfo['steps'] = len(siminfo['tindex']) if operation == 'COPY': - copy_instances[segment] = activities[operation](store, siminfo, ddext_sources[(operation,segment)]) + copy_instances[segment] = activities[operation](io_input, siminfo, ddext_sources[(operation,segment)]) elif operation == 'GENER': try: gener_instances[segment] = activities[operation](segment, copy_instances, gener_instances, ddlinks, ddgener) @@ -79,7 +95,7 @@ def main(uci: ReadableUCI, else: # now conditionally execute all activity modules for the op, segment - ts = get_timeseries(input_timeseries,ddext_sources[(operation,segment)],siminfo) + ts = get_timeseries(io_input,ddext_sources[(operation,segment)],siminfo) ts = get_gener_timeseries(ts, gener_instances, ddlinks[segment]) flags = uci[(operation, 'GENERAL', segment)]['ACTIVITY'] if operation == 'RCHRES': @@ -216,13 +232,13 @@ def main(uci: ReadableUCI, if errorcnt > 0: msg(4, f'Error count {errorcnt}: {errormsg}') if 'SAVE' in ui: - save_timeseries(output_timeseries,ts,ui['SAVE'],siminfo,saveall,operation,segment,activity,jupyterlab) + save_timeseries(io_output,ts,ui['SAVE'],siminfo,saveall,operation,segment,activity,jupyterlab) if (activity == 'RQUAL'): - if 'SAVE' in ui_oxrx: save_timeseries(output_timeseries,ts,ui_oxrx['SAVE'],siminfo,saveall,operation,segment,'OXRX',jupyterlab) - if 'SAVE' in ui_nutrx and flags['NUTRX'] == 1: save_timeseries(output_timeseries,ts,ui_nutrx['SAVE'],siminfo,saveall,operation,segment,'NUTRX',jupyterlab) - if 'SAVE' in ui_plank and flags['PLANK'] == 1: save_timeseries(output_timeseries,ts,ui_plank['SAVE'],siminfo,saveall,operation,segment,'PLANK',jupyterlab) - if 'SAVE' in ui_phcarb and flags['PHCARB'] == 1: save_timeseries(output_timeseries,ts,ui_phcarb['SAVE'],siminfo,saveall,operation,segment,'PHCARB',jupyterlab) + if 'SAVE' in ui_oxrx: save_timeseries(io_output,ts,ui_oxrx['SAVE'],siminfo,saveall,operation,segment,'OXRX',jupyterlab) + if 'SAVE' in ui_nutrx and flags['NUTRX'] == 1: save_timeseries(io_output,ts,ui_nutrx['SAVE'],siminfo,saveall,operation,segment,'NUTRX',jupyterlab) + if 'SAVE' in ui_plank and flags['PLANK'] == 1: save_timeseries(io_output,ts,ui_plank['SAVE'],siminfo,saveall,operation,segment,'PLANK',jupyterlab) + if 'SAVE' in ui_phcarb and flags['PHCARB'] == 1: save_timeseries(io_output,ts,ui_phcarb['SAVE'],siminfo,saveall,operation,segment,'PHCARB',jupyterlab) # before going on to the next operation, save the ts dict for later use tscat[segment] = ts @@ -254,7 +270,7 @@ def msg(indent, message, final=False): return mlist return msg -def save_timeseries(timeseries:WriteableTimeseries, ts, savedict, siminfo, saveall, operation, segment, activity, jupyterlab=True): +def save_timeseries(timeseries:SupportsWriteTS, ts, savedict, siminfo, saveall, operation, segment, activity, jupyterlab=True): # save computed timeseries (at 
computation DELT) save = {k for k,v in savedict.items() if v or saveall} df = DataFrame(index=siminfo['tindex']) diff --git a/HSP2/utilities.py b/HSP2/utilities.py index 4e632ba0..fbd34a3e 100644 --- a/HSP2/utilities.py +++ b/HSP2/utilities.py @@ -10,7 +10,7 @@ from numba import types from numba.typed import Dict -from HSP2IO.protocols import Category, ReadableTimeseries +from HSP2IO.protocols import Category, SupportsReadTS flowtype = { @@ -232,7 +232,7 @@ def versions(import_list=[]): str(datetime.datetime.now())[0:19]]) return pandas.DataFrame(data, index=names, columns=['version']) -def get_timeseries(timeseries_inputs:ReadableTimeseries, ext_sourcesdd, siminfo): +def get_timeseries(timeseries_inputs:SupportsReadTS, ext_sourcesdd, siminfo): ''' makes timeseries for the current timestep and trucated to the sim interval''' # explicit creation of Numba dictionary with signatures ts = Dict.empty(key_type=types.unicode_type, value_type=types.float64[:]) diff --git a/HSP2IO/hdf.py b/HSP2IO/hdf.py index 7b04160e..494ed060 100644 --- a/HSP2IO/hdf.py +++ b/HSP2IO/hdf.py @@ -82,7 +82,6 @@ def read_timeseries(self, path = f'TIMESERIES/{segment}' elif category == category.RESULTS: path = f'RESULTS/{operation}_{segment}/{activity}' - print(f'Reading from: {path}') return read_hdf(self._store, path) def write_timeseries(self, @@ -101,7 +100,6 @@ def write_timeseries(self, if 'compress' in kwargs: if kwargs['compress']: complevel = 9 - print(f'writing to: {path}') data_frame.to_hdf(self._store, path, format='t', data_columns=True, complevel=complevel) #data_frame.to_hdf(self._store, path) diff --git a/HSP2IO/protocols.py b/HSP2IO/protocols.py index da77e1b7..f6fdd971 100644 --- a/HSP2IO/protocols.py +++ b/HSP2IO/protocols.py @@ -1,4 +1,4 @@ -from typing import Protocol, Dict, Any, List, Union +from typing import Protocol, Dict, Any, List, Union, runtime_checkable from collections import defaultdict import pandas as pd import numpy as np @@ -13,15 +13,18 @@ class Category(Enum): RESULTS = 'RESULT' INPUTS = 'INPUT' -class ReadableUCI(Protocol): +@runtime_checkable +class SupportsReadUCI(Protocol): def read_uci(self) -> UCITuple: ... -class WriteableUCI(Protocol): +@runtime_checkable +class SupportsWritUCI(Protocol): def write_uci(self, UCITuple) -> None: ... -class ReadableTimeseries(Protocol): +@runtime_checkable +class SupportsReadTS(Protocol): def read_timeseries(self, category:Category, operation:Union[str,None]=None, @@ -29,7 +32,8 @@ def read_timeseries(self, activity:Union[str,None]=None) -> pd.DataFrame: ... -class WriteableTimeseries(Protocol): +@runtime_checkable +class SupportsWriteTS(Protocol): def write_timeseries(self, data_frame:pd.DataFrame, From 952802192d8a0fd23f707d18b6fcd7faf9fbbdd7 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Fri, 3 Dec 2021 16:04:55 -0500 Subject: [PATCH 05/19] Implement IO wrapper class Having the main method keep track of the various IO objects seemed arduous. This allows us to keep any additional IO related implementation details (e.g. in memory caching of timeseries) away from the main model code.
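The intended call pattern is roughly the following sketch (not part of this patch), assuming a single HDF5 file, here named 'test.h5' for illustration, that holds the UCI tables, the input timeseries, and the results:

    from HSP2IO.hdf import HDF5
    from HSP2IO.io import IOManager
    from HSP2.main import main

    # One file serves as UCI source, input source, and output destination,
    # so it is passed once as io_all and IOManager routes every read and
    # write (plus the in-memory caching) to it.
    hdf5_instance = HDF5('test.h5')
    io_manager = IOManager(io_all=hdf5_instance)
    main(io_manager, saveall=True)

Callers that keep the UCI, inputs, and results in separate files would instead pass io_uci, io_input, and io_output individually; main() itself no longer needs to know which arrangement is in use.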
--- HSP2/main.py | 116 ++++++---------------------------------------- HSP2/utilities.py | 66 ++++++++++++++++++++++++-- HSP2IO/io.py | 74 +++++++++++++++++++++++++++++ 3 files changed, 152 insertions(+), 104 deletions(-) create mode 100644 HSP2IO/io.py diff --git a/HSP2/main.py b/HSP2/main.py index a52e13f2..4e80a2f8 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -13,38 +13,19 @@ from datetime import datetime as dt from copy import deepcopy import os -from HSP2.utilities import transform, versions, get_timeseries, expand_timeseries_names +from HSP2.utilities import versions, get_timeseries, expand_timeseries_names, save_timeseries, get_gener_timeseries from HSP2.configuration import activities, noop, expand_masslinks -from HSP2IO.protocols import Category, SupportsReadTS, SupportsWriteTS, SupportsReadUCI +from HSP2IO.io import IOManager from typing import List, Union -def main( - io_all: Union[SupportsReadUCI, SupportsReadTS, SupportsWriteTS, None] = None, - io_uci: Union[SupportsReadUCI,None]=None, - io_input: Union[SupportsReadTS,None]=None, - io_output: Union[SupportsReadTS,SupportsWriteTS,None]=None, - saveall:bool=False, - jupyterlab:bool=True) -> None: +def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None: """Runs main HSP2 program. Parameters ---------- - io_all: SupportsReadUCI, SupportsReadTS, SupportsWriteTS/None - This parameter is intended to allow users with a single file that - combined UCI, Input and Output a short cut to specify a single argument. - io_uci: SupportsReadUCI/None (Default None) - A class implementing SupportReadUCI protocol, io_all used in place of - this parameter if not specified. - io_input: SupportsReadUCI/None (Default None) - A class implementing SupportReadTS protocol, io_all used in place of - this parameter if not specified. This parameter is where the input - timeseries will be read from. - io_output: SupportsReadUCI/None (Default None) - A class implementing SupportReadUCI protocol, io_all used in place of - this parameter if not specified. This parameter is where the output - timeseries will be written to and read from. + saveall: Boolean - [optional] Default is False. Saves all calculated data ignoring SAVE tables. jupyterlab: Boolean - [optional] Default is True. @@ -55,11 +36,9 @@ def main( """ - if io_uci is None: io_uci = io_all - if io_input is None: io_input = io_all - if io_output is None: io_output = io_all - - store = io_input._store + #PRT - this is a bandaid to run the model while I implement IO Abstraction. 
+ #Eventually all references to store will be removed + store = IOManager._io_output._store hdfname = './' if not os.path.exists(hdfname): raise FileNotFoundError(f'{hdfname} HDF5 File Not Found') @@ -68,7 +47,7 @@ def main( msg(1, f'Processing started for file {hdfname}; saveall={saveall}') # read user control, parameters, states, and flags from HDF5 file - uci_parameters = io_uci.read_uci() + uci_parameters = io_manager.read_uci() opseq, ddlinks, ddmasslinks, ddext_sources, ddgener, uci, siminfo = uci_parameters start, stop = siminfo['start'], siminfo['stop'] @@ -86,7 +65,7 @@ def main( siminfo['steps'] = len(siminfo['tindex']) if operation == 'COPY': - copy_instances[segment] = activities[operation](io_input, siminfo, ddext_sources[(operation,segment)]) + copy_instances[segment] = activities[operation](io_manager, siminfo, ddext_sources[(operation,segment)]) elif operation == 'GENER': try: gener_instances[segment] = activities[operation](segment, copy_instances, gener_instances, ddlinks, ddgener) @@ -95,7 +74,7 @@ def main( else: # now conditionally execute all activity modules for the op, segment - ts = get_timeseries(io_input,ddext_sources[(operation,segment)],siminfo) + ts = get_timeseries(io_manager,ddext_sources[(operation,segment)],siminfo) ts = get_gener_timeseries(ts, gener_instances, ddlinks[segment]) flags = uci[(operation, 'GENERAL', segment)]['ACTIVITY'] if operation == 'RCHRES': @@ -232,13 +211,13 @@ def main( if errorcnt > 0: msg(4, f'Error count {errorcnt}: {errormsg}') if 'SAVE' in ui: - save_timeseries(io_output,ts,ui['SAVE'],siminfo,saveall,operation,segment,activity,jupyterlab) + save_timeseries(io_manager,ts,ui['SAVE'],siminfo,saveall,operation,segment,activity,jupyterlab) if (activity == 'RQUAL'): - if 'SAVE' in ui_oxrx: save_timeseries(io_output,ts,ui_oxrx['SAVE'],siminfo,saveall,operation,segment,'OXRX',jupyterlab) - if 'SAVE' in ui_nutrx and flags['NUTRX'] == 1: save_timeseries(io_output,ts,ui_nutrx['SAVE'],siminfo,saveall,operation,segment,'NUTRX',jupyterlab) - if 'SAVE' in ui_plank and flags['PLANK'] == 1: save_timeseries(io_output,ts,ui_plank['SAVE'],siminfo,saveall,operation,segment,'PLANK',jupyterlab) - if 'SAVE' in ui_phcarb and flags['PHCARB'] == 1: save_timeseries(io_output,ts,ui_phcarb['SAVE'],siminfo,saveall,operation,segment,'PHCARB',jupyterlab) + if 'SAVE' in ui_oxrx: save_timeseries(io_manager,ts,ui_oxrx['SAVE'],siminfo,saveall,operation,segment,'OXRX',jupyterlab) + if 'SAVE' in ui_nutrx and flags['NUTRX'] == 1: save_timeseries(io_manager,ts,ui_nutrx['SAVE'],siminfo,saveall,operation,segment,'NUTRX',jupyterlab) + if 'SAVE' in ui_plank and flags['PLANK'] == 1: save_timeseries(io_manager,ts,ui_plank['SAVE'],siminfo,saveall,operation,segment,'PLANK',jupyterlab) + if 'SAVE' in ui_phcarb and flags['PHCARB'] == 1: save_timeseries(io_manager,ts,ui_phcarb['SAVE'],siminfo,saveall,operation,segment,'PHCARB',jupyterlab) # before going on to the next operation, save the ts dict for later use tscat[segment] = ts @@ -270,51 +249,6 @@ def msg(indent, message, final=False): return mlist return msg -def save_timeseries(timeseries:SupportsWriteTS, ts, savedict, siminfo, saveall, operation, segment, activity, jupyterlab=True): - # save computed timeseries (at computation DELT) - save = {k for k,v in savedict.items() if v or saveall} - df = DataFrame(index=siminfo['tindex']) - if (operation == 'IMPLND' and activity == 'IQUAL') or (operation == 'PERLND' and activity == 'PQUAL'): - for y in save: - for z in set(ts.keys()): - if '/' + y in z: - zrep = z.replace('/','_') - 
zrep2 = zrep.replace(' ', '') - df[zrep2] = ts[z] - if '_' + y in z: - df[z] = ts[z] - df = df.astype(float32).sort_index(axis='columns') - elif (operation == 'RCHRES' and (activity == 'CONS' or activity == 'GQUAL')): - for y in save: - for z in set(ts.keys()): - if '_' + y in z: - df[z] = ts[z] - for y in (save & set(ts.keys())): - df[y] = ts[y] - df = df.astype(float32).sort_index(axis='columns') - else: - for y in (save & set(ts.keys())): - df[y] = ts[y] - df = df.astype(float32).sort_index(axis='columns') - path = f'RESULTS/{operation}_{segment}/{activity}' - if not df.empty: - timeseries.write_timeseries( - data_frame=df, - category = Category.RESULTS, - operation=operation, - segment=segment, - activity=activity, - compress=jupyterlab - ) - #if jupyterlab: - # df.to_hdf(store, path, complib='blosc', complevel=9) # This is the official version - #else: - # df.to_hdf(store, path, format='t', data_columns=True) # show the columns in HDFView - else: - print('Save DataFrame Empty for', path) - return - - def get_flows(store, ts, tscat, flags, uci, segment, ddlinks, ddmasslinks, steps, msg): # get inflows to this operation for x in ddlinks[segment]: @@ -452,26 +386,6 @@ def get_flows(store, ts, tscat, flags, uci, segment, ddlinks, ddmasslinks, steps # referenced in schematic, could be commented out return -def get_gener_timeseries(ts: Dict, gener_instances: Dict, ddlinks: List) -> Dict: - """ - Uses links tables to load necessary TimeSeries from Gener class instances to TS dictionary - """ - for link in ddlinks: - if link.SVOL == 'GENER': - if link.SVOLNO in gener_instances: - gener = gener_instances[link.SVOLNO] - series = gener.get_ts() - if link.MFACTOR != 1: - series *= link.MFACTOR - - key = f'{link.TMEMN}{link.TMEMSB1} {link.TMEMSB2}'.rstrip() - if key in ts: - ts[key] = ts[key] + series - else: - ts[key] = series - return ts - - ''' # This table defines the expansion to INFLOW, ROFLOW, OFLOW for RCHRES networks diff --git a/HSP2/utilities.py b/HSP2/utilities.py index fbd34a3e..4875ce58 100644 --- a/HSP2/utilities.py +++ b/HSP2/utilities.py @@ -3,14 +3,15 @@ License: LGPL2 General routines for HSP2 ''' - +import pandas as pd +import numpy as np from pandas import Series, date_range from pandas.tseries.offsets import Minute from numpy import zeros, full, tile, float64 from numba import types from numba.typed import Dict -from HSP2IO.protocols import Category, SupportsReadTS +from HSP2IO.protocols import Category, SupportsReadTS, SupportsWriteTS flowtype = { @@ -269,6 +270,46 @@ def get_timeseries(timeseries_inputs:SupportsReadTS, ext_sourcesdd, siminfo): ts[tname] = t return ts +def save_timeseries(timeseries:SupportsWriteTS, ts, savedict, siminfo, saveall, operation, segment, activity, jupyterlab=True): + # save computed timeseries (at computation DELT) + save = {k for k,v in savedict.items() if v or saveall} + df = pd.DataFrame(index=siminfo['tindex']) + if (operation == 'IMPLND' and activity == 'IQUAL') or (operation == 'PERLND' and activity == 'PQUAL'): + for y in save: + for z in set(ts.keys()): + if '/' + y in z: + zrep = z.replace('/','_') + zrep2 = zrep.replace(' ', '') + df[zrep2] = ts[z] + if '_' + y in z: + df[z] = ts[z] + df = df.astype(np.float32).sort_index(axis='columns') + elif (operation == 'RCHRES' and (activity == 'CONS' or activity == 'GQUAL')): + for y in save: + for z in set(ts.keys()): + if '_' + y in z: + df[z] = ts[z] + for y in (save & set(ts.keys())): + df[y] = ts[y] + df = df.astype(np.float32).sort_index(axis='columns') + else: + for y in (save & 
set(ts.keys())): + df[y] = ts[y] + df = df.astype(np.float32).sort_index(axis='columns') + path = f'RESULTS/{operation}_{segment}/{activity}' + if not df.empty: + timeseries.write_timeseries( + data_frame=df, + category = Category.RESULTS, + operation=operation, + segment=segment, + activity=activity, + compress=jupyterlab + ) + else: + print('Save DataFrame Empty for', path) + return + def expand_timeseries_names(sgrp, smemn, smemsb1, smemsb2, tmemn, tmemsb1, tmemsb2): #special cases to expand timeseries names to resolve with output names in hdf5 file if tmemn == 'ICON': @@ -380,4 +421,23 @@ def expand_timeseries_names(sgrp, smemn, smemsb1, smemsb2, tmemn, tmemsb1, tmems if tmemn == 'PHIF': tmemn = 'PHIF' + tmemsb1 # tmemsb1 is species index - return smemn, tmemn \ No newline at end of file + return smemn, tmemn + +def get_gener_timeseries(ts: Dict, gener_instances: Dict, ddlinks: List) -> Dict: + """ + Uses links tables to load necessary TimeSeries from Gener class instances to TS dictionary + """ + for link in ddlinks: + if link.SVOL == 'GENER': + if link.SVOLNO in gener_instances: + gener = gener_instances[link.SVOLNO] + series = gener.get_ts() + if link.MFACTOR != 1: + series *= link.MFACTOR + + key = f'{link.TMEMN}{link.TMEMSB1} {link.TMEMSB2}'.rstrip() + if key in ts: + ts[key] = ts[key] + series + else: + ts[key] = series + return ts \ No newline at end of file diff --git a/HSP2IO/io.py b/HSP2IO/io.py new file mode 100644 index 00000000..7731135b --- /dev/null +++ b/HSP2IO/io.py @@ -0,0 +1,74 @@ +import pandas as pd +from pandas.core.frame import DataFrame +from protocols import UCITuple, Category, SupportsReadUCI, SupportsReadTS, SupportsWriteTS +from typing import Union + +class IOManager: + """Management class for IO operations needed to execute the HSP2 model""" + + def __init__(self, + io_all: Union[SupportsReadUCI, SupportsReadTS, SupportsWriteTS, None] = None, + io_uci: Union[SupportsReadUCI,None]=None, + io_input: Union[SupportsReadTS,None]=None, + io_output: Union[SupportsReadTS,SupportsWriteTS,None]=None) -> None: + """ io_all: SupportsReadUCI, SupportsReadTS, SupportsWriteTS/None + This parameter is intended to allow users with a single file that + combined UCI, Input and Output a short cut to specify a single argument. + io_uci: SupportsReadUCI/None (Default None) + A class implementing SupportReadUCI protocol, io_all used in place of + this parameter if not specified. + io_input: SupportsReadUCI/None (Default None) + A class implementing SupportReadTS protocol, io_all used in place of + this parameter if not specified. This parameter is where the input + timeseries will be read from. + io_output: SupportsReadUCI/None (Default None) + A class implementing SupportReadUCI protocol, io_all used in place of + this parameter if not specified. This parameter is where the output + timeseries will be written to and read from. 
+ """ + + self.io_input = io_input if io_uci is None else io_all + self.io_output = io_output if io_uci is None else io_all + self.io_uci = io_uci if io_uci is None else io_all + + self._in_memory = {} + + def read_uci(self) -> UCITuple: + self.io_uci.read_uci() + + def write_ts(self, + data_frame:pd.DataFrame, + category:Category, + operation:Union[str,None]=None, + segment:Union[str,None]=None, + activity:Union[str,None]=None) -> None: + key = (category, operation, segment, activity) + self.io_output.write_timeseries(data_frame, category, operation, segment, activity) + self._in_memory[key] = data_frame + + def read_ts(self, + category:Category, + operation:Union[str,None]=None, + segment:Union[str,None]=None, + activity:Union[str,None]=None) -> pd.DataFrame: + data_frame = self._get_in_memory(category, operation, segment, activity) + if data_frame: return data_frame + if category == Category.INPUTS: + data_frame = self.io_input.read_timeseries(category, operation, segment, activity) + key = (category, operation, segment, activity) + self._in_memory[key] = data_frame + return data_frame + if category == Category.RESULTS: + return self.io_output.read_timeseries(category, operation, segment, activity) + return pd.DataFrame + + def _get_in_memory(self, + category:Category, + operation:Union[str,None]=None, + segment:Union[str,None]=None, + activity:Union[str,None]=None) -> Union[pd.DataFrame, None]: + key = (category, operation, segment, activity) + try: + return self._in_memory[key] + except KeyError: + return None From bb9d71daec7880739c4c8b1ef039162ece38e25b Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Fri, 3 Dec 2021 17:09:49 -0500 Subject: [PATCH 06/19] Fix minor bugs Fix no return on IOManger.read_uci Fixes to type hints Remove memory manage from get_flows because this is now handled by IOManger Rename IOManger read and write methods to match protocols Added *Args **Kwargs to IOMangerMethods --- HSP2/main.py | 92 ++++++++++++++++++----------------------------- HSP2/utilities.py | 5 +-- HSP2IO/io.py | 30 ++++++++-------- 3 files changed, 53 insertions(+), 74 deletions(-) diff --git a/HSP2/main.py b/HSP2/main.py index 4e80a2f8..ae8542d0 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -16,7 +16,7 @@ from HSP2.utilities import versions, get_timeseries, expand_timeseries_names, save_timeseries, get_gener_timeseries from HSP2.configuration import activities, noop, expand_masslinks -from HSP2IO.io import IOManager +from HSP2IO.io import IOManager, SupportsReadTS, Category from typing import List, Union @@ -38,7 +38,7 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None #PRT - this is a bandaid to run the model while I implement IO Abstraction. 
#Eventually all references to store will be removed - store = IOManager._io_output._store + store = io_manager._output._store hdfname = './' if not os.path.exists(hdfname): raise FileNotFoundError(f'{hdfname} HDF5 File Not Found') @@ -85,7 +85,7 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None flags['PO4FG'] = uci[(operation, 'NUTRX', segment)]['FLAGS']['PO4FG'] flags['ADPOFG'] = uci[(operation, 'NUTRX', segment)]['FLAGS']['ADPOFG'] - get_flows(store, ts, tscat, flags, uci, segment, ddlinks, ddmasslinks, siminfo['steps'], msg) + get_flows(io_manager, ts, flags, uci, segment, ddlinks, ddmasslinks, siminfo['steps'], msg) for activity, function in activities[operation].items(): if function == noop: #or not flags[activity]: @@ -219,9 +219,6 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None if 'SAVE' in ui_plank and flags['PLANK'] == 1: save_timeseries(io_manager,ts,ui_plank['SAVE'],siminfo,saveall,operation,segment,'PLANK',jupyterlab) if 'SAVE' in ui_phcarb and flags['PHCARB'] == 1: save_timeseries(io_manager,ts,ui_phcarb['SAVE'],siminfo,saveall,operation,segment,'PHCARB',jupyterlab) - # before going on to the next operation, save the ts dict for later use - tscat[segment] = ts - msglist = msg(1, 'Done', final=True) df = DataFrame(msglist, columns=['logfile']) @@ -249,7 +246,7 @@ def msg(indent, message, final=False): return mlist return msg -def get_flows(store, ts, tscat, flags, uci, segment, ddlinks, ddmasslinks, steps, msg): +def get_flows(io_manager:SupportsReadTS, ts, flags, uci, segment, ddlinks, ddmasslinks, steps, msg): # get inflows to this operation for x in ddlinks[segment]: mldata = ddmasslinks[x.MLNO] @@ -332,58 +329,37 @@ def get_flows(store, ts, tscat, flags, uci, segment, ddlinks, ddmasslinks, steps AFname = f'{x.SVOL}{x.SVOLNO}_AFACTR' data = f'{smemn}{smemsb1}{smemsb2}' - foundts = False - if x.SVOLNO in tscat: - # don't go back to h5 file, use in memory version - if data in tscat[x.SVOLNO]: - t = deepcopy(tscat[x.SVOLNO][data]) - foundts = True - elif smemn in tscat[x.SVOLNO]: - t = deepcopy(tscat[x.SVOLNO][smemn]) - foundts = True - if not foundts: - # haven't found in our ts catalog in memory, look on h5 file - if path in store: - if data in store[path]: - t = store[path][data].astype(float64).to_numpy()[0:steps] - foundts = True - else: - data = f'{smemn}' - if data in store[path]: - t = store[path][data].astype(float64).to_numpy()[0:steps] - foundts = True - else: - print('ERROR in FLOWS, cant resolve ', path + ' ' + smemn) - - if foundts: - if MFname in ts and AFname in ts: - t *= ts[MFname][:steps] * ts[AFname][0:steps] - msg(4, f'MFACTOR modified by timeseries {MFname}') - msg(4, f'AFACTR modified by timeseries {AFname}') - elif MFname in ts: - t *= afactr * ts[MFname][0:steps] - msg(4, f'MFACTOR modified by timeseries {MFname}') - elif AFname in ts: - t *= mfactor * ts[AFname][0:steps] - msg(4, f'AFACTR modified by timeseries {AFname}') - else: - t *= factor - - # if poht to iheat, imprecision in hspf conversion factor requires a slight adjustment - if (smemn == 'POHT' or smemn == 'SOHT') and tmemn == 'IHEAT': - t *= 0.998553 - if (smemn == 'PODOXM' or smemn == 'SODOXM') and tmemn == 'OXIF1': - t *= 1.000565 - - # ??? ISSUE: can fetched data be at different frequency - don't know how to transform. 
- if tmemn in ts: - ts[tmemn] += t - else: - ts[tmemn] = t + data_frame = io_manager.read_timeseries(Category.RESULTS,x.SVOL,x.SVOLNO, sgrpn) + try: + if data in data_frame.columns: t = data_frame[data].astype(float64).to_numpy()[0:steps] + else: t = data_frame[smemn].astype(float64).to_numpy()[0:steps] + except KeyError: + print('ERROR in FLOWS, cant resolve ', path + ' ' + smemn) + + if MFname in ts and AFname in ts: + t *= ts[MFname][:steps] * ts[AFname][0:steps] + msg(4, f'MFACTOR modified by timeseries {MFname}') + msg(4, f'AFACTR modified by timeseries {AFname}') + elif MFname in ts: + t *= afactr * ts[MFname][0:steps] + msg(4, f'MFACTOR modified by timeseries {MFname}') + elif AFname in ts: + t *= mfactor * ts[AFname][0:steps] + msg(4, f'AFACTR modified by timeseries {AFname}') + else: + t *= factor + + # if poht to iheat, imprecision in hspf conversion factor requires a slight adjustment + if (smemn == 'POHT' or smemn == 'SOHT') and tmemn == 'IHEAT': + t *= 0.998553 + if (smemn == 'PODOXM' or smemn == 'SODOXM') and tmemn == 'OXIF1': + t *= 1.000565 + + # ??? ISSUE: can fetched data be at different frequency - don't know how to transform. + if tmemn in ts: + ts[tmemn] += t else: - pass - # print('ERROR in FLOWS for', path) # not necessarily an error to not find an operation - # referenced in schematic, could be commented out + ts[tmemn] = t return ''' diff --git a/HSP2/utilities.py b/HSP2/utilities.py index 4875ce58..e15132ff 100644 --- a/HSP2/utilities.py +++ b/HSP2/utilities.py @@ -12,6 +12,7 @@ from numba.typed import Dict from HSP2IO.protocols import Category, SupportsReadTS, SupportsWriteTS +from typing import List flowtype = { @@ -270,7 +271,7 @@ def get_timeseries(timeseries_inputs:SupportsReadTS, ext_sourcesdd, siminfo): ts[tname] = t return ts -def save_timeseries(timeseries:SupportsWriteTS, ts, savedict, siminfo, saveall, operation, segment, activity, jupyterlab=True): +def save_timeseries(timeseries:SupportsWriteTS, ts, savedict, siminfo, saveall, operation, segment, activity, compress=True): # save computed timeseries (at computation DELT) save = {k for k,v in savedict.items() if v or saveall} df = pd.DataFrame(index=siminfo['tindex']) @@ -304,7 +305,7 @@ def save_timeseries(timeseries:SupportsWriteTS, ts, savedict, siminfo, saveall, operation=operation, segment=segment, activity=activity, - compress=jupyterlab + compress=compress ) else: print('Save DataFrame Empty for', path) diff --git a/HSP2IO/io.py b/HSP2IO/io.py index 7731135b..fe9014c7 100644 --- a/HSP2IO/io.py +++ b/HSP2IO/io.py @@ -1,6 +1,6 @@ import pandas as pd from pandas.core.frame import DataFrame -from protocols import UCITuple, Category, SupportsReadUCI, SupportsReadTS, SupportsWriteTS +from HSP2IO.protocols import UCITuple, Category, SupportsReadUCI, SupportsReadTS, SupportsWriteTS from typing import Union class IOManager: @@ -27,39 +27,41 @@ def __init__(self, timeseries will be written to and read from. 
""" - self.io_input = io_input if io_uci is None else io_all - self.io_output = io_output if io_uci is None else io_all - self.io_uci = io_uci if io_uci is None else io_all + self._input = io_all if io_input is None else io_input + self._output = io_all if io_input is None else io_output + self._uci = io_all if io_uci is None else io_uci self._in_memory = {} - def read_uci(self) -> UCITuple: - self.io_uci.read_uci() + def read_uci(self, *args, **kwargs) -> UCITuple: + return self._uci.read_uci() - def write_ts(self, + def write_timeseries(self, data_frame:pd.DataFrame, category:Category, operation:Union[str,None]=None, segment:Union[str,None]=None, - activity:Union[str,None]=None) -> None: + activity:Union[str,None]=None, + *args, **kwargs) -> None: key = (category, operation, segment, activity) - self.io_output.write_timeseries(data_frame, category, operation, segment, activity) + self._output.write_timeseries(data_frame, category, operation, segment, activity) self._in_memory[key] = data_frame - def read_ts(self, + def read_timeseries(self, category:Category, operation:Union[str,None]=None, segment:Union[str,None]=None, - activity:Union[str,None]=None) -> pd.DataFrame: + activity:Union[str,None]=None, + *args, **kwargs) -> pd.DataFrame: data_frame = self._get_in_memory(category, operation, segment, activity) - if data_frame: return data_frame + if data_frame is not None: return data_frame if category == Category.INPUTS: - data_frame = self.io_input.read_timeseries(category, operation, segment, activity) + data_frame = self._input.read_timeseries(category, operation, segment, activity) key = (category, operation, segment, activity) self._in_memory[key] = data_frame return data_frame if category == Category.RESULTS: - return self.io_output.read_timeseries(category, operation, segment, activity) + return self._output.read_timeseries(category, operation, segment, activity) return pd.DataFrame def _get_in_memory(self, From 17437668d2731d7a3adc6ba798e99324c9048ba5 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Tue, 14 Dec 2021 16:17:41 -0500 Subject: [PATCH 07/19] Bug Fix timeseries caching Previous implementation of timeseries caching passed reference to cached timeseries. This meant that modifications to the referenced timeseries (such as applying an mfactor) would also modify the cached timeseries. Resolved by adding pd.DataFrame.copy method. 
--- HSP2IO/io.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/HSP2IO/io.py b/HSP2IO/io.py index fe9014c7..85e21fb7 100644 --- a/HSP2IO/io.py +++ b/HSP2IO/io.py @@ -28,11 +28,16 @@ def __init__(self, """ self._input = io_all if io_input is None else io_input - self._output = io_all if io_input is None else io_output + self._output = io_all if io_output is None else io_output self._uci = io_all if io_uci is None else io_uci self._in_memory = {} + def __def__(self): + del(self._input) + del(self._output) + del(self._uci) + def read_uci(self, *args, **kwargs) -> UCITuple: return self._uci.read_uci() @@ -45,7 +50,7 @@ def write_timeseries(self, *args, **kwargs) -> None: key = (category, operation, segment, activity) self._output.write_timeseries(data_frame, category, operation, segment, activity) - self._in_memory[key] = data_frame + self._in_memory[key] = data_frame.copy(deep=True) def read_timeseries(self, category:Category, @@ -54,11 +59,12 @@ def read_timeseries(self, activity:Union[str,None]=None, *args, **kwargs) -> pd.DataFrame: data_frame = self._get_in_memory(category, operation, segment, activity) - if data_frame is not None: return data_frame + if data_frame is not None: + return data_frame if category == Category.INPUTS: data_frame = self._input.read_timeseries(category, operation, segment, activity) key = (category, operation, segment, activity) - self._in_memory[key] = data_frame + self._in_memory[key] = data_frame.copy(deep=True) return data_frame if category == Category.RESULTS: return self._output.read_timeseries(category, operation, segment, activity) @@ -71,6 +77,6 @@ def _get_in_memory(self, activity:Union[str,None]=None) -> Union[pd.DataFrame, None]: key = (category, operation, segment, activity) try: - return self._in_memory[key] + return self._in_memory[key].copy(deep=True) except KeyError: return None From f71f291cd146bf6d5707981e09c1dd5afbc538ec Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Tue, 14 Dec 2021 16:19:02 -0500 Subject: [PATCH 08/19] Move MF and AF manipulation within Try Except block --- HSP2/main.py | 50 ++++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/HSP2/main.py b/HSP2/main.py index ae8542d0..001c81ce 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -333,33 +333,35 @@ def get_flows(io_manager:SupportsReadTS, ts, flags, uci, segment, ddlinks, ddmas try: if data in data_frame.columns: t = data_frame[data].astype(float64).to_numpy()[0:steps] else: t = data_frame[smemn].astype(float64).to_numpy()[0:steps] + + if MFname in ts and AFname in ts: + t *= ts[MFname][:steps] * ts[AFname][0:steps] + msg(4, f'MFACTOR modified by timeseries {MFname}') + msg(4, f'AFACTR modified by timeseries {AFname}') + elif MFname in ts: + t *= afactr * ts[MFname][0:steps] + msg(4, f'MFACTOR modified by timeseries {MFname}') + elif AFname in ts: + t *= mfactor * ts[AFname][0:steps] + msg(4, f'AFACTR modified by timeseries {AFname}') + else: + t *= factor + + # if poht to iheat, imprecision in hspf conversion factor requires a slight adjustment + if (smemn == 'POHT' or smemn == 'SOHT') and tmemn == 'IHEAT': + t *= 0.998553 + if (smemn == 'PODOXM' or smemn == 'SODOXM') and tmemn == 'OXIF1': + t *= 1.000565 + + # ??? ISSUE: can fetched data be at different frequency - don't know how to transform. 
+ if tmemn in ts: + ts[tmemn] += t + else: + ts[tmemn] = t + except KeyError: print('ERROR in FLOWS, cant resolve ', path + ' ' + smemn) - if MFname in ts and AFname in ts: - t *= ts[MFname][:steps] * ts[AFname][0:steps] - msg(4, f'MFACTOR modified by timeseries {MFname}') - msg(4, f'AFACTR modified by timeseries {AFname}') - elif MFname in ts: - t *= afactr * ts[MFname][0:steps] - msg(4, f'MFACTOR modified by timeseries {MFname}') - elif AFname in ts: - t *= mfactor * ts[AFname][0:steps] - msg(4, f'AFACTR modified by timeseries {AFname}') - else: - t *= factor - - # if poht to iheat, imprecision in hspf conversion factor requires a slight adjustment - if (smemn == 'POHT' or smemn == 'SOHT') and tmemn == 'IHEAT': - t *= 0.998553 - if (smemn == 'PODOXM' or smemn == 'SODOXM') and tmemn == 'OXIF1': - t *= 1.000565 - - # ??? ISSUE: can fetched data be at different frequency - don't know how to transform. - if tmemn in ts: - ts[tmemn] += t - else: - ts[tmemn] = t return ''' From 091bfaf6c16486293f86a7e214e1a20b0ae65e19 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Tue, 14 Dec 2021 17:22:59 -0500 Subject: [PATCH 09/19] Replace protocols with 'ts' Rename IO protocol implementation with 'ts' instead of 'timeseries' to be consistent with protocol names. --- HSP2/utilities.py | 4 ++-- HSP2IO/hdf.py | 4 ++-- HSP2IO/io.py | 10 +++++----- HSP2IO/protocols.py | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/HSP2/utilities.py b/HSP2/utilities.py index e15132ff..233955c4 100644 --- a/HSP2/utilities.py +++ b/HSP2/utilities.py @@ -239,7 +239,7 @@ def get_timeseries(timeseries_inputs:SupportsReadTS, ext_sourcesdd, siminfo): # explicit creation of Numba dictionary with signatures ts = Dict.empty(key_type=types.unicode_type, value_type=types.float64[:]) for row in ext_sourcesdd: - data_frame = timeseries_inputs.read_timeseries(category=Category.INPUTS,segment=row.SVOLNO) + data_frame = timeseries_inputs.read_ts(category=Category.INPUTS,segment=row.SVOLNO) if row.MFACTOR != 1.0: data_frame *= row.MFACTOR @@ -299,7 +299,7 @@ def save_timeseries(timeseries:SupportsWriteTS, ts, savedict, siminfo, saveall, df = df.astype(np.float32).sort_index(axis='columns') path = f'RESULTS/{operation}_{segment}/{activity}' if not df.empty: - timeseries.write_timeseries( + timeseries.write_ts( data_frame=df, category = Category.RESULTS, operation=operation, diff --git a/HSP2IO/hdf.py b/HSP2IO/hdf.py index 494ed060..062bb45c 100644 --- a/HSP2IO/hdf.py +++ b/HSP2IO/hdf.py @@ -72,7 +72,7 @@ def read_uci(self) -> UCITuple: for i in range(int(start), int(stop)+1): ddgener[module][f'G{i:03d}'] = row[2] return (opseq, ddlinks, ddmasslinks, ddext_sources, ddgener, uci, siminfo) - def read_timeseries(self, + def read_ts(self, category:Category, operation:Union[str,None]=None, segment:Union[str,None]=None, @@ -84,7 +84,7 @@ def read_timeseries(self, path = f'RESULTS/{operation}_{segment}/{activity}' return read_hdf(self._store, path) - def write_timeseries(self, + def write_ts(self, data_frame:pd.DataFrame, category: Category, operation:str, diff --git a/HSP2IO/io.py b/HSP2IO/io.py index 85e21fb7..c544b284 100644 --- a/HSP2IO/io.py +++ b/HSP2IO/io.py @@ -41,7 +41,7 @@ def __def__(self): def read_uci(self, *args, **kwargs) -> UCITuple: return self._uci.read_uci() - def write_timeseries(self, + def write_ts(self, data_frame:pd.DataFrame, category:Category, operation:Union[str,None]=None, @@ -49,10 +49,10 @@ def write_timeseries(self, activity:Union[str,None]=None, 
*args, **kwargs) -> None: key = (category, operation, segment, activity) - self._output.write_timeseries(data_frame, category, operation, segment, activity) + self._output.write_ts(data_frame, category, operation, segment, activity) self._in_memory[key] = data_frame.copy(deep=True) - def read_timeseries(self, + def read_ts(self, category:Category, operation:Union[str,None]=None, segment:Union[str,None]=None, @@ -62,12 +62,12 @@ def read_timeseries(self, if data_frame is not None: return data_frame if category == Category.INPUTS: - data_frame = self._input.read_timeseries(category, operation, segment, activity) + data_frame = self._input.read_ts(category, operation, segment, activity) key = (category, operation, segment, activity) self._in_memory[key] = data_frame.copy(deep=True) return data_frame if category == Category.RESULTS: - return self._output.read_timeseries(category, operation, segment, activity) + return self._output.read_ts(category, operation, segment, activity) return pd.DataFrame def _get_in_memory(self, diff --git a/HSP2IO/protocols.py b/HSP2IO/protocols.py index f6fdd971..83d6a375 100644 --- a/HSP2IO/protocols.py +++ b/HSP2IO/protocols.py @@ -25,7 +25,7 @@ def write_uci(self, UCITuple) -> None: @runtime_checkable class SupportsReadTS(Protocol): - def read_timeseries(self, + def read_ts(self, category:Category, operation:Union[str,None]=None, segment:Union[str,None]=None, @@ -35,7 +35,7 @@ def read_timeseries(self, @runtime_checkable class SupportsWriteTS(Protocol): - def write_timeseries(self, + def write_ts(self, data_frame:pd.DataFrame, category:Category, operation:Union[str,None]=None, From 6bf978bc9a2768cc9f154060c714ec1573652a3f Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Tue, 14 Dec 2021 17:31:38 -0500 Subject: [PATCH 10/19] First cut at module level IO abstraction Replaces 'store' reference in majority of model modules. 
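
The pattern applied across the modules below, sketched here for SNOW rather than quoted verbatim, replaces the open HDF5 store argument with the injected io_manager and routes reads through the SupportsReadTS protocol:

    from HSP2IO.protocols import SupportsReadTS, Category

    # before: direct store access
    # def snow(store, siminfo, uci, ts):
    #     ts['SVP'] = store['TIMESERIES/Saturated_Vapor_Pressure_Table'].to_numpy()

    # after: reads go through the IO abstraction
    def snow(io_manager: SupportsReadTS, siminfo, uci, ts):
        ts['SVP'] = io_manager.read_ts(Category.INPUTS, 'Saturated_Vapor_Pressure').to_numpy()
        ...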
--- HSP2/ADCALC.py | 2 +- HSP2/ATEMP.py | 6 ++++-- HSP2/CONS.py | 2 +- HSP2/GQUAL.py | 2 +- HSP2/HTRCH.py | 2 +- HSP2/HYDR.py | 5 ++++- HSP2/IQUAL.py | 2 +- HSP2/IWATER.py | 2 +- HSP2/IWTGAS.py | 2 +- HSP2/PQUAL.py | 2 +- HSP2/PSTEMP.py | 2 +- HSP2/PWATER.py | 2 +- HSP2/PWTGAS.py | 2 +- HSP2/RQUAL.py | 5 ++++- HSP2/SEDMNT.py | 2 +- HSP2/SEDTRN.py | 2 +- HSP2/SNOW.py | 11 ++++++++--- HSP2/SOLIDS.py | 2 +- HSP2/main.py | 6 +++--- HSP2IO/hdf.py | 3 +++ 20 files changed, 40 insertions(+), 24 deletions(-) diff --git a/HSP2/ADCALC.py b/HSP2/ADCALC.py index 75c7c376..a7415b20 100644 --- a/HSP2/ADCALC.py +++ b/HSP2/ADCALC.py @@ -18,7 +18,7 @@ ''' ERRMSG = [] -def adcalc(store, siminfo, uci, ts): +def adcalc(io_manager, siminfo, uci, ts): '''Prepare to simulate advection of fully entrained constituents''' errorsV = zeros(len(ERRMSG), dtype=int) diff --git a/HSP2/ATEMP.py b/HSP2/ATEMP.py index 93f37982..2ea91642 100644 --- a/HSP2/ATEMP.py +++ b/HSP2/ATEMP.py @@ -8,13 +8,15 @@ from numpy import empty, zeros, int64 from HSP2.utilities import hoursval, make_numba_dict +from HSP2IO.protocols import SupportsReadTS, Category + ERRMSGS = () -def atemp(store, siminfo, uci, ts): +def atemp(io_manager:SupportsReadTS, siminfo, uci, ts): ''' high level driver for air temperature module''' - ts['LAPSE'] = hoursval(siminfo, store['TIMESERIES/LAPSE_Table'], lapselike=True) + ts['LAPSE'] = hoursval(siminfo, io_manager.read_ts(Category.INPUTS, 'LAPSE'), lapselike=True) ui = make_numba_dict(uci) # Note: all values coverted to float automatically ui['k'] = siminfo['delt'] * 0.000833 # convert to in/timestep diff --git a/HSP2/CONS.py b/HSP2/CONS.py index 68772469..02887087 100644 --- a/HSP2/CONS.py +++ b/HSP2/CONS.py @@ -29,7 +29,7 @@ ERRMSG = [] -def cons(store, siminfo, uci, ts): +def cons(io_manager, siminfo, uci, ts): ''' Simulate behavior of conservative constituents; calculate concentration of conservative constituents after advection''' diff --git a/HSP2/GQUAL.py b/HSP2/GQUAL.py index 50978f64..7c6b42b7 100644 --- a/HSP2/GQUAL.py +++ b/HSP2/GQUAL.py @@ -22,7 +22,7 @@ 'GQUAL: the value of sdfg is 1, but timeseries SSED4 is not available as input', #ERRMSG10 'GQUAL: the value of phytfg is 1, but timeseries PHYTO is not available as input') #ERRMSG11 -def gqual(store, siminfo, uci, ts): +def gqual(io_manager, siminfo, uci, ts): ''' Simulate the behavior of a generalized quality constituent''' errors = zeros(len(ERRMSGS)).astype(int64) diff --git a/HSP2/HTRCH.py b/HSP2/HTRCH.py index 852c7a38..21ed203c 100644 --- a/HSP2/HTRCH.py +++ b/HSP2/HTRCH.py @@ -36,7 +36,7 @@ ERRMSGS =('HTRCH: Water temperature is above 66 C (150 F) -- In most cases, this indicates an instability in advection','') #ERRMSG0 -def htrch(store, siminfo, uci, ts): +def htrch(io_manager, siminfo, uci, ts): '''Simulate heat exchange and water temperature''' advectData = uci['advectData'] diff --git a/HSP2/HYDR.py b/HSP2/HYDR.py index 54190543..68c1c20f 100644 --- a/HSP2/HYDR.py +++ b/HSP2/HYDR.py @@ -30,7 +30,7 @@ MAXLOOPS = 100 # newton method exit tolerance -def hydr(store, siminfo, uci, ts): +def hydr(io_manager, siminfo, uci, ts): ''' find the state of the reach/reservoir at the end of the time interval and the outflows during the interval @@ -40,6 +40,9 @@ def hydr(store, siminfo, uci, ts): ui is a dictionary with RID specific HSPF UCI like data ts is a dictionary with RID specific timeseries''' +#PRT - temp work around while implementing IO abstraction + store = io_manager._input._store + steps = siminfo['steps'] # number of simulation points 
uunits = siminfo['units'] nexits = int(uci['PARAMETERS']['NEXITS']) diff --git a/HSP2/IQUAL.py b/HSP2/IQUAL.py index 73e13c86..e0ef7194 100644 --- a/HSP2/IQUAL.py +++ b/HSP2/IQUAL.py @@ -21,7 +21,7 @@ ERRMSGS =('IQUAL: A constituent must be associated with overland flow in order to receive atmospheric deposition inputs','') #ERRMSG0 -def iqual(store, siminfo, uci, ts): +def iqual(io_manager, siminfo, uci, ts): ''' Simulate washoff of quality constituents (other than solids, Heat, dox, and co2) using simple relationships with solids And/or water yield''' diff --git a/HSP2/IWATER.py b/HSP2/IWATER.py index 0d0553ca..4842e3b9 100644 --- a/HSP2/IWATER.py +++ b/HSP2/IWATER.py @@ -16,7 +16,7 @@ ) -def iwater(store, siminfo, uci, ts): +def iwater(io_manager, siminfo, uci, ts): ''' Driver for IMPLND IWATER code. CALL: iwater(store, general, ui, ts) store is the Pandas/PyTable open store general is a dictionary with simulation info (OP_SEQUENCE for example) diff --git a/HSP2/IWTGAS.py b/HSP2/IWTGAS.py index 48849da5..8fae5ca3 100644 --- a/HSP2/IWTGAS.py +++ b/HSP2/IWTGAS.py @@ -30,7 +30,7 @@ PCFLX3 = IGCF1(3,LEV) * MFACTA ''' -def iwtgas(store, siminfo, uci, ts): +def iwtgas(io_manager, siminfo, uci, ts): ''' Estimate water temperature, dissolved oxygen, and carbon dioxide in the outflows from a impervious land segment. calculate associated fluxes through exit gate''' diff --git a/HSP2/PQUAL.py b/HSP2/PQUAL.py index f5e87ba9..e0ad15a1 100644 --- a/HSP2/PQUAL.py +++ b/HSP2/PQUAL.py @@ -24,7 +24,7 @@ PFACTA = 1.0 -def pqual(store, siminfo, uci, ts): +def pqual(io_manager, siminfo, uci, ts): ''' Simulate quality constituents (other than sediment, heat, dox, and co2) using simple relationships with sediment and water yield''' diff --git a/HSP2/PSTEMP.py b/HSP2/PSTEMP.py index 4a137684..dffedd01 100644 --- a/HSP2/PSTEMP.py +++ b/HSP2/PSTEMP.py @@ -20,7 +20,7 @@ MINTMP = -100 MAXTMP = 100 -def pstemp(store, siminfo, uci, ts): +def pstemp(io_manager, siminfo, uci, ts): '''Estimate soil temperatures in a pervious land segment''' simlen = siminfo['steps'] diff --git a/HSP2/PWATER.py b/HSP2/PWATER.py index 69389ced..de509a04 100644 --- a/HSP2/PWATER.py +++ b/HSP2/PWATER.py @@ -24,7 +24,7 @@ 'PWATER: High Water Table code not implemented', #ERRMSG9 ) -def pwater(store, siminfo, uci, ts): +def pwater(io_manager, siminfo, uci, ts): ''' PERLND WATER module CALL: pwater(store, general, ui, ts) store is the Pandas/PyTable open store diff --git a/HSP2/PWTGAS.py b/HSP2/PWTGAS.py index 31e59219..a4e2df1c 100644 --- a/HSP2/PWTGAS.py +++ b/HSP2/PWTGAS.py @@ -25,7 +25,7 @@ MFACTB = 0. -def pwtgas(store, siminfo, uci, ts): +def pwtgas(io_manager, siminfo, uci, ts): ''' Estimate water temperature, dissolved oxygen, and carbon dioxide in the outflows from a pervious landsegment. 
calculate associated fluxes through exit gates''' simlen = siminfo['steps'] diff --git a/HSP2/RQUAL.py b/HSP2/RQUAL.py index c1a397ff..e96e18ea 100644 --- a/HSP2/RQUAL.py +++ b/HSP2/RQUAL.py @@ -24,9 +24,12 @@ ERRMSGS_phcarb = ('PHCARB: Error -- Invalid CONS index specified for ALKCON (i.e., ALKCON > NCONS).', 'PHCARB: Error -- A satisfactory solution for pH has not been reached.') -def rqual(store, siminfo, uci, uci_oxrx, uci_nutrx, uci_plank, uci_phcarb, ts): +def rqual(io_manager, siminfo, uci, uci_oxrx, uci_nutrx, uci_plank, uci_phcarb, ts): ''' Simulate constituents involved in biochemical transformations''' + #PRT - temp work around while implementing IO abstraction + store = io_manager._input._store + # simulation information: delt60 = siminfo['delt'] / 60 # delt60 - simulation time interval in hours simlen = siminfo['steps'] diff --git a/HSP2/SEDMNT.py b/HSP2/SEDMNT.py index 1f99306f..1e203504 100644 --- a/HSP2/SEDMNT.py +++ b/HSP2/SEDMNT.py @@ -18,7 +18,7 @@ PDETS= DETS*MFACTA # convert dimensional variables to external units ''' -def sedmnt(store, siminfo, uci, ts): +def sedmnt(io_manager, siminfo, uci, ts): ''' Produce and remove sediment from the land surface''' simlen = siminfo['steps'] diff --git a/HSP2/SEDTRN.py b/HSP2/SEDTRN.py index d330856f..f78b8f83 100644 --- a/HSP2/SEDTRN.py +++ b/HSP2/SEDTRN.py @@ -17,7 +17,7 @@ 'SEDTRN: Simulation of sediment requires all 3 "auxiliary flags" (AUX1FG, etc) in section HYDR must be turned on', #ERRMSG5 'SEDTRN: When specifying the initial composition of the bed, the fraction of sand, silt, and clay must sum to a value close to 1.0.') #ERRMSG6 -def sedtrn(store, siminfo, uci, ts): +def sedtrn(io_manager, siminfo, uci, ts): ''' Simulate behavior of inorganic sediment''' # simlen = siminfo['steps'] diff --git a/HSP2/SNOW.py b/HSP2/SNOW.py index 52889754..16176c9e 100644 --- a/HSP2/SNOW.py +++ b/HSP2/SNOW.py @@ -18,11 +18,13 @@ from numba import njit from HSP2.utilities import hourflag, monthval, hoursval, make_numba_dict, initm +from HSP2IO.protocols import SupportsReadTS, Category + ERRMSGS = ('Snow simulation cannot function properly with delt> 360', #ERRMSG0 ) -def snow(store, siminfo, uci, ts): +def snow(io_manager:SupportsReadTS, siminfo, uci, ts): ''' high level driver for SNOW module CALL: snow(store, general, ui, ts) store is the Pandas/PyTable open store @@ -33,8 +35,11 @@ def snow(store, siminfo, uci, ts): steps = siminfo['steps'] # number of simulation timesteps UUNITS = siminfo['units'] - ts['SVP'] = store['TIMESERIES/Saturated_Vapor_Pressure_Table'].to_numpy() - ts['SEASONS'] = monthval(siminfo, store['TIMESERIES/SEASONS_Table']) + ts['SVP'] = io_manager.read_ts(Category.INPUTS, 'Saturated_Vapor_Pressure').to_numpy() + ts['SEASONS'] = monthval(siminfo, io_manager.read_ts(Category.INPUTS, 'SEASONS')) + + #ts['SVP'] = store['TIMESERIES/Saturated_Vapor_Pressure_Table'].to_numpy() + #ts['SEASONS'] = monthval(siminfo, store['TIMESERIES/SEASONS_Table']) cloudfg = 'CLOUD' in ts diff --git a/HSP2/SOLIDS.py b/HSP2/SOLIDS.py index 494afccd..d2709441 100644 --- a/HSP2/SOLIDS.py +++ b/HSP2/SOLIDS.py @@ -14,7 +14,7 @@ ERRMSG = [] -def solids(store, siminfo, uci, ts): +def solids(io_manager, siminfo, uci, ts): '''Accumulate and remove solids from the impervious land segment''' simlen = siminfo['steps'] diff --git a/HSP2/main.py b/HSP2/main.py index 001c81ce..336a7c94 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -202,9 +202,9 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None ############ calls activity 
function like snow() ############## if operation not in ['COPY','GENER']: if (activity != 'RQUAL'): - errors, errmessages = function(store, siminfo, ui, ts) + errors, errmessages = function(io_manager, siminfo, ui, ts) else: - errors, errmessages = function(store, siminfo, ui, ui_oxrx, ui_nutrx, ui_plank, ui_phcarb, ts) + errors, errmessages = function(io_manager, siminfo, ui, ui_oxrx, ui_nutrx, ui_plank, ui_phcarb, ts) ############################################################### for errorcnt, errormsg in zip(errors, errmessages): @@ -329,7 +329,7 @@ def get_flows(io_manager:SupportsReadTS, ts, flags, uci, segment, ddlinks, ddmas AFname = f'{x.SVOL}{x.SVOLNO}_AFACTR' data = f'{smemn}{smemsb1}{smemsb2}' - data_frame = io_manager.read_timeseries(Category.RESULTS,x.SVOL,x.SVOLNO, sgrpn) + data_frame = io_manager.read_ts(Category.RESULTS,x.SVOL,x.SVOLNO, sgrpn) try: if data in data_frame.columns: t = data_frame[data].astype(float64).to_numpy()[0:steps] else: t = data_frame[smemn].astype(float64).to_numpy()[0:steps] diff --git a/HSP2IO/hdf.py b/HSP2IO/hdf.py index 062bb45c..e5c8f98a 100644 --- a/HSP2IO/hdf.py +++ b/HSP2IO/hdf.py @@ -80,6 +80,9 @@ def read_ts(self, path = '' if category == category.INPUTS: path = f'TIMESERIES/{segment}' + #special cases for constant input timeseries + if operation in ['LAPSE','SEASONS','Saturated_Vapor_Pressure','SATURATED_VAPOR_PRESSURE']: + path = f'TIMESERIES/{operation}_Table' elif category == category.RESULTS: path = f'RESULTS/{operation}_{segment}/{activity}' return read_hdf(self._store, path) From a155af1dffa8e39ab48d3aec4f4fc429cbc1bc5f Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Wed, 15 Dec 2021 14:20:08 -0500 Subject: [PATCH 11/19] Remove store from ATEMP and SNOW My previous solution for removing the 'store' (direct IO) reference from SNOW and ATEMP was to add special case code to read_ts that allowed for these timeseries to be read from IO. However, my understanding is in HSPF these blocks are hard coded and not modifiable by the users. Rather than make this a complex IO special case, I just took the same approach and added hard coded versions in utilities. Should handling with these via IO be required, I recommend handing them via the read_uci method, as my original read_ts approach was incorrect. 
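
The resulting call sites, sketched from the ATEMP and SNOW hunks that follow, import the hard-coded Series straight from HSP2.utilities instead of issuing reads through the IO layer:

    from HSP2.utilities import hoursval, monthval, LAPSE, SEASONS, SVP

    def atemp(io_manager, siminfo, uci, ts):
        ts['LAPSE'] = hoursval(siminfo, LAPSE, lapselike=True)
        ...

    def snow(io_manager, siminfo, uci, ts):
        ts['SVP'] = SVP
        ts['SEASONS'] = monthval(siminfo, SEASONS)
        ...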
--- HSP2/ATEMP.py | 4 ++-- HSP2/SNOW.py | 9 +++------ HSP2/utilities.py | 14 ++++++++++++++ HSP2IO/hdf.py | 3 --- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/HSP2/ATEMP.py b/HSP2/ATEMP.py index 2ea91642..e1abed16 100644 --- a/HSP2/ATEMP.py +++ b/HSP2/ATEMP.py @@ -6,7 +6,7 @@ from numba import njit from numpy import empty, zeros, int64 -from HSP2.utilities import hoursval, make_numba_dict +from HSP2.utilities import hoursval, make_numba_dict, LAPSE from HSP2IO.protocols import SupportsReadTS, Category @@ -16,7 +16,7 @@ def atemp(io_manager:SupportsReadTS, siminfo, uci, ts): ''' high level driver for air temperature module''' - ts['LAPSE'] = hoursval(siminfo, io_manager.read_ts(Category.INPUTS, 'LAPSE'), lapselike=True) + ts['LAPSE'] = hoursval(siminfo, LAPSE, lapselike=True) ui = make_numba_dict(uci) # Note: all values coverted to float automatically ui['k'] = siminfo['delt'] * 0.000833 # convert to in/timestep diff --git a/HSP2/SNOW.py b/HSP2/SNOW.py index 16176c9e..29b3730a 100644 --- a/HSP2/SNOW.py +++ b/HSP2/SNOW.py @@ -16,7 +16,7 @@ from numpy import zeros, ones, full, nan, int64 from math import sqrt, floor from numba import njit -from HSP2.utilities import hourflag, monthval, hoursval, make_numba_dict, initm +from HSP2.utilities import hourflag, monthval, hoursval, make_numba_dict, initm, SEASONS, SVP from HSP2IO.protocols import SupportsReadTS, Category @@ -35,11 +35,8 @@ def snow(io_manager:SupportsReadTS, siminfo, uci, ts): steps = siminfo['steps'] # number of simulation timesteps UUNITS = siminfo['units'] - ts['SVP'] = io_manager.read_ts(Category.INPUTS, 'Saturated_Vapor_Pressure').to_numpy() - ts['SEASONS'] = monthval(siminfo, io_manager.read_ts(Category.INPUTS, 'SEASONS')) - - #ts['SVP'] = store['TIMESERIES/Saturated_Vapor_Pressure_Table'].to_numpy() - #ts['SEASONS'] = monthval(siminfo, store['TIMESERIES/SEASONS_Table']) + ts['SVP'] = SVP + ts['SEASONS'] = monthval(siminfo, SEASONS) cloudfg = 'CLOUD' in ts diff --git a/HSP2/utilities.py b/HSP2/utilities.py index 233955c4..834c2bd0 100644 --- a/HSP2/utilities.py +++ b/HSP2/utilities.py @@ -35,6 +35,20 @@ 'QPREC','QBED', #HTRCH } +# These are hardcoded series in HSPF that are used various modules +# Rather than have them become a IO requirement, carry them over as +# hard coded variables for the time being. 
+LAPSE = Series([0.0035, 0.0035, 0.0035, 0.0035, 0.0035, 0.0035, 0.0037, + 0.0040, 0.0041, 0.0043, 0.0046, 0.0047, 0.0048, 0.0049, 0.0050, 0.0050, + 0.0048, 0.0046, 0.0044, 0.0042, 0.0040, 0.0038, 0.0037, 0.0036]) + +SEASONS = Series([0,0,0,1,1,1,1,1,1,0,0,0]).astype(bool) + +SVP = Series([1.005, 1.005, 1.005, 1.005, 1.005, 1.005, 1.005, 1.005, 1.005, + 1.005, 1.01, 1.01, 1.015, 1.02, 1.03, 1.04, 1.06, 1.08, 1.1, 1.29, 1.66, + 2.13, 2.74,3.49, 4.40, 5.55,6.87, 8.36, 10.1,12.2,14.6, 17.5, 20.9, 24.8, + 29.3, 34.6, 40.7, 47.7, 55.7, 64.9]).to_numpy() + def make_numba_dict(uci): ''' diff --git a/HSP2IO/hdf.py b/HSP2IO/hdf.py index e5c8f98a..062bb45c 100644 --- a/HSP2IO/hdf.py +++ b/HSP2IO/hdf.py @@ -80,9 +80,6 @@ def read_ts(self, path = '' if category == category.INPUTS: path = f'TIMESERIES/{segment}' - #special cases for constant input timeseries - if operation in ['LAPSE','SEASONS','Saturated_Vapor_Pressure','SATURATED_VAPOR_PRESSURE']: - path = f'TIMESERIES/{operation}_Table' elif category == category.RESULTS: path = f'RESULTS/{operation}_{segment}/{activity}' return read_hdf(self._store, path) From 31903b38b8a01026918eb2d0dcf9a524feab443b Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Fri, 17 Dec 2021 10:53:22 -0500 Subject: [PATCH 12/19] UCI class Implement a class to collect UCI parameters --- HSP2/main.py | 14 ++++++++++---- HSP2/uci.py | 16 ++++++++++++++++ HSP2IO/hdf.py | 40 +++++++++++++++++----------------------- HSP2IO/io.py | 9 ++++++--- HSP2IO/protocols.py | 10 ++++------ 5 files changed, 53 insertions(+), 36 deletions(-) create mode 100644 HSP2/uci.py diff --git a/HSP2/main.py b/HSP2/main.py index 336a7c94..68f8fc1d 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -46,10 +46,16 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None msg = messages() msg(1, f'Processing started for file {hdfname}; saveall={saveall}') - # read user control, parameters, states, and flags from HDF5 file - uci_parameters = io_manager.read_uci() - opseq, ddlinks, ddmasslinks, ddext_sources, ddgener, uci, siminfo = uci_parameters - + # read user control, parameters, states, and flags uci and map to local variables + uci_obj = io_manager.read_uci() + opseq = uci_obj.opseq + ddlinks = uci_obj.ddlinks + ddmasslinks = uci_obj.ddmasslinks + ddext_sources = uci_obj.ddext_sources + ddgener = uci_obj.ddgener + uci = uci_obj.uci + siminfo = uci_obj.siminfo + start, stop = siminfo['start'], siminfo['stop'] copy_instances = {} diff --git a/HSP2/uci.py b/HSP2/uci.py new file mode 100644 index 00000000..be8b0aa2 --- /dev/null +++ b/HSP2/uci.py @@ -0,0 +1,16 @@ +from collections import defaultdict + +class UCI(): + + def __init__(self) -> None: + self.uci = defaultdict(dict) + self.ddlinks = defaultdict(list) + self.ddmasslinks = defaultdict(list) + self.ddext_sources = defaultdict(list) + self.ddgener = defaultdict(dict) + self.siminfo = {} + self.opseq = defaultdict(dict) + + + + diff --git a/HSP2IO/hdf.py b/HSP2IO/hdf.py index 062bb45c..45ca67ea 100644 --- a/HSP2IO/hdf.py +++ b/HSP2IO/hdf.py @@ -1,10 +1,10 @@ import pandas as pd from pandas.io.pytables import read_hdf -from HSP2IO.protocols import UCITuple, Category +from HSP2IO.protocols import Category from collections import defaultdict from typing import Union, Any - +from HSP2.uci import UCI class HDF5(): @@ -16,7 +16,7 @@ def __init__(self, file_path:str) -> None: def __del__(self): self._store.close() - def read_uci(self) -> UCITuple: + def read_uci(self) -> UCI: """Read UCI 
related tables Parameters: None @@ -24,44 +24,38 @@ def read_uci(self) -> UCITuple: Returns: UCITuple """ - uci = defaultdict(dict) - ddlinks = defaultdict(list) - ddmasslinks = defaultdict(list) - ddext_sources = defaultdict(list) - ddgener =defaultdict(dict) - siminfo = {} - opseq = 0 + uci = UCI() - for path in self._store.keys(): # finds ALL data sets into HDF5 file + for path in self._store.keys(): # finds ALL data sets into HDF6 file op, module, *other = path[1:].split(sep='/', maxsplit=3) s = '_'.join(other) if op == 'CONTROL': if module =='GLOBAL': temp = self._store[path].to_dict()['Info'] - siminfo['start'] = pd.Timestamp(temp['Start']) - siminfo['stop'] = pd.Timestamp(temp['Stop']) - siminfo['units'] = 1 + uci.siminfo['start'] = pd.Timestamp(temp['Start']) + uci.siminfo['stop'] = pd.Timestamp(temp['Stop']) + uci.siminfo['units'] = 1 if 'Units' in temp: if int(temp['Units']): - siminfo['units'] = int(temp['Units']) + uci.siminfo['units'] = int(temp['Units']) elif module == 'LINKS': for row in self._store[path].fillna('').itertuples(): if row.TVOLNO != '': - ddlinks[f'{row.TVOLNO}'].append(row) + uci.ddlinks[f'{row.TVOLNO}'].append(row) else: - ddlinks[f'{row.TOPFST}'].append(row) + uci.ddlinks[f'{row.TOPFST}'].append(row) elif module == 'MASS_LINKS': for row in self._store[path].replace('na','').itertuples(): - ddmasslinks[row.MLNO].append(row) + uci.ddmasslinks[row.MLNO].append(row) elif module == 'EXT_SOURCES': for row in self._store[path].replace('na','').itertuples(): - ddext_sources[(row.TVOL, row.TVOLNO)].append(row) + uci.ddext_sources[(row.TVOL, row.TVOLNO)].append(row) elif module == 'OP_SEQUENCE': - opseq = self._store[path] + uci.opseq = self._store[path] elif op in {'PERLND', 'IMPLND', 'RCHRES'}: for id, vdict in self._store[path].to_dict('index').items(): - uci[(op, module, id)][s] = vdict + uci.uci[(op, module, id)][s] = vdict elif op == 'GENER': for row in self._store[path].itertuples(): if len(row.OPNID.split()) == 1: @@ -69,8 +63,8 @@ def read_uci(self) -> UCITuple: stop = start else: start, stop = row.OPNID.split() - for i in range(int(start), int(stop)+1): ddgener[module][f'G{i:03d}'] = row[2] - return (opseq, ddlinks, ddmasslinks, ddext_sources, ddgener, uci, siminfo) + for i in range(int(start), int(stop)+1): uci.ddgener[module][f'G{i:03d}'] = row[2] + return uci def read_ts(self, category:Category, diff --git a/HSP2IO/io.py b/HSP2IO/io.py index c544b284..c892bedc 100644 --- a/HSP2IO/io.py +++ b/HSP2IO/io.py @@ -1,8 +1,10 @@ import pandas as pd from pandas.core.frame import DataFrame -from HSP2IO.protocols import UCITuple, Category, SupportsReadUCI, SupportsReadTS, SupportsWriteTS +from HSP2IO.protocols import Category, SupportsReadUCI, SupportsReadTS, SupportsWriteTS, SupportsWriteLog from typing import Union +from HSP2.uci import UCI + class IOManager: """Management class for IO operations needed to execute the HSP2 model""" @@ -10,7 +12,8 @@ def __init__(self, io_all: Union[SupportsReadUCI, SupportsReadTS, SupportsWriteTS, None] = None, io_uci: Union[SupportsReadUCI,None]=None, io_input: Union[SupportsReadTS,None]=None, - io_output: Union[SupportsReadTS,SupportsWriteTS,None]=None) -> None: + io_output: Union[SupportsReadTS,SupportsWriteTS,None]=None, + io_log: Union[SupportsWriteLog,None]=None,) -> None: """ io_all: SupportsReadUCI, SupportsReadTS, SupportsWriteTS/None This parameter is intended to allow users with a single file that combined UCI, Input and Output a short cut to specify a single argument. 
@@ -38,7 +41,7 @@ def __def__(self): del(self._output) del(self._uci) - def read_uci(self, *args, **kwargs) -> UCITuple: + def read_uci(self, *args, **kwargs) -> UCI: return self._uci.read_uci() def write_ts(self, diff --git a/HSP2IO/protocols.py b/HSP2IO/protocols.py index 83d6a375..f216d9c7 100644 --- a/HSP2IO/protocols.py +++ b/HSP2IO/protocols.py @@ -3,10 +3,8 @@ import pandas as pd import numpy as np from enum import Enum +from HSP2.uci import UCI - -UCITuple = [defaultdict(dict), defaultdict(list), defaultdict(list), - defaultdict(list), defaultdict(dict), dict, int] TimeSeriesDict = Dict[str,np.float64] class Category(Enum): @@ -15,12 +13,12 @@ class Category(Enum): @runtime_checkable class SupportsReadUCI(Protocol): - def read_uci(self) -> UCITuple: + def read_uci(self) -> UCI: ... @runtime_checkable -class SupportsWritUCI(Protocol): - def write_uci(self, UCITuple) -> None: +class SupportsWriteUCI(Protocol): + def write_uci(self, UCI) -> None: ... @runtime_checkable From 5feb80d041b7b81daf7e805ba42e6940a57784e2 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Fri, 17 Dec 2021 14:23:22 -0500 Subject: [PATCH 13/19] Read FTABLES as part of UCI protocol --- HSP2/HYDR.py | 8 +++----- HSP2/main.py | 5 ++++- HSP2/uci.py | 4 +++- HSP2IO/hdf.py | 2 ++ 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/HSP2/HYDR.py b/HSP2/HYDR.py index 68c1c20f..715a8aff 100644 --- a/HSP2/HYDR.py +++ b/HSP2/HYDR.py @@ -30,7 +30,7 @@ MAXLOOPS = 100 # newton method exit tolerance -def hydr(io_manager, siminfo, uci, ts): +def hydr(io_manager, siminfo, uci, ts, ftables): ''' find the state of the reach/reservoir at the end of the time interval and the outflows during the interval @@ -40,9 +40,6 @@ def hydr(io_manager, siminfo, uci, ts): ui is a dictionary with RID specific HSPF UCI like data ts is a dictionary with RID specific timeseries''' -#PRT - temp work around while implementing IO abstraction - store = io_manager._input._store - steps = siminfo['steps'] # number of simulation points uunits = siminfo['units'] nexits = int(uci['PARAMETERS']['NEXITS']) @@ -98,7 +95,8 @@ def hydr(io_manager, siminfo, uci, ts): ts['CONVF'] = initm(siminfo, uci, 'VCONFG', 'MONTHLY_CONVF', 1.0) # extract key columns of specified FTable for faster access (1d vs. 
2d) - rchtab = store[f"FTABLES/{uci['PARAMETERS']['FTBUCI']}"] + rchtab = ftables[f"{uci['PARAMETERS']['FTBUCI']}"] + #rchtab = store[f"FTABLES/{uci['PARAMETERS']['FTBUCI']}"] ts['volumeFT'] = rchtab['Volume'].to_numpy() * VFACT ts['depthFT'] = rchtab['Depth'].to_numpy() ts['sareaFT'] = rchtab['Area'].to_numpy() * AFACT diff --git a/HSP2/main.py b/HSP2/main.py index 68f8fc1d..13d45fad 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -55,6 +55,7 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None ddgener = uci_obj.ddgener uci = uci_obj.uci siminfo = uci_obj.siminfo + ftables = uci_obj.ftables start, stop = siminfo['start'], siminfo['stop'] @@ -207,7 +208,9 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None ############ calls activity function like snow() ############## if operation not in ['COPY','GENER']: - if (activity != 'RQUAL'): + if (activity == 'HYDR'): + errors, errmessages = function(io_manager, siminfo, ui, ts, ftables) + elif (activity != 'RQUAL'): errors, errmessages = function(io_manager, siminfo, ui, ts) else: errors, errmessages = function(io_manager, siminfo, ui, ui_oxrx, ui_nutrx, ui_plank, ui_phcarb, ts) diff --git a/HSP2/uci.py b/HSP2/uci.py index be8b0aa2..8b1083dd 100644 --- a/HSP2/uci.py +++ b/HSP2/uci.py @@ -1,4 +1,5 @@ from collections import defaultdict +from pandas import DataFrame class UCI(): @@ -9,7 +10,8 @@ def __init__(self) -> None: self.ddext_sources = defaultdict(list) self.ddgener = defaultdict(dict) self.siminfo = {} - self.opseq = defaultdict(dict) + self.opseq = DataFrame() + self.ftables = {} diff --git a/HSP2IO/hdf.py b/HSP2IO/hdf.py index 45ca67ea..0a1be393 100644 --- a/HSP2IO/hdf.py +++ b/HSP2IO/hdf.py @@ -64,6 +64,8 @@ def read_uci(self) -> UCI: else: start, stop = row.OPNID.split() for i in range(int(start), int(stop)+1): uci.ddgener[module][f'G{i:03d}'] = row[2] + elif op == 'FTABLES': + uci.ftables[module] = self._store[path] return uci def read_ts(self, From d7ff358f1b0ceb7130cd127ba7cede85198b0e45 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Fri, 17 Dec 2021 14:45:20 -0500 Subject: [PATCH 14/19] IO MONTHDATA from UCI protocol Need to remove IO elements from RQUAL module. 
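
In outline (a sketch assembled from the hunks below, assuming the dayval and full helpers already imported in HSP2/utilities.py), the month-data tables now ride along on the UCI object and initmd does a plain dictionary lookup instead of reading the store inside RQUAL:

    def initmd(siminfo, monthdata, monthly, default):
        '''initialize a timeseries from an HSPF month-data table carried on the UCI object'''
        if monthly in monthdata:
            month = monthdata[monthly].values[0]
            return dayval(siminfo, list(month))
        return full(siminfo['steps'], default)

    # RQUAL resolves e.g. atmospheric deposition fluxes against uci_obj.monthdata:
    NUADFX = initmd(siminfo, monthdata, 'MONTHDATA/MONTHDATA' + str(nuadfg_dd), 0.0)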
--- HSP2/RQUAL.py | 13 +++++-------- HSP2/main.py | 3 ++- HSP2/uci.py | 2 +- HSP2/utilities.py | 6 +++--- HSP2IO/hdf.py | 3 +++ 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/HSP2/RQUAL.py b/HSP2/RQUAL.py index e96e18ea..42a572c9 100644 --- a/HSP2/RQUAL.py +++ b/HSP2/RQUAL.py @@ -24,12 +24,9 @@ ERRMSGS_phcarb = ('PHCARB: Error -- Invalid CONS index specified for ALKCON (i.e., ALKCON > NCONS).', 'PHCARB: Error -- A satisfactory solution for pH has not been reached.') -def rqual(io_manager, siminfo, uci, uci_oxrx, uci_nutrx, uci_plank, uci_phcarb, ts): +def rqual(io_manager, siminfo, uci, uci_oxrx, uci_nutrx, uci_plank, uci_phcarb, ts, monthdata): ''' Simulate constituents involved in biochemical transformations''' - #PRT - temp work around while implementing IO abstraction - store = io_manager._input._store - # simulation information: delt60 = siminfo['delt'] / 60 # delt60 - simulation time interval in hours simlen = siminfo['steps'] @@ -112,7 +109,7 @@ def rqual(io_manager, siminfo, uci, uci_oxrx, uci_nutrx, uci_plank, uci_phcarb, NUADFX = zeros(simlen) if nuadfg_dd > 0: - NUADFX = initmd(siminfo, store, 'MONTHDATA/MONTHDATA' + str(nuadfg_dd), 0.0) + NUADFX = initmd(siminfo, monthdata, 'MONTHDATA/MONTHDATA' + str(nuadfg_dd), 0.0) elif nuadfg_dd == -1: if 'NUADFX' + str(j) in ts: NUADFX = ts['NUADFX' + str(j)] @@ -127,7 +124,7 @@ def rqual(io_manager, siminfo, uci, uci_oxrx, uci_nutrx, uci_plank, uci_phcarb, NUADCN = zeros(simlen) if nuadfg_wd > 0: - NUADCN = initmd(siminfo, store, 'MONTHDATA/MONTHDATA' + str(nuadfg_wd), 0.0) + NUADCN = initmd(siminfo, monthdata, 'MONTHDATA/MONTHDATA' + str(nuadfg_wd), 0.0) elif nuadfg_wd == -1: if 'NUADCN' + str(j) in ts: NUADCN = ts['NUADCN' + str(j)] @@ -163,7 +160,7 @@ def rqual(io_manager, siminfo, uci, uci_oxrx, uci_nutrx, uci_plank, uci_phcarb, pladfg_dd = int(ui_plank['PLADFG' + str(j)]) if pladfg_dd > 0: - PLADFX = initmd(siminfo, store, 'MONTHDATA/MONTHDATA' + str(pladfg_dd), 0.0) + PLADFX = initmd(siminfo, monthdata, 'MONTHDATA/MONTHDATA' + str(pladfg_dd), 0.0) elif pladfg_dd == -1: if 'PLADFX' + str(j) in ts: PLADFX = ts['PLADFX' + str(j)] @@ -178,7 +175,7 @@ def rqual(io_manager, siminfo, uci, uci_oxrx, uci_nutrx, uci_plank, uci_phcarb, pladfg_wd = int(ui_plank['PLADFG' + str(n+1)]) if pladfg_wd > 0: - PLADCN = initmd(siminfo, store, 'MONTHDATA/MONTHDATA' + str(pladfg_wd), 0.0) + PLADCN = initmd(siminfo, monthdata, 'MONTHDATA/MONTHDATA' + str(pladfg_wd), 0.0) elif pladfg_wd == -1: if 'PLADCN' + str(j) in ts: PLADCN = ts['PLADCN' + str(j)] diff --git a/HSP2/main.py b/HSP2/main.py index 13d45fad..c845c59b 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -56,6 +56,7 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None uci = uci_obj.uci siminfo = uci_obj.siminfo ftables = uci_obj.ftables + monthdata = uci_obj.monthdata start, stop = siminfo['start'], siminfo['stop'] @@ -213,7 +214,7 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None elif (activity != 'RQUAL'): errors, errmessages = function(io_manager, siminfo, ui, ts) else: - errors, errmessages = function(io_manager, siminfo, ui, ui_oxrx, ui_nutrx, ui_plank, ui_phcarb, ts) + errors, errmessages = function(io_manager, siminfo, ui, ui_oxrx, ui_nutrx, ui_plank, ui_phcarb, ts, monthdata) ############################################################### for errorcnt, errormsg in zip(errors, errmessages): diff --git a/HSP2/uci.py b/HSP2/uci.py index 8b1083dd..510a2a5b 100644 --- a/HSP2/uci.py +++ b/HSP2/uci.py @@ -12,7 +12,7 
@@ def __init__(self) -> None: self.siminfo = {} self.opseq = DataFrame() self.ftables = {} - + self.monthdata = None diff --git a/HSP2/utilities.py b/HSP2/utilities.py index 834c2bd0..d3995c35 100644 --- a/HSP2/utilities.py +++ b/HSP2/utilities.py @@ -206,10 +206,10 @@ def initm(siminfo, ui, flag, monthly, default): return full(siminfo['steps'], default) -def initmd(siminfo, store, monthly, default): +def initmd(siminfo, monthdata, monthly, default): ''' initialize timeseries from HSPF month data table''' - if monthly in store: - month = store[monthly].values[0] + if monthly in monthdata: + month = monthdata[monthly].values[0] return dayval(siminfo, list(month)) else: return full(siminfo['steps'], default) diff --git a/HSP2IO/hdf.py b/HSP2IO/hdf.py index 0a1be393..6b17842f 100644 --- a/HSP2IO/hdf.py +++ b/HSP2IO/hdf.py @@ -66,6 +66,9 @@ def read_uci(self) -> UCI: for i in range(int(start), int(stop)+1): uci.ddgener[module][f'G{i:03d}'] = row[2] elif op == 'FTABLES': uci.ftables[module] = self._store[path] + elif op == 'MONTHDATA': + if not uci.monthdata: uci.monthdata = {} + uci.monthdata[f'{op}/{module}'] = self._store[path] return uci def read_ts(self, From 3ec183257d8cf6ad80952115feea800b0cd61c9a Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Fri, 17 Dec 2021 15:08:35 -0500 Subject: [PATCH 15/19] Add IO protocols for logging --- HSP2/main.py | 4 ++-- HSP2IO/hdf.py | 7 +++++++ HSP2IO/io.py | 16 ++++++++++++++-- HSP2IO/protocols.py | 8 +++++++- 4 files changed, 30 insertions(+), 5 deletions(-) diff --git a/HSP2/main.py b/HSP2/main.py index c845c59b..3fbc1eb3 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -232,11 +232,11 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None msglist = msg(1, 'Done', final=True) df = DataFrame(msglist, columns=['logfile']) - df.to_hdf(store, 'RUN_INFO/LOGFILE', data_columns=True, format='t') + io_manager.write_log(df) if jupyterlab: df = versions(['jupyterlab', 'notebook']) - df.to_hdf(store, 'RUN_INFO/VERSIONS', data_columns=True, format='t') + io_manager.write_versioning(df) print('\n\n', df) return diff --git a/HSP2IO/hdf.py b/HSP2IO/hdf.py index 6b17842f..3863047e 100644 --- a/HSP2IO/hdf.py +++ b/HSP2IO/hdf.py @@ -102,3 +102,10 @@ def write_ts(self, data_frame.to_hdf(self._store, path, format='t', data_columns=True, complevel=complevel) #data_frame.to_hdf(self._store, path) + def write_log(self, hsp2_log:pd.DataFrame) -> None: + hsp2_log.to_hdf(self._store, 'RUN_INFO/LOGFILE', data_columns=True, format='t') + + def write_versioning(self, versioning:pd.DataFrame) -> None: + versioning.to_hdf(self._store, 'RUN_INFO/VERSIONS', data_columns=True, format='t') + + diff --git a/HSP2IO/io.py b/HSP2IO/io.py index c892bedc..3097e90c 100644 --- a/HSP2IO/io.py +++ b/HSP2IO/io.py @@ -1,6 +1,6 @@ import pandas as pd from pandas.core.frame import DataFrame -from HSP2IO.protocols import Category, SupportsReadUCI, SupportsReadTS, SupportsWriteTS, SupportsWriteLog +from HSP2IO.protocols import Category, SupportsReadUCI, SupportsReadTS, SupportsWriteTS, SupportsWriteLogging from typing import Union from HSP2.uci import UCI @@ -13,7 +13,7 @@ def __init__(self, io_uci: Union[SupportsReadUCI,None]=None, io_input: Union[SupportsReadTS,None]=None, io_output: Union[SupportsReadTS,SupportsWriteTS,None]=None, - io_log: Union[SupportsWriteLog,None]=None,) -> None: + io_log: Union[SupportsWriteLogging,None]=None,) -> None: """ io_all: SupportsReadUCI, SupportsReadTS, SupportsWriteTS/None This parameter 
is intended to allow users with a single file that combined UCI, Input and Output a short cut to specify a single argument. @@ -28,11 +28,16 @@ def __init__(self, A class implementing SupportReadUCI protocol, io_all used in place of this parameter if not specified. This parameter is where the output timeseries will be written to and read from. + io_log: SupportsWriteLogging/None (Default None) + A class implementing SupportWriteLogging protocol, io_all used in place + of this parameter if not specified. This parameter is where logging and + versioning information will be output. """ self._input = io_all if io_input is None else io_input self._output = io_all if io_output is None else io_output self._uci = io_all if io_uci is None else io_uci + self._log = io_all if io_log is None else io_log self._in_memory = {} @@ -40,6 +45,7 @@ def __def__(self): del(self._input) del(self._output) del(self._uci) + del(self._log) def read_uci(self, *args, **kwargs) -> UCI: return self._uci.read_uci() @@ -73,6 +79,12 @@ def read_ts(self, return self._output.read_ts(category, operation, segment, activity) return pd.DataFrame + def write_log(self, data_frame)-> None: + if self._log: self._log.write_log(data_frame) + + def write_versioning(self, data_frame)-> None: + if self._log: self._log.write_versioning(data_frame) + def _get_in_memory(self, category:Category, operation:Union[str,None]=None, diff --git a/HSP2IO/protocols.py b/HSP2IO/protocols.py index f216d9c7..0852a445 100644 --- a/HSP2IO/protocols.py +++ b/HSP2IO/protocols.py @@ -41,4 +41,10 @@ def write_ts(self, activity:Union[str,None]=None) -> None: ... -### Potentially need to add get_flows method as well \ No newline at end of file +class SupportsWriteLogging(Protocol): + + def write_log(self, hsp2_log:pd.DataFrame) -> None: + ... + + def write_versioning(self, versions:pd.DataFrame) -> None: + ... \ No newline at end of file From 156a4423ded15eca86f208bc389b75612ad72023 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Fri, 17 Dec 2021 15:13:16 -0500 Subject: [PATCH 16/19] Remove unused package references --- HSP2/main.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/HSP2/main.py b/HSP2/main.py index 3fbc1eb3..69f0a425 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -5,21 +5,15 @@ from re import S from numpy import float64, float32 -from pandas import HDFStore, Timestamp, read_hdf, DataFrame, date_range +from pandas import DataFrame, date_range from pandas.tseries.offsets import Minute -from numba import types -from numba.typed import Dict -from collections import defaultdict from datetime import datetime as dt -from copy import deepcopy import os from HSP2.utilities import versions, get_timeseries, expand_timeseries_names, save_timeseries, get_gener_timeseries from HSP2.configuration import activities, noop, expand_masslinks from HSP2IO.io import IOManager, SupportsReadTS, Category -from typing import List, Union - def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None: """Runs main HSP2 program. 
From 8b212e7ac7f9f03f317748c884f27bb3a3b5e9bd Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Mon, 20 Dec 2021 12:06:44 -0500 Subject: [PATCH 17/19] Clean-up docstring for IOManager --- HSP2IO/io.py | 59 ++++++++++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/HSP2IO/io.py b/HSP2IO/io.py index 3097e90c..6a4ca95e 100644 --- a/HSP2IO/io.py +++ b/HSP2IO/io.py @@ -9,35 +9,40 @@ class IOManager: """Management class for IO operations needed to execute the HSP2 model""" def __init__(self, - io_all: Union[SupportsReadUCI, SupportsReadTS, SupportsWriteTS, None] = None, - io_uci: Union[SupportsReadUCI,None]=None, - io_input: Union[SupportsReadTS,None]=None, - io_output: Union[SupportsReadTS,SupportsWriteTS,None]=None, - io_log: Union[SupportsWriteLogging,None]=None,) -> None: - """ io_all: SupportsReadUCI, SupportsReadTS, SupportsWriteTS/None - This parameter is intended to allow users with a single file that - combined UCI, Input and Output a short cut to specify a single argument. - io_uci: SupportsReadUCI/None (Default None) - A class implementing SupportReadUCI protocol, io_all used in place of - this parameter if not specified. - io_input: SupportsReadUCI/None (Default None) - A class implementing SupportReadTS protocol, io_all used in place of - this parameter if not specified. This parameter is where the input - timeseries will be read from. - io_output: SupportsReadUCI/None (Default None) - A class implementing SupportReadUCI protocol, io_all used in place of - this parameter if not specified. This parameter is where the output - timeseries will be written to and read from. - io_log: SupportsWriteLogging/None (Default None) - A class implementing SupportWriteLogging protocol, io_all used in place - of this parameter if not specified. This parameter is where logging and - versioning information will be output. + io_combined: Union[SupportsReadUCI, SupportsReadTS, SupportsWriteTS, None] = None, + uci: Union[SupportsReadUCI,None]=None, + input: Union[SupportsReadTS,None]=None, + output: Union[SupportsReadTS,SupportsWriteTS,None]=None, + log: Union[SupportsWriteLogging,None]=None,) -> None: + """ io_combined: SupportsReadUCI & SupportsReadTS & SupportsWriteTS & SupportsWriteLogging / None + Intended to allow users with a object that combines protocols for + UCI, Input, Output and Log a shortcut where only a + single argument needs to be provided. If UCI, Input, Output and/or + Log are not specified this argument will be used as the default. + uci: SupportsReadUCI/None (Default None) + A class instance implementing the SupportReadUCI protocol. + This class acts as the data source for UCI information. + The argument io_combined be used in place by default if this argument is not specified. + input: SupportsReadUCI/None (Default None) + A class instance implementing SupportReadTS protocol. + This class acts as the data source for any input timeseries. + The argument io_combined be used in place by default if this argument is not specified. + output: SupportsWriteTS & SupportsReadTS / None (Default None) + A class implementing SupportsWriteTS & SupportReadTS protocol + This class acts as the location for outputing result timeseries as + well as the data source should those result timeseries be needed for + inputs into a model modules. + The argument io_combined be used in place by default if this argument is not specified. 
+ log: SupportsWriteLogging/None (Default None) + A class implementing SupportWriteLogging protocol. This class + This class acts as the location to output logging information. + The argument io_combined be used in place by default if this argument is not specified. """ - self._input = io_all if io_input is None else io_input - self._output = io_all if io_output is None else io_output - self._uci = io_all if io_uci is None else io_uci - self._log = io_all if io_log is None else io_log + self._input = io_combined if input is None else input + self._output = io_combined if output is None else output + self._uci = io_combined if uci is None else uci + self._log = io_combined if log is None else log self._in_memory = {} From 43b98d3343eaf36017da0be2eb607d8d610a21b7 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Mon, 20 Dec 2021 12:12:41 -0500 Subject: [PATCH 18/19] Fix broken __del__ method for IOManager --- HSP2IO/io.py | 2 +- HSP2IO/protocols.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/HSP2IO/io.py b/HSP2IO/io.py index 6a4ca95e..28cf43c4 100644 --- a/HSP2IO/io.py +++ b/HSP2IO/io.py @@ -46,7 +46,7 @@ def __init__(self, self._in_memory = {} - def __def__(self): + def __del__(self): del(self._input) del(self._output) del(self._uci) diff --git a/HSP2IO/protocols.py b/HSP2IO/protocols.py index 0852a445..6bf9dd05 100644 --- a/HSP2IO/protocols.py +++ b/HSP2IO/protocols.py @@ -41,6 +41,7 @@ def write_ts(self, activity:Union[str,None]=None) -> None: ... +@runtime_checkable class SupportsWriteLogging(Protocol): def write_log(self, hsp2_log:pd.DataFrame) -> None: From a4c67063861dfd424f33d90fdc4f0d68e1d976db Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Mon, 20 Dec 2021 16:24:25 -0500 Subject: [PATCH 19/19] Remove development work around from main This was a development workable to allow me to execute the model while slowly abstracting pieces of the IO. Now that it has fully been abstracted this workaround should be removed to the merge back into the develop branch. --- HSP2/main.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/HSP2/main.py b/HSP2/main.py index 69f0a425..c7816e45 100644 --- a/HSP2/main.py +++ b/HSP2/main.py @@ -30,9 +30,6 @@ def main(io_manager:IOManager, saveall:bool=False, jupyterlab:bool=True) -> None """ - #PRT - this is a bandaid to run the model while I implement IO Abstraction. - #Eventually all references to store will be removed - store = io_manager._output._store hdfname = './' if not os.path.exists(hdfname): raise FileNotFoundError(f'{hdfname} HDF5 File Not Found')
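
Taken together, a hypothetical end-to-end invocation after this series (class and function names as defined in HSP2IO/hdf.py, HSP2IO/io.py and HSP2/main.py; the 'test.h5' path is purely illustrative):

    from HSP2IO.hdf import HDF5
    from HSP2IO.io import IOManager
    from HSP2.main import main

    hdf5_instance = HDF5('test.h5')        # a single HDF5 file holding UCI, inputs and results
    io_manager = IOManager(hdf5_instance)  # io_combined stands in for uci, input, output and log
    main(io_manager, saveall=True)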