Skip to content

Commit

Permalink
A2 devices have yet another protocol (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Nov 25, 2021
1 parent 0271e66 commit 162de83
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 79 deletions.
4 changes: 2 additions & 2 deletions flux_led/aiodevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ async def async_set_brightness(self, brightness: int) -> None:
await self.async_set_white_temp(self.color_temp, brightness)
return
if self.color_mode == COLOR_MODE_RGB:
await self.async_set_levels(*self.rgb_unscaled, brightness)
await self.async_set_levels(*self.rgb_unscaled, brightness=brightness)
return
if self.color_mode == COLOR_MODE_RGBW:
await self.async_set_levels(*rgbw_brightness(self.rgbw, brightness))
Expand Down Expand Up @@ -275,7 +275,7 @@ def _async_process_message(self, msg):
return
assert self._updated_callback is not None
prev_state = self.raw_state
if self.addressable and self._protocol.is_valid_addressable_response(msg):
if self._protocol.is_valid_addressable_response(msg):
self.process_addressable_response(msg)
if self._protocol.is_valid_state_response(msg):
self.process_state_response(msg)
Expand Down
88 changes: 47 additions & 41 deletions flux_led/base_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@
LevelWriteMode,
)
from .models_db import (
ADDRESSABLE_MODELS,
BASE_MODE_MAP,
CHANNEL_REMAP,
MICROPHONE_MODELS,
MODEL_DESCRIPTIONS,
MODEL_MAP,
MODEL_NUM_PROTOCOL,
ORIGINAL_ADDRESSABLE_MODELS,
RGBW_PROTOCOL_MODELS,
UNKNOWN_MODEL,
USE_9BYTE_PROTOCOL_MODELS,
Expand All @@ -69,18 +67,20 @@
PROTOCOL_LEDENET_8BYTE_DIMMABLE_EFFECTS,
PROTOCOL_LEDENET_9BYTE,
PROTOCOL_LEDENET_9BYTE_DIMMABLE_EFFECTS,
PROTOCOL_LEDENET_ADDRESSABLE,
PROTOCOL_LEDENET_ADDRESSABLE_A1,
PROTOCOL_LEDENET_ADDRESSABLE_A2,
PROTOCOL_LEDENET_ADDRESSABLE_A3,
PROTOCOL_LEDENET_ORIGINAL,
PROTOCOL_LEDENET_ORIGINAL_ADDRESSABLE,
LEDENETOriginalRawState,
LEDENETRawState,
ProtocolLEDENET8Byte,
ProtocolLEDENET8ByteDimmableEffects,
ProtocolLEDENET9Byte,
ProtocolLEDENET9ByteDimmableEffects,
ProtocolLEDENETAddressable,
ProtocolLEDENETAddressableA1,
ProtocolLEDENETAddressableA2,
ProtocolLEDENETAddressableA3,
ProtocolLEDENETOriginal,
ProtocolLEDENETOriginalAddressable,
)
from .timer import BuiltInTimer
from .utils import utils, white_levels_to_color_temp
Expand All @@ -92,11 +92,24 @@
ProtocolLEDENET8ByteDimmableEffects,
ProtocolLEDENET9Byte,
ProtocolLEDENET9ByteDimmableEffects,
ProtocolLEDENETAddressable,
ProtocolLEDENETAddressableA1,
ProtocolLEDENETAddressableA2,
ProtocolLEDENETAddressableA3,
ProtocolLEDENETOriginal,
ProtocolLEDENETOriginalAddressable,
]

ADDRESSABLE_PROTOCOLS = {
PROTOCOL_LEDENET_ADDRESSABLE_A1,
PROTOCOL_LEDENET_ADDRESSABLE_A2,
PROTOCOL_LEDENET_ADDRESSABLE_A3,
}

OLD_EFFECTS_PROTOCOLS = {PROTOCOL_LEDENET_ADDRESSABLE_A1}
NEW_EFFECTS_PROTOCOLS = {
PROTOCOL_LEDENET_ADDRESSABLE_A2,
PROTOCOL_LEDENET_ADDRESSABLE_A3,
}


class DeviceType(Enum):
Bulb = 0
Expand Down Expand Up @@ -124,6 +137,11 @@ def model_num(self) -> int:
assert self.raw_state is not None
return self.raw_state.model_num if self.raw_state else None

@property
def speed_adjust_off(self) -> int:
"""Return if the speed of an effect can be adjusted while off."""
return self.protocol != PROTOCOL_LEDENET_ADDRESSABLE_A1

@property
def model(self) -> str:
"""Return the human readable model description."""
Expand Down Expand Up @@ -152,29 +170,11 @@ def rgbwprotocol(self) -> bool:
"""Devices that don't require a separate rgb/w bit."""
return self.model_num in RGBW_PROTOCOL_MODELS

@property
def addressable(self) -> bool:
"""Devices that have addressable leds."""
return self._is_addressable(self.model_num)

def _is_addressable(self, model_num: int) -> bool:
"""Devices that have addressable leds."""
return model_num in ADDRESSABLE_MODELS

@property
def microphone(self) -> bool:
"""Devices that have a microphone built in."""
return self.model_num in MICROPHONE_MODELS

@property
def original_addressable(self) -> bool:
"""Devices that have addressable leds using the original addressable protocol."""
return self._is_original_addressable(self.model_num)

def _is_original_addressable(self, model_num) -> bool:
"""Devices that have addressable leds using the original addressable protocol."""
return model_num in ORIGINAL_ADDRESSABLE_MODELS

@property
def rgbwcapable(self) -> bool:
"""Devices that actually support rgbw."""
Expand Down Expand Up @@ -306,9 +306,10 @@ def warm_white(self) -> int:
def effect_list(self) -> List[str]:
"""Return the list of available effects."""
effects: Iterable[str] = []
if self.original_addressable:
protocol = self.protocol
if protocol in OLD_EFFECTS_PROTOCOLS:
effects = ORIGINAL_ADDRESSABLE_EFFECT_ID_NAME.values()
elif self.addressable:
elif protocol in NEW_EFFECTS_PROTOCOLS:
effects = ADDRESSABLE_EFFECT_ID_NAME.values()
elif COLOR_MODES_RGB.intersection(self.color_modes):
effects = EFFECT_LIST
Expand All @@ -324,10 +325,11 @@ def effect(self) -> Optional[str]:
if pattern_code == EFFECT_CUSTOM_CODE:
return EFFECT_CUSTOM
mode = self.raw_state.mode
if self.original_addressable:
protocol = self.protocol
if protocol in OLD_EFFECTS_PROTOCOLS:
effect_id = (pattern_code << 8) + mode - 99
return ORIGINAL_ADDRESSABLE_EFFECT_ID_NAME.get(effect_id)
if self.addressable:
if protocol in NEW_EFFECTS_PROTOCOLS:
if pattern_code == 0x25:
return ADDRESSABLE_EFFECT_ID_NAME.get(mode)
if pattern_code == 0x24:
Expand Down Expand Up @@ -397,7 +399,7 @@ def _determineMode(self) -> Optional[str]:
return MODE_PRESET
elif BuiltInTimer.valid(pattern_code):
return BuiltInTimer.valtostr(pattern_code)
elif self.addressable or self.original_addressable:
elif self.protocol in ADDRESSABLE_PROTOCOLS:
return MODE_PRESET
return None

Expand Down Expand Up @@ -652,7 +654,7 @@ def getCCT(self) -> Tuple[int, int]:
@property
def speed(self) -> int:
assert self.raw_state is not None
if self.addressable or self.original_addressable:
if self.protocol in ADDRESSABLE_PROTOCOLS:
return self.raw_state.speed
return utils.delayToSpeed(self.raw_state.speed)

Expand Down Expand Up @@ -766,7 +768,7 @@ def _set_transition_complete_time(self) -> None:
"""
assert self.raw_state is not None
latency = STATE_CHANGE_LATENCY
if self.addressable or self.original_addressable:
if self.protocol in ADDRESSABLE_PROTOCOLS:
latency = ADDRESSABLE_STATE_CHANGE_LATENCY
transition_time = latency + utils.speedToDelay(self.raw_state.speed) / 100
self._transition_complete_time = time.monotonic() + transition_time
Expand Down Expand Up @@ -814,10 +816,12 @@ def setProtocol(self, protocol: str) -> None:
self._protocol = ProtocolLEDENET9Byte()
elif protocol == PROTOCOL_LEDENET_9BYTE_DIMMABLE_EFFECTS:
self._protocol = ProtocolLEDENET9ByteDimmableEffects()
elif protocol == PROTOCOL_LEDENET_ADDRESSABLE:
self._protocol = ProtocolLEDENETAddressable()
elif protocol == PROTOCOL_LEDENET_ORIGINAL_ADDRESSABLE:
self._protocol = ProtocolLEDENETOriginalAddressable()
elif protocol == PROTOCOL_LEDENET_ADDRESSABLE_A3:
self._protocol = ProtocolLEDENETAddressableA3()
elif protocol == PROTOCOL_LEDENET_ADDRESSABLE_A2:
self._protocol = ProtocolLEDENETAddressableA2()
elif protocol == PROTOCOL_LEDENET_ADDRESSABLE_A1:
self._protocol = ProtocolLEDENETAddressableA1()
else:
raise ValueError(f"Invalid protocol: {protocol}")

Expand All @@ -833,10 +837,11 @@ def _generate_preset_pattern(
self, pattern: int, speed: int, brightness: int
) -> bytearray:
"""Generate the preset pattern protocol bytes."""
if self.original_addressable:
protocol = self.protocol
if protocol in OLD_EFFECTS_PROTOCOLS:
if pattern not in ORIGINAL_ADDRESSABLE_EFFECT_ID_NAME:
raise ValueError("Pattern must be between 1 and 300")
elif self.addressable:
elif protocol in NEW_EFFECTS_PROTOCOLS:
if pattern not in ADDRESSABLE_EFFECT_ID_NAME:
raise ValueError("Pattern must be between 1 and 100")
else:
Expand Down Expand Up @@ -868,8 +873,9 @@ def _generate_custom_patterm(

def _effect_to_pattern(self, effect: str) -> int:
"""Convert an effect to a pattern code."""
if self.addressable:
protocol = self.protocol
if protocol in NEW_EFFECTS_PROTOCOLS:
return ADDRESSABLE_EFFECT_NAME_ID[effect]
if self.original_addressable:
if protocol in OLD_EFFECTS_PROTOCOLS:
return ORIGINAL_ADDRESSABLE_EFFECT_NAME_ID[effect]
return PresetPattern.str_to_val(effect)
21 changes: 6 additions & 15 deletions flux_led/models_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
PROTOCOL_LEDENET_8BYTE_DIMMABLE_EFFECTS,
PROTOCOL_LEDENET_9BYTE,
PROTOCOL_LEDENET_9BYTE_DIMMABLE_EFFECTS,
PROTOCOL_LEDENET_ADDRESSABLE,
PROTOCOL_LEDENET_ADDRESSABLE_A1,
PROTOCOL_LEDENET_ADDRESSABLE_A2,
PROTOCOL_LEDENET_ADDRESSABLE_A3,
PROTOCOL_LEDENET_ORIGINAL,
PROTOCOL_LEDENET_ORIGINAL_ADDRESSABLE,
)

LEDENETModel = namedtuple(
Expand Down Expand Up @@ -484,7 +485,7 @@
models=[],
description="RGB Symphony Original", # https://github.com/Danielhiversen/flux_led/pull/59/files#diff-09e60df19b61f5ceb8d2aef0ade582c2b84979cab660baa27c1ec65725144d76R776
always_writes_white_and_colors=False,
protocol=PROTOCOL_LEDENET_ORIGINAL_ADDRESSABLE,
protocol=PROTOCOL_LEDENET_ADDRESSABLE_A1,
mode_to_color_mode={},
color_modes=COLOR_MODES_ADDRESSABLE,
channel_map={},
Expand All @@ -495,7 +496,7 @@
models=["AK001-ZJ2104"],
description="RGB Symphony 2",
always_writes_white_and_colors=False,
protocol=PROTOCOL_LEDENET_ADDRESSABLE,
protocol=PROTOCOL_LEDENET_ADDRESSABLE_A2,
mode_to_color_mode={},
color_modes=COLOR_MODES_ADDRESSABLE,
channel_map={},
Expand All @@ -506,7 +507,7 @@
models=["K001-ZJ2148"],
description="RGB Symphony 3",
always_writes_white_and_colors=False,
protocol=PROTOCOL_LEDENET_ADDRESSABLE,
protocol=PROTOCOL_LEDENET_ADDRESSABLE_A3,
mode_to_color_mode={},
color_modes=COLOR_MODES_ADDRESSABLE,
channel_map={},
Expand Down Expand Up @@ -550,16 +551,6 @@
MODEL_MAP = {model.model_num: model for model in MODELS}
MODEL_DESCRIPTIONS = {model.model_num: model.description for model in MODELS}
CHANNEL_REMAP = {model.model_num: model.channel_map for model in MODELS}
ORIGINAL_ADDRESSABLE_MODELS = {
model.model_num
for model in MODELS
if model.protocol == PROTOCOL_LEDENET_ORIGINAL_ADDRESSABLE
}
ADDRESSABLE_MODELS = {
model.model_num
for model in MODELS
if model.protocol == PROTOCOL_LEDENET_ADDRESSABLE
}
RGBW_PROTOCOL_MODELS = {
model.model_num for model in MODELS if model.always_writes_white_and_colors
}
Expand Down
72 changes: 62 additions & 10 deletions flux_led/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
PROTOCOL_LEDENET_9BYTE_DIMMABLE_EFFECTS = "LEDENET_DIMMABLE_EFFECTS"
PROTOCOL_LEDENET_8BYTE = "LEDENET_8BYTE" # Previously was called None
PROTOCOL_LEDENET_8BYTE_DIMMABLE_EFFECTS = "LEDENET_8BYTE_DIMMABLE_EFFECTS"
PROTOCOL_LEDENET_ADDRESSABLE = "LEDENET_ADDRESSABLE"
PROTOCOL_LEDENET_ORIGINAL_ADDRESSABLE = "LEDENET_ORIGINAL_ADDRESSABLE"
PROTOCOL_LEDENET_ADDRESSABLE_A1 = "LEDENET_ADDRESSABLE_A1"
PROTOCOL_LEDENET_ADDRESSABLE_A2 = "LEDENET_ADDRESSABLE_A2"
PROTOCOL_LEDENET_ADDRESSABLE_A3 = "LEDENET_ADDRESSABLE_A3"

TRANSITION_BYTES = {
TRANSITION_JUMP: 0x3B,
Expand Down Expand Up @@ -135,6 +136,14 @@ class ProtocolBase:

power_state_response_length = MSG_LENGTHS[MSG_POWER_STATE]

def is_start_of_addressable_response(self, data):
"""Check if a message is the start of an addressable state response."""
return False

def is_valid_addressable_response(self, data):
"""Check if a message is a valid addressable state response."""
return False

def expected_response_length(self, data):
"""Return the number of bytes expected in the response.
Expand Down Expand Up @@ -537,11 +546,11 @@ def construct_preset_pattern(self, pattern, speed, brightness):
return self.construct_message(bytearray([0x38, pattern, delay, brightness]))


class ProtocolLEDENETOriginalAddressable(ProtocolLEDENET9Byte):
class ProtocolLEDENETAddressableA1(ProtocolLEDENET9Byte):
@property
def name(self):
"""The name of the protocol."""
return PROTOCOL_LEDENET_ORIGINAL_ADDRESSABLE
return PROTOCOL_LEDENET_ADDRESSABLE_A1

@property
def dimmable_effects(self):
Expand All @@ -556,7 +565,50 @@ def construct_preset_pattern(self, pattern, speed, brightness):
)


class ProtocolLEDENETAddressable(ProtocolLEDENET9Byte):
class ProtocolLEDENETAddressableA2(ProtocolLEDENET9Byte):
@property
def name(self):
"""The name of the protocol."""
return PROTOCOL_LEDENET_ADDRESSABLE_A2

@property
def dimmable_effects(self):
"""Protocol supports dimmable effects."""
return True

def construct_preset_pattern(self, pattern, speed, brightness):
"""The bytes to send for a preset pattern."""
return self.construct_message(bytearray([0x42, pattern, speed, brightness]))

def construct_levels_change(
self, persist, red, green, blue, warm_white, cool_white, write_mode
):
"""The bytes to send for a level change request.
white 41 01 ff ff ff 00 00 00 60 ff 00 00 9e
"""
preset_number = 0x01 # aka fixed color
return self.construct_message(
bytearray(
[
0x41,
preset_number,
red,
green,
blue,
0x00,
0x00,
0x00,
0x60,
0xFF,
0x00,
0x00,
]
)
)


class ProtocolLEDENETAddressableA3(ProtocolLEDENET9Byte):

ADDRESSABLE_HEADER = [0xB0, 0xB1, 0xB2, 0xB3, 0x00, 0x01, 0x01]
addressable_response_length = MSG_LENGTHS[MSG_ADDRESSABLE_STATE]
Expand All @@ -565,6 +617,11 @@ def __init__(self):
self._counter = 0
super().__init__()

@property
def name(self):
"""The name of the protocol."""
return PROTOCOL_LEDENET_ADDRESSABLE_A3

@property
def dimmable_effects(self):
"""Protocol supports dimmable effects."""
Expand All @@ -582,11 +639,6 @@ def is_valid_addressable_response(self, data):
return False
return self.is_checksum_correct(data)

@property
def name(self):
"""The name of the protocol."""
return PROTOCOL_LEDENET_ADDRESSABLE

def _increment_counter(self):
"""Increment the counter byte."""
self._counter += 1
Expand Down
Loading

0 comments on commit 162de83

Please sign in to comment.