From ba440ba3f7f22110fbda4181839e1eba3dd06595 Mon Sep 17 00:00:00 2001 From: Ailbhe Date: Thu, 17 Oct 2024 14:24:54 +0200 Subject: [PATCH] wip: pimixin merge separated seed import with timeseries import --- src/rtctools/data/storage.py | 2 + src/rtctools/optimization/pi_mixin.py | 173 +++++++++++++++++++++++++- 2 files changed, 173 insertions(+), 2 deletions(-) diff --git a/src/rtctools/data/storage.py b/src/rtctools/data/storage.py index 3d19cff1..d5892b2d 100644 --- a/src/rtctools/data/storage.py +++ b/src/rtctools/data/storage.py @@ -26,6 +26,8 @@ class DataStoreAccessor(metaclass=ABCMeta): timeseries_import_basename = "timeseries_import" #: Export file basename timeseries_export_basename = "timeseries_export" + #: Imported seed file basename + imported_seed_basename = "imported_seed" def __init__(self, **kwargs): # Save arguments diff --git a/src/rtctools/optimization/pi_mixin.py b/src/rtctools/optimization/pi_mixin.py index f0af57f2..6d93e243 100644 --- a/src/rtctools/optimization/pi_mixin.py +++ b/src/rtctools/optimization/pi_mixin.py @@ -1,5 +1,9 @@ import logging +import math from datetime import timedelta +from typing import Dict, Union + +import numpy as np import rtctools.data.pi as pi import rtctools.data.rtc as rtc @@ -55,6 +59,46 @@ def __init__(self, **kwargs): # life time of this object. self.__data_config = rtc.DataConfig(self._input_folder) + def imported_seed_options(self) -> Dict[str, Union[str, float]]: + """ + Returns a dictionary of options controlling the seeding process. + +------------------------------------------+------------+---------------+ + | Option | Type | Default value | + +==========================================+============+===============+ + | ``import_seed`` | ``Bool`` | ``False`` | + +------------------------------------------+------------+---------------+ + | ``extend_seed_forwards`` | ``Bool`` | ``True`` | + +------------------------------------------+------------+---------------+ + | ``extend_seed_backwards`` | ``Bool`` | ``False`` | + +------------------------------------------+------------+---------------+ + | ``seed_variables_in_timeseries_import`` | ``Bool`` | ``False`` | + +------------------------------------------+------------+---------------+ + The seeding process is controlled by the seeding_options. If ``import_seed`` + is true then, The imported seed will be merged with the timeseries_import and used for + seeding for the first priority. By default these files should be given the name + "imported_seed". This can be changed by overwiting "imported_seed_basename". + The seed could for example be a copy of the timeseries_export.xml from a previous run. + + If the imported seed ends before the current forecast horizon and ``extend_seed_forwards`` + is True then the final value in the imported seed will be extrapolated to the end of the + time horizon. + + If the imported seed starts after the current forecast horizon and ``extend_seed_backwards`` + is True then the initial value in the imported seed will be extrapolated to fill that gap + at the beginning of the timehorizon. + + Note that extending a seed backwards or seeding variables included in the timeseries_import + may lead to undesirable effects if a controlled input is seeded. + + :returns: A dictionary of options for importing an isolated seed. + """ + + return { + "import_seed": False, + "extend_seed_forwards": False, + "extend_seed_backwards": False, + } + def read(self): # Call parent class first for default behaviour. super().read() @@ -130,9 +174,134 @@ def read(self): # Offer input timeseries to IOMixin self.io.reference_datetime = self.__timeseries_import.forecast_datetime + # If an imported seed has been provided merge it with the timeseries_import + # TODO if seed is missing or wrong then use default seed instead of raising exceptions + # TODO be careful for adding seeds for variables with fixed=false! + imported_seed_options = self.imported_seed_options() + if imported_seed_options["import_seed"]: + try: + self.__imported_seed_timeseries = pi.Timeseries( + self.__data_config, + self._input_folder, + self.imported_seed_basename, + binary=self.pi_binary_timeseries, + pi_validate_times=self.pi_validate_timeseries, + ) + except IOError: + raise Exception( + "PIMixin: {}.xml not found in {}.".format( + self.imported_seed_basename, self._input_folder + ) + ) + + imported_seed_times = self.__imported_seed_timeseries.times + + # Timestamp check + if self.pi_validate_timeseries: + for i in range(len(imported_seed_times) - 1): + if imported_seed_times[i] >= imported_seed_times[i + 1]: + raise Exception("PIMixin: Time stamps must be strictly increasing.") + + # Check if the timeseries are truly equidistant + if self.pi_validate_timeseries: + dt = imported_seed_times[1] - imported_seed_times[0] + for i in range(len(imported_seed_times) - 1): + if imported_seed_times[i + 1] - imported_seed_times[i] != dt: + raise Exception( + "PIMixin: Expecting equidistant timeseries, the time step " + "towards {} is not the same as the time step(s) before. Seeding using " + "an imported result is only supported for equidistant timesteps".format( + imported_seed_times[i + 1] + ) + ) + # Check if timestep is same as timeseries_import + if dt != self.__timeseries_import.dt: + raise Exception( + "PIMixin: The timesteps in timeseries_import {} differ from the " + "timesteps in the imported previous result {}. This is not " + "supported".format(self.__timeseries_import.dt, dt) + ) + + imported_seed_times_t0 = imported_seed_times[0] + t0_difference = self.io.reference_datetime - imported_seed_times_t0 + index_difference = int(t0_difference / dt) + + # Check that timeseries_import values are in the seed + if len(imported_seed_times) < abs(index_difference): + # TODO should not be an exception + raise Exception( + "Imported result does not overlap with {} range. " + "Default seed is used".format(self.timeseries_import_basename) + ) + times = timeseries_import_times + + # timeseries_import_variables_dict = {} + for ensemble_member in range(self.__timeseries_import.ensemble_size): + for variable, values in self.__timeseries_import.items(ensemble_member): + self.io.set_timeseries( + variable, timeseries_import_times, values, ensemble_member + ) + + for ensemble_member in range(self.__imported_seed_timeseries.ensemble_size): + for variable, values in self.__imported_seed_timeseries.items(ensemble_member): + write_ts = True + if index_difference >= 0: + values = np.asarray(values[index_difference:], dtype=np.float64) + if len(times) < len(values): + values = values[: len(times)] + elif len(times) > len(values): + if imported_seed_options["extend_seed_forwards"]: + # extend the last entry + values = np.append( + values, [values[-1]] * (len(times) - len(values)) + ) + else: + values = np.append(values, np.nan * (len(times) - len(values))) + else: + values = np.asarray(values, dtype=np.float64) + if imported_seed_options["extend_seed_backwards"]: + # extend first entry back to t0 + values = np.append([values[0]] * abs(index_difference), values) + else: + values = np.append(np.nan * abs(index_difference), values) + if len(times) < len(values): + values = values[: len(times)] + elif len(times) > len(values): + if imported_seed_options["extend_seed_forwards"]: + # extend the last entry + values = np.append( + values, [values[-1]] * (len(times) - len(values)) + ) + else: + values = np.append(values, np.nan * (len(times) - len(values))) + for ( + timeseries_import_variable, + timeseries_import_values, + ) in self.__timeseries_import.items(ensemble_member): + if timeseries_import_variable == variable: + if not imported_seed_options["seed_variables_in_timeseries_import"]: + write_ts = False + elif np.any(timeseries_import_values): + values = [ + a if not math.isnan(a) else b + for a, b in zip(timeseries_import_values, values) + ] + else: + write_ts = False + break + if write_ts: + self.io.set_timeseries( + variable, timeseries_import_times, values, ensemble_member + ) + + logger.info("PIMixin: updated imported timeseries with data from imported seed.") + for ensemble_member in range(self.__timeseries_import.ensemble_size): - for variable, values in self.__timeseries_import.items(ensemble_member): - self.io.set_timeseries(variable, timeseries_import_times, values, ensemble_member) + if not imported_seed_options["import_seed"]: + for variable, values in self.__timeseries_import.items(ensemble_member): + self.io.set_timeseries( + variable, timeseries_import_times, values, ensemble_member + ) # store the parameters in the internal data store. Note that we # are effectively broadcasting parameters, as ParameterConfig does