diff --git a/docs/changelog.md b/docs/changelog.md index 70ccc43ae..661fce5c2 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -6,6 +6,8 @@ nav_order: 2 # Changelog +- Added rate limit (in packets per second) option to P2PConnection. +- Fix typo in management procedure (`nm_invididual_address_write` was renamed to `nm_individual_address_write`) # 3.3.0 Climate humidity 2024-10-20 diff --git a/examples/example_write_individual_address.py b/examples/example_write_individual_address.py index 79b92ca59..069521f4c 100644 --- a/examples/example_write_individual_address.py +++ b/examples/example_write_individual_address.py @@ -23,7 +23,7 @@ async def main(argv: list[str]) -> int: async with XKNX() as xknx: individual_address = IndividualAddress(address) - await procedures.nm_invididual_address_write(xknx, individual_address) + await procedures.nm_individual_address_write(xknx, individual_address) return 0 diff --git a/test/management_tests/management_test.py b/test/management_tests/management_test.py index 655af215f..087a7f883 100644 --- a/test/management_tests/management_test.py +++ b/test/management_tests/management_test.py @@ -223,3 +223,79 @@ async def test_broadcast_message(): with pytest.raises(TypeError): await xknx.management.broadcast(connect) + + +@pytest.mark.parametrize("rate_limit", [0, 1]) +async def test_p2p_rate_limit(time_travel, rate_limit): + """Test rate limit for P2P management connections.""" + xknx = XKNX() + xknx.cemi_handler = AsyncMock() + ia = IndividualAddress("4.0.1") + + def send_responses(index): + ack = Telegram( + source_address=ia, + destination_address=IndividualAddress(0), + direction=TelegramDirection.INCOMING, + tpci=tpci.TAck(index), + ) + device_desc_resp = Telegram( + source_address=ia, + destination_address=IndividualAddress(0), + direction=TelegramDirection.INCOMING, + tpci=tpci.TDataConnected(index), + payload=apci.DeviceDescriptorResponse(), + ) + + xknx.management.process(ack) + xknx.management.process(device_desc_resp) + + conn = await xknx.management.connect(ia, rate_limit) + + # create task and request data + task = asyncio.create_task( + conn.request( + payload=apci.DeviceDescriptorRead(descriptor=0), + expected=apci.DeviceDescriptorResponse, + ) + ) + + await asyncio.sleep(0) + send_responses(0) + + await task + + xknx.cemi_handler.reset_mock() + + # create second task + task = asyncio.create_task( + conn.request( + payload=apci.DeviceDescriptorRead(descriptor=0), + expected=apci.DeviceDescriptorResponse, + ) + ) + await asyncio.sleep(0) + + if rate_limit: + await time_travel(0.5 / rate_limit) + + # the request is still queued + assert not xknx.cemi_handler.send_telegram.call_args_list + + await time_travel(0.5 / rate_limit) + + # the requests should be sent now, the behaviour should match no rate limit + + assert xknx.cemi_handler.send_telegram.call_args_list == [ + call( + Telegram( + destination_address=ia, + tpci=tpci.TDataConnected(1), + payload=apci.DeviceDescriptorRead(descriptor=0), + ) + ), + ] + + send_responses(1) + + await task diff --git a/test/management_tests/procedures_test.py b/test/management_tests/procedures_test.py index e55ddd1f3..430e3c4e9 100644 --- a/test/management_tests/procedures_test.py +++ b/test/management_tests/procedures_test.py @@ -249,7 +249,7 @@ async def test_nm_individual_address_write(time_travel): payload=apci.IndividualAddressWrite(address=individual_address_new), ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address_new ) ) @@ -309,7 +309,7 @@ async def test_nm_individual_address_write_two_devices_in_programming_mode(time_ ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address_new ) ) @@ -358,7 +358,7 @@ async def test_nm_individual_address_write_no_device_programming_mode(time_trave ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address_new ) ) @@ -428,7 +428,7 @@ async def test_nm_individual_address_write_address_found(time_travel): ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address ) ) @@ -490,7 +490,7 @@ async def test_nm_individual_address_write_programming_failed(time_travel): payload=apci.IndividualAddressWrite(address=individual_address_new), ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address_new ) ) @@ -572,7 +572,7 @@ async def test_nm_individual_address_write_address_found_other_in_programming_mo ) task = asyncio.create_task( - procedures.nm_invididual_address_write( + procedures.nm_individual_address_write( xknx=xknx, individual_address=individual_address ) ) diff --git a/xknx/management/management.py b/xknx/management/management.py index 2f6098551..e16ebb183 100644 --- a/xknx/management/management.py +++ b/xknx/management/management.py @@ -6,6 +6,7 @@ from collections.abc import AsyncGenerator, AsyncIterator, Callable, Generator from contextlib import asynccontextmanager import logging +import time from typing import TYPE_CHECKING from xknx.exceptions import ( @@ -81,11 +82,13 @@ def process(self, telegram: Telegram) -> None: logger.debug("Unhandled management telegram: %r", telegram) return - async def connect(self, address: IndividualAddress) -> P2PConnection: + async def connect( + self, address: IndividualAddress, rate_limit: int = 0 + ) -> P2PConnection: """Open a point-to-point connection to a KNX device.""" if address in self._connections: raise ManagementConnectionError(f"Connection to {address} already exists.") - p2p_connection = P2PConnection(self.xknx, address) + p2p_connection = P2PConnection(self.xknx, address, rate_limit) try: await p2p_connection.connect() except ManagementConnectionError as exc: @@ -176,16 +179,21 @@ async def receive( class P2PConnection: """Class to manage a point-to-point connection with a KNX device.""" - def __init__(self, xknx: XKNX, address: IndividualAddress) -> None: + def __init__( + self, xknx: XKNX, address: IndividualAddress, rate_limit: int = 20 + ) -> None: """Initialize P2PConnection class.""" self.xknx = xknx self.address = address self.disconnect_hook: Callable[[], None] + self.rate_limit = rate_limit self.sequence_number = self._sequence_number_generator() self._expected_sequence_number = 0 self._connected = False + self._last_response_time: float = 0 + self._ack_waiter: asyncio.Future[TAck | TNak] | None = None self._response_waiter: asyncio.Future[Telegram] = ( asyncio.get_event_loop().create_future() @@ -359,5 +367,15 @@ async def request(self, payload: APCI, expected: type[APCI] | None) -> Telegram: raise ManagementConnectionRefused( "Management connection disconnected by the peer." ) + + if self.rate_limit: + # time in seconds since the last request operation + time_diff = time.time() - self._last_response_time + wait_time = 1 / self.rate_limit + if time_diff < wait_time: + await asyncio.sleep(wait_time - time_diff) + await self._send_data(payload) - return await self._receive(expected) + response = await self._receive(expected) + self._last_response_time = time.time() + return response diff --git a/xknx/management/procedures.py b/xknx/management/procedures.py index 764b9463b..1dd05defd 100644 --- a/xknx/management/procedures.py +++ b/xknx/management/procedures.py @@ -106,7 +106,7 @@ async def nm_individual_address_read( return addresses -async def nm_invididual_address_write( +async def nm_individual_address_write( xknx: XKNX, individual_address: IndividualAddressableType ) -> None: """ @@ -182,6 +182,10 @@ async def nm_invididual_address_write( await xknx.cemi_handler.send_telegram(telegram) +# for backwards compatibility +nm_invididual_address_write = nm_individual_address_write + + async def nm_individual_address_serial_number_read( xknx: XKNX, serial: bytes,