Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move EZSP send lock from EZSP to individual protocol handlers #649

Merged
merged 20 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions bellows/ash.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,9 @@ async def _send_data_frame(self, frame: AshFrame) -> None:
await ack_future
except NotAcked:
_LOGGER.debug(
"NCP responded with NAK. Retrying (attempt %d)", attempt + 1
"NCP responded with NAK to %r. Retrying (attempt %d)",
frame,
attempt + 1,
)

# For timing purposes, NAK can be treated as an ACK
Expand All @@ -650,9 +652,10 @@ async def _send_data_frame(self, frame: AshFrame) -> None:
raise
except asyncio.TimeoutError:
_LOGGER.debug(
"No ACK received in %0.2fs (attempt %d)",
"No ACK received in %0.2fs (attempt %d) for %r",
self._t_rx_ack,
attempt + 1,
frame,
)
# If a DATA frame acknowledgement is not received within the
# current timeout value, then t_rx_ack is doubled.
Expand Down
23 changes: 1 addition & 22 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
from typing import Any, Callable, Generator
import urllib.parse

from zigpy.datastructures import PriorityDynamicBoundedSemaphore

if sys.version_info[:2] < (3, 11):
from async_timeout import timeout as asyncio_timeout # pragma: no cover
else:
Expand Down Expand Up @@ -41,8 +39,6 @@
NETWORK_OPS_TIMEOUT = 10
NETWORK_COORDINATOR_STARTUP_RESET_WAIT = 1

MAX_COMMAND_CONCURRENCY = 1


class EZSP:
_BY_VERSION = {
Expand All @@ -66,7 +62,6 @@ def __init__(self, device_config: dict):
self._ezsp_version = v4.EZSPv4.VERSION
self._gw = None
self._protocol = None
self._send_sem = PriorityDynamicBoundedSemaphore(value=MAX_COMMAND_CONCURRENCY)

self._stack_status_listeners: collections.defaultdict[
t.sl_Status, list[asyncio.Future]
Expand Down Expand Up @@ -190,21 +185,6 @@ def close(self):
self._gw.close()
self._gw = None

def _get_command_priority(self, name: str) -> int:
return {
# Deprioritize any commands that send packets
"set_source_route": -1,
"setExtendedTimeout": -1,
"send_unicast": -1,
"send_multicast": -1,
"send_broadcast": -1,
# Prioritize watchdog commands
"nop": 999,
"readCounters": 999,
"readAndClearCounters": 999,
"getValue": 999,
}.get(name, 0)

async def _command(self, name: str, *args: Any, **kwargs: Any) -> Any:
command = getattr(self._protocol, name)

Expand All @@ -217,8 +197,7 @@ async def _command(self, name: str, *args: Any, **kwargs: Any) -> Any:
)
raise EzspError("EZSP is not running")

async with self._send_sem(priority=self._get_command_priority(name)):
return await command(*args, **kwargs)
return await command(*args, **kwargs)

async def _list_command(
self, name, item_frames, completion_frame, spos, *args, **kwargs
Expand Down
68 changes: 58 additions & 10 deletions bellows/ezsp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import functools
import logging
import sys
import time
from typing import TYPE_CHECKING, Any, AsyncGenerator, Callable, Iterable

import zigpy.state
Expand All @@ -15,6 +16,8 @@
else:
from asyncio import timeout as asyncio_timeout # pragma: no cover

from zigpy.datastructures import PriorityDynamicBoundedSemaphore

from bellows.config import CONF_EZSP_POLICIES
from bellows.exception import InvalidCommandError
import bellows.types as t
Expand All @@ -23,7 +26,9 @@
from bellows.uart import Gateway

LOGGER = logging.getLogger(__name__)

EZSP_CMD_TIMEOUT = 6 # Sum of all ASH retry timeouts: 0.4 + 0.8 + 1.6 + 3.2
MAX_COMMAND_CONCURRENCY = 1


class ProtocolHandler(abc.ABC):
Expand All @@ -42,6 +47,9 @@
for name, (cmd_id, tx_schema, rx_schema) in self.COMMANDS.items()
}
self.tc_policy = 0
self._send_semaphore = PriorityDynamicBoundedSemaphore(
value=MAX_COMMAND_CONCURRENCY
)

# Cached by `set_extended_timeout` so subsequent calls are a little faster
self._address_table_size: int | None = None
Expand All @@ -65,18 +73,58 @@
def _ezsp_frame_tx(self, name: str) -> bytes:
"""Serialize the named frame."""

def _get_command_priority(self, name: str) -> int:
return {
# Deprioritize any commands that send packets
"setSourceRoute": -1,
"setExtendedTimeout": -1,
"sendUnicast": -1,
"sendMulticast": -1,
"sendBroadcast": -1,
# Prioritize watchdog commands
"nop": 999,
"readCounters": 999,
"readAndClearCounters": 999,
"getValue": 999,
}.get(name, 0)

async def command(self, name, *args, **kwargs) -> Any:
"""Serialize command and send it."""
LOGGER.debug("Sending command %s: %s %s", name, args, kwargs)
data = self._ezsp_frame(name, *args, **kwargs)
cmd_id, _, rx_schema = self.COMMANDS[name]
future = asyncio.get_running_loop().create_future()
self._awaiting[self._seq] = (cmd_id, rx_schema, future)
self._seq = (self._seq + 1) % 256

async with asyncio_timeout(EZSP_CMD_TIMEOUT):
await self._gw.send_data(data)
return await future
delay_time = 0
puddly marked this conversation as resolved.
Show resolved Hide resolved
was_delayed = False

if self._send_semaphore.locked():
LOGGER.debug(

Check warning on line 97 in bellows/ezsp/protocol.py

View check run for this annotation

Codecov / codecov/patch

bellows/ezsp/protocol.py#L97

Added line #L97 was not covered by tests
"Send semaphore is locked, delaying before sending %s(%r, %r)",
name,
args,
kwargs,
)
delay_time = time.monotonic()
was_delayed = True

Check warning on line 104 in bellows/ezsp/protocol.py

View check run for this annotation

Codecov / codecov/patch

bellows/ezsp/protocol.py#L103-L104

Added lines #L103 - L104 were not covered by tests

async with self._send_semaphore(priority=self._get_command_priority(name)):
if was_delayed:
LOGGER.debug(

Check warning on line 108 in bellows/ezsp/protocol.py

View check run for this annotation

Codecov / codecov/patch

bellows/ezsp/protocol.py#L108

Added line #L108 was not covered by tests
"Sending command %s: %s %s after %0.2fs delay",
name,
args,
kwargs,
time.monotonic() - delay_time,
)
else:
LOGGER.debug("Sending command %s: %s %s", name, args, kwargs)

data = self._ezsp_frame(name, *args, **kwargs)
cmd_id, _, rx_schema = self.COMMANDS[name]

future = asyncio.get_running_loop().create_future()
self._awaiting[self._seq] = (cmd_id, rx_schema, future)
self._seq = (self._seq + 1) % 256

async with asyncio_timeout(EZSP_CMD_TIMEOUT):
await self._gw.send_data(data)
return await future

async def update_policies(self, policy_config: dict) -> None:
"""Set up the policies for what the NCP should do."""
Expand Down
Loading