diff --git a/src/antares/tsgen/cluster_import.py b/src/antares/tsgen/cluster_import.py index 9806c31..a5a23df 100644 --- a/src/antares/tsgen/cluster_import.py +++ b/src/antares/tsgen/cluster_import.py @@ -26,7 +26,7 @@ def import_thermal_cluster(path: Path, days_per_year: int = 365) -> ThermalClust return ThermalCluster( unit_count=int(array[1][1]), nominal_power=float(array[2][1]), - modulation=[float(i) for i in array[3][1 : 24 + 1]], + modulation=array[3][1 : 24 + 1].astype(int), fo_law=law_dict[array[4][1]], fo_volatility=float(array[5][1]), po_law=law_dict[array[6][1]], diff --git a/src/antares/tsgen/cluster_resolve.py b/src/antares/tsgen/cluster_resolve.py index dbf0461..91b9c8c 100644 --- a/src/antares/tsgen/cluster_resolve.py +++ b/src/antares/tsgen/cluster_resolve.py @@ -28,15 +28,15 @@ def resolve_thermal_cluster( return ThermalCluster( unit_count=parsed_yaml.unit_count, nominal_power=parsed_yaml.nominal_power, - modulation=modulation["modulation"].to_list(), + modulation=modulation["modulation"].to_numpy(dtype=float), fo_law=law_dict[parsed_yaml.fo_law], fo_volatility=parsed_yaml.fo_volatility, po_law=law_dict[parsed_yaml.po_law], po_volatility=parsed_yaml.po_volatility, - fo_duration=parameters_ts["FOD"].to_list(), - fo_rate=parameters_ts["FOR"].to_list(), - po_duration=parameters_ts["POD"].to_list(), - po_rate=parameters_ts["POR"].to_list(), - npo_min=parameters_ts["POMax"].to_list(), - npo_max=parameters_ts["POMin"].to_list(), + fo_duration=parameters_ts["FOD"].to_numpy(dtype=int), + fo_rate=parameters_ts["FOR"].to_numpy(dtype=float), + po_duration=parameters_ts["POD"].to_numpy(dtype=int), + po_rate=parameters_ts["POR"].to_numpy(dtype=float), + npo_min=parameters_ts["POMax"].to_numpy(dtype=int), + npo_max=parameters_ts["POMin"].to_numpy(dtype=int), ) diff --git a/src/antares/tsgen/duration_generator.py b/src/antares/tsgen/duration_generator.py index 2f99c94..bdfd4ff 100644 --- a/src/antares/tsgen/duration_generator.py +++ b/src/antares/tsgen/duration_generator.py @@ -12,9 +12,9 @@ from abc import ABC, abstractmethod from enum import Enum from math import log, sqrt -from typing import List import numpy as np +import numpy.typing as npt from .random_generator import RNG @@ -41,7 +41,10 @@ class GeneratorWrapper(DurationGenerator): """ def __init__( - self, delegate: DurationGenerator, volatility: float, expecs: List[int] + self, + delegate: DurationGenerator, + volatility: float, + expecs: npt.NDArray[np.int_], ) -> None: self.volatility = volatility self.expectations = expecs @@ -59,7 +62,9 @@ def generate_duration(self, day: int) -> int: class UniformDurationGenerator(DurationGenerator): - def __init__(self, rng: RNG, volatility: float, expecs: List[int]) -> None: + def __init__( + self, rng: RNG, volatility: float, expecs: npt.NDArray[np.int_] + ) -> None: self.rng = rng self.a = np.empty(len(expecs), dtype=float) self.b = np.empty(len(expecs), dtype=float) @@ -77,7 +82,9 @@ def generate_duration(self, day: int) -> int: class GeometricDurationGenerator(DurationGenerator): - def __init__(self, rng: RNG, volatility: float, expecs: List[int]) -> None: + def __init__( + self, rng: RNG, volatility: float, expecs: npt.NDArray[np.int_] + ) -> None: self.rng = rng self.a = np.empty(len(expecs), dtype=float) self.b = np.empty(len(expecs), dtype=float) @@ -100,7 +107,7 @@ def generate_duration(self, day: int) -> int: def make_duration_generator( - rng: RNG, law: ProbabilityLaw, volatility: float, expectations: List[int] + rng: RNG, law: ProbabilityLaw, volatility: float, expectations: npt.NDArray[np.int_] ) -> DurationGenerator: """ return a DurationGenerator for the given law diff --git a/src/antares/tsgen/ts_generator.py b/src/antares/tsgen/ts_generator.py index cf8d1a4..6d2fa31 100644 --- a/src/antares/tsgen/ts_generator.py +++ b/src/antares/tsgen/ts_generator.py @@ -30,17 +30,16 @@ class ThermalCluster: # nominal power nominal_power: float # modulation of the nominal power for a certain hour in the day (between 0 and 1) - # TODO: check that it should be 24 or 8760 ? - modulation: List[float] ### maybe group nominal_power and modulation in one vaiable + modulation: npt.NDArray[np.int_] # forced and planed outage parameters # indexed by day of the year - fo_duration: List[int] - fo_rate: List[float] - po_duration: List[int] - po_rate: List[float] - npo_min: List[int] # number of planed outage min in a day - npo_max: List[int] # number of planed outage max in a day + fo_duration: npt.NDArray[np.int_] + fo_rate: npt.NDArray[np.float_] + po_duration: npt.NDArray[np.int_] + po_rate: npt.NDArray[np.float_] + npo_min: npt.NDArray[np.int_] # number of planed outage min in a day + npo_max: npt.NDArray[np.int_] # number of planed outage max in a day # forced and planed outage probability law and volatility # volatility characterizes the distance from the expect at which the value drawn can be @@ -49,6 +48,84 @@ class ThermalCluster: po_law: ProbabilityLaw po_volatility: float + def __post_init__(self) -> None: + _check_cluster(self) + + +def _check_1_dim(array: npt.NDArray, name: str) -> None: + if array.ndim != 1: + raise ValueError(f"{name} must be a 1 dimension array.") + + +def _check_array(condition: npt.NDArray[np.bool_], message: str) -> None: + if condition.any(): + raise ValueError(f"{message}: {condition.nonzero()[0].tolist()}") + + +def _check_cluster(cluster: ThermalCluster) -> None: + if cluster.unit_count <= 0: + raise ValueError( + f"Unit count must be strictly positive, got {cluster.unit_count}." + ) + if cluster.nominal_power <= 0: + raise ValueError( + f"Nominal power must be strictly positive, got {cluster.nominal_power}." + ) + if cluster.fo_volatility < 0: + raise ValueError( + f"Forced outage volatility must be positive, got {cluster.unit_count}." + ) + if cluster.po_volatility < 0: + raise ValueError( + f"Planned outage volatility must be positive, got {cluster.unit_count}." + ) + + _check_1_dim(cluster.fo_rate, "Forced outage failure rate") + _check_1_dim(cluster.fo_duration, "Forced outage duration") + _check_1_dim(cluster.po_rate, "Planned failure rate") + _check_1_dim(cluster.po_duration, "Planned outage duration") + _check_1_dim(cluster.npo_min, "Minimum count of planned outages") + _check_1_dim(cluster.npo_max, "Maximum count of planned outages") + _check_1_dim(cluster.modulation, "Hourly modulation") + if len(cluster.modulation) != 24: + raise ValueError("hourly modulation array must have 24 values.") + + _check_array( + cluster.fo_rate < 0, "Forced failure rate is negative on following days" + ) + _check_array( + cluster.fo_rate > 1, "Forced failure rate is greater than 1 on following days" + ) + _check_array( + cluster.fo_duration < 0, "Forced outage duration is negative on following days" + ) + _check_array( + cluster.po_rate < 0, "Planned failure rate is negative on following days" + ) + _check_array( + cluster.po_rate > 1, "Planned failure rate is greater than 1 on following days" + ) + _check_array( + cluster.po_duration < 0, "Planned outage duration is negative on following days" + ) + _check_array( + cluster.modulation < 0, "Hourly modulation is negative on following hours" + ) + + lengths = { + len(a) + for a in [ + cluster.fo_rate, + cluster.fo_duration, + cluster.po_rate, + cluster.po_duration, + cluster.npo_min, + cluster.npo_max, + ] + } + if len(lengths) != 1: + raise ValueError(f"Not all daily arrays have same size, got {lengths}") + class OutputTimeseries: def __init__(self, ts_count: int, days: int) -> None: @@ -65,26 +142,30 @@ def __init__(self, ts_count: int, days: int) -> None: self.forced_outage_durations = np.zeros((ts_count, days), dtype=int) +def _column_powers(column: npt.NDArray[np.float_], width: int) -> npt.NDArray: + """ + Returns a matrix of given width where column[i] is the ith power of the input vector. + """ + powers = np.arange(width) + powers.shape = (1, len(powers)) + column.shape = (len(column), 1) + return pow(column, powers) + + +def _daily_to_hourly(daily_data: npt.NDArray) -> npt.NDArray: + """ + Converts daily rows of a 2D array to hourly rows + """ + if daily_data.ndim != 2: + raise ValueError("Daily data must be a 2D-array") + return np.repeat(daily_data, 24, axis=1) + + class ThermalDataGenerator: def __init__(self, rng: RNG = MersenneTwisterRNG(), days: int = 365) -> None: self.rng = rng self.days = days - # TODO: Remove this log size limits, seems useless and error prone if very large durations - self.log_size = 4000 # >= 5 * (max(df) + max(dp)) - # the number of starting (if positive)/ stopping (if negative) units (due to FO and PO) at a given time - self.LOG = [0] * self.log_size - # same but only for PO; necessary to ensure maximum and minimum PO is respected - self.LOGP = [0] * self.log_size - - # pogramed and forced outage rate - self.lf = np.zeros(days, dtype=float) - self.lp = np.zeros(days, dtype=float) - - ## ??? - self.ff = np.zeros(days, dtype=float) # ff = lf / (1 - lf) - self.pp = np.zeros(days, dtype=float) # pp = lp / (1 - lp) - def generate_time_series( self, cluster: ThermalCluster, @@ -93,51 +174,68 @@ def generate_time_series( """ generation of multiple timeseries for a given thermal cluster """ + _check_cluster(cluster) + + # TODO: Remove this log size limit, seems useless and error prone if very large durations + log_size = 4000 # >= 5 * (max(df) + max(dp)) + # the number of starting (if positive)/ stopping (if negative) units (due to FO and PO) at a given time + log = np.zeros(log_size, dtype=int) + # same but only for PO; necessary to ensure maximum and minimum PO is respected + logp = np.zeros(log_size, dtype=int) + + # pogramed and forced outage rate + daily_fo_rate = np.zeros(self.days, dtype=float) + daily_po_rate = np.zeros(self.days, dtype=float) + + ## ??? + ff = np.zeros(self.days, dtype=float) # ff = lf / (1 - lf) + pp = np.zeros(self.days, dtype=float) # pp = lp / (1 - lp) # --- precalculation --- # cached values for (1-lf)**k and (1-lp)**k - self.FPOW: List[List[float]] = [] - self.PPOW: List[List[float]] = [] - - for day in range(self.days): - # lf and lp represent the forced and programed failure rate - # failure rate means the probability to enter in outage each day - # its value is given by: OR / [OR + OD * (1 - OR)] - FOR = cluster.fo_rate[day] - FOD = cluster.fo_duration[day] - self.lf[day] = FOR / (FOR + FOD * (1 - FOR)) - - POR = cluster.po_rate[day] - POD = cluster.po_duration[day] - self.lp[day] = POR / (POR + POD * (1 - POR)) - - if self.lf[day] < 0: - raise ValueError(f"forced failure rate is negative on day {day}") - if self.lp[day] < 0: - raise ValueError(f"programed failure rate is negative on day {day}") - - ## i dont understand what these calulations are for - ## consequently reduce the lower failure rate - if self.lf[day] < self.lp[day]: - self.lf[day] *= (1 - self.lp[day]) / (1 - self.lf[day]) - if self.lp[day] < self.lf[day]: - self.lp[day] *= (1 - self.lf[day]) / (1 - self.lp[day]) - - a = 0 - b = 0 - if self.lf[day] <= FAILURE_RATE_EQ_1: - a = 1 - self.lf[day] - self.ff[day] = self.lf[day] / a - if self.lp[day] <= FAILURE_RATE_EQ_1: - b = 1 - self.lp[day] - self.pp[day] = self.lp[day] / b - - # pre calculating power values - self.FPOW.append([]) - self.PPOW.append([]) - for k in range(cluster.unit_count + 1): - self.FPOW[-1].append(pow(a, k)) - self.PPOW[-1].append(pow(b, k)) + fpow = np.zeros(shape=(self.days, cluster.unit_count + 1)) + ppow = np.zeros(shape=(self.days, cluster.unit_count + 1)) + + # lf and lp represent the forced and programed failure rate + # failure rate means the probability to enter in outage each day + # its value is given by: OR / [OR + OD * (1 - OR)] + daily_fo_rate = cluster.fo_rate / ( + cluster.fo_rate + cluster.fo_duration * (1 - cluster.fo_rate) + ) + daily_po_rate = cluster.po_rate / ( + cluster.po_rate + cluster.po_duration * (1 - cluster.po_rate) + ) + + invalid_days = daily_fo_rate < 0 + if invalid_days.any(): + raise ValueError( + f"forced failure rate is negative on days {invalid_days.nonzero()[0].tolist()}" + ) + invalid_days = daily_po_rate < 0 + if invalid_days.any(): + raise ValueError( + f"planned failure rate is negative on days {invalid_days.nonzero()[0].tolist()}" + ) + + ## i dont understand what these calulations are for + ## consequently reduce the lower failure rate + mask = daily_fo_rate < daily_po_rate + daily_fo_rate[mask] *= (1 - daily_po_rate[mask]) / (1 - daily_fo_rate[mask]) + mask = daily_po_rate < daily_fo_rate + daily_po_rate[mask] *= (1 - daily_fo_rate[mask]) / (1 - daily_po_rate[mask]) + + a = np.zeros(shape=self.days, dtype=float) + b = np.zeros(shape=self.days, dtype=float) + mask = daily_fo_rate <= FAILURE_RATE_EQ_1 + a[mask] = 1 - daily_fo_rate[mask] + ff[mask] = daily_fo_rate[mask] / a + + mask = daily_po_rate <= FAILURE_RATE_EQ_1 + b[mask] = 1 - daily_po_rate[mask] + pp[mask] = daily_po_rate[mask] / b + + fpow = _column_powers(a, cluster.unit_count + 1) + ppow = _column_powers(b, cluster.unit_count + 1) fod_generator = make_duration_generator( self.rng, cluster.fo_law, cluster.fo_volatility, cluster.fo_duration @@ -153,168 +251,175 @@ def generate_time_series( # output that will be returned output = OutputTimeseries(number_of_timeseries, self.days) - # mixed, pure planned and pure force outage - MXO = 0 - PFO = 0 - PPO = 0 - # dates now = 0 - fut = 0 # current number of PO and AU (avlaible units) - cur_nb_PO = 0 - cur_nb_AU = cluster.unit_count + current_planned_outages = 0 + current_available_units = cluster.unit_count # stock is a way to keep the number of PO pushed back due to PO max / antcipated due to PO min # stock > 0 number of PO pushed back, stock < 0 number of PO antcipated stock = 0 for ts_index in range(-2, number_of_timeseries): - # hour in the year - hour = 0 - for day in range(self.days): # = return of units wich were in outage = - cur_nb_PO -= self.LOGP[now] - self.LOGP[ + current_planned_outages -= logp[now] + logp[ now ] = 0 # set to 0 because this cell will be use again later (in self.log_size days) - cur_nb_AU += self.LOG[now] - self.LOG[now] = 0 + current_available_units += log[now] + log[now] = 0 - FOC = 0 - POC = 0 + fo_candidates = 0 + po_candidates = 0 - if self.lf[day] > 0 and self.lf[day] <= FAILURE_RATE_EQ_1: - A = self.rng.next() - last = self.FPOW[day][cur_nb_AU] - if A > last: + if daily_fo_rate[day] > 0 and daily_fo_rate[day] <= FAILURE_RATE_EQ_1: + draw = self.rng.next() + last = fpow[day, current_available_units] + if draw > last: cumul = last - for d in range(1, cur_nb_AU + 1): - last = last * self.ff[day] * (cur_nb_AU + 1 - d) / d + for d in range(1, current_available_units + 1): + last = ( + last * ff[day] * (current_available_units + 1 - d) / d + ) cumul += last - FOC = d - if A <= cumul: + fo_candidates = d + if draw <= cumul: break elif ( - self.lf[day] > FAILURE_RATE_EQ_1 + daily_fo_rate[day] > FAILURE_RATE_EQ_1 ): # TODO: not same comparison as cpp ? - FOC = cur_nb_AU + fo_candidates = current_available_units else: # self.lf[day] == 0 - FOC = 0 - - if self.lp[day] > 0 and self.lp[day] <= FAILURE_RATE_EQ_1: - # apparent number of available units - AUN_app = cur_nb_AU - if stock >= 0 and stock <= cur_nb_AU: - AUN_app -= stock - elif stock > cur_nb_AU: - AUN_app = 0 - - A = self.rng.next() - last = self.PPOW[day][cur_nb_AU] - if A > last: + fo_candidates = 0 + + if daily_po_rate[day] > 0 and daily_po_rate[day] <= FAILURE_RATE_EQ_1: + apparent_available_units = current_available_units + if stock >= 0 and stock <= current_available_units: + apparent_available_units -= stock + elif stock > current_available_units: + apparent_available_units = 0 + + draw = self.rng.next() + last = ppow[day, apparent_available_units] + if draw > last: cumul = last - for d in range(1, cur_nb_AU + 1): - last = last * self.pp[day] * (cur_nb_AU + 1 - d) / d + for d in range(1, apparent_available_units + 1): + last = ( + last * pp[day] * (apparent_available_units + 1 - d) / d + ) cumul += last - POC = d - if A <= cumul: + po_candidates = d + if draw <= cumul: break - elif self.lp[day] > FAILURE_RATE_EQ_1: - POC = cur_nb_AU + elif daily_po_rate[day] > FAILURE_RATE_EQ_1: + po_candidates = current_available_units else: # self.lf[day] == 0 - POC = 0 + po_candidates = 0 # apparent PO is compared to cur_nb_AU, considering stock - candidate = POC + stock - if 0 <= candidate and candidate <= cur_nb_AU: - POC = candidate + candidate = po_candidates + stock + if 0 <= candidate and candidate <= current_available_units: + po_candidates = candidate stock = 0 - if candidate > cur_nb_AU: - POC = cur_nb_AU - stock = candidate - cur_nb_AU + if candidate > current_available_units: + po_candidates = current_available_units + stock = candidate - current_available_units if candidate < 0: - POC = 0 + po_candidates = 0 stock = candidate # = checking min and max PO = - if POC + cur_nb_PO > cluster.npo_max[day]: + if po_candidates + current_planned_outages > cluster.npo_max[day]: # too many PO to place # the excedent is placed in stock - POC = cluster.npo_max[day] - cur_nb_PO - cur_nb_PO += POC - elif POC + cur_nb_PO < cluster.npo_min[day]: - if cluster.npo_min[day] - cur_nb_PO > cur_nb_AU: - stock -= cur_nb_AU - POC - POC = cur_nb_AU - cur_nb_PO += POC + po_candidates = cluster.npo_max[day] - current_planned_outages + current_planned_outages += po_candidates + elif po_candidates + current_planned_outages < cluster.npo_min[day]: + if ( + cluster.npo_min[day] - current_planned_outages + > current_available_units + ): + stock -= current_available_units - po_candidates + po_candidates = current_available_units + current_planned_outages += po_candidates else: - stock -= cluster.npo_min[day] - (POC + cur_nb_PO) - POC = cluster.npo_min[day] - cur_nb_PO - cur_nb_PO += POC + stock -= cluster.npo_min[day] - ( + po_candidates + current_planned_outages + ) + po_candidates = cluster.npo_min[day] - current_planned_outages + current_planned_outages += po_candidates else: - cur_nb_PO += POC + current_planned_outages += po_candidates # = distributing outage in category = # pure planed, pure forced, mixed + mixed_outages = 0 + pure_forced_outages = 0 + pure_planned_outages = 0 if cluster.unit_count == 1: - if POC == 1 and FOC == 1: - MXO = 1 - PPO = 0 - PFO = 0 + if po_candidates == 1 and fo_candidates == 1: + mixed_outages = 1 + pure_planned_outages = 0 + pure_forced_outages = 0 else: - MXO = 0 - PPO = int(POC) - PFO = int(FOC) + mixed_outages = 0 + pure_planned_outages = int(po_candidates) + pure_forced_outages = int(fo_candidates) else: - if cur_nb_AU != 0: - MXO = int(POC * FOC // cur_nb_AU) - PPO = int(POC - MXO) - PFO = int(FOC - MXO) + if current_available_units != 0: + mixed_outages = int( + po_candidates * fo_candidates // current_available_units + ) + pure_planned_outages = int(po_candidates - mixed_outages) + pure_forced_outages = int(fo_candidates - mixed_outages) else: - MXO = 0 - PPO = 0 - PFO = 0 + mixed_outages = 0 + pure_planned_outages = 0 + pure_forced_outages = 0 # = units stopping = - cur_nb_AU -= PPO + PFO + MXO + current_available_units -= ( + pure_planned_outages + pure_forced_outages + mixed_outages + ) # = generating outage duration = (from the law) - true_POD = 0 - true_FOD = 0 - - if PPO != 0 or MXO != 0: - true_POD = pod_generator.generate_duration(day) - if PFO != 0 or MXO != 0: - true_FOD = fod_generator.generate_duration(day) - - if PPO != 0: - fut = (now + true_POD) % self.log_size - self.LOG[fut] += PPO - self.LOGP[fut] += PPO - if PFO != 0: - fut = (now + true_FOD) % self.log_size - self.LOG[fut] += PFO - if MXO != 0: - fut = (now + true_POD + true_FOD) % self.log_size - self.LOG[fut] += MXO - self.LOGP[fut] += MXO + po_duration = 0 + fo_duration = 0 + + if pure_planned_outages != 0 or mixed_outages != 0: + po_duration = pod_generator.generate_duration(day) + if pure_forced_outages != 0 or mixed_outages != 0: + fo_duration = fod_generator.generate_duration(day) + + if pure_planned_outages != 0: + return_timestep = (now + po_duration) % log_size + log[return_timestep] += pure_planned_outages + logp[return_timestep] += pure_planned_outages + if pure_forced_outages != 0: + return_timestep = (now + fo_duration) % log_size + log[return_timestep] += pure_forced_outages + if mixed_outages != 0: + return_timestep = (now + po_duration + fo_duration) % log_size + log[return_timestep] += mixed_outages + logp[return_timestep] += mixed_outages # = storing output in output arrays = if ts_index >= 0: # drop the 2 first generated timeseries - output.planned_outages[ts_index][day] = PPO - output.forced_outages[ts_index][day] = PFO - output.mixed_outages[ts_index][day] = MXO - output.planned_outage_durations[ts_index][day] = true_POD - output.forced_outage_durations[ts_index][day] = true_FOD - output.available_units[ts_index][day] = cur_nb_AU + output.planned_outages[ts_index, day] = pure_planned_outages + output.forced_outages[ts_index, day] = pure_forced_outages + output.mixed_outages[ts_index, day] = mixed_outages + output.planned_outage_durations[ts_index, day] = po_duration + output.forced_outage_durations[ts_index, day] = fo_duration + output.available_units[ts_index, day] = current_available_units - now = (now + 1) % self.log_size + now = (now + 1) % log_size + hourly_available_units = _daily_to_hourly(output.available_units) + hourly_modulation = np.tile(cluster.modulation, self.days) output.available_power = ( - np.repeat(output.available_units, 24, axis=1) - * cluster.nominal_power - * np.tile(cluster.modulation, self.days) + hourly_available_units * cluster.nominal_power * hourly_modulation ) + return output diff --git a/tests/test_parsing.py b/tests/test_parsing.py index cf6722d..bfdd1b6 100644 --- a/tests/test_parsing.py +++ b/tests/test_parsing.py @@ -10,6 +10,8 @@ # # This file is part of the Antares project. +import numpy as np +import numpy.testing as npt import pytest from antares.tsgen.cluster_parsing import parse_cluster_ts, parse_yaml_cluster @@ -24,17 +26,17 @@ def cluster() -> ThermalCluster: return ThermalCluster( unit_count=1, nominal_power=500, - modulation=[1 for i in range(24)], + modulation=np.ones(dtype=float, shape=24), fo_law=ProbabilityLaw.UNIFORM, fo_volatility=0, po_law=ProbabilityLaw.UNIFORM, po_volatility=0, - fo_duration=[2 for i in range(NB_OF_DAY)], - fo_rate=[0.2 for i in range(NB_OF_DAY)], - po_duration=[1 for i in range(NB_OF_DAY)], - po_rate=[0.1 for i in range(NB_OF_DAY)], - npo_min=[0 for i in range(NB_OF_DAY)], - npo_max=[1 for i in range(NB_OF_DAY)], + fo_duration=np.ones(dtype=int, shape=NB_OF_DAY) * 2, + fo_rate=np.ones(dtype=float, shape=NB_OF_DAY) * 0.2, + po_duration=np.ones(dtype=int, shape=NB_OF_DAY), + po_rate=np.ones(dtype=float, shape=NB_OF_DAY) * 0.1, + npo_min=np.zeros(dtype=int, shape=NB_OF_DAY), + npo_max=np.ones(dtype=int, shape=NB_OF_DAY), ) @@ -47,14 +49,14 @@ def test(cluster, data_directory): assert cld.unit_count == cluster.unit_count assert cld.nominal_power == cluster.nominal_power - assert cld.modulation == cluster.modulation + npt.assert_equal(cld.modulation, cluster.modulation) assert cld.fo_law == cluster.fo_law assert cld.fo_volatility == cluster.fo_volatility assert cld.po_law == cluster.po_law assert cld.po_volatility == cluster.po_volatility - assert cld.fo_duration == cluster.fo_duration - assert cld.fo_rate == cluster.fo_rate - assert cld.po_duration == cluster.po_duration - assert cld.po_rate == cluster.po_rate - assert cld.npo_min == cluster.npo_min - assert cld.npo_max == cluster.npo_max + npt.assert_equal(cld.fo_duration, cluster.fo_duration) + npt.assert_equal(cld.fo_rate, cluster.fo_rate) + npt.assert_equal(cld.po_duration, cluster.po_duration) + npt.assert_equal(cld.po_rate, cluster.po_rate) + npt.assert_equal(cld.npo_min, cluster.npo_min) + npt.assert_equal(cld.npo_max, cluster.npo_max) diff --git a/tests/test_unit.py b/tests/test_unit.py index 1754f7a..682332e 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -9,33 +9,159 @@ # SPDX-License-Identifier: MPL-2.0 # # This file is part of the Antares project. +from typing import Any import numpy as np import numpy.testing as npt +import pytest from antares.tsgen.ts_generator import ( ProbabilityLaw, ThermalCluster, ThermalDataGenerator, + _check_cluster, + _column_powers, + _daily_to_hourly, ) +def test_daily_to_hourly(): + daily = np.array([[1, 2]]) + hourly = _daily_to_hourly(daily) + expected = [[1] * 24 + [2] * 24] + npt.assert_equal(hourly, expected) + + +def test_elevate_to_power(): + input = np.array([1, 0.5, 0.1]) + powers = _column_powers(input, 3) + expected = np.array([[1, 1, 1], [1, 0.5, 0.25], [1, 0.1, 0.01]]) + npt.assert_almost_equal(powers, expected, decimal=3) + + +@pytest.fixture() +def base_cluster_365_days(): + days = 365 + return ThermalCluster( + unit_count=10, + nominal_power=100, + modulation=np.ones(dtype=float, shape=24), + fo_law=ProbabilityLaw.UNIFORM, + fo_volatility=0, + po_law=ProbabilityLaw.UNIFORM, + po_volatility=0, + fo_duration=10 * np.ones(dtype=int, shape=days), + fo_rate=0.2 * np.ones(dtype=float, shape=days), + po_duration=10 * np.ones(dtype=int, shape=days), + po_rate=np.zeros(dtype=float, shape=days), + npo_min=np.zeros(dtype=int, shape=days), + npo_max=10 * np.ones(dtype=int, shape=days), + ) + + +def test_invalid_fo_rates(rng, base_cluster_365_days): + days = 365 + cluster = base_cluster_365_days + cluster.fo_rate[12] = -0.2 + cluster.fo_rate[10] = -0.1 + + with pytest.raises( + ValueError, + match="Forced failure rate is negative on following days: \[10, 12\]", + ): + generator = ThermalDataGenerator(rng=rng, days=days) + generator.generate_time_series(cluster, 1) + + +def test_invalid_po_rates(rng, base_cluster_365_days): + days = 365 + cluster = base_cluster_365_days + cluster.po_rate[12] = -0.2 + cluster.po_rate[10] = -0.1 + + with pytest.raises( + ValueError, + match="Planned failure rate is negative on following days: \[10, 12\]", + ): + generator = ThermalDataGenerator(rng=rng, days=days) + generator.generate_time_series(cluster, 1) + + +def valid_cluster() -> ThermalCluster: + days = 365 + return ThermalCluster( + unit_count=10, + nominal_power=100, + modulation=np.ones(dtype=float, shape=24), + fo_law=ProbabilityLaw.UNIFORM, + fo_volatility=0, + po_law=ProbabilityLaw.UNIFORM, + po_volatility=0, + fo_duration=10 * np.ones(dtype=int, shape=days), + fo_rate=0.2 * np.ones(dtype=float, shape=days), + po_duration=10 * np.ones(dtype=int, shape=days), + po_rate=np.zeros(dtype=float, shape=days), + npo_min=np.zeros(dtype=int, shape=days), + npo_max=10 * np.ones(dtype=int, shape=days), + ) + + +def test_invalid_cluster(): + cluster = valid_cluster() + _check_cluster(cluster) + + cluster = valid_cluster() + with pytest.raises(ValueError): + cluster.nominal_power = -1 + _check_cluster(cluster) + + cluster = valid_cluster() + with pytest.raises(ValueError): + cluster.unit_count = -1 + _check_cluster(cluster) + + cluster = valid_cluster() + with pytest.raises(ValueError): + cluster.fo_duration[10] = -1 + _check_cluster(cluster) + + cluster = valid_cluster() + with pytest.raises(ValueError): + cluster.po_duration[10] = -1 + _check_cluster(cluster) + + cluster = valid_cluster() + with pytest.raises(ValueError): + cluster.modulation[10] = -1 + _check_cluster(cluster) + + cluster = valid_cluster() + with pytest.raises(ValueError): + cluster.modulation = np.ones(30) + _check_cluster(cluster) + + cluster = valid_cluster() + with pytest.raises(ValueError): + cluster.fo_rate = cluster.fo_rate[:-2] + _check_cluster(cluster) + + def test_forced_outages(rng): days = 365 cluster = ThermalCluster( unit_count=10, nominal_power=100, - modulation=[1 for i in range(24)], + modulation=np.ones(dtype=float, shape=24), fo_law=ProbabilityLaw.UNIFORM, fo_volatility=0, po_law=ProbabilityLaw.UNIFORM, po_volatility=0, - fo_duration=[10 for i in range(days)], - fo_rate=[0.2 for i in range(days)], - po_duration=[10 for i in range(days)], - po_rate=[0 for i in range(days)], - npo_min=[0 for i in range(days)], - npo_max=[10 for i in range(days)], + fo_duration=10 * np.ones(dtype=int, shape=days), + fo_rate=0.2 * np.ones(dtype=float, shape=days), + po_duration=10 * np.ones(dtype=int, shape=days), + po_rate=np.zeros(dtype=float, shape=days), + npo_min=np.zeros(dtype=int, shape=days), + npo_max=10 * np.ones(dtype=int, shape=days), ) cluster.modulation[12] = 0.5 @@ -60,17 +186,17 @@ def test_planned_outages(rng): cluster = ThermalCluster( unit_count=10, nominal_power=100, - modulation=[1 for i in range(24)], + modulation=np.ones(dtype=float, shape=24), fo_law=ProbabilityLaw.UNIFORM, fo_volatility=0, po_law=ProbabilityLaw.UNIFORM, po_volatility=0, - fo_duration=[10 for i in range(days)], - fo_rate=[0 for i in range(days)], - po_duration=[10 for i in range(days)], - po_rate=[0.2 for i in range(days)], - npo_min=[0 for i in range(days)], - npo_max=[10 for i in range(days)], + fo_duration=10 * np.ones(dtype=int, shape=days), + fo_rate=np.zeros(dtype=float, shape=days), + po_duration=10 * np.ones(dtype=int, shape=days), + po_rate=0.2 * np.ones(dtype=float, shape=days), + npo_min=np.zeros(dtype=int, shape=days), + npo_max=10 * np.ones(dtype=int, shape=days), ) cluster.modulation[12] = 0.5 @@ -94,17 +220,17 @@ def test_planned_outages_limitation(rng): cluster = ThermalCluster( unit_count=10, nominal_power=100, - modulation=[1 for i in range(24)], + modulation=np.ones(dtype=float, shape=24), fo_law=ProbabilityLaw.UNIFORM, fo_volatility=0, po_law=ProbabilityLaw.UNIFORM, po_volatility=0, - fo_duration=[10 for i in range(days)], - fo_rate=[0 for i in range(days)], - po_duration=[2 for i in range(days)], - po_rate=[0.2 for i in range(days)], - npo_min=[0 for i in range(days)], - npo_max=[1 for i in range(days)], + fo_duration=10 * np.ones(dtype=int, shape=days), + fo_rate=np.zeros(dtype=float, shape=days), + po_duration=2 * np.ones(dtype=int, shape=days), + po_rate=0.2 * np.ones(dtype=float, shape=days), + npo_min=np.zeros(dtype=int, shape=days), + npo_max=1 * np.ones(dtype=int, shape=days), ) generator = ThermalDataGenerator(rng=rng, days=days) @@ -127,17 +253,17 @@ def test_planned_outages_min_limitation(rng): cluster = ThermalCluster( unit_count=10, nominal_power=100, - modulation=[1 for i in range(24)], + modulation=np.ones(dtype=float, shape=24), fo_law=ProbabilityLaw.UNIFORM, fo_volatility=0, po_law=ProbabilityLaw.UNIFORM, po_volatility=0, - fo_duration=[10 for i in range(days)], - fo_rate=[0 for i in range(days)], - po_duration=[10 for i in range(days)], - po_rate=[0.2 for i in range(days)], - npo_min=[2 for i in range(days)], - npo_max=[5 for i in range(days)], + fo_duration=10 * np.ones(dtype=int, shape=days), + fo_rate=np.zeros(dtype=float, shape=days), + po_duration=10 * np.ones(dtype=int, shape=days), + po_rate=0.2 * np.ones(dtype=float, shape=days), + npo_min=2 * np.ones(dtype=int, shape=days), + npo_max=5 * np.ones(dtype=int, shape=days), ) generator = ThermalDataGenerator(rng=rng, days=days)