improved function, added cli and config support

Signed-off-by: nathannathant <74019033+pynathanthomas@users.noreply.github.com>
This commit is contained in:
nathannathant 2021-03-04 22:04:10 -08:00
parent eb19e7345c
commit 32015dca4f
3 changed files with 96 additions and 106 deletions

View File

@ -58,6 +58,7 @@ def reset_config(config_file):
config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) "
"[{bit_depth}B-{sampling_rate}kHz]" "[{bit_depth}B-{sampling_rate}kHz]"
config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}" config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}"
config["DEFAULT"]["smart_discography"] = "false"
with open(config_file, "w") as configfile: with open(config_file, "w") as configfile:
config.write(configfile) config.write(configfile)
logging.info( logging.info(
@ -105,16 +106,19 @@ def main():
if ( if (
"folder_format" not in config["DEFAULT"] "folder_format" not in config["DEFAULT"]
or "track_format" not in config["DEFAULT"] or "track_format" not in config["DEFAULT"]
or "smart_discography" not in config["DEFAULT"]
): ):
logging.info( logging.info(
f"{YELLOW}Config file does not include format string," " updating..." f"{YELLOW}Config file does not include some settings, updating..."
) )
config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) "
"[{bit_depth}B-{sampling_rate}kHz]" "[{bit_depth}B-{sampling_rate}kHz]"
config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}" config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}"
config["DEFAULT"]["smart_discography"] = "false"
with open(CONFIG_FILE, "w") as cf: with open(CONFIG_FILE, "w") as cf:
config.write(cf) config.write(cf)
smart_discography = config.getboolean("DEFAULT", "smart_discography")
folder_format = config["DEFAULT"]["folder_format"] folder_format = config["DEFAULT"]["folder_format"]
track_format = config["DEFAULT"]["track_format"] track_format = config["DEFAULT"]["track_format"]
@ -151,12 +155,9 @@ def main():
cover_og_quality=arguments.og_cover or og_cover, cover_og_quality=arguments.og_cover or og_cover,
no_cover=arguments.no_cover or no_cover, no_cover=arguments.no_cover or no_cover,
downloads_db=None if no_database or arguments.no_db else QOBUZ_DB, downloads_db=None if no_database or arguments.no_db else QOBUZ_DB,
folder_format=arguments.folder_format folder_format=arguments.folder_format or folder_format,
if arguments.folder_format is not None track_format=arguments.track_format or track_format,
else folder_format, smart_discography=arguments.smart_discography or smart_discography,
track_format=arguments.track_format
if arguments.track_format is not None
else track_format,
) )
qobuz.initialize_client(email, password, app_id, secrets) qobuz.initialize_client(email, password, app_id, secrets)

View File

@ -117,6 +117,12 @@ def add_common_arg(custom_parser, default_folder, default_quality):
metavar="PATTERN", metavar="PATTERN",
help="pattern for formatting track names. see `folder-format`.", help="pattern for formatting track names. see `folder-format`.",
) )
custom_parser.add_argument(
"-sd",
"--smart-discography",
action="store_true",
help="Try to filter out unrelated albums when requesting an artists discography.",
)
def qobuz_dl_args( def qobuz_dl_args(

View File

@ -1,7 +1,3 @@
# ----- Testing ------
import json
# --------------------
import logging import logging
import os import os
import re import re
@ -26,7 +22,12 @@ WEB_URL = "https://play.qobuz.com/"
ARTISTS_SELECTOR = "td.chartlist-artist > a" ARTISTS_SELECTOR = "td.chartlist-artist > a"
TITLE_SELECTOR = "td.chartlist-name > a" TITLE_SELECTOR = "td.chartlist-name > a"
EXTENSIONS = (".mp3", ".flac") EXTENSIONS = (".mp3", ".flac")
QUALITIES = {5: "5 - MP3", 6: "6 - FLAC", 7: "7 - 24B<96kHz", 27: "27 - 24B>96kHz"} QUALITIES = {
5: "5 - MP3",
6: "6 - 16 bit, 44.1kHz",
7: "7 - 24 bit, <96kHz",
27: "27 - 24 bit, >96kHz",
}
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -91,7 +92,7 @@ class QobuzDL:
def initialize_client(self, email, pwd, app_id, secrets): def initialize_client(self, email, pwd, app_id, secrets):
self.client = qopy.Client(email, pwd, app_id, secrets) self.client = qopy.Client(email, pwd, app_id, secrets)
logger.info(f"{YELLOW}Set quality: {QUALITIES[int(self.quality)]}\n") logger.info(f"{YELLOW}Set max quality: {QUALITIES[int(self.quality)]}\n")
def get_tokens(self): def get_tokens(self):
spoofer = spoofbuz.Spoofer() spoofer = spoofbuz.Spoofer()
@ -185,10 +186,18 @@ class QobuzDL:
os.path.join(self.directory, sanitize_filename(content_name)) os.path.join(self.directory, sanitize_filename(content_name))
) )
# items = [item[type_dict["iterable_key"]]["items"] for item in content][0] if self.smart_discography and url_type == "artist":
logger.info(f"{YELLOW}Filtering {content_name}'s discography")
items = self.smart_discography_filter( items = self.smart_discography_filter(
content, True, True, content,
save_space=True,
skip_extras=True,
) )
else:
items = [item[type_dict["iterable_key"]]["items"] for item in content][
0
]
logger.info(f"{YELLOW}{len(items)} downloads in queue") logger.info(f"{YELLOW}{len(items)} downloads in queue")
for item in items: for item in items:
self.download_from_id( self.download_from_id(
@ -482,110 +491,84 @@ class QobuzDL:
pl.write("\n\n".join(track_list)) pl.write("\n\n".join(track_list))
def smart_discography_filter( def smart_discography_filter(
self, contents: list, save_space=False, remove_extras=False self, contents: list, save_space=False, skip_extras=False
) -> list: ) -> list:
"""When downloading some artists' discography, there can be a lot """When downloading some artists' discography, many random and spam-like
of duplicate albums that needlessly use 10's of GB of bandwidth. This albums can get downloaded. This helps filter those out to just get the good stuff.
filters the duplicates.
Example (Stevie Wonder): This function removes:
* ... * albums by other artists, which may contain a feature from the requested artist
* Songs In The Key of Life [24/192] * duplicate albums in different qualities
* Songs In The Key of Life [24/96] * (optionally) collector's, deluxe, and live albums
* Songs In The Key of Life [16/44.1]
* ...
This function should choose either [24/96] or [24/192]. :param list contents: contents returned by qobuz API
It also skips deluxe albums in favor of the originals, picks remasters :param bool save_space: choose highest bit depth, lowest sampling rate
in favor of originals, and removes albums by other artists that just :param bool skip_extras: remove albums with extra material (e.g. live, deluxe, ...)
feature the requested artist. :returns: filtered items list
""" """
def print_album(a: dict): # for debugging
print( def print_album(album: dict):
f"{album['title']} - {album['version']} ({album['maximum_bit_depth']}/{album['maximum_sampling_rate']})" logger.info(
f"{album['title']} - {album.get('version', '~~')} ({album['maximum_bit_depth']}/{album['maximum_sampling_rate']} by {album['artist']['name']}) {album['id']}"
) )
def remastered(s: str) -> bool: TYPE_REGEXES = {
"""Case insensitive match to check whether "remaster": r"(?i)(re)?master(ed)?",
an album is remastered. "extra": r"(?i)(anniversary|deluxe|live|collector|demo|expanded)",
}
def is_type(album_t: str, album: dict) -> bool:
version = album.get("version", "")
title = album.get("title", "")
regex = TYPE_REGEXES[album_t]
return re.search(regex, f"{title} {version}") is not None
def essence(album: dict) -> str:
"""Ignore text in parens/brackets, return all lowercase.
Used to group two albums that may be named similarly, but not exactly
the same.
""" """
if s is None: r = re.match(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*", album)
return False return r.group(1).strip().lower()
return re.match(r"(?i)(re)?master(ed)?", s) is not None
def extra(album: dict) -> bool: requested_artist = contents[0]["name"]
assert hasattr(album, "__getitem__"), "param must be dict-like"
if 'version' not in album:
return False
return (
re.findall(
r"(?i)(anniversary|deluxe|live|collector|demo)",
f"{album['title']} {album['version']}",
)
!= []
)
# remove all albums by other artists
artist = contents[0]["name"]
items = [item["albums"]["items"] for item in contents][0] items = [item["albums"]["items"] for item in contents][0]
artist_f = [] # artist filtered
# use dicts to group duplicate albums together by title
title_grouped = dict()
for item in items: for item in items:
if item["artist"]["name"] == artist: if (t := essence(item["title"])) not in title_grouped:
artist_f.append(item) title_grouped[t] = []
title_grouped[t].append(item)
# use dicts to group duplicate titles together items = []
titles_f = dict() for albums in title_grouped.values():
for item in artist_f: best_bit_depth = max(a["maximum_bit_depth"] for a in albums)
if (t := item["title"]) not in titles_f: get_best = min if save_space else max
titles_f[t] = [] best_sampling_rate = get_best(
titles_f[t].append(item)
# pick desired quality out of duplicates
# remasters are given preferred status
quality_f = []
for albums in titles_f.values():
# no duplicates for title
if len(albums) == 1:
quality_f.append(albums[0])
continue
# desired bit depth and sampling rate
bit_depth = max(a["maximum_bit_depth"] for a in albums)
# having sampling rate > 44.1kHz is a waste of space
# https://en.wikipedia.org/wiki/Nyquist–Shannon_sampling_theorem
# https://en.wikipedia.org/wiki/44,100_Hz#Human_hearing_and_signal_processing
cmp_func = min if save_space else max
sampling_rate = cmp_func(
a["maximum_sampling_rate"] a["maximum_sampling_rate"]
for a in albums for a in albums
if a["maximum_bit_depth"] == bit_depth if a["maximum_bit_depth"] == best_bit_depth
) )
has_remaster = bool([a for a in albums if remastered(a["version"])]) remaster_exists = any(is_type("remaster", a) for a in albums)
# check if album has desired bit depth and sampling rate def is_valid(album):
# if there is a remaster in `item`, check if the album is a remaster return (
for album in albums: album["maximum_bit_depth"] == best_bit_depth
if ( and album["maximum_sampling_rate"] == best_sampling_rate
album["maximum_bit_depth"] == bit_depth and album["artist"]["name"] == requested_artist
and album["maximum_sampling_rate"] == sampling_rate and not ( # states that are not allowed
): (remaster_exists and not is_type("remaster", album))
if not has_remaster: or (skip_extras and is_type("extra", album))
quality_f.append(album) )
elif remastered(album["version"]): )
quality_f.append(album)
if remove_extras: filtered = tuple(filter(is_valid, albums))
final = [] # most of the time, len is 0 or 1.
# this filters those huge albums with outtakes, live performances etc. # if greater, it is a complete duplicate,
for album in quality_f: # so it doesn't matter which is chosen
if not extra(album): if len(filtered) >= 1:
final.append(album) items.append(filtered[0])
else:
final = quality_f
return final return items
# key = lambda a: a["title"]
# final.sort(key=key)
# for album in final:
# print_album(album)