From a990ff46d9434cb93b2610ab8350b82b35c24ae6 Mon Sep 17 00:00:00 2001 From: farihdzkyy <78370692+farihdzkyy@users.noreply.github.com> Date: Tue, 7 Jan 2025 22:12:44 +0700 Subject: [PATCH] VERSION 2025.01.08 NEW UPDATE AVAILABLE. I just make episode downloader :D --- deezspot/__init__.py | 0 deezspot/__taggers__.py | 262 +++++++++ deezspot/deezloader/__download__.py | 672 ++++++++++++++++++++++ deezspot/deezloader/__download_utils__.py | 97 ++++ deezspot/deezloader/__init__.py | 574 ++++++++++++++++++ deezspot/deezloader/__utils__.py | 48 ++ deezspot/deezloader/dee_api.py | 294 ++++++++++ deezspot/deezloader/deegw_api.py | 322 +++++++++++ deezspot/deezloader/deezer_settings.py | 24 + deezspot/easy_spoty.py | 77 +++ deezspot/exceptions.py | 77 +++ deezspot/libutils/__init__.py | 0 deezspot/libutils/others_settings.py | 27 + deezspot/libutils/utils.py | 251 ++++++++ deezspot/models/__init__.py | 8 + deezspot/models/album.py | 20 + deezspot/models/episode.py | 34 ++ deezspot/models/playlist.py | 8 + deezspot/models/preferences.py | 15 + deezspot/models/smart.py | 13 + deezspot/models/track.py | 37 ++ deezspot/spotloader/__download__.py | 496 ++++++++++++++++ deezspot/spotloader/__init__.py | 330 +++++++++++ deezspot/spotloader/__spo_api__.py | 153 +++++ deezspot/spotloader/spotify_settings.py | 26 + 25 files changed, 3865 insertions(+) create mode 100644 deezspot/__init__.py create mode 100644 deezspot/__taggers__.py create mode 100644 deezspot/deezloader/__download__.py create mode 100644 deezspot/deezloader/__download_utils__.py create mode 100644 deezspot/deezloader/__init__.py create mode 100644 deezspot/deezloader/__utils__.py create mode 100644 deezspot/deezloader/dee_api.py create mode 100644 deezspot/deezloader/deegw_api.py create mode 100644 deezspot/deezloader/deezer_settings.py create mode 100644 deezspot/easy_spoty.py create mode 100644 deezspot/exceptions.py create mode 100644 deezspot/libutils/__init__.py create mode 100644 deezspot/libutils/others_settings.py create mode 100644 deezspot/libutils/utils.py create mode 100644 deezspot/models/__init__.py create mode 100644 deezspot/models/album.py create mode 100644 deezspot/models/episode.py create mode 100644 deezspot/models/playlist.py create mode 100644 deezspot/models/preferences.py create mode 100644 deezspot/models/smart.py create mode 100644 deezspot/models/track.py create mode 100644 deezspot/spotloader/__download__.py create mode 100644 deezspot/spotloader/__init__.py create mode 100644 deezspot/spotloader/__spo_api__.py create mode 100644 deezspot/spotloader/spotify_settings.py diff --git a/deezspot/__init__.py b/deezspot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/deezspot/__taggers__.py b/deezspot/__taggers__.py new file mode 100644 index 0000000..14a94b9 --- /dev/null +++ b/deezspot/__taggers__.py @@ -0,0 +1,262 @@ +#!/usr/bin/python3 + +from base64 import b64encode +from mutagen.flac import FLAC, Picture +from mutagen.oggvorbis import OggVorbis +from deezspot.models import Track, Episode +import requests + +def request(url): + response = requests.get(url) + response.raise_for_status() + return response + +from mutagen.id3 import ( + ID3NoHeaderError, + ID3, APIC, USLT, SYLT, + COMM, TSRC, TRCK, TIT2, + TLEN, TEXT, TCON, TALB, TBPM, + TPE1, TYER, TDAT, TPOS, TPE2, + TPUB, TCOP, TXXX, TCOM, IPLS +) + +def __write_flac(song, data): + tag = FLAC(song) + tag.delete() + images = Picture() + images.type = 3 + images.data = data['image'] + tag.clear_pictures() + tag.add_picture(images) + tag['lyrics'] = data['lyric'] + tag['artist'] = data['artist'] + tag['title'] = data['music'] + tag['date'] = f"{data['year'].year}/{data['year'].month}/{data['year'].day}" + tag['album'] = data['album'] + tag['tracknumber'] = f"{data['tracknum']}" + tag['discnumber'] = f"{data['discnum']}" + tag['genre'] = data['genre'] + tag['albumartist'] = data['ar_album'] + tag['author'] = data['author'] + tag['composer'] = data['composer'] + tag['copyright'] = data['copyright'] + tag['bpm'] = f"{data['bpm']}" + tag['length'] = f"{data['duration']}" + tag['organization'] = data['label'] + tag['isrc'] = data['isrc'] + tag['lyricist'] = data['lyricist'] + tag['version'] = data['version'] + tag.save() + +def __write_mp3(song, data): + try: + audio = ID3(song) + audio.delete() + except ID3NoHeaderError: + audio = ID3() + + audio.add( + APIC( + mime = "image/jpeg", + type = 3, + desc = "album front cover", + data = data['image'] + ) + ) + + audio.add( + COMM( + lang = "eng", + desc = "my comment", + text = "DO NOT USE FOR YOUR OWN EARNING" + ) + ) + + audio.add( + USLT( + text = data['lyric'] + ) + ) + + audio.add( + SYLT( + type = 1, + format = 2, + desc = "sync lyric song", + text = data['lyric_sync'] + ) + ) + + audio.add( + TSRC( + text = data['isrc'] + ) + ) + + audio.add( + TRCK( + text = f"{data['tracknum']}/{data['nb_tracks']}" + ) + ) + + audio.add( + TIT2( + text = data['music'] + ) + ) + + audio.add( + TLEN( + text = f"{data['duration']}" + ) + ) + + audio.add( + TEXT( + text = data['lyricist'] + ) + ) + + audio.add( + TCON( + text = data['genre'] + ) + ) + + audio.add( + TALB( + text = data['album'] + ) + ) + + audio.add( + TBPM( + text = f"{data['bpm']}" + ) + ) + + audio.add( + TPE1( + text = data['artist'] + ) + ) + + audio.add( + TYER( + text = f"{data['year'].year}" + ) + ) + + audio.add( + TDAT( + text = f"{data['year'].day}{data['year'].month}" + ) + ) + + audio.add( + TPOS( + text = f"{data['discnum']}/{data['discnum']}" + ) + ) + + audio.add( + TPE2( + text = data['ar_album'] + ) + ) + + audio.add( + TPUB( + text = data['label'] + ) + ) + + audio.add( + TCOP( + text = data['copyright'] + ) + ) + + audio.add( + TXXX( + desc = "REPLAYGAIN_TRACK_GAIN", + text = f"{data['gain']}" + ) + ) + + audio.add( + TCOM( + text = data['composer'] + ) + ) + + audio.add( + IPLS( + people = [ + data['author'] + ] + ) + ) + + audio.save(song, v2_version = 3) + +def __write_ogg(song, song_metadata): + audio = OggVorbis(song) + audio.delete() + + for key in ['music', 'artist', 'album', 'tracknum', 'discnum', 'year', 'genre', 'isrc', 'description']: + if key in song_metadata: + audio[key] = str(song_metadata[key]) + + if 'image' in song_metadata: + image = Picture() + image.type = 3 + image.desc = 'Cover' + image.mime = 'image/jpeg' + + try: + if isinstance(song_metadata['image'], bytes): + image.data = song_metadata['image'] + else: + image.data = request(song_metadata['image']).content + + import base64 + img_data = base64.b64encode(image.write()) + audio['metadata_block_picture'] = [img_data.decode('utf-8')] + except Exception as e: + print(f"Warning: Could not add cover image: {str(e)}") + + audio.save() + +def write_tags(media): + if isinstance(media, Track): + song = media.song_path + elif isinstance(media, Episode): + song = media.episode_path + else: + raise ValueError("Unsupported media type") + + song_metadata = media.tags + f_format = media.file_format + + if f_format == ".flac": + __write_flac(song, song_metadata) + elif f_format == ".ogg": + __write_ogg(song, song_metadata) + else: + __write_mp3(song, song_metadata) + +def check_track(media): + if isinstance(media, Track): + song = media.song_path + elif isinstance(media, Episode): + song = media.episode_path + else: + raise ValueError("Unsupported media type") + + f_format = media.file_format + is_ok = False + + # Add your logic to check the track/episode here + + return is_ok \ No newline at end of file diff --git a/deezspot/deezloader/__download__.py b/deezspot/deezloader/__download__.py new file mode 100644 index 0000000..03751a7 --- /dev/null +++ b/deezspot/deezloader/__download__.py @@ -0,0 +1,672 @@ +from tqdm import tqdm +from deezspot.deezloader.dee_api import API +from copy import deepcopy +from os.path import isfile +import re +from pathlib import Path +import requests +import os +from deezspot.deezloader.deegw_api import API_GW +from deezspot.deezloader.deezer_settings import qualities +from deezspot.libutils.others_settings import answers +from deezspot.__taggers__ import write_tags, check_track +from deezspot.deezloader.__download_utils__ import decryptfile, gen_song_hash +from deezspot.exceptions import ( + TrackNotFound, + NoRightOnMedia, + QualityNotFound, +) +from deezspot.models import ( + Track, + Album, + Playlist, + Preferences, + Episode, +) +from deezspot.deezloader.__utils__ import ( + check_track_ids, + check_track_md5, + check_track_token, +) +from deezspot.libutils.utils import ( + set_path, + trasform_sync_lyric, + create_zip, +) + +class Download_JOB: + + @classmethod + def __get_url( + cls, + c_track: Track, + quality_download: str + ) -> dict: + if c_track.get('__TYPE__') == 'episode': + return { + "media": [{ + "sources": [{ + "url": c_track.get('EPISODE_DIRECT_STREAM_URL') + }] + }] + } + else: + c_md5, c_media_version = check_track_md5(c_track) + track_id_key = check_track_ids(c_track) + c_ids = c_track.get(track_id_key, "") + n_quality = qualities[quality_download]['n_quality'] + + if not c_md5: + raise ValueError("MD5_ORIGIN is missing") + if not c_media_version: + raise ValueError("MEDIA_VERSION is missing") + if not c_ids: + raise ValueError(f"{track_id_key} is missing") + + c_song_hash = gen_song_hash( + c_md5, n_quality, + c_ids, c_media_version + ) + + c_media_url = API_GW.get_song_url(c_md5[0], c_song_hash) + + return { + "media": [ + { + "sources": [ + { + "url": c_media_url + } + ] + } + ] + } + + @classmethod + def check_sources( + cls, + infos_dw: list, + quality_download: str + ) -> list: + medias = [] + + for track in infos_dw: + if track.get('__TYPE__') == 'episode': + media_json = cls.__get_url(track, quality_download) + medias.append(media_json) + continue + + tracks_token = [ + check_track_token(c_track) + for c_track in infos_dw + ] + + try: + medias = API_GW.get_medias_url(tracks_token, quality_download) + + for a in range( + len(medias) + ): + if "errors" in medias[a]: + c_media_json = cls.__get_url(infos_dw[a], quality_download) + medias[a] = c_media_json + else: + if not medias[a]['media']: + c_media_json = cls.__get_url(infos_dw[a], quality_download) + medias[a] = c_media_json + + elif len(medias[a]['media'][0]['sources']) == 1: + c_media_json = cls.__get_url(infos_dw[a], quality_download) + medias[a] = c_media_json + except NoRightOnMedia: + medias = [] + + for c_track in infos_dw: + c_media_json = cls.__get_url(c_track, quality_download) + medias.append(c_media_json) + + return medias + +class EASY_DW: + def __init__( + self, + infos_dw: dict, + preferences: Preferences + ) -> None: + + self.__infos_dw = infos_dw + self.__ids = preferences.ids + self.__link = preferences.link + self.__output_dir = preferences.output_dir + self.__method_save = preferences.method_save + self.__not_interface = preferences.not_interface + self.__quality_download = preferences.quality_download + self.__recursive_quality = preferences.recursive_quality + self.__recursive_download = preferences.recursive_download + + if self.__infos_dw.get('__TYPE__') == 'episode': + self.__song_metadata = { + 'music': self.__infos_dw.get('EPISODE_TITLE', ''), + 'artist': self.__infos_dw.get('SHOW_NAME', ''), + 'album': self.__infos_dw.get('SHOW_NAME', ''), + 'date': self.__infos_dw.get('EPISODE_PUBLISHED_TIMESTAMP', '').split()[0], + 'genre': 'Podcast', + 'explicit': self.__infos_dw.get('SHOW_IS_EXPLICIT', '2'), + 'disc': 1, + 'track': 1, + 'duration': int(self.__infos_dw.get('DURATION', 0)), + 'isrc': None + } + else: + self.__song_metadata = preferences.song_metadata + + self.__c_quality = qualities[self.__quality_download] + self.__fallback_ids = self.__ids + + self.__set_quality() + self.__write_track() + + def __set_quality(self) -> None: + self.__file_format = self.__c_quality['f_format'] + self.__song_quality = self.__c_quality['s_quality'] + + def __set_song_path(self) -> None: + self.__song_path = set_path( + self.__song_metadata, + self.__output_dir, + self.__song_quality, + self.__file_format, + self.__method_save + ) + + def __write_track(self) -> None: + self.__set_song_path() + + self.__c_track = Track( + self.__song_metadata, self.__song_path, + self.__file_format, self.__song_quality, + self.__link, self.__ids + ) + + self.__c_track.set_fallback_ids(self.__fallback_ids) + + def easy_dw(self) -> Track: + if self.__infos_dw.get('__TYPE__') == 'episode': + pic = self.__infos_dw.get('EPISODE_IMAGE_MD5', '') + else: + pic = self.__infos_dw['ALB_PICTURE'] + image = API.choose_img(pic) + self.__song_metadata['image'] = image + song = f"{self.__song_metadata['music']} - {self.__song_metadata['artist']}" + + if not self.__not_interface: + print(f"Downloading: {song}") + + try: + if self.__infos_dw.get('__TYPE__') == 'episode': + try: + return self.download_episode_try() + except Exception as e: + self.__c_track.success = False + raise e + else: + self.download_try() + except TrackNotFound: + try: + self.__fallback_ids = API.not_found(song, self.__song_metadata['music']) + self.__infos_dw = API_GW.get_song_data(self.__fallback_ids) + + media = Download_JOB.check_sources( + [self.__infos_dw], self.__quality_download + ) + + self.__infos_dw['media_url'] = media[0] + self.download_try() + except TrackNotFound: + self.__c_track = Track( + self.__song_metadata, + None, None, + None, None, None, + ) + + self.__c_track.success = False + + self.__c_track.md5_image = pic + + return self.__c_track + + def download_try(self) -> Track: + if isfile(self.__song_path) and check_track(self.__c_track): + if self.__recursive_download: + return self.__c_track + + ans = input( + f"Track \"{self.__song_path}\" already exists, do you want to redownload it?(y or n):" + ) + + if not ans in answers: + return self.__c_track + + try: + media_list = self.__infos_dw['media_url']['media'] + song_link = media_list[0]['sources'][0]['url'] + + try: + crypted_audio = API_GW.song_exist(song_link) + except TrackNotFound: + song = self.__song_metadata['music'] + artist = self.__song_metadata['artist'] + + if self.__file_format == '.flac': + print(f"\n⚠ {song} - {artist} is not available in FLAC format. Trying MP3...") + self.__quality_download = 'MP3_320' + self.__file_format = '.mp3' + self.__song_path = self.__song_path.rsplit('.', 1)[0] + '.mp3' + + media = Download_JOB.check_sources( + [self.__infos_dw], 'MP3_320' + ) + + if media: + self.__infos_dw['media_url'] = media[0] + song_link = media[0]['media'][0]['sources'][0]['url'] + crypted_audio = API_GW.song_exist(song_link) + else: + raise TrackNotFound(f"Track {song} - {artist} not available") + + else: + msg = f"\n⚠ The {song} - {artist} can't be downloaded in {self.__quality_download} quality :( ⚠\n" + + if not self.__recursive_quality: + raise QualityNotFound(msg=msg) + + print(msg) + + for c_quality in qualities: + if self.__quality_download == c_quality: + continue + + print(f"🛈 Trying to download {song} - {artist} in {c_quality}") + + media = Download_JOB.check_sources( + [self.__infos_dw], c_quality + ) + + if media: + self.__infos_dw['media_url'] = media[0] + song_link = media[0]['media'][0]['sources'][0]['url'] + try: + crypted_audio = API_GW.song_exist(song_link) + self.__c_quality = qualities[c_quality] + self.__set_quality() + break + except TrackNotFound: + if c_quality == "MP3_128": + raise TrackNotFound(f"Error with {song} - {artist}", self.__link) + continue + + c_crypted_audio = crypted_audio.iter_content(2048) + self.__fallback_ids = check_track_ids(self.__infos_dw) + + try: + self.__write_track() + decryptfile( + c_crypted_audio, self.__fallback_ids, self.__song_path + ) + self.__add_more_tags() + write_tags(self.__c_track) + except Exception as e: + if isfile(self.__song_path): + os.remove(self.__song_path) + raise TrackNotFound(f"Failed to process {self.__song_path}: {str(e)}") + + return self.__c_track + + except Exception as e: + raise TrackNotFound(self.__link) from e + + def download_episode_try(self) -> Episode: + try: + direct_url = self.__infos_dw.get('EPISODE_DIRECT_STREAM_URL') + if not direct_url: + raise TrackNotFound("No direct stream URL found") + + os.makedirs(os.path.dirname(self.__song_path), exist_ok=True) + + response = requests.get(direct_url, stream=True) + print(direct_url) + response.raise_for_status() + + total_size = int(response.headers.get('content-length', 0)) + + with open(self.__song_path, 'wb') as f: + with tqdm( + total=total_size, + unit='iB', + unit_scale=True, + desc=f"Downloading {self.__song_metadata['music']}" + ) as pbar: + for chunk in response.iter_content(chunk_size=8192): + size = f.write(chunk) + pbar.update(size) + + self.__c_track.success = True + write_tags(self.__c_track) + return self.__c_track + + except Exception as e: + if isfile(self.__song_path): + os.remove(self.__song_path) + self.__c_track.success = False + raise TrackNotFound(f"Episode download failed: {str(e)}") + + + def __add_more_tags(self) -> None: + contributors = self.__infos_dw.get('SNG_CONTRIBUTORS', {}) + + if "author" in contributors: + self.__song_metadata['author'] = " & ".join( + contributors['author'] + ) + else: + self.__song_metadata['author'] = "" + + if "composer" in contributors: + self.__song_metadata['composer'] = " & ".join( + contributors['composer'] + ) + else: + self.__song_metadata['composer'] = "" + + if "lyricist" in contributors: + self.__song_metadata['lyricist'] = " & ".join( + contributors['lyricist'] + ) + else: + self.__song_metadata['lyricist'] = "" + + if "composerlyricist" in contributors: + self.__song_metadata['composer'] = " & ".join( + contributors['composerlyricist'] + ) + else: + self.__song_metadata['composerlyricist'] = "" + + if "version" in self.__infos_dw: + self.__song_metadata['version'] = self.__infos_dw['VERSION'] + else: + self.__song_metadata['version'] = "" + + self.__song_metadata['lyric'] = "" + self.__song_metadata['copyright'] = "" + self.__song_metadata['lyricist'] = "" + self.__song_metadata['lyric_sync'] = [] + + if self.__infos_dw.get('LYRICS_ID', 0) != 0: + need = API_GW.get_lyric(self.__ids) + + if "LYRICS_SYNC_JSON" in need: + self.__song_metadata['lyric_sync'] = trasform_sync_lyric( + need['LYRICS_SYNC_JSON'] + ) + + self.__song_metadata['lyric'] = need['LYRICS_TEXT'] + self.__song_metadata['copyright'] = need['LYRICS_COPYRIGHTS'] + self.__song_metadata['lyricist'] = need['LYRICS_WRITERS'] + +class DW_TRACK: + def __init__( + self, + preferences: Preferences + ) -> None: + + self.__preferences = preferences + self.__ids = self.__preferences.ids + self.__song_metadata = self.__preferences.song_metadata + self.__quality_download = self.__preferences.quality_download + + def dw(self) -> Track: + infos_dw = API_GW.get_song_data(self.__ids) + + print(infos_dw) + + media = Download_JOB.check_sources( + [infos_dw], self.__quality_download + ) + + infos_dw['media_url'] = media[0] + + track = EASY_DW(infos_dw, self.__preferences).easy_dw() + + if not track.success: + song = f"{self.__song_metadata['music']} - {self.__song_metadata['artist']}" + error_msg = f"Cannot download {song}, maybe it's not available in this format?" + + raise TrackNotFound(message = error_msg) + + return track + +class DW_ALBUM: + def __init__( + self, + preferences: Preferences + ) -> None: + + self.__preferences = preferences + self.__ids = self.__preferences.ids + self.__make_zip = self.__preferences.make_zip + self.__output_dir = self.__preferences.output_dir + self.__method_save = self.__preferences.method_save + self.__song_metadata = self.__preferences.song_metadata + self.__not_interface = self.__preferences.not_interface + self.__quality_download = self.__preferences.quality_download + + self.__song_metadata_items = self.__song_metadata.items() + + def dw(self) -> Album: + infos_dw = API_GW.get_album_data(self.__ids)['data'] + md5_image = infos_dw[0]['ALB_PICTURE'] + image = API.choose_img(md5_image) + self.__song_metadata['image'] = image + + album = Album(self.__ids) + album.image = image + album.md5_image = md5_image + album.nb_tracks = self.__song_metadata['nb_tracks'] + album.album_name = self.__song_metadata['album'] + album.upc = self.__song_metadata['upc'] + tracks = album.tracks + album.tags = self.__song_metadata + + medias = Download_JOB.check_sources( + infos_dw, self.__quality_download + ) + + c_song_metadata = {} + + for key, item in self.__song_metadata_items: + if type(item) is not list: + c_song_metadata[key] = self.__song_metadata[key] + + t = tqdm( + range( + len(infos_dw) + ), + desc = c_song_metadata['album'], + disable = self.__not_interface + ) + + for a in t: + for key, item in self.__song_metadata_items: + if type(item) is list: + c_song_metadata[key] = self.__song_metadata[key][a] + + c_infos_dw = infos_dw[a] + c_infos_dw['media_url'] = medias[a] + song = f"{c_song_metadata['music']} - {c_song_metadata['artist']}" + t.set_description_str(song) + c_preferences = deepcopy(self.__preferences) + c_preferences.song_metadata = c_song_metadata.copy() + c_preferences.ids = c_infos_dw['SNG_ID'] + c_preferences.link = f"https://deezer.com/track/{c_preferences.ids}" + + try: + track = EASY_DW(c_infos_dw, c_preferences).download_try() + except TrackNotFound: + try: + ids = API.not_found(song, c_song_metadata['music']) + c_infos_dw = API_GW.get_song_data(ids) + + c_media = Download_JOB.check_sources( + [c_infos_dw], self.__quality_download + ) + + c_infos_dw['media_url'] = c_media[0] + + track = EASY_DW(c_infos_dw, c_preferences).download_try() + except TrackNotFound: + track = Track( + c_song_metadata, + None, None, + None, None, None, + ) + + track.success = False + print(f"Track not found: {song} :(") + + tracks.append(track) + + if self.__make_zip: + song_quality = tracks[0].quality + + zip_name = create_zip( + tracks, + output_dir = self.__output_dir, + song_metadata = self.__song_metadata, + song_quality = song_quality, + method_save = self.__method_save + ) + + album.zip_path = zip_name + + return album + +class DW_PLAYLIST: + def __init__( + self, + preferences: Preferences + ) -> None: + + self.__preferences = preferences + self.__ids = self.__preferences.ids + self.__json_data = preferences.json_data + self.__make_zip = self.__preferences.make_zip + self.__output_dir = self.__preferences.output_dir + self.__song_metadata = self.__preferences.song_metadata + self.__quality_download = self.__preferences.quality_download + + def dw(self) -> Playlist: + infos_dw = API_GW.get_playlist_data(self.__ids)['data'] + + playlist = Playlist() + tracks = playlist.tracks + + medias = Download_JOB.check_sources( + infos_dw, self.__quality_download + ) + + for c_infos_dw, c_media, c_song_metadata in zip( + infos_dw, medias, self.__song_metadata + ): + if type(c_song_metadata) is str: + print(f"Track not found {c_song_metadata} :(") + continue + + c_infos_dw['media_url'] = c_media + c_preferences = deepcopy(self.__preferences) + c_preferences.ids = c_infos_dw['SNG_ID'] + c_preferences.song_metadata = c_song_metadata + + track = EASY_DW(c_infos_dw, c_preferences).easy_dw() + + if not track.success: + song = f"{c_song_metadata['music']} - {c_song_metadata['artist']}" + print(f"Cannot download {song}") + + tracks.append(track) + + if self.__make_zip: + playlist_title = self.__json_data['title'] + zip_name = f"{self.__output_dir}/{playlist_title} [playlist {self.__ids}]" + create_zip(tracks, zip_name = zip_name) + playlist.zip_path = zip_name + + return playlist + +class DW_EPISODE: + def __init__( + self, + preferences: Preferences + ) -> None: + self.__preferences = preferences + self.__ids = preferences.ids + self.__output_dir = preferences.output_dir + self.__method_save = preferences.method_save + self.__not_interface = preferences.not_interface + self.__quality_download = preferences.quality_download + + def __sanitize_filename(self, filename: str) -> str: + return re.sub(r'[<>:"/\\|?*]', '', filename)[:200] + + def dw(self) -> Track: + infos_dw = API_GW.get_episode_data(self.__ids) + infos_dw['__TYPE__'] = 'episode' + + self.__preferences.song_metadata = { + 'music': infos_dw.get('EPISODE_TITLE', ''), + 'artist': infos_dw.get('SHOW_NAME', ''), + 'album': infos_dw.get('SHOW_NAME', ''), + 'date': infos_dw.get('EPISODE_PUBLISHED_TIMESTAMP', '').split()[0], + 'genre': 'Podcast', + 'explicit': infos_dw.get('SHOW_IS_EXPLICIT', '2'), + 'duration': int(infos_dw.get('DURATION', 0)), + } + + try: + direct_url = infos_dw.get('EPISODE_DIRECT_STREAM_URL') + if not direct_url: + raise TrackNotFound("No direct URL found") + + safe_filename = self.__sanitize_filename(self.__preferences.song_metadata['music']) + Path(self.__output_dir).mkdir(parents=True, exist_ok=True) + output_path = os.path.join(self.__output_dir, f"{safe_filename}.mp3") + + response = requests.get(direct_url, stream=True) + total_size = int(response.headers.get('content-length', 0)) + + with open(output_path, 'wb') as f: + with tqdm( + total=total_size, + unit='iB', + unit_scale=True, + desc=f"Downloading {self.__preferences.song_metadata['music']}" + ) as pbar: + for chunk in response.iter_content(8192): + size = f.write(chunk) + pbar.update(size) + + episode = Track( + self.__preferences.song_metadata, + output_path, + '.mp3', + self.__quality_download, + f"https://www.deezer.com/episode/{self.__ids}", + self.__ids + ) + episode.success = True + return episode + + except Exception as e: + if 'output_path' in locals() and os.path.exists(output_path): + os.remove(output_path) + raise TrackNotFound(f"Episode download failed: {str(e)}") \ No newline at end of file diff --git a/deezspot/deezloader/__download_utils__.py b/deezspot/deezloader/__download_utils__.py new file mode 100644 index 0000000..9cbd1d7 --- /dev/null +++ b/deezspot/deezloader/__download_utils__.py @@ -0,0 +1,97 @@ +#!/usr/bin/python3 + +from hashlib import md5 as __md5 + +from binascii import ( + a2b_hex as __a2b_hex, + b2a_hex as __b2a_hex +) + +from Crypto.Cipher.Blowfish import ( + new as __newBlowfish, + MODE_CBC as __MODE_CBC +) + +from Crypto.Cipher.AES import ( + new as __newAES, + MODE_ECB as __MODE_ECB +) + +__secret_key = "g4el58wc0zvf9na1" +__secret_key2 = b"jo6aey6haid2Teih" +__idk = __a2b_hex("0001020304050607") + +def md5hex(data: str): + hashed = __md5( + data.encode() + ).hexdigest() + + return hashed + +def gen_song_hash(md5, quality, ids, media): + data = b"\xa4".join( + a.encode() + for a in [ + md5, quality, ids, media + ] + if a + ) + + hashed = ( + __md5(data) + .hexdigest() + .encode() + ) + + data = b"\xa4".join( + [hashed, data] + ) + b"\xa4" + + if len(data) % 16: + data += b"\x00" * (16 - len(data) % 16) + + c = __newAES(__secret_key2, __MODE_ECB) + + media_url = __b2a_hex( + c.encrypt(data) + ).decode() + + return media_url + +def __calcbfkey(songid): + h = md5hex(songid) + + bfkey = "".join( + chr( + ord(h[i]) ^ ord(h[i + 16]) ^ ord(__secret_key[i]) + ) + + for i in range(16) + ) + + return bfkey + +def __blowfishDecrypt(data, key): + c = __newBlowfish( + key.encode(), __MODE_CBC, __idk + ) + + return c.decrypt(data) + +def decryptfile(content, key, name): + key = __calcbfkey(key) + decrypted_audio = open(name, "wb") + seg = 0 + + for data in content: + if ( + (seg % 3) == 0 + ) and ( + len(data) == 2048 + ): + data = __blowfishDecrypt(data, key) + + decrypted_audio.write(data) + seg += 1 + + decrypted_audio.close() \ No newline at end of file diff --git a/deezspot/deezloader/__init__.py b/deezspot/deezloader/__init__.py new file mode 100644 index 0000000..9daa19e --- /dev/null +++ b/deezspot/deezloader/__init__.py @@ -0,0 +1,574 @@ +#!/usr/bin/python3 +from deezspot.deezloader.dee_api import API +from deezspot.easy_spoty import Spo +from deezspot.deezloader.deegw_api import API_GW +from deezspot.deezloader.deezer_settings import stock_quality +from deezspot.models import ( + Track, + Album, + Playlist, + Preferences, + Smart, + Episode, +) +from deezspot.deezloader.__download__ import ( + DW_TRACK, + DW_ALBUM, + DW_PLAYLIST, + DW_EPISODE, +) +from deezspot.exceptions import ( + InvalidLink, + TrackNotFound, + NoDataApi, + AlbumNotFound, +) +from deezspot.libutils.utils import ( + create_zip, + get_ids, + link_is_valid, + what_kind, +) +from deezspot.libutils.others_settings import ( + stock_output, + stock_recursive_quality, + stock_recursive_download, + stock_not_interface, + stock_zip, + method_save, +) + +Spo() +API() + +class DeeLogin: + def __init__( + self, + arl = None, + email = None, + password = None + ) -> None: + + if arl: + self.__gw_api = API_GW(arl = arl) + else: + self.__gw_api = API_GW( + email = email, + password = password + ) + + def download_trackdee( + self, link_track, + output_dir = stock_output, + quality_download = stock_quality, + recursive_quality = stock_recursive_quality, + recursive_download = stock_recursive_download, + not_interface = stock_not_interface, + method_save = method_save + ) -> Track: + + link_is_valid(link_track) + ids = get_ids(link_track) + + try: + song_metadata = API.tracking(ids) + except NoDataApi: + infos = self.__gw_api.get_song_data(ids) + + if not "FALLBACK" in infos: + raise TrackNotFound(link_track) + + ids = infos['FALLBACK']['SNG_ID'] + song_metadata = API.tracking(ids) + + preferences = Preferences() + + preferences.link = link_track + preferences.song_metadata = song_metadata + preferences.quality_download = quality_download + preferences.output_dir = output_dir + preferences.ids = ids + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.method_save = method_save + + track = DW_TRACK(preferences).dw() + + return track + + def download_albumdee( + self, link_album, + output_dir = stock_output, + quality_download = stock_quality, + recursive_quality = stock_recursive_quality, + recursive_download = stock_recursive_download, + not_interface = stock_not_interface, + make_zip = stock_zip, + method_save = method_save + ) -> Album: + + link_is_valid(link_album) + ids = get_ids(link_album) + + try: + album_json = API.get_album(ids) + except NoDataApi: + raise AlbumNotFound(link_album) + + song_metadata = API.tracking_album(album_json) + + preferences = Preferences() + + preferences.link = link_album + preferences.song_metadata = song_metadata + preferences.quality_download = quality_download + preferences.output_dir = output_dir + preferences.ids = ids + preferences.json_data = album_json + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.method_save = method_save + preferences.make_zip = make_zip + + album = DW_ALBUM(preferences).dw() + + return album + + def download_playlistdee( + self, link_playlist, + output_dir = stock_output, + quality_download = stock_quality, + recursive_quality = stock_recursive_quality, + recursive_download = stock_recursive_download, + not_interface = stock_not_interface, + make_zip = stock_zip, + method_save = method_save + ) -> Playlist: + + link_is_valid(link_playlist) + ids = get_ids(link_playlist) + + song_metadata = [] + playlist_json = API.get_playlist(ids) + + for track in playlist_json['tracks']['data']: + c_ids = track['id'] + + try: + c_song_metadata = API.tracking(c_ids) + except NoDataApi: + infos = self.__gw_api.get_song_data(c_ids) + + if not "FALLBACK" in infos: + c_song_metadata = f"{track['title']} - {track['artist']['name']}" + else: + c_song_metadata = API.tracking(c_ids) + + song_metadata.append(c_song_metadata) + + preferences = Preferences() + + preferences.link = link_playlist + preferences.song_metadata = song_metadata + preferences.quality_download = quality_download + preferences.output_dir = output_dir + preferences.ids = ids + preferences.json_data = playlist_json + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.method_save = method_save + preferences.make_zip = make_zip + + playlist = DW_PLAYLIST(preferences).dw() + + return playlist + + def download_artisttopdee( + self, link_artist, + output_dir = stock_output, + quality_download = stock_quality, + recursive_quality = stock_recursive_quality, + recursive_download = stock_recursive_download, + not_interface = stock_not_interface + ) -> list[Track]: + + link_is_valid(link_artist) + ids = get_ids(link_artist) + + playlist_json = API.get_artist_top_tracks(ids)['data'] + + names = [ + self.download_trackdee( + track['link'], output_dir, + quality_download, recursive_quality, + recursive_download, not_interface + ) + + for track in playlist_json + ] + + return names + + def convert_spoty_to_dee_link_track(self, link_track): + link_is_valid(link_track) + ids = get_ids(link_track) + + track_json = Spo.get_track(ids) + external_ids = track_json['external_ids'] + + if not external_ids: + msg = f"⚠ The track \"{track_json['name']}\" can't be converted to Deezer link :( ⚠" + + raise TrackNotFound( + url = link_track, + message = msg + ) + + isrc = f"isrc:{external_ids['isrc']}" + + track_json_dee = API.get_track(isrc) + track_link_dee = track_json_dee['link'] + + return track_link_dee + + def download_trackspo( + self, link_track, + output_dir = stock_output, + quality_download = stock_quality, + recursive_quality = stock_recursive_quality, + recursive_download = stock_recursive_download, + not_interface = stock_not_interface, + method_save = method_save + ) -> Track: + + track_link_dee = self.convert_spoty_to_dee_link_track(link_track) + + track = self.download_trackdee( + track_link_dee, + output_dir = output_dir, + quality_download = quality_download, + recursive_quality = recursive_quality, + recursive_download = recursive_download, + not_interface = not_interface, + method_save = method_save + ) + + return track + + def convert_spoty_to_dee_link_album(self, link_album): + link_is_valid(link_album) + ids = get_ids(link_album) + link_dee = None + + tracks = Spo.get_album(ids) + + try: + external_ids = tracks['external_ids'] + + if not external_ids: + raise AlbumNotFound + + upc = f"0{external_ids['upc']}" + + while upc[0] == "0": + upc = upc[1:] + + try: + upc = f"upc:{upc}" + url = API.get_album(upc) + link_dee = url['link'] + break + except NoDataApi: + if upc[0] != "0": + raise AlbumNotFound + except AlbumNotFound: + tot = tracks['total_tracks'] + tracks = tracks['tracks']['items'] + tot2 = None + + for track in tracks: + track_link = track['external_urls']['spotify'] + track_info = Spo.get_track(track_link) + + try: + isrc = f"isrc:{track_info['external_ids']['isrc']}" + track_data = API.get_track(isrc) + + if not "id" in track_data['album']: + continue + + album_ids = track_data['album']['id'] + album_json = API.get_album(album_ids) + tot2 = album_json['nb_tracks'] + + if tot == tot2: + link_dee = album_json['link'] + break + except NoDataApi: + pass + + if tot != tot2: + raise AlbumNotFound(link_album) + + return link_dee + + def download_albumspo( + self, link_album, + output_dir = stock_output, + quality_download = stock_quality, + recursive_quality = stock_recursive_quality, + recursive_download = stock_recursive_download, + not_interface = stock_not_interface, + make_zip = stock_zip, + method_save = method_save + ) -> Album: + + link_dee = self.convert_spoty_to_dee_link_album(link_album) + + album = self.download_albumdee( + link_dee, output_dir, + quality_download, recursive_quality, + recursive_download, not_interface, + make_zip, method_save + ) + + return album + + def download_playlistspo( + self, link_playlist, + output_dir = stock_output, + quality_download = stock_quality, + recursive_quality = stock_recursive_quality, + recursive_download = stock_recursive_download, + not_interface = stock_not_interface, + make_zip = stock_zip, + method_save = method_save + ) -> Playlist: + + link_is_valid(link_playlist) + ids = get_ids(link_playlist) + + playlist_json = Spo.get_playlist(ids) + playlist_tracks = playlist_json['tracks']['items'] + playlist = Playlist() + tracks = playlist.tracks + + for track in playlist_tracks: + is_track = track['track'] + + if not is_track: + continue + + external_urls = is_track['external_urls'] + + if not external_urls: + print(f"The track \"{is_track['name']}\" is not avalaible on Spotify :(") + continue + + link_track = external_urls['spotify'] + + try: + track = self.download_trackspo( + link_track, + output_dir = output_dir, + quality_download = quality_download, + recursive_quality = recursive_quality, + recursive_download = recursive_download, + not_interface = not_interface, + method_save = method_save + ) + except (TrackNotFound, NoDataApi): + info = track['track'] + artist = info['artists'][0]['name'] + song = info['name'] + track = f"{song} - {artist}" + + tracks.append(track) + + if make_zip: + playlist_name = playlist_json['name'] + zip_name = f"{output_dir}playlist {playlist_name}.zip" + create_zip(tracks, zip_name = zip_name) + playlist.zip_path = zip_name + + return playlist + + def download_name( + self, artist, song, + output_dir = stock_output, + quality_download = stock_quality, + recursive_quality = stock_recursive_quality, + recursive_download = stock_recursive_download, + not_interface = stock_not_interface, + method_save = method_save + ) -> Track: + + query = f"track:{song} artist:{artist}" + search = self.__spo.search(query) + items = search['tracks']['items'] + + if len(items) == 0: + msg = f"No result for {query} :(" + raise TrackNotFound(message = msg) + + link_track = items[0]['external_urls']['spotify'] + + track = self.download_trackspo( + link_track, + output_dir = output_dir, + quality_download = quality_download, + recursive_quality = recursive_quality, + recursive_download = recursive_download, + not_interface = not_interface, + method_save = method_save + ) + + return track + + def download_episode( + self, + link_episode, + output_dir = stock_output, + quality_download = stock_quality, + recursive_quality = stock_recursive_quality, + recursive_download = stock_recursive_download, + not_interface = stock_not_interface, + method_save = method_save + ) -> Episode: + + link_is_valid(link_episode) + ids = get_ids(link_episode) + + try: + episode_metadata = API.tracking(ids) + except NoDataApi: + infos = self.__gw_api.get_episode_data(ids) + if not infos: + raise TrackNotFound("Episode not found") + episode_metadata = { + 'music': infos.get('EPISODE_TITLE', ''), + 'artist': infos.get('SHOW_NAME', ''), + 'album': infos.get('SHOW_NAME', ''), + 'date': infos.get('EPISODE_PUBLISHED_TIMESTAMP', '').split()[0], + 'genre': 'Podcast', + 'explicit': infos.get('SHOW_IS_EXPLICIT', '2'), + 'disc': 1, + 'track': 1, + 'duration': int(infos.get('DURATION', 0)), + 'isrc': None, + 'image': infos.get('EPISODE_IMAGE_MD5', '') + } + + preferences = Preferences() + preferences.link = link_episode + preferences.song_metadata = episode_metadata + preferences.quality_download = quality_download + preferences.output_dir = output_dir + preferences.ids = ids + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.method_save = method_save + + episode = DW_EPISODE(preferences).dw() + + return episode + + def download_smart( + self, link, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + make_zip=stock_zip, + method_save=method_save + ) -> Smart: + + link_is_valid(link) + link = what_kind(link) + smart = Smart() + + if "spotify.com" in link: + source = "https://spotify.com" + + elif "deezer.com" in link: + source = "https://deezer.com" + + smart.source = source + + if "track/" in link: + if "spotify.com" in link: + func = self.download_trackspo + + elif "deezer.com" in link: + func = self.download_trackdee + + else: + raise InvalidLink(link) + + track = func( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + method_save=method_save + ) + + smart.type = "track" + smart.track = track + + elif "album/" in link: + if "spotify.com" in link: + func = self.download_albumspo + + elif "deezer.com" in link: + func = self.download_albumdee + + else: + raise InvalidLink(link) + + album = func( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + make_zip=make_zip, + method_save=method_save + ) + + smart.type = "album" + smart.album = album + + elif "playlist/" in link: + if "spotify.com" in link: + func = self.download_playlistspo + + elif "deezer.com" in link: + func = self.download_playlistdee + + else: + raise InvalidLink(link) + + playlist = func( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + make_zip=make_zip, + method_save=method_save + ) + + smart.type = "playlist" + smart.playlist = playlist + + return smart diff --git a/deezspot/deezloader/__utils__.py b/deezspot/deezloader/__utils__.py new file mode 100644 index 0000000..3c9635b --- /dev/null +++ b/deezspot/deezloader/__utils__.py @@ -0,0 +1,48 @@ +#!/usr/bin/python3 + +def artist_sort(array: list): + if len(array) > 1: + for a in array: + for b in array: + if a in b and a != b: + array.remove(b) + + array = list( + dict.fromkeys(array) + ) + + artists = " & ".join(array) + + return artists + +def check_track_token(infos: dict): + if "FALLBACK" in infos: + track_token = infos['FALLBACK']['TRACK_TOKEN'] + else: + track_token = infos['TRACK_TOKEN'] + + return track_token + +def check_track_ids(infos: dict) -> str: + if "FALLBACK" in infos: + if 'EPISODE_ID' in infos['FALLBACK']: + return infos['FALLBACK']['EPISODE_ID'] + elif 'SNG_ID' in infos['FALLBACK']: + return infos['FALLBACK']['SNG_ID'] + else: + if 'EPISODE_ID' in infos: + return infos['EPISODE_ID'] + elif 'SNG_ID' in infos: + return infos['SNG_ID'] + + raise ValueError("Track ID not found in the provided information") + +def check_track_md5(infos: dict): + if "FALLBACK" in infos: + song_md5 = infos['FALLBACK']['MD5_ORIGIN'] + version = infos['FALLBACK']['MEDIA_VERSION'] if 'MEDIA_VERSION' in infos['FALLBACK'] else None + else: + song_md5 = infos['MD5_ORIGIN'] + version = infos['MEDIA_VERSION'] if 'MEDIA_VERSION' in infos else None + + return song_md5, version \ No newline at end of file diff --git a/deezspot/deezloader/dee_api.py b/deezspot/deezloader/dee_api.py new file mode 100644 index 0000000..2bde89d --- /dev/null +++ b/deezspot/deezloader/dee_api.py @@ -0,0 +1,294 @@ +#!/usr/bin/python3 + +from time import sleep +from datetime import datetime +from deezspot.deezloader.__utils__ import artist_sort +from requests import get as req_get +from deezspot.libutils.utils import convert_to_date +from deezspot.libutils.others_settings import header +from deezspot.exceptions import ( + NoDataApi, + QuotaExceeded, + TrackNotFound, +) + +class API: + + @classmethod + def __init__(cls): + cls.__api_link = "https://api.deezer.com/" + cls.__cover = "https://e-cdns-images.dzcdn.net/images/cover/%s/{}-000000-80-0-0.jpg" + + @classmethod + def __get_api(cls, url, quota_exceeded = False): + json = req_get(url, headers = header).json() + + if "error" in json: + if json['error']['message'] == "no data": + raise NoDataApi("No data avalaible :(") + + elif json['error']['message'] == "Quota limit exceeded": + if not quota_exceeded: + sleep(0.8) + json = cls.__get_api(url, True) + else: + raise QuotaExceeded + + return json + + @classmethod + def get_chart(cls, index = 0): + url = f"{cls.__api_link}chart/{index}" + infos = cls.__get_api(url) + + return infos + + @classmethod + def get_track(cls, ids): + url = f"{cls.__api_link}track/{ids}" + infos = cls.__get_api(url) + + return infos + + @classmethod + def get_album(cls, ids): + url = f"{cls.__api_link}album/{ids}" + infos = cls.__get_api(url) + + return infos + + @classmethod + def get_playlist(cls, ids): + url = f"{cls.__api_link}playlist/{ids}" + infos = cls.__get_api(url) + + return infos + + @classmethod + def get_artist(cls, ids): + url = f"{cls.__api_link}artist/{ids}" + infos = cls.__get_api(url) + + return infos + + @classmethod + def get_artist_top_tracks(cls, ids, limit = 25): + url = f"{cls.__api_link}artist/{ids}/top?limit={limit}" + infos = cls.__get_api(url) + + return infos + + @classmethod + def get_artist_top_albums(cls, ids, limit = 25): + url = f"{cls.__api_link}artist/{ids}/albums?limit={limit}" + infos = cls.__get_api(url) + + return infos + + @classmethod + def get_artist_related(cls, ids): + url = f"{cls.__api_link}artist/{ids}/related" + infos = cls.__get_api(url) + + return infos + + @classmethod + def get_artist_radio(cls, ids): + url = f"{cls.__api_link}artist/{ids}/radio" + infos = cls.__get_api(url) + + return infos + + @classmethod + def get_artist_top_playlists(cls, ids, limit = 25): + url = f"{cls.__api_link}artist/{ids}/playlists?limit={limit}" + infos = cls.__get_api(url) + + return infos + + @classmethod + def search(cls, query): + url = f"{cls.__api_link}search/?q={query}" + infos = cls.__get_api(url) + + if infos['total'] == 0: + raise NoDataApi(query) + + return infos + + @classmethod + def search_track(cls, query): + url = f"{cls.__api_link}search/track/?q={query}" + infos = cls.__get_api(url) + + if infos['total'] == 0: + raise NoDataApi(query) + + return infos + + @classmethod + def search_album(cls, query): + url = f"{cls.__api_link}search/album/?q={query}" + infos = cls.__get_api(url) + + if infos['total'] == 0: + raise NoDataApi(query) + + return infos + + @classmethod + def search_playlist(cls, query): + url = f"{cls.__api_link}search/playlist/?q={query}" + infos = cls.__get_api(url) + + if infos['total'] == 0: + raise NoDataApi(query) + + return infos + + @classmethod + def search_artist(cls, query): + url = f"{cls.__api_link}search/artist/?q={query}" + infos = cls.__get_api(url) + + if infos['total'] == 0: + raise NoDataApi(query) + + return infos + + @classmethod + def not_found(cls, song, title): + try: + data = cls.search_track(song)['data'] + except NoDataApi: + raise TrackNotFound(song) + + ids = None + + for track in data: + if ( + track['title'] == title + ) or ( + title in track['title_short'] + ): + ids = track['id'] + break + + if not ids: + raise TrackNotFound(song) + + return str(ids) + + @classmethod + def get_img_url(cls, md5_image, size = "1200x1200"): + cover = cls.__cover.format(size) + image_url = cover % md5_image + + return image_url + + @classmethod + def choose_img(cls, md5_image, size = "1200x1200"): + image_url = cls.get_img_url(md5_image, size) + image = req_get(image_url).content + + if len(image) == 13: + image_url = cls.get_img_url("", size) + image = req_get(image_url).content + + return image + + @classmethod + def tracking(cls, ids, album = False) -> dict: + song_metadata = {} + json_track = cls.get_track(ids) + + if not album: + album_ids = json_track['album']['id'] + album_json = cls.get_album(album_ids) + genres = [] + + if "genres" in album_json: + for genre in album_json['genres']['data']: + genres.append(genre['name']) + + song_metadata['genre'] = " & ".join(genres) + ar_album = [] + + for contributor in album_json['contributors']: + if contributor['role'] == "Main": + ar_album.append(contributor['name']) + + song_metadata['ar_album'] = " & ".join(ar_album) + song_metadata['album'] = album_json['title'] + song_metadata['label'] = album_json['label'] + song_metadata['upc'] = album_json['upc'] + song_metadata['nb_tracks'] = album_json['nb_tracks'] + + song_metadata['music'] = json_track['title'] + array = [] + + for contributor in json_track['contributors']: + if contributor['name'] != "": + array.append(contributor['name']) + + array.append( + json_track['artist']['name'] + ) + + song_metadata['artist'] = artist_sort(array) + song_metadata['tracknum'] = json_track['track_position'] + song_metadata['discnum'] = json_track['disk_number'] + song_metadata['year'] = convert_to_date(json_track['release_date']) + song_metadata['bpm'] = json_track['bpm'] + song_metadata['duration'] = json_track['duration'] + song_metadata['isrc'] = json_track['isrc'] + song_metadata['gain'] = json_track['gain'] + + return song_metadata + + @classmethod + def tracking_album(cls, album_json): + song_metadata: dict[ + str, + list or str or int or datetime + ] = { + "music": [], + "artist": [], + "tracknum": [], + "discnum": [], + "bpm": [], + "duration": [], + "isrc": [], + "gain": [], + "album": album_json['title'], + "label": album_json['label'], + "year": convert_to_date(album_json['release_date']), + "upc": album_json['upc'], + "nb_tracks": album_json['nb_tracks'] + } + + genres = [] + + if "genres" in album_json: + for a in album_json['genres']['data']: + genres.append(a['name']) + + song_metadata['genre'] = " & ".join(genres) + ar_album = [] + + for a in album_json['contributors']: + if a['role'] == "Main": + ar_album.append(a['name']) + + song_metadata['ar_album'] = " & ".join(ar_album) + sm_items = song_metadata.items() + + for track in album_json['tracks']['data']: + c_ids = track['id'] + detas = cls.tracking(c_ids, album = True) + + for key, item in sm_items: + if type(item) is list: + song_metadata[key].append(detas[key]) + + return song_metadata \ No newline at end of file diff --git a/deezspot/deezloader/deegw_api.py b/deezspot/deezloader/deegw_api.py new file mode 100644 index 0000000..d126f65 --- /dev/null +++ b/deezspot/deezloader/deegw_api.py @@ -0,0 +1,322 @@ +#!/usr/bin/python3 + +from requests import Session +from deezspot.deezloader.deezer_settings import qualities +from deezspot.deezloader.__download_utils__ import md5hex +from deezspot.exceptions import ( + BadCredentials, + TrackNotFound, + NoRightOnMedia, +) +from requests import ( + get as req_get, + post as req_post, +) +class API_GW: + + @classmethod + def __init__( + cls, + arl = None, + email = None, + password = None + ): + cls.__req = Session() + cls.__arl = arl + cls.__email = email + cls.__password = password + cls.__token = "null" + + cls.__client_id = 172365 + cls.__client_secret = "fb0bec7ccc063dab0417eb7b0d847f34" + cls.__try_link = "https://api.deezer.com/platform/generic/track/3135556" + + cls.__get_lyric = "song.getLyrics" + cls.__get_song_data = "song.getData" + cls.__get_user_getArl = "user.getArl" + cls.__get_page_track = "deezer.pageTrack" + cls.__get_user_data = "deezer.getUserData" + cls.__get_album_data = "song.getListByAlbum" + cls.__get_playlist_data = "playlist.getSongs" + cls.__get_episode_data = "episode.getData" + + cls.__get_media_url = "https://media.deezer.com/v1/get_url" + cls.__get_auth_token_url = "https://api.deezer.com/auth/token" + cls.__private_api_link = "https://www.deezer.com/ajax/gw-light.php" + cls.__song_server = "https://e-cdns-proxy-{}.dzcdn.net/mobile/1/{}" + + cls.__refresh_token() + + @classmethod + def __login(cls): + if ( + (not cls.__arl) and + (not cls.__email) and + (not cls.__password) + ): + msg = f"NO LOGIN STUFF INSERTED :)))" + + raise BadCredentials(msg = msg) + + if cls.__arl: + cls.__req.cookies['arl'] = cls.__arl + else: + cls.__set_arl() + + @classmethod + def __set_arl(cls): + access_token = cls.__get_access_token() + + c_headers = { + "Authorization": f"Bearer {access_token}" + } + + cls.__req.get(cls.__try_link, headers = c_headers).json() + cls.__arl = cls.__get_api(cls.__get_user_getArl) + + @classmethod + def __get_access_token(cls): + password = md5hex(cls.__password) + + to_hash = ( + f"{cls.__client_id}{cls.__email}{password}{cls.__client_secret}" + ) + + request_hash = md5hex(to_hash) + + params = { + "app_id": cls.__client_id, + "login": cls.__email, + "password": password, + "hash": request_hash + } + + results = req_get(cls.__get_auth_token_url, params = params).json() + + if "error" in results: + raise BadCredentials( + email = cls.__email, + password = cls.__password + ) + + access_token = results['access_token'] + + return access_token + + def __cool_api(cls): + guest_sid = cls.__req.cookies.get("sid") + url = "https://api.deezer.com/1.0/gateway.php" + + params = { + 'api_key': "4VCYIJUCDLOUELGD1V8WBVYBNVDYOXEWSLLZDONGBBDFVXTZJRXPR29JRLQFO6ZE", + 'sid': guest_sid, + 'input': '3', + 'output': '3', + 'method': 'song_getData' + } + + json = {'sng_id': 302127} + + json = req_post(url, params = params, json = json).json() + print(json) + + @classmethod + def __get_api( + cls, method, + json_data = None, + repeats = 4 + ): + params = { + "api_version": "1.0", + "api_token": cls.__token, + "input": "3", + "method": method + } + + results = cls.__req.post( + cls.__private_api_link, + params = params, + json = json_data + ).json()['results'] + + if not results and repeats != 0: + cls.__refresh_token() + + cls.__get_api( + method, json_data, + repeats = repeats - 1 + ) + + return results + + @classmethod + def get_user(cls): + data = cls.__get_api(cls.__get_user_data) + + return data + + @classmethod + def __refresh_token(cls): + cls.__req.cookies.clear_session_cookies() + + if not cls.amIlog(): + cls.__login() + cls.am_I_log() + + data = cls.get_user() + cls.__token = data['checkForm'] + cls.__license_token = cls.__get_license_token() + + @classmethod + def __get_license_token(cls): + data = cls.get_user() + license_token = data['USER']['OPTIONS']['license_token'] + + return license_token + + @classmethod + def amIlog(cls): + data = cls.get_user() + user_id = data['USER']['USER_ID'] + is_logged = False + + if user_id != 0: + is_logged = True + + return is_logged + + @classmethod + def am_I_log(cls): + if not cls.amIlog(): + raise BadCredentials(arl = cls.__arl) + + @classmethod + def get_song_data(cls, ids): + json_data = { + "sng_id" : ids + } + + infos = cls.__get_api(cls.__get_song_data, json_data) + + return infos + + @classmethod + def get_album_data(cls, ids): + json_data = { + "alb_id": ids, + "nb": -1 + } + + infos = cls.__get_api(cls.__get_album_data, json_data) + + return infos + + @classmethod + def get_lyric(cls, ids): + json_data = { + "sng_id": ids + } + + infos = cls.__get_api(cls.__get_lyric, json_data) + + return infos + + @classmethod + def get_playlist_data(cls, ids): + json_data = { + "playlist_id": ids, + "nb": -1 + } + + infos = cls.__get_api(cls.__get_playlist_data, json_data) + + return infos + + @classmethod + def get_page_track(cls, ids): + json_data = { + "sng_id" : ids + } + + infos = cls.__get_api(cls.__get_page_track, json_data) + + return infos + + @classmethod + def get_episode_data(cls, ids): + json_data = { + "episode_id": ids + } + + infos = cls.__get_api(cls.__get_episode_data, json_data) + + if infos: + infos['MEDIA_VERSION'] = '1' + infos['SNG_ID'] = infos.get('EPISODE_ID') + if 'EPISODE_DIRECT_STREAM_URL' in infos: + infos['MD5_ORIGIN'] = 'episode' + + return infos + + @classmethod + def get_song_url(cls, n, song_hash): + song_url = cls.__song_server.format(n, song_hash) + + return song_url + + @classmethod + def song_exist(cls, song_link): + if song_link and 'spreaker.com' in song_link: + return req_get(song_link, stream=True) + + crypted_audio = req_get(song_link) + + if len(crypted_audio.content) == 0: + raise TrackNotFound + + return crypted_audio + + @classmethod + def get_medias_url(cls, tracks_token, quality): + others_qualities = [] + + for c_quality in qualities: + if c_quality == quality: + continue + + c_quality_set = { + "cipher": "BF_CBC_STRIPE", + "format": c_quality + } + + others_qualities.append(c_quality_set) + + json_data = { + "license_token": cls.__license_token, + "media": [ + { + "type": "FULL", + "formats": [ + { + "cipher": "BF_CBC_STRIPE", + "format": quality + } + ] + others_qualities + } + ], + "track_tokens": tracks_token + } + + infos = req_post( + cls.__get_media_url, + json = json_data + ).json() + + if "errors" in infos: + msg = infos['errors'][0]['message'] + + raise NoRightOnMedia(msg) + + medias = infos['data'] + + return medias diff --git a/deezspot/deezloader/deezer_settings.py b/deezspot/deezloader/deezer_settings.py new file mode 100644 index 0000000..0e833c9 --- /dev/null +++ b/deezspot/deezloader/deezer_settings.py @@ -0,0 +1,24 @@ +#!/usr/bin/python3 + +stock_quality = "MP3_320" +method_saves = ["0", "1", "2"] + +qualities = { + "MP3_320": { + "n_quality": "3", + "f_format": ".mp3", + "s_quality": "320" + }, + + "FLAC": { + "n_quality": "9", + "f_format": ".flac", + "s_quality": "FLAC" + }, + + "MP3_128": { + "n_quality": "1", + "f_format": ".mp3", + "s_quality": "128" + } +} \ No newline at end of file diff --git a/deezspot/easy_spoty.py b/deezspot/easy_spoty.py new file mode 100644 index 0000000..7d8b20e --- /dev/null +++ b/deezspot/easy_spoty.py @@ -0,0 +1,77 @@ +#!/usr/bin/python3 + +from spotipy import Spotify +from deezspot.exceptions import InvalidLink +from spotipy.exceptions import SpotifyException +from spotipy_anon import SpotifyAnon + +class Spo: + __error_codes = [404, 400] + + @classmethod + def __init__(cls): + cls.__api = Spotify( + auth_manager=SpotifyAnon() # Instead using spotipy client use spotify anon because spotify client can't fetch information from track or playlist from official spotify + ) + + @classmethod + def __lazy(cls, results): + albums = results['items'] + + while results['next']: + results = cls.__api.next(results) + albums.extend(results['items']) + + return results + + @classmethod + def get_track(cls, ids): + try: + track_json = cls.__api.track(ids) + except SpotifyException as error: + if error.http_status in cls.__error_codes: + raise InvalidLink(ids) + + return track_json + + @classmethod + def get_album(cls, ids): + try: + album_json = cls.__api.album(ids) + except SpotifyException as error: + if error.http_status in cls.__error_codes: + raise InvalidLink(ids) + + tracks = album_json['tracks'] + cls.__lazy(tracks) + + return album_json + + @classmethod + def get_playlist(cls, ids): + try: + playlist_json = cls.__api.playlist(ids) + except SpotifyException as error: + if error.http_status in cls.__error_codes: + raise InvalidLink(ids) + + tracks = playlist_json['tracks'] + cls.__lazy(tracks) + + return playlist_json + + @classmethod + def get_episode(cls, ids): + try: + episode_json = cls.__api.episode(ids) + except SpotifyException as error: + if error.http_status in cls.__error_codes: + raise InvalidLink(ids) + + return episode_json + + @classmethod + def search(cls, query): + search = cls.__api.search(query) + + return search \ No newline at end of file diff --git a/deezspot/exceptions.py b/deezspot/exceptions.py new file mode 100644 index 0000000..48ff07d --- /dev/null +++ b/deezspot/exceptions.py @@ -0,0 +1,77 @@ +#!/usr/bin/python3 + +class TrackNotFound(Exception): + def __init__(self, url = None, message = None): + self.url = url + + if not message: + self.message = f"Track {self.url} not found :(" + else: + self.message = message + + super().__init__(self.message) + +class AlbumNotFound(Exception): + def __init__(self, url = None): + self.url = url + self.msg = f"Album {self.url} not found :(" + super().__init__(self.msg) + +class InvalidLink(Exception): + def __init__(self, url): + self.url = url + self.msg = f"Invalid Link {self.url} :(" + super().__init__(self.msg) + +class QuotaExceeded(Exception): + def __init__(self, message = None): + + if not message: + self.message = "TOO MUCH REQUESTS LIMIT YOURSELF !!! :)" + + super().__init__(self.message) + +class QualityNotFound(Exception): + def __init__(self, quality = None, msg = None): + self.quality = quality + + if not msg: + self.msg = ( + f"The {quality} quality doesn't exist :)\ + \nThe qualities have to be FLAC or MP3_320 or MP3_256 or MP3_128" + ) + else: + self.msg = msg + + super().__init__(self.msg) + +class NoRightOnMedia(Exception): + def __init__(self, msg): + self.msg = msg + super().__init__(msg) + +class NoDataApi(Exception): + def __init__(self, message): + super().__init__(message) + +class BadCredentials(Exception): + def __init__( + self, + arl = None, + email = None, + password = None, + msg = None + ): + if msg: + self.msg = msg + else: + self.arl = arl + self.email = email + self.password = password + + if arl: + self.msg = f"Wrong token: {arl} :(" + else: + self.msg = f"Wrong credentials email: {self.email}, password: {self.password}" + + super().__init__(self.msg) \ No newline at end of file diff --git a/deezspot/libutils/__init__.py b/deezspot/libutils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/deezspot/libutils/others_settings.py b/deezspot/libutils/others_settings.py new file mode 100644 index 0000000..18a4ca4 --- /dev/null +++ b/deezspot/libutils/others_settings.py @@ -0,0 +1,27 @@ +#!/usr/bin/python3 + +method_saves = ["0", "1", "2", "3"] + +sources = [ + "dee", "spo" +] + +header = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:88.0) Gecko/20100101 Firefox/88.0", + "Accept-Language": "en-US;q=0.5,en;q=0.3" +} + +supported_link = [ + "www.deezer.com", "open.spotify.com", + "deezer.com", "spotify.com", + "deezer.page.link", "www.spotify.com" +] + +answers = ["Y", "y", "Yes", "YES"] +stock_output = "Songs" +stock_recursive_quality = False +stock_recursive_download = False +stock_not_interface = False +stock_zip = False +method_save = 3 +is_thread = False # WARNING FOR TRUE, LOOP ON DEFAULT \ No newline at end of file diff --git a/deezspot/libutils/utils.py b/deezspot/libutils/utils.py new file mode 100644 index 0000000..b6fa7f8 --- /dev/null +++ b/deezspot/libutils/utils.py @@ -0,0 +1,251 @@ +#!/usr/bin/python3 + +from os import makedirs +from datetime import datetime +from urllib.parse import urlparse +from requests import get as req_get +from zipfile import ZipFile, ZIP_DEFLATED +from deezspot.models.track import Track +from deezspot.exceptions import InvalidLink +from deezspot.libutils.others_settings import supported_link, header + +from os.path import ( + isdir, basename, + join, isfile +) + +def link_is_valid(link): + netloc = urlparse(link).netloc + + if not any( + c_link == netloc + for c_link in supported_link + ): + raise InvalidLink(link) + +def get_ids(link): + parsed = urlparse(link) + path = parsed.path + ids = path.split("/")[-1] + + return ids + +def request(url): + thing = req_get(url, headers=header) + + return thing + +def __check_dir(directory): + if not isdir(directory): + makedirs(directory) + +def var_excape(string): + # Enhance character replacement for filenames + replacements = { + "\\": "", + "/": "", + ":": "", + "*": "", + "?": "", + "\"": "", + "<": "", + ">": "", + "|": "-", + "&": "and", + "$": "s", + "'": "", + "`": "", + } + + for old, new in replacements.items(): + string = string.replace(old, new) + + # Remove any other non-printable characters + string = ''.join(char for char in string if char.isprintable()) + + return string.strip() + +def convert_to_date(date: str): + if date == "0000-00-00": + date = "0001-01-01" + + elif date.isdigit(): + date = f"{date}-01-01" + + date = datetime.strptime(date, "%Y-%m-%d") + + return date + +def what_kind(link): + url = request(link).url + + if url.endswith("/"): + url = url[:-1] + + return url + +def __get_tronc(string): + l_encoded = len( + string.encode() + ) + + if l_encoded > 242: + n_tronc = len(string) - l_encoded - 242 + else: + n_tronc = 242 + + return n_tronc + +def __get_dir(song_metadata, output_dir, method_save): + if song_metadata is None: + raise ValueError("song_metadata cannot be None") + + if 'show' in song_metadata and 'name' in song_metadata: + show = var_excape(song_metadata.get('show', '')) + episode = var_excape(song_metadata.get('name', '')) + if show and episode: + dir_name = f"{show} - {episode}" + elif show: + dir_name = show + elif episode: + dir_name = episode + else: + dir_name = "Unknown Episode" + else: + album = var_excape(song_metadata.get('album', '')) + artist = var_excape(song_metadata.get('artist', '')) + if method_save == 0: + dir_name = f"{album} - {artist}" + elif method_save == 1: + dir_name = f"{artist}" + elif method_save == 2: + dir_name = f"{album} - {artist}" + elif method_save == 3: + dir_name = f"{album} - {artist}" + else: + dir_name = "Unknown" + + final_dir = join(output_dir, dir_name) + if not isdir(final_dir): + makedirs(final_dir) + + return final_dir + +def set_path( + song_metadata, output_dir, + song_quality, file_format, method_save, + is_episode=False +): + if song_metadata is None: + raise ValueError("song_metadata cannot be None") + + if is_episode: + show = var_excape(song_metadata.get('show', '')) + episode = var_excape(song_metadata.get('name', '')) + if show and episode: + song_name = f"{show} - {episode}" + elif show: + song_name = show + elif episode: + song_name = episode + else: + song_name = "Unknown Episode" + else: + album = var_excape(song_metadata.get('album', '')) + artist = var_excape(song_metadata.get('artist', '')) + music = var_excape(song_metadata.get('music', '')) + + if method_save == 0: + discnum = song_metadata.get('discnum', '') + tracknum = song_metadata.get('tracknum', '') + song_name = f"{album} CD {discnum} TRACK {tracknum}" + + elif method_save == 1: + song_name = f"{music} - {artist}" + + elif method_save == 2: + isrc = song_metadata.get('isrc', '') + song_name = f"{music} - {artist} [{isrc}]" + + elif method_save == 3: + discnum = song_metadata.get('discnum', '') + tracknum = song_metadata.get('tracknum', '') + song_name = f"{discnum}|{tracknum} - {music} - {artist}" + + # Ensure the song_name is not too long + max_length = 255 - len(output_dir) - len(file_format) - len(f" ({song_quality})") + song_name = song_name[:max_length] + + song_dir = __get_dir(song_metadata, output_dir, method_save) + __check_dir(song_dir) + n_tronc = __get_tronc(song_name) + song_path = f"{song_dir}/{song_name[:n_tronc]} ({song_quality}){file_format}" + + return song_path + +def create_zip( + tracks: list[Track], + output_dir=None, + song_metadata=None, + song_quality=None, + method_save=0, + zip_name=None +): + if not zip_name: + album = var_excape(song_metadata.get('album', '')) + song_dir = __get_dir(song_metadata, output_dir, method_save) + + if method_save == 0: + zip_name = f"{album}" + + elif method_save == 1: + artist = var_excape(song_metadata.get('ar_album', '')) + zip_name = f"{album} - {artist}" + + elif method_save == 2: + artist = var_excape(song_metadata.get('ar_album', '')) + upc = song_metadata.get('upc', '') + zip_name = f"{album} - {artist} {upc}" + + elif method_save == 3: + artist = var_excape(song_metadata.get('ar_album', '')) + upc = song_metadata.get('upc', '') + zip_name = f"{album} - {artist} {upc}" + + n_tronc = __get_tronc(zip_name) + zip_name = zip_name[:n_tronc] + zip_name += f" ({song_quality}).zip" + zip_path = f"{song_dir}{zip_name}" + else: + zip_path = zip_name + + z = ZipFile(zip_path, "w", ZIP_DEFLATED) + + for track in tracks: + if not track.success: + continue + + c_song_path = track.song_path + song_path = basename(c_song_path) + + if not isfile(c_song_path): + continue + + z.write(c_song_path, song_path) + + z.close() + + return zip_path + +def trasform_sync_lyric(lyric): + sync_array = [] + + for a in lyric: + if "milliseconds" in a: + arr = ( + a['line'], int(a['milliseconds']) + ) + + sync_array.append(arr) + + return sync_array \ No newline at end of file diff --git a/deezspot/models/__init__.py b/deezspot/models/__init__.py new file mode 100644 index 0000000..2847ff6 --- /dev/null +++ b/deezspot/models/__init__.py @@ -0,0 +1,8 @@ +#!/usr/bin/python3 + +from deezspot.models.smart import Smart +from deezspot.models.track import Track +from deezspot.models.album import Album +from deezspot.models.playlist import Playlist +from deezspot.models.preferences import Preferences +from deezspot.models.episode import Episode diff --git a/deezspot/models/album.py b/deezspot/models/album.py new file mode 100644 index 0000000..6cfb958 --- /dev/null +++ b/deezspot/models/album.py @@ -0,0 +1,20 @@ +#!/usr/bin/python3 + +from deezspot.models.track import Track + +class Album: + def __init__(self, ids: int) -> None: + self.tracks: list[Track] = [] + self.zip_path = None + self.image = None + self.album_quality = None + self.md5_image = None + self.ids = ids + self.nb_tracks = None + self.album_name = None + self.upc = None + self.tags = None + self.__set_album_md5() + + def __set_album_md5(self): + self.album_md5 = f"album/{self.ids}" \ No newline at end of file diff --git a/deezspot/models/episode.py b/deezspot/models/episode.py new file mode 100644 index 0000000..421252a --- /dev/null +++ b/deezspot/models/episode.py @@ -0,0 +1,34 @@ +#!/usr/bin/python3 + +class Episode: + def __init__( + self, + tags: dict, + episode_path: str, + file_format: str, + quality: str, + link: str, + ids: int + ) -> None: + self.tags = tags + self.__set_tags() + self.episode_name = f"{self.name} - {self.show}" + self.episode_path = episode_path + self.file_format = file_format + self.quality = quality + self.link = link + self.ids = ids + self.md5_image = None + self.success = True + self.__set_episode_md5() + + def __set_tags(self): + for tag, value in self.tags.items(): + setattr(self, tag, value) + + def __set_episode_md5(self): + self.episode_md5 = f"episode/{self.ids}" + + def set_fallback_ids(self, fallback_ids): + self.fallback_ids = fallback_ids + self.fallback_episode_md5 = f"episode/{self.fallback_ids}" \ No newline at end of file diff --git a/deezspot/models/playlist.py b/deezspot/models/playlist.py new file mode 100644 index 0000000..53c9aa7 --- /dev/null +++ b/deezspot/models/playlist.py @@ -0,0 +1,8 @@ +#!/usr/bin/python3 + +from deezspot.models.track import Track + +class Playlist: + def __init__(self) -> None: + self.tracks: list[Track] = [] + self.zip_path = None \ No newline at end of file diff --git a/deezspot/models/preferences.py b/deezspot/models/preferences.py new file mode 100644 index 0000000..8157c0f --- /dev/null +++ b/deezspot/models/preferences.py @@ -0,0 +1,15 @@ +#!/usr/bin/python3 + +class Preferences: + def __init__(self) -> None: + self.link = None + self.song_metadata: dict = None + self.quality_download = None + self.output_dir = None + self.ids = None + self.json_data = None + self.recursive_quality = None + self.recursive_download = None + self.not_interface = None + self.method_save = None + self.make_zip = None \ No newline at end of file diff --git a/deezspot/models/smart.py b/deezspot/models/smart.py new file mode 100644 index 0000000..38a980e --- /dev/null +++ b/deezspot/models/smart.py @@ -0,0 +1,13 @@ +#!/usr/bin/python3 + +from deezspot.models.track import Track +from deezspot.models.album import Album +from deezspot.models.playlist import Playlist + +class Smart: + def __init__(self) -> None: + self.track: Track = None + self.album: Album = None + self.playlist: Playlist = None + self.type = None + self.source = None \ No newline at end of file diff --git a/deezspot/models/track.py b/deezspot/models/track.py new file mode 100644 index 0000000..234279c --- /dev/null +++ b/deezspot/models/track.py @@ -0,0 +1,37 @@ +#!/usr/bin/python3 + +class Track: + def __init__( + self, + tags: dict, + song_path: str, + file_format: str, + quality: str, + link: str, + ids: int +) -> None: + + self.tags = tags + self.__set_tags() + self.song_name = f"{self.music} - {self.artist}" + self.song_path = song_path + self.file_format = file_format + self.quality = quality + self.link = link + self.ids = ids + self.md5_image = None + self.success = True + self.__set_track_md5() + + def __set_tags(self): + for tag, value in self.tags.items(): + setattr( + self, tag, value + ) + + def __set_track_md5(self): + self.track_md5 = f"track/{self.ids}" + + def set_fallback_ids(self, fallback_ids): + self.fallback_ids = fallback_ids + self.fallback_track_md5 = f"track/{self.fallback_ids}" \ No newline at end of file diff --git a/deezspot/spotloader/__download__.py b/deezspot/spotloader/__download__.py new file mode 100644 index 0000000..0f32e95 --- /dev/null +++ b/deezspot/spotloader/__download__.py @@ -0,0 +1,496 @@ +import traceback +from tqdm import tqdm +import os +from copy import deepcopy +from os.path import isfile, dirname +from librespot.core import Session +from deezspot.exceptions import TrackNotFound +from librespot.metadata import TrackId, EpisodeId +from deezspot.spotloader.spotify_settings import qualities +from deezspot.libutils.others_settings import answers +from deezspot.__taggers__ import write_tags, check_track +from librespot.audio.decoders import VorbisOnlyAudioQuality +from os import ( + remove, + system, + replace as os_replace, +) +from deezspot.models import ( + Track, + Album, + Playlist, + Preferences, + Episode, +) +from deezspot.libutils.utils import ( + set_path, + create_zip, + request, +) + +class Download_JOB: + session = None + + @classmethod + def __init__(cls, session: Session) -> None: + cls.session = session + +class EASY_DW: + def __init__( + self, + preferences: Preferences + ) -> None: + self.__ids = preferences.ids + self.__link = preferences.link + self.__output_dir = preferences.output_dir + self.__method_save = preferences.method_save + self.__song_metadata = preferences.song_metadata + self.__not_interface = preferences.not_interface + self.__quality_download = preferences.quality_download + self.__recursive_download = preferences.recursive_download + + + self.__c_quality = qualities[self.__quality_download] + self.__fallback_ids = self.__ids + + self.__set_quality() + if preferences.is_episode: + self.__write_episode() + else: + self.__write_track() + + def __set_quality(self) -> None: + self.__dw_quality = self.__c_quality['n_quality'] + self.__file_format = self.__c_quality['f_format'] + self.__song_quality = self.__c_quality['s_quality'] + + def __set_song_path(self) -> None: + self.__song_path = set_path( + self.__song_metadata, + self.__output_dir, + self.__song_quality, + self.__file_format, + self.__method_save + ) + + def __set_episode_path(self) -> None: + self.__song_path = set_path( + self.__song_metadata, + self.__output_dir, + self.__song_quality, + self.__file_format, + self.__method_save, + is_episode=True + ) + + def __write_track(self) -> None: + self.__set_song_path() + + self.__c_track = Track( + self.__song_metadata, self.__song_path, + self.__file_format, self.__song_quality, + self.__link, self.__ids + ) + + self.__c_track.md5_image = self.__ids + self.__c_track.set_fallback_ids(self.__fallback_ids) + + def __write_episode(self) -> None: + self.__set_episode_path() + + self.__c_episode = Episode( + self.__song_metadata, self.__song_path, + self.__file_format, self.__song_quality, + self.__link, self.__ids + ) + + self.__c_episode.md5_image = self.__ids + self.__c_episode.set_fallback_ids(self.__fallback_ids) + + def __convert_audio(self) -> None: + temp_filename = self.__song_path.replace(".ogg", ".tmp") + os_replace(self.__song_path, temp_filename) + ffmpeg_cmd = f"ffmpeg -y -hide_banner -loglevel error -i \"{temp_filename}\" -c:a copy \"{self.__song_path}\"" + system(ffmpeg_cmd) + remove(temp_filename) + + def get_no_dw_track(self) -> Track: + return self.__c_track + + def easy_dw(self) -> Track: + pic = self.__song_metadata['image'] + image = request(pic).content + self.__song_metadata['image'] = image + song = f"{self.__song_metadata['music']} - {self.__song_metadata['artist']}" + + if not self.__not_interface: + print(f"Downloading: {song}") + + try: + self.download_try() + except Exception as e: + traceback.print_exc() + raise e + + return self.__c_track + + def download_try(self) -> Track: + if isfile(self.__song_path) and check_track(self.__c_track): + if self.__recursive_download: + return self.__c_track + + ans = input( + f"Track \"{self.__song_path}\" already exists, do you want to redownload it?(y or n):" + ) + + if not ans in answers: + return self.__c_track + + track_id = TrackId.from_base62(self.__ids) + + try: + stream = Download_JOB.session.content_feeder().load_track( + track_id, + VorbisOnlyAudioQuality(self.__dw_quality), + False, + None + ) + except RuntimeError: + raise TrackNotFound(self.__link) + + total_size = stream.input_stream.size + + # Ensure the directory exists before writing the file + os.makedirs(dirname(self.__song_path), exist_ok=True) + + with open(self.__song_path, "wb") as f: + c_stream = stream.input_stream.stream() + data = c_stream.read(total_size) + c_stream.close() + f.write(data) + + self.__convert_audio() + + self.__write_track() + + return self.__c_track + + def download_eps(self) -> Episode: + if isfile(self.__song_path) and check_track(self.__c_episode): + if self.__recursive_download: + return self.__c_episode + + ans = input( + f"Episode \"{self.__song_path}\" already exists, do you want to redownload it?(y or n):" + ) + + if not ans in answers: + return self.__c_episode + + episode_id = EpisodeId.from_base62(self.__ids) + + try: + stream = Download_JOB.session.content_feeder().load_episode( + episode_id, + VorbisOnlyAudioQuality(self.__dw_quality), + False, + None + ) + except RuntimeError: + raise TrackNotFound(self.__link) + + total_size = stream.input_stream.size + + # Ensure the directory exists before writing the file + os.makedirs(dirname(self.__song_path), exist_ok=True) + + with open(self.__song_path, "wb") as f: + c_stream = stream.input_stream.stream() + data = c_stream.read(total_size) + c_stream.close() + f.write(data) + + self.__convert_audio() + + self.__write_episode() + + return self.__c_episode + +def download_cli(preferences: Preferences) -> None: + __link = preferences.link + __output_dir = preferences.output_dir + __method_save = preferences.method_save + __not_interface = preferences.not_interface + __quality_download = preferences.quality_download + __recursive_download = preferences.recursive_download + __recursive_quality = preferences.recursive_quality + + cmd = f"deez-dw.py -so spo -l \"{__link}\" " + + if __output_dir: + cmd += f"-o {__output_dir} " + if __method_save: + cmd += f"-sa {__method_save} " + if __not_interface: + cmd += f"-g " + if __quality_download: + cmd += f"-q {__quality_download} " + if __recursive_download: + cmd += f"-rd " + if __recursive_quality: + cmd += f"-rq" + + system(cmd) + +class DW_TRACK: + def __init__( + self, + preferences: Preferences + ) -> None: + + self.__preferences = preferences + + def dw(self) -> Track: + track = EASY_DW(self.__preferences).easy_dw() + + return track + + def dw2(self) -> Track: + track = EASY_DW(self.__preferences).get_no_dw_track() + download_cli(self.__preferences) + + return track + +class DW_ALBUM: + def __init__( + self, + preferences: Preferences + ) -> None: + + self.__preferences = preferences + self.__ids = self.__preferences.ids + self.__make_zip = self.__preferences.make_zip + self.__output_dir = self.__preferences.output_dir + self.__method_save = self.__preferences.method_save + self.__song_metadata = self.__preferences.song_metadata + self.__not_interface = self.__preferences.not_interface + + self.__song_metadata_items = self.__song_metadata.items() + + def dw(self) -> Album: + pic = self.__song_metadata['image'] + image = request(pic).content + self.__song_metadata['image'] = image + + album = Album(self.__ids) + album.image = image + album.nb_tracks = self.__song_metadata['nb_tracks'] + album.album_name = self.__song_metadata['album'] + album.upc = self.__song_metadata['upc'] + tracks = album.tracks + album.md5_image = self.__ids + album.tags = self.__song_metadata + + c_song_metadata = {} + + for key, item in self.__song_metadata_items: + if type(item) is not list: + c_song_metadata[key] = self.__song_metadata[key] + + t = tqdm( + range(album.nb_tracks), + desc = c_song_metadata['album'], + disable = self.__not_interface + ) + + for a in t: + for key, item in self.__song_metadata_items: + if type(item) is list: + c_song_metadata[key] = self.__song_metadata[key][a] + + song = f"{c_song_metadata['music']} - {c_song_metadata['artist']}" + t.set_description_str(song) + c_preferences = deepcopy(self.__preferences) + c_preferences.song_metadata = c_song_metadata.copy() + c_preferences.ids = c_song_metadata['ids'] + c_preferences.link = f"https://open.spotify.com/track/{c_preferences.ids}" + + try: + track = EASY_DW(c_preferences).download_try() + except TrackNotFound: + track = Track( + c_song_metadata, + None, None, + None, None, None, + ) + + track.success = False + print(f"Track not found: {song} :(") + + tracks.append(track) + + if self.__make_zip: + song_quality = tracks[0].quality + + zip_name = create_zip( + tracks, + output_dir = self.__output_dir, + song_metadata = self.__song_metadata, + song_quality = song_quality, + method_save = self.__method_save + ) + + album.zip_path = zip_name + + return album + + def dw2(self) -> Album: + pic = self.__song_metadata['image'] + image = request(pic).content + self.__song_metadata['image'] = image + + album = Album(self.__ids) + album.image = image + album.nb_tracks = self.__song_metadata['nb_tracks'] + album.album_name = self.__song_metadata['album'] + album.upc = self.__song_metadata['upc'] + tracks = album.tracks + album.md5_image = self.__ids + album.tags = self.__song_metadata + + c_song_metadata = {} + + for key, item in self.__song_metadata_items: + if type(item) is not list: + c_song_metadata[key] = self.__song_metadata[key] + + t = tqdm( + range(album.nb_tracks), + desc = c_song_metadata['album'], + disable = True + ) + + for a in t: + for key, item in self.__song_metadata_items: + if type(item) is list: + c_song_metadata[key] = self.__song_metadata[key][a] + + song = f"{c_song_metadata['music']} - {c_song_metadata['artist']}" + t.set_description_str(song) + c_preferences = deepcopy(self.__preferences) + c_preferences.song_metadata = c_song_metadata.copy() + c_preferences.ids = c_song_metadata['ids'] + c_preferences.link = f"https://open.spotify.com/track/{c_preferences.ids}" + + track = EASY_DW(c_preferences).get_no_dw_track() + + tracks.append(track) + + download_cli(self.__preferences) + + if self.__make_zip: + song_quality = tracks[0].quality + + zip_name = create_zip( + tracks, + output_dir = self.__output_dir, + song_metadata = self.__song_metadata, + song_quality = song_quality, + method_save = self.__method_save + ) + + album.zip_path = zip_name + + return album + +class DW_PLAYLIST: + def __init__( + self, + preferences: Preferences + ) -> None: + + self.__preferences = preferences + self.__ids = self.__preferences.ids + self.__json_data = preferences.json_data + self.__make_zip = self.__preferences.make_zip + self.__output_dir = self.__preferences.output_dir + self.__song_metadata = self.__preferences.song_metadata + + def dw(self) -> Playlist: + playlist = Playlist() + tracks = playlist.tracks + + for c_song_metadata in self.__song_metadata: + if type(c_song_metadata) is str: + print(f"Track not found {c_song_metadata} :(") + continue + + c_preferences = deepcopy(self.__preferences) + c_preferences.ids = c_song_metadata['ids'] + c_preferences.song_metadata = c_song_metadata + + track = EASY_DW(c_preferences).easy_dw() + + if not track.success: + song = f"{c_song_metadata['music']} - {c_song_metadata['artist']}" + print(f"Cannot download {song}") + + tracks.append(track) + + if self.__make_zip: + playlist_title = self.__json_data['name'] + zip_name = f"{self.__output_dir}/{playlist_title} [playlist {self.__ids}]" + create_zip(tracks, zip_name = zip_name) + playlist.zip_path = zip_name + + return playlist + + def dw2(self) -> Playlist: + playlist = Playlist() + tracks = playlist.tracks + + for c_song_metadata in self.__song_metadata: + if type(c_song_metadata) is str: + print(f"Track not found {c_song_metadata} :(") + continue + + c_preferences = deepcopy(self.__preferences) + c_preferences.ids = c_song_metadata['ids'] + c_preferences.song_metadata = c_song_metadata + + track = EASY_DW(c_preferences).get_no_dw_track() + + if not track.success: + song = f"{c_song_metadata['music']} - {c_song_metadata['artist']}" + print(f"Cannot download {song}") + + tracks.append(track) + + download_cli(self.__preferences) + + if self.__make_zip: + playlist_title = self.__json_data['name'] + zip_name = f"{self.__output_dir}/{playlist_title} [playlist {self.__ids}]" + create_zip(tracks, zip_name = zip_name) + playlist.zip_path = zip_name + + return playlist + +class DW_EPISODE: + def __init__( + self, + preferences: Preferences + ) -> None: + + self.__preferences = preferences + + def dw(self) -> Episode: + episode = EASY_DW(self.__preferences).download_eps() + + return episode + + def dw2(self) -> Episode: + episode = EASY_DW(self.__preferences).get_no_dw_track() + download_cli(self.__preferences) + + return episode \ No newline at end of file diff --git a/deezspot/spotloader/__init__.py b/deezspot/spotloader/__init__.py new file mode 100644 index 0000000..6d3fa17 --- /dev/null +++ b/deezspot/spotloader/__init__.py @@ -0,0 +1,330 @@ +#!/usr/bin/python3 +import traceback +from os.path import isfile +from deezspot.easy_spoty import Spo +from librespot.core import Session +from deezspot.exceptions import InvalidLink +from deezspot.spotloader.__spo_api__ import tracking, tracking_album, tracking_episode +from deezspot.spotloader.spotify_settings import stock_quality +from deezspot.libutils.utils import ( + get_ids, + link_is_valid, + what_kind, +) +from deezspot.models import ( + Track, + Album, + Playlist, + Preferences, + Smart, + Episode +) +from deezspot.spotloader.__download__ import ( + DW_TRACK, + DW_ALBUM, + DW_PLAYLIST, + DW_EPISODE, + Download_JOB, +) +from deezspot.libutils.others_settings import ( + stock_output, + stock_recursive_quality, + stock_recursive_download, + stock_not_interface, + stock_zip, + method_save, + is_thread, +) +Spo() + +class SpoLogin: + def __init__( + self, + credentials_path: str, + ) -> None: + self.credentials_path = credentials_path + self.__initialize_session() + + def __initialize_session(self) -> None: + session_builder = Session.Builder() + session_builder.conf.stored_credentials_file = self.credentials_path + + if isfile(self.credentials_path): + session = session_builder.stored_file().create() + else: + raise FileNotFoundError("Please fill your credentials.json location!") + + Download_JOB(session) + + def download_track( + self, link_track, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + method_save=method_save, + is_thread=is_thread + ) -> Track: + try: + link_is_valid(link_track) + ids = get_ids(link_track) + song_metadata = tracking(ids) + + preferences = Preferences() + + preferences.link = link_track + preferences.song_metadata = song_metadata + preferences.quality_download = quality_download + preferences.output_dir = output_dir + preferences.ids = ids + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.method_save = method_save + preferences.is_episode = False + + if not is_thread: + track = DW_TRACK(preferences).dw() + else: + track = DW_TRACK(preferences).dw2() + + return track + except Exception as e: + traceback.print_exc() + raise e + + def download_album( + self, link_album, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + make_zip=stock_zip, + method_save=method_save, + is_thread=is_thread + ) -> Album: + try: + link_is_valid(link_album) + ids = get_ids(link_album) + album_json = Spo.get_album(ids) + song_metadata = tracking_album(album_json) + + preferences = Preferences() + + preferences.link = link_album + preferences.song_metadata = song_metadata + preferences.quality_download = quality_download + preferences.output_dir = output_dir + preferences.ids = ids + preferences.json_data = album_json + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.method_save = method_save + preferences.make_zip = make_zip + preferences.is_episode = False + + if not is_thread: + album = DW_ALBUM(preferences).dw() + else: + album = DW_ALBUM(preferences).dw2() + + return album + except Exception as e: + traceback.print_exc() + raise e + + def download_playlist( + self, link_playlist, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + make_zip=stock_zip, + method_save=method_save, + is_thread=is_thread + ) -> Playlist: + try: + link_is_valid(link_playlist) + ids = get_ids(link_playlist) + + song_metadata = [] + playlist_json = Spo.get_playlist(ids) + + for track in playlist_json['tracks']['items']: + is_track = track['track'] + + if not is_track: + continue + + external_urls = is_track['external_urls'] + + if not external_urls: + c_song_metadata = f"The track \"{is_track['name']}\" is not available on Spotify :(" + else: + ids = get_ids(external_urls['spotify']) + c_song_metadata = tracking(ids) + + song_metadata.append(c_song_metadata) + + preferences = Preferences() + + preferences.link = link_playlist + preferences.song_metadata = song_metadata + preferences.quality_download = quality_download + preferences.output_dir = output_dir + preferences.ids = ids + preferences.json_data = playlist_json + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.method_save = method_save + preferences.make_zip = make_zip + preferences.is_episode = False + + if not is_thread: + playlist = DW_PLAYLIST(preferences).dw() + else: + playlist = DW_PLAYLIST(preferences).dw2() + + return playlist + except Exception as e: + traceback.print_exc() + raise e + + def download_episode( + self, link_episode, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + method_save=method_save, + is_thread=is_thread + ) -> Episode: + try: + link_is_valid(link_episode) + ids = get_ids(link_episode) + episode_json = Spo.get_episode(ids) + episode_metadata = tracking_episode(ids) + + preferences = Preferences() + + preferences.link = link_episode + preferences.song_metadata = episode_metadata + preferences.output_dir = output_dir + preferences.ids = ids + preferences.json_data = episode_json + preferences.recursive_quality = recursive_quality + preferences.recursive_download = recursive_download + preferences.not_interface = not_interface + preferences.method_save = method_save + preferences.is_episode = True + + if not is_thread: + episode = DW_EPISODE(preferences).dw() + else: + episode = DW_EPISODE(preferences).dw2() + + return episode + except Exception as e: + traceback.print_exc() + raise e + + def download_smart( + self, link, + output_dir=stock_output, + quality_download=stock_quality, + recursive_quality=stock_recursive_quality, + recursive_download=stock_recursive_download, + not_interface=stock_not_interface, + make_zip=stock_zip, + method_save=method_save + ) -> Smart: + try: + link_is_valid(link) + link = what_kind(link) + smart = Smart() + + if "spotify.com" in link: + source = "https://spotify.com" + + smart.source = source + + if "track/" in link: + if not "spotify.com" in link: + raise InvalidLink(link) + + track = self.download_track( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + method_save=method_save + ) + + smart.type = "track" + smart.track = track + + elif "album/" in link: + if not "spotify.com" in link: + raise InvalidLink(link) + + album = self.download_album( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + make_zip=make_zip, + method_save=method_save + ) + + smart.type = "album" + smart.album = album + + elif "playlist/" in link: + if not "spotify.com" in link: + raise InvalidLink(link) + + playlist = self.download_playlist( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + make_zip=make_zip, + method_save=method_save + ) + + smart.type = "playlist" + smart.playlist = playlist + + elif "episode/" in link: + if not "spotify.com" in link: + raise InvalidLink(link) + + episode = self.download_episode( + link, + output_dir=output_dir, + quality_download=quality_download, + recursive_quality=recursive_quality, + recursive_download=recursive_download, + not_interface=not_interface, + method_save=method_save + ) + + smart.type = "episode" + smart.episode = episode + + return smart + except Exception as e: + traceback.print_exc() + raise e \ No newline at end of file diff --git a/deezspot/spotloader/__spo_api__.py b/deezspot/spotloader/__spo_api__.py new file mode 100644 index 0000000..fcf0933 --- /dev/null +++ b/deezspot/spotloader/__spo_api__.py @@ -0,0 +1,153 @@ +#!/usr/bin/python3 + +from deezspot.easy_spoty import Spo +from datetime import datetime +from deezspot.libutils.utils import convert_to_date +import traceback + +def tracking(ids, album=None): + datas = {} + try: + json_track = Spo.get_track(ids) + + if not album: + album_ids = json_track['album']['id'] + json_album = Spo.get_album(album_ids) + datas['image'] = json_album['images'][0]['url'] + datas['image2'] = json_album['images'][1]['url'] + datas['image3'] = json_album['images'][2]['url'] + datas['genre'] = " & ".join(json_album['genres']) + + ar_album = [ + artist['name'] + for artist in json_album['artists'] + ] + + datas['ar_album'] = " & ".join(ar_album) + datas['album'] = json_album['name'] + datas['label'] = json_album['label'] + + external_ids = json_album['external_ids'] + + if external_ids: + datas['upc'] = external_ids['upc'] + else: + datas['upc'] = "Unknown" + + datas['nb_tracks'] = json_album['total_tracks'] + + datas['music'] = json_track['name'] + + artists = [ + artist['name'] + for artist in json_track['artists'] + ] + + datas['artist'] = " & ".join(artists) + datas['tracknum'] = json_track['track_number'] + datas['discnum'] = json_track['disc_number'] + + datas['year'] = convert_to_date( + json_track['album']['release_date'] + ) + + datas['bpm'] = "Unknown" + datas['duration'] = json_track['duration_ms'] // 1000 + + external_ids = json_track['external_ids'] + + if external_ids: + datas['isrc'] = external_ids['isrc'] + + datas['gain'] = "Unknown" + datas['ids'] = ids + except Exception as e: + traceback.print_exc() # Print traceback + # Optionally, handle or log the exception here + return None + + return datas + +def tracking_album(album_json): + song_metadata = {} + try: + song_metadata = { + "music": [], + "artist": [], + "tracknum": [], + "discnum": [], + "bpm": [], + "duration": [], + "isrc": [], + "gain": [], + "ids": [], + "image": album_json['images'][0]['url'], + "image2": album_json['images'][1]['url'], + "image3": album_json['images'][2]['url'], + "album": album_json['name'], + "label": album_json['label'], + "year": convert_to_date(album_json['release_date']), + "nb_tracks": album_json['total_tracks'], + "genre": " & ".join(album_json['genres']) + } + + ar_album = [ + artist['name'] + for artist in album_json['artists'] + ] + + song_metadata['ar_album'] = " & ".join(ar_album) + + external_ids = album_json['external_ids'] + + if external_ids: + song_metadata['upc'] = external_ids['upc'] + else: + song_metadata['upc'] = "Unknown" + + sm_items = song_metadata.items() + + for track in album_json['tracks']['items']: + c_ids = track['id'] + detas = tracking(c_ids, album=True) + + for key, item in sm_items: + if type(item) is list: + song_metadata[key].append(detas[key]) + except Exception as e: + traceback.print_exc() # Print traceback + # Optionally, handle or log the exception here + return None + + return song_metadata + +def tracking_episode(ids): + datas = {} + try: + json_episode = Spo.get_episode(ids) + + datas['audio_preview_url'] = json_episode.get('audio_preview_url', '') + datas['description'] = json_episode.get('description', '') + datas['duration'] = json_episode.get('duration_ms', 0) // 1000 + datas['explicit'] = json_episode.get('explicit', False) + datas['external_urls'] = json_episode.get('external_urls', {}).get('spotify', '') + datas['href'] = json_episode.get('href', '') + datas['html_description'] = json_episode.get('html_description', '') + datas['id'] = json_episode.get('id', '') + datas['image'] = json_episode['images'][0]['url'] if json_episode.get('images') else '' + datas['image2'] = json_episode['images'][1]['url'] if len(json_episode.get('images', [])) > 1 else '' + datas['image3'] = json_episode['images'][2]['url'] if len(json_episode.get('images', [])) > 2 else '' + datas['is_externally_hosted'] = json_episode.get('is_externally_hosted', False) + datas['is_playable'] = json_episode.get('is_playable', False) + datas['language'] = json_episode.get('language', '') + datas['languages'] = " & ".join(json_episode.get('languages', [])) + datas['name'] = json_episode.get('name', '') + datas['release_date'] = convert_to_date(json_episode.get('release_date', '')) + datas['show'] = json_episode.get('show', {}).get('name', '') + datas['publisher'] = json_episode.get('show', {}).get('publisher', '') + datas['ids'] = ids + except Exception as e: + traceback.print_exc() + return None + + return datas \ No newline at end of file diff --git a/deezspot/spotloader/spotify_settings.py b/deezspot/spotloader/spotify_settings.py new file mode 100644 index 0000000..02f1274 --- /dev/null +++ b/deezspot/spotloader/spotify_settings.py @@ -0,0 +1,26 @@ +#!/usr/bin/python3 + +from librespot.audio.decoders import AudioQuality + +stock_quality = "HIGH" +librespot_credentials = "credentials.json" + +qualities = { + "HIGH": { + "n_quality": AudioQuality.HIGH, + "f_format": ".ogg", + "s_quality": "HIGH" + }, + + "VERY_HIGH": { + "n_quality": AudioQuality.VERY_HIGH, + "f_format": ".ogg", + "s_quality": "VERY_HIGH" + }, + + "NORMAL": { + "n_quality": AudioQuality.NORMAL, + "f_format": ".ogg", + "s_quality": "NORMAL" + } +}