From bf9b26ad20d167ffeeb09af8e176fd5dc663011b Mon Sep 17 00:00:00 2001 From: Matthias Alphart Date: Sun, 8 Dec 2024 09:38:43 +0100 Subject: [PATCH] Add DPT 29 definitions (#1617) * Add DPT 29 definitions * Common base class for struct usage --- docs/changelog.md | 5 +++ test/dpt_tests/dpt_29_test.py | 35 +++++++++++++++++++++ test/dpt_tests/dpt_lookup_test.py | 4 +++ xknx/dpt/__init__.py | 6 ++++ xknx/dpt/dpt.py | 38 ++++++++++++++++++++++ xknx/dpt/dpt_12.py | 37 ++-------------------- xknx/dpt/dpt_13.py | 5 +-- xknx/dpt/dpt_29.py | 52 +++++++++++++++++++++++++++++++ xknx/dpt/dpt_5.py | 1 + xknx/dpt/dpt_8.py | 10 ++---- 10 files changed, 149 insertions(+), 44 deletions(-) create mode 100644 test/dpt_tests/dpt_29_test.py create mode 100644 xknx/dpt/dpt_29.py diff --git a/docs/changelog.md b/docs/changelog.md index 03333483f..b3e6188d1 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -6,6 +6,11 @@ nav_order: 2 # Changelog + +### DPT + +- Add DPT 29 - 8byte signed definitions: generic, 29.010, 29.011, 20.012 + ### Management - Add rate limit (in packets per second) option to P2PConnection. diff --git a/test/dpt_tests/dpt_29_test.py b/test/dpt_tests/dpt_29_test.py new file mode 100644 index 000000000..71a0671ad --- /dev/null +++ b/test/dpt_tests/dpt_29_test.py @@ -0,0 +1,35 @@ +"""Unit test for KNX 8 byte signed objects.""" + +import pytest + +from xknx.dpt import DPT8ByteSigned, DPTArray +from xknx.exceptions import ConversionError + + +class TestDPT8ByteSigned: + """Test class for KNX 8 byte signed objects.""" + + @pytest.mark.parametrize( + ("raw", "expected"), + ( + (b"\x00\x00\x00\x00\x00\x00\x00\x00", 0), + (b"\x00\x00\x00\x00\x00\x00\x00\x01", 1), + (b"\x00\x00\x00\x00\x00\x00\x00\xe6", 230), + (b"\xff\xff\xff\xff\xff\xff\xff\x1a", -230), + # limits + (b"\x7f\xff\xff\xff\xff\xff\xff\xff", 9_223_372_036_854_775_807), + (b"\x80\x00\x00\x00\x00\x00\x00\x00", -9_223_372_036_854_775_808), + ), + ) + def test_values(self, raw, expected): + """Test valid values.""" + assert DPT8ByteSigned.to_knx(expected) == DPTArray(raw) + assert DPT8ByteSigned.from_knx(DPTArray(raw)) == expected + + @pytest.mark.parametrize( + "value", (9_223_372_036_854_775_808, -9_223_372_036_854_775_809) + ) + def test_exceeding_limits(self, value): + """Test invalid values.""" + with pytest.raises(ConversionError): + DPT8ByteSigned.to_knx(value) diff --git a/test/dpt_tests/dpt_lookup_test.py b/test/dpt_tests/dpt_lookup_test.py index 97475978a..9a627c268 100644 --- a/test/dpt_tests/dpt_lookup_test.py +++ b/test/dpt_tests/dpt_lookup_test.py @@ -37,6 +37,7 @@ ("4byte_float", DPT4ByteFloat, 14, None, None), ("4byte_signed", DPT4ByteSigned, 13, None, None), ("4byte_unsigned", DPT4ByteUnsigned, 12, None, None), + ("8byte_signed", DPT8ByteSigned, 29, None, None), ("absolute_humidity", DPTAbsoluteHumidity, 9, 29, "g/m³"), ("absolute_temperature", DPTAbsoluteTemperature, 14, 69, "K"), ("acceleration", DPTAcceleration, 14, 0, "m/s²"), @@ -44,6 +45,7 @@ ("ack", DPTAck, 1, 16, None), ("activation_energy", DPTActivationEnergy, 14, 2, "J/mol"), ("active_energy", DPTActiveEnergy, 13, 10, "Wh"), + ("active_energy_8byte", DPTActiveEnergy8Byte, 29, 10, "Wh"), ("active_energy_kwh", DPTActiveEnergykWh, 13, 13, "kWh"), ("active_energy_mwh", DPTActiveEnergyMWh, 13, 16, "MWh"), ("activity", DPTActivity, 14, 3, "s⁻¹"), @@ -57,6 +59,7 @@ ("angular_momentum", DPTAngularMomentum, 14, 8, "J s"), ("angular_velocity", DPTAngularVelocity, 14, 9, "rad/s"), ("apparant_energy", DPTApparantEnergy, 13, 11, "VAh"), + ("apparant_energy_8byte", DPTApparantEnergy8Byte, 29, 11, "VAh"), ("apparant_energy_kvah", DPTApparantEnergykVAh, 13, 14, "kVAh"), ("apparent_power", DPTApparentPower, 14, 80, "VA"), ("area", DPTArea, 14, 10, "m²"), @@ -175,6 +178,7 @@ ("ramp", DPTRamp, 1, 4, None), ("reactance", DPTReactance, 14, 59, "Ω"), ("reactive_energy", DPTReactiveEnergy, 13, 12, "VARh"), + ("reactive_energy_8byte", DPTReactiveEnergy8Byte, 29, 12, "VARh"), ("reactive_energy_kvarh", DPTReactiveEnergykVARh, 13, 15, "kVARh"), ("reset", DPTReset, 1, 15, None), ("resistance", DPTResistance, 14, 60, "Ω"), diff --git a/xknx/dpt/__init__.py b/xknx/dpt/__init__.py index e2d0f42cf..e2a8b7a00 100644 --- a/xknx/dpt/__init__.py +++ b/xknx/dpt/__init__.py @@ -213,6 +213,12 @@ from .dpt_18 import DPTSceneControl, SceneControl from .dpt_19 import DPTDateTime from .dpt_20 import DPTHVACContrMode, DPTHVACMode, DPTHVACStatus +from .dpt_29 import ( + DPT8ByteSigned, + DPTActiveEnergy8Byte, + DPTApparantEnergy8Byte, + DPTReactiveEnergy8Byte, +) from .dpt_232 import DPTColorRGB, RGBColor from .dpt_235 import DPTTariffActiveEnergy, TariffActiveEnergy from .dpt_242 import DPTColorXYY, XYYColor diff --git a/xknx/dpt/dpt.py b/xknx/dpt/dpt.py index c1b3f6b60..ebe62f5d8 100644 --- a/xknx/dpt/dpt.py +++ b/xknx/dpt/dpt.py @@ -7,6 +7,7 @@ from dataclasses import dataclass from enum import Enum from inspect import isabstract +import struct from typing import Any, Generic, TypedDict, TypeVar, cast, final from xknx.exceptions import ConversionError, CouldNotParseTelegram @@ -228,6 +229,43 @@ def to_knx(cls, value: int | float) -> DPTArray: """Serialize to KNX/IP raw data.""" +class DPTStructIntMixin: + """ + Mixin for DPT classes using struct to convert values. + + Base class shall be DPTNumeric. + Resolution shall always be 1. + """ + + value_min: int | float + value_max: int | float + # https://docs.python.org/3/library/struct.html#format-characters + _struct_format: str + + @classmethod + def from_knx(cls, payload: DPTArray | DPTBinary) -> int: + """Parse/deserialize from KNX/IP raw data.""" + raw = cls.validate_payload(payload) # type: ignore[attr-defined] + + try: + return struct.unpack(cls._struct_format, bytes(raw))[0] # type: ignore[no-any-return] + except struct.error as err: + raise ConversionError(f"Could not parse {cls.__name__}", raw=raw) from err + + @classmethod + def to_knx(cls, value: int | float) -> DPTArray: + """Serialize to KNX/IP raw data.""" + try: + knx_value = int(value) + if not (cls.value_min <= knx_value <= cls.value_max): + raise ValueError + return DPTArray(struct.pack(cls._struct_format, knx_value)) + except (ValueError, struct.error) as err: + raise ConversionError( + f"Could not serialize {cls.__name__}", value=value + ) from err + + class DPTEnumData(Enum): """ Base class for KNX data point types decoding Enum values. diff --git a/xknx/dpt/dpt_12.py b/xknx/dpt/dpt_12.py index ce560233f..70d5b5cd1 100644 --- a/xknx/dpt/dpt_12.py +++ b/xknx/dpt/dpt_12.py @@ -2,15 +2,10 @@ from __future__ import annotations -import struct +from .dpt import DPTNumeric, DPTStructIntMixin -from xknx.exceptions import ConversionError -from .dpt import DPTNumeric -from .payload import DPTArray, DPTBinary - - -class DPT4ByteUnsigned(DPTNumeric): +class DPT4ByteUnsigned(DPTStructIntMixin, DPTNumeric): """ Abstraction for KNX 4 Byte "32-bit unsigned". @@ -28,34 +23,6 @@ class DPT4ByteUnsigned(DPTNumeric): _struct_format = ">I" - @classmethod - def from_knx(cls, payload: DPTArray | DPTBinary) -> int: - """Parse/deserialize from KNX/IP raw data.""" - raw = cls.validate_payload(payload) - - try: - return struct.unpack(cls._struct_format, bytes(raw))[0] # type: ignore[no-any-return] - except struct.error as err: - raise ConversionError(f"Could not parse {cls.__name__}", raw=raw) from err - - @classmethod - def to_knx(cls, value: int | float) -> DPTArray: - """Serialize to KNX/IP raw data.""" - try: - knx_value = int(value) - if not cls._test_boundaries(knx_value): - raise ValueError - return DPTArray(struct.pack(cls._struct_format, knx_value)) - except (ValueError, struct.error) as err: - raise ConversionError( - f"Could not serialize {cls.__name__}", value=value - ) from err - - @classmethod - def _test_boundaries(cls, value: int) -> bool: - """Test if value is within defined range for this object.""" - return cls.value_min <= value <= cls.value_max - class DPTValue4Ucount(DPT4ByteUnsigned): """DPT 12.001 DPT_Value_4_Ucount.""" diff --git a/xknx/dpt/dpt_13.py b/xknx/dpt/dpt_13.py index 3809f3188..284fab44c 100644 --- a/xknx/dpt/dpt_13.py +++ b/xknx/dpt/dpt_13.py @@ -2,10 +2,10 @@ from __future__ import annotations -from .dpt_12 import DPT4ByteUnsigned +from .dpt import DPTNumeric, DPTStructIntMixin -class DPT4ByteSigned(DPT4ByteUnsigned): +class DPT4ByteSigned(DPTStructIntMixin, DPTNumeric): """ Abstraction for KNX 4 Byte "32-bit signed". @@ -15,6 +15,7 @@ class DPT4ByteSigned(DPT4ByteUnsigned): dpt_main_number = 13 dpt_sub_number: int | None = None value_type = "4byte_signed" + payload_length = 4 value_min = -2147483648 value_max = 2147483647 diff --git a/xknx/dpt/dpt_29.py b/xknx/dpt/dpt_29.py new file mode 100644 index 000000000..56293772c --- /dev/null +++ b/xknx/dpt/dpt_29.py @@ -0,0 +1,52 @@ +"""Implementation of Basic KNX 8-Byte signed (2's complement) values.""" + +from __future__ import annotations + +from .dpt import DPTNumeric, DPTStructIntMixin + + +class DPT8ByteSigned(DPTStructIntMixin, DPTNumeric): + """ + Abstraction for KNX 8 Byte "64-bit signed". + + DPT 29.*** + """ + + dpt_main_number = 29 + dpt_sub_number: int | None = None + payload_length = 8 + value_type = "8byte_signed" + + value_min = -9_223_372_036_854_775_808 + value_max = 9_223_372_036_854_775_807 + resolution = 1 + + _struct_format = ">q" + + +class DPTActiveEnergy8Byte(DPT8ByteSigned): + """DPT 29.010 DPT_Active_Energy_V64.""" + + dpt_main_number = 29 + dpt_sub_number = 10 + value_type = "active_energy_8byte" + unit = "Wh" + ha_device_class = "energy" + + +class DPTApparantEnergy8Byte(DPT8ByteSigned): + """DPT 29.011 DPT_Apparant_Energy_V64 (VAh).""" + + dpt_main_number = 29 + dpt_sub_number = 11 + value_type = "apparant_energy_8byte" + unit = "VAh" + + +class DPTReactiveEnergy8Byte(DPT8ByteSigned): + """DPT 29.012 DPT_Reactive_Energy_V64 (VARh).""" + + dpt_main_number = 29 + dpt_sub_number = 12 + value_type = "reactive_energy_8byte" + unit = "VARh" diff --git a/xknx/dpt/dpt_5.py b/xknx/dpt/dpt_5.py index 2c15388a1..d5b2a1f20 100644 --- a/xknx/dpt/dpt_5.py +++ b/xknx/dpt/dpt_5.py @@ -135,6 +135,7 @@ class DPTPercentU8(DPTValue1ByteUnsigned): dpt_sub_number = 4 value_type = "percentU8" unit = "%" + resolution = 1 class DPTDecimalFactor(DPTValue1ByteUnsigned): diff --git a/xknx/dpt/dpt_8.py b/xknx/dpt/dpt_8.py index e03df5d99..2102ea403 100644 --- a/xknx/dpt/dpt_8.py +++ b/xknx/dpt/dpt_8.py @@ -29,8 +29,9 @@ class DPT2ByteSigned(DPTNumeric): value_min = -32768 value_max = 32767 - resolution: float = 1 + resolution: int | float = 1 + # not using DPTStructIntMixin because return type of from_knx can be float when resolution is < 1 _struct_format = ">h" @classmethod @@ -48,7 +49,7 @@ def to_knx(cls, value: int | float) -> DPTArray: """Serialize to KNX/IP raw data.""" try: knx_value = int(float(value) / cls.resolution) - if not cls._test_boundaries(knx_value): + if not (cls.value_min <= knx_value <= cls.value_max): raise ValueError("Value out of range") return DPTArray(struct.pack(cls._struct_format, knx_value)) except (ValueError, struct.error) as err: @@ -56,11 +57,6 @@ def to_knx(cls, value: int | float) -> DPTArray: f"Could not serialize {cls.__name__}", value=value ) from err - @classmethod - def _test_boundaries(cls, value: int) -> bool: - """Test if value is within defined range for this object.""" - return cls.value_min <= value <= cls.value_max - class DPTValue2Count(DPT2ByteSigned): """DPT 8.001 DPT_Value_2_Count (pulses)."""