mirror of
https://github.com/Wojtek242/qobuz-dl.git
synced 2025-01-22 03:24:19 +01:00
Major changes
* Handle duplicate IDs with database (close #45) * Prettify the code with f strings * New filename format * Add label tag * Add technical info to the folder for Hi-Res downloads
This commit is contained in:
parent
0ded90c0f8
commit
36c01a326e
@ -1,5 +1,5 @@
|
||||
# qobuz-dl
|
||||
Search, discover and download Lossless and Hi-Res music from [Qobuz](https://www.qobuz.com/).
|
||||
Search, explore and download Lossless and Hi-Res music from [Qobuz](https://www.qobuz.com/).
|
||||
|
||||
## Features
|
||||
|
||||
@ -8,9 +8,11 @@ Search, discover and download Lossless and Hi-Res music from [Qobuz](https://www
|
||||
* Download albums, tracks, artists, playlists and labels with **download** mode
|
||||
* Download music from last.fm playlists (Spotify, Apple Music and Youtube playlists are also supported through this method)
|
||||
* Queue support on **interactive** mode
|
||||
* Effective duplicate handling with its own portable database
|
||||
* Support for albums with multiple discs
|
||||
* Support for M3U playlists
|
||||
* Downloads URLs from text file
|
||||
* Extended tags
|
||||
* And more
|
||||
|
||||
## Getting started
|
||||
@ -122,6 +124,8 @@ Reset your config file
|
||||
qobuz-dl -r
|
||||
```
|
||||
|
||||
By default, `qobuz-dl` will skip already downloaded items by ID with the message `This release ID ({item_id}) was already downloaded`. To avoid this check, add the flag `--no-db` at the end of a command. In extreme cases (e.g. lost collection), you can run `qobuz-dl -p` to completely reset the database.
|
||||
|
||||
## Usage
|
||||
```
|
||||
usage: qobuz-dl [-h] [-r] {fun,dl,lucky} ...
|
||||
@ -132,6 +136,7 @@ See usage examples on https://github.com/vitiko98/qobuz-dl
|
||||
optional arguments:
|
||||
-h, --help show this help message and exit
|
||||
-r, --reset create/reset config file
|
||||
-p, --purge purge/delete downloaded-IDs database
|
||||
|
||||
commands:
|
||||
run qobuz-dl <command> --help for more info
|
||||
|
@ -22,6 +22,7 @@ else:
|
||||
|
||||
CONFIG_PATH = os.path.join(OS_CONFIG, "qobuz-dl")
|
||||
CONFIG_FILE = os.path.join(CONFIG_PATH, "config.ini")
|
||||
QOBUZ_DB = os.path.join(CONFIG_PATH, "qobuz_dl.db")
|
||||
|
||||
|
||||
def reset_config(config_file):
|
||||
@ -52,6 +53,7 @@ def reset_config(config_file):
|
||||
config["DEFAULT"]["og_cover"] = "false"
|
||||
config["DEFAULT"]["embed_art"] = "false"
|
||||
config["DEFAULT"]["no_cover"] = "false"
|
||||
config["DEFAULT"]["no_database"] = "false"
|
||||
logging.info(f"{YELLOW}Getting tokens. Please wait...")
|
||||
spoofer = spoofbuz.Spoofer()
|
||||
config["DEFAULT"]["app_id"] = str(spoofer.getAppId())
|
||||
@ -97,6 +99,7 @@ def main():
|
||||
og_cover = config.getboolean("DEFAULT", "og_cover")
|
||||
embed_art = config.getboolean("DEFAULT", "embed_art")
|
||||
no_cover = config.getboolean("DEFAULT", "no_cover")
|
||||
no_database = config.getboolean("DEFAULT", "no_database")
|
||||
app_id = config["DEFAULT"]["app_id"]
|
||||
secrets = [
|
||||
secret for secret in config["DEFAULT"]["secrets"].split(",") if secret
|
||||
@ -108,11 +111,19 @@ def main():
|
||||
arguments = qobuz_dl_args().parse_args()
|
||||
if not arguments.reset:
|
||||
sys.exit(
|
||||
f"{RED}Your config file is corrupted! Run 'qobuz-dl -r' to fix this"
|
||||
f"{RED}Your config file is corrupted! Run 'qobuz-dl -r' to fix this."
|
||||
)
|
||||
|
||||
if arguments.reset:
|
||||
sys.exit(reset_config(CONFIG_FILE))
|
||||
|
||||
if arguments.purge:
|
||||
try:
|
||||
os.remove(QOBUZ_DB)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
sys.exit(f"{GREEN}The database was deleted.")
|
||||
|
||||
qobuz = QobuzDL(
|
||||
arguments.directory,
|
||||
arguments.quality,
|
||||
@ -122,6 +133,7 @@ def main():
|
||||
quality_fallback=not arguments.no_fallback or not no_fallback,
|
||||
cover_og_quality=arguments.og_cover or og_cover,
|
||||
no_cover=arguments.no_cover or no_cover,
|
||||
downloads_db=None if no_database or arguments.no_db else QOBUZ_DB
|
||||
)
|
||||
|
||||
qobuz.initialize_client(email, password, app_id, secrets)
|
||||
@ -141,7 +153,7 @@ def main():
|
||||
except KeyboardInterrupt:
|
||||
logging.info(
|
||||
f"{RED}Interrupted by user\n{YELLOW}Already downloaded items will "
|
||||
"be skipped if you try to download the same releases again"
|
||||
"be skipped if you try to download the same releases again."
|
||||
)
|
||||
|
||||
finally:
|
||||
|
@ -99,6 +99,9 @@ def add_common_arg(custom_parser, default_folder, default_quality):
|
||||
custom_parser.add_argument(
|
||||
"--no-cover", action="store_true", help="don't download cover art"
|
||||
)
|
||||
custom_parser.add_argument(
|
||||
"--no-db", action="store_true", help="don't call the database"
|
||||
)
|
||||
|
||||
|
||||
def qobuz_dl_args(
|
||||
@ -115,6 +118,12 @@ def qobuz_dl_args(
|
||||
parser.add_argument(
|
||||
"-r", "--reset", action="store_true", help="create/reset config file"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-p",
|
||||
"--purge",
|
||||
action="store_true",
|
||||
help="purge/delete downloaded-IDs database",
|
||||
)
|
||||
|
||||
subparsers = parser.add_subparsers(
|
||||
title="commands",
|
||||
|
@ -1,6 +1,5 @@
|
||||
import logging
|
||||
import os
|
||||
import glob
|
||||
import re
|
||||
import string
|
||||
import sys
|
||||
@ -15,6 +14,7 @@ from pathvalidate import sanitize_filename
|
||||
import qobuz_dl.spoofbuz as spoofbuz
|
||||
from qobuz_dl import downloader, qopy
|
||||
from qobuz_dl.color import CYAN, OFF, RED, YELLOW, DF, RESET
|
||||
from qobuz_dl.db import create_db, handle_download_id
|
||||
|
||||
WEB_URL = "https://play.qobuz.com/"
|
||||
ARTISTS_SELECTOR = "td.chartlist-artist > a"
|
||||
@ -61,6 +61,7 @@ class QobuzDL:
|
||||
quality_fallback=True,
|
||||
cover_og_quality=False,
|
||||
no_cover=False,
|
||||
downloads_db=None,
|
||||
):
|
||||
self.directory = self.create_dir(directory)
|
||||
self.quality = quality
|
||||
@ -73,6 +74,7 @@ class QobuzDL:
|
||||
self.quality_fallback = quality_fallback
|
||||
self.cover_og_quality = cover_og_quality
|
||||
self.no_cover = no_cover
|
||||
self.downloads_db = create_db(downloads_db) if downloads_db else None
|
||||
|
||||
def initialize_client(self, email, pwd, app_id, secrets):
|
||||
self.client = qopy.Client(email, pwd, app_id, secrets)
|
||||
@ -99,6 +101,9 @@ class QobuzDL:
|
||||
).group(1)
|
||||
|
||||
def download_from_id(self, item_id, album=True, alt_path=None):
|
||||
if handle_download_id(self.downloads_db, item_id, add_id=False):
|
||||
logger.info(f"{OFF}This release ID ({item_id}) was already downloaded")
|
||||
return
|
||||
try:
|
||||
downloader.download_id_by_type(
|
||||
self.client,
|
||||
@ -112,6 +117,7 @@ class QobuzDL:
|
||||
self.cover_og_quality,
|
||||
self.no_cover,
|
||||
)
|
||||
handle_download_id(self.downloads_db, item_id, add_id=True)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"{RED}Error getting release: {e}", exc_info=True)
|
||||
|
||||
|
39
qobuz_dl/db.py
Normal file
39
qobuz_dl/db.py
Normal file
@ -0,0 +1,39 @@
|
||||
import logging
|
||||
import sqlite3
|
||||
|
||||
from qobuz_dl.color import YELLOW, RED
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_db(db_path):
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
try:
|
||||
conn.execute("CREATE TABLE downloads (id TEXT UNIQUE NOT NULL);")
|
||||
logger.info(f"{YELLOW} Downloads database created")
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
return db_path
|
||||
|
||||
|
||||
def handle_download_id(db_path, item_id, add_id=False):
|
||||
if not db_path:
|
||||
return
|
||||
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
# If add_if is False return a string to know if the ID is in the DB
|
||||
# Otherwise just add the ID to the DB
|
||||
if add_id:
|
||||
try:
|
||||
conn.execute(
|
||||
"INSERT INTO downloads (id) VALUES (?)",
|
||||
(item_id,),
|
||||
)
|
||||
conn.commit()
|
||||
except sqlite3.Error as e:
|
||||
logger.error(f"{RED}Unexpected DB error: {e}")
|
||||
else:
|
||||
return conn.execute(
|
||||
"SELECT id FROM downloads where id=?",
|
||||
(item_id,),
|
||||
).fetchone()
|
@ -28,12 +28,11 @@ def tqdm_download(url, fname, track_name):
|
||||
bar.update(size)
|
||||
|
||||
|
||||
def get_description(u, mt, multiple=None):
|
||||
return "{} [{}/{}]".format(
|
||||
("[Disc {}] {}".format(multiple, mt["title"])) if multiple else mt["title"],
|
||||
u["bit_depth"],
|
||||
u["sampling_rate"],
|
||||
)
|
||||
def get_description(u: dict, track_title, multiple=None):
|
||||
downloading_title = f'{track_title} [{u["bit_depth"]}/{u["sampling_rate"]}]'
|
||||
if multiple:
|
||||
downloading_title = f"[Disc {multiple}] {downloading_title}"
|
||||
return downloading_title
|
||||
|
||||
|
||||
def get_format(client, item_dict, quality, is_track_id=False, track_url_dict=None):
|
||||
@ -61,30 +60,31 @@ def get_format(client, item_dict, quality, is_track_id=False, track_url_dict=Non
|
||||
and new_track_dict["sampling_rate"] == 44.1
|
||||
):
|
||||
return "FLAC", quality_met
|
||||
return "Hi-Res", quality_met
|
||||
return (
|
||||
f'{new_track_dict["bit_depth"]}B-{new_track_dict["sampling_rate"]}Khz',
|
||||
quality_met,
|
||||
)
|
||||
except (KeyError, requests.exceptions.HTTPError):
|
||||
return "Unknown", quality_met
|
||||
|
||||
|
||||
def get_title(item_dict):
|
||||
try:
|
||||
album_title = item_dict["title"]
|
||||
version = item_dict.get("version")
|
||||
is_explicit = item_dict.get("parental_warning")
|
||||
if version:
|
||||
album_title = (
|
||||
("{} ({})".format(item_dict["title"], item_dict["version"]))
|
||||
if item_dict["version"]
|
||||
and item_dict["version"].lower() not in item_dict["title"].lower()
|
||||
else item_dict["title"]
|
||||
)
|
||||
except KeyError:
|
||||
album_title = item_dict["title"]
|
||||
try:
|
||||
final_title = (
|
||||
(album_title + " (Explicit)")
|
||||
if item_dict["parental_warning"] and "explicit" not in album_title.lower()
|
||||
f"{album_title} ({version})"
|
||||
if version.lower() not in album_title.lower()
|
||||
else album_title
|
||||
)
|
||||
except KeyError:
|
||||
final_title = album_title
|
||||
return final_title
|
||||
if is_explicit:
|
||||
album_title = (
|
||||
f"{album_title} (Explicit)"
|
||||
if "explicit" not in album_title.lower()
|
||||
else album_title
|
||||
)
|
||||
return album_title
|
||||
|
||||
|
||||
def get_extra(i, dirn, extra="cover.jpg", og_quality=False):
|
||||
@ -134,21 +134,28 @@ def download_and_tag(
|
||||
return
|
||||
|
||||
if multiple:
|
||||
root_dir = os.path.join(root_dir, "Disc " + str(multiple))
|
||||
root_dir = os.path.join(root_dir, f"Disc {multiple}")
|
||||
os.makedirs(root_dir, exist_ok=True)
|
||||
|
||||
filename = os.path.join(root_dir, ".{:02}.tmp".format(tmp_count))
|
||||
filename = os.path.join(root_dir, f".{tmp_count:02}.tmp")
|
||||
|
||||
new_track_title = sanitize_filename(track_metadata["title"])
|
||||
track_file = "{:02}. {}{}".format(
|
||||
track_metadata["track_number"], new_track_title, extension
|
||||
# Determine the filename
|
||||
artist = track_metadata.get("performer", {}).get("name")
|
||||
version = track_metadata.get("version")
|
||||
new_track_title = (
|
||||
f'{artist if artist else track_metadata["album"]["artist"]["name"]}'
|
||||
f' - {track_metadata["title"]}'
|
||||
)
|
||||
final_file = os.path.join(root_dir, track_file)
|
||||
if version:
|
||||
new_track_title = f"{new_track_title} ({version})"
|
||||
track_file = f'{track_metadata["track_number"]:02}. {new_track_title}{extension}'
|
||||
final_file = os.path.join(root_dir, sanitize_filename(track_file))
|
||||
|
||||
if os.path.isfile(final_file):
|
||||
logger.info(f'{OFF}{track_metadata["title"]}was already downloaded')
|
||||
logger.info(f"{OFF}{new_track_title} was already downloaded")
|
||||
return
|
||||
|
||||
desc = get_description(track_url_dict, track_metadata, multiple)
|
||||
desc = get_description(track_url_dict, new_track_title, multiple)
|
||||
tqdm_download(url, filename, desc)
|
||||
tag_function = metadata.tag_mp3 if is_mp3 else metadata.tag_flac
|
||||
try:
|
||||
@ -211,14 +218,14 @@ def download_id_by_type(
|
||||
)
|
||||
return
|
||||
|
||||
logger.info(f"\n{YELLOW}Downloading: {album_title} [{album_format}]\n")
|
||||
logger.info(f"\n{YELLOW}Downloading: {album_title}\nQuality: {album_format}\n")
|
||||
dirT = (
|
||||
meta["artist"]["name"],
|
||||
album_title,
|
||||
meta["release_date_original"].split("-")[0],
|
||||
album_format,
|
||||
)
|
||||
sanitized_title = sanitize_filename("{} - {} [{}] [{}]".format(*dirT))
|
||||
sanitized_title = sanitize_filename("{} - {} ({}) [{}]".format(*dirT))
|
||||
dirn = os.path.join(path, sanitized_title)
|
||||
os.makedirs(dirn, exist_ok=True)
|
||||
|
||||
@ -235,11 +242,7 @@ def download_id_by_type(
|
||||
media_numbers = [track["media_number"] for track in meta["tracks"]["items"]]
|
||||
is_multiple = True if len([*{*media_numbers}]) > 1 else False
|
||||
for i in meta["tracks"]["items"]:
|
||||
try:
|
||||
parse = client.get_track_url(i["id"], quality)
|
||||
except requests.exceptions.HTTPError:
|
||||
logger.info(f"{OFF}Nothing found")
|
||||
continue
|
||||
parse = client.get_track_url(i["id"], quality)
|
||||
if "sample" not in parse and parse["sampling_rate"]:
|
||||
is_mp3 = True if int(quality) == 5 else False
|
||||
download_and_tag(
|
||||
@ -257,11 +260,7 @@ def download_id_by_type(
|
||||
logger.info(f"{OFF}Demo. Skipping")
|
||||
count = count + 1
|
||||
else:
|
||||
try:
|
||||
parse = client.get_track_url(item_id, quality)
|
||||
except requests.exceptions.HTTPError:
|
||||
logger.info(f"{OFF}Nothing found")
|
||||
return
|
||||
parse = client.get_track_url(item_id, quality)
|
||||
|
||||
if "sample" not in parse and parse["sampling_rate"]:
|
||||
meta = client.get_track_meta(item_id)
|
||||
|
@ -8,15 +8,10 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_title(track_dict):
|
||||
try:
|
||||
title = (
|
||||
("{} ({})".format(track_dict["title"], track_dict["version"]))
|
||||
if track_dict["version"]
|
||||
else track_dict["title"]
|
||||
)
|
||||
except KeyError:
|
||||
title = track_dict["title"]
|
||||
|
||||
title = track_dict["title"]
|
||||
version = track_dict.get("version")
|
||||
if version:
|
||||
title = f"{title} ({version})"
|
||||
# for classical works
|
||||
if track_dict.get("work"):
|
||||
title = "{}: {}".format(track_dict["work"], title)
|
||||
@ -42,7 +37,9 @@ def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=Fa
|
||||
audio["TITLE"] = get_title(d)
|
||||
|
||||
audio["TRACKNUMBER"] = str(d["track_number"]) # TRACK NUMBER
|
||||
audio["DISCNUMBER"] = str(d["media_number"])
|
||||
|
||||
if "Disc " in final_name:
|
||||
audio["DISCNUMBER"] = str(d["media_number"])
|
||||
|
||||
try:
|
||||
audio["COMPOSER"] = d["composer"]["name"] # COMPOSER
|
||||
@ -57,18 +54,23 @@ def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=Fa
|
||||
else:
|
||||
audio["ARTIST"] = album["artist"]["name"]
|
||||
|
||||
try:
|
||||
audio["LABEL"] = album["label"]["name"]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if istrack:
|
||||
audio["GENRE"] = ", ".join(d["album"]["genres_list"]) # GENRE
|
||||
audio["ALBUMARTIST"] = d["album"]["artist"]["name"] # ALBUM ARTIST
|
||||
audio["TRACKTOTAL"] = str(d["album"]["tracks_count"]) # TRACK TOTAL
|
||||
audio["ALBUM"] = d["album"]["title"] # ALBUM TITLE
|
||||
audio["DATE"] = d["album"]["release_date_original"].split("-")[0]
|
||||
audio["DATE"] = d["album"]["release_date_original"]
|
||||
else:
|
||||
audio["GENRE"] = ", ".join(album["genres_list"]) # GENRE
|
||||
audio["ALBUMARTIST"] = album["artist"]["name"] # ALBUM ARTIST
|
||||
audio["TRACKTOTAL"] = str(album["tracks_count"]) # TRACK TOTAL
|
||||
audio["ALBUM"] = album["title"] # ALBUM TITLE
|
||||
audio["DATE"] = album["release_date_original"].split("-")[0]
|
||||
audio["DATE"] = album["release_date_original"]
|
||||
|
||||
if em_image:
|
||||
emb_image = os.path.join(root_dir, "cover.jpg")
|
||||
@ -108,10 +110,15 @@ def tag_mp3(filename, root_dir, final_name, d, album, istrack=True, em_image=Fal
|
||||
audio["title"] = get_title(d)
|
||||
|
||||
audio["tracknumber"] = str(d["track_number"])
|
||||
|
||||
if "Disc " in final_name:
|
||||
audio["discnumber"] = str(d["media_number"])
|
||||
|
||||
try:
|
||||
audio["composer"] = d["composer"]["name"]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
audio["artist"] = d["performer"]["name"] # TRACK ARTIST
|
||||
except KeyError:
|
||||
@ -124,12 +131,12 @@ def tag_mp3(filename, root_dir, final_name, d, album, istrack=True, em_image=Fal
|
||||
audio["genre"] = ", ".join(d["album"]["genres_list"]) # GENRE
|
||||
audio["albumartist"] = d["album"]["artist"]["name"] # ALBUM ARTIST
|
||||
audio["album"] = d["album"]["title"] # ALBUM TITLE
|
||||
audio["date"] = d["album"]["release_date_original"].split("-")[0]
|
||||
audio["date"] = d["album"]["release_date_original"]
|
||||
else:
|
||||
audio["genre"] = ", ".join(album["genres_list"]) # GENRE
|
||||
audio["albumartist"] = album["artist"]["name"] # ALBUM ARTIST
|
||||
audio["album"] = album["title"] # ALBUM TITLE
|
||||
audio["date"] = album["release_date_original"].split("-")[0] # YEAR
|
||||
audio["date"] = album["release_date_original"]
|
||||
|
||||
audio.save()
|
||||
os.rename(filename, final_name)
|
||||
|
Loading…
x
Reference in New Issue
Block a user