-
Notifications
You must be signed in to change notification settings - Fork 0
/
camera.py
378 lines (324 loc) · 12 KB
/
camera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
"""Camera Class to manage Arlo's Camera devices."""
import subprocess # nosec B404
import logging
import asyncio
import shlex
import os
from decouple import config
from device import Device
DEBUG = config("DEBUG", default=False, cast=bool)
# Initialize logging
logging.basicConfig(
level=logging.DEBUG if DEBUG else logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
class Camera(Device):
"""
Class representing an Arlo camera device.
Attributes:
name (str): Internal name of the camera (not necessarily identical to Arlo).
ffmpeg_out (str): FFmpeg output string.
timeout (int): Motion timeout of live stream (seconds).
status_interval (int): Interval of status messages from generator (seconds).
stream (asyncio.subprocess.Process): Current FFmpeg stream (idle or active).
"""
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods
# Possible states
STATES = ["idle", "streaming"]
def __init__(
self, arlo_camera, ffmpeg_out: str, motion_timeout: int, status_interval: int
):
"""
Initialize the Camera instance.
Args:
arlo_camera (ArloCamera): Arlo camera object.
ffmpeg_out (str): FFmpeg output string.
motion_timeout (int): Motion timeout of live stream (seconds).
status_interval (int): Interval of status messages from generator (seconds).
"""
super().__init__(arlo_camera, status_interval)
self.name = arlo_camera.name.replace(" ", "_").lower()
self.ffmpeg_out = shlex.split(ffmpeg_out.format(name=self.name))
self.timeout = motion_timeout
self._timeout_task = None
self.motion = False
self._state = None
self._motion_event = asyncio.Event()
self.stream = None
self.proxy_stream = None
self.proxy_reader, self.proxy_writer = os.pipe()
self._pictures = asyncio.Queue()
self._listen_pictures = False
logger.info("Camera added: %s", self.name)
async def run(self):
"""
Start the camera, wait for it to become available, create event channels,
and listen for events.
"""
while self._arlo.is_unavailable:
await asyncio.sleep(5)
await self.set_state("idle")
asyncio.create_task(self.start_proxy_stream())
await super().run()
async def on_event(self, attr: str, value):
"""
Distribute events to the correct handler.
Args:
attr (str): Attribute name.
value: Attribute value.
"""
match attr:
case "motionDetected":
await self.on_motion(value)
case "activityState":
await self.on_arlo_state(value)
case "presignedLastImageData":
if self._listen_pictures:
self.put_picture(value)
case _:
pass
async def on_motion(self, motion: bool):
"""
Handle motion events. Either start live stream or reset live stream timeout.
Args:
motion (bool): Motion detected status.
"""
self.motion = motion
self.motion_event.set()
logger.info("%s motion: %s", self.name, motion)
if motion:
await self.set_state("streaming")
else:
if self._timeout_task:
self._timeout_task.cancel()
if not motion:
self._timeout_task = asyncio.create_task(self.stream_timeout())
async def on_arlo_state(self, state: str):
"""
Handle pyaarlo state change, either request stream or handle running stream.
Args:
state (str): Arlo state.
"""
if state == "idle":
if self.get_state() == "streaming":
await self.start_stream()
elif state == "userStreamActive" and self.get_state() != "streaming":
await self.set_state("streaming")
async def set_state(self, new_state: str):
"""
Set the local state when pyaarlo state changes.
Call the _on_state_change function if the state has changed.
Args:
new_state (str): New state.
"""
if new_state in self.STATES and new_state != self._state:
self._state = new_state
logger.info("%s state: %s", self.name, new_state)
await self.on_state_change(new_state)
def get_state(self):
"""
Get the current state.
Returns:
str: Current state.
"""
return self._state
async def on_state_change(self, new_state: str):
"""
Handle internal state change, stop or start stream.
Args:
new_state (str): New state.
"""
self.state_event.set()
match new_state:
case "idle":
self.stop_stream()
asyncio.create_task(self.start_idle_stream())
case "streaming":
await self.start_stream()
async def start_proxy_stream(self):
"""Start the proxy stream (continuous video stream from FFmpeg)."""
exit_code = 1
while exit_code > 0:
self.proxy_stream = await asyncio.create_subprocess_exec(
*(["ffmpeg", "-i", "pipe:"] + self.ffmpeg_out),
stdin=self.proxy_reader,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE if DEBUG else subprocess.DEVNULL,
)
if DEBUG:
asyncio.create_task(self.log_stderr(self.proxy_stream, "proxy_stream"))
exit_code = await self.proxy_stream.wait()
if exit_code > 0:
logger.warning(
"Proxy stream for %s exited unexpectedly with code %s. Restarting...",
self.name,
exit_code,
)
await asyncio.sleep(3)
async def start_idle_stream(self):
"""Start the idle picture stream, writing to the proxy stream."""
exit_code = 1
while exit_code > 0:
# fmt: off
self.stream = await asyncio.create_subprocess_exec(
*[
"ffmpeg", "-re", "-stream_loop", "-1",
"-i", "idle.mp4",
"-c:v", "copy",
"-c:a", "libmp3lame", "-ar", "44100", "-b:a", "8k",
"-bsf", "dump_extra", "-f", "mpegts", "pipe:",
],
stdin=subprocess.DEVNULL,
stdout=self.proxy_writer,
stderr=subprocess.PIPE if DEBUG else subprocess.DEVNULL,
)
# fmt: on
if DEBUG:
asyncio.create_task(self.log_stderr(self.stream, "idle_stream"))
exit_code = await self.stream.wait()
if exit_code > 0:
logger.warning(
"Idle stream for %s exited unexpectedly with code %s. Restarting...",
self.name,
exit_code,
)
await asyncio.sleep(3)
async def start_stream(self):
"""
Request stream, grab it, kill idle stream, and start a new FFmpeg instance
writing to the proxy stream.
"""
stream = await self.event_loop.run_in_executor(None, self._arlo.get_stream)
if stream:
self.stop_stream()
# fmt: off
self.stream = await asyncio.create_subprocess_exec(
*[
"ffmpeg", "-i", stream,
"-c:v", "copy",
"-c:a", "libmp3lame", "-ar", "44100",
"-bsf", "dump_extra", "-f", "mpegts", "pipe:",
],
stdin=subprocess.DEVNULL,
stdout=self.proxy_writer,
stderr=subprocess.PIPE if DEBUG else subprocess.DEVNULL,
)
# fmt: on
if DEBUG:
asyncio.create_task(self.log_stderr(self.stream, "live_stream"))
async def stream_timeout(self):
"""Timeout the live stream after the specified duration."""
await asyncio.sleep(self.timeout)
await self.set_state("idle")
def stop_stream(self):
"""Stop the live or idle stream (not the proxy stream)."""
if self.stream:
try:
self.stream.kill()
except ProcessLookupError:
pass
async def get_pictures(self):
"""
Async generator that yields snapshots from pyaarlo.
Yields:
tuple: (name, data) where name is the camera name and data is the picture data.
"""
self._listen_pictures = True
while True:
data = await self._pictures.get()
yield self.name, data
self._pictures.task_done()
def put_picture(self, pic):
"""
Put a picture into the queue.
Args:
pic: Picture data.
"""
try:
self._pictures.put_nowait(pic)
except asyncio.QueueFull:
logger.info("picture queue full, ignoring")
def get_status(self) -> dict:
"""
Get the camera status information.
Returns:
dict: Camera status information.
"""
return {"battery": self._arlo.battery_level, "state": self.get_state()}
async def listen_motion(self):
"""
Async generator that yields motion state on change.
Yields:
tuple: (name, motion) where name is the camera name and motion is the motion state.
"""
while True:
await self.motion_event.wait()
yield self.name, self.motion
self.motion_event.clear()
async def mqtt_control(self, payload: str):
"""
Handle incoming MQTT commands.
Args:
payload (str): MQTT payload.
"""
match payload.upper():
case "START":
await self.set_state("streaming")
case "STOP":
await self.set_state("idle")
case "SNAPSHOT":
await self.event_loop.run_in_executor(None, self._arlo.request_snapshot)
async def log_stderr(self, stream, label: str):
"""
Continuously read from stderr and log the output.
Args:
stream: Stream to read from.
label (str): Label for logging.
"""
while True:
try:
line = await stream.stderr.readline()
if line:
logger.debug("%s - %s: %s", self.name, label, line.decode().strip())
else:
break
except ValueError:
pass
async def shutdown_when_idle(self):
"""Shutdown the camera when it becomes idle."""
if self.get_state() != "idle":
logger.info("%s active, waiting...", self.name)
while self.get_state() != "idle":
await asyncio.sleep(1)
self.shutdown()
def shutdown(self):
"""Immediate shutdown of the camera."""
logger.info("Shutting down %s", self.name)
for stream in [self.stream, self.proxy_stream]:
if stream: # Check if stream exists
try:
stream.terminate()
except ProcessLookupError:
# Handle the specific case where the process is gone
logger.debug("Process for %s already terminated.", self.name)
except AttributeError:
# Handle the case when stream is None
logger.debug("Stream for %s is not initialized.", self.name)
@property
def state_event(self):
"""
Get the state event object.
Returns:
asyncio.Event: The state event object.
"""
return self._state_event
@property
def motion_event(self):
"""
Get the motion event object.
Returns:
asyncio.Event: The motion event object.
"""
return self._motion_event