Skip to content

Commit

Permalink
Add support for remote access config (#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Dec 11, 2021
1 parent 07e4b5d commit 7264b5b
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 11 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ in PyQt, Kivy, or some other framework.
* Setting timers
* Sync clock
* Music Mode for devices with a built-in microphone (asyncio version only)
* Remote access administration (asyncio version only)

##### Some missing pieces:
* Initial administration to set up WiFi SSID and passphrase/key.
* Remote access administration

##### Cool feature:
* Specify colors with names or web hex values. Requires that python "webcolors"
Expand Down
15 changes: 15 additions & 0 deletions examples/disable_remote_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import asyncio
import logging
import pprint

from flux_led.aioscanner import AIOBulbScanner

logging.basicConfig(level=logging.DEBUG)


async def go():
scanner = AIOBulbScanner()
pprint.pprint(await scanner.async_disable_remote_access("192.168.106.198"))


asyncio.run(go())
19 changes: 19 additions & 0 deletions examples/enable_remote_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import asyncio
import logging
import pprint

from flux_led.aioscanner import AIOBulbScanner

logging.basicConfig(level=logging.DEBUG)


async def go():
scanner = AIOBulbScanner()
pprint.pprint(
await scanner.async_enable_remote_access(
"192.168.106.198", "ra8815us02.magichue.net", 8815
)
)


asyncio.run(go())
13 changes: 13 additions & 0 deletions flux_led/aiodevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)

from .aioprotocol import AIOLEDENETProtocol
from .aioscanner import AIOBulbScanner
from .base_device import PROTOCOL_PROBES, LEDENETDevice
from .const import (
COLOR_MODE_CCT,
Expand Down Expand Up @@ -352,6 +353,18 @@ async def async_set_brightness(self, brightness: int) -> None:
await self.async_set_levels(w=brightness)
return

async def async_enable_remote_access(
self, remote_access_host: str, remote_access_port: int
) -> None:
"""Enable remote access."""
await AIOBulbScanner().async_enable_remote_access(
self.ipaddr, remote_access_host, remote_access_port
)

async def async_disable_remote_access(self) -> None:
"""Disable remote access."""
await AIOBulbScanner().async_disable_remote_access(self.ipaddr)

async def _async_connect(self) -> None:
"""Create connection."""
_, self._aio_protocol = await asyncio.wait_for(
Expand Down
63 changes: 63 additions & 0 deletions flux_led/aioscanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,66 @@ def _on_response(data: bytes, addr: Tuple[str, int]) -> None:
transport.close()

return self.found_bulbs

async def async_disable_remote_access(self, address: str, timeout: int = 5) -> None:
"""Disable remote access."""
await self._send_command_and_reboot(
self.send_disable_remote_access_message, address, timeout
)

async def async_enable_remote_access(
self,
address: str,
remote_access_host: str,
remote_access_port: int,
timeout: int = 5,
) -> None:
"""Enable remote access."""

def _enable_remote_access_message(
sender: asyncio.DatagramTransport, destination: Tuple[str, int]
) -> None:
self.send_enable_remote_access_message(
sender, destination, remote_access_host, remote_access_port
)

await self._send_command_and_reboot(
_enable_remote_access_message, address, timeout
)

async def _send_command_and_reboot(
self,
msg_sender: Callable[[asyncio.DatagramTransport, Tuple[str, int]], None],
address: str,
timeout: int = 5,
) -> None:
"""Send a command and reboot."""
sock = self._create_socket()
destination = self._destination_from_address(address)
response1 = asyncio.Event()
response2 = asyncio.Event()

def _on_response(data: bytes, addr: Tuple[str, int]) -> None:
_LOGGER.debug("udp: %s <= %s", addr, data)
if data.startswith(b"+ok"):
if response1.is_set():
response2.set()
else:
response1.set()

transport_proto = await self.loop.create_datagram_endpoint(
lambda: LEDENETDiscovery(
destination=destination,
on_response=_on_response,
),
sock=sock,
)
transport = cast(asyncio.DatagramTransport, transport_proto[0])
try:
self.send_start_message(transport, destination)
msg_sender(transport, destination)
await asyncio.wait_for(response1.wait(), timeout=timeout)
self.send_reboot_message(transport, destination)
await asyncio.wait_for(response2.wait(), timeout=timeout)
finally:
transport.close()
102 changes: 92 additions & 10 deletions flux_led/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ class FluxLEDDiscovery(TypedDict):
firmware_date: Optional[date]
model_info: Optional[str] # contains if IR (and maybe BL) if the device supports IR
model_description: Optional[str]
remote_access_enabled: Optional[bool]
remote_access_host: Optional[str] # the remote access host
remote_access_port: Optional[int] # the remote access port


def create_udp_socket() -> socket.socket:
"""Create a udp socket used for communicating with the device."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind(("", 0))
sock.setblocking(False)
return sock


def merge_discoveries(target: FluxLEDDiscovery, source: FluxLEDDiscovery) -> None:
Expand Down Expand Up @@ -87,13 +99,39 @@ def _process_version_message(data: FluxLEDDiscovery, decoded_data: str) -> None:
data["model_info"] = data_split[3]


def _process_remote_access_message(data: FluxLEDDiscovery, decoded_data: str) -> None:
"""Process response from b'AT+SOCKB\r'
b'+ok=TCP,8816,ra8816us02.magichue.net\r'
"""
data_split = decoded_data.replace("\r", "").split(",")
if len(data_split) < 3:
if not data.get("remote_access_enabled"):
data["remote_access_enabled"] = False
return
try:
data.update(
{
"remote_access_enabled": True,
"remote_access_port": int(data_split[1]),
"remote_access_host": data_split[2],
}
)
except ValueError:
return


class BulbScanner:

DISCOVERY_PORT = 48899
BROADCAST_FREQUENCY = 6 # At least 6 for 0xA1 models
RESPONSE_SIZE = 64
DISCOVER_MESSAGE = b"HF-A11ASSISTHREAD"
VERSION_MESSAGE = b"AT+LVER\r"
REMOTE_ACCESS_MESSAGE = b"AT+SOCKB\r"
DISABLE_REMOTE_ACCESS_MESSAGE = b"AT+SOCKB=NONE\r"
REBOOT_MESSAGE = b"AT+Z\r"
ALL_MESSAGES = {DISCOVER_MESSAGE, VERSION_MESSAGE, REMOTE_ACCESS_MESSAGE}
BROADCAST_ADDRESS = "<broadcast>"

def __init__(self) -> None:
Expand All @@ -114,11 +152,7 @@ def getBulbInfo(self) -> List[FluxLEDDiscovery]:
return self.found_bulbs

def _create_socket(self) -> socket.socket:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind(("", 0))
sock.setblocking(False)
return sock
return create_udp_socket()

def _destination_from_address(self, address: Optional[str]) -> Tuple[str, int]:
if address is None:
Expand All @@ -138,7 +172,7 @@ def _process_response(
"""
if data is None:
return False
if data in (self.DISCOVER_MESSAGE, self.VERSION_MESSAGE):
if data in self.ALL_MESSAGES:
return False
decoded_data = data.decode("ascii")
self._process_data(from_address, decoded_data, response_list)
Expand All @@ -165,22 +199,70 @@ def _process_data(
firmware_date=None,
model_info=None,
model_description=None,
remote_access_enabled=None,
remote_access_host=None,
remote_access_port=None,
),
)
if (
decoded_data.startswith("+ok=T")
or decoded_data == "+ok="
or decoded_data == "+ok=\r"
):
_process_remote_access_message(data, decoded_data)
if decoded_data.startswith("+ok="):
_process_version_message(data, decoded_data)
elif "," in decoded_data:
_process_discovery_message(data, decoded_data)

def send_start_message(
self,
sender: Union[socket.socket, asyncio.DatagramTransport],
destination: Tuple[str, int],
) -> None:
self._send_message(sender, destination, self.DISCOVER_MESSAGE)

def send_enable_remote_access_message(
self,
sender: Union[socket.socket, asyncio.DatagramTransport],
destination: Tuple[str, int],
remote_access_host: str,
remote_access_port: int,
) -> None:
enable_message = f"AT+SOCKB=TCP,{remote_access_port},{remote_access_host}\r"
self._send_message(sender, destination, enable_message.encode())

def send_disable_remote_access_message(
self,
sender: Union[socket.socket, asyncio.DatagramTransport],
destination: Tuple[str, int],
) -> None:
self._send_message(sender, destination, self.DISABLE_REMOTE_ACCESS_MESSAGE)

def send_reboot_message(
self,
sender: Union[socket.socket, asyncio.DatagramTransport],
destination: Tuple[str, int],
) -> None:
self._send_message(sender, destination, self.REBOOT_MESSAGE)

def _send_message(
self,
sender: Union[socket.socket, asyncio.DatagramTransport],
destination: Tuple[str, int],
message: bytes,
) -> None:
_LOGGER.debug("udp: %s => %s", destination, message)
sender.sendto(message, destination)

def send_discovery_messages(
self,
sender: Union[socket.socket, asyncio.DatagramTransport],
destination: Tuple[str, int],
) -> None:
_LOGGER.debug("discover: %s => %s", destination, self.DISCOVER_MESSAGE)
sender.sendto(self.DISCOVER_MESSAGE, destination)
_LOGGER.debug("discover: %s => %s", destination, self.VERSION_MESSAGE)
sender.sendto(self.VERSION_MESSAGE, destination)
self.send_start_message(sender, destination)
self._send_message(sender, destination, self.VERSION_MESSAGE)
self._send_message(sender, destination, self.REMOTE_ACCESS_MESSAGE)

def scan(
self, timeout: int = 10, address: Optional[str] = None
Expand Down
Loading

0 comments on commit 7264b5b

Please sign in to comment.