diff --git a/cozy/control/artwork_cache.py b/cozy/control/artwork_cache.py index 020862f1..748529ad 100644 --- a/cozy/control/artwork_cache.py +++ b/cozy/control/artwork_cache.py @@ -1,6 +1,8 @@ import logging import os -import uuid +import shutil +from pathlib import Path +from uuid import uuid4 from gi.repository import Gdk, GdkPixbuf @@ -21,6 +23,10 @@ def __init__(self): _app_settings = inject.instance(ApplicationSettings) _app_settings.add_listener(self._on_app_setting_changed) + @property + def artwork_cache_dir(self): + return Path(get_cache_dir()) / "artwork" + def get_cover_paintable(self, book, scale, size=0) -> Gdk.Texture | None: pixbuf = None size *= scale @@ -45,18 +51,12 @@ def delete_artwork_cache(self): """ Deletes the artwork cache completely. """ - cache_dir = os.path.join(get_cache_dir(), "artwork") - - import shutil - if os.path.exists(cache_dir): - shutil.rmtree(cache_dir) - - q = ArtworkCacheModel.delete() - q.execute() + shutil.rmtree(self.artwork_cache_dir, ignore_errors=True) + ArtworkCacheModel.delete().execute() def _on_importer_event(self, event, data): if event == "scan" and data == ScanStatus.STARTED: - self.delete_artwork_cache() + self.delete_artwork_cache() def _create_artwork_cache(self, book, pixbuf, size): """ @@ -68,54 +68,49 @@ def _create_artwork_cache(self, book, pixbuf, size): :return: Resized pixbuf """ query = ArtworkCacheModel.select().where(ArtworkCacheModel.book == book.id) - gen_uuid = "" if query.exists(): - gen_uuid = str(query.first().uuid) + uuid = str(query.first().uuid) else: - gen_uuid = str(uuid.uuid4()) - ArtworkCacheModel.create(book=book.id, uuid=gen_uuid) + uuid = str(uuid4()) + ArtworkCacheModel.create(book=book.id, uuid=uuid) - cache_dir = os.path.join(os.path.join(get_cache_dir(), "artwork"), gen_uuid) - if not os.path.exists(cache_dir): - os.makedirs(cache_dir) + cache_dir = self.artwork_cache_dir / uuid + cache_dir.mkdir(exist_ok=True, parents=True) + file_path = (cache_dir / str(size)).with_suffix(".jpg") resized_pixbuf = self._resize_pixbuf(pixbuf, size) - file_path = os.path.join(cache_dir, str(size) + ".jpg") - if not os.path.exists(file_path): + + if not file_path.exists(): try: - resized_pixbuf.savev(file_path, "jpeg", ["quality", None], ["95"]) + resized_pixbuf.savev(str(file_path), "jpeg", ["quality", None], ["95"]) except Exception as e: reporter.warning("artwork_cache", "Failed to save resized cache albumart") - log.warning("Failed to save resized cache albumart for uuid %r: %s", gen_uuid, e) + log.warning("Failed to save resized cache albumart for uuid %r: %s", uuid, e) return resized_pixbuf - def get_album_art_path(self, book, size): + def get_album_art_path(self, book, size) -> str: query = ArtworkCacheModel.select().where(ArtworkCacheModel.book == book.id) - if query.exists(): - try: - uuid = query.first().uuid - except Exception: - reporter.error("artwork_cache", "load_pixbuf_from_cache: query exists but query.first().uuid crashed.") - return None - else: + if not query.exists(): return None - cache_dir = os.path.join(get_cache_dir(), "artwork") - cache_dir = os.path.join(cache_dir, uuid) - try: - if os.path.exists(cache_dir): - file_path = os.path.join(cache_dir, str(size) + ".jpg") - if os.path.exists(file_path): - return file_path - else: - return None - except Exception as e: - log.warning(e) + uuid = query.first().uuid + except Exception: + reporter.error( + "artwork_cache", + "load_pixbuf_from_cache: query exists but query.first().uuid crashed.", + ) return None + cache_dir = self.artwork_cache_dir / uuid + + if cache_dir.is_dir(): + file_path = (cache_dir / str(size)).with_suffix(".jpg") + if file_path.exists(): + return str(file_path) + return None def _load_pixbuf_from_cache(self, book, size): @@ -139,92 +134,71 @@ def _load_cover_pixbuf(self, book, app_settings: ApplicationSettings): :param size: The size of the bigger side in pixels :return: pixbuf object containing the cover """ - pixbuf = None + loading_order = [self._load_pixbuf_from_db, self._load_pixbuf_from_file] if app_settings.prefer_external_cover: - pixbuf = self._load_pixbuf_from_file(book) - - if pixbuf is None: - pixbuf = self._load_pixbuf_from_db(book) - else: - pixbuf = self._load_pixbuf_from_db(book) + loading_order.reverse() - if pixbuf is None: - pixbuf = self._load_pixbuf_from_file(book) + for loader in loading_order: + pixbuf = loader(book) + if pixbuf: + return pixbuf - return pixbuf + return None def _load_pixbuf_from_db(self, book): - pixbuf = None - - if book and book.cover: - try: - loader = GdkPixbuf.PixbufLoader.new() - loader.write(book.cover) - loader.close() - pixbuf = loader.get_pixbuf() - except Exception as e: - reporter.warning("artwork_cache", "Could not get book cover from db.") - log.warning("Could not get cover for book %r: %s", book.name, e) + if not book or not book.cover: + return None - return pixbuf + try: + loader = GdkPixbuf.PixbufLoader.new() + loader.write(book.cover) + loader.close() + except Exception as e: + reporter.warning("artwork_cache", "Could not get book cover from db.") + log.warning("Could not get cover for book %r: %s", book.name, e) + else: + return loader.get_pixbuf() def _resize_pixbuf(self, pixbuf, size): """ Resizes an pixbuf and keeps the aspect ratio. :return: Resized pixbuf. """ - resized_pixbuf = pixbuf + if size == 0: + return pixbuf - if size > 0: - if pixbuf.get_height() > pixbuf.get_width(): - width = int(pixbuf.get_width() / (pixbuf.get_height() / size)) - resized_pixbuf = pixbuf.scale_simple( - width, size, GdkPixbuf.InterpType.BILINEAR) - else: - height = int(pixbuf.get_height() / (pixbuf.get_width() / size)) - resized_pixbuf = pixbuf.scale_simple( - size, height, GdkPixbuf.InterpType.BILINEAR) - - return resized_pixbuf + if pixbuf.get_height() > pixbuf.get_width(): + width = int(pixbuf.get_width() / (pixbuf.get_height() / size)) + return pixbuf.scale_simple(width, size, GdkPixbuf.InterpType.BILINEAR) + else: + height = int(pixbuf.get_height() / (pixbuf.get_width() / size)) + return pixbuf.scale_simple(size, height, GdkPixbuf.InterpType.BILINEAR) - def _load_pixbuf_from_file(self, book): + def _load_pixbuf_from_file(self, book) -> GdkPixbuf.Pixbuf | None: """ Try to load the artwork from a book from image files. :param book: The book to load the artwork from. :return: Artwork as pixbuf object. """ - pixbuf = None - cover_files = [] - try: - directory = os.path.dirname(os.path.normpath(book.chapters[0].file)) + directory = Path(book.chapters[0].file).absolute().parent + cover_extensions = {".jpg", ".jpeg", ".png", ".gif"} + + for path in directory.glob("cover.*"): + if path.suffix.lower() not in cover_extensions: + continue - cover_files = [f for f in os.listdir(directory) - if f.lower().endswith('.png') or f.lower().endswith(".jpg") or f.lower().endswith(".gif")] - except Exception as e: - log.warning("Could not open audiobook directory and look for cover files: %s", e) - for elem in (x for x in cover_files if os.path.splitext(x.lower())[0] == "cover"): - # find cover.[jpg,png,gif] try: - pixbuf = GdkPixbuf.Pixbuf.new_from_file(os.path.join(directory, elem)) + pixbuf = GdkPixbuf.Pixbuf.new_from_file(str(path)) except Exception as e: log.debug(e) + if pixbuf: - break - if pixbuf is None: - # find other cover file (sort alphabet) - cover_files.sort(key=str.lower) - for elem in cover_files: - try: - pixbuf = GdkPixbuf.Pixbuf.new_from_file(os.path.join(directory, elem)) - except Exception as e: - log.debug(e) - if pixbuf: - break - return pixbuf + return pixbuf + + return None def _on_app_setting_changed(self, event: str, data): if event == "prefer-external-cover": self.delete_artwork_cache() - diff --git a/cozy/media/importer.py b/cozy/media/importer.py index d6e044af..aa20d6cc 100644 --- a/cozy/media/importer.py +++ b/cozy/media/importer.py @@ -4,6 +4,7 @@ import time from enum import Enum, auto from multiprocessing.pool import Pool as Pool +from pathlib import Path from urllib.parse import unquote, urlparse from cozy.architecture.event_sender import EventSender @@ -22,6 +23,8 @@ CHUNK_SIZE = 100 +AUDIO_EXTENSIONS = {".mp3", ".ogg", ".flac", ".m4a", ".m4b", ".mp4", ".wav", ".opus"} + class ScanStatus(Enum): STARTED = auto() @@ -109,13 +112,15 @@ def _execute_import(self, files_to_scan: list[str]) -> tuple[set[str], set[str]] self._progress += CHUNK_SIZE - if len(media_files) != 0: + if media_files: try: self._database_importer.insert_many(media_files) except Exception as e: log.exception("Error while inserting new tracks to the database") reporter.exception("importer", e) - self._toast.show("{}: {}".format(_("Error while importing new files"), str(e.__class__))) + self._toast.show( + "{}: {}".format(_("Error while importing new files"), str(e.__class__)) + ) if self._progress >= self._files_count: break @@ -150,10 +155,9 @@ def _get_files_to_scan(self) -> list[str]: def _get_configured_storage_paths(self) -> list[str]: """From all storage path configured by the user, we only want to scan those paths that are currently online and exist.""" - paths = [storage.path - for storage - in self._settings.storage_locations - if not storage.external] + paths = [ + storage.path for storage in self._settings.storage_locations if not storage.external + ] for storage in self._settings.external_storage_locations: try: @@ -164,13 +168,12 @@ def _get_configured_storage_paths(self) -> list[str]: return [path for path in paths if os.path.exists(path)] - def _walk_paths_to_scan(self, paths: list[str]) -> list[str]: + def _walk_paths_to_scan(self, directories: list[str]) -> list[str]: """Get all files recursive inside a directory. Returns absolute paths.""" - for path in paths: - for directory, _, files in os.walk(path): - for file in files: - filepath = os.path.join(directory, file) - yield filepath + for dir in directories: + for path in Path(dir).rglob("**/*"): + if path.suffix.lower() in AUDIO_EXTENSIONS: + yield str(path) def _filter_unchanged_files(self, files: list[str]) -> list[str]: """Filter all files that are already imported and that have not changed from a list of paths.""" @@ -179,10 +182,9 @@ def _filter_unchanged_files(self, files: list[str]) -> list[str]: for file in files: if file in imported_files: try: - chapter = next(chapter - for chapter - in self._library.chapters - if chapter.file == file) + chapter = next( + chapter for chapter in self._library.chapters if chapter.file == file + ) except StopIteration as e: log.warning("_filter_unchanged_files raised a stop iteration.") log.debug(e) diff --git a/cozy/media/media_detector.py b/cozy/media/media_detector.py index 63a894cb..e659ee29 100644 --- a/cozy/media/media_detector.py +++ b/cozy/media/media_detector.py @@ -21,44 +21,22 @@ class AudioFileCouldNotBeDiscovered(Exception): class MediaDetector(EventSender): def __init__(self, path: str): super().__init__() - self.uri = pathlib.Path(path).as_uri() + self.uri = pathlib.Path(path).absolute().as_uri() Gst.init(None) self.discoverer: GstPbutils.Discoverer = GstPbutils.Discoverer() def get_media_data(self) -> MediaFile: - if not self._has_audio_file_ending(): - raise NotAnAudioFile - try: - discoverer_info: GstPbutils.DiscovererInfo = self.discoverer.discover_uri(self.uri) + discoverer_info = self.discoverer.discover_uri(self.uri) except Exception: log.info("Skipping file because it couldn't be detected: %s", self.uri) raise AudioFileCouldNotBeDiscovered(self.uri) from None - is_valid_audio_file = self._is_valid_audio_file(discoverer_info) - if is_valid_audio_file: - tag_reader = TagReader(self.uri, discoverer_info) - tags = tag_reader.get_tags() - return tags + if self._is_valid_audio_file(discoverer_info): + return TagReader(self.uri, discoverer_info).get_tags() else: raise AudioFileCouldNotBeDiscovered(self.uri) - def _is_valid_audio_file(self, discoverer_info: GstPbutils.DiscovererInfo): - audio_streams = discoverer_info.get_audio_streams() - video_streams = discoverer_info.get_video_streams() - - if len(audio_streams) < 1: - log.info("File contains no audio stream.") - return False - elif len(audio_streams) > 1: - log.info("File contains more than one audio stream.") - return False - elif len(video_streams) > 0: - log.info("File contains a video stream.") - return False - - return True - - def _has_audio_file_ending(self) -> bool: - return self.uri.lower().endswith(('.mp3', '.ogg', '.flac', '.m4a', '.m4b', '.mp4', '.wav', '.opus')) + def _is_valid_audio_file(self, info: GstPbutils.DiscovererInfo): + return len(info.get_audio_streams()) == 1 and not info.get_video_streams() diff --git a/cozy/media/tag_reader.py b/cozy/media/tag_reader.py index 5b1b8c0a..4263eba8 100644 --- a/cozy/media/tag_reader.py +++ b/cozy/media/tag_reader.py @@ -8,7 +8,7 @@ from cozy.media.chapter import Chapter from cozy.media.media_file import MediaFile -NS_TO_SEC = 10 ** 9 +NS_TO_SEC = 10**9 class TagReader: @@ -20,7 +20,7 @@ def __init__(self, uri: str, discoverer_info: GstPbutils.DiscovererInfo): raise ValueError("discoverer_info must not be None") self.uri: str = uri - self.discoverer_info: GstPbutils.DiscovererInfo = discoverer_info + self.discoverer_info = discoverer_info self.tags: Gst.TagList = discoverer_info.get_tags() @@ -36,7 +36,7 @@ def get_tags(self) -> MediaFile: disk=self._get_disk(), chapters=self._get_chapters(), cover=self._get_cover(), - modified=self._get_modified() + modified=self._get_modified(), ) return media_file @@ -100,7 +100,7 @@ def _get_single_chapter(self): name=self._get_track_name(), position=0, length=self._get_length_in_seconds(), - number=self._get_track_number() + number=self._get_track_number(), ) return [chapter] @@ -151,12 +151,14 @@ def _get_m4b_chapters(self, mutagen_tags: MP4) -> list[Chapter]: title = chapter.title or "" - chapters.append(Chapter( - name=title, - position=int(chapter.start * NS_TO_SEC), - length=length, - number=index + 1 - )) + chapters.append( + Chapter( + name=title, + position=int(chapter.start * NS_TO_SEC), + length=length, + number=index + 1, + ) + ) return chapters diff --git a/test/cozy/media/test_media_detector.py b/test/cozy/media/test_media_detector.py deleted file mode 100644 index a7159cfa..00000000 --- a/test/cozy/media/test_media_detector.py +++ /dev/null @@ -1,15 +0,0 @@ -def test_get_media_data_should_work_with_valid_audio_files(mocker): - from cozy.media.media_detector import MediaDetector - - mocker.patch("gi.repository.GstPbutils.Discoverer") - mocker.patch("gi.repository.Gst.init") - - file_extensions = ['.mp3', '.ogg', '.flac', '.m4a', '.m4b', '.mp4', '.wav', '.opus'] - - for extension in file_extensions: - md = MediaDetector("/test.{}".format(extension)) - assert md._has_audio_file_ending() - - for extension in file_extensions: - md = MediaDetector("/test.{}".format(extension.upper())) - assert md._has_audio_file_ending()