diff --git a/qobuz_dl/cli.py b/qobuz_dl/cli.py index 78e2526..c71b6c3 100644 --- a/qobuz_dl/cli.py +++ b/qobuz_dl/cli.py @@ -55,6 +55,9 @@ def reset_config(config_file): spoofer = spoofbuz.Spoofer() config["DEFAULT"]["app_id"] = str(spoofer.getAppId()) config["DEFAULT"]["secrets"] = ",".join(spoofer.getSecrets().values()) + config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " + "[{bit_depth}B-{sampling_rate}kHz]" + config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}" with open(config_file, "w") as configfile: config.write(configfile) logging.info( @@ -98,6 +101,20 @@ def main(): no_cover = config.getboolean("DEFAULT", "no_cover") no_database = config.getboolean("DEFAULT", "no_database") app_id = config["DEFAULT"]["app_id"] + + if ("folder_format" not in config["DEFAULT"] + or "track_format" not in config["DEFAULT"]): + logging.info(f'{YELLOW}Config file does not include format string,' + ' updating...') + config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " + "[{bit_depth}B-{sampling_rate}kHz]" + config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}" + with open(CONFIG_FILE, 'w') as cf: + config.write(cf) + + folder_format = config["DEFAULT"]["folder_format"] + track_format = config["DEFAULT"]["track_format"] + secrets = [ secret for secret in config["DEFAULT"]["secrets"].split(",") if secret ] @@ -131,6 +148,10 @@ def main(): cover_og_quality=arguments.og_cover or og_cover, no_cover=arguments.no_cover or no_cover, downloads_db=None if no_database or arguments.no_db else QOBUZ_DB, + folder_format=arguments.folder_format + if arguments.folder_format is not None else folder_format, + track_format=arguments.track_format + if arguments.track_format is not None else track_format, ) qobuz.initialize_client(email, password, app_id, secrets) diff --git a/qobuz_dl/commands.py b/qobuz_dl/commands.py index 1fcf291..41731e8 100644 --- a/qobuz_dl/commands.py +++ b/qobuz_dl/commands.py @@ -102,6 +102,21 @@ def add_common_arg(custom_parser, default_folder, default_quality): custom_parser.add_argument( "--no-db", action="store_true", help="don't call the database" ) + custom_parser.add_argument( + "-ff", + "--folder-format", + metavar='PATTERN', + help='pattern for formatting folder names, e.g ' + '"{artist} - {album} ({year})". available keys: artist, ' + 'albumartist, album, year, sampling_rate, bit_rate, tracktitle. ' + 'cannot contain characters used by the system, which includes /:<>', + ) + custom_parser.add_argument( + "-tf", + "--track-format", + metavar='PATTERN', + help='pattern for formatting track names. see `folder-format`.', + ) def qobuz_dl_args( diff --git a/qobuz_dl/core.py b/qobuz_dl/core.py index 553a388..5aa03b4 100644 --- a/qobuz_dl/core.py +++ b/qobuz_dl/core.py @@ -65,6 +65,9 @@ class QobuzDL: cover_og_quality=False, no_cover=False, downloads_db=None, + folder_format='{artist} - {album} ({year}) [{bit_depth}B-' + '{sampling_rate}kHz]', + track_format='{tracknumber}. {tracktitle}', ): self.directory = self.create_dir(directory) self.quality = quality @@ -78,6 +81,8 @@ class QobuzDL: self.cover_og_quality = cover_og_quality self.no_cover = no_cover self.downloads_db = create_db(downloads_db) if downloads_db else None + self.folder_format = folder_format + self.track_format = track_format def initialize_client(self, email, pwd, app_id, secrets): self.client = qopy.Client(email, pwd, app_id, secrets) @@ -140,6 +145,8 @@ class QobuzDL: self.quality_fallback, self.cover_og_quality, self.no_cover, + folder_format=self.folder_format, + track_format=self.track_format ) handle_download_id(self.downloads_db, item_id, add_id=True) except (requests.exceptions.RequestException, NonStreamable) as e: diff --git a/qobuz_dl/downloader.py b/qobuz_dl/downloader.py index 86dde5a..3aac3b5 100644 --- a/qobuz_dl/downloader.py +++ b/qobuz_dl/downloader.py @@ -1,5 +1,6 @@ import logging import os +from typing import Tuple import requests from pathvalidate import sanitize_filename @@ -10,6 +11,18 @@ from qobuz_dl.color import OFF, GREEN, RED, YELLOW, CYAN from qobuz_dl.exceptions import NonStreamable QL_DOWNGRADE = "FormatRestrictedByFormatAvailability" +# used in case of error +DEFAULT_FORMATS = { + 'MP3': [ + '{artist} - {album} ({year}) [MP3]', + '{tracknumber}. {tracktitle}', + ], + 'Unknown': [ + '{artist} - {album}', + '{tracknumber}. {tracktitle}', + ] +} + logger = logging.getLogger(__name__) @@ -37,11 +50,12 @@ def get_description(u: dict, track_title, multiple=None): return downloading_title -def get_format(client, item_dict, quality, - is_track_id=False, track_url_dict=None): +def get_format(client, item_dict, + quality, is_track_id=False, + track_url_dict=None) -> Tuple[str, bool, int, int]: quality_met = True if int(quality) == 5: - return "MP3", quality_met + return ("MP3", quality_met, None, None) track_dict = item_dict if not is_track_id: track_dict = item_dict["tracks"]["items"][0] @@ -59,18 +73,15 @@ def get_format(client, item_dict, quality, for restriction in restrictions ): quality_met = False - if ( - new_track_dict["bit_depth"] == 16 - and new_track_dict["sampling_rate"] == 44.1 - ): - return "FLAC", quality_met + return ( - f'{new_track_dict["bit_depth"]}B-' - f'{new_track_dict["sampling_rate"]}Khz', + "FLAC", quality_met, + new_track_dict["bit_depth"], + new_track_dict["sampling_rate"], ) except (KeyError, requests.exceptions.HTTPError): - return "Unknown", quality_met + return ("Unknown", quality_met, None, None) def get_title(item_dict): @@ -108,6 +119,7 @@ def download_and_tag( is_mp3, embed_art=False, multiple=None, + track_format='{tracknumber}. {tracktitle}', ): """ Download and tag a file @@ -120,9 +132,11 @@ def download_and_tag( :param bool is_track :param bool is_mp3 :param bool embed_art: Embed cover art into file (FLAC-only) + :param str track_format format-string that determines file naming :param multiple: Multiple disc integer :type multiple: integer or None """ + extension = ".mp3" if is_mp3 else ".flac" try: @@ -138,28 +152,28 @@ def download_and_tag( filename = os.path.join(root_dir, f".{tmp_count:02}.tmp") # Determine the filename - artist = track_metadata.get("performer", {}).get("name") - album_artist = track_metadata.get("album", {}).get("artist", - {}).get("name") - new_track_title = track_metadata.get("title") - version = track_metadata.get("version") - - if artist or album_artist: - new_track_title = ( - f"{artist if artist else album_artist}" - f' - {track_metadata["title"]}' - ) - if version: - new_track_title = f"{new_track_title} ({version})" - - track_file = f'{track_metadata["track_number"]:02}. {new_track_title}' - final_file = os.path.join(root_dir, sanitize_filename(track_file))[:250] + extension + track_title = track_metadata.get("title") + artist = _safe_get(track_metadata, "performer", "name") + filename_attr = { + 'artist': artist, + 'albumartist': _safe_get(track_metadata, "album", "artist", "name", + default=artist), + 'bit_depth': track_metadata['maximum_bit_depth'], + 'sampling_rate': track_metadata['maximum_sampling_rate'], + 'tracktitle': track_title, + 'version': track_metadata.get("version"), + 'tracknumber': f"{track_metadata['track_number']:02}" + } + # track_format is a format string + # e.g. '{tracknumber}. {artist} - {tracktitle}' + formatted_path = sanitize_filename(track_format.format(**filename_attr)) + final_file = os.path.join(root_dir, formatted_path)[:250] + extension if os.path.isfile(final_file): - logger.info(f"{OFF}{new_track_title} was already downloaded") + logger.info(f"{OFF}{track_title} was already downloaded") return - desc = get_description(track_url_dict, new_track_title, multiple) + desc = get_description(track_url_dict, track_title, multiple) tqdm_download(url, filename, desc) tag_function = metadata.tag_mp3 if is_mp3 else metadata.tag_flac try: @@ -187,6 +201,9 @@ def download_id_by_type( downgrade_quality=True, cover_og_quality=False, no_cover=False, + folder_format='{artist} - {album} ({year}) ' + '[{bit_depth}B-{sampling_rate}kHz]', + track_format='{tracknumber}. {tracktitle}', ): """ Download and get metadata by ID and type (album or track) @@ -201,6 +218,8 @@ def download_id_by_type( :param bool downgrade: Skip releases not available in set quality :param bool cover_og_quality: Download cover in its original quality :param bool no_cover: Don't download cover art + :param str folder_format: format string that determines folder naming + :param str track_format: format string that determines track naming """ count = 0 @@ -218,7 +237,10 @@ def download_id_by_type( return album_title = get_title(meta) - album_format, quality_met = get_format(client, meta, quality) + + format_info = get_format(client, meta, quality) + file_format, quality_met, bit_depth, sampling_rate = format_info + if not downgrade_quality and not quality_met: logger.info( f"{OFF}Skipping {album_title} as it doesn't " @@ -227,14 +249,21 @@ def download_id_by_type( return logger.info(f"\n{YELLOW}Downloading: {album_title}\n" - f"Quality: {album_format}\n") - dirT = ( - meta["artist"]["name"], - album_title, - meta["release_date_original"].split("-")[0], - album_format, + f"Quality: {file_format}\n") + album_attr = { + 'artist': meta["artist"]["name"], + 'album': album_title, + 'year': meta["release_date_original"].split("-")[0], + 'format': file_format, + 'bit_depth': bit_depth, + 'sampling_rate': sampling_rate + } + folder_format, track_format = _clean_format_str(folder_format, + track_format, + file_format) + sanitized_title = sanitize_filename( + folder_format.format(**album_attr) ) - sanitized_title = sanitize_filename("{} - {} ({}) [{}]".format(*dirT)) dirn = os.path.join(path, sanitized_title) os.makedirs(dirn, exist_ok=True) @@ -266,6 +295,7 @@ def download_id_by_type( is_mp3, embed_art, i["media_number"] if is_multiple else None, + track_format=track_format, ) else: logger.info(f"{OFF}Demo. Skipping") @@ -277,23 +307,31 @@ def download_id_by_type( meta = client.get_track_meta(item_id) track_title = get_title(meta) logger.info(f"\n{YELLOW}Downloading: {track_title}") - track_format, quality_met = get_format(client, meta, - quality, True, parse) + format_info = get_format(client, meta, quality, + is_track_id=True, track_url_dict=parse) + file_format, quality_met, bit_depth, sampling_rate = format_info + + folder_format, track_format = _clean_format_str(folder_format, + track_format, + bit_depth) + if not downgrade_quality and not quality_met: logger.info( f"{OFF}Skipping {track_title} as it doesn't " "meet quality requirement" ) return - dirT = ( - meta["album"]["artist"]["name"], - track_title, - meta["album"]["release_date_original"].split("-")[0], - track_format, - ) + track_attr = { + 'artist': meta["album"]["artist"]["name"], + 'tracktitle': track_title, + 'year': meta["album"]["release_date_original"].split("-")[0], + 'bit_depth': bit_depth, + 'sampling_rate': sampling_rate + } sanitized_title = sanitize_filename( - "{} - {} [{}] [{}]".format(*dirT) + folder_format.format(**track_attr) ) + dirn = os.path.join(path, sanitized_title) os.makedirs(dirn, exist_ok=True) if no_cover: @@ -305,7 +343,54 @@ def download_id_by_type( ) is_mp3 = True if int(quality) == 5 else False download_and_tag(dirn, count, parse, meta, - meta, True, is_mp3, embed_art) + meta, True, is_mp3, embed_art, + track_format=track_format) else: logger.info(f"{OFF}Demo. Skipping") logger.info(f"{GREEN}Completed") + + +# ----------- Utilities ----------- + +def _clean_format_str(folder: str, track: str, + file_format: str) -> Tuple[str, str]: + '''Cleans up the format strings, avoids errors + with MP3 files. + ''' + final = [] + for i, fs in enumerate((folder, track)): + if fs.endswith('.mp3'): + fs = fs[:-4] + elif fs.endswith('.flac'): + fs = fs[:-5] + fs = fs.strip() + + # default to pre-chosen string if format is invalid + if (file_format in ('MP3', 'Unknown') and + 'bit_depth' in file_format or 'sampling_rate' in file_format): + default = DEFAULT_FORMATS[file_format][i] + logger.error(f'{RED}invalid format string for format {file_format}' + f'. defaulting to {default}') + fs = default + final.append(fs) + + return tuple(final) + + +def _safe_get(d: dict, *keys, default=None): + '''A replacement for chained `get()` statements on dicts: + >>> d = {'foo': {'bar': 'baz'}} + >>> _safe_get(d, 'baz') + None + >>> _safe_get(d, 'foo', 'bar') + 'baz' + ''' + curr = d + res = default + for key in keys: + res = curr.get(key, default) + if res == default or not hasattr(res, '__getitem__'): + return res + else: + curr = res + return res diff --git a/qobuz_dl/metadata.py b/qobuz_dl/metadata.py index 8d594a2..cea7bdc 100644 --- a/qobuz_dl/metadata.py +++ b/qobuz_dl/metadata.py @@ -10,6 +10,9 @@ logger = logging.getLogger(__name__) # unicode symbols COPYRIGHT, PHON_COPYRIGHT = '\u2117', '\u00a9' +# if a metadata block exceeds this, mutagen will raise error +# and the file won't be tagged +FLAC_MAX_BLOCKSIZE = 16777215 def get_title(track_dict): @@ -110,6 +113,12 @@ def tag_flac(filename, root_dir, final_name, d, album, cover_image = multi_emb_image try: + # rest of the metadata still gets embedded + # when the image size is too big + if os.path.getsize(cover_image) > FLAC_MAX_BLOCKSIZE: + raise Exception("downloaded cover size too large to embed. " + "turn off `og_cover` to avoid error") + image = Picture() image.type = 3 image.mime = "image/jpeg"