Skip to content

Commit

Permalink
Refact: Emulate obs._check_kirchhoff in Backend._check_kirchhoff
Browse files Browse the repository at this point in the history
Signed-off-by: Xavier Weiss <[email protected]>
  • Loading branch information
DEUCE1957 committed Dec 2, 2024
1 parent 42535de commit 9a6b696
Showing 1 changed file with 89 additions and 249 deletions.
338 changes: 89 additions & 249 deletions grid2op/Backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from abc import ABC, abstractmethod
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Tuple, Optional, Any, Dict, Union

try:
Expand All @@ -39,37 +38,7 @@
from grid2op.Space import GridObjects, DEFAULT_N_BUSBAR_PER_SUB, DEFAULT_ALLOW_DETACHMENT


@dataclass
class KirchhoffInfo:
p_or:np.ndarray
q_or:np.ndarray
v_or:np.ndarray
p_ex:np.ndarray
q_ex:np.ndarray
v_ex:np.ndarray
p_gen:np.ndarray
q_gen:np.ndarray
v_gen:np.ndarray
p_load:np.ndarray
q_load:np.ndarray
v_load:np.ndarray
p_subs:np.ndarray
q_subs:np.ndarray
p_bus:np.ndarray
q_bus:np.ndarray
v_bus:np.ndarray
topo_vect:np.ndarray
p_storage:Optional[np.ndarray] = None
q_storage:Optional[np.ndarray] = None
v_storage:Optional[np.ndarray] = None
p_s:Optional[np.ndarray] = None
q_s:Optional[np.ndarray] = None
v_s:Optional[np.ndarray] = None
bus_s:Optional[np.ndarray] = None

# TODO method to get V and theta at each bus, could be in the same shape as check_kirchoff


class Backend(GridObjects, ABC):
"""
INTERNAL
Expand Down Expand Up @@ -1284,210 +1253,53 @@ def check_kirchoff(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray
warnings.warn(message="please use backend.check_kirchhoff() instead", category=DeprecationWarning)
return self.check_kirchhoff()

@staticmethod
def _check_kirchhoff_lines(cls, info:KirchhoffInfo):
for i in range(cls.n_line):
sub_or_id = cls.line_or_to_subid[i]
sub_ex_id = cls.line_ex_to_subid[i]
if (info.topo_vect[cls.line_or_pos_topo_vect[i]] == -1 or
info.topo_vect[cls.line_ex_pos_topo_vect[i]] == -1):
# line is disconnected
def _aux_check_kirchhoff(self,
n_elt:int, # cst eg. cls.n_gen
el_to_subid:np.ndarray, # cst eg. cls.gen_to_subid
el_bus:np.ndarray, # cst eg. gen_bus
el_p:np.ndarray, # cst, eg. gen_p
el_q:np.ndarray, # cst, eg. gen_q
el_v:np.ndarray, # cst, eg. gen_v
p_subs:np.ndarray, q_subs:np.ndarray,
p_bus:np.ndarray, q_bus:np.ndarray,
v_bus:np.ndarray,
# whether the object is load convention (True) or gen convention (False)
load_conv:np.ndarray=True):
for i in range(n_elt):
psubid = el_to_subid[i]
if el_bus[i] == -1:
# el is disconnected
continue
loc_bus_or = info.topo_vect[cls.line_or_pos_topo_vect[i]] - 1
loc_bus_ex = info.topo_vect[cls.line_ex_pos_topo_vect[i]] - 1

# for substations
info.p_subs[sub_or_id] += info.p_or[i]
info.p_subs[sub_ex_id] += info.p_ex[i]

info.q_subs[sub_or_id] += info.q_or[i]
info.q_subs[sub_ex_id] += info.q_ex[i]

# for bus
info.p_bus[sub_or_id, loc_bus_or] += info.p_or[i]
info.q_bus[sub_or_id, loc_bus_or] += info.q_or[i]

info.p_bus[ sub_ex_id, loc_bus_ex] += info.p_ex[i]
info.q_bus[sub_ex_id, loc_bus_ex] += info.q_ex[i]

# fill the min / max voltage per bus (initialization)
if (info.v_bus[sub_or_id, loc_bus_or,][0] == -1):
info.v_bus[sub_or_id, loc_bus_or,][0] = info.v_or[i]
if (info.v_bus[sub_ex_id, loc_bus_ex,][0] == -1):
info.v_bus[sub_ex_id, loc_bus_ex,][0] = info.v_ex[i]
if (info.v_bus[sub_or_id, loc_bus_or,][1]== -1):
info.v_bus[sub_or_id, loc_bus_or,][1] = info.v_or[i]
if (info.v_bus[sub_ex_id, loc_bus_ex,][1]== -1):
info.v_bus[sub_ex_id, loc_bus_ex,][1] = info.v_ex[i]

# now compute the correct stuff
if info.v_or[i] > 0.0:
# line is connected
info.v_bus[sub_or_id, loc_bus_or,][0] = min(info.v_bus[sub_or_id, loc_bus_or,][0],info.v_or[i],)
info.v_bus[sub_or_id, loc_bus_or,][1] = max(info.v_bus[sub_or_id, loc_bus_or,][1],info.v_or[i],)

if info.v_ex[i] > 0:
# line is connected
info.v_bus[sub_ex_id, loc_bus_ex,][0] = min(info.v_bus[sub_ex_id, loc_bus_ex,][0],info.v_ex[i],)
info.v_bus[sub_ex_id, loc_bus_ex,][1] = max(info.v_bus[sub_ex_id, loc_bus_ex,][1],info.v_ex[i],)
return info

@staticmethod
def _check_kirchhoff_gens(cls, info:KirchhoffInfo):
for i in range(cls.n_gen):
gptv = cls.gen_pos_topo_vect[i]

if info.topo_vect[gptv] == -1:
# gen is disconnected
continue

# for substations
info.p_subs[cls.gen_to_subid[i]] -= info.p_gen[i]
info.q_subs[cls.gen_to_subid[i]] -= info.q_gen[i]
if load_conv:
p_subs[psubid] += el_p[i]
q_subs[psubid] += el_q[i]
else:
p_subs[psubid] -= el_p[i]
q_subs[psubid] -= el_q[i]

loc_bus = info.topo_vect[gptv] - 1
# for bus
info.p_bus[
cls.gen_to_subid[i], loc_bus
] -= info.p_gen[i]
info.q_bus[
cls.gen_to_subid[i], loc_bus
] -= info.q_gen[i]
loc_bus = el_bus[i] - 1
if load_conv:
p_bus[psubid, loc_bus] += el_p[i]
q_bus[psubid, loc_bus] += el_q[i]
else:
p_bus[psubid, loc_bus] -= el_p[i]
q_bus[psubid, loc_bus] -= el_q[i]

# compute max and min values
if info.v_gen[i]:
if el_v is not None and el_v[i]:
# but only if gen is connected
info.v_bus[cls.gen_to_subid[i], loc_bus][
0
] = min(
info.v_bus[
cls.gen_to_subid[i], loc_bus
][0],
info.v_gen[i],
)
info.v_bus[cls.gen_to_subid[i], loc_bus][
1
] = max(
info.v_bus[
cls.gen_to_subid[i], loc_bus
][1],
info.v_gen[i],
)
return info

def _check_kirchhoff_loads(cls, info:KirchhoffInfo):
for i in range(cls.n_load):
gptv = cls.load_pos_topo_vect[i]

if info.topo_vect[gptv] == -1:
# load is disconnected
continue
loc_bus = info.topo_vect[gptv] - 1

# for substations
info.p_subs[cls.load_to_subid[i]] += info.p_load[i]
info.q_subs[cls.load_to_subid[i]] += info.q_load[i]

# for buses
info.p_bus[
cls.load_to_subid[i], loc_bus
] += info.p_load[i]
info.q_bus[
cls.load_to_subid[i], loc_bus
] += info.q_load[i]

# compute max and min values
if info.v_load[i]:
# but only if load is connected
info.v_bus[cls.load_to_subid[i], loc_bus][
0
] = min(
info.v_bus[
cls.load_to_subid[i], loc_bus
][0],
info.v_load[i],
)
info.v_bus[cls.load_to_subid[i], loc_bus][
1
] = max(
info.v_bus[
cls.load_to_subid[i], loc_bus
][1],
info.v_load[i],
v_bus[psubid, loc_bus][0] = min(
v_bus[psubid, loc_bus][0],
el_v[i],
)
return info

def _check_kirchhoff_storage(cls, info:KirchhoffInfo):
for i in range(cls.n_storage):
gptv = cls.storage_pos_topo_vect[i]
if info.topo_vect[gptv] == -1:
# storage is disconnected
continue
loc_bus = info.topo_vect[gptv] - 1

info.p_subs[cls.storage_to_subid[i]] += info.p_storage[i]
info.q_subs[cls.storage_to_subid[i]] += info.q_storage[i]
info.p_bus[
cls.storage_to_subid[i], loc_bus
] += info.p_storage[i]
info.q_bus[
cls.storage_to_subid[i], loc_bus
] += info.q_storage[i]

# compute max and min values
if info.v_storage[i] > 0:
# the storage unit is connected
info.v_bus[
cls.storage_to_subid[i],
loc_bus,
][0] = min(
info.v_bus[
cls.storage_to_subid[i],
loc_bus,
][0],
info.v_storage[i],
)
info.v_bus[
cls.storage_to_subid[i],
loc_bus,
][1] = max(
info.v_bus[
cls.storage_to_subid[i],
loc_bus,
][1],
info.v_storage[i],
v_bus[psubid, loc_bus][1] = max(
v_bus[psubid, loc_bus][1],
el_v[i],
)
return info

def _check_kirchhoff_shunt(cls, info:KirchhoffInfo):
if cls.shunts_data_available:
for i in range(cls.n_shunt):
if info.bus_s[i] == -1:
# shunt is disconnected
continue

# for substations
info.p_subs[cls.shunt_to_subid[i]] += info.p_s[i]
info.q_subs[cls.shunt_to_subid[i]] += info.q_s[i]

# for buses
info.p_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1] += info.p_s[i]
info.q_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1] += info.q_s[i]

# compute max and min values
info.v_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1][0] = min(
info.v_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1][0], info.v_s[i]
)
info.v_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1][1] = max(
info.v_bus[cls.shunt_to_subid[i], info.bus_s[i] - 1][1], info.v_s[i]
)
else:
warnings.warn(
"Backend.check_kirchhoff Impossible to get shunt information. Reactive information might be "
"incorrect."
)
return info

def check_kirchhoff(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
INTERNAL
Expand Down Expand Up @@ -1534,13 +1346,9 @@ def check_kirchhoff(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarra

if cls.n_storage > 0:
p_storage, q_storage, v_storage = self.storages_info()
else:
p_storage, q_storage, v_storage = (None, None, None)

if cls.shunts_data_available:
p_s, q_s, v_s, bus_s = self.shunt_info()
else:
p_s, q_s, v_s, bus_s = (None, None, None, None)

# fist check the "substation law" : nothing is created at any substation
p_subs = np.zeros(cls.n_sub, dtype=dt_float)
Expand All @@ -1553,29 +1361,61 @@ def check_kirchhoff(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarra
np.zeros((cls.n_sub, cls.n_busbar_per_sub, 2), dtype=dt_float) - 1.0
) # sub, busbar, [min,max]
topo_vect = self.get_topo_vect()
info = KirchhoffInfo(p_or, q_or, v_or,
p_ex, q_ex, v_ex,
p_gen, q_gen, v_gen,
p_load, q_load, v_load,
p_subs, q_subs,
p_bus, q_bus, v_bus,
topo_vect,
p_storage, q_storage, v_storage,
p_s, q_s, v_s, bus_s)

# bellow i'm "forced" to do a loop otherwise, numpy do not compute the "+=" the way I want it to.
# for example, if two powerlines are such that line_or_to_subid is equal (eg both connected to substation 0)
# then numpy do not guarantee that `p_subs[self.line_or_to_subid] += p_or` will add the two "corresponding p_or"
# TODO this can be vectorized with matrix product, see example in obs.flow_bus_matrix (BaseObervation.py)
info = Backend._check_kirchhoff_lines(cls, info)
info = Backend._check_kirchhoff_gens(cls, info)
info = Backend._check_kirchhoff_loads(cls, info)
info = Backend._check_kirchhoff_storage(cls, info)

self._aux_check_kirchhoff(
cls.n_line,
cls.line_or_to_subid,
topo_vect[cls.line_or_pos_topo_vect],
p_or, q_or, v_or,
p_subs, q_subs,
p_bus, q_bus, v_bus)
self._aux_check_kirchhoff(
cls.n_line,
cls.line_ex_to_subid,
topo_vect[cls.line_ex_pos_topo_vect],
p_ex, q_ex, v_ex,
p_subs, q_subs,
p_bus, q_bus, v_bus)
self._aux_check_kirchhoff(
cls.n_load,
cls.load_to_subid,
topo_vect[cls.load_pos_topo_vect],
p_load, q_load, v_load,
p_subs, q_subs,
p_bus, q_bus, v_bus)
self._aux_check_kirchhoff(
cls.n_gen,
cls.gen_to_subid,
topo_vect[cls.gen_pos_topo_vect],
p_gen, q_gen, v_gen,
p_subs, q_subs,
p_bus, q_bus, v_bus,
load_conv=False)
if cls.n_storage:
self._aux_check_kirchhoff(
cls.n_storage,
cls.storage_to_subid,
topo_vect[cls.storage_pos_topo_vect],
p_storage, q_storage, v_storage,
p_subs, q_subs,
p_bus, q_bus, v_bus)

if cls.shunts_data_available:
info = Backend._check_kirchhoff_shunt(info)
self._aux_check_kirchhoff(
cls.n_shunt,
cls.shunt_to_subid,
bus_s,
p_s, q_s, v_s,
p_subs, q_subs,
p_bus, q_bus, v_bus)
else:
warnings.warn(
"Observation.check_kirchhoff Impossible to get shunt information. Reactive information might be "
"incorrect."
)
diff_v_bus = np.zeros((cls.n_sub, cls.n_busbar_per_sub), dtype=dt_float)
diff_v_bus[:, :] = info.v_bus[:, :, 1] - info.v_bus[:, :, 0]
return info.p_subs, info.q_subs, info.p_bus, info.q_bus, diff_v_bus
diff_v_bus[:, :] = v_bus[:, :, 1] - v_bus[:, :, 0]
return p_subs, q_subs, p_bus, q_bus, diff_v_bus

def _fill_names_obj(self):
"""fill the name vectors (**eg** name_line) if not done already in the backend.
Expand Down

0 comments on commit 9a6b696

Please sign in to comment.