From f2dc8bd84ef2c49b9726ba7d88db3c7f8b932436 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Sun, 4 Feb 2024 19:56:18 -0800 Subject: [PATCH 1/3] wip (+2 squashed commits) Squashed commits: [451a295] wip [ed7b5b6] wip (+1 squashed commit) Squashed commits: [9d938c8] wip wip wip --- apps/rfcomm_bridge.py | 475 ++++++++++++++++++ bumble/l2cap.py | 1 + bumble/rfcomm.py | 101 +++- bumble/sdp.py | 2 +- .../google/bumble/btbench/SocketClient.kt | 8 +- setup.cfg | 1 + 6 files changed, 564 insertions(+), 24 deletions(-) create mode 100644 apps/rfcomm_bridge.py diff --git a/apps/rfcomm_bridge.py b/apps/rfcomm_bridge.py new file mode 100644 index 00000000..f7a08724 --- /dev/null +++ b/apps/rfcomm_bridge.py @@ -0,0 +1,475 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import logging +import os +import time +from typing import Optional + +import click + +from bumble.colors import color +from bumble.device import Device, Connection +from bumble import core +from bumble import hci +from bumble import rfcomm +from bumble import transport +from bumble import utils + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +DEFAULT_RFCOMM_UUID = "E6D55659-C8B4-4B85-96BB-B1143AF6D3AE" +DEFAULT_MTU = 4096 +DEFAULT_TCP_PORT = 9544 + +TRACE_MAX_SIZE = 48 + + +# ----------------------------------------------------------------------------- +class Tracer: + """ + Trace data buffers transmitted from one endpoint to another, with stats. + """ + + def __init__(self, channel_name: str) -> None: + self.channel_name = channel_name + self.last_ts: float = 0.0 + + def trace_data(self, data: bytes) -> None: + now = time.time() + elapsed_s = now - self.last_ts if self.last_ts else 0 + elapsed_ms = int(elapsed_s * 1000) + instant_throughput_kbps = ((len(data) / elapsed_s) / 1000) if elapsed_s else 0.0 + + hex_str = data[:TRACE_MAX_SIZE].hex() + ( + "..." if len(data) > TRACE_MAX_SIZE else "" + ) + print( + f"[{self.channel_name}] {len(data):4} bytes " + f"(+{elapsed_ms:4}ms, {instant_throughput_kbps: 7.2f}kB/s) " + f" {hex_str}" + ) + + self.last_ts = now + + +# ----------------------------------------------------------------------------- +class ServerBridge: + """ + RFCOMM server bridge: waits for a peer to connect an RFCOMM channel. + The RFCOMM channel may be associated with a UUID published in an SDP service + description, or simply be on a system-assigned channel number. + When the connection is made, the bridge connects a TCP socket to a remote host and + bridges the data in both directions, with flow control. + When the RFCOMM channel is closed, the bridge disconnects the TCP socket + and waits for a new channel to be connected. + """ + + READ_CHUNK_SIZE = 4096 + + def __init__( + self, channel: int, uuid: str, trace: bool, tcp_host: str, tcp_port: int + ) -> None: + self.device: Optional[Device] = None + self.channel = channel + self.uuid = uuid + self.tcp_host = tcp_host + self.tcp_port = tcp_port + self.rfcomm_channel: Optional[rfcomm.DLC] = None + self.tcp_tracer: Optional[Tracer] + self.rfcomm_tracer: Optional[Tracer] + + if trace: + self.tcp_tracer = Tracer(color("RFCOMM->TCP", "cyan")) + self.rfcomm_tracer = Tracer(color("TCP->RFCOMM", "magenta")) + else: + self.rfcomm_tracer = None + self.tcp_tracer = None + + async def start(self, device: Device) -> None: + self.device = device + + # Create and register a server + rfcomm_server = rfcomm.Server(self.device) + + # Listen for incoming DLC connections + self.channel = rfcomm_server.listen(self.on_rfcomm_channel, self.channel) + + # Setup the SDP to advertise this channel + service_record_handle = 0x00010001 + self.device.sdp_service_records = { + service_record_handle: rfcomm.make_service_sdp_records( + service_record_handle, self.channel, core.UUID(self.uuid) + ) + } + + # We're ready for a connection + self.device.on("connection", self.on_connection) + await self.set_available(True) + + print( + color( + ( + f"### Listening for RFCOMM connection on {device.public_address}, " + f"channel {self.channel}" + ), + "yellow", + ) + ) + + async def set_available(self, available: bool): + # Become discoverable and connectable + assert self.device + await self.device.set_connectable(available) + await self.device.set_discoverable(available) + + def on_connection(self, connection): + print(color(f"@@@ Bluetooth connection: {connection}", "blue")) + connection.on("disconnection", self.on_disconnection) + + # Don't accept new connections until we're disconnected + utils.AsyncRunner.spawn(self.set_available(False)) + + def on_disconnection(self, reason: int): + print( + color("@@@ Bluetooth disconnection:", "blue"), + hci.HCI_Constant.error_name(reason), + ) + + # We're ready for a new connection + utils.AsyncRunner.spawn(self.set_available(True)) + + # Called when an RFCOMM channel is established + @utils.AsyncRunner.run_in_task() + async def on_rfcomm_channel(self, rfcomm_channel): + print(color("*** RFCOMM channel:", "cyan"), rfcomm_channel) + + # Connect to the TCP server + print( + color( + f"### Connecting to TCP {self.tcp_host}:{self.tcp_port}", + "yellow", + ) + ) + try: + reader, writer = await asyncio.open_connection(self.tcp_host, self.tcp_port) + except OSError: + print(color("!!! Connection failed", "red")) + await rfcomm_channel.disconnect() + return + + # Pipe data from RFCOMM to TCP + def on_rfcomm_channel_closed(): + print(color("*** RFCOMM channel closed", "cyan")) + writer.close() + + def write_rfcomm_data(data): + if self.rfcomm_tracer: + self.rfcomm_tracer.trace_data(data) + + writer.write(data) + + rfcomm_channel.sink = write_rfcomm_data + rfcomm_channel.on("close", on_rfcomm_channel_closed) + + # Pipe data from TCP to RFCOMM + while True: + try: + data = await reader.read(self.READ_CHUNK_SIZE) + + if len(data) == 0: + print(color("### TCP end of stream", "yellow")) + if rfcomm_channel.state == rfcomm.DLC.State.CONNECTED: + await rfcomm_channel.disconnect() + return + + if self.tcp_tracer: + self.tcp_tracer.trace_data(data) + + rfcomm_channel.write(data) + await rfcomm_channel.drain() + except Exception as error: + print(f"!!! Exception: {error}") + break + + writer.close() + await writer.wait_closed() + print(color("~~~ Bye bye", "magenta")) + + +# ----------------------------------------------------------------------------- +class ClientBridge: + """ + RFCOMM client bridge: connects to a BR/EDR device, then waits for an inbound + TCP connection on a specified port number. When a TCP client connects, an + RFCOMM connection to the device is established, and the data is bridged in both + directions, with flow control. + When the TCP connection is closed by the client, the RFCOMM channel is + disconnected, but the connection to the device remains, ready for a new TCP client + to connect. + """ + + READ_CHUNK_SIZE = 4096 + + def __init__( + self, + channel: int, + uuid: str, + trace: bool, + address: str, + tcp_host: str, + tcp_port: int, + encrypt: bool, + ): + self.channel = channel + self.uuid = uuid + self.trace = trace + self.address = address + self.tcp_host = tcp_host + self.tcp_port = tcp_port + self.encrypt = encrypt + self.device: Optional[Device] = None + self.connection: Optional[Connection] = None + self.rfcomm_client: Optional[rfcomm.Client] + self.rfcomm_mux: Optional[rfcomm.Multiplexer] + self.tcp_connected: bool = False + + self.tcp_tracer: Optional[Tracer] + self.rfcomm_tracer: Optional[Tracer] + + if trace: + self.tcp_tracer = Tracer(color("RFCOMM->TCP", "cyan")) + self.rfcomm_tracer = Tracer(color("TCP->RFCOMM", "magenta")) + else: + self.rfcomm_tracer = None + self.tcp_tracer = None + + async def connect(self) -> None: + if self.connection: + return + + print(color(f"@@@ Connecting to Bluetooth {self.address}", "blue")) + assert self.device + self.connection = await self.device.connect( + self.address, transport=core.BT_BR_EDR_TRANSPORT + ) + print(color(f"@@@ Bluetooth connection: {self.connection}", "blue")) + + if self.encrypt: + print(color("@@@ Encrypting Bluetooth connection", "blue")) + await self.connection.encrypt() + print(color("@@@ Bluetooth connection encrypted", "blue")) + + self.rfcomm_client = rfcomm.Client(self.connection) + self.rfcomm_mux = await self.rfcomm_client.start() + + def on_disconnection(reason): + print( + color("@@@ Bluetooth disconnection:", "red"), + hci.HCI_Constant.error_name(reason), + ) + self.connection = None + + self.connection.on("disconnection", on_disconnection) + + async def start(self, device: Device) -> None: + self.device = device + + # Called when a TCP connection is established + async def on_tcp_connection(reader, writer): + print(color("<<< TCP connection", "magenta")) + if self.tcp_connected: + print( + color("!!! TCP connection already active, rejecting new one", "red") + ) + writer.close() + return + self.tcp_connected = True + + await self.pipe(reader, writer) + writer.close() + await writer.wait_closed() + + await asyncio.start_server( + on_tcp_connection, + host=self.tcp_host if self.tcp_host != "_" else None, + port=self.tcp_port, + ) + print( + color( + f"### Listening for TCP connections on port {self.tcp_port}", "magenta" + ) + ) + + async def pipe( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + # Resolve the channel number from the UUID if needed + if self.channel == 0: + await self.connect() + assert self.connection + channel = await rfcomm.find_rfcomm_channel_with_uuid( + self.connection, self.uuid + ) + if channel: + print(color(f"### Found RFCOMM channel {channel}", "yellow")) + else: + print(color(f"!!! RFCOMM channel with UUID {self.uuid} not found")) + return + else: + channel = self.channel + + # Connect a new RFCOMM channel + await self.connect() + assert self.rfcomm_mux + print(color(f'*** Opening RFCOMM channel {channel}', 'green')) + try: + rfcomm_channel = await self.rfcomm_mux.open_dlc(channel) + print(color(f'*** RFCOMM channel open: {rfcomm_channel}', "green")) + except Exception as error: + print(color(f'!!! RFCOMM open failed: {error}', 'red')) + return + + # Pipe data from RFCOMM to TCP + def on_rfcomm_channel_closed(): + print(color("*** RFCOMM channel closed", "green")) + + def write_rfcomm_data(data): + if self.trace: + self.rfcomm_tracer.trace_data(data) + + writer.write(data) + + rfcomm_channel.on("close", on_rfcomm_channel_closed) + rfcomm_channel.sink = write_rfcomm_data + + # Pipe data from TCP to RFCOMM + while True: + try: + data = await reader.read(self.READ_CHUNK_SIZE) + + if len(data) == 0: + print(color("### TCP end of stream", "yellow")) + if rfcomm_channel.state == rfcomm.DLC.State.CONNECTED: + await rfcomm_channel.disconnect() + self.tcp_connected = False + return + + if self.tcp_tracer: + self.tcp_tracer.trace_data(data) + + rfcomm_channel.write(data) + await rfcomm_channel.drain() + except Exception as error: + print(f"!!! Exception: {error}") + break + + print(color("~~~ Bye bye", "magenta")) + + +# ----------------------------------------------------------------------------- +async def run(device_config, hci_transport, bridge): + print("<<< connecting to HCI...") + async with await transport.open_transport_or_link(hci_transport) as ( + hci_source, + hci_sink, + ): + print("<<< connected") + + device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) + device.classic_enabled = True + + # Let's go + await device.power_on() + try: + await bridge.start(device) + + # Wait until the transport terminates + await hci_source.wait_for_termination() + except core.ConnectionError as error: + print(color(f"!!! Bluetooth connection failed: {error}", "red")) + except Exception as error: + print(f"Exception while running bridge: {error}") + + +# ----------------------------------------------------------------------------- +@click.group() +@click.pass_context +@click.option("--device-config", help="Device configuration file", required=True) +@click.option("--hci-transport", help="HCI transport", required=True) +@click.option("--trace", is_flag=True, help="Trace bridged data to stdout") +@click.option("--channel", help="RFCOMM channel number", type=int, default=0) +@click.option("--uuid", help="UUID for the RFCOMM channel", default=DEFAULT_RFCOMM_UUID) +def cli( + context, + device_config, + hci_transport, + trace, + channel, + uuid, +): + context.ensure_object(dict) + context.obj["device_config"] = device_config + context.obj["hci_transport"] = hci_transport + context.obj["trace"] = trace + context.obj["channel"] = channel + context.obj["uuid"] = uuid + + +# ----------------------------------------------------------------------------- +@cli.command() +@click.pass_context +@click.option("--tcp-host", help="TCP host", default="localhost") +@click.option("--tcp-port", help="TCP port", default=DEFAULT_TCP_PORT) +def server(context, tcp_host, tcp_port): + bridge = ServerBridge( + context.obj["channel"], + context.obj["uuid"], + context.obj["trace"], + tcp_host, + tcp_port, + ) + asyncio.run(run(context.obj["device_config"], context.obj["hci_transport"], bridge)) + + +# ----------------------------------------------------------------------------- +@cli.command() +@click.pass_context +@click.argument("bluetooth-address") +@click.option("--tcp-host", help="TCP host", default="_") +@click.option("--tcp-port", help="TCP port", default=DEFAULT_TCP_PORT) +@click.option("--encrypt", is_flag=True, help="Encrypt the connection") +def client(context, bluetooth_address, tcp_host, tcp_port, encrypt): + bridge = ClientBridge( + context.obj["channel"], + context.obj["uuid"], + context.obj["trace"], + bluetooth_address, + tcp_host, + tcp_port, + encrypt, + ) + asyncio.run(run(context.obj["device_config"], context.obj["hci_transport"], bridge)) + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get("BUMBLE_LOGLEVEL", "WARNING").upper()) +if __name__ == "__main__": + cli(obj={}) # pylint: disable=no-value-for-parameter diff --git a/bumble/l2cap.py b/bumble/l2cap.py index cec14b85..3eef0e05 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -70,6 +70,7 @@ L2CAP_MIN_LE_MTU = 23 L2CAP_MIN_BR_EDR_MTU = 48 +L2CAP_MAX_BR_EDR_MTU = 65535 L2CAP_DEFAULT_MTU = 2048 # Default value for the MTU we are willing to accept diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py index 1020a1ea..c95a4c78 100644 --- a/bumble/rfcomm.py +++ b/bumble/rfcomm.py @@ -446,10 +446,6 @@ class State(enum.IntEnum): DISCONNECTED = 0x04 RESET = 0x05 - connection_result: Optional[asyncio.Future] - _sink: Optional[Callable[[bytes], None]] - _enqueued_rx_packets: collections.deque[bytes] - def __init__( self, multiplexer: Multiplexer, @@ -469,12 +465,15 @@ def __init__( self.state = DLC.State.INIT self.role = multiplexer.role self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0 - self.connection_result = None + self.connection_result: Optional[asyncio.Future] = None + self.disconnection_result: Optional[asyncio.Future] = None self.drained = asyncio.Event() self.drained.set() # Queued packets when sink is not set. - self._enqueued_rx_packets = collections.deque(maxlen=DEFAULT_RX_QUEUE_SIZE) - self._sink = None + self._enqueued_rx_packets: collections.deque[bytes] = collections.deque( + maxlen=DEFAULT_RX_QUEUE_SIZE + ) + self._sink: Optional[Callable[[bytes], None]] = None # Compute the MTU max_overhead = 4 + 1 # header with 2-byte length + fcs @@ -525,20 +524,35 @@ def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None: self.emit('open') def on_ua_frame(self, _frame: RFCOMM_Frame) -> None: - if self.state != DLC.State.CONNECTING: + if self.state == DLC.State.CONNECTING: + # Exchange the modem status with the peer + msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1) + mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc)) + logger.debug(f'>>> MCC MSC Command: {msc}') + self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) + + self.change_state(DLC.State.CONNECTED) + if self.connection_result: + self.connection_result.set_result(None) + self.connection_result = None + self.multiplexer.on_dlc_open_complete(self) + elif self.state == DLC.State.DISCONNECTING: + self.change_state(DLC.State.DISCONNECTED) + if self.disconnection_result: + self.disconnection_result.set_result(None) + self.disconnection_result = None + self.multiplexer.on_dlc_disconnection(self) + self.emit('close') + else: logger.warning( - color('!!! received SABM when not in CONNECTING state', 'red') + color( + ( + '!!! received UA frame when not in ' + 'CONNECTING or DISCONNECTING state' + ), + 'red', + ) ) - return - - # Exchange the modem status with the peer - msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1) - mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc)) - logger.debug(f'>>> MCC MSC Command: {msc}') - self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) - - self.change_state(DLC.State.CONNECTED) - self.multiplexer.on_dlc_open_complete(self) def on_dm_frame(self, frame: RFCOMM_Frame) -> None: # TODO: handle all states @@ -609,6 +623,19 @@ def connect(self) -> None: self.connection_result = asyncio.get_running_loop().create_future() self.send_frame(RFCOMM_Frame.sabm(c_r=self.c_r, dlci=self.dlci)) + async def disconnect(self) -> None: + if self.state != DLC.State.CONNECTED: + raise InvalidStateError('invalid state') + + self.disconnection_result = asyncio.get_running_loop().create_future() + self.change_state(DLC.State.DISCONNECTING) + self.send_frame( + RFCOMM_Frame.disc( + c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=self.dlci + ) + ) + await self.disconnection_result + def accept(self) -> None: if self.state != DLC.State.INIT: raise InvalidStateError('invalid state') @@ -689,6 +716,17 @@ def write(self, data: Union[bytes, str]) -> None: async def drain(self) -> None: await self.drained.wait() + def abort(self) -> None: + logger.debug(f'aborting DLC: {self}') + if self.connection_result: + self.connection_result.cancel() + self.connection_result = None + if self.disconnection_result: + self.disconnection_result.cancel() + self.disconnection_result = None + self.change_state(DLC.State.RESET) + self.emit('close') + def __str__(self) -> str: return f'DLC(dlci={self.dlci},state={self.state.name})' @@ -728,6 +766,8 @@ def __init__(self, l2cap_channel: l2cap.ClassicChannel, role: Role) -> None: # Become a sink for the L2CAP channel l2cap_channel.sink = self.on_pdu + l2cap_channel.on('close', self.on_l2cap_channel_close) + def change_state(self, new_state: State) -> None: logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}') self.state = new_state @@ -791,6 +831,7 @@ def on_dm_frame(self, _frame: RFCOMM_Frame) -> None: 'rfcomm', ) ) + self.open_result = None else: logger.warning(f'unexpected state for DM: {self}') @@ -915,15 +956,31 @@ async def open_dlc( information=mcc, ) ) - result = await self.open_result - self.open_result = None - return result + return await self.open_result def on_dlc_open_complete(self, dlc: DLC) -> None: logger.debug(f'DLC [{dlc.dlci}] open complete') + self.change_state(Multiplexer.State.CONNECTED) + if self.open_result: self.open_result.set_result(dlc) + self.open_result = None + + def on_dlc_disconnection(self, dlc: DLC) -> None: + logger.debug(f'DLC [{dlc.dlci}] disconnection') + self.dlcs.pop(dlc.dlci, None) + + def on_l2cap_channel_close(self) -> None: + logger.debug('L2CAP channel closed, cleaning up') + if self.open_result: + self.open_result.cancel() + self.open_result = None + if self.disconnection_result: + self.disconnection_result.cancel() + self.disconnection_result = None + for dlc in self.dlcs.values(): + dlc.abort() def __str__(self) -> str: return f'Multiplexer(state={self.state.name})' diff --git a/bumble/sdp.py b/bumble/sdp.py index 35b4a3a1..543c322f 100644 --- a/bumble/sdp.py +++ b/bumble/sdp.py @@ -997,7 +997,7 @@ def on_pdu(self, pdu): try: handler(sdp_pdu) except Exception as error: - logger.warning(f'{color("!!! Exception in handler:", "red")} {error}') + logger.exception(f'{color("!!! Exception in handler:", "red")} {error}') self.send_response( SDP_ErrorResponse( transaction_id=sdp_pdu.transaction_id, diff --git a/extras/android/BtBench/app/src/main/java/com/github/google/bumble/btbench/SocketClient.kt b/extras/android/BtBench/app/src/main/java/com/github/google/bumble/btbench/SocketClient.kt index bd5b7f4a..46a014a4 100644 --- a/extras/android/BtBench/app/src/main/java/com/github/google/bumble/btbench/SocketClient.kt +++ b/extras/android/BtBench/app/src/main/java/com/github/google/bumble/btbench/SocketClient.kt @@ -56,13 +56,19 @@ class SocketClient(private val viewModel: AppViewModel, private val socket: Blue thread { socketDataSource.receive() + socket.close() + sender.abort() } Log.info("Startup delay: $DEFAULT_STARTUP_DELAY") Thread.sleep(DEFAULT_STARTUP_DELAY.toLong()); Log.info("Starting to send") - sender.run() + try { + sender.run() + } catch (error: IOException) { + Log.info("run ended abruptly") + } cleanup() } } diff --git a/setup.cfg b/setup.cfg index 05d64319..44d85415 100644 --- a/setup.cfg +++ b/setup.cfg @@ -62,6 +62,7 @@ console_scripts = bumble-gatt-dump = bumble.apps.gatt_dump:main bumble-hci-bridge = bumble.apps.hci_bridge:main bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main + bumble-rfcomm-bridge = bumble.apps.rfcomm_bridge:main bumble-pair = bumble.apps.pair:main bumble-scan = bumble.apps.scan:main bumble-show = bumble.apps.show:main From f5baf51132778887ccc46d42233928a5a9f94095 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Mon, 3 Jun 2024 18:11:13 -0700 Subject: [PATCH 2/3] improve DLC parameters --- apps/bench.py | 81 +++++++++++++++++++++++++++----- apps/rfcomm_bridge.py | 88 ++++++++++++++++++++++++---------- bumble/l2cap.py | 6 ++- bumble/rfcomm.py | 107 +++++++++++++++++++++++++++--------------- 4 files changed, 204 insertions(+), 78 deletions(-) diff --git a/apps/bench.py b/apps/bench.py index 25c226f2..f0e8b58f 100644 --- a/apps/bench.py +++ b/apps/bench.py @@ -899,14 +899,26 @@ async def drain(self): # RfcommClient # ----------------------------------------------------------------------------- class RfcommClient(StreamedPacketIO): - def __init__(self, device, channel, uuid, l2cap_mtu, max_frame_size, window_size): + def __init__( + self, + device, + channel, + uuid, + l2cap_mtu, + max_frame_size, + initial_credits, + max_credits, + credits_threshold, + ): super().__init__() self.device = device self.channel = channel self.uuid = uuid self.l2cap_mtu = l2cap_mtu self.max_frame_size = max_frame_size - self.window_size = window_size + self.initial_credits = initial_credits + self.max_credits = max_credits + self.credits_threshold = credits_threshold self.rfcomm_session = None self.ready = asyncio.Event() @@ -940,12 +952,17 @@ async def on_connection(self, connection): logging.info(color(f'### Opening session for channel {channel}...', 'yellow')) try: dlc_options = {} - if self.max_frame_size: + if self.max_frame_size is not None: dlc_options['max_frame_size'] = self.max_frame_size - if self.window_size: - dlc_options['window_size'] = self.window_size + if self.initial_credits is not None: + dlc_options['initial_credits'] = self.initial_credits rfcomm_session = await rfcomm_mux.open_dlc(channel, **dlc_options) logging.info(color(f'### Session open: {rfcomm_session}', 'yellow')) + if self.max_credits is not None: + rfcomm_session.rx_max_credits = self.max_credits + if self.credits_threshold is not None: + rfcomm_session.rx_credits_threshold = self.credits_threshold + except bumble.core.ConnectionError as error: logging.info(color(f'!!! Session open failed: {error}', 'red')) await rfcomm_mux.disconnect() @@ -969,8 +986,19 @@ async def drain(self): # RfcommServer # ----------------------------------------------------------------------------- class RfcommServer(StreamedPacketIO): - def __init__(self, device, channel, l2cap_mtu): + def __init__( + self, + device, + channel, + l2cap_mtu, + max_frame_size, + initial_credits, + max_credits, + credits_threshold, + ): super().__init__() + self.max_credits = max_credits + self.credits_threshold = credits_threshold self.dlc = None self.ready = asyncio.Event() @@ -981,7 +1009,12 @@ def __init__(self, device, channel, l2cap_mtu): rfcomm_server = bumble.rfcomm.Server(device, **server_options) # Listen for incoming DLC connections - channel_number = rfcomm_server.listen(self.on_dlc, channel) + dlc_options = {} + if max_frame_size is not None: + dlc_options['max_frame_size'] = max_frame_size + if initial_credits is not None: + dlc_options['initial_credits'] = initial_credits + channel_number = rfcomm_server.listen(self.on_dlc, channel, **dlc_options) # Setup the SDP to advertise this channel device.sdp_service_records = make_sdp_records(channel_number) @@ -1004,6 +1037,10 @@ def on_dlc(self, dlc): dlc.sink = self.on_packet self.io_sink = dlc.write self.dlc = dlc + if self.max_credits is not None: + dlc.rx_max_credits = self.max_credits + if self.credits_threshold is not None: + dlc.rx_credits_threshold = self.credits_threshold async def drain(self): assert self.dlc @@ -1321,7 +1358,9 @@ def create_mode(device): uuid=ctx.obj['rfcomm_uuid'], l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'], max_frame_size=ctx.obj['rfcomm_max_frame_size'], - window_size=ctx.obj['rfcomm_window_size'], + initial_credits=ctx.obj['rfcomm_initial_credits'], + max_credits=ctx.obj['rfcomm_max_credits'], + credits_threshold=ctx.obj['rfcomm_credits_threshold'], ) if mode == 'rfcomm-server': @@ -1329,6 +1368,10 @@ def create_mode(device): device, channel=ctx.obj['rfcomm_channel'], l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'], + max_frame_size=ctx.obj['rfcomm_max_frame_size'], + initial_credits=ctx.obj['rfcomm_initial_credits'], + max_credits=ctx.obj['rfcomm_max_credits'], + credits_threshold=ctx.obj['rfcomm_credits_threshold'], ) raise ValueError('invalid mode') @@ -1427,9 +1470,19 @@ def create_role(packet_io): help='RFComm maximum frame size', ) @click.option( - '--rfcomm-window-size', + '--rfcomm-initial-credits', + type=int, + help='RFComm initial credits', +) +@click.option( + '--rfcomm-max-credits', + type=int, + help='RFComm max credits', +) +@click.option( + '--rfcomm-credits-threshold', type=int, - help='RFComm window size', + help='RFComm credits threshold', ) @click.option( '--l2cap-psm', @@ -1530,7 +1583,9 @@ def bench( rfcomm_uuid, rfcomm_l2cap_mtu, rfcomm_max_frame_size, - rfcomm_window_size, + rfcomm_initial_credits, + rfcomm_max_credits, + rfcomm_credits_threshold, l2cap_psm, l2cap_mtu, l2cap_mps, @@ -1545,7 +1600,9 @@ def bench( ctx.obj['rfcomm_uuid'] = rfcomm_uuid ctx.obj['rfcomm_l2cap_mtu'] = rfcomm_l2cap_mtu ctx.obj['rfcomm_max_frame_size'] = rfcomm_max_frame_size - ctx.obj['rfcomm_window_size'] = rfcomm_window_size + ctx.obj['rfcomm_initial_credits'] = rfcomm_initial_credits + ctx.obj['rfcomm_max_credits'] = rfcomm_max_credits + ctx.obj['rfcomm_credits_threshold'] = rfcomm_credits_threshold ctx.obj['l2cap_psm'] = l2cap_psm ctx.obj['l2cap_mtu'] = l2cap_mtu ctx.obj['l2cap_mps'] = l2cap_mps diff --git a/apps/rfcomm_bridge.py b/apps/rfcomm_bridge.py index f7a08724..728e7cfc 100644 --- a/apps/rfcomm_bridge.py +++ b/apps/rfcomm_bridge.py @@ -24,7 +24,7 @@ import click from bumble.colors import color -from bumble.device import Device, Connection +from bumble.device import Device, DeviceConfiguration, Connection from bumble import core from bumble import hci from bumble import rfcomm @@ -37,7 +37,8 @@ # ----------------------------------------------------------------------------- DEFAULT_RFCOMM_UUID = "E6D55659-C8B4-4B85-96BB-B1143AF6D3AE" DEFAULT_MTU = 4096 -DEFAULT_TCP_PORT = 9544 +DEFAULT_CLIENT_TCP_PORT = 9544 +DEFAULT_SERVER_TCP_PORT = 9545 TRACE_MAX_SIZE = 48 @@ -149,7 +150,7 @@ def on_connection(self, connection): def on_disconnection(self, reason: int): print( - color("@@@ Bluetooth disconnection:", "blue"), + color("@@@ Bluetooth disconnection:", "red"), hci.HCI_Constant.error_name(reason), ) @@ -271,6 +272,7 @@ async def connect(self) -> None: self.address, transport=core.BT_BR_EDR_TRANSPORT ) print(color(f"@@@ Bluetooth connection: {self.connection}", "blue")) + self.connection.on("disconnection", self.on_disconnection) if self.encrypt: print(color("@@@ Encrypting Bluetooth connection", "blue")) @@ -278,19 +280,16 @@ async def connect(self) -> None: print(color("@@@ Bluetooth connection encrypted", "blue")) self.rfcomm_client = rfcomm.Client(self.connection) - self.rfcomm_mux = await self.rfcomm_client.start() - - def on_disconnection(reason): - print( - color("@@@ Bluetooth disconnection:", "red"), - hci.HCI_Constant.error_name(reason), - ) - self.connection = None - - self.connection.on("disconnection", on_disconnection) + try: + self.rfcomm_mux = await self.rfcomm_client.start() + except BaseException as e: + print(color("!!! Failed to setup RFCOMM connection", "red"), e) + raise async def start(self, device: Device) -> None: self.device = device + await device.set_connectable(False) + await device.set_discoverable(False) # Called when a TCP connection is established async def on_tcp_connection(reader, writer): @@ -303,9 +302,15 @@ async def on_tcp_connection(reader, writer): return self.tcp_connected = True - await self.pipe(reader, writer) - writer.close() - await writer.wait_closed() + try: + await self.pipe(reader, writer) + except BaseException as error: + print(color("!!! Exception while piping data:", "red"), error) + return + finally: + writer.close() + await writer.wait_closed() + self.tcp_connected = False await asyncio.start_server( on_tcp_connection, @@ -339,12 +344,12 @@ async def pipe( # Connect a new RFCOMM channel await self.connect() assert self.rfcomm_mux - print(color(f'*** Opening RFCOMM channel {channel}', 'green')) + print(color(f"*** Opening RFCOMM channel {channel}", "green")) try: rfcomm_channel = await self.rfcomm_mux.open_dlc(channel) - print(color(f'*** RFCOMM channel open: {rfcomm_channel}', "green")) + print(color(f"*** RFCOMM channel open: {rfcomm_channel}", "green")) except Exception as error: - print(color(f'!!! RFCOMM open failed: {error}', 'red')) + print(color(f"!!! RFCOMM open failed: {error}", "red")) return # Pipe data from RFCOMM to TCP @@ -383,6 +388,13 @@ def write_rfcomm_data(data): print(color("~~~ Bye bye", "magenta")) + def on_disconnection(self, reason: int) -> None: + print( + color("@@@ Bluetooth disconnection:", "red"), + hci.HCI_Constant.error_name(reason), + ) + self.connection = None + # ----------------------------------------------------------------------------- async def run(device_config, hci_transport, bridge): @@ -393,7 +405,14 @@ async def run(device_config, hci_transport, bridge): ): print("<<< connected") - device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) + if device_config: + device = Device.from_config_file_with_hci( + device_config, hci_source, hci_sink + ) + else: + device = Device.from_config_with_hci( + DeviceConfiguration(), hci_source, hci_sink + ) device.classic_enabled = True # Let's go @@ -412,11 +431,28 @@ async def run(device_config, hci_transport, bridge): # ----------------------------------------------------------------------------- @click.group() @click.pass_context -@click.option("--device-config", help="Device configuration file", required=True) -@click.option("--hci-transport", help="HCI transport", required=True) +@click.option( + "--device-config", + metavar="CONFIG_FILE", + help="Device configuration file", +) +@click.option( + "--hci-transport", metavar="TRANSPORT_NAME", help="HCI transport", required=True +) @click.option("--trace", is_flag=True, help="Trace bridged data to stdout") -@click.option("--channel", help="RFCOMM channel number", type=int, default=0) -@click.option("--uuid", help="UUID for the RFCOMM channel", default=DEFAULT_RFCOMM_UUID) +@click.option( + "--channel", + metavar="CHANNEL_NUMER", + help="RFCOMM channel number", + type=int, + default=0, +) +@click.option( + "--uuid", + metavar="UUID", + help="UUID for the RFCOMM channel", + default=DEFAULT_RFCOMM_UUID, +) def cli( context, device_config, @@ -437,7 +473,7 @@ def cli( @cli.command() @click.pass_context @click.option("--tcp-host", help="TCP host", default="localhost") -@click.option("--tcp-port", help="TCP port", default=DEFAULT_TCP_PORT) +@click.option("--tcp-port", help="TCP port", default=DEFAULT_SERVER_TCP_PORT) def server(context, tcp_host, tcp_port): bridge = ServerBridge( context.obj["channel"], @@ -454,7 +490,7 @@ def server(context, tcp_host, tcp_port): @click.pass_context @click.argument("bluetooth-address") @click.option("--tcp-host", help="TCP host", default="_") -@click.option("--tcp-port", help="TCP port", default=DEFAULT_TCP_PORT) +@click.option("--tcp-port", help="TCP port", default=DEFAULT_CLIENT_TCP_PORT) @click.option("--encrypt", is_flag=True, help="Encrypt the connection") def client(context, bluetooth_address, tcp_host, tcp_port, encrypt): bridge = ClientBridge( diff --git a/bumble/l2cap.py b/bumble/l2cap.py index 3eef0e05..b4f01215 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -833,7 +833,9 @@ async def connect(self) -> None: # Wait for the connection to succeed or fail try: - return await self.connection_result + return await self.connection.abort_on( + 'disconnection', self.connection_result + ) finally: self.connection_result = None @@ -2226,7 +2228,7 @@ async def create_classic_channel( # Connect try: await channel.connect() - except Exception as e: + except BaseException as e: del connection_channels[source_cid] raise e diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py index c95a4c78..2d8a627a 100644 --- a/bumble/rfcomm.py +++ b/bumble/rfcomm.py @@ -106,9 +106,11 @@ class MccType(enum.IntEnum): 0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF ]) -RFCOMM_DEFAULT_L2CAP_MTU = 2048 -RFCOMM_DEFAULT_WINDOW_SIZE = 7 -RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000 +RFCOMM_DEFAULT_L2CAP_MTU = 2048 +RFCOMM_DEFAULT_INITIAL_CREDITS = 7 +RFCOMM_DEFAULT_MAX_CREDITS = 32 +RFCOMM_DEFAULT_CREDIT_THRESHOLD = RFCOMM_DEFAULT_MAX_CREDITS // 2 +RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000 RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1 RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30 @@ -365,12 +367,12 @@ class RFCOMM_MCC_PN: ack_timer: int max_frame_size: int max_retransmissions: int - window_size: int + initial_credits: int def __post_init__(self) -> None: - if self.window_size < 1 or self.window_size > 7: + if self.initial_credits < 1 or self.initial_credits > 7: logger.warning( - f'Error Recovery Window size {self.window_size} is out of range [1, 7].' + f'Initial credits {self.initial_credits} is out of range [1, 7].' ) @staticmethod @@ -382,7 +384,7 @@ def from_bytes(data: bytes) -> RFCOMM_MCC_PN: ack_timer=data[3], max_frame_size=data[4] | data[5] << 8, max_retransmissions=data[6], - window_size=data[7] & 0x07, + initial_credits=data[7] & 0x07, ) def __bytes__(self) -> bytes: @@ -396,7 +398,7 @@ def __bytes__(self) -> bytes: (self.max_frame_size >> 8) & 0xFF, self.max_retransmissions & 0xFF, # Only 3 bits are meaningful. - self.window_size & 0x07, + self.initial_credits & 0x07, ] ) @@ -450,17 +452,21 @@ def __init__( self, multiplexer: Multiplexer, dlci: int, - max_frame_size: int, - window_size: int, + tx_max_frame_size: int, + tx_initial_credits: int, + rx_max_frame_size: int, + rx_initial_credits: int, ) -> None: super().__init__() self.multiplexer = multiplexer self.dlci = dlci - self.max_frame_size = max_frame_size - self.window_size = window_size - self.rx_credits = window_size - self.rx_threshold = window_size // 2 - self.tx_credits = window_size + self.rx_max_frame_size = rx_max_frame_size + self.rx_initial_credits = rx_initial_credits + self.rx_max_credits = RFCOMM_DEFAULT_MAX_CREDITS + self.rx_credits = rx_initial_credits + self.rx_credits_threshold = RFCOMM_DEFAULT_CREDIT_THRESHOLD + self.tx_max_frame_size = tx_max_frame_size + self.tx_credits = tx_initial_credits self.tx_buffer = b'' self.state = DLC.State.INIT self.role = multiplexer.role @@ -478,7 +484,7 @@ def __init__( # Compute the MTU max_overhead = 4 + 1 # header with 2-byte length + fcs self.mtu = min( - max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead + tx_max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead ) @property @@ -645,9 +651,9 @@ def accept(self) -> None: cl=0xE0, priority=7, ack_timer=0, - max_frame_size=self.max_frame_size, + max_frame_size=self.rx_max_frame_size, max_retransmissions=0, - window_size=self.window_size, + initial_credits=self.rx_initial_credits, ) mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=0, data=bytes(pn)) logger.debug(f'>>> PN Response: {pn}') @@ -655,8 +661,8 @@ def accept(self) -> None: self.change_state(DLC.State.CONNECTING) def rx_credits_needed(self) -> int: - if self.rx_credits <= self.rx_threshold: - return self.window_size - self.rx_credits + if self.rx_credits <= self.rx_credits_threshold: + return self.rx_max_credits - self.rx_credits return 0 @@ -749,7 +755,7 @@ class State(enum.IntEnum): connection_result: Optional[asyncio.Future] disconnection_result: Optional[asyncio.Future] open_result: Optional[asyncio.Future] - acceptor: Optional[Callable[[int], bool]] + acceptor: Optional[Callable[[int], Optional[Tuple[int, int]]]] dlcs: Dict[int, DLC] def __init__(self, l2cap_channel: l2cap.ClassicChannel, role: Role) -> None: @@ -761,6 +767,8 @@ def __init__(self, l2cap_channel: l2cap.ClassicChannel, role: Role) -> None: self.connection_result = None self.disconnection_result = None self.open_result = None + self.open_pn: Optional[RFCOMM_MCC_PN] = None + self.open_rx_max_credits = 0 self.acceptor = None # Become a sink for the L2CAP channel @@ -869,9 +877,16 @@ def on_mcc_pn(self, c_r: bool, pn: RFCOMM_MCC_PN) -> None: else: if self.acceptor: channel_number = pn.dlci >> 1 - if self.acceptor(channel_number): + if dlc_params := self.acceptor(channel_number): # Create a new DLC - dlc = DLC(self, pn.dlci, pn.max_frame_size, pn.window_size) + dlc = DLC( + self, + dlci=pn.dlci, + tx_max_frame_size=pn.max_frame_size, + tx_initial_credits=pn.initial_credits, + rx_max_frame_size=dlc_params[0], + rx_initial_credits=dlc_params[1], + ) self.dlcs[pn.dlci] = dlc # Re-emit the handshake completion event @@ -889,8 +904,17 @@ def on_mcc_pn(self, c_r: bool, pn: RFCOMM_MCC_PN) -> None: # Response logger.debug(f'>>> PN Response: {pn}') if self.state == Multiplexer.State.OPENING: - dlc = DLC(self, pn.dlci, pn.max_frame_size, pn.window_size) + assert self.open_pn + dlc = DLC( + self, + dlci=pn.dlci, + tx_max_frame_size=pn.max_frame_size, + tx_initial_credits=pn.initial_credits, + rx_max_frame_size=self.open_pn.max_frame_size, + rx_initial_credits=self.open_pn.initial_credits, + ) self.dlcs[pn.dlci] = dlc + self.open_pn = None dlc.connect() else: logger.warning('ignoring PN response') @@ -928,7 +952,7 @@ async def open_dlc( self, channel: int, max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE, - window_size: int = RFCOMM_DEFAULT_WINDOW_SIZE, + initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS, ) -> DLC: if self.state != Multiplexer.State.CONNECTED: if self.state == Multiplexer.State.OPENING: @@ -936,17 +960,19 @@ async def open_dlc( raise InvalidStateError('not connected') - pn = RFCOMM_MCC_PN( + self.open_pn = RFCOMM_MCC_PN( dlci=channel << 1, cl=0xF0, priority=7, ack_timer=0, max_frame_size=max_frame_size, max_retransmissions=0, - window_size=window_size, + initial_credits=initial_credits, ) - mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=1, data=bytes(pn)) - logger.debug(f'>>> Sending MCC: {pn}') + mcc = RFCOMM_Frame.make_mcc( + mcc_type=MccType.PN, c_r=1, data=bytes(self.open_pn) + ) + logger.debug(f'>>> Sending MCC: {self.open_pn}') self.open_result = asyncio.get_running_loop().create_future() self.change_state(Multiplexer.State.OPENING) self.send_frame( @@ -1039,15 +1065,13 @@ async def __aexit__(self, *args) -> None: # ----------------------------------------------------------------------------- class Server(EventEmitter): - acceptors: Dict[int, Callable[[DLC], None]] - def __init__( self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU ) -> None: super().__init__() self.device = device - self.multiplexer = None - self.acceptors = {} + self.acceptors: Dict[int, Callable[[DLC], None]] = {} + self.dlc_configs: Dict[int, Tuple[int, int]] = {} # Register ourselves with the L2CAP channel manager self.l2cap_server = device.create_l2cap_server( @@ -1055,7 +1079,13 @@ def __init__( handler=self.on_connection, ) - def listen(self, acceptor: Callable[[DLC], None], channel: int = 0) -> int: + def listen( + self, + acceptor: Callable[[DLC], None], + channel: int = 0, + max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE, + initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS, + ) -> int: if channel: if channel in self.acceptors: # Busy @@ -1075,6 +1105,8 @@ def listen(self, acceptor: Callable[[DLC], None], channel: int = 0) -> int: return 0 self.acceptors[channel] = acceptor + self.dlc_configs[channel] = (max_frame_size, initial_credits) + return channel def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: @@ -1092,15 +1124,14 @@ def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None: # Notify self.emit('start', multiplexer) - def accept_dlc(self, channel_number: int) -> bool: - return channel_number in self.acceptors + def accept_dlc(self, channel_number: int) -> Optional[Tuple[int, int]]: + return self.dlc_configs.get(channel_number) def on_dlc(self, dlc: DLC) -> None: logger.debug(f'@@@ new DLC connected: {dlc}') # Let the acceptor know - acceptor = self.acceptors.get(dlc.dlci >> 1) - if acceptor: + if acceptor := self.acceptors.get(dlc.dlci >> 1): acceptor(dlc) def __enter__(self) -> Self: From e1d10bc482cebcfec18661c598d8222807de6f81 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Wed, 5 Jun 2024 10:03:27 -0700 Subject: [PATCH 3/3] add rfcomm disconnect test --- tests/rfcomm_test.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/rfcomm_test.py b/tests/rfcomm_test.py index fcd43108..5146c8b4 100644 --- a/tests/rfcomm_test.py +++ b/tests/rfcomm_test.py @@ -62,7 +62,7 @@ def test_frames(): # ----------------------------------------------------------------------------- @pytest.mark.asyncio -async def test_basic_connection() -> None: +async def test_connection_and_disconnection() -> None: devices = test_utils.TwoDevices() await devices.setup_connection() @@ -83,6 +83,11 @@ async def test_basic_connection() -> None: dlcs[1].write(b'Lorem ipsum dolor sit amet') assert await queues[0].get() == b'Lorem ipsum dolor sit amet' + closed = asyncio.Event() + dlcs[1].on('close', closed.set) + await dlcs[1].disconnect() + await closed.wait() + # ----------------------------------------------------------------------------- @pytest.mark.asyncio