qobuz-dl/qobuz_dl/cli.py
vitiko98 621c609721 Added new modes: interactive, download and lucky
Save files in separated folders when more than one disc. Fix for #13

Handle albums with the same name and different qualities. Fix for #15

Support for embedded artwork. Fix for #12

Check if the file exists before downloading

New README
2020-12-09 14:52:18 -04:00

315 lines
10 KiB
Python

import argparse
import configparser
import os
import re
import sys
from pick import pick
from pathvalidate import sanitize_filename
import qobuz_dl.spoofbuz as spoofbuz
from qobuz_dl import downloader, qopy
from qobuz_dl.search import Search
from qobuz_dl.commands import qobuz_dl_args
if os.name == "nt":
OS_CONFIG = os.environ.get("APPDATA")
else:
OS_CONFIG = os.path.join(os.environ["HOME"], ".config")
CONFIG_PATH = os.path.join(OS_CONFIG, "qobuz-dl")
CONFIG_FILE = os.path.join(CONFIG_PATH, "config.ini")
QUALITIES = {5: "320", 6: "LOSSLESS", 7: "24B <96KHZ", 27: "24B <196KHZ"}
def reset_config(config_file):
print("Creating config file: " + config_file)
config = configparser.ConfigParser()
config["DEFAULT"]["email"] = input("\nEnter your email:\n- ")
config["DEFAULT"]["password"] = input("\nEnter your password\n- ")
config["DEFAULT"]["default_folder"] = (
input("\nFolder for downloads (leave empy for default 'Qobuz Downloads')\n- ")
or "Qobuz Downloads"
)
config["DEFAULT"]["default_quality"] = (
input(
"\nDownload quality (5, 6, 7, 27) "
"[320, LOSSLESS, 24B <96KHZ, 24B >96KHZ]"
"\n(leave empy for default '6')\n- "
)
or "6"
)
config["DEFAULT"]["default_limit"] = "10"
print("Getting tokens. Please wait...")
spoofer = spoofbuz.Spoofer()
config["DEFAULT"]["app_id"] = str(spoofer.getAppId())
config["DEFAULT"]["secrets"] = ",".join(spoofer.getSecrets().values())
with open(config_file, "w") as configfile:
config.write(configfile)
print("Config file updated.")
def musicDir(directory):
fix = os.path.normpath(directory)
if not os.path.isdir(fix):
print("New directory created: " + fix)
os.makedirs(fix, exist_ok=True)
return fix
def get_id(url):
return re.match(
r"https?://(?:w{0,3}|play|open)\.qobuz\.com/(?:(?:album|track|artist"
"|playlist|label)/|[a-z]{2}-[a-z]{2}/album/-?\w+(?:-\w+)*-?/|user/library/favorites/)(\w+)",
url,
).group(1)
def processSelected(Qz, path, albums, ids, types, quality, embed_art=False):
quality = [i for i in QUALITIES.keys()][quality[1]]
for alb, id_, type_ in zip(albums, ids, types):
for al in alb:
downloader.download_id_by_type(
Qz,
id_[al[1]],
path,
quality,
True if type_[al[1]] else False,
embed_art,
)
def fromUrl(Qz, id, path, quality, album=True, embed_art=False):
downloader.download_id_by_type(Qz, id, path, str(quality), album, embed_art)
def handle_urls(url, client, path, quality, embed_art=False):
possibles = {
"playlist": {"func": client.get_plist_meta, "iterable_key": "tracks"},
"artist": {"func": client.get_artist_meta, "iterable_key": "albums"},
"label": {"func": client.get_label_meta, "iterable_key": "albums"},
"album": {"album": True, "func": None, "iterable_key": None},
"track": {"album": False, "func": None, "iterable_key": None},
}
try:
url_type = url.split("/")[3]
type_dict = possibles[url_type]
item_id = get_id(url)
except (KeyError, IndexError):
print('Invalid url: "{}". Use urls from https://play.qobuz.com!'.format(url))
return
if type_dict["func"]:
content = [item for item in type_dict["func"](item_id)]
content_name = content[0]["name"]
print(
"\nDownloading all the music from {} ({})!".format(content_name, url_type)
)
new_path = musicDir(os.path.join(path, sanitize_filename(content_name)))
items = [item[type_dict["iterable_key"]]["items"] for item in content][0]
for item in items:
fromUrl(
client,
item["id"],
new_path,
quality,
True if type_dict["iterable_key"] == "albums" else False,
embed_art,
)
else:
fromUrl(client, item_id, path, quality, type_dict["album"], embed_art)
def interactive(Qz, path, limit, tracks=True, embed_art=False):
while True:
Albums, Types, IDs = [], [], []
try:
while True:
query = input("\nEnter your search: [Ctrl + c to quit]\n- ")
print("Searching...")
if len(query.strip()) == 0:
break
start = Search(Qz, query, limit)
start.getResults(tracks)
if len(start.Total) == 0:
break
Types.append(start.Types)
IDs.append(start.IDs)
title = (
"Select [space] the item(s) you want to download "
"(zero or more)\nPress Ctrl + c to quit\n"
)
Selected = pick(
start.Total, title, multiselect=True, min_selection_count=0
)
if len(Selected) > 0:
Albums.append(Selected)
y_n = pick(
["Yes", "No"],
"Items were added to queue to be downloaded. Keep searching?",
)
if y_n[0][0] == "N":
break
else:
break
if len(Albums) > 0:
desc = (
"Select [intro] the quality (the quality will be automat"
"ically\ndowngraded if the selected is not found)"
)
Qualits = ["320", "Lossless", "Hi-res =< 96kHz", "Hi-Res > 96 kHz"]
quality = pick(Qualits, desc, default_index=1)
processSelected(Qz, path, Albums, IDs, Types, quality, embed_art)
except KeyboardInterrupt:
sys.exit("\nBye")
def download_by_txt_file(Qz, txt_file, path, quality, embed_art=False):
with open(txt_file, "r") as txt:
try:
urls = txt.read().strip().split()
except Exception as e:
print("Invalid text file: " + str(e))
return
print(
'qobuz-dl will download {} urls from file: "{}"\n'.format(
len(urls), txt_file
)
)
for url in urls:
handle_urls(url, Qz, path, quality, embed_art)
def download_lucky_mode(Qz, mode, query, limit, path, quality, embed_art=False):
if len(query) < 3:
sys.exit("Your search query is too short or invalid!")
print(
'Searching {}s for "{}".\n'
"qobuz-dl will attempt to download the first {} results.".format(
mode, query, limit
)
)
WEB_URL = "https://play.qobuz.com/"
possibles = {
"album": {
"func": Qz.search_albums,
"album": True,
"key": "albums",
},
"artist": {
"func": Qz.search_artists,
"album": True,
"key": "artists",
},
"track": {
"func": Qz.search_tracks,
"album": False,
"key": "tracks",
},
"playlist": {
"func": Qz.search_playlists,
"album": False,
"key": "playlists",
},
}
try:
mode_dict = possibles[mode]
results = mode_dict["func"](query, limit)
iterable = results[mode_dict["key"]]["items"]
# Use handle_urls as everything is already handled there :p
urls = ["{}{}/{}".format(WEB_URL, mode, i["id"]) for i in iterable]
print("Found {} results!".format(len(urls)))
for url in urls:
handle_urls(url, Qz, path, quality, embed_art)
except (KeyError, IndexError):
sys.exit("Invalid mode: " + str(mode))
def main():
if not os.path.isdir(CONFIG_PATH) or not os.path.isfile(CONFIG_FILE):
try:
os.makedirs(CONFIG_PATH, exist_ok=True)
except FileExistsError:
pass
reset_config(CONFIG_FILE)
if len(sys.argv) < 2:
sys.exit(qobuz_dl_args().print_help())
email = None
password = None
app_id = None
secrets = None
config = configparser.ConfigParser()
config.read(CONFIG_FILE)
try:
email = config["DEFAULT"]["email"]
password = config["DEFAULT"]["password"]
default_folder = config["DEFAULT"]["default_folder"]
default_limit = config["DEFAULT"]["default_limit"]
default_quality = config["DEFAULT"]["default_quality"]
app_id = config["DEFAULT"]["app_id"]
secrets = [
secret for secret in config["DEFAULT"]["secrets"].split(",") if secret
]
arguments = qobuz_dl_args(
default_quality, default_limit, default_folder
).parse_args()
except KeyError:
arguments = qobuz_dl_args().parse_args()
if not arguments.reset:
print("Your config file is corrupted! Run 'qobuz-dl -r' to fix this\n")
if arguments.reset:
sys.exit(reset_config(CONFIG_FILE))
directory = musicDir(arguments.directory)
Qz = qopy.Client(email, password, app_id, secrets)
try:
quality_str = QUALITIES[int(arguments.quality)]
print("Quality set: " + quality_str)
except KeyError:
sys.exit("Invalid quality!")
if arguments.command == "fun":
sys.exit(
interactive(
Qz,
directory,
arguments.limit,
not arguments.albums_only,
arguments.embed_art,
)
)
if arguments.command == "dl":
for url in arguments.SOURCE:
if os.path.isfile(url):
download_by_txt_file(
Qz, url, directory, arguments.quality, arguments.embed_art
)
else:
handle_urls(url, Qz, directory, arguments.quality, arguments.embed_art)
else:
download_lucky_mode(
Qz,
arguments.type,
" ".join(arguments.QUERY),
arguments.number,
directory,
arguments.quality,
arguments.embed_art,
)
if __name__ == "__main__":
sys.exit(main())