From 03fbbb5a76d8be66665c1b4d4635a67ecc8fc74f Mon Sep 17 00:00:00 2001 From: Olivier Pieters Date: Tue, 29 Oct 2024 15:06:03 +0100 Subject: [PATCH 1/8] Add rate limit option to P2P management connection. --- xknx/management/management.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/xknx/management/management.py b/xknx/management/management.py index 2f6098551..0d1693d0f 100644 --- a/xknx/management/management.py +++ b/xknx/management/management.py @@ -6,6 +6,7 @@ from collections.abc import AsyncGenerator, AsyncIterator, Callable, Generator from contextlib import asynccontextmanager import logging +import time from typing import TYPE_CHECKING from xknx.exceptions import ( @@ -81,11 +82,13 @@ def process(self, telegram: Telegram) -> None: logger.debug("Unhandled management telegram: %r", telegram) return - async def connect(self, address: IndividualAddress) -> P2PConnection: + async def connect( + self, address: IndividualAddress, rate_limit: int = 0 + ) -> P2PConnection: """Open a point-to-point connection to a KNX device.""" if address in self._connections: raise ManagementConnectionError(f"Connection to {address} already exists.") - p2p_connection = P2PConnection(self.xknx, address) + p2p_connection = P2PConnection(self.xknx, address, rate_limit) try: await p2p_connection.connect() except ManagementConnectionError as exc: @@ -176,16 +179,21 @@ async def receive( class P2PConnection: """Class to manage a point-to-point connection with a KNX device.""" - def __init__(self, xknx: XKNX, address: IndividualAddress) -> None: + def __init__( + self, xknx: XKNX, address: IndividualAddress, rate_limit: int = 0 + ) -> None: """Initialize P2PConnection class.""" self.xknx = xknx self.address = address self.disconnect_hook: Callable[[], None] + self.rate_limit = rate_limit self.sequence_number = self._sequence_number_generator() self._expected_sequence_number = 0 self._connected = False + self._last_sent_time = time.time() + self._ack_waiter: asyncio.Future[TAck | TNak] | None = None self._response_waiter: asyncio.Future[Telegram] = ( asyncio.get_event_loop().create_future() @@ -359,5 +367,15 @@ async def request(self, payload: APCI, expected: type[APCI] | None) -> Telegram: raise ManagementConnectionRefused( "Management connection disconnected by the peer." ) + + if self.rate_limit: + # time in seconds since the last request operation + time_diff = time.time() - self._last_sent_time + wait_time = 1 / self.rate_limit + if time_diff < wait_time: + await asyncio.sleep(wait_time - time_diff) + await self._send_data(payload) - return await self._receive(expected) + response = await self._receive(expected) + self._last_sent_time = time.time() + return response From 25fe172ecf5569b34f3c8dc8ff7af00b34dfbe59 Mon Sep 17 00:00:00 2001 From: Olivier Pieters Date: Tue, 29 Oct 2024 15:08:09 +0100 Subject: [PATCH 2/8] Fix typo. --- test/management_tests/procedures_test.py | 12 ++++++------ xknx/management/procedures.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/test/management_tests/procedures_test.py b/test/management_tests/procedures_test.py index e55ddd1f3..430e3c4e9 100644 --- a/test/management_tests/procedures_test.py +++ b/test/management_tests/procedures_test.py @@ -249,7 +249,7 @@ async def test_nm_individual_address_write(time_travel): payload=apci.IndividualAddressWrite(address=individual_address_new), ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address_new ) ) @@ -309,7 +309,7 @@ async def test_nm_individual_address_write_two_devices_in_programming_mode(time_ ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address_new ) ) @@ -358,7 +358,7 @@ async def test_nm_individual_address_write_no_device_programming_mode(time_trave ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address_new ) ) @@ -428,7 +428,7 @@ async def test_nm_individual_address_write_address_found(time_travel): ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address ) ) @@ -490,7 +490,7 @@ async def test_nm_individual_address_write_programming_failed(time_travel): payload=apci.IndividualAddressWrite(address=individual_address_new), ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address_new ) ) @@ -572,7 +572,7 @@ async def test_nm_individual_address_write_address_found_other_in_programming_mo ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address ) ) diff --git a/xknx/management/procedures.py b/xknx/management/procedures.py index 764b9463b..0c8dfd74f 100644 --- a/xknx/management/procedures.py +++ b/xknx/management/procedures.py @@ -106,7 +106,7 @@ async def nm_individual_address_read( return addresses -async def nm_invididual_address_write( +async def nm_individual_address_write( xknx: XKNX, individual_address: IndividualAddressableType ) -> None: """ From 922087d1ead1b40c8874fadb5b79ff7eac75a78d Mon Sep 17 00:00:00 2001 From: Olivier Pieters Date: Tue, 29 Oct 2024 15:10:39 +0100 Subject: [PATCH 3/8] Update changelog. --- docs/changelog.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/changelog.md b/docs/changelog.md index 70ccc43ae..661fce5c2 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -6,6 +6,8 @@ nav_order: 2 # Changelog +- Added rate limit (in packets per second) option to P2PConnection. +- Fix typo in management procedure (`nm_invididual_address_write` was renamed to `nm_individual_address_write`) # 3.3.0 Climate humidity 2024-10-20 From fce8d9eaa35d25baf789ebadab5bc7cf087b2dbc Mon Sep 17 00:00:00 2001 From: Olivier Pieters Date: Tue, 29 Oct 2024 15:12:37 +0100 Subject: [PATCH 4/8] Update renamed procedure. --- examples/example_write_individual_address.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/example_write_individual_address.py b/examples/example_write_individual_address.py index 79b92ca59..069521f4c 100644 --- a/examples/example_write_individual_address.py +++ b/examples/example_write_individual_address.py @@ -23,7 +23,7 @@ async def main(argv: list[str]) -> int: async with XKNX() as xknx: individual_address = IndividualAddress(address) - await procedures.nm_invididual_address_write(xknx, individual_address) + await procedures.nm_individual_address_write(xknx, individual_address) return 0 From b3a9b381b8b7e3999626397efe2f069f523dd894 Mon Sep 17 00:00:00 2001 From: Olivier Pieters Date: Tue, 29 Oct 2024 16:39:01 +0100 Subject: [PATCH 5/8] Add test for new rate limit option. --- test/management_tests/management_test.py | 59 ++++++++++++++++++++++++ xknx/management/management.py | 2 +- 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/test/management_tests/management_test.py b/test/management_tests/management_test.py index 655af215f..013e76c74 100644 --- a/test/management_tests/management_test.py +++ b/test/management_tests/management_test.py @@ -223,3 +223,62 @@ async def test_broadcast_message(): with pytest.raises(TypeError): await xknx.management.broadcast(connect) + + +@pytest.mark.parametrize("rate_limit", [0, 1]) +async def test_p2p_rate_limit(time_travel, rate_limit): + """Test rate limit for P2P management connections.""" + xknx = XKNX() + xknx.cemi_handler = AsyncMock() + ia = IndividualAddress("4.0.1") + + def send_responses(index): + ack = Telegram( + source_address=ia, + destination_address=IndividualAddress(0), + direction=TelegramDirection.INCOMING, + tpci=tpci.TAck(index), + ) + device_desc_resp = Telegram( + source_address=ia, + destination_address=IndividualAddress(0), + direction=TelegramDirection.INCOMING, + tpci=tpci.TDataConnected(index), + payload=apci.DeviceDescriptorResponse(), + ) + + xknx.management.process(ack) + xknx.management.process(device_desc_resp) + + conn = await xknx.management.connect(ia, rate_limit) + + # create task and request data + task = asyncio.create_task( + conn.request( + payload=apci.DeviceDescriptorRead(descriptor=0), + expected=apci.DeviceDescriptorResponse, + ) + ) + + await asyncio.sleep(0) + send_responses(0) + + await task + + # create second task + task = asyncio.create_task( + conn.request( + payload=apci.DeviceDescriptorRead(descriptor=0), + expected=apci.DeviceDescriptorResponse, + ) + ) + await asyncio.sleep(0) + send_responses(1) + + if rate_limit > 0: + # this should raise a timeout, since the data was not yet sent + with pytest.raises(ManagementConnectionTimeout): + await task + else: + # should work as expected + await task diff --git a/xknx/management/management.py b/xknx/management/management.py index 0d1693d0f..781c21b3a 100644 --- a/xknx/management/management.py +++ b/xknx/management/management.py @@ -192,7 +192,7 @@ def __init__( self._expected_sequence_number = 0 self._connected = False - self._last_sent_time = time.time() + self._last_sent_time: float = 0 self._ack_waiter: asyncio.Future[TAck | TNak] | None = None self._response_waiter: asyncio.Future[Telegram] = ( From 6de70ad0bba4e86606fff5ab5151b479751fabc5 Mon Sep 17 00:00:00 2001 From: Olivier Pieters Date: Wed, 30 Oct 2024 10:01:47 +0100 Subject: [PATCH 6/8] Address review comment. --- test/management_tests/management_test.py | 24 ++++++++++++++++++++++-- xknx/management/management.py | 6 +++--- xknx/management/procedures.py | 4 ++++ 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/test/management_tests/management_test.py b/test/management_tests/management_test.py index 013e76c74..3de3dfeda 100644 --- a/test/management_tests/management_test.py +++ b/test/management_tests/management_test.py @@ -265,6 +265,8 @@ def send_responses(index): await task + xknx.cemi_handler.reset_mock() + # create second task task = asyncio.create_task( conn.request( @@ -273,12 +275,30 @@ def send_responses(index): ) ) await asyncio.sleep(0) - send_responses(1) if rate_limit > 0: - # this should raise a timeout, since the data was not yet sent + # this should raise a timeout, since the data was never confirmed with pytest.raises(ManagementConnectionTimeout): + # the request is queued + await time_travel(0.5 / rate_limit) + assert xknx.cemi_handler.send_telegram.call_args_list == [] + + # the request should now be sent + await time_travel(0.6 / rate_limit) + + assert xknx.cemi_handler.send_telegram.call_args_list == [ + call( + Telegram( + destination_address=ia, + tpci=tpci.TDataConnected(1), + payload=apci.DeviceDescriptorRead(descriptor=0), + ) + ), + ] + await task else: # should work as expected + send_responses(1) + await task diff --git a/xknx/management/management.py b/xknx/management/management.py index 781c21b3a..e0438e776 100644 --- a/xknx/management/management.py +++ b/xknx/management/management.py @@ -192,7 +192,7 @@ def __init__( self._expected_sequence_number = 0 self._connected = False - self._last_sent_time: float = 0 + self._last_response_time: float = 0 self._ack_waiter: asyncio.Future[TAck | TNak] | None = None self._response_waiter: asyncio.Future[Telegram] = ( @@ -370,12 +370,12 @@ async def request(self, payload: APCI, expected: type[APCI] | None) -> Telegram: if self.rate_limit: # time in seconds since the last request operation - time_diff = time.time() - self._last_sent_time + time_diff = time.time() - self._last_response_time wait_time = 1 / self.rate_limit if time_diff < wait_time: await asyncio.sleep(wait_time - time_diff) await self._send_data(payload) response = await self._receive(expected) - self._last_sent_time = time.time() + self._last_response_time = time.time() return response diff --git a/xknx/management/procedures.py b/xknx/management/procedures.py index 0c8dfd74f..1dd05defd 100644 --- a/xknx/management/procedures.py +++ b/xknx/management/procedures.py @@ -182,6 +182,10 @@ async def nm_individual_address_write( await xknx.cemi_handler.send_telegram(telegram) +# for backwards compatibility +nm_invididual_address_write = nm_individual_address_write + + async def nm_individual_address_serial_number_read( xknx: XKNX, serial: bytes, From 0b0b3e69195e16f9caf11f3c5645a92009632216 Mon Sep 17 00:00:00 2001 From: Olivier Pieters Date: Wed, 30 Oct 2024 10:05:49 +0100 Subject: [PATCH 7/8] Enable rate limit by default. --- xknx/management/management.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xknx/management/management.py b/xknx/management/management.py index e0438e776..e16ebb183 100644 --- a/xknx/management/management.py +++ b/xknx/management/management.py @@ -180,7 +180,7 @@ class P2PConnection: """Class to manage a point-to-point connection with a KNX device.""" def __init__( - self, xknx: XKNX, address: IndividualAddress, rate_limit: int = 0 + self, xknx: XKNX, address: IndividualAddress, rate_limit: int = 20 ) -> None: """Initialize P2PConnection class.""" self.xknx = xknx From c37898afe08eefe2656060447ff6bab4c1f429b9 Mon Sep 17 00:00:00 2001 From: Olivier Pieters Date: Tue, 5 Nov 2024 15:38:33 +0100 Subject: [PATCH 8/8] Avoid timeout and speed-up test. --- test/management_tests/management_test.py | 47 +++++++++++------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/test/management_tests/management_test.py b/test/management_tests/management_test.py index 3de3dfeda..087a7f883 100644 --- a/test/management_tests/management_test.py +++ b/test/management_tests/management_test.py @@ -276,29 +276,26 @@ def send_responses(index): ) await asyncio.sleep(0) - if rate_limit > 0: - # this should raise a timeout, since the data was never confirmed - with pytest.raises(ManagementConnectionTimeout): - # the request is queued - await time_travel(0.5 / rate_limit) - assert xknx.cemi_handler.send_telegram.call_args_list == [] - - # the request should now be sent - await time_travel(0.6 / rate_limit) - - assert xknx.cemi_handler.send_telegram.call_args_list == [ - call( - Telegram( - destination_address=ia, - tpci=tpci.TDataConnected(1), - payload=apci.DeviceDescriptorRead(descriptor=0), - ) - ), - ] - - await task - else: - # should work as expected - send_responses(1) + if rate_limit: + await time_travel(0.5 / rate_limit) - await task + # the request is still queued + assert not xknx.cemi_handler.send_telegram.call_args_list + + await time_travel(0.5 / rate_limit) + + # the requests should be sent now, the behaviour should match no rate limit + + assert xknx.cemi_handler.send_telegram.call_args_list == [ + call( + Telegram( + destination_address=ia, + tpci=tpci.TDataConnected(1), + payload=apci.DeviceDescriptorRead(descriptor=0), + ) + ), + ] + + send_responses(1) + + await task