-
Notifications
You must be signed in to change notification settings - Fork 0
/
device.py
138 lines (108 loc) · 4.12 KB
/
device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""Base device class for managing Arlo devices (cameras and base stations)."""
import asyncio
class Device:
    """
    Base class for Arlo devices (cameras and base stations).

    Attributes:
        name (str): Internal name of the device (not necessarily identical to Arlo).
        status_interval (int): Interval of status messages from generator (seconds).
    """

    def __init__(self, arlo_device, status_interval: int):
        """
        Initialize the Device instance.

        Must be called while an event loop is running: the running loop is
        captured here so that synchronous callbacks can later hand data over
        to it.

        Args:
            arlo_device (ArloDevice): Arlo device object.
            status_interval (int): Interval of status messages from generator (seconds).

        Raises:
            RuntimeError: If there is no running event loop.
        """
        self._arlo = arlo_device
        self.name = self._arlo.name.replace(" ", "_").lower()
        self.status_interval = status_interval
        self._state_event = asyncio.Event()
        self._event_loop = asyncio.get_running_loop()
        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks = set()

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule ``coro`` as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def run(self) -> None:
        """
        Initialize the device, create event channels, and listen for events.

        This method performs the following tasks:
        - Creates an event channel between pyaarlo callbacks and async generator.
        - Adds a callback to the Arlo device for all attributes.
        - Starts periodic status trigger.
        - Listens for and passes events to the handler.

        The loop never finishes on its own; it runs until the coroutine is
        cancelled or an exception propagates.
        """
        event_get, event_put = self.create_sync_async_channel()
        self._arlo.add_attr_callback("*", event_put)
        self._spawn(self.periodic_status_trigger())

        async for device, attr, value in event_get:
            # The callback is registered with "*", so only events whose
            # device equals this instance's Arlo object are dispatched.
            if device == self._arlo:
                # Each handler runs in its own task so that a slow handler
                # does not hold up the next queued event.
                self._spawn(self.on_event(attr, value))

    async def on_event(self, attr: str, value) -> None:
        """
        Distribute events to the correct handler.

        This method should be overridden by subclasses to handle specific events.

        Args:
            attr (str): Attribute name.
            value: Attribute value.
        """
        pass  # pylint: disable=unnecessary-pass

    async def periodic_status_trigger(self) -> None:
        """Periodically trigger status updates every ``status_interval`` seconds."""
        while True:
            self.state_event.set()
            await asyncio.sleep(self.status_interval)

    async def listen_status(self):
        """
        Async generator that yields a status message whenever the state event is set.

        Yields:
            tuple: (name, status) where name is the device name and status is the device status.
        """
        while True:
            await self.state_event.wait()
            # Clear *before* yielding: a trigger that arrives while the
            # consumer is still handling this status must produce another
            # update instead of being wiped out when the generator resumes.
            self.state_event.clear()
            yield self.name, self.get_status()

    def get_status(self) -> dict:
        """
        Get the device status.

        This method should be overridden by subclasses to provide device-specific status.

        Returns:
            dict: Device status information (empty in this base class).
        """
        return {}

    async def mqtt_control(self, payload: str) -> None:
        """
        Handle MQTT control messages.

        This method should be overridden by subclasses to handle device-specific MQTT controls.

        Args:
            payload (str): MQTT payload.
        """
        pass  # pylint: disable=unnecessary-pass

    def create_sync_async_channel(self):
        """
        Create a synchronous/asynchronous channel for event communication.

        Returns:
            tuple: (get, put) where get is an async generator that yields queued data,
                and put is a function used in synchronous callbacks to put data into the queue.
                Each yielded item is the tuple of positional arguments passed to put.
        """
        queue = asyncio.Queue()

        def put(*args):
            # Hand the item over via the captured loop so that put() can be
            # called from outside the loop's thread.
            self._event_loop.call_soon_threadsafe(queue.put_nowait, args)

        async def get():
            while True:
                yield await queue.get()
                # Reached once the consumer asks for the next item, i.e.
                # after it has finished with the previous one.
                queue.task_done()

        return get(), put

    @property
    def state_event(self) -> asyncio.Event:
        """
        Get the state event object.

        Returns:
            asyncio.Event: The state event object.
        """
        return self._state_event

    @property
    def event_loop(self) -> asyncio.AbstractEventLoop:
        """
        Get the event loop object.

        Returns:
            asyncio.AbstractEventLoop: The event loop object.
        """
        return self._event_loop