diff --git a/README.rst b/README.rst
index 86da220..1c4a503 100644
--- a/README.rst
+++ b/README.rst
@@ -45,7 +45,8 @@ The following configuration values are available:
Defaults to enabled.
- ``listenbrainz/token``: Your `Listenbrainz user token `_
- ``listenbrainz/url``: The URL of the API of the Listenbrainz instance to record listens to (default: api.listenbrainz.org)
-
+- ``listenbrainz/import_playlists``: Whether to import Listenbrainz playlists (default: ``false``)
+- ``listenbrainz/search_schemes``: If non empty, the search for tracks in Mopidy's library is limited to results with the given schemes. The default value is ``"local:"`` to search tracks in Mopidy-Local library. It's recommended to customize the value according to your favorite backend but beware that not all backends support the required track search by ``musicbrainz_trackid`` (Mopidy-File, Mopidy-InternetArchive, Mopidy-Podcast, Mopidy-Somafm, Mopidy-Stream don't support such searches).
Project resources
=================
@@ -60,4 +61,5 @@ Credits
- Fork of `Mopidy-Scrobbler `__ by `Stein Magnus Jodal `__
- Current maintainer: `suaviloquence `__
+- Playlist support by `Matthias Meulien `__
- `Contributors `_
diff --git a/mopidy_listenbrainz/__init__.py b/mopidy_listenbrainz/__init__.py
index a44b026..ac0ff1b 100644
--- a/mopidy_listenbrainz/__init__.py
+++ b/mopidy_listenbrainz/__init__.py
@@ -19,9 +19,15 @@ def get_config_schema(self):
schema = super().get_config_schema()
schema["token"] = config.Secret()
schema["url"] = config.String()
+ schema["import_playlists"] = config.Boolean()
+ schema["search_schemes"] = config.List(optional=True)
return schema
def setup(self, registry):
from .frontend import ListenbrainzFrontend
registry.add("frontend", ListenbrainzFrontend)
+
+ from .backend import ListenbrainzBackend
+
+ registry.add("backend", ListenbrainzBackend)
diff --git a/mopidy_listenbrainz/backend.py b/mopidy_listenbrainz/backend.py
new file mode 100644
index 0000000..cb72f7a
--- /dev/null
+++ b/mopidy_listenbrainz/backend.py
@@ -0,0 +1,30 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, ClassVar, NewType
+
+import pykka
+
+from mopidy import backend
+
+try:
+ from mopidy.types import UriScheme
+except ModuleNotFoundError:
+ UriScheme = NewType("UriScheme", str)
+
+from .playlists import ListenbrainzPlaylistsProvider
+
+if TYPE_CHECKING:
+ from mopidy.audio import AudioProxy
+ from mopidy.ext import Config
+
+
+class ListenbrainzBackend(pykka.ThreadingActor, backend.Backend):
+ uri_schemes: ClassVar[list[UriScheme]] = [UriScheme("listenbrainz")]
+
+ def __init__(
+ self,
+ config: Config, # noqa: ARG002
+ audio: AudioProxy, # noqa: ARG002
+ ) -> None:
+ super().__init__()
+ self.playlists = ListenbrainzPlaylistsProvider(self)
diff --git a/mopidy_listenbrainz/ext.conf b/mopidy_listenbrainz/ext.conf
index 1c15765..088394a 100644
--- a/mopidy_listenbrainz/ext.conf
+++ b/mopidy_listenbrainz/ext.conf
@@ -2,3 +2,5 @@
enabled = true
token =
url = api.listenbrainz.org
+import_playlists = false
+search_schemes = local:
diff --git a/mopidy_listenbrainz/frontend.py b/mopidy_listenbrainz/frontend.py
index a3db66d..10d6ff8 100644
--- a/mopidy_listenbrainz/frontend.py
+++ b/mopidy_listenbrainz/frontend.py
@@ -1,10 +1,14 @@
import logging
import time
+from datetime import datetime, timedelta
+from threading import Timer
+from typing import List, Optional
import pykka
from mopidy.core import CoreListener
+from mopidy.models import Playlist, Track
-from .listenbrainz import Listenbrainz
+from .listenbrainz import Listenbrainz, PlaylistData
logger = logging.getLogger(__name__)
@@ -15,14 +19,142 @@ class ListenbrainzFrontend(pykka.ThreadingActor, CoreListener):
def __init__(self, config, core):
super().__init__()
self.config = config
+ self.library = core.library
+ self.playlists = core.playlists
+ self.playlists_update_timer = None
def on_start(self):
self.lb = Listenbrainz(
self.config["listenbrainz"]["token"],
self.config["listenbrainz"]["url"],
+ self.config["proxy"],
)
logger.debug("Listenbrainz token valid!")
+ if self.config["listenbrainz"].get("import_playlists", False):
+ search_schemes = self.config["listenbrainz"].get(
+ "search_schemes", ["local:"]
+ )
+ if len(search_schemes) > 0:
+ logger.debug(
+ f"Will limit track searches to URIs: {search_schemes}"
+ )
+ else:
+ msg = "Track searches among all backends aren't stable! " \
+ "Better configure `search_schemes' to match your " \
+ "favorite backend"
+ logger.warn(msg)
+
+ self.import_playlists()
+
+ def on_stop(self):
+ if self.playlists_update_timer:
+ self.playlists_update_timer.cancel()
+
+ def import_playlists(self) -> None:
+ logger.info("Importing ListenBrainz playlists")
+
+ import_count = 0
+ playlist_datas = self.lb.list_playlists_created_for_user()
+ logger.debug(f"Found {len(playlist_datas)} playlists to import")
+
+ existing_playlists = self.playlists.as_list().get()
+ recommendation_playlist_uri_prefix = (
+ "listenbrainz:playlist:recommendation"
+ )
+ filtered_existing_playlists = dict(
+ [
+ (ref.uri, ref)
+ for ref in existing_playlists
+ if ref.uri.startswith(recommendation_playlist_uri_prefix)
+ ]
+ )
+
+ for playlist_data in playlist_datas:
+ source = playlist_data.playlist_id
+ playlist_uri = f"{recommendation_playlist_uri_prefix}:{playlist_data.playlist_id}"
+ tracks = self._collect_playlist_tracks(playlist_data)
+
+ if len(tracks) == 0:
+ logger.debug(
+ f"Skipping import of playlist with no known track for {source!r}"
+ )
+ continue
+
+ if playlist_uri in filtered_existing_playlists:
+ filtered_existing_playlists.pop(playlist_uri)
+ # must pop since filtered_existing_playlists will
+ # finally be deleted
+
+ logger.debug(f"Already known playlist {playlist_uri}")
+ # maybe there're new tracks in Mopidy's database...
+ else:
+ query = self.playlists.create(
+ name=playlist_uri, uri_scheme="listenbrainz"
+ )
+ # Hack, hack: The backend uses first parameter as URI,
+ # not name...
+ playlist = query.get()
+ if playlist is None:
+ logger.warning(f"Failed to create playlist for {source!r}")
+ continue
+
+ logger.debug(f"Playlist {playlist.uri!r} created")
+
+ complete_playlist = Playlist(
+ uri=playlist_uri,
+ name=playlist_data.name,
+ tracks=tracks,
+ last_modified=playlist_data.last_modified,
+ )
+ query = self.playlists.save(complete_playlist)
+ playlist = query.get()
+ if playlist is None:
+ logger.warning(f"Failed to save playlist for {source!r}")
+ else:
+ import_count += 1
+ logger.debug(
+ f"Playlist saved with {len(playlist.tracks)} tracks {playlist.uri!r}"
+ )
+
+ for playlist in filtered_existing_playlists.values():
+ logger.debug(f"Deletion of obsolete playlist {playlist.uri!r}")
+ self.playlists.delete(playlist.uri)
+
+ logger.info(
+ f"Successfully imported ListenBrainz playlists: {import_count}"
+ )
+ self._schedule_playlists_import()
+
+ def _collect_playlist_tracks(
+ self, playlist_data: PlaylistData
+ ) -> List[Track]:
+ tracks: List[Track] = []
+ search_schemes = self.config["listenbrainz"].get("search_schemes", ["local:"])
+
+ for track_mbid in playlist_data.track_mbids:
+ query = self.library.search(
+ {"musicbrainz_trackid": [track_mbid]}, uris=search_schemes
+ )
+ results = query.get()
+
+ found_tracks = [t for r in results for t in r.tracks]
+ if len(found_tracks) == 0:
+ continue
+
+ tracks.append(found_tracks[0])
+ return tracks
+
+ def _schedule_playlists_import(self):
+ now = datetime.now()
+ days_until_next_monday = 7 - now.weekday()
+ timer_interval = timedelta(days=days_until_next_monday).total_seconds()
+ logger.debug(f"Playlist update scheduled in {timer_interval} seconds")
+ self.playlists_update_timer = Timer(
+ timer_interval, self.import_playlists
+ )
+ self.playlists_update_timer.start()
+
def track_playback_started(self, tl_track):
track = tl_track.track
artists = ", ".join(sorted([a.name for a in track.artists]))
diff --git a/mopidy_listenbrainz/listenbrainz.py b/mopidy_listenbrainz/listenbrainz.py
index b157125..67b558d 100644
--- a/mopidy_listenbrainz/listenbrainz.py
+++ b/mopidy_listenbrainz/listenbrainz.py
@@ -1,27 +1,91 @@
+import datetime
+import logging
import time
-from typing import Any, Dict
+from dataclasses import dataclass
+from typing import Any, Dict, List, Optional
+from urllib.parse import urlparse
+
+import pkg_resources
+from mopidy import httpclient
import requests
from . import __version__
+logger = logging.getLogger(__name__)
+
+# Listenbrainz API
+LIST_PLAYLIST_CREATED_FOR_ENDPOINT = "/1/user/{user}/playlists/createdfor"
+PLAYLIST_ENDPOINT = "/1/playlist/{playlist_id}"
SUBMIT_LISTEN_ENDPOINT = "/1/submit-listens"
VALIDATE_TOKEN_ENDPOINT = "/1/validate-token"
+# Musicbrainz resources
+MUSICBRAINZ_PLAYLIST_EXTENSION_URL = "https://musicbrainz.org/doc/jspf#playlist"
+
+
+@dataclass
+class PlaylistData:
+ playlist_id: str
+ name: str
+ track_mbids: List[str]
+ last_modified: int
+
+
+def playlist_identifier_to_id(playlist_identifier: str) -> Optional[str]:
+ playlist_path = urlparse(playlist_identifier).path
+ path_prefix = "/playlist/"
+ return (
+ playlist_path[len(path_prefix) :]
+ if playlist_path.startswith(path_prefix)
+ else None
+ )
+
+
+def track_identifier_to_mbid(track_identifier: str) -> Optional[str]:
+ track_path = urlparse(track_identifier).path
+ path_prefix = "/recording/"
+ return (
+ track_path[len(path_prefix) :]
+ if track_path.startswith(path_prefix)
+ else None
+ )
+
+
+def get_requests_session(proxy_config, user_agent):
+ proxy = httpclient.format_proxy(proxy_config)
+ full_user_agent = httpclient.format_user_agent(user_agent)
+
+ session = requests.Session()
+ session.proxies.update({"http": proxy, "https": proxy})
+ session.headers.update({"user-agent": full_user_agent})
+
+ return session
+
class Listenbrainz(object):
token: str
url: str
- def __init__(self, token: str, url: str) -> None:
+ user_name: Optional[str]
+
+ def __init__(self, token: str, url: str, proxy_config: Any) -> None:
self.token = token
self.url = url
+ self.user_name = None # initialized during token validation
+
+ dist = pkg_resources.get_distribution("Mopidy-Listenbrainz")
+ self.session = get_requests_session(
+ proxy_config=proxy_config,
+ user_agent=f"{dist.project_name}/{dist.version}",
+ )
+
if not self.validate_token():
raise RuntimeError(f"Token {token} is not valid")
def validate_token(self) -> bool:
- response = requests.get(
+ response = self.session.get(
url=f"https://{self.url}{VALIDATE_TOKEN_ENDPOINT}",
headers={
"Authorization": f"Token {self.token}",
@@ -31,7 +95,9 @@ def validate_token(self) -> bool:
if response.status_code != 200:
return False
- return response.json()["valid"]
+ parsed_response = response.json()
+ self.user_name = parsed_response.get("user_name")
+ return parsed_response.get("valid")
def submit_listen(
self,
@@ -64,7 +130,7 @@ def submit_listen(
payload = [listen]
- response = requests.post(
+ response = self.session.post(
# hardcode https?
url=f"https://{self.url}{SUBMIT_LISTEN_ENDPOINT}",
json={
@@ -74,4 +140,113 @@ def submit_listen(
headers={
"Authorization": f"Token {self.token}",
},
- )
\ No newline at end of file
+ )
+
+ def list_playlists_created_for_user(self) -> List[PlaylistData]:
+ """List all playlist data from the "created for" endpoint.
+
+ The "created for" endpoint list recommendation playlists; It
+ is defined in ``LIST_PLAYLIST_CREATED_FOR_ENDPOINT``."""
+ if self.user_name is None:
+ logger.warning("No playlist created for unknown user!")
+ return []
+
+ path = LIST_PLAYLIST_CREATED_FOR_ENDPOINT.format(user=self.user_name)
+ response = self.session.get(
+ url=f"https://{self.url}{path}",
+ headers={
+ "Authorization": f"Token {self.token}",
+ },
+ )
+ parsed_response = response.json()
+ playlists: List[PlaylistData] = []
+ found_playlists: List[str] = []
+ for dto in parsed_response.get("playlists", []):
+ playlist_dto = dto.get("playlist", {})
+ playlist_identifier = playlist_dto.get("identifier")
+
+ if playlist_identifier is None:
+ logger.debug(f"Skipping playlist without identifier")
+ continue
+
+ if playlist_identifier in found_playlists:
+ logger.warning(f"Duplicated playlist {playlist_identifier!r}")
+ continue
+
+ found_playlists.append(playlist_identifier)
+
+ playlist_data = self._collect_playlist_data(playlist_identifier)
+ if playlist_data is None:
+ logger.warning(
+ f"Failed to build playlist {playlist_identifier!r}"
+ )
+ continue
+
+ playlists.append(playlist_data)
+
+ return playlists
+
+ def _collect_playlist_data(
+ self, playlist_identifier: str
+ ) -> Optional[PlaylistData]:
+ """Collect playlist data from a ListenBrainz playlist identifier.
+
+ The ListenBrainz playlist identifier is a URL whose last path
+ segment identifies the playlist. A playlist DTO is fetched
+ from that identifier using ListenBrainz API.
+
+ If the ListenBrainz playlist identifier doesn't match our
+ expectations (see ``playlist_identifier_to_id()``), or the DTO
+ hasn't the expected fields (``name``, ``date``), None is
+ returned.
+
+ MusicBrainz track identifiers are extracted from the tracks
+ identifiers found in the DTO ``tracks`` field.
+
+ """
+ playlist_id = playlist_identifier_to_id(playlist_identifier)
+ if playlist_id is None:
+ logger.warning(
+ f"Failed to extract playlist id from {playlist_identifier}"
+ )
+ return None
+
+ path = PLAYLIST_ENDPOINT.format(playlist_id=playlist_id)
+ response = self.session.get(
+ url=f"https://{self.url}{path}",
+ headers={
+ "Authorization": f"Token {self.token}",
+ },
+ )
+ parsed_response = response.json()
+ dto = parsed_response.get("playlist", {})
+ name = dto.get("title")
+ if name is None:
+ logger.debug(f"Unable to read a name from playlist {playlist_id!r}")
+ return None
+ try:
+ creation_date = datetime.datetime.fromisoformat(dto.get("date"))
+ except (ValueError, TypeError):
+ logger.warning(f"Failed to parse date for playlist {playlist_id!r}")
+ return None
+ track_mbids = []
+ for track_dto in dto.get("track", []): # not tracks!
+ track_identifier = track_dto.get("identifier", "")
+ track_mbid = track_identifier_to_mbid(track_identifier)
+ if track_mbid is None:
+ logger.debug(
+ f"Failed to identify MBID from {track_identifier!r}"
+ )
+ continue
+
+ track_mbids.append(track_mbid)
+
+ if len(track_mbids) == 0:
+ logger.debug(
+ f"No MBID found for tracks of playlist {playlist_id!r}"
+ )
+ return None
+
+ return PlaylistData(
+ playlist_id, name, track_mbids, int(creation_date.timestamp())
+ )
diff --git a/mopidy_listenbrainz/playlists.py b/mopidy_listenbrainz/playlists.py
new file mode 100644
index 0000000..802d439
--- /dev/null
+++ b/mopidy_listenbrainz/playlists.py
@@ -0,0 +1,106 @@
+import datetime
+import logging
+from typing import cast, List, NewType
+from uuid import uuid4
+
+from mopidy.backend import Backend, PlaylistsProvider
+from mopidy.models import Playlist, Ref
+
+try:
+ from mopidy.types import Uri, UriScheme
+except ModuleNotFoundError:
+ Uri = NewType("Uri", str)
+ UriScheme = NewType("UriScheme", str)
+
+
+logger = logging.getLogger(__name__)
+
+
+class ListenbrainzPlaylistsProvider(PlaylistsProvider):
+ """Provider for ListenBrainz playlists.
+
+ Note that this provider doesn't serialize the playlists. They're
+ expected to be created by the frontend on each extension setup.
+
+ This provider handles URIs with scheme ``listenbrainz:playlist``.
+
+ """
+
+ uri_prefix: UriScheme
+ playlists: List[Playlist]
+
+ def __init__(self, backend: Backend) -> None:
+ super().__init__(backend)
+
+ assert len(backend.uri_schemes) == 1
+ self.uri_prefix = cast(UriScheme, backend.uri_schemes[0] + ":playlist")
+ self.playlists = []
+
+ def as_list(self) -> List[Ref]:
+ return [Ref.playlist(uri=p.uri, name=p.name) for p in self.playlists]
+
+ def create(self, name: str) -> Playlist | None:
+ uri = name
+ if not uri.startswith(self.uri_prefix):
+ return None
+
+ playlist = Playlist(uri=uri, name=name)
+ self.playlists.append(playlist)
+ return playlist
+
+ def delete(self, uri: Uri) -> bool:
+ if not uri.startswith(self.uri_prefix):
+ return False
+
+ found = [p for p in self.playlists if p.uri == uri]
+ if len(found) == 0:
+ return False
+
+ pos = self.playlists.index(found[0])
+ del self.playlists[pos]
+ return True
+
+ def get_items(self, uri: Uri) -> List[Ref] | None:
+ if not uri.startswith(self.uri_prefix):
+ return None
+
+ found = [p for p in self.playlists if p.uri == uri]
+ if len(found) == 0:
+ return None
+
+ return [Ref.playlist(uri=p.uri, name=p.name) for p in found]
+
+ def lookup(self, uri: Uri) -> Playlist | None:
+ if not uri.startswith(self.uri_prefix):
+ return None
+
+ found = [p for p in self.playlists if p.uri == uri]
+ if len(found) == 0:
+ return None
+
+ return found[0]
+
+ def refresh(self) -> None:
+ pass
+
+ def save(self, playlist: Playlist) -> Playlist | None:
+ uri = playlist.uri
+
+ if not uri.startswith(self.uri_prefix):
+ return None
+
+ found = [p for p in self.playlists if p.uri == uri]
+ if len(found) == 0:
+ return None
+
+ if uri.startswith(self.uri_prefix + ":recommendation"):
+ if not (len(playlist.tracks) > len(found[0].tracks)):
+ # return unchanged playlist for recommendations whose
+ # track list isn't increasing, really save iff first
+ # save after creation or new tracks being available in
+ # Mopidy's database
+ return found[0]
+
+ pos = self.playlists.index(found[0])
+ self.playlists[pos] = playlist
+ return self.playlists[pos]