diff --git a/cozy/control/mpris.py b/cozy/control/mpris.py index 00bd2d88..42e0746d 100644 --- a/cozy/control/mpris.py +++ b/cozy/control/mpris.py @@ -5,67 +5,95 @@ # copyright (c) 2013 Arnel A. Borja # copyright (c) 2013 Vadim Rutkovsky # copyright (c) 2017 Julian Geywitz -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . +# copyright (c) 2023 Benedek Dévényi + import logging -from gi.repository import Gio, GLib, Gtk +import re +import time +from dataclasses import dataclass -from random import randint +from gi.repository import Gio, GLib -import cozy.ui from cozy.application_settings import ApplicationSettings from cozy.control.artwork_cache import ArtworkCache from cozy.ext import inject -from cozy.media.player import Player +from cozy.media.player import NS_TO_SEC, US_TO_SEC, Player from cozy.model.book import Book from cozy.report import reporter -log = logging.getLogger("offline_cache") +log = logging.getLogger("mpris") + +CamelCasePattern = re.compile(r"(? str: + return CamelCasePattern.sub("_", name).lower() + + +@dataclass(kw_only=True, frozen=True, slots=True) +class Metadata: + track_id: str + track_number: int + title: str + album: str + artist: list[str] + length: int + url: str + artwork_uri: str + + def to_dict(self) -> dict[str, GLib.Variant]: + data = {} + data["mpris:trackid"] = GLib.Variant("o", self.track_id) + data["xesam:trackNumber"] = GLib.Variant("i", self.track_number) + data["xesam:title"] = GLib.Variant("s", self.title) + data["xesam:album"] = GLib.Variant("s", self.album) + data["xesam:artist"] = GLib.Variant("as", self.artist) + data["mpris:length"] = GLib.Variant("x", self.length) + data["xesam:url"] = GLib.Variant("s", self.url) + if self.artwork_uri: + data["mpris:artUrl"] = GLib.Variant("s", "file://" + self.artwork_uri) -class UnsupportedProperty(Exception): - pass + return data + + @staticmethod + def no_track() -> dict[str, GLib.Variant]: + no_track_path = GLib.Variant("o", "/org/mpris/MediaPlayer2/TrackList/NoTrack") + return {"mpris:trackid": no_track_path} class Server: - def __init__(self, con, path): - method_outargs = {} - method_inargs = {} - for interface in Gio.DBusNodeInfo.new_for_xml(self.__doc__).interfaces: + def __init__(self, connection: Gio.DBusConnection, path: str) -> None: + self.method_outargs = {} + self.method_inargs = {} + for interface in Gio.DBusNodeInfo.new_for_xml(self.__doc__).interfaces: for method in interface.methods: - method_outargs[method.name] = "(" + "".join( - [arg.signature for arg in method.out_args]) + ")" - method_inargs[method.name] = tuple( - arg.signature for arg in method.in_args) + self.method_inargs[method.name] = tuple( + arg.signature for arg in method.in_args + ) + out_sig = [arg.signature for arg in method.out_args] + self.method_outargs[method.name] = "(" + "".join(out_sig) + ")" try: - con.register_object(object_path=path, - interface_info=interface, - method_call_closure=self.on_method_call) - except: - log.error("MPRIS is already connected from another cozy process.") - - self.method_inargs = method_inargs - self.method_outargs = method_outargs - - def on_method_call(self, - connection, - sender, - object_path, - interface_name, - method_name, - parameters, - invocation): - + connection.register_object( + object_path=path, + interface_info=interface, + method_call_closure=self.on_method_call, + ) + except Exception: + log.error("MPRIS is already connected from another Cozy process.") + + def on_method_call( + self, + connection: Gio.DBusConnection, + sender: str, + object_path: str, + interface_name: str, + method_name: str, + parameters: GLib.Variant, + invocation: Gio.DBusMethodInvocation, + ) -> None: args = list(parameters.unpack()) for i, sig in enumerate(self.method_inargs[method_name]): if sig == "h": @@ -73,350 +101,311 @@ def on_method_call(self, fd_list = msg.get_unix_fd_list() args[i] = fd_list.get(args[i]) - out_args = None + snake_method = to_snake_case(method_name) try: - result = getattr(self, method_name)(*args) - - # out_args is atleast (signature1). + result = getattr(self, snake_method)(*args) + except AttributeError: + invocation.return_dbus_error( + "{}.Error.NotSupported".format(interface_name), "Unsupported property" + ) + except Exception as e: + log.error(e) + reporter.exception("mpris", e) + reporter.error( + "mpris", + "MPRIS method call failed with method name: {}".format(method_name), + ) + invocation.return_dbus_error( + "{}.Error.Failed".format(interface_name), "Internal exception occurred" + ) + else: + # out_args is at least (signature1). # We therefore always wrap the result as a tuple. - # Refer to https://bugzilla.gnome.org/show_bug.cgi?id=765603 + # Reference: + # https://bugzilla.gnome.org/show_bug.cgi?id=765603 result = (result,) out_args = self.method_outargs[method_name] - if out_args and out_args != "()" and result[0]: + if out_args != "()" and result[0]: variant = GLib.Variant(out_args, result) invocation.return_value(variant) else: invocation.return_value(None) - except UnsupportedProperty: - invocation.return_dbus_error("{}.Error.NotSupported".format(interface_name), "Unsupported property") - except Exception as e: - log.error(e) - reporter.exception("mpris", e) - reporter.error("mpris", "MPRIS method call failed with method name: {}".format(method_name)) - if out_args: - reporter.error("mpris", "MPRIS method call failed with out_args: {}".format(out_args)) - invocation.return_dbus_error("{}.Error.Failed".format(interface_name), "Internal exception occurred") class MPRIS(Server): """ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + """ - __MPRIS_IFACE = "org.mpris.MediaPlayer2" - __MPRIS_PLAYER_IFACE = "org.mpris.MediaPlayer2.Player" - __MPRIS_RATINGS_IFACE = "org.mpris.MediaPlayer2.ExtensionSetRatings" - __MPRIS_COZY = "org.mpris.MediaPlayer2.Cozy" - __MPRIS_PATH = "/org/mpris/MediaPlayer2" + + MEDIA_PLAYER2_INTERFACE = "org.mpris.MediaPlayer2" + MEDIA_PLAYER2_PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player" + _player: Player = inject.attr(Player) _artwork_cache: ArtworkCache = inject.attr(ArtworkCache) _app_settings: ApplicationSettings = inject.attr(ApplicationSettings) - def __init__(self, app): - self.__app = app - self.__ui = cozy.ui.main_view.CozyUI() - self.__rating = None - self.__cozy_id = 0 - self.__metadata = {"mpris:trackid": GLib.Variant( - "o", - "/org/mpris/MediaPlayer2/TrackList/NoTrack")} - self.__track_id = self.__get_media_id(0) - self.__bus = Gio.bus_get_sync(Gio.BusType.SESSION, None) - Gio.bus_own_name_on_connection(self.__bus, - self.__MPRIS_COZY, - Gio.BusNameOwnerFlags.NONE, - None, - None) - Server.__init__(self, self.__bus, self.__MPRIS_PATH) + def __init__(self, app) -> None: + self._bus = Gio.bus_get_sync(Gio.BusType.SESSION, None) + Gio.bus_own_name_on_connection( + self._bus, + "org.mpris.MediaPlayer2.Cozy", + Gio.BusNameOwnerFlags.NONE, + None, + None, + ) + super().__init__(self._bus, "/org/mpris/MediaPlayer2") + + self._application = app + self._metadata = self._get_new_metadata() self._player.add_listener(self._on_player_changed) self._app_settings.add_listener(self._on_app_setting_changed) - def Raise(self): - try: - self.__app.ui.window.present_with_time(Gtk.get_current_event_time()) - except Exception as e: - reporter.exception("mpris", e) + def introspect(self): + return self.__doc__ - def Quit(self): - self.__app.quit() + def quit(self): + self._application.quit() - def Next(self): + def next(self): self._player.forward() - def Previous(self): + def previous(self): self._player.rewind() - def Pause(self): - self._player.pause() - - def PlayPause(self): + def play(self): self._player.play_pause() - def Stop(self): - self._player.destroy() + def pause(self): + self._player.pause() - def Play(self): + def play_pause(self): self._player.play_pause() - def SetPosition(self, track_id, position): - self._player.position = position * 10**3 + def stop(self): + self._player.destroy() - def Seek(self, offset): - self._player.position = self._player.position + offset * 10**3 + def set_position(self, track_id: str, position: int): + self._player.position = position / US_TO_SEC - def Seeked(self, position): - self.__bus.emit_signal( - None, - self.__MPRIS_PATH, - "org.freedesktop.DBus.Properties", - "Seeked", - GLib.Variant.new_tuple(GLib.Variant("x", position))) + def seek(self, offset: int): + self._player.position = self._player.position / NS_TO_SEC + offset / US_TO_SEC - def Get(self, interface, property_name): - if property_name in ["CanQuit", "CanRaise", "CanSeek", - "CanControl", "HasRatingsExtension"]: + def get(self, interface: str, property_name: str) -> GLib.Variant: + if property_name in {"CanQuit", "CanControl"}: return GLib.Variant("b", True) - elif property_name == "HasTrackList": + elif property_name in {"CanRaise", "HasTrackList"}: return GLib.Variant("b", False) - elif property_name == "Identity": - return GLib.Variant("s", "Cozy") - elif property_name == "DesktopEntry": - return GLib.Variant("s", "com.github.geigi.cozy") - elif property_name == "SupportedUriSchemes": - return GLib.Variant("as", ["file"]) - elif property_name == "SupportedMimeTypes": - return GLib.Variant("as", ["application/ogg", - "audio/x-vorbis+ogg", - "audio/x-flac", - "audio/mpeg"]) - elif property_name == "PlaybackStatus": - return GLib.Variant("s", self.__get_status()) - elif property_name == "Metadata": - return GLib.Variant("a{sv}", self.__metadata) - elif property_name == "Position": - return GLib.Variant( - "x", - round(self._player.position * 10**-3)) - elif property_name in ["CanGoNext", "CanGoPrevious", - "CanPlay", "CanPause"]: + elif property_name in { + "CanGoNext", + "CanGoPrevious", + "CanPlay", + "CanPause", + "CanSeek", + }: return GLib.Variant("b", self._player.loaded_book is not None) - elif property_name == "Volume": - return GLib.Variant("d", self._player.volume) + elif property_name in {"SupportedUriSchemes", "SupportedMimeTypes"}: + return GLib.Variant("as", []) + + # Might raise an AttributeError. We handle that in Server.on_method_call + return getattr(self, to_snake_case(property_name)) + + def get_all(self, interface): + if interface == self.MEDIA_PLAYER2_INTERFACE: + properties = ( + "CanQuit", + "CanRaise", + "HasTrackList", + "Identity", + "DesktopEntry", + "SupportedUriSchemes", + "SupportedMimeTypes", + ) + elif interface == self.MEDIA_PLAYER2_PLAYER_INTERFACE: + properties = ( + "PlaybackStatus", + "Metadata", + "Position", + "CanGoNext", + "CanGoPrevious", + "CanPlay", + "CanPause", + "CanSeek", + "CanControl", + "Volume", + ) + + return {property: self.get(interface, property) for property in properties} + + def properties_changed(self, iface_name, changed_props, invalidated_props): + self._bus.emit_signal( + None, + "/org/mpris/MediaPlayer2", + "org.freedesktop.DBus.Properties", + "PropertiesChanged", + GLib.Variant.new_tuple( + GLib.Variant("s", iface_name), + GLib.Variant("a{sv}", changed_props), + GLib.Variant("as", invalidated_props), + ), + ) + + @property + def desktop_entry(self): + return GLib.Variant("s", "com.github.geigi.cozy") + + @property + def identity(self): + return GLib.Variant("s", "Cozy") + + @property + def playback_status(self): + if self._player.playing: + return GLib.Variant("s", "Playing") + elif not self._player.loaded_book: + return GLib.Variant("s", "Stopped") else: - reporter.warning("mpris", "MPRIS required an unknown information: {}".format(property_name)) - raise UnsupportedProperty - - def GetAll(self, interface): - ret = {} - if interface == self.__MPRIS_IFACE: - for property_name in ["CanQuit", - "CanRaise", - "HasTrackList", - "Identity", - "DesktopEntry", - "SupportedUriSchemes", - "SupportedMimeTypes"]: - ret[property_name] = self.Get(interface, property_name) - elif interface == self.__MPRIS_PLAYER_IFACE: - for property_name in ["PlaybackStatus", - "Metadata", - "Position", - "CanGoNext", - "CanGoPrevious", - "CanPlay", - "CanPause", - "CanSeek", - "CanControl"]: - ret[property_name] = self.Get(interface, property_name) - elif interface == self.__MPRIS_RATINGS_IFACE: - ret["HasRatingsExtension"] = GLib.Variant("b", False) - return ret - - def Set(self, interface, property_name, new_value): - if property_name == "Volume": - self._player.volume = new_value - - def PropertiesChanged(self, interface_name, changed_properties, - invalidated_properties): - self.__bus.emit_signal(None, - self.__MPRIS_PATH, - "org.freedesktop.DBus.Properties", - "PropertiesChanged", - GLib.Variant.new_tuple( - GLib.Variant("s", interface_name), - GLib.Variant("a{sv}", changed_properties), - GLib.Variant("as", invalidated_properties))) - - def Introspect(self): - return self.__doc__ + return GLib.Variant("s", "Paused") + + @property + def metadata(self): + return GLib.Variant("a{sv}", self._metadata) - ####################### - # PRIVATE # - ####################### + @property + def position(self): + return GLib.Variant("x", round(self._player.position / 1e3)) - def __get_media_id(self, track_id): + @property + def volume(self): + return GLib.Variant("d", self._player.volume) + + def _get_track_id(self) -> float: """ - TrackId's must be unique even up to - the point that if you repeat a song - it must have a different TrackId. + Track IDs must be unique even up to the point that if a song + is repeated in a playlist it must have a different TrackId. """ - track_id = track_id + randint(10000000, 90000000) - return GLib.Variant("o", "/com/github/geigi/cozy/TrackId/%s" % track_id) + return time.time() * 1e10 % 1e10 - def __get_status(self): - if self._player.playing: - return "Playing" - elif not self._player.loaded_book: - return "Stopped" - else: - return "Paused" - - def _on_player_changed(self, event, message): + def _get_new_metadata(self, book: Book | None = None) -> dict[str, GLib.Variant]: + if book is None: + return Metadata.no_track() + + track_path_template = "/com/github/geigi/cozy/TrackId/{id:.0f}" + uri_template = "file://{path}" + + metadata = Metadata( + track_id=track_path_template.format(id=self._get_track_id()), + track_number=book.current_chapter.number, + title=book.current_chapter.name, + album=book.name, + artist=[book.author], + length=book.current_chapter.length * US_TO_SEC, + url=uri_template.format(path=book.current_chapter.file), + artwork_uri=self._artwork_cache.get_album_art_path(book, 256), + ) + return metadata.to_dict() + + def _on_player_changed(self, event: str, _) -> None: if event == "chapter-changed": self._on_current_changed() elif event == "play": - self.__on_status_changed("Playing") + self._on_status_changed("Playing") elif event == "pause": - self.__on_status_changed("Paused") + self._on_status_changed("Paused") elif event == "stop": - self.__on_status_changed("Stopped") + self._on_status_changed("Stopped") - def _on_app_setting_changed(self, event, _): + def _on_app_setting_changed(self, event: str, _): if event == "swap-author-reader": self._on_current_changed() - def __update_metadata(self, book: Book): - # if track is None: - # track = get_current_track() - if book is None: - self.__metadata = {"mpris:trackid": GLib.Variant( - "o", - "/org/mpris/MediaPlayer2/TrackList/NoTrack")} - else: - self.__metadata["mpris:trackid"] = self.__track_id - track_number = book.current_chapter.number - - self.__metadata["xesam:trackNumber"] = GLib.Variant("i", - track_number) - self.__metadata["xesam:title"] = GLib.Variant( - "s", - book.current_chapter.name) - self.__metadata["xesam:album"] = GLib.Variant( - "s", - book.name) - self.__metadata["xesam:artist"] = GLib.Variant( - "as", - [book.author]) - self.__metadata["mpris:length"] = GLib.Variant( - "x", - book.current_chapter.length * 1000 * 1000) - self.__metadata["xesam:url"] = GLib.Variant( - "s", - "file:///" + book.current_chapter.file) - - path = self._artwork_cache.get_album_art_path(book, 180) - if path: - self.__metadata["mpris:artUrl"] = GLib.Variant( - "s", - "file://" + path) - - def __on_seeked(self, player, position): - self.Seeked(position * (1000 * 1000)) - - def _on_current_changed(self): + def _on_current_changed(self) -> None: if not self._player.loaded_book: return - current_track_id = self._player.loaded_chapter.id - if current_track_id and current_track_id >= 0: - self.__cozy_id = current_track_id - else: - self.__cozy_id = 0 - self.__track_id = self.__get_media_id(self.__cozy_id) - self.__rating = None - self.__update_metadata(self._player.loaded_book) - properties = {"Metadata": GLib.Variant("a{sv}", self.__metadata), - "CanPlay": GLib.Variant("b", True), - "CanPause": GLib.Variant("b", True), - "CanGoNext": GLib.Variant("b", True), - "CanGoPrevious": GLib.Variant("b", True)} - try: - self.PropertiesChanged(self.__MPRIS_PLAYER_IFACE, properties, []) - except Exception as e: - print("MPRIS::__on_current_changed(): %s" % e) + self._metadata = self._get_new_metadata(self._player.loaded_book) + + properties = { + "Metadata": GLib.Variant("a{sv}", self._metadata), + "CanPlay": GLib.Variant("b", True), + "CanPause": GLib.Variant("b", True), + "CanGoNext": GLib.Variant("b", True), + "CanGoPrevious": GLib.Variant("b", True), + } + + self.properties_changed(self.MEDIA_PLAYER2_PLAYER_INTERFACE, properties, []) - def __on_status_changed(self, status, data=None): + def _on_status_changed(self, status: str) -> None: properties = {"PlaybackStatus": GLib.Variant("s", status)} - self.PropertiesChanged(self.__MPRIS_PLAYER_IFACE, properties, []) + self.properties_changed(self.MEDIA_PLAYER2_PLAYER_INTERFACE, properties, []) diff --git a/cozy/media/player.py b/cozy/media/player.py index 7a12122e..e018a4b0 100644 --- a/cozy/media/player.py +++ b/cozy/media/player.py @@ -22,6 +22,7 @@ log = logging.getLogger("mediaplayer") +US_TO_SEC = 10 ** 6 NS_TO_SEC = 10 ** 9 REWIND_SECONDS = 30 @@ -80,7 +81,9 @@ def position(self) -> int: @position.setter def position(self, new_value: int): - self._gst_player.position = self.loaded_chapter.start_position + (new_value * NS_TO_SEC) + # FIXME: setter expects seconds, but getter returns nanoseconds + if self.loaded_chapter is not None: + self._gst_player.position = max(self.loaded_chapter.start_position + (new_value * NS_TO_SEC), 0) @property def volume(self) -> float: @@ -172,8 +175,7 @@ def forward(self): def destroy(self): self._gst_player.dispose() - - self._stop_tick_thread() + self._stop_playback() if self._fadeout_thread: self._fadeout_thread.stop()