-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt.py
147 lines (128 loc) · 5.28 KB
/
mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""MQTT functions for Arlo's Camera devices."""
import base64 as b64
import json
import logging
import asyncio
import time
import aiomqtt
from aiostream import stream
from decouple import config
# Broker connection settings, resolved through decouple's ``config``.
MQTT_BROKER = config("MQTT_BROKER", cast=str, default="localhost")
MQTT_PORT = config("MQTT_PORT", cast=int, default=1883)
MQTT_USER = config("MQTT_USER", cast=str, default="arlo")
MQTT_PASS = config("MQTT_PASS", cast=str, default="arlo")
# Seconds to wait before reconnecting. The cast matters: without it a value
# supplied by the environment stays a string and asyncio.sleep() rejects it.
MQTT_RECONNECT_INTERVAL = config("MQTT_RECONNECT_INTERVAL", cast=float, default=5)
# Topic templates; the "{name}" placeholder is filled in with str.format().
MQTT_TOPIC_PICTURE = config(
    "MQTT_TOPIC_PICTURE", cast=str, default="arlo/picture/{name}"
)
MQTT_TOPIC_CONTROL = config(
    "MQTT_TOPIC_CONTROL", cast=str, default="arlo/control/{name}"
)
MQTT_TOPIC_STATUS = config(
    "MQTT_TOPIC_STATUS", cast=str, default="arlo/status/{name}"
)
MQTT_TOPIC_MOTION = config(
    "MQTT_TOPIC_MOTION", cast=str, default="arlo/motion/{name}"
)
DEBUG = config("DEBUG", default=False, cast=bool)

# Initialize logging
logging.basicConfig(
    level=logging.DEBUG if DEBUG else logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
async def mqtt_client(cameras: list, bases: list):
    """
    Run the MQTT client forever, reconnecting after connection errors.

    For every connection four workers are started and awaited together:
    ``mqtt_reader`` and ``device_status`` (for cameras and bases) and
    ``motion_stream`` and ``pic_streamer`` (for cameras only). When any of
    them fails, the remaining workers are cancelled before the connection is
    closed; ``asyncio.gather`` alone would leave them running, so every
    reconnect would otherwise add another set of workers on top of the old one.

    An ``aiomqtt.MqttError`` is logged and followed by a pause of
    ``MQTT_RECONNECT_INTERVAL`` seconds; any other exception propagates to
    the caller.

    Args:
        cameras (list): List of Camera objects.
        bases (list): List of Base objects.
    """
    while True:
        try:
            async with aiomqtt.Client(
                hostname=MQTT_BROKER,  # pyright: ignore [reportArgumentType]
                port=MQTT_PORT,  # pyright: ignore [reportArgumentType]
                username=MQTT_USER,  # pyright: ignore [reportArgumentType]
                password=MQTT_PASS,  # pyright: ignore [reportArgumentType]
            ) as client:
                logger.info("MQTT client connected to %s", MQTT_BROKER)
                devices = cameras + bases
                # Explicit tasks so they can be cancelled individually below.
                workers = [
                    asyncio.ensure_future(coro)
                    for coro in (
                        mqtt_reader(client, devices),
                        device_status(client, devices),
                        motion_stream(client, cameras),
                        pic_streamer(client, cameras),
                    )
                ]
                try:
                    await asyncio.gather(*workers)
                finally:
                    # gather() does not cancel siblings when one worker
                    # raises; stop them and wait for their cleanup while the
                    # client is still open. cancel() is a no-op on finished
                    # tasks.
                    for worker in workers:
                        worker.cancel()
                    await asyncio.gather(*workers, return_exceptions=True)
        except aiomqtt.MqttError as error:
            logger.info('MQTT "%s". reconnecting.', error)
            await asyncio.sleep(MQTT_RECONNECT_INTERVAL)
async def pic_streamer(client: aiomqtt.Client, cameras: list):
    """
    Merge picture streams from all cameras and publish to MQTT.

    Each camera's ``get_pictures()`` stream is merged into one stream of
    ``(name, data)`` pairs. Every picture is published to
    ``MQTT_TOPIC_PICTURE`` (formatted with the name) as a JSON object with
    two keys: ``filename`` (``"<timestamp> <name>.jpg"``) and ``payload``
    (the picture data, base64-encoded).

    Args:
        client (aiomqtt.Client): MQTT client instance.
        cameras (list): List of Camera objects.
    """
    pics = stream.merge(*[c.get_pictures() for c in cameras])
    async with pics.stream() as streamer:
        async for name, data in streamer:
            # Fixed seven decimals: str(time.time()) drops trailing zeros,
            # which made the digit string vary in length, so filenames did
            # not sort chronologically.
            timestamp = f"{time.time():.7f}".replace(".", "")
            await client.publish(
                MQTT_TOPIC_PICTURE.format(  # pyright: ignore [reportAttributeAccessIssue]
                    name=name
                ),
                payload=json.dumps(
                    {
                        "filename": f"{timestamp} {name}.jpg",
                        "payload": b64.b64encode(data).decode("utf-8"),
                    }
                ),
            )
async def device_status(client: aiomqtt.Client, devices: list):
    """
    Forward status updates from every device to MQTT.

    The ``listen_status()`` streams of all devices are merged into a single
    stream of ``(name, status)`` pairs; each status is JSON-encoded and
    published on ``MQTT_TOPIC_STATUS`` formatted with the name.

    Args:
        client (aiomqtt.Client): MQTT client instance.
        devices (list): List of Device objects (cameras and bases).
    """
    sources = [device.listen_status() for device in devices]
    async with stream.merge(*sources).stream() as updates:
        async for device_name, state in updates:
            topic = MQTT_TOPIC_STATUS.format(  # pyright: ignore [reportAttributeAccessIssue]
                name=device_name
            )
            body = json.dumps(state)
            await client.publish(topic, payload=body)
async def motion_stream(client: aiomqtt.Client, cameras: list):
    """
    Forward motion events from every camera to MQTT.

    The ``listen_motion()`` streams of all cameras are merged into a single
    stream of ``(name, motion)`` pairs; each event is JSON-encoded and
    published on ``MQTT_TOPIC_MOTION`` formatted with the name.

    Args:
        client (aiomqtt.Client): MQTT client instance.
        cameras (list): List of Camera objects.
    """
    listeners = [camera.listen_motion() for camera in cameras]
    merged = stream.merge(*listeners)
    async with merged.stream() as events:
        async for camera_name, event in events:
            topic = MQTT_TOPIC_MOTION.format(  # pyright: ignore [reportAttributeAccessIssue]
                name=camera_name
            )
            await client.publish(topic, payload=json.dumps(event))
# Strong references to in-flight control tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_CONTROL_TASKS: set = set()


def _control_task_done(task: asyncio.Task) -> None:
    """Drop a finished control task and log the exception it raised, if any."""
    _CONTROL_TASKS.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logger.error("MQTT control handler failed: %s", error, exc_info=error)


async def mqtt_reader(client: aiomqtt.Client, devices: list):
    """
    Subscribe to control topics and pass messages to individual devices.

    Each device gets its own control topic (``MQTT_TOPIC_CONTROL`` formatted
    with the device's ``name``). For every message on one of these topics,
    the payload is decoded as UTF-8 and handed to that device's
    ``mqtt_control`` coroutine, which runs as a separate task so a slow
    handler does not block the reader. Messages on other topics are ignored.

    Args:
        client (aiomqtt.Client): MQTT client instance.
        devices (list): List of Device objects (cameras and bases).
    """
    # fmt: off
    devs = {
        MQTT_TOPIC_CONTROL.format(name=d.name): d for d in devices  # pyright: ignore [reportAttributeAccessIssue]
    }
    # fmt: on
    async with client.messages() as messages:
        for topic in devs:
            await client.subscribe(topic)
        async for message in messages:
            device = devs.get(message.topic.value)
            if device is None:
                continue
            task = asyncio.create_task(
                device.mqtt_control(
                    message.payload.decode(  # pyright: ignore [reportAttributeAccessIssue,reportOptionalMemberAccess]
                        "utf-8"
                    )
                )
            )
            # Hold a reference until the task completes (see _CONTROL_TASKS).
            _CONTROL_TASKS.add(task)
            task.add_done_callback(_control_task_done)