Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P rate limit #1596

Merged
merged 8 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ nav_order: 2

# Changelog

- Added rate limit (in packets per second) option to P2PConnection.
- Fix typo in management procedure (`nm_invididual_address_write` was renamed to `nm_individual_address_write`)

# 3.3.0 Climate humidity 2024-10-20

Expand Down
2 changes: 1 addition & 1 deletion examples/example_write_individual_address.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def main(argv: list[str]) -> int:

async with XKNX() as xknx:
individual_address = IndividualAddress(address)
await procedures.nm_invididual_address_write(xknx, individual_address)
await procedures.nm_individual_address_write(xknx, individual_address)

return 0

Expand Down
76 changes: 76 additions & 0 deletions test/management_tests/management_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,79 @@ async def test_broadcast_message():

with pytest.raises(TypeError):
await xknx.management.broadcast(connect)


@pytest.mark.parametrize("rate_limit", [0, 1])
async def test_p2p_rate_limit(time_travel, rate_limit):
"""Test rate limit for P2P management connections."""
xknx = XKNX()
xknx.cemi_handler = AsyncMock()
ia = IndividualAddress("4.0.1")

def send_responses(index):
ack = Telegram(
source_address=ia,
destination_address=IndividualAddress(0),
direction=TelegramDirection.INCOMING,
tpci=tpci.TAck(index),
)
device_desc_resp = Telegram(
source_address=ia,
destination_address=IndividualAddress(0),
direction=TelegramDirection.INCOMING,
tpci=tpci.TDataConnected(index),
payload=apci.DeviceDescriptorResponse(),
)

xknx.management.process(ack)
xknx.management.process(device_desc_resp)

conn = await xknx.management.connect(ia, rate_limit)

# create task and request data
task = asyncio.create_task(
conn.request(
payload=apci.DeviceDescriptorRead(descriptor=0),
expected=apci.DeviceDescriptorResponse,
)
)

await asyncio.sleep(0)
send_responses(0)

await task

xknx.cemi_handler.reset_mock()

# create second task
task = asyncio.create_task(
conn.request(
payload=apci.DeviceDescriptorRead(descriptor=0),
expected=apci.DeviceDescriptorResponse,
)
)
await asyncio.sleep(0)

if rate_limit:
await time_travel(0.5 / rate_limit)

# the request is still queued
assert not xknx.cemi_handler.send_telegram.call_args_list

await time_travel(0.5 / rate_limit)

# the requests should be sent now, the behaviour should match no rate limit

assert xknx.cemi_handler.send_telegram.call_args_list == [
call(
Telegram(
destination_address=ia,
tpci=tpci.TDataConnected(1),
payload=apci.DeviceDescriptorRead(descriptor=0),
)
),
]

send_responses(1)

await task
12 changes: 6 additions & 6 deletions test/management_tests/procedures_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ async def test_nm_individual_address_write(time_travel):
payload=apci.IndividualAddressWrite(address=individual_address_new),
)
task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address_new
)
)
Expand Down Expand Up @@ -309,7 +309,7 @@ async def test_nm_individual_address_write_two_devices_in_programming_mode(time_
)

task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address_new
)
)
Expand Down Expand Up @@ -358,7 +358,7 @@ async def test_nm_individual_address_write_no_device_programming_mode(time_trave
)

task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address_new
)
)
Expand Down Expand Up @@ -428,7 +428,7 @@ async def test_nm_individual_address_write_address_found(time_travel):
)

task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address
)
)
Expand Down Expand Up @@ -490,7 +490,7 @@ async def test_nm_individual_address_write_programming_failed(time_travel):
payload=apci.IndividualAddressWrite(address=individual_address_new),
)
task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address_new
)
)
Expand Down Expand Up @@ -572,7 +572,7 @@ async def test_nm_individual_address_write_address_found_other_in_programming_mo
)

task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address
)
)
Expand Down
26 changes: 22 additions & 4 deletions xknx/management/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from collections.abc import AsyncGenerator, AsyncIterator, Callable, Generator
from contextlib import asynccontextmanager
import logging
import time
from typing import TYPE_CHECKING

from xknx.exceptions import (
Expand Down Expand Up @@ -81,11 +82,13 @@ def process(self, telegram: Telegram) -> None:
logger.debug("Unhandled management telegram: %r", telegram)
return

async def connect(self, address: IndividualAddress) -> P2PConnection:
async def connect(
self, address: IndividualAddress, rate_limit: int = 0
) -> P2PConnection:
"""Open a point-to-point connection to a KNX device."""
if address in self._connections:
raise ManagementConnectionError(f"Connection to {address} already exists.")
p2p_connection = P2PConnection(self.xknx, address)
p2p_connection = P2PConnection(self.xknx, address, rate_limit)
try:
await p2p_connection.connect()
except ManagementConnectionError as exc:
Expand Down Expand Up @@ -176,16 +179,21 @@ async def receive(
class P2PConnection:
"""Class to manage a point-to-point connection with a KNX device."""

def __init__(self, xknx: XKNX, address: IndividualAddress) -> None:
def __init__(
self, xknx: XKNX, address: IndividualAddress, rate_limit: int = 20
) -> None:
"""Initialize P2PConnection class."""
self.xknx = xknx
self.address = address
self.disconnect_hook: Callable[[], None]
self.rate_limit = rate_limit

self.sequence_number = self._sequence_number_generator()
self._expected_sequence_number = 0
self._connected = False

self._last_response_time: float = 0

self._ack_waiter: asyncio.Future[TAck | TNak] | None = None
self._response_waiter: asyncio.Future[Telegram] = (
asyncio.get_event_loop().create_future()
Expand Down Expand Up @@ -359,5 +367,15 @@ async def request(self, payload: APCI, expected: type[APCI] | None) -> Telegram:
raise ManagementConnectionRefused(
"Management connection disconnected by the peer."
)

if self.rate_limit:
# time in seconds since the last request operation
time_diff = time.time() - self._last_response_time
wait_time = 1 / self.rate_limit
if time_diff < wait_time:
await asyncio.sleep(wait_time - time_diff)

await self._send_data(payload)
return await self._receive(expected)
response = await self._receive(expected)
self._last_response_time = time.time()
return response
6 changes: 5 additions & 1 deletion xknx/management/procedures.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def nm_individual_address_read(
return addresses


async def nm_invididual_address_write(
async def nm_individual_address_write(
opieters marked this conversation as resolved.
Show resolved Hide resolved
xknx: XKNX, individual_address: IndividualAddressableType
) -> None:
"""
Expand Down Expand Up @@ -182,6 +182,10 @@ async def nm_invididual_address_write(
await xknx.cemi_handler.send_telegram(telegram)


# for backwards compatibility
nm_invididual_address_write = nm_individual_address_write


async def nm_individual_address_serial_number_read(
xknx: XKNX,
serial: bytes,
Expand Down