diff --git a/src/cluster_import.py b/src/cluster_import.py index 21c40c0..cdc8725 100644 --- a/src/cluster_import.py +++ b/src/cluster_import.py @@ -6,21 +6,20 @@ def import_thermal_cluster(path: Path, days_per_year: int = 365): - law_dict = {"UNIFORM":ProbilityLaw.UNIFORM, "GEOMETRIC":ProbilityLaw.GEOMETRIC} - array = np.genfromtxt(path, delimiter=',', dtype=str) + law_dict = {"UNIFORM": ProbilityLaw.UNIFORM, "GEOMETRIC": ProbilityLaw.GEOMETRIC} + array = np.genfromtxt(path, delimiter=",", dtype=str) return ThermalCluster( - unit_count = int(array[1][1]), - nominal_power = float(array[2][1]), - modulation = [float(i) for i in array[3][1:24 + 1]], - fo_law = law_dict[array[4][1]], - fo_volatility = float(array[5][1]), - po_law = law_dict[array[6][1]], - po_volatility = float(array[7][1]), - - fo_duration = array[8][1:days_per_year + 1].astype(float), - fo_rate = array[9][1:days_per_year + 1].astype(float), - po_duration = array[10][1:days_per_year + 1].astype(float), - po_rate = array[11][1:days_per_year + 1].astype(float), - npo_min = array[12][1:days_per_year + 1].astype(float), - npo_max = array[13][1:days_per_year + 1].astype(float), - ) \ No newline at end of file + unit_count=int(array[1][1]), + nominal_power=float(array[2][1]), + modulation=[float(i) for i in array[3][1 : 24 + 1]], + fo_law=law_dict[array[4][1]], + fo_volatility=float(array[5][1]), + po_law=law_dict[array[6][1]], + po_volatility=float(array[7][1]), + fo_duration=array[8][1 : days_per_year + 1].astype(float), + fo_rate=array[9][1 : days_per_year + 1].astype(float), + po_duration=array[10][1 : days_per_year + 1].astype(float), + po_rate=array[11][1 : days_per_year + 1].astype(float), + npo_min=array[12][1 : days_per_year + 1].astype(float), + npo_max=array[13][1 : days_per_year + 1].astype(float), + ) diff --git a/src/ts_generator.py b/src/ts_generator.py index 55c98cd..a5f47c6 100644 --- a/src/ts_generator.py +++ b/src/ts_generator.py @@ -6,9 +6,10 @@ import numpy as np -#probabilities above FAILURE_RATE_EQ_1 are considered certain (equal to 1) +# probabilities above FAILURE_RATE_EQ_1 are considered certain (equal to 1) FAILURE_RATE_EQ_1 = 0.999 + class rndgenerator: @classmethod def next(self) -> float: @@ -22,55 +23,63 @@ class ProbilityLaw(Enum): @dataclass class ThermalCluster: - #available units of the cluster + # available units of the cluster unit_count: int - #nominal power + # nominal power nominal_power: float - #modulation of the nominal power for a certain hour in the day (between 0 and 1) - modulation: List[float] ### maybe group nominal_power and modulation in one vaiable + # modulation of the nominal power for a certain hour in the day (between 0 and 1) + modulation: List[float] ### maybe group nominal_power and modulation in one vaiable - #forced and planed outage parameters - #indexed by day of the year + # forced and planed outage parameters + # indexed by day of the year fo_duration: List[int] fo_rate: List[float] po_duration: List[int] po_rate: List[float] - npo_min: List[int] #number of planed outage min in a day - npo_max: List[int] #number of planed outage max in a day + npo_min: List[int] # number of planed outage min in a day + npo_max: List[int] # number of planed outage max in a day - #forced and planed outage probability law and volatility - #volatility characterizes the distance from the expect at which the value drawn can be + # forced and planed outage probability law and volatility + # volatility characterizes the distance from the expect at which the value drawn can be fo_law: ProbilityLaw - fo_volatility: float ### maybe create an object to store law and volatility + fo_volatility: float ### maybe create an object to store law and volatility po_law: ProbilityLaw po_volatility: float + class ThermalDataGenerator: def __init__(self, days_per_year: int = 365) -> None: self.days_per_year = days_per_year ## explain that ?? - self.log_size = 4000 # >= 5 * (max(df) + max(dp)) - #the number of starting (if positive)/ stopping (if negative) units (due to FO and PO) at a given time + self.log_size = 4000 # >= 5 * (max(df) + max(dp)) + # the number of starting (if positive)/ stopping (if negative) units (due to FO and PO) at a given time self.LOG = [0] * self.log_size - #same but only for PO; necessary to ensure maximum and minimum PO is respected + # same but only for PO; necessary to ensure maximum and minimum PO is respected self.LOGP = [0] * self.log_size - #pogramed and forced outage rate + # pogramed and forced outage rate self.lf = np.empty(days_per_year, dtype=float) self.lp = np.empty(days_per_year, dtype=float) ## ??? - self.ff = np.empty(days_per_year, dtype=float) #ff = lf / (1 - lf) - self.pp = np.empty(days_per_year, dtype=float) #pp = lp / (1 - lp) + self.ff = np.empty(days_per_year, dtype=float) # ff = lf / (1 - lf) + self.pp = np.empty(days_per_year, dtype=float) # pp = lp / (1 - lp) - #precalculated value to speed up generation of random outage duration + # precalculated value to speed up generation of random outage duration self.af = np.empty(days_per_year, dtype=float) self.bf = np.empty(days_per_year, dtype=float) self.ap = np.empty(days_per_year, dtype=float) self.bp = np.empty(days_per_year, dtype=float) - def prepare_outage_duration_constant(self, law: ProbilityLaw, volatility: float, A: List[float], B: List[float], expecs: List[int]) -> None: + def prepare_outage_duration_constant( + self, + law: ProbilityLaw, + volatility: float, + A: List[float], + B: List[float], + expecs: List[int], + ) -> None: """ precalculation of constant values use in generation of outage duration results are stored in A and B @@ -86,14 +95,16 @@ def prepare_outage_duration_constant(self, law: ProbilityLaw, volatility: float, D = expecs[day] xtemp = volatility * volatility * D * (D - 1) if xtemp != 0: - ytemp = (sqrt(4 * xtemp + 1) - 1)/(2 * xtemp) + ytemp = (sqrt(4 * xtemp + 1) - 1) / (2 * xtemp) A[day] = D - 1 / ytemp B[day] = 1 / log(1 - ytemp) else: A[day] = D - 1 B[day] = 0 - def duration_generator(self, law: ProbilityLaw, volatility: float, a: float, b: float, expec: int) -> int: + def duration_generator( + self, law: ProbilityLaw, volatility: float, a: float, b: float, expec: int + ) -> int: """ generation of random outage duration """ @@ -101,9 +112,15 @@ def duration_generator(self, law: ProbilityLaw, volatility: float, a: float, b: if law == ProbilityLaw.UNIFORM: return int(a + rnd_nb * b) elif law == ProbilityLaw.GEOMETRIC: - return min(int( 1 + a + b * log(rnd_nb)), int(self.log_size / 2 - 1)) - - def generate_time_series(self, cluster: ThermalCluster, number_of_timeseries: int, output_series: List[List[float]], output_outages: Optional[List[List[Tuple[int]]]] = None) -> None: + return min(int(1 + a + b * log(rnd_nb)), int(self.log_size / 2 - 1)) + + def generate_time_series( + self, + cluster: ThermalCluster, + number_of_timeseries: int, + output_series: List[List[float]], + output_outages: Optional[List[List[Tuple[int]]]] = None, + ) -> None: """ generation of multiple timeseries for a given thermal cluster output_series stores available power at a given time @@ -111,14 +128,14 @@ def generate_time_series(self, cluster: ThermalCluster, number_of_timeseries: in """ # --- precalculation --- - #cached values for (1-lf)**k and (1-lp)**k - self.FPOW = [] - self.PPOW = [] + # cached values for (1-lf)**k and (1-lp)**k + self.FPOW = [] + self.PPOW = [] for day in range(self.days_per_year): - #lf and lp represent the forced and programed failure rate - #failure rate means the probability to enter in outage each day - #its value is given by: OR / [OR + OD * (1 - OR)] + # lf and lp represent the forced and programed failure rate + # failure rate means the probability to enter in outage each day + # its value is given by: OR / [OR + OD * (1 - OR)] FOR = cluster.fo_rate[day] FOD = cluster.fo_duration[day] self.lf[day] = FOR / (FOR + FOD * (1 - FOR)) @@ -134,53 +151,56 @@ def generate_time_series(self, cluster: ThermalCluster, number_of_timeseries: in ## i dont understand what these calulations are for ## consequently reduce the lower failure rate - if (self.lf[day] < self.lp[day]): + if self.lf[day] < self.lp[day]: self.lf[day] *= (1 - self.lp[day]) / (1 - self.lf[day]) - if (self.lp[day] < self.lf[day]): + if self.lp[day] < self.lf[day]: self.lp[day] *= (1 - self.lf[day]) / (1 - self.lp[day]) a = 0 b = 0 - if (self.lf[day] <= FAILURE_RATE_EQ_1): + if self.lf[day] <= FAILURE_RATE_EQ_1: a = 1 - self.lf[day] self.ff[day] = self.lf[day] / a - if (self.lp[day] <= FAILURE_RATE_EQ_1): + if self.lp[day] <= FAILURE_RATE_EQ_1: b = 1 - self.lp[day] self.pp[day] = self.lp[day] / b - #pre calculating power values + # pre calculating power values self.FPOW.append([]) self.PPOW.append([]) for k in range(cluster.unit_count + 1): self.FPOW[-1].append(pow(a, k)) self.PPOW[-1].append(pow(b, k)) - - self.prepare_outage_duration_constant(cluster.fo_law, cluster.fo_volatility, self.af, self.bf, cluster.fo_duration) - self.prepare_outage_duration_constant(cluster.po_law, cluster.po_volatility, self.ap, self.bp, cluster.po_duration) + self.prepare_outage_duration_constant( + cluster.fo_law, cluster.fo_volatility, self.af, self.bf, cluster.fo_duration + ) + self.prepare_outage_duration_constant( + cluster.po_law, cluster.po_volatility, self.ap, self.bp, cluster.po_duration + ) # --- calculation --- # the two first generated time series will be dropped, necessary to make system stable and physically coherent # as a consequence, N + 2 time series will be computed - #mixed, pure planned and pure force outage + # mixed, pure planned and pure force outage MXO = 0 PFO = 0 PPO = 0 - #dates + # dates now = 0 fut = 0 - #current number of PO and AU (avlaible units) + # current number of PO and AU (avlaible units) cur_nb_PO = 0 cur_nb_AU = cluster.unit_count - #stock is a way to keep the number of PO pushed back due to PO max / antcipated due to PO min - #stock > 0 number of PO pushed back, stock < 0 number of PO antcipated + # stock is a way to keep the number of PO pushed back due to PO max / antcipated due to PO min + # stock > 0 number of PO pushed back, stock < 0 number of PO antcipated stock = 0 for ts_index in range(number_of_timeseries + 2): - #hour in the year + # hour in the year hour = 0 if ts_index > 1: if output_outages: @@ -188,18 +208,20 @@ def generate_time_series(self, cluster: ThermalCluster, number_of_timeseries: in output_serie = output_series[ts_index - 2] for day in range(self.days_per_year): - if cur_nb_AU > 100: ## it was like that in c++, i'm not shure why - #maybe it's for the pow (FPOW, PPOW) calculation, if so it might not be the right place to do + if cur_nb_AU > 100: ## it was like that in c++, i'm not shure why + # maybe it's for the pow (FPOW, PPOW) calculation, if so it might not be the right place to do raise ValueError("avalaible unit number out of bound (> 100)") # = return of units wich were in outage = cur_nb_PO -= self.LOGP[now] - self.LOGP[now] = 0 #set to 0 because this cell will be use again later (in self.log_size days) + self.LOGP[ + now + ] = 0 # set to 0 because this cell will be use again later (in self.log_size days) cur_nb_AU += self.LOG[now] self.LOG[now] = 0 # = determinating units that go on outage = - #FO and PO canditate + # FO and PO canditate FOC = 0 POC = 0 @@ -216,11 +238,11 @@ def generate_time_series(self, cluster: ThermalCluster, number_of_timeseries: in break elif self.lf[day] > FAILURE_RATE_EQ_1: FOC = cur_nb_AU - else: # self.lf[day] == 0 + else: # self.lf[day] == 0 FOC = 0 if self.lp[day] > 0 and self.lp[day] <= FAILURE_RATE_EQ_1: - #apparent number of available units + # apparent number of available units AUN_app = cur_nb_AU if stock >= 0 and stock <= cur_nb_AU: AUN_app -= stock @@ -239,11 +261,10 @@ def generate_time_series(self, cluster: ThermalCluster, number_of_timeseries: in break elif self.lp[day] > FAILURE_RATE_EQ_1: POC = cur_nb_AU - else: # self.lf[day] == 0 + else: # self.lf[day] == 0 POC = 0 - - #apparent PO is compared to cur_nb_AU, considering stock + # apparent PO is compared to cur_nb_AU, considering stock candidate = POC + stock if 0 <= candidate and candidate <= cur_nb_AU: POC = candidate @@ -257,8 +278,8 @@ def generate_time_series(self, cluster: ThermalCluster, number_of_timeseries: in # = checking min and max PO = if POC + cur_nb_PO > cluster.npo_max[day]: - #too many PO to place - #the excedent is placed in stock + # too many PO to place + # the excedent is placed in stock POC = cluster.npo_max[day] - cur_nb_PO cur_nb_PO += POC elif POC + cur_nb_PO < cluster.npo_min[day]: @@ -273,8 +294,8 @@ def generate_time_series(self, cluster: ThermalCluster, number_of_timeseries: in else: cur_nb_PO += POC - # = distributing outage in category = - #pure planed, pure forced, mixed + # = distributing outage in category = + # pure planed, pure forced, mixed if cluster.unit_count == 1: if POC == 1 and FOC == 1: MXO = 1 @@ -302,9 +323,21 @@ def generate_time_series(self, cluster: ThermalCluster, number_of_timeseries: in true_FOD = 0 if PPO != 0 or MXO != 0: - true_POD = self.duration_generator(cluster.po_law, cluster.po_volatility, self.ap[day], self.bp[day], cluster.po_duration[day]) + true_POD = self.duration_generator( + cluster.po_law, + cluster.po_volatility, + self.ap[day], + self.bp[day], + cluster.po_duration[day], + ) if PFO != 0 or MXO != 0: - true_FOD = self.duration_generator(cluster.fo_law, cluster.fo_volatility, self.af[day], self.bf[day], cluster.fo_duration[day]) + true_FOD = self.duration_generator( + cluster.fo_law, + cluster.fo_volatility, + self.af[day], + self.bf[day], + cluster.fo_duration[day], + ) if PPO != 0: fut = (now + true_POD) % self.log_size @@ -319,11 +352,13 @@ def generate_time_series(self, cluster: ThermalCluster, number_of_timeseries: in self.LOGP[fut] += MXO # = storing output in output arrays = - if ts_index > 1: #drop the 2 first generated timeseries + if ts_index > 1: # drop the 2 first generated timeseries if output_outages: output_outage[day] = (PPO, PFO, MXO, true_POD, true_FOD) for h in range(24): - output_serie[hour] = cur_nb_AU * cluster.nominal_power * cluster.modulation[h] + output_serie[hour] = ( + cur_nb_AU * cluster.nominal_power * cluster.modulation[h] + ) hour += 1 - now = (now + 1) % self.log_size \ No newline at end of file + now = (now + 1) % self.log_size diff --git a/tests/conftest.py b/tests/conftest.py index e055f6f..44d44c7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,6 +7,7 @@ def data_directory() -> Path: return Path(__file__).parent / "data" + @pytest.fixture def output_directory() -> Path: - return Path(__file__).parent / "output" \ No newline at end of file + return Path(__file__).parent / "output" diff --git a/tests/test_random_generator.py b/tests/test_random_generator.py index 8e7e4c5..5eb8395 100644 --- a/tests/test_random_generator.py +++ b/tests/test_random_generator.py @@ -30,12 +30,13 @@ def test_geometric_law(output_directory): values[value] += N_inv expec /= N - assert expec == pytest.approx(10, abs = 0.1) + assert expec == pytest.approx(10, abs=0.1) plt.plot(range(nb_values), values) plt.savefig(output_directory / "geometric_law_distrib.png") plt.clf() + def test_uniform_law(output_directory): generator = ThermalDataGenerator(days_per_year=1) @@ -61,8 +62,8 @@ def test_uniform_law(output_directory): values[value] += N_inv expec /= N - assert expec == pytest.approx(10, abs = 0.1) + assert expec == pytest.approx(10, abs=0.1) plt.plot(range(nb_values), values) plt.savefig(output_directory / "uniform_law_distrib.png") - plt.clf() \ No newline at end of file + plt.clf() diff --git a/tests/test_ts_generator.py b/tests/test_ts_generator.py index bee9562..9f39188 100644 --- a/tests/test_ts_generator.py +++ b/tests/test_ts_generator.py @@ -11,10 +11,12 @@ def cluster_1(data_directory) -> Path: return import_thermal_cluster(data_directory / "cluster_1.csv") + @pytest.fixture def cluster_100(data_directory) -> Path: return import_thermal_cluster(data_directory / "cluster_100.csv") + @pytest.fixture def cluster_high_por(data_directory) -> Path: return import_thermal_cluster(data_directory / "cluster_high_por.csv") @@ -38,16 +40,20 @@ def test_one_unit_cluster(cluster_1, output_directory): true_for = tot_fo / (365 * ts_nb) with open(output_directory / "test_one_unit_cluster.csv", "w") as file: - writer = csv.writer(file, delimiter=',', quotechar='"') - + writer = csv.writer(file, delimiter=",", quotechar='"') + writer.writerow(["timeseries :"]) - writer.writerows([[line[i] for i in range(0, len(line), 24)] for line in output_series]) - + writer.writerows( + [[line[i] for i in range(0, len(line), 24)] for line in output_series] + ) + writer.writerow(["outage duration ([POD, FOD]) :"]) writer.writerows(output_outages) writer.writerow(["total PO :", tot_po, "total FO :", tot_fo]) - writer.writerow(["PO rate :", round(true_por, 4), "FO rate :", round(true_for, 4)]) + writer.writerow( + ["PO rate :", round(true_por, 4), "FO rate :", round(true_for, 4)] + ) def test_hundred_unit_cluster(cluster_100, output_directory): @@ -67,7 +73,7 @@ def test_hundred_unit_cluster(cluster_100, output_directory): true_por = tot_po / (365 * ts_nb * cluster_100.unit_count) true_for = tot_fo / (365 * ts_nb * cluster_100.unit_count) - #check the max PO + # check the max PO tots_simult_po = [[] for _ in range(ts_nb)] cursor = [0] * 10 tot_simult_po = 0 @@ -90,16 +96,20 @@ def test_hundred_unit_cluster(cluster_100, output_directory): tots_simult_po[i // 365].append(tot_simult_po) with open(output_directory / "test_100_unit_cluster.csv", "w") as file: - writer = csv.writer(file, delimiter=',', quotechar='"') - + writer = csv.writer(file, delimiter=",", quotechar='"') + writer.writerow(["timeseries :"]) - writer.writerows([[line[i] for i in range(0, len(line), 24)] for line in output_series]) - + writer.writerows( + [[line[i] for i in range(0, len(line), 24)] for line in output_series] + ) + writer.writerow(["outage duration ([POD, FOD]) :"]) writer.writerows(output_outages) writer.writerow(["total PO :", tot_po, "total FO :", tot_fo]) - writer.writerow(["PO rate :", round(true_por, 4), "FO rate :", round(true_for, 4)]) + writer.writerow( + ["PO rate :", round(true_por, 4), "FO rate :", round(true_for, 4)] + ) writer.writerow(["total simultaneous PO :"]) writer.writerows(tots_simult_po) @@ -112,7 +122,9 @@ def test_max_po(cluster_high_por, output_directory): output_series = [[0 for _ in range(365 * 24)] for __ in range(ts_nb)] output_outages = [[0 for _ in range(365)] for __ in range(ts_nb)] - generator.generate_time_series(cluster_high_por, ts_nb, output_series, output_outages) + generator.generate_time_series( + cluster_high_por, ts_nb, output_series, output_outages + ) tot_po = 0 tot_fo = 0 @@ -122,7 +134,7 @@ def test_max_po(cluster_high_por, output_directory): true_por = tot_po / (365 * ts_nb * cluster_high_por.unit_count) true_for = tot_fo / (365 * ts_nb * cluster_high_por.unit_count) - #check the max PO + # check the max PO tots_simult_po = [[] for _ in range(ts_nb)] cursor = [0] * 10 tot_simult_po = 0 @@ -145,16 +157,20 @@ def test_max_po(cluster_high_por, output_directory): tots_simult_po[i // 365].append(tot_simult_po) with open(output_directory / "test_high_por_cluster.csv", "w") as file: - writer = csv.writer(file, delimiter=',', quotechar='"') - + writer = csv.writer(file, delimiter=",", quotechar='"') + writer.writerow(["timeseries :"]) - writer.writerows([[line[i] for i in range(0, len(line), 24)] for line in output_series]) - + writer.writerows( + [[line[i] for i in range(0, len(line), 24)] for line in output_series] + ) + writer.writerow(["outage duration ([POD, FOD]) :"]) writer.writerows(output_outages) writer.writerow(["total PO :", tot_po, "total FO :", tot_fo]) - writer.writerow(["PO rate :", round(true_por, 4), "FO rate :", round(true_for, 4)]) + writer.writerow( + ["PO rate :", round(true_por, 4), "FO rate :", round(true_for, 4)] + ) writer.writerow(["total simultaneous PO :"]) writer.writerows(tots_simult_po)