Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add basic mqtt support #2412

Open
wants to merge 10 commits into
base: future3/develop
Choose a base branch
from
6,622 changes: 3,485 additions & 3,137 deletions documentation/developers/docstring/README.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ mock

# API docs generation
pydoc-markdown

# MQTT
paho-mqtt
232 changes: 232 additions & 0 deletions src/jukebox/components/mqtt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
import json
import logging
import threading
from typing import Any

import paho.mqtt.client as paho_mqtt

import jukebox.cfghandler
import jukebox.plugs as plugs
import jukebox.publishing
import jukebox.publishing.server
import jukebox.publishing.subscriber

from .mqtt_command_alias import legacy_mqtt_cmd, mqtt_cmd
from .mqtt_const import Mqtt_Attributes, topics_to_send
from .utils import (
get_args,
get_current_time_milli,
get_kwargs,
get_rpc_command,
map_repeat_mode,
split_topic,
)

logger = logging.getLogger("jb.mqtt")
cfg = jukebox.cfghandler.get_handler("jukebox")

base_topic = cfg.setndefault("mqtt", "base_topic", value="phoniebox-dev")
legacy_support_enabled = cfg.setndefault("mqtt", "enable_legacy", value=True)


class MQTT(threading.Thread):
"""A thread for monitoring events and publishing interesting events via MQTT."""

_topic_name: str
_mqtt_client: paho_mqtt.Client
_attributes: dict = {}
_available_cmds = mqtt_cmd

def __init__(self, client: paho_mqtt.Client):
super().__init__(name="MqttClient")
self._mqtt_client = client
if legacy_support_enabled:
logger.info("Supporting legacy MQTT commands.")
self._available_cmds = {**mqtt_cmd, **legacy_mqtt_cmd}

self.daemon = True
self._keep_running = True
self.listen_done = threading.Event()
self.action_done = threading.Event()

def _subscribe(self):
logger.debug("Subscribing to MQTT topics.")
self._mqtt_client.message_callback_add("phoniebox-dev/cmd/#", self._on_cmd)

def _on_cmd(self, client, userdata, msg):
cmd = split_topic(topic=msg.topic)
payload = msg.payload.decode("utf-8")
logger.debug(f'Received MQTT command "{cmd}" with payload "{payload}"')
try:
config = self._available_cmds.get(cmd)
if not config:
logger.warning(f'No configuration found for MQTT command "{cmd}"')
return

rpc = get_rpc_command(config)
args = get_args(config, payload)
kwargs = get_kwargs(config, payload)

if rpc is None:
logger.warning(f'No RPC call configured for MQTT command "{cmd}"')
return

package = rpc.get("package")
plugin = rpc.get("plugin")
method = rpc.get("method")

if package is None:
raise ValueError(
f'Missing "package" attribute for MQTT command "{cmd}"'
)
elif plugin is None:
raise ValueError(f'Missing "plugin" attribute for MQTT command "{cmd}"')
elif method is None:
raise ValueError(f'Missing "method" attribute for MQTT command "{cmd}"')
else:
logger.info(
f'Executing MQTT command "{cmd}" with package="{package}",'
+ f'plugin="{plugin}", method="{method}", args={args}, kwargs={kwargs}'
)
plugs.call_ignore_errors(
package=package,
plugin=plugin,
method=method,
args=args,
kwargs=kwargs,
)
except Exception as e:
logger.error(
f"Ignoring failed call for MQTT command '{cmd}': {e}", exc_info=True
)

def _publish(self, topic: str, payload: Any, *, qos=0, retain=False):
"""Publish a message via MQTT."""
logger.debug(
f'Publishing to topic "{topic}" with payload "{payload}", qos={qos}, retain={retain}'
)
self._mqtt_client.publish(
topic=f"{base_topic}/{topic}",
payload=json.dumps(payload),
qos=qos,
retain=retain,
)

def _send_throttled(
self, topic: str, payload: Any, *, min_time_skip=500, qos=0, retain=False
):
"""Send an MQTT message throttled unless value has changed."""
now = get_current_time_milli()

if topic in self._attributes:
prev = self._attributes[topic]
time_since_last_update = now - prev["last_update"]
if prev["value"] == payload and time_since_last_update < 30000:
return
if prev["value"] != payload and time_since_last_update < min_time_skip:
return

logger.debug(
f'Sending throttled message for topic "{topic}" with payload "{payload}"'
)
self._attributes[topic] = {"value": payload, "last_update": now}
self._publish(topic, payload, retain=retain, qos=qos)

def _send_player_state(self, payload: Any):
"""Map player state data."""
self._send_throttled(Mqtt_Attributes.STATE.value, payload["state"])
for attr in ["title", "artist", "elapsed", "duration", "track", "file"]:
if attr in payload:
self._send_throttled(Mqtt_Attributes[attr.upper()].value, payload[attr])

self._send_throttled(Mqtt_Attributes.RANDOM.value, payload.get("random") == "1")

repeat_active = bool(payload.get("repeat") == "1")
self._send_throttled(Mqtt_Attributes.REPEAT.value, repeat_active)
self._send_throttled(
Mqtt_Attributes.REPEAT_MODE.value,
map_repeat_mode(repeat_active, payload.get("single") == "1"),
)

def _send_volume(self, payload: Any):
"""Map volume data."""
logger.debug(f"Sending volume update with payload: {payload}")
if legacy_support_enabled:
self._send_throttled(Mqtt_Attributes.VOLUME.value, payload.get("volume"))
self._send_throttled(Mqtt_Attributes.MUTE.value, bool(payload.get("mute")))
self._send_throttled("status/player/volume", payload.get("volume"))
self._send_throttled("status/player/mute", bool(payload.get("mute")))

def run(self) -> None:
"""Main loop of the MQTT thread."""
logger.info("Starting MQTT Thread")
self._send_throttled("state", "online", qos=1, retain=True)
self._send_throttled("version", jukebox.version(), qos=1, retain=True) # type: ignore
self._subscribe()

sub = jukebox.publishing.subscriber.Subscriber(
"inproc://PublisherToProxy", topics_to_send
)
while self._keep_running:
topic, payload = sub.receive()
if topic == "volume.level":
self._send_volume(payload)
elif topic == "playerstatus":
self._send_player_state(payload)
logger.info("Exiting MQTT Thread")

def stop(self):
"""Stop the MQTT thread."""
logger.info("Stopping MQTT Thread")
self._send_throttled("state", "offline", qos=1, retain=True)

self._keep_running = False
self.listen_done.clear()
self.action_done.set()


mqtt: MQTT
mqtt_client: paho_mqtt.Client


def on_connect(client, userdata, flags, rc):
"""Start thread on successful MQTT connection."""
global mqtt
logger.debug(f"Connected with result code {rc} to {base_topic}")
mqtt = MQTT(client)
mqtt.start()


@plugs.initialize
def initialize():
"""Setup connection and trigger the MQTT loop."""
global mqtt_client

client_id = cfg.setndefault("mqtt", "client_id", value="phoniebox-future3")
username = cfg.setndefault("mqtt", "username", value="phoniebox-dev")
password = cfg.setndefault("mqtt", "password", value="phoniebox-dev")
host = cfg.setndefault("mqtt", "host", value="127.0.0.1")
port = cfg.setndefault("mqtt", "port", value=1883)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have some of those useful settings in jukebox.yaml?
Also, one addition I would prefer is to be able to enable/disable this functionality (e.g. through the jukebox.yaml)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. I added an enable flag in the jukebox.yaml config of this component.

example config

modules:
    named:
        ...
        mqtt: mqtt
...
mqtt:
    enable: true
    # The client id used in communication with the MQTT broker and identification of the phoniebox
    client_id: phoniebox_dev
    # The username to authenticate against the broker
    username: phoniebox-dev
    # The password to authenticate against the broker
    password: secret-password
    # The host name or IP address of your mqtt broker
    host: 127.0.0.1
    # The port number of the mqtt broker. The default is 1883
    port: 1883


logger.info(
f"Initializing MQTT client with client_id={client_id}, username={username}, host={host}, port={port}"
)
mqtt_client = paho_mqtt.Client(client_id=client_id)
mqtt_client.username_pw_set(username=username, password=password)
mqtt_client.on_connect = on_connect
mqtt_client.will_set(
topic=f"{base_topic}/state", payload=json.dumps("offline"), qos=1, retain=True
)
mqtt_client.connect(host, port, 60)
mqtt_client.loop_start()
logger.info("MQTT client initialized and loop started")


@plugs.atexit
def atexit(signal_id: int, **ignored_kwargs):
global mqtt, mqtt_client
logger.info("Executing atexit handler, stopping MQTT client")
mqtt.stop()
mqtt_client.loop_stop()
mqtt_client.disconnect()
logger.info("MQTT client stopped and disconnected")
Loading