-
-
Notifications
You must be signed in to change notification settings - Fork 108
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support 2 or 4 byte float for weather air pressure (#1618)
* Support 2 or 4 byte float for weather air pressure * Create remote_value_by_length_test.py * Update remote_value_by_length_test.py * Update weather.md
- Loading branch information
Showing
7 changed files
with
216 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
"""Unit test for RemoteValueByLength objects.""" | ||
|
||
import pytest | ||
|
||
from xknx import XKNX | ||
from xknx.dpt import ( | ||
DPTArray, | ||
DPTBase, | ||
DPTBinary, | ||
DPTColorXYY, | ||
DPTOpenClose, | ||
DPTPressure, | ||
DPTPressure2Byte, | ||
DPTTemperature, | ||
DPTValue1Count, | ||
) | ||
from xknx.exceptions import ConversionError, CouldNotParseTelegram | ||
from xknx.remote_value.remote_value_by_length import RemoteValueByLength | ||
|
||
|
||
class TestRemoteValueByLength: | ||
"""Test class for RemoteValueByLength objects.""" | ||
|
||
@pytest.mark.parametrize( | ||
"dpt_classes", | ||
[ | ||
(DPTOpenClose, DPTPressure), # DPTBinary payload invalid | ||
(DPTTemperature, DPTPressure2Byte), # similar payload_length | ||
(DPTColorXYY, DPTValue1Count), # non-numeric DPT | ||
], | ||
) | ||
def test_invalid_dpt_classes(self, dpt_classes: tuple[type[DPTBase]]) -> None: | ||
"""Test if invalid DPT classes raise ConversionError.""" | ||
xknx = XKNX() | ||
with pytest.raises(ConversionError): | ||
RemoteValueByLength(xknx, dpt_classes=dpt_classes) # type: ignore[arg-type] | ||
|
||
@pytest.mark.parametrize("payload", [DPTBinary(0), DPTArray((0, 1, 2))]) | ||
def test_invalid_payload(self, payload: DPTArray | DPTBinary) -> None: | ||
"""Test if invalid payloads raise CouldNotParseTelegram.""" | ||
xknx = XKNX() | ||
remote_value = RemoteValueByLength( | ||
xknx=xknx, | ||
dpt_classes=(DPTPressure, DPTPressure2Byte), | ||
) | ||
with pytest.raises(CouldNotParseTelegram): | ||
remote_value.from_knx(payload) | ||
|
||
@pytest.mark.parametrize( | ||
("first_dpt", "invalid_dpt"), | ||
[ | ||
(DPTPressure, DPTPressure2Byte), | ||
(DPTPressure2Byte, DPTPressure), | ||
], | ||
) | ||
def test_payload_valid_mode_assignment( | ||
self, first_dpt: type[DPTBase], invalid_dpt: type[DPTBase] | ||
) -> None: | ||
"""Test if DPT is assigned properly by payload length.""" | ||
TEST_VALUE = 1 | ||
xknx = XKNX() | ||
remote_value = RemoteValueByLength( | ||
xknx=xknx, | ||
dpt_classes=(DPTPressure, DPTPressure2Byte), | ||
) | ||
first_payload = first_dpt.to_knx(TEST_VALUE) | ||
invalid_payload = invalid_dpt.to_knx(TEST_VALUE) | ||
|
||
assert remote_value._internal_dpt_class is None | ||
assert remote_value.from_knx(first_payload) == TEST_VALUE | ||
assert remote_value._internal_dpt_class == first_dpt | ||
with pytest.raises(CouldNotParseTelegram): | ||
# other DPT is invalid now | ||
remote_value.from_knx(invalid_payload) | ||
# to_knx works when initialized | ||
assert remote_value.to_knx(TEST_VALUE) == first_payload | ||
|
||
def test_to_knx_uninitialized(self) -> None: | ||
"""Test to_knx raising ConversionError when DPT is not known.""" | ||
xknx = XKNX() | ||
remote_value = RemoteValueByLength( | ||
xknx=xknx, | ||
dpt_classes=(DPTPressure, DPTPressure2Byte), | ||
) | ||
|
||
assert remote_value._internal_dpt_class is None | ||
with pytest.raises(ConversionError): | ||
remote_value.to_knx(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
"""Module for managing remote value with payload length based DPT detection.""" | ||
|
||
from __future__ import annotations | ||
|
||
from collections.abc import Iterable | ||
from typing import TYPE_CHECKING | ||
|
||
from xknx.dpt import DPTArray, DPTBinary, DPTNumeric | ||
from xknx.exceptions import ConversionError, CouldNotParseTelegram | ||
|
||
from .remote_value import GroupAddressesType, RemoteValue, RVCallbackType | ||
|
||
if TYPE_CHECKING: | ||
from xknx.xknx import XKNX | ||
|
||
|
||
class RemoteValueByLength(RemoteValue[float]): | ||
"""RemoteValue with DPT detection based on payload length of first received value.""" | ||
|
||
def __init__( | ||
self, | ||
xknx: XKNX, | ||
dpt_classes: Iterable[type[DPTNumeric]], | ||
group_address: GroupAddressesType = None, | ||
group_address_state: GroupAddressesType = None, | ||
sync_state: bool | int | float | str = True, | ||
device_name: str | None = None, | ||
feature_name: str | None = None, | ||
after_update_cb: RVCallbackType[float] | None = None, | ||
): | ||
"""Initialize RemoteValueByLength class.""" | ||
_payload_lengths = set() | ||
for dpt_class in dpt_classes: | ||
if ( | ||
not issubclass(dpt_class, DPTNumeric) | ||
or dpt_class.payload_type is not DPTArray | ||
): | ||
raise ConversionError( | ||
"Only DPTNumeric subclasses with payload_type DPTArray are supported" | ||
) | ||
if dpt_class.payload_length in _payload_lengths: | ||
raise ConversionError( | ||
f"Duplicate payload_length {dpt_class.payload_length} in {dpt_classes}" | ||
) | ||
_payload_lengths.add(dpt_class.payload_length) | ||
|
||
super().__init__( | ||
xknx, | ||
group_address, | ||
group_address_state, | ||
sync_state=sync_state, | ||
device_name=device_name, | ||
feature_name=feature_name, | ||
after_update_cb=after_update_cb, | ||
) | ||
|
||
self._dpt_classes = dpt_classes | ||
self._internal_dpt_class: type[DPTNumeric] | None = None | ||
|
||
def to_knx(self, value: float) -> DPTArray: | ||
"""Convert value to payload.""" | ||
if self._internal_dpt_class is None: | ||
raise ConversionError( | ||
f"RemoteValue DPT not initialized for {self.device_name}" | ||
) | ||
return self._internal_dpt_class.to_knx(value) | ||
|
||
def from_knx(self, payload: DPTArray | DPTBinary) -> float: | ||
"""Convert current payload to value.""" | ||
if self._internal_dpt_class is None: | ||
self._internal_dpt_class = self._determine_dpt_class(payload) | ||
|
||
return self._internal_dpt_class.from_knx(payload) | ||
|
||
@property | ||
def unit_of_measurement(self) -> str | None: | ||
"""Return the unit of measurement.""" | ||
if not self._internal_dpt_class: | ||
return None | ||
return self._internal_dpt_class.unit | ||
|
||
@property | ||
def ha_device_class(self) -> str | None: | ||
"""Return a string representing the home assistant device class.""" | ||
if not self._internal_dpt_class: | ||
return None | ||
return getattr(self._internal_dpt_class, "ha_device_class", None) | ||
|
||
def _determine_dpt_class(self, payload: DPTArray | DPTBinary) -> type[DPTNumeric]: | ||
"""Test if telegram payload may be parsed.""" | ||
if isinstance(payload, DPTArray): | ||
try: | ||
return next( | ||
dpt_class | ||
for dpt_class in self._dpt_classes | ||
if dpt_class.payload_type is DPTArray | ||
and dpt_class.payload_length == len(payload.value) | ||
) | ||
except StopIteration: | ||
pass | ||
|
||
raise CouldNotParseTelegram("Payload invalid", payload=str(payload)) |