From f1e870dc55de9265998b808be26a8aee4616522d Mon Sep 17 00:00:00 2001 From: Matthias Alphart Date: Sun, 8 Dec 2024 16:10:52 +0100 Subject: [PATCH] Support 2 or 4 byte float for weather air pressure (#1618) * Support 2 or 4 byte float for weather air pressure * Create remote_value_by_length_test.py * Update remote_value_by_length_test.py * Update weather.md --- docs/changelog.md | 3 + docs/weather.md | 2 +- test/devices_tests/weather_test.py | 23 ++-- .../remote_value_by_length_test.py | 88 +++++++++++++++ xknx/devices/weather.py | 6 +- xknx/remote_value/__init__.py | 2 + xknx/remote_value/remote_value_by_length.py | 102 ++++++++++++++++++ 7 files changed, 216 insertions(+), 10 deletions(-) create mode 100644 test/remote_value_tests/remote_value_by_length_test.py create mode 100644 xknx/remote_value/remote_value_by_length.py diff --git a/docs/changelog.md b/docs/changelog.md index b3e6188d1..335c68ee7 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -6,6 +6,9 @@ nav_order: 2 # Changelog +### Devices + +- Weather: Support either DPT 9.006 (2byte) or DPT 14.058 (4byte) for `group_address_air_pressure` ### DPT diff --git a/docs/weather.md b/docs/weather.md index 4dd304103..90494cc10 100644 --- a/docs/weather.md +++ b/docs/weather.md @@ -49,7 +49,7 @@ The weather device is basically a set of sensors that you can obtain from your w - **group_address_wind_alarm** KNX address for reading if wind alarm is on/off. - **group_address_frost_alarm** KNX address for reading if frost alarm is on/off. - **group_address_day_night** KNX address for reading a day/night object. -- **group_address_air_pressure** KNX address reading current air pressure. **DPT 9.006** +- **group_address_air_pressure** KNX address reading current air pressure. **DPT 9.006 or 14.058** - **group_address_humidity** KNX address for reading current humidity. **DPT 9.007** - **sync_state** Periodically sync the state. - **device_updated_cb** Callback for each update. diff --git a/test/devices_tests/weather_test.py b/test/devices_tests/weather_test.py index 883f3f0ec..fa6937ad7 100644 --- a/test/devices_tests/weather_test.py +++ b/test/devices_tests/weather_test.py @@ -2,6 +2,8 @@ import datetime +import pytest + from xknx import XKNX from xknx.devices import Weather from xknx.devices.weather import WeatherCondition @@ -87,35 +89,42 @@ async def test_brightness(self): assert weather._brightness_north.unit_of_measurement == "lx" assert weather._brightness_north.ha_device_class == "illuminance" - async def test_pressure(self): - """Test resolve state with pressure.""" + @pytest.mark.parametrize( + ("value", "payload"), + [ + (98631.68, DPTArray((0x6C, 0xB4))), # 2byte float + (98631.68, DPTArray((0x47, 0xC0, 0xA3, 0xD7))), # 4byte float + ], + ) + async def test_pressure(self, value, payload): + """Test air pressure telegram.""" xknx = XKNX() weather = Weather(name="weather", xknx=xknx, group_address_air_pressure="1/3/4") weather.process( Telegram( destination_address=GroupAddress("1/3/4"), - payload=GroupValueWrite(value=DPTArray((0x6C, 0xAD))), + payload=GroupValueWrite(value=payload), ) ) - assert weather.air_pressure == 98058.24 + assert weather.air_pressure == value assert weather._air_pressure.unit_of_measurement == "Pa" assert weather._air_pressure.ha_device_class == "pressure" async def test_humidity(self): - """Test humidity.""" + """Test humidity telegram.""" xknx = XKNX() weather = Weather(name="weather", xknx=xknx, group_address_humidity="1/2/4") weather.process( Telegram( destination_address=GroupAddress("1/2/4"), - payload=GroupValueWrite(value=DPTArray((0x7E, 0xE1))), + payload=GroupValueWrite(value=DPTArray((0x15, 0x73))), ) ) - assert weather.humidity == 577044.48 + assert weather.humidity == 55.8 assert weather._humidity.unit_of_measurement == "%" assert weather._humidity.ha_device_class == "humidity" diff --git a/test/remote_value_tests/remote_value_by_length_test.py b/test/remote_value_tests/remote_value_by_length_test.py new file mode 100644 index 000000000..061035d9a --- /dev/null +++ b/test/remote_value_tests/remote_value_by_length_test.py @@ -0,0 +1,88 @@ +"""Unit test for RemoteValueByLength objects.""" + +import pytest + +from xknx import XKNX +from xknx.dpt import ( + DPTArray, + DPTBase, + DPTBinary, + DPTColorXYY, + DPTOpenClose, + DPTPressure, + DPTPressure2Byte, + DPTTemperature, + DPTValue1Count, +) +from xknx.exceptions import ConversionError, CouldNotParseTelegram +from xknx.remote_value.remote_value_by_length import RemoteValueByLength + + +class TestRemoteValueByLength: + """Test class for RemoteValueByLength objects.""" + + @pytest.mark.parametrize( + "dpt_classes", + [ + (DPTOpenClose, DPTPressure), # DPTBinary payload invalid + (DPTTemperature, DPTPressure2Byte), # similar payload_length + (DPTColorXYY, DPTValue1Count), # non-numeric DPT + ], + ) + def test_invalid_dpt_classes(self, dpt_classes: tuple[type[DPTBase]]) -> None: + """Test if invalid DPT classes raise ConversionError.""" + xknx = XKNX() + with pytest.raises(ConversionError): + RemoteValueByLength(xknx, dpt_classes=dpt_classes) # type: ignore[arg-type] + + @pytest.mark.parametrize("payload", [DPTBinary(0), DPTArray((0, 1, 2))]) + def test_invalid_payload(self, payload: DPTArray | DPTBinary) -> None: + """Test if invalid payloads raise CouldNotParseTelegram.""" + xknx = XKNX() + remote_value = RemoteValueByLength( + xknx=xknx, + dpt_classes=(DPTPressure, DPTPressure2Byte), + ) + with pytest.raises(CouldNotParseTelegram): + remote_value.from_knx(payload) + + @pytest.mark.parametrize( + ("first_dpt", "invalid_dpt"), + [ + (DPTPressure, DPTPressure2Byte), + (DPTPressure2Byte, DPTPressure), + ], + ) + def test_payload_valid_mode_assignment( + self, first_dpt: type[DPTBase], invalid_dpt: type[DPTBase] + ) -> None: + """Test if DPT is assigned properly by payload length.""" + TEST_VALUE = 1 + xknx = XKNX() + remote_value = RemoteValueByLength( + xknx=xknx, + dpt_classes=(DPTPressure, DPTPressure2Byte), + ) + first_payload = first_dpt.to_knx(TEST_VALUE) + invalid_payload = invalid_dpt.to_knx(TEST_VALUE) + + assert remote_value._internal_dpt_class is None + assert remote_value.from_knx(first_payload) == TEST_VALUE + assert remote_value._internal_dpt_class == first_dpt + with pytest.raises(CouldNotParseTelegram): + # other DPT is invalid now + remote_value.from_knx(invalid_payload) + # to_knx works when initialized + assert remote_value.to_knx(TEST_VALUE) == first_payload + + def test_to_knx_uninitialized(self) -> None: + """Test to_knx raising ConversionError when DPT is not known.""" + xknx = XKNX() + remote_value = RemoteValueByLength( + xknx=xknx, + dpt_classes=(DPTPressure, DPTPressure2Byte), + ) + + assert remote_value._internal_dpt_class is None + with pytest.raises(ConversionError): + remote_value.to_knx(1) diff --git a/xknx/devices/weather.py b/xknx/devices/weather.py index b7e625f99..773b55b0d 100644 --- a/xknx/devices/weather.py +++ b/xknx/devices/weather.py @@ -20,9 +20,11 @@ from enum import Enum from typing import TYPE_CHECKING, Any +from xknx.dpt import DPTPressure, DPTPressure2Byte from xknx.remote_value import ( GroupAddressesType, RemoteValue, + RemoteValueByLength, RemoteValueNumeric, RemoteValueSwitch, ) @@ -211,11 +213,11 @@ def __init__( after_update_cb=self.after_update, ) - self._air_pressure = RemoteValueNumeric( + self._air_pressure = RemoteValueByLength( xknx, + dpt_classes=(DPTPressure, DPTPressure2Byte), group_address_state=group_address_air_pressure, sync_state=sync_state, - value_type="pressure_2byte", device_name=self.name, feature_name="Air pressure", after_update_cb=self.after_update, diff --git a/xknx/remote_value/__init__.py b/xknx/remote_value/__init__.py index 009211f2a..486df06a7 100644 --- a/xknx/remote_value/__init__.py +++ b/xknx/remote_value/__init__.py @@ -1,6 +1,7 @@ """Module for handling values on the KNX bus.""" from .remote_value import GroupAddressesType, RemoteValue +from .remote_value_by_length import RemoteValueByLength from .remote_value_climate_mode import ( RemoteValueBinaryHeatCool, RemoteValueBinaryOperationMode, @@ -31,6 +32,7 @@ "RemoteValue", "RemoteValueBinaryHeatCool", "RemoteValueBinaryOperationMode", + "RemoteValueByLength", "RemoteValueColorRGB", "RemoteValueColorRGBW", "RemoteValueColorXYY", diff --git a/xknx/remote_value/remote_value_by_length.py b/xknx/remote_value/remote_value_by_length.py new file mode 100644 index 000000000..2a5f252b8 --- /dev/null +++ b/xknx/remote_value/remote_value_by_length.py @@ -0,0 +1,102 @@ +"""Module for managing remote value with payload length based DPT detection.""" + +from __future__ import annotations + +from collections.abc import Iterable +from typing import TYPE_CHECKING + +from xknx.dpt import DPTArray, DPTBinary, DPTNumeric +from xknx.exceptions import ConversionError, CouldNotParseTelegram + +from .remote_value import GroupAddressesType, RemoteValue, RVCallbackType + +if TYPE_CHECKING: + from xknx.xknx import XKNX + + +class RemoteValueByLength(RemoteValue[float]): + """RemoteValue with DPT detection based on payload length of first received value.""" + + def __init__( + self, + xknx: XKNX, + dpt_classes: Iterable[type[DPTNumeric]], + group_address: GroupAddressesType = None, + group_address_state: GroupAddressesType = None, + sync_state: bool | int | float | str = True, + device_name: str | None = None, + feature_name: str | None = None, + after_update_cb: RVCallbackType[float] | None = None, + ): + """Initialize RemoteValueByLength class.""" + _payload_lengths = set() + for dpt_class in dpt_classes: + if ( + not issubclass(dpt_class, DPTNumeric) + or dpt_class.payload_type is not DPTArray + ): + raise ConversionError( + "Only DPTNumeric subclasses with payload_type DPTArray are supported" + ) + if dpt_class.payload_length in _payload_lengths: + raise ConversionError( + f"Duplicate payload_length {dpt_class.payload_length} in {dpt_classes}" + ) + _payload_lengths.add(dpt_class.payload_length) + + super().__init__( + xknx, + group_address, + group_address_state, + sync_state=sync_state, + device_name=device_name, + feature_name=feature_name, + after_update_cb=after_update_cb, + ) + + self._dpt_classes = dpt_classes + self._internal_dpt_class: type[DPTNumeric] | None = None + + def to_knx(self, value: float) -> DPTArray: + """Convert value to payload.""" + if self._internal_dpt_class is None: + raise ConversionError( + f"RemoteValue DPT not initialized for {self.device_name}" + ) + return self._internal_dpt_class.to_knx(value) + + def from_knx(self, payload: DPTArray | DPTBinary) -> float: + """Convert current payload to value.""" + if self._internal_dpt_class is None: + self._internal_dpt_class = self._determine_dpt_class(payload) + + return self._internal_dpt_class.from_knx(payload) + + @property + def unit_of_measurement(self) -> str | None: + """Return the unit of measurement.""" + if not self._internal_dpt_class: + return None + return self._internal_dpt_class.unit + + @property + def ha_device_class(self) -> str | None: + """Return a string representing the home assistant device class.""" + if not self._internal_dpt_class: + return None + return getattr(self._internal_dpt_class, "ha_device_class", None) + + def _determine_dpt_class(self, payload: DPTArray | DPTBinary) -> type[DPTNumeric]: + """Test if telegram payload may be parsed.""" + if isinstance(payload, DPTArray): + try: + return next( + dpt_class + for dpt_class in self._dpt_classes + if dpt_class.payload_type is DPTArray + and dpt_class.payload_length == len(payload.value) + ) + except StopIteration: + pass + + raise CouldNotParseTelegram("Payload invalid", payload=str(payload))