-
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
16 changed files
with
2,414 additions
and
61 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +0,0 @@ | ||
__version__ = "0.0.0" | ||
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
"""Bluetooth cache for esphome.""" | ||
from __future__ import annotations | ||
|
||
from collections.abc import MutableMapping | ||
from dataclasses import dataclass, field | ||
|
||
from bleak.backends.service import BleakGATTServiceCollection | ||
from lru import LRU # pylint: disable=no-name-in-module | ||
|
||
MAX_CACHED_SERVICES = 128 | ||
|
||
|
||
@dataclass(slots=True) | ||
class ESPHomeBluetoothCache: | ||
"""Shared cache between all ESPHome bluetooth devices.""" | ||
|
||
_gatt_services_cache: MutableMapping[int, BleakGATTServiceCollection] = field( | ||
default_factory=lambda: LRU(MAX_CACHED_SERVICES) | ||
) | ||
_gatt_mtu_cache: MutableMapping[int, int] = field( | ||
default_factory=lambda: LRU(MAX_CACHED_SERVICES) | ||
) | ||
|
||
def get_gatt_services_cache( | ||
self, address: int | ||
) -> BleakGATTServiceCollection | None: | ||
"""Get the BleakGATTServiceCollection for the given address.""" | ||
return self._gatt_services_cache.get(address) | ||
|
||
def set_gatt_services_cache( | ||
self, address: int, services: BleakGATTServiceCollection | ||
) -> None: | ||
"""Set the BleakGATTServiceCollection for the given address.""" | ||
self._gatt_services_cache[address] = services | ||
|
||
def clear_gatt_services_cache(self, address: int) -> None: | ||
"""Clear the BleakGATTServiceCollection for the given address.""" | ||
self._gatt_services_cache.pop(address, None) | ||
|
||
def get_gatt_mtu_cache(self, address: int) -> int | None: | ||
"""Get the mtu cache for the given address.""" | ||
return self._gatt_mtu_cache.get(address) | ||
|
||
def set_gatt_mtu_cache(self, address: int, mtu: int) -> None: | ||
"""Set the mtu cache for the given address.""" | ||
self._gatt_mtu_cache[address] = mtu | ||
|
||
def clear_gatt_mtu_cache(self, address: int) -> None: | ||
"""Clear the mtu cache for the given address.""" | ||
self._gatt_mtu_cache.pop(address, None) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
"""BleakGATTCharacteristicESPHome.""" | ||
from __future__ import annotations | ||
|
||
import contextlib | ||
from uuid import UUID | ||
|
||
from aioesphomeapi.model import BluetoothGATTCharacteristic | ||
from bleak.backends.characteristic import BleakGATTCharacteristic | ||
from bleak.backends.descriptor import BleakGATTDescriptor | ||
|
||
PROPERTY_MASKS = { | ||
2**n: prop | ||
for n, prop in enumerate( | ||
( | ||
"broadcast", | ||
"read", | ||
"write-without-response", | ||
"write", | ||
"notify", | ||
"indicate", | ||
"authenticated-signed-writes", | ||
"extended-properties", | ||
"reliable-writes", | ||
"writable-auxiliaries", | ||
) | ||
) | ||
} | ||
|
||
|
||
class BleakGATTCharacteristicESPHome(BleakGATTCharacteristic): | ||
"""GATT Characteristic implementation for the ESPHome backend.""" | ||
|
||
obj: BluetoothGATTCharacteristic | ||
|
||
def __init__( | ||
self, | ||
obj: BluetoothGATTCharacteristic, | ||
max_write_without_response_size: int, | ||
service_uuid: str, | ||
service_handle: int, | ||
) -> None: | ||
"""Init a BleakGATTCharacteristicESPHome.""" | ||
super().__init__(obj, max_write_without_response_size) | ||
self.__descriptors: list[BleakGATTDescriptor] = [] | ||
self.__service_uuid: str = service_uuid | ||
self.__service_handle: int = service_handle | ||
char_props = self.obj.properties | ||
self.__props: list[str] = [ | ||
prop for mask, prop in PROPERTY_MASKS.items() if char_props & mask | ||
] | ||
|
||
@property | ||
def service_uuid(self) -> str: | ||
"""Uuid of the Service containing this characteristic.""" | ||
return self.__service_uuid | ||
|
||
@property | ||
def service_handle(self) -> int: | ||
"""Integer handle of the Service containing this characteristic.""" | ||
return self.__service_handle | ||
|
||
@property | ||
def handle(self) -> int: | ||
"""Integer handle for this characteristic.""" | ||
return self.obj.handle | ||
|
||
@property | ||
def uuid(self) -> str: | ||
"""Uuid of this characteristic.""" | ||
return self.obj.uuid | ||
|
||
@property | ||
def properties(self) -> list[str]: | ||
"""Properties of this characteristic.""" | ||
return self.__props | ||
|
||
@property | ||
def descriptors(self) -> list[BleakGATTDescriptor]: | ||
"""List of descriptors for this service.""" | ||
return self.__descriptors | ||
|
||
def get_descriptor(self, specifier: int | str | UUID) -> BleakGATTDescriptor | None: | ||
"""Get a descriptor by handle (int) or UUID (str or uuid.UUID).""" | ||
with contextlib.suppress(StopIteration): | ||
if isinstance(specifier, int): | ||
return next(filter(lambda x: x.handle == specifier, self.descriptors)) | ||
return next(filter(lambda x: x.uuid == str(specifier), self.descriptors)) | ||
return None | ||
|
||
def add_descriptor(self, descriptor: BleakGATTDescriptor) -> None: | ||
""" | ||
Add a :py:class:`~BleakGATTDescriptor` to the characteristic. | ||
Should not be used by end user, but rather by `bleak` itself. | ||
""" | ||
self.__descriptors.append(descriptor) |
Oops, something went wrong.