Skip to content

Commit

Permalink
basic support for hello fairy
Browse files Browse the repository at this point in the history
  • Loading branch information
jr4 committed Dec 16, 2024
1 parent 51facab commit 73c4d72
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 27 deletions.
19 changes: 17 additions & 2 deletions examples/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

_LOGGER = logging.getLogger(__name__)

ADDRESS = "D0291B39-3A1B-7FF2-787B-4E743FED5B25"
ADDRESS = "D0291B39-3A1B-7FF2-787B-4E743FED5B25"
ADDRESS = "BE:27:E1:00:10:63" # Hello Fairy-1063PPPP


async def run() -> None:
Expand All @@ -34,20 +33,36 @@ def on_state_changed(state: LEDBLEState) -> None:
device = await future
led = LEDBLE(device)
cancel_callback = led.register_callback(on_state_changed)
_LOGGER.info("update...")
await led.update()
_LOGGER.info("turn_on...")
await led.turn_on()
_LOGGER.info("set_rgb(red)...")
await led.set_rgb((255, 0, 0), 255)
await asyncio.sleep(1)
_LOGGER.info("set_rgb(green)...")
await led.set_rgb((0, 255, 0), 128)
await asyncio.sleep(1)
_LOGGER.info("set_rgb(blue)...")
await led.set_rgb((0, 0, 255), 255)
await asyncio.sleep(1)
_LOGGER.info("set_rgbw(white)...")
await led.set_rgbw((255, 255, 255, 128), 255)
await asyncio.sleep(1)
_LOGGER.info("set_preset_pattern(1)...")
await led.async_set_preset_pattern(1, 100, 100)
await asyncio.sleep(2)
_LOGGER.info("set_preset_pattern(59)...")
await led.async_set_preset_pattern(59, 100, 100)
await asyncio.sleep(2)
_LOGGER.info("turn_off...")
await led.turn_off()
_LOGGER.info("update...")
await led.update()
_LOGGER.info("finish...")
cancel_callback()
await scanner.stop()
_LOGGER.info("done")


logging.basicConfig(level=logging.INFO)
Expand Down
9 changes: 7 additions & 2 deletions src/led_ble/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@ class CharacteristicMissingError(Exception):
"""Raised when a characteristic is missing."""


HELLO_FAIRY_WRITE_CHARACTERISTIC = "49535343-8841-43f4-a8d4-ecbe34729bb3"
HELLO_FAIRY_READ_CHARACTERISTIC = "49535343-1e4d-4bd9-ba61-23c647249616"

POSSIBLE_WRITE_CHARACTERISTIC_UUIDS = [
BASE_UUID_FORMAT.format(part) for part in ["ff01", "ffd5", "ffd9", "ffe5", "ffe9"]
]
] + [HELLO_FAIRY_WRITE_CHARACTERISTIC]

POSSIBLE_READ_CHARACTERISTIC_UUIDS = [
BASE_UUID_FORMAT.format(part) for part in ["ff02", "ffd0", "ffd4", "ffe0", "ffe4"]
]
] + [HELLO_FAIRY_READ_CHARACTERISTIC]


QUERY_STATE_BYTES = bytearray([0xEF, 0x01, 0x77])
99 changes: 76 additions & 23 deletions src/led_ble/led_ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
from flux_led.utils import rgbw_brightness

from led_ble.model_db import LEDBLEModel
from led_ble.protocol import ProtocolFairy

from .const import (
HELLO_FAIRY_READ_CHARACTERISTIC,
POSSIBLE_READ_CHARACTERISTIC_UUIDS,
POSSIBLE_WRITE_CHARACTERISTIC_UUIDS,
STATE_COMMAND,
Expand Down Expand Up @@ -279,10 +281,14 @@ def _generate_preset_pattern(
brightness = int(brightness * 255 / 100)
speed = int(speed * 255 / 100)
return bytearray([0x9E, 0x00, pattern, speed, brightness, 0x00, 0xE9])
PresetPattern.valid_or_raise(pattern)
if not self._is_hello_fairy():
PresetPattern.valid_or_raise(pattern)
if not (1 <= brightness <= 100):
raise ValueError("Brightness must be between 1 and 100")
assert self._protocol is not None # nosec
if self._is_hello_fairy() and pattern > 58:
rgb = [[255, 0, 0], [0, 255, 0], [0, 0, 255]] * 8 + [[255, 0, 0]]
return self._protocol.construct_custom_effect(rgb, speed, "")
return self._protocol.construct_preset_pattern(pattern, speed, brightness)

async def async_set_preset_pattern(
Expand Down Expand Up @@ -431,29 +437,64 @@ def _named_effect(self) -> str | None:
"""Returns the named effect."""
return EFFECT_ID_NAME.get(self.preset_pattern_num)

# ideally replace with classes to encapsulate the differences between device makes
def _is_hello_fairy(self) -> bool:
if self._read_char is None:
return False
d = self._read_char.descriptors
c = d[0].characteristic_uuid if (len(d) > 0) else None
return c == HELLO_FAIRY_READ_CHARACTERISTIC

def _notification_handler(self, _sender: int, data: bytearray) -> None:
"""Handle notification responses."""
_LOGGER.debug("%s: Notification received: %s", self.name, data.hex())

if len(data) == 4 and data[0] == 0xCC:
on = data[1] == 0x23
self._state = replace(self._state, power=on)
return
if len(data) < 11:
return
model_num = data[1]
on = data[2] == 0x23
preset_pattern = data[3]
mode = data[4]
speed = data[5]
r = data[6]
g = data[7]
b = data[8]
w = data[9]
version = data[10]
self._state = LEDBLEState(
on, (r, g, b), w, model_num, preset_pattern, mode, speed, version
)
model_num = 0
if self._is_hello_fairy():
if data[0] == 0xAA:
if data[1] == 0x00: # hw info
if len(data) > 7:
version_string = data[3:8].decode("ascii")
_LOGGER.debug("version %s", version_string)
self._state = replace(
self._state,
version_num=(data[3] - 48) * 100
+ (data[5] - 48) * 10
+ (data[7] - 48),
)
if len(data) > 12:
model = data[8:13].decode("ascii")
_LOGGER.debug("model %s", model)
if len(data) > 24:
lights = data[24] # guessing
_LOGGER.debug("lights %d", lights)
if len(data) > 33:
effects = data[33] # guessing
_LOGGER.debug("effects %d", effects)

if data[1] == 0x01: # state info
if len(data) > 6:
self._state = replace(self._state, power=data[6] > 0)
else:
if len(data) == 4 and data[0] == 0xCC:
on = data[1] == 0x23
self._state = replace(self._state, power=on)
return
if len(data) < 11:
return
model_num = data[1]
on = data[2] == 0x23
preset_pattern = data[3]
mode = data[4]
speed = data[5]
r = data[6]
g = data[7]
b = data[8]
w = data[9]
version = data[10]
self._state = LEDBLEState(
on, (r, g, b), w, model_num, preset_pattern, mode, speed, version
)

_LOGGER.debug(
"%s: Notification received; RSSI: %s: %s %s",
Expand All @@ -466,8 +507,10 @@ def _notification_handler(self, _sender: int, data: bytearray) -> None:
if not self._resolve_protocol_event.is_set():
self._resolve_protocol_event.set()
self._model_data = get_model(model_num)
self._set_protocol(self._model_data.protocol_for_version_num(version))

if self._is_hello_fairy():
self._protocol = ProtocolFairy()
else:
self._set_protocol(self._model_data.protocol_for_version_num(version))
self._fire_callbacks()

def _reset_disconnect_timer(self) -> None:
Expand Down Expand Up @@ -622,13 +665,23 @@ def _resolve_characteristics(self, services: BleakGATTServiceCollection) -> bool
if char := services.get_characteristic(characteristic):
self._write_char = char
break
_LOGGER.debug(
"using characteristic %s for read, characteristic %s for write",
self._read_char,
self._write_char,
)
return bool(self._read_char and self._write_char)

async def _resolve_protocol(self) -> None:
"""Resolve protocol."""
if self._resolve_protocol_event.is_set():
return
await self._send_command_while_connected([STATE_COMMAND])
if self._is_hello_fairy():
await self._send_command_while_connected(
[b"\xaa\x00\x00\xaa"]
) # get version and capabilities
else:
await self._send_command_while_connected([STATE_COMMAND])
async with asyncio_timeout(10):
await self._resolve_protocol_event.wait()

Expand Down
9 changes: 9 additions & 0 deletions src/led_ble/model_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ def protocol_for_version_num(self, version_num: int) -> str:


MODELS = [
LEDBLEModel(
model_num=0x00,
models=["Hello Fairy:BMSL6"],
description="Controller RGB",
protocols=[
MinVersionProtocol(0, "Fairy"),
],
color_modes=COLOR_MODES_RGB_W, # Formerly rgbwcapable
),
LEDBLEModel(
model_num=0x04,
models=["Triones:C10511000166"],
Expand Down
119 changes: 119 additions & 0 deletions src/led_ble/protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import colorsys
from math import floor
from flux_led.protocol import ProtocolBase
from led_ble.led_ble import LevelWriteMode


class ProtocolFairy(ProtocolBase):
"""Protocol for Hello Fairy devices."""

@property
def name(self) -> str:
"""The name of the protocol."""
return "Fairy"

def construct_state_query(self) -> bytearray:
"""The bytes to send for a query request."""
return self.construct_message(bytearray([0xAA, 0x01, 0x00]))

def construct_state_change(self, turn_on: int) -> bytearray:
"""The bytes to send for a state change request."""
return self.construct_message(
bytearray([0xAA, 0x02, 0x01, 1 if turn_on else 0])
)

def construct_message(self, raw_bytes: bytearray) -> bytearray:
"""Calculate checksum of byte array and add to end."""
csum = sum(raw_bytes) & 0xFF
raw_bytes.append(csum)
return raw_bytes

def construct_levels_change(
self,
persist: int,
red: int | None,
green: int | None,
blue: int | None,
warm_white: int | None,
cool_white: int | None,
write_mode: LevelWriteMode,
) -> list[bytearray]:
"""The bytes to send for a level change request."""
h, s, v = colorsys.rgb_to_hsv(
(red or 0) / 255, (green or 0) / 255, (blue or 0) / 255
)
h_scaled = min(359, floor(h * 360))
s_scaled = round(s * 1000)
v_scaled = round(v * 1000)
return [
self.construct_message(
bytearray(
[
0xAA,
0x03,
0x07,
0x01,
h_scaled >> 8,
h_scaled & 0xFF,
s_scaled >> 8,
s_scaled & 0xFF,
v_scaled >> 8,
v_scaled & 0xFF,
]
)
)
]

def construct_preset_pattern(
self, pattern: int, speed: int, brightness: int
) -> list[bytearray]:
"""The bytes to send for a preset pattern."""
return [
self.construct_message(
bytearray(
[
0xAA,
0x03,
0x04,
0x02,
pattern & 0xFF,
(brightness >> 8) & 0xFF,
brightness & 0xFF,
]
)
),
self.construct_message(bytearray([0xAA, 0x0C, 0x01, min(speed, 100)])),
]

def construct_custom_effect(
self, rgb_list: list[tuple[int, int, int]], speed: int, transition_type: str
) -> list[bytearray]:
"""The bytes to send for a custom effect."""
data_bytes = len(rgb_list) * 3 + 1
hue_message = bytearray(data_bytes + 3)
hue_message[0:4] = [0xAA, 0xDA, data_bytes, 0x01]
for [i, [r, g, b]] in enumerate(rgb_list):
h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255)
if v < 0.25:
h = 0xFE # black
elif s < 0.25:
h = 0xFF # white
else:
h = floor(h * 0xAF)
# necessary to satisfy both flake and ruff:
a = i * 3 + 4
b = a + 3
hue_message[a:b] = [i >> 8, i & 0xFF, h]
return [
*self.construct_motion(speed, 0),
self.construct_message(hue_message),
*self.construct_motion(speed, 2),
]

def construct_motion(self, speed: int, transition: int) -> list[bytearray]:
"""The bytes to send for motion speed and transition."""
return [
self.construct_message(
bytearray([0xAA, 0xD0, 0x04, transition, 0x64, speed, 0x01])
)
]

0 comments on commit 73c4d72

Please sign in to comment.