Skip to content

Commit

Permalink
Add asyncio protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
elupus committed May 19, 2023
1 parent d5122c3 commit 687f122
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-and-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10']
python-version: ['3.8', '3.9', '3.10', '3.11']

steps:
- uses: actions/checkout@v1
Expand Down
177 changes: 141 additions & 36 deletions RFXtrx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import threading
import time
import logging
import asyncio
from typing import Callable, Optional


from time import sleep

Expand All @@ -38,6 +41,45 @@
_LOGGER = logging.getLogger(__name__)


def parse(data):
""" Parse the given data and return an RFXtrxEvent """
if data is None:
return None
pkt = lowlevel.parse(data)
if pkt is not None:
if isinstance(pkt, lowlevel.SensorPacket):
obj = SensorEvent(pkt)
elif isinstance(pkt, lowlevel.Status):
obj = StatusEvent(pkt)
else:
obj = ControlEvent(pkt)
return obj
return None


class ParseError(Exception):
""" Error occurred during parsing of packet """


def parse_protected(data: bytes):
""" Parse the given data and always return an event object """
try:
try:
obj = parse(data)
except Exception as exception: # pylint: disable=broad-except
raise ParseError(
"Parse error for data: {0}".format(data.hex(" "))
) from exception

if obj is None:
raise ParseError("No packet for data: {0}".format(data.hex(" ")))

return obj
except Exception as exception: # pylint: disable=broad-except
_LOGGER.debug("Unexpected parse error", exc_info=True)
return ExceptionEvent(exception)


###############################################################################
# RFXtrxDevice class
###############################################################################
Expand Down Expand Up @@ -674,6 +716,18 @@ def __str__(self):
return "{0} device=[{1}]".format(
type(self), self.device)


class ExceptionEvent(RFXtrxEvent):
""" Error from parsing occured """
def __init__(self, exception):
super().__init__(None)
self.exception = exception

def __str__(self):
return "{0} {1}".format(
type(self), repr(self.exception))


###############################################################################
# DummySerial class
###############################################################################
Expand Down Expand Up @@ -741,21 +795,7 @@ class RFXtrxTransport:
@staticmethod
def parse(data):
""" Parse the given data and return an RFXtrxEvent """
if data is None:
return None
pkt = lowlevel.parse(data)
if pkt is not None:
if isinstance(pkt, lowlevel.SensorPacket):
obj = SensorEvent(pkt)
elif isinstance(pkt, lowlevel.Status):
obj = StatusEvent(pkt)
else:
obj = ControlEvent(pkt)

# Store the latest RF signal data
obj.data = data
return obj
return None
return parse(data)

def reset(self):
""" reset the rfxtrx device """
Expand Down Expand Up @@ -812,6 +852,7 @@ def receive_blocking(self):
" ".join("0x{0:02x}".format(x) for x in pkt)
)
return self.parse(pkt)
return None

def send(self, data):
""" Send the given packet """
Expand All @@ -829,7 +870,7 @@ def send(self, data):

def reset(self):
""" Reset the RFXtrx """
self.send(b'\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.send(lowlevel.COMMAND_RESET)
sleep(0.3) # Should work with 0.05, but not for me
self.serial.flushInput()

Expand Down Expand Up @@ -887,6 +928,7 @@ def receive_blocking(self):
" ".join("0x{0:02x}".format(x) for x in pkt)
)
return self.parse(pkt)
return None

def send(self, data):
""" Send the given packet """
Expand All @@ -904,7 +946,7 @@ def send(self, data):

def reset(self):
""" Reset the RFXtrx """
self.send(b'\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.send(lowlevel.COMMAND_RESET)
sleep(0.3)
self.sock.sendall(b'')

Expand Down Expand Up @@ -1021,37 +1063,100 @@ def close_connection(self):

def set_recmodes(self, modenames):
""" Sets the device modes (which protocols to decode) """
data = bytearray([0x0D, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])

# Keep the values read during init.
data[5] = self._status.device.tranceiver_type
data[6] = self._status.device.output_power

# Build the mode data bytes from the mode names
for mode in modenames:
byteno, bitno = lowlevel.get_recmode_tuple(mode)
if byteno is None:
raise ValueError('Unknown mode name '+mode)

data[7 + byteno] |= 1 << bitno

data = lowlevel.set_mode_packet(modenames, self._status)
self.transport.send(data)
self._modes = modenames
return self.transport.receive_blocking()

def send_start(self):
""" Sends the Start RFXtrx transceiver command """
self.transport.send(b'\x0D\x00\x00\x03\x07\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00')
self.transport.send(lowlevel.COMMAND_START)
return self.transport.receive_blocking()

def send_get_status(self):
""" Sends the Get Status command """
self.transport.send(b'\x0D\x00\x00\x01\x02\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00')
self.transport.send(lowlevel.COMMAND_GET_STATUS)
return self.transport.receive_blocking()


class Core(Connect):
""" The main class for rfxcom-py. Has changed name to Connect """


class AsyncClient(asyncio.protocols.BufferedProtocol,
asyncio.protocols.Protocol):
""" Async protocol for parsing data from rfxtrx device """

def __init__(self, callback: Callable[[RFXtrxEvent], None]) -> None:
super().__init__()
self._buffer = bytearray(256)
self._view = memoryview(self._buffer)
self._pos = 0
self._len = 1
self._callback = callback
self._transport: Optional[asyncio.BaseTransport] = None

def data_received(self, data: bytes) -> None:
"""Wrapper for non buffer protocol transports."""
idx = 0
while idx < len(data):
cnt = len(data)-idx
buffer = self.get_buffer(cnt)
cnt = min(len(buffer), cnt)
assert cnt
buffer[0:cnt] = bytes(data[idx:idx+cnt])
self.buffer_updated(cnt)
idx += cnt

def get_buffer(self, _: int) -> memoryview:
"""Return view of buffer for next needed data."""
return self._view[self._pos:self._len]

def _packet_received(self, pkt: bytes):
""" Packet received. """
_LOGGER.debug(
"Recv: %s",
" ".join("0x{0:02x}".format(x) for x in pkt)
)

obj = parse_protected(pkt)
try:
self._callback(obj)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected callback error: %s", pkt)

def buffer_updated(self, nbytes):
""" New data received. """
self._pos += nbytes
assert self._pos <= self._len

if self._len == 1:
self._len = self._buffer[0] + 1

if self._len == self._pos:
try:
self._packet_received(self._buffer[:self._pos])
finally:
self._pos = 0
self._len = 1

def connection_made(self, transport: asyncio.BaseTransport) -> None:
""" Connection to server was made. """
self._transport = transport
self._pos = 0
self._len = 1

def connection_lost(self, exc: Optional[Exception]) -> None:
""" Connection to server was lost. """
if exc:
_LOGGER.error("Unexpected disconnection: %s", exc)
self._transport = None

def send(self, data: bytes):
""" Send a set of data to server. """
assert isinstance(self._transport, asyncio.WriteTransport)
self._transport.write(data)

def close(self):
""" Close down protocol. """
self._transport.close()
30 changes: 28 additions & 2 deletions RFXtrx/lowlevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
# pylint: disable=C0302,R0902,R0903,R0911,R0913
# pylint: disable= too-many-lines, too-many-statements

COMMAND_RESET = b'\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
COMMAND_START = b'\x0D\x00\x00\x03\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00'
COMMAND_GET_STATUS = (
b'\x0D\x00\x00\x01\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00'
)


###############################################################################
# Packet class
###############################################################################
Expand Down Expand Up @@ -2278,7 +2285,7 @@ def load_receive(self, data):
(data[10] << 8) + data[11])
self.prodwatthours = ((data[12] * pow(2, 24)) + (data[13] << 16) +
(data[14] << 8) + data[15])
self.tarif_num = (data[16] & 0x0f)
self.tarif_num = data[16] & 0x0f
self.voltage = data[17] + 200
self.currentwatt = (data[18] << 8) + data[19]
self.state_byte = data[20]
Expand Down Expand Up @@ -2378,7 +2385,7 @@ def set_transmit(self, subtype, seqnbr, id1, id2, sound):
self.id2 = id2
self.sound = sound
self.rssi = 0
self.rssi_byte = (self.rssi << 4)
self.rssi_byte = self.rssi << 4
self.data = bytearray([self.packetlength, self.packettype,
self.subtype, self.seqnbr,
self.id1, self.id2, self.sound,
Expand Down Expand Up @@ -3086,3 +3093,22 @@ def parse(data):
return None

return pkt


def set_mode_packet(modenames, status):
"""Construct a mode packet."""
data = bytearray([0x0D, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])

# Keep the values read during init.
data[5] = status.device.tranceiver_type
data[6] = status.device.output_power

# Build the mode data bytes from the mode names
for mode in modenames:
byteno, bitno = get_recmode_tuple(mode)
if byteno is None:
raise ValueError('Unknown mode name '+mode)

data[7 + byteno] |= 1 << bitno
return data
3 changes: 2 additions & 1 deletion requirements_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ pylint>=1.5.3
coveralls>=1.1
pytest>=2.8.0
pytest-cov>=2.2.0
pytest-timeout>=1.0.0
pytest-timeout>=1.0.0
pytest-asyncio>=0.20.3
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[tool:pytest]
testpaths = tests
asyncio_mode=auto
55 changes: 55 additions & 0 deletions tests/test_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Test for async connections."""
import asyncio
import logging
from typing import List

import pytest

import RFXtrx

logging.basicConfig(level=logging.DEBUG)


EVENT_1_DATA = bytes([0x07, 0x10, 0x00, 0x2A, 0x45, 0x05, 0x01, 0x70])
EVENT_1_STR = "<class 'RFXtrx.ControlEvent'> device=[<class 'RFXtrx.LightingDevice'> type='X10 lighting' id='E5'] values=[('Command', 'On'), ('Rssi numeric', 7)]"

EVENT_2_DATA = bytes(
[0x0B, 0x55, 0x02, 0x03, 0x12, 0x34, 0x02, 0x50, 0x01, 0x23, 0x45, 0x57]
)
EVENT_2_STR = "<class 'RFXtrx.SensorEvent'> device=[<class 'RFXtrx.RFXtrxDevice'> type='PCR800' id='12:34'] values=[('Battery numeric', 7), ('Rain rate', 5.92), ('Rain total', 7456.5), ('Rssi numeric', 5)]"

# Cut an event short
EVENT_SHORT_DATA = bytes([EVENT_2_DATA[0] - 1, *EVENT_2_DATA[1:-1]])
EVENT_SHORT_STR = "<class 'RFXtrx.ExceptionEvent'> ParseError('No packet for data: 0a 55 02 03 12 34 02 50 01 23 45')"


@pytest.mark.parametrize(
["data", "events"],
[
pytest.param([EVENT_1_DATA], [EVENT_1_STR], id="one_packet"),
pytest.param(
[EVENT_1_DATA[:1], EVENT_1_DATA[1:]], [EVENT_1_STR], id="split_packet"
),
pytest.param(
[EVENT_1_DATA[:1], EVENT_1_DATA[1:4], EVENT_1_DATA[4:], EVENT_2_DATA],
[EVENT_1_STR, EVENT_2_STR],
id="combined_packet",
),
pytest.param([EVENT_SHORT_DATA], [EVENT_SHORT_STR], id="invalid_len"),
],
)
async def test_parse_segmented(data: List[bytes], events: List[str]):
"""Verify that split packets end up parsing correctly"""
received_events = []

def callback(event: RFXtrx.RFXtrxEvent):
received_events.append(str(event))

transport = asyncio.BaseTransport()
protocol: asyncio.Protocol = RFXtrx.AsyncClient(callback)
protocol.connection_made(transport)
for x in data:
protocol.data_received(x)
protocol.connection_lost(None)

assert received_events == events
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ skip_missing_interpreters = True

[gh-actions]
python =
3.7: py37
3.8: py38
3.9: py39
3.10: py310, lint
3.11: py311

[testenv]
setenv =
Expand Down

0 comments on commit 687f122

Please sign in to comment.