Skip to content

Commit

Permalink
Improve process phases (#242)
Browse files Browse the repository at this point in the history
* improve process_phases

* Add test for 1p current

May help with testing meter value processing

* change measurand to metric in process_phases

* improve consistency when processing phases

* pass list to average_of_nonzero

* use correct unit in process_phases

Co-authored-by: lbbrhzn <@lbbrhzn>
Co-authored-by: drc38 <[email protected]>
  • Loading branch information
lbbrhzn and drc38 authored Dec 16, 2021
1 parent e5ab73d commit 99f6e10
Showing 1 changed file with 36 additions and 41 deletions.
77 changes: 36 additions & 41 deletions custom_components/ocpp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,13 @@ async def async_update_device_info(self, boot_info: dict):

def process_phases(self, data):
"""Process phase data from meter values payload."""

def average_of_nonzero(values):
nonzero_values: list = [v for v in values if float(v) != 0.0]
nof_values: int = len(nonzero_values)
average = sum(nonzero_values) / nof_values if nof_values > 0 else 0
return average

measurand_data = {}
for sv in data:
# create ordered Dict for each measurand, eg {"voltage":{"unit":"V","L1":"230"...}}
Expand All @@ -792,69 +799,57 @@ def process_phases(self, data):
self._metrics[measurand].extra_attr[phase] = float(value)
self._metrics[measurand].extra_attr[om.context.value] = context

line_phases = [Phase.l1.value, Phase.l2.value, Phase.l3.value]
line_to_neutral_phases = [Phase.l1_n.value, Phase.l2_n.value, Phase.l2_n.value]
line_to_line_phases = [Phase.l1_l2.value, Phase.l2_l3.value, Phase.l3_l1.value]

for metric, phase_info in measurand_data.items():
metric_value = None
if metric in [Measurand.voltage.value]:
if (
(Phase.l1_n.value in phase_info)
and (Phase.l2_n.value in phase_info)
and (Phase.l3_n.value in phase_info)
):
# Multiple line-neutral voltages are averaged.
metric_value = (
phase_info.get(Phase.l1_n.value, 0)
+ phase_info.get(Phase.l2_n.value, 0)
+ phase_info.get(Phase.l3_n.value, 0)
) / 3
elif Phase.l1_n.value in phase_info:
# Single line-neutral voltages are copied
metric_value = phase_info.get(Phase.l1_n.value, 0)
elif Phase.l2_n.value in phase_info:
metric_value = phase_info.get(Phase.l2_n.value, 0)
elif Phase.l3_n.value in phase_info:
metric_value = phase_info.get(Phase.l3_n.value, 0)
elif Phase.l1_l2.value in phase_info:
# Line-line voltages are converted to line-neutral and averaged.
metric_value = (
phase_info.get(Phase.l1_l2.value, 0)
+ phase_info.get(Phase.l2_l3.value, 0)
+ phase_info.get(Phase.l3_l1.value, 0)
) / (3 * sqrt(3))
if (phase_info.keys() & line_to_neutral_phases) is not None:
# Line to neutral voltages are averaged
metric_value = average_of_nonzero(
[phase_info.get(phase, 0) for phase in line_to_neutral_phases]
)
elif (phase_info.keys() & line_to_line_phases) is not None:
# Line to line voltages are averaged and converted to line to neutral
metric_value = average_of_nonzero(
[phase_info.get(phase, 0) for phase in line_to_line_phases]
) / sqrt(3)
elif metric in [
Measurand.current_import.value,
Measurand.current_export.value,
Measurand.power_active_import.value,
Measurand.power_active_export.value,
]:
# Currents and powers are always summed.
metric_value = max(
phase_info.get(Phase.l1.value, 0)
+ phase_info.get(Phase.l2.value, 0)
+ phase_info.get(Phase.l3.value, 0),
phase_info.get(Phase.l1_n.value, 0)
+ phase_info.get(Phase.l2_n.value, 0)
+ phase_info.get(Phase.l3_n.value, 0),
phase_info.get(Phase.l1_l2.value, 0)
+ phase_info.get(Phase.l2_l3.value, 0)
+ phase_info.get(Phase.l3_l1.value, 0),
)
if (phase_info.keys() & line_phases) is not None:
metric_value = sum(
phase_info.get(phase, 0) for phase in line_phases
)
elif (phase_info.keys() & line_to_neutral_phases) is not None:
# Workaround for some chargers that erroneously use line to neutral for current
metric_value = sum(
phase_info.get(phase, 0) for phase in line_to_neutral_phases
)

if metric_value is not None:
metric_unit = phase_info.get(om.unit.value)
_LOGGER.debug(
"Metric: %s, phase_info: %s value: %f",
"process_phases: metric: %s, phase_info: %s value: %f unit :%s",
metric,
phase_info,
metric_value,
metric_unit,
)
if unit == DEFAULT_POWER_UNIT:
if metric_unit == DEFAULT_POWER_UNIT:
self._metrics[metric].value = float(metric_value) / 1000
self._metrics[metric].unit = HA_POWER_UNIT
elif unit == DEFAULT_ENERGY_UNIT:
elif metric_unit == DEFAULT_ENERGY_UNIT:
self._metrics[metric].value = float(metric_value) / 1000
self._metrics[metric].unit = HA_ENERGY_UNIT
else:
self._metrics[metric].value = round(float(metric_value), 1)
self._metrics[metric].unit = unit
self._metrics[metric].unit = metric_unit

@on(Action.MeterValues)
def on_meter_values(self, connector_id: int, meter_value: Dict, **kwargs):
Expand Down

0 comments on commit 99f6e10

Please sign in to comment.