diff --git a/grid2op/Backend/backend.py b/grid2op/Backend/backend.py index ba8eb402..48fccb93 100644 --- a/grid2op/Backend/backend.py +++ b/grid2op/Backend/backend.py @@ -15,7 +15,6 @@ from abc import ABC, abstractmethod import numpy as np import pandas as pd -from dataclasses import dataclass from typing import Tuple, Optional, Any, Dict, Union try: @@ -39,37 +38,7 @@ from grid2op.Space import GridObjects, DEFAULT_N_BUSBAR_PER_SUB, DEFAULT_ALLOW_DETACHMENT -@dataclass -class KirchhoffInfo: - p_or:np.ndarray - q_or:np.ndarray - v_or:np.ndarray - p_ex:np.ndarray - q_ex:np.ndarray - v_ex:np.ndarray - p_gen:np.ndarray - q_gen:np.ndarray - v_gen:np.ndarray - p_load:np.ndarray - q_load:np.ndarray - v_load:np.ndarray - p_subs:np.ndarray - q_subs:np.ndarray - p_bus:np.ndarray - q_bus:np.ndarray - v_bus:np.ndarray - topo_vect:np.ndarray - p_storage:Optional[np.ndarray] = None - q_storage:Optional[np.ndarray] = None - v_storage:Optional[np.ndarray] = None - p_s:Optional[np.ndarray] = None - q_s:Optional[np.ndarray] = None - v_s:Optional[np.ndarray] = None - bus_s:Optional[np.ndarray] = None - # TODO method to get V and theta at each bus, could be in the same shape as check_kirchoff - - class Backend(GridObjects, ABC): """ INTERNAL @@ -1284,210 +1253,53 @@ def check_kirchoff(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray warnings.warn(message="please use backend.check_kirchhoff() instead", category=DeprecationWarning) return self.check_kirchhoff() - @staticmethod - def _check_kirchhoff_lines(cls, info:KirchhoffInfo): - for i in range(cls.n_line): - sub_or_id = cls.line_or_to_subid[i] - sub_ex_id = cls.line_ex_to_subid[i] - if (info.topo_vect[cls.line_or_pos_topo_vect[i]] == -1 or - info.topo_vect[cls.line_ex_pos_topo_vect[i]] == -1): - # line is disconnected + def _aux_check_kirchhoff(self, + n_elt:int, # cst eg. cls.n_gen + el_to_subid:np.ndarray, # cst eg. cls.gen_to_subid + el_bus:np.ndarray, # cst eg. gen_bus + el_p:np.ndarray, # cst, eg. gen_p + el_q:np.ndarray, # cst, eg. gen_q + el_v:np.ndarray, # cst, eg. gen_v + p_subs:np.ndarray, q_subs:np.ndarray, + p_bus:np.ndarray, q_bus:np.ndarray, + v_bus:np.ndarray, + # whether the object is load convention (True) or gen convention (False) + load_conv:np.ndarray=True): + for i in range(n_elt): + psubid = el_to_subid[i] + if el_bus[i] == -1: + # el is disconnected continue - loc_bus_or = info.topo_vect[cls.line_or_pos_topo_vect[i]] - 1 - loc_bus_ex = info.topo_vect[cls.line_ex_pos_topo_vect[i]] - 1 # for substations - info.p_subs[sub_or_id] += info.p_or[i] - info.p_subs[sub_ex_id] += info.p_ex[i] - - info.q_subs[sub_or_id] += info.q_or[i] - info.q_subs[sub_ex_id] += info.q_ex[i] - - # for bus - info.p_bus[sub_or_id, loc_bus_or] += info.p_or[i] - info.q_bus[sub_or_id, loc_bus_or] += info.q_or[i] - - info.p_bus[ sub_ex_id, loc_bus_ex] += info.p_ex[i] - info.q_bus[sub_ex_id, loc_bus_ex] += info.q_ex[i] - - # fill the min / max voltage per bus (initialization) - if (info.v_bus[sub_or_id, loc_bus_or,][0] == -1): - info.v_bus[sub_or_id, loc_bus_or,][0] = info.v_or[i] - if (info.v_bus[sub_ex_id, loc_bus_ex,][0] == -1): - info.v_bus[sub_ex_id, loc_bus_ex,][0] = info.v_ex[i] - if (info.v_bus[sub_or_id, loc_bus_or,][1]== -1): - info.v_bus[sub_or_id, loc_bus_or,][1] = info.v_or[i] - if (info.v_bus[sub_ex_id, loc_bus_ex,][1]== -1): - info.v_bus[sub_ex_id, loc_bus_ex,][1] = info.v_ex[i] - - # now compute the correct stuff - if info.v_or[i] > 0.0: - # line is connected - info.v_bus[sub_or_id, loc_bus_or,][0] = min(info.v_bus[sub_or_id, loc_bus_or,][0],info.v_or[i],) - info.v_bus[sub_or_id, loc_bus_or,][1] = max(info.v_bus[sub_or_id, loc_bus_or,][1],info.v_or[i],) - - if info.v_ex[i] > 0: - # line is connected - info.v_bus[sub_ex_id, loc_bus_ex,][0] = min(info.v_bus[sub_ex_id, loc_bus_ex,][0],info.v_ex[i],) - info.v_bus[sub_ex_id, loc_bus_ex,][1] = max(info.v_bus[sub_ex_id, loc_bus_ex,][1],info.v_ex[i],) - return info - - @staticmethod - def _check_kirchhoff_gens(cls, info:KirchhoffInfo): - for i in range(cls.n_gen): - gptv = cls.gen_pos_topo_vect[i] - - if info.topo_vect[gptv] == -1: - # gen is disconnected - continue - - # for substations - info.p_subs[cls.gen_to_subid[i]] -= info.p_gen[i] - info.q_subs[cls.gen_to_subid[i]] -= info.q_gen[i] + if load_conv: + p_subs[psubid] += el_p[i] + q_subs[psubid] += el_q[i] + else: + p_subs[psubid] -= el_p[i] + q_subs[psubid] -= el_q[i] - loc_bus = info.topo_vect[gptv] - 1 # for bus - info.p_bus[ - cls.gen_to_subid[i], loc_bus - ] -= info.p_gen[i] - info.q_bus[ - cls.gen_to_subid[i], loc_bus - ] -= info.q_gen[i] + loc_bus = el_bus[i] - 1 + if load_conv: + p_bus[psubid, loc_bus] += el_p[i] + q_bus[psubid, loc_bus] += el_q[i] + else: + p_bus[psubid, loc_bus] -= el_p[i] + q_bus[psubid, loc_bus] -= el_q[i] # compute max and min values - if info.v_gen[i]: + if el_v is not None and el_v[i]: # but only if gen is connected - info.v_bus[cls.gen_to_subid[i], loc_bus][ - 0 - ] = min( - info.v_bus[ - cls.gen_to_subid[i], loc_bus - ][0], - info.v_gen[i], - ) - info.v_bus[cls.gen_to_subid[i], loc_bus][ - 1 - ] = max( - info.v_bus[ - cls.gen_to_subid[i], loc_bus - ][1], - info.v_gen[i], - ) - return info - - def _check_kirchhoff_loads(cls, info:KirchhoffInfo): - for i in range(cls.n_load): - gptv = cls.load_pos_topo_vect[i] - - if info.topo_vect[gptv] == -1: - # load is disconnected - continue - loc_bus = info.topo_vect[gptv] - 1 - - # for substations - info.p_subs[cls.load_to_subid[i]] += info.p_load[i] - info.q_subs[cls.load_to_subid[i]] += info.q_load[i] - - # for buses - info.p_bus[ - cls.load_to_subid[i], loc_bus - ] += info.p_load[i] - info.q_bus[ - cls.load_to_subid[i], loc_bus - ] += info.q_load[i] - - # compute max and min values - if info.v_load[i]: - # but only if load is connected - info.v_bus[cls.load_to_subid[i], loc_bus][ - 0 - ] = min( - info.v_bus[ - cls.load_to_subid[i], loc_bus - ][0], - info.v_load[i], - ) - info.v_bus[cls.load_to_subid[i], loc_bus][ - 1 - ] = max( - info.v_bus[ - cls.load_to_subid[i], loc_bus - ][1], - info.v_load[i], + v_bus[psubid, loc_bus][0] = min( + v_bus[psubid, loc_bus][0], + el_v[i], ) - return info - - def _check_kirchhoff_storage(cls, info:KirchhoffInfo): - for i in range(cls.n_storage): - gptv = cls.storage_pos_topo_vect[i] - if info.topo_vect[gptv] == -1: - # storage is disconnected - continue - loc_bus = info.topo_vect[gptv] - 1 - - info.p_subs[cls.storage_to_subid[i]] += info.p_storage[i] - info.q_subs[cls.storage_to_subid[i]] += info.q_storage[i] - info.p_bus[ - cls.storage_to_subid[i], loc_bus - ] += info.p_storage[i] - info.q_bus[ - cls.storage_to_subid[i], loc_bus - ] += info.q_storage[i] - - # compute max and min values - if info.v_storage[i] > 0: - # the storage unit is connected - info.v_bus[ - cls.storage_to_subid[i], - loc_bus, - ][0] = min( - info.v_bus[ - cls.storage_to_subid[i], - loc_bus, - ][0], - info.v_storage[i], - ) - info.v_bus[ - cls.storage_to_subid[i], - loc_bus, - ][1] = max( - info.v_bus[ - cls.storage_to_subid[i], - loc_bus, - ][1], - info.v_storage[i], + v_bus[psubid, loc_bus][1] = max( + v_bus[psubid, loc_bus][1], + el_v[i], ) - return info - - def _check_kirchhoff_shunt(cls, info:KirchhoffInfo): - if cls.shunts_data_available: - for i in range(cls.n_shunt): - if info.bus_s[i] == -1: - # shunt is disconnected - continue - - # for substations - info.p_subs[cls.shunt_to_subid[i]] += info.p_s[i] - info.q_subs[cls.shunt_to_subid[i]] += info.q_s[i] - - # for buses - info.p_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1] += info.p_s[i] - info.q_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1] += info.q_s[i] - # compute max and min values - info.v_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1][0] = min( - info.v_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1][0], info.v_s[i] - ) - info.v_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1][1] = max( - info.v_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1][1], info.v_s[i] - ) - else: - warnings.warn( - "Backend.check_kirchhoff Impossible to get shunt information. Reactive information might be " - "incorrect." - ) - return info - def check_kirchhoff(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ INTERNAL @@ -1534,13 +1346,9 @@ def check_kirchhoff(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarra if cls.n_storage > 0: p_storage, q_storage, v_storage = self.storages_info() - else: - p_storage, q_storage, v_storage = (None, None, None) if cls.shunts_data_available: p_s, q_s, v_s, bus_s = self.shunt_info() - else: - p_s, q_s, v_s, bus_s = (None, None, None, None) # fist check the "substation law" : nothing is created at any substation p_subs = np.zeros(cls.n_sub, dtype=dt_float) @@ -1553,29 +1361,61 @@ def check_kirchhoff(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarra np.zeros((cls.n_sub, cls.n_busbar_per_sub, 2), dtype=dt_float) - 1.0 ) # sub, busbar, [min,max] topo_vect = self.get_topo_vect() - info = KirchhoffInfo(p_or, q_or, v_or, - p_ex, q_ex, v_ex, - p_gen, q_gen, v_gen, - p_load, q_load, v_load, - p_subs, q_subs, - p_bus, q_bus, v_bus, - topo_vect, - p_storage, q_storage, v_storage, - p_s, q_s, v_s, bus_s) - - # bellow i'm "forced" to do a loop otherwise, numpy do not compute the "+=" the way I want it to. - # for example, if two powerlines are such that line_or_to_subid is equal (eg both connected to substation 0) - # then numpy do not guarantee that `p_subs[self.line_or_to_subid] += p_or` will add the two "corresponding p_or" - # TODO this can be vectorized with matrix product, see example in obs.flow_bus_matrix (BaseObervation.py) - info = Backend._check_kirchhoff_lines(cls, info) - info = Backend._check_kirchhoff_gens(cls, info) - info = Backend._check_kirchhoff_loads(cls, info) - info = Backend._check_kirchhoff_storage(cls, info) + + self._aux_check_kirchhoff( + cls.n_line, + cls.line_or_to_subid, + topo_vect[cls.line_or_pos_topo_vect], + p_or, q_or, v_or, + p_subs, q_subs, + p_bus, q_bus, v_bus) + self._aux_check_kirchhoff( + cls.n_line, + cls.line_ex_to_subid, + topo_vect[cls.line_ex_pos_topo_vect], + p_ex, q_ex, v_ex, + p_subs, q_subs, + p_bus, q_bus, v_bus) + self._aux_check_kirchhoff( + cls.n_load, + cls.load_to_subid, + topo_vect[cls.load_pos_topo_vect], + p_load, q_load, v_load, + p_subs, q_subs, + p_bus, q_bus, v_bus) + self._aux_check_kirchhoff( + cls.n_gen, + cls.gen_to_subid, + topo_vect[cls.gen_pos_topo_vect], + p_gen, q_gen, v_gen, + p_subs, q_subs, + p_bus, q_bus, v_bus, + load_conv=False) + if cls.n_storage: + self._aux_check_kirchhoff( + cls.n_storage, + cls.storage_to_subid, + topo_vect[cls.storage_pos_topo_vect], + p_storage, q_storage, v_storage, + p_subs, q_subs, + p_bus, q_bus, v_bus) + if cls.shunts_data_available: - info = Backend._check_kirchhoff_shunt(info) + self._aux_check_kirchhoff( + cls.n_shunt, + cls.shunt_to_subid, + bus_s, + p_s, q_s, v_s, + p_subs, q_subs, + p_bus, q_bus, v_bus) + else: + warnings.warn( + "Observation.check_kirchhoff Impossible to get shunt information. Reactive information might be " + "incorrect." + ) diff_v_bus = np.zeros((cls.n_sub, cls.n_busbar_per_sub), dtype=dt_float) - diff_v_bus[:, :] = info.v_bus[:, :, 1] - info.v_bus[:, :, 0] - return info.p_subs, info.q_subs, info.p_bus, info.q_bus, diff_v_bus + diff_v_bus[:, :] = v_bus[:, :, 1] - v_bus[:, :, 0] + return p_subs, q_subs, p_bus, q_bus, diff_v_bus def _fill_names_obj(self): """fill the name vectors (**eg** name_line) if not done already in the backend.