Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix anonymous frame handling in Cyphal/UDP; support Python 3.11; synchronize the default MTU setting with LibUDPard #312

Merged
merged 18 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions .github/workflows/test-and-release.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: 'Test and Release PyCyphal'
on: push
on: [ push, pull_request ]

# Ensures that only one workflow is running at a time
concurrency:
Expand All @@ -9,11 +9,12 @@ concurrency:
jobs:
pycyphal-test:
name: Test PyCyphal
if: (github.event_name == 'push') || contains(github.event.head_commit.message, '#test')
strategy:
fail-fast: false
matrix:
os: [ ubuntu-20.04, windows-2019-npcap ]
python: [ '3.7', '3.8', '3.9', '3.10' ]
os: [ ubuntu-latest, windows-2019-npcap ]
python: [ '3.8', '3.9', '3.10', '3.11' ]
runs-on: ${{ matrix.os }}
steps:
- name: Check out
Expand Down Expand Up @@ -55,6 +56,7 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
FORCE_COLOR: 1
shell: bash

- name: Save logs
Expand All @@ -67,7 +69,9 @@ jobs:
pycyphal-release:
name: Release PyCyphal
runs-on: ubuntu-latest
if: contains(github.event.head_commit.message, '#release') || contains(github.ref, '/master')
if: >
(github.event_name == 'push') &&
(contains(github.event.head_commit.message, '#release') || contains(github.ref, '/master'))
needs: pycyphal-test
steps:
- name: Check out
Expand Down Expand Up @@ -97,4 +101,3 @@ jobs:
github_token: ${{ secrets.GITHUB_TOKEN }}
custom_tag: ${{ env.pycyphal_version }}
tag_prefix: ''

3 changes: 1 addition & 2 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,7 @@ Supporting newer versions of Python
Normally, this should be done a few months after a new version of CPython is released:

1. Update the CI/CD pipelines to enable the new Python version.
2. Update the CD configuration to make sure that the library is released using the newest version of Python.
3. Bump the version number using the ``.dev`` suffix to indicate that it is not release-ready until tested.
2. Bump the version number using the ``.dev`` suffix to indicate that it is not release-ready until tested.

When the CI/CD pipelines pass, you are all set.

Expand Down
2 changes: 1 addition & 1 deletion demo/launch.orc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ uavcan:
$=:
- $=:
# Wait a bit to let the diagnostic subscriber get ready (it is launched below).
- sleep 2
- sleep 5
- # An empty statement is a join statement -- wait for the previously launched processes to exit before continuing.

# Launch the demo app that implements the thermostat.
Expand Down
30 changes: 19 additions & 11 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
EXTRAS_REQUIRE = dict(CONFIG["options.extras_require"])
assert EXTRAS_REQUIRE, "Config could not be read correctly"

PYTHONS = ["3.7", "3.8", "3.9", "3.10"]
PYTHONS = ["3.8", "3.9", "3.10", "3.11"]
"""The newest supported Python shall be listed last."""

nox.options.error_on_external_run = True
Expand Down Expand Up @@ -96,15 +96,13 @@ def test(session):
# Application-layer tests are run separately after the main test suite because they require DSDL for
# "uavcan" to be transpiled first. That namespace is transpiled as a side-effect of running the main suite.
pytest("--ignore", str(postponed), *map(str, src_dirs))
if is_latest_python(session):
# FIXME HACK Python 3.10 segfaults at exit. This is reproducible up to at least 3.10.10.
python_version = session.run("python", "-V", silent=True)
if "3.10." in python_version or "3.11." in python_version:
# FIXME HACK Python 3.10 & 3.11 segfault at exit. This is reproducible with at least 3.10.10.
# #0 0x00007fd9c0fa0702 in raise () from /usr/lib/libpthread.so.0
# #1 <signal handler called>
# #2 PyVectorcall_Function (callable=0x0) at ./Include/cpython/abstract.h:69
pytest(
str(postponed),
success_codes=[0, -11, 0xC0000005],
)
pytest(str(postponed), success_codes=[0, -11, 0xC0000005])
else:
pytest(str(postponed))
finally:
Expand Down Expand Up @@ -160,7 +158,7 @@ def demo(session):
Test the demo app orchestration example.
This is a separate session because it is dependent on Yakut.
"""
if sys.platform.startswith("win") or "3.7" in session.run("python", "-V", silent=True): # Drop 3.7 check when EOLed
if sys.platform.startswith("win"):
session.log("This session cannot be run on in this environment")
return 0

Expand All @@ -180,7 +178,7 @@ def demo(session):
else:
shutil.copy(s, tmp_dir)

session.env["STOP_AFTER"] = "10"
session.env["STOP_AFTER"] = "12"
session.run("yakut", "orc", "launch.orc.yaml", success_codes=[111])


Expand All @@ -205,7 +203,7 @@ def pristine(session):

@nox.session(reuse_venv=True)
def check_style(session):
session.install("black == 22.*")
session.install("black == 23.*")
session.run("black", "--check", ".")


Expand All @@ -220,7 +218,17 @@ def docs(session):
session.install("-r", "docs/requirements.txt")
out_dir = Path(session.create_tmp()).resolve()
session.cd("docs")
sphinx_args = ["-b", "html", "-W", "--keep-going", f"-j{os.cpu_count() or 1}", ".", str(out_dir)]
# We used to have "-W" here to turn warnings into errors, but it breaks with Python 3.11 because Sphinx there
# emits nonsensical warnings about redefinition of typing.Any. Here's what they look like (line breaks inserted):
#
# /usr/lib/python3.11/typing.py:docstring of typing.Any:1: WARNING:
# duplicate object description of typing.Any, other instance in
# api/pycyphal.application.plug_and_play, use :noindex: for one of them
#
# /usr/lib/python3.11/typing.py:docstring of typing.Any:1: WARNING:
# duplicate object description of typing.Any, other instance in
# api/pycyphal.presentation.subscription_synchronizer.monotonic_clustering, use :noindex: for one of them
sphinx_args = ["-b", "html", "--keep-going", f"-j{os.cpu_count() or 1}", ".", str(out_dir)]
session.run("sphinx-build", *sphinx_args)
session.log(f"DOCUMENTATION BUILD OUTPUT: file://{out_dir}/index.html")

Expand Down
2 changes: 1 addition & 1 deletion pycyphal/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.15.3"
__version__ = "1.15.4"
2 changes: 1 addition & 1 deletion pycyphal/transport/can/_session/_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class _PendingFeedbackKey:


# noinspection PyAbstractClass
class CANOutputSession(CANSession, pycyphal.transport.OutputSession):
class CANOutputSession(CANSession, pycyphal.transport.OutputSession): # pylint: disable=abstract-method
"""
This is actually an abstract class, but its concrete inheritors are hidden from the API.
The implementation is chosen according to the type of the session requested: broadcast or unicast.
Expand Down
4 changes: 2 additions & 2 deletions pycyphal/transport/can/media/socketcan/_socketcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ async def send(self, frames: typing.Iterable[Envelope], monotonic_deadline: floa
loop.sock_sendall(self._sock, self._compile_native_frame(f.frame)),
timeout=monotonic_deadline - loop.time(),
)
except asyncio.TimeoutError:
break
except OSError as err:
if self._closed: # https://github.com/OpenCyphal/pycyphal/issues/204
break
Expand All @@ -152,8 +154,6 @@ async def send(self, frames: typing.Iterable[Envelope], monotonic_deadline: floa
) from err
self._closed = self._closed or err.errno in self._errno_unrecoverable
raise err
except asyncio.TimeoutError:
break
else:
num_sent += 1
return num_sent
Expand Down
1 change: 0 additions & 1 deletion pycyphal/transport/serial/_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

@dataclasses.dataclass(frozen=True, repr=False)
class SerialFrame(pycyphal.transport.commons.high_overhead_transport.Frame):

VERSION = 1
NODE_ID_MASK = 2**16 - 1
TRANSFER_ID_MASK = 2**64 - 1
Expand Down
5 changes: 4 additions & 1 deletion pycyphal/transport/serial/_stream_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ def proc(
# Create new instance with much larger frame size limit; feed both frames but let the first one be incomplete.
sp = StreamParser(lambda ts, buf, item: outputs.append((ts, buf, item)), 10**6)
assert [] == proc(f1.compile_into(bytearray(200))[:-2]) # First one is ended abruptly.
(tsa, _, a), (tsb, _, b), = proc(
(
(tsa, _, a),
(tsb, _, b),
) = proc(
f2.compile_into(bytearray(200))
) # Then the second frame begins.
assert tsa == ts
Expand Down
25 changes: 13 additions & 12 deletions pycyphal/transport/udp/_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ class UDPFrame(pycyphal.transport.commons.high_overhead_transport.Frame):
TRANSFER_ID_MASK = 2**64 - 1
INDEX_MASK = 2**31 - 1

NODE_ID_MAX = 0xFFFE
"""
Cyphal/UDP supports 65535 nodes per logical network, from 0 to 65534 inclusive.
65535 is reserved for the anonymous/broadcast ID.
"""

source_node_id: int | None
destination_node_id: int | None

Expand All @@ -57,10 +63,10 @@ def __post_init__(self) -> None:
if not isinstance(self.priority, pycyphal.transport.Priority):
raise TypeError(f"Invalid priority: {self.priority}") # pragma: no cover

if not (self.source_node_id is None or (0 <= self.source_node_id <= self.NODE_ID_MASK)):
if not (self.source_node_id is None or (0 <= self.source_node_id <= self.NODE_ID_MAX)):
raise ValueError(f"Invalid source node id: {self.source_node_id}")

if not (self.destination_node_id is None or (0 <= self.destination_node_id <= self.NODE_ID_MASK)):
if not (self.destination_node_id is None or (0 <= self.destination_node_id <= self.NODE_ID_MAX)):
raise ValueError(f"Invalid destination node id: {self.destination_node_id}")

if isinstance(self.data_specifier, pycyphal.transport.ServiceDataSpecifier) and self.source_node_id is None:
Expand Down Expand Up @@ -88,16 +94,12 @@ def compile_header_and_payload(self) -> typing.Tuple[memoryview, memoryview]:

if isinstance(self.data_specifier, pycyphal.transport.ServiceDataSpecifier):
snm = True
subject_id = None
service_id = self.data_specifier.service_id
rnr = self.data_specifier.role == self.data_specifier.Role.REQUEST
id_rnr = service_id | ((1 << 14) if rnr else 0)
elif isinstance(self.data_specifier, pycyphal.transport.MessageDataSpecifier):
snm = False
subject_id = self.data_specifier.subject_id
service_id = None
rnr = None
id_rnr = subject_id
id_rnr = self.data_specifier.subject_id
else:
raise TypeError(f"Invalid data specifier: {self.data_specifier}")

Expand Down Expand Up @@ -142,7 +144,7 @@ def parse(image: memoryview) -> typing.Optional[UDPFrame]:
snm = bool(data_specifier_snm & (1 << 15))
data_specifier: pycyphal.transport.DataSpecifier
if snm:
## Service
# Service
service_id = data_specifier_snm & UDPFrame.SERVICE_ID_MASK
rnr = bool(data_specifier_snm & (1 << 14))
# check the service ID
Expand All @@ -156,9 +158,8 @@ def parse(image: memoryview) -> typing.Optional[UDPFrame]:
else pycyphal.transport.ServiceDataSpecifier.Role.RESPONSE,
)
else:
## Message
# Message
subject_id = data_specifier_snm & UDPFrame.SUBJECT_ID_MASK
rnr = None
# check the subject ID
if not (0 <= subject_id <= UDPFrame.SUBJECT_ID_MASK):
return None
Expand All @@ -167,8 +168,8 @@ def parse(image: memoryview) -> typing.Optional[UDPFrame]:

return UDPFrame(
priority=pycyphal.transport.Priority(int_priority),
source_node_id=source_node_id,
destination_node_id=destination_node_id,
source_node_id=source_node_id if source_node_id <= UDPFrame.NODE_ID_MAX else None,
destination_node_id=destination_node_id if destination_node_id <= UDPFrame.NODE_ID_MAX else None,
data_specifier=data_specifier,
transfer_id=transfer_id,
index=(frame_index_eot & UDPFrame.INDEX_MASK),
Expand Down
9 changes: 2 additions & 7 deletions pycyphal/transport/udp/_session/_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from .._frame import UDPFrame

_READ_SIZE = 0xFFFF # Per libpcap documentation, this is to be sufficient always.
NODE_ID_MASK = UDPFrame.NODE_ID_MASK

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -125,18 +124,14 @@ async def receive(self, monotonic_deadline: float) -> typing.Optional[pycyphal.t
# это проблема но мы это потом починим
if frame.data_specifier != self._specifier.data_specifier:
continue
if frame.source_node_id == self._local_node_id:
if (self._local_node_id is not None) and (frame.source_node_id == self._local_node_id):
continue
if not self.specifier.is_promiscuous:
if frame.source_node_id != self.specifier.remote_node_id:
continue
self._statistics.frames += 1
source_node_id = frame.source_node_id
assert (
isinstance(source_node_id, int) and 0 <= source_node_id <= NODE_ID_MASK
), "Internal protocol violation"
# Anonymous - no reconstruction needed
if source_node_id == NODE_ID_MASK:
if source_node_id is None: # Anonymous - no reconstruction needed
transfer = TransferReassembler.construct_anonymous_transfer(ts, frame)
else:
_logger.debug("%s: Processing frame %s", self, frame)
Expand Down
13 changes: 9 additions & 4 deletions pycyphal/transport/udp/_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,18 @@ def _get_session(self, specifier: AlienSessionSpecifier) -> _AlienSession:

class _AlienSession:
def __init__(self, specifier: AlienSessionSpecifier) -> None:
assert specifier.source_node_id is not None
self._specifier = specifier
self._reassembler = AlienTransferReassembler(specifier.source_node_id)
src = specifier.source_node_id
self._reassembler = AlienTransferReassembler(src) if src is not None else None

def update(self, timestamp: Timestamp, frame: UDPFrame) -> typing.Optional[Trace]:
tid_timeout = self._reassembler.transfer_id_timeout
tr = self._reassembler.process_frame(timestamp, frame)
reasm = self._reassembler
tid_timeout = reasm.transfer_id_timeout if reasm is not None else 0.0
tr: TransferFrom | TransferReassembler.Error | None
if reasm is not None:
tr = reasm.process_frame(timestamp, frame)
else:
tr = TransferReassembler.construct_anonymous_transfer(timestamp, frame)
if isinstance(tr, TransferReassembler.Error):
return UDPErrorTrace(timestamp=timestamp, error=tr)
if isinstance(tr, TransferFrom):
Expand Down
32 changes: 17 additions & 15 deletions pycyphal/transport/udp/_udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,21 @@ class UDPTransport(pycyphal.transport.Transport):

TRANSFER_ID_MODULO = UDPFrame.TRANSFER_ID_MASK + 1

VALID_MTU_RANGE = 1200, 9000
MTU_MIN = 4
"""
The minimum is based on the IPv6 specification, which guarantees that the path MTU is at least 1280 bytes large.
This value is also acceptable for virtually all IPv4 local or real-time networks.
Lower MTU values shall not be used because they may lead to multi-frame transfer fragmentation where this is
not expected by the designer, possibly violating the real-time constraints.
This is the application-level MTU, not including the Cyphal/UDP header and other overheads.

The Cyphal/UDP protocol does not limit the maximum MTU value, but the minimum is restricted to 4 bytes
because it is necessary provide space at least for the transfer-CRC.

A conventional Ethernet jumbo frame can carry up to 9 KiB (9216 bytes).
These are the application-level MTU values, so we take overheads into account.
"""

MTU_DEFAULT = 1408
"""
This is the application-level MTU, not including the Cyphal/UDP header and other overheads. The value derived as:

1500B Ethernet MTU (RFC 894) - 60B IPv4 max header - 8B UDP Header - 24B Cyphal header = 1408B payload.
"""

VALID_SERVICE_TRANSFER_MULTIPLIER_RANGE = (1, 5)
Expand All @@ -60,7 +66,7 @@ def __init__(
local_ip_address: IPAddress | str,
local_node_id: typing.Optional[int] = 0,
*, # The following parameters are keyword-only.
mtu: int = min(VALID_MTU_RANGE),
mtu: int = MTU_DEFAULT,
service_transfer_multiplier: int = 1,
loop: typing.Optional[asyncio.AbstractEventLoop] = None,
anonymous: bool = False,
Expand All @@ -86,12 +92,9 @@ def __init__(

:param mtu: The application-level MTU for outgoing packets.
In other words, this is the maximum number of serialized bytes per Cyphal/UDP frame.
Transfers where the number of payload bytes does not exceed this value will be single-frame transfers,
otherwise, multi-frame transfers will be used.
This setting affects only outgoing frames;
the MTU of incoming frames is fixed at a sufficiently large value to accept any meaningful UDP frame.

The default value is the smallest valid value for reasons of compatibility.
Transfers where the number of payload bytes does not exceed this value minus 4 bytes for the CRC
will be single-frame transfers; otherwise, multi-frame transfers will be used.
This setting affects only outgoing frames; incoming frames of any MTU are always accepted.

:param service_transfer_multiplier: Forward error correction is disabled by default.
This parameter specifies the number of times each outgoing service transfer will be repeated.
Expand Down Expand Up @@ -123,8 +126,7 @@ def __init__(
if not (low <= self._srv_multiplier <= high):
raise ValueError(f"Invalid service transfer multiplier: {self._srv_multiplier}")

low, high = self.VALID_MTU_RANGE
if not (low <= self._mtu <= high):
if self._mtu < self.MTU_MIN:
raise ValueError(f"Invalid MTU: {self._mtu} bytes")

self._input_registry: typing.Dict[pycyphal.transport.InputSessionSpecifier, UDPInputSession] = {}
Expand Down
Loading