mirror of
https://github.com/Wojtek242/qobuz-dl.git
synced 2024-12-22 15:24:40 +01:00
parent
1bf8bb16de
commit
fd260ee0f3
@ -1,11 +1,18 @@
|
||||
import base64
|
||||
import configparser
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
import qobuz_dl.spoofbuz as spoofbuz
|
||||
from qobuz_dl.core import QobuzDL
|
||||
from qobuz_dl.color import DF, GREEN, MAGENTA, RED, YELLOW
|
||||
from qobuz_dl.commands import qobuz_dl_args
|
||||
from qobuz_dl.core import QobuzDL
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(message)s",
|
||||
)
|
||||
|
||||
if os.name == "nt":
|
||||
OS_CONFIG = os.environ.get("APPDATA")
|
||||
@ -17,32 +24,34 @@ CONFIG_FILE = os.path.join(CONFIG_PATH, "config.ini")
|
||||
|
||||
|
||||
def reset_config(config_file):
|
||||
print("Creating config file: " + config_file)
|
||||
logging.info(f"{YELLOW}Creating config file: {config_file}")
|
||||
config = configparser.ConfigParser()
|
||||
config["DEFAULT"]["email"] = input("\nEnter your email:\n- ")
|
||||
config["DEFAULT"]["email"] = input(f"{MAGENTA}Enter your email:\n-{DF} ")
|
||||
config["DEFAULT"]["password"] = base64.b64encode(
|
||||
input("\nEnter your password\n- ").encode()
|
||||
input(f"{MAGENTA}Enter your password\n-{DF} ").encode()
|
||||
).decode()
|
||||
config["DEFAULT"]["default_folder"] = (
|
||||
input("\nFolder for downloads (leave empy for default 'Qobuz Downloads')\n- ")
|
||||
input(
|
||||
f"{MAGENTA}Folder for downloads (leave empy for default 'Qobuz Downloads')\n-{DF} "
|
||||
)
|
||||
or "Qobuz Downloads"
|
||||
)
|
||||
config["DEFAULT"]["default_quality"] = (
|
||||
input(
|
||||
"\nDownload quality (5, 6, 7, 27) "
|
||||
f"{MAGENTA}Download quality (5, 6, 7, 27) "
|
||||
"[320, LOSSLESS, 24B <96KHZ, 24B >96KHZ]"
|
||||
"\n(leave empy for default '6')\n- "
|
||||
f"\n(leave empy for default '6')\n-{DF} "
|
||||
)
|
||||
or "6"
|
||||
)
|
||||
config["DEFAULT"]["default_limit"] = "20"
|
||||
print("Getting tokens. Please wait...")
|
||||
logging.info(f"{YELLOW}Getting tokens. Please wait...")
|
||||
spoofer = spoofbuz.Spoofer()
|
||||
config["DEFAULT"]["app_id"] = str(spoofer.getAppId())
|
||||
config["DEFAULT"]["secrets"] = ",".join(spoofer.getSecrets().values())
|
||||
with open(config_file, "w") as configfile:
|
||||
config.write(configfile)
|
||||
print("Config file updated.")
|
||||
logging.info(f"{GREEN}Config file updated.")
|
||||
|
||||
|
||||
def main():
|
||||
@ -77,7 +86,9 @@ def main():
|
||||
except (KeyError, UnicodeDecodeError):
|
||||
arguments = qobuz_dl_args().parse_args()
|
||||
if not arguments.reset:
|
||||
print("Your config file is corrupted! Run 'qobuz-dl -r' to fix this\n")
|
||||
logging.warning(
|
||||
f"{RED}Your config file is corrupted! Run 'qobuz-dl -r' to fix this"
|
||||
)
|
||||
if arguments.reset:
|
||||
sys.exit(reset_config(CONFIG_FILE))
|
||||
|
||||
@ -91,16 +102,22 @@ def main():
|
||||
)
|
||||
qobuz.initialize_client(email, password, app_id, secrets)
|
||||
|
||||
if arguments.command == "dl":
|
||||
qobuz.download_list_of_urls(arguments.SOURCE)
|
||||
elif arguments.command == "lucky":
|
||||
query = " ".join(arguments.QUERY)
|
||||
qobuz.lucky_type = arguments.type
|
||||
qobuz.lucky_limit = arguments.number
|
||||
qobuz.lucky_mode(query)
|
||||
else:
|
||||
qobuz.interactive_limit = arguments.limit
|
||||
qobuz.interactive()
|
||||
try:
|
||||
if arguments.command == "dl":
|
||||
qobuz.download_list_of_urls(arguments.SOURCE)
|
||||
elif arguments.command == "lucky":
|
||||
query = " ".join(arguments.QUERY)
|
||||
qobuz.lucky_type = arguments.type
|
||||
qobuz.lucky_limit = arguments.number
|
||||
qobuz.lucky_mode(query)
|
||||
else:
|
||||
qobuz.interactive_limit = arguments.limit
|
||||
qobuz.interactive()
|
||||
except KeyboardInterrupt:
|
||||
logging.info(
|
||||
f"{RED}Interrupted by user\n{MAGENTA}Already downloaded items will "
|
||||
"be skipped if you try to download the same releases again"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
13
qobuz_dl/color.py
Normal file
13
qobuz_dl/color.py
Normal file
@ -0,0 +1,13 @@
|
||||
from colorama import Style, Fore, init
|
||||
|
||||
init(autoreset=True)
|
||||
|
||||
DF = Style.NORMAL
|
||||
BG = Style.BRIGHT
|
||||
OFF = Style.DIM
|
||||
RED = Fore.RED
|
||||
BLUE = Fore.BLUE
|
||||
GREEN = Fore.GREEN
|
||||
YELLOW = Fore.YELLOW
|
||||
CYAN = Fore.CYAN
|
||||
MAGENTA = Fore.MAGENTA
|
@ -1,3 +1,4 @@
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import string
|
||||
@ -6,18 +7,21 @@ import time
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup as bso
|
||||
from mutagen.flac import FLAC
|
||||
from mutagen.mp3 import EasyMP3
|
||||
from pathvalidate import sanitize_filename
|
||||
|
||||
import qobuz_dl.spoofbuz as spoofbuz
|
||||
from qobuz_dl import downloader, qopy
|
||||
|
||||
from mutagen.flac import FLAC
|
||||
from mutagen.mp3 import EasyMP3
|
||||
from qobuz_dl.color import MAGENTA, OFF, RED, YELLOW, DF
|
||||
|
||||
WEB_URL = "https://play.qobuz.com/"
|
||||
ARTISTS_SELECTOR = "td.chartlist-artist > a"
|
||||
TITLE_SELECTOR = "td.chartlist-name > a"
|
||||
EXTENSIONS = (".mp3", ".flac")
|
||||
QUALITIES = {5: "5 - MP3", 6: "6 - FLAC", 7: "7 - 24B<96kHz", 27: "27 - 24B>96kHz"}
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PartialFormatter(string.Formatter):
|
||||
@ -67,6 +71,7 @@ class QobuzDL:
|
||||
|
||||
def initialize_client(self, email, pwd, app_id, secrets):
|
||||
self.client = qopy.Client(email, pwd, app_id, secrets)
|
||||
logger.info(f"{YELLOW}Set quality: {QUALITIES[int(self.quality)]}")
|
||||
|
||||
def get_tokens(self):
|
||||
spoofer = spoofbuz.Spoofer()
|
||||
@ -83,8 +88,8 @@ class QobuzDL:
|
||||
def get_id(self, url):
|
||||
return re.match(
|
||||
r"https?://(?:w{0,3}|play|open)\.qobuz\.com/(?:(?:album|track|artist"
|
||||
"|playlist|label)/|[a-z]{2}-[a-z]{2}/album/-?\w+(?:-\w+)*-?/|user/"
|
||||
"library/favorites/)(\w+)",
|
||||
r"|playlist|label)/|[a-z]{2}-[a-z]{2}/album/-?\w+(?:-\w+)*-?/|user/"
|
||||
r"library/favorites/)(\w+)",
|
||||
url,
|
||||
).group(1)
|
||||
|
||||
@ -122,23 +127,21 @@ class QobuzDL:
|
||||
type_dict = possibles[url_type]
|
||||
item_id = self.get_id(url)
|
||||
except (KeyError, IndexError):
|
||||
print(
|
||||
'Invalid url: "{}". Use urls from https://play.qobuz.com!'.format(url)
|
||||
logger.info(
|
||||
f'{RED}Invalid url: "{url}". Use urls from https://play.qobuz.com!'
|
||||
)
|
||||
return
|
||||
if type_dict["func"]:
|
||||
content = [item for item in type_dict["func"](item_id)]
|
||||
content_name = content[0]["name"]
|
||||
print(
|
||||
"\nDownloading all the music from {} ({})!".format(
|
||||
content_name, url_type
|
||||
)
|
||||
logger.info(
|
||||
f"{YELLOW}Downloading all the music from {content_name} ({url_type})!"
|
||||
)
|
||||
new_path = self.create_dir(
|
||||
os.path.join(self.directory, sanitize_filename(content_name))
|
||||
)
|
||||
items = [item[type_dict["iterable_key"]]["items"] for item in content][0]
|
||||
print("{} downloads in queue".format(len(items)))
|
||||
logger.info(f"{YELLOW}{len(items)} downloads in queue")
|
||||
for item in items:
|
||||
self.download_from_id(
|
||||
item["id"],
|
||||
@ -152,7 +155,7 @@ class QobuzDL:
|
||||
|
||||
def download_list_of_urls(self, urls):
|
||||
if not urls or not isinstance(urls, list):
|
||||
print("Nothing to download")
|
||||
logger.info(f"{OFF}Nothing to download")
|
||||
return
|
||||
for url in urls:
|
||||
if "last.fm" in url:
|
||||
@ -167,24 +170,21 @@ class QobuzDL:
|
||||
try:
|
||||
urls = txt.read().strip().split()
|
||||
except Exception as e:
|
||||
print("Invalid text file: " + str(e))
|
||||
logger.error(f"{RED}Invalid text file: {e}")
|
||||
return
|
||||
print(
|
||||
'qobuz-dl will download {} urls from file: "{}"\n'.format(
|
||||
len(urls), txt_file
|
||||
)
|
||||
logger.info(
|
||||
f'{YELLOW}qobuz-dl will download {len(urls)} urls from file: "{txt_file}"'
|
||||
)
|
||||
self.download_list_of_urls(urls)
|
||||
|
||||
def lucky_mode(self, query, download=True):
|
||||
if len(query) < 3:
|
||||
sys.exit("Your search query is too short or invalid!")
|
||||
logger.info(f"{RED}Your search query is too short or invalid")
|
||||
return
|
||||
|
||||
print(
|
||||
'Searching {}s for "{}".\n'
|
||||
"qobuz-dl will attempt to download the first {} results.".format(
|
||||
self.lucky_type, query, self.lucky_limit
|
||||
)
|
||||
logger.info(
|
||||
f'{YELLOW}Searching {self.lucky_type}s for "{query}".\n'
|
||||
f"{YELLOW}qobuz-dl will attempt to download the first {self.lucky_limit} results."
|
||||
)
|
||||
results = self.search_by_type(query, self.lucky_type, self.lucky_limit, True)
|
||||
|
||||
@ -198,7 +198,7 @@ class QobuzDL:
|
||||
|
||||
def search_by_type(self, query, item_type, limit=10, lucky=False):
|
||||
if len(query) < 3:
|
||||
print("Your search query is too short or invalid!")
|
||||
logger.info("{RED}Your search query is too short or invalid")
|
||||
return
|
||||
|
||||
possibles = {
|
||||
@ -252,7 +252,7 @@ class QobuzDL:
|
||||
item_list.append({"text": text, "url": url} if not lucky else url)
|
||||
return item_list
|
||||
except (KeyError, IndexError):
|
||||
print("Invalid mode: " + item_type)
|
||||
logger.info(f"{RED}Invalid type: {item_type}")
|
||||
return
|
||||
|
||||
def interactive(self, download=True):
|
||||
@ -260,8 +260,9 @@ class QobuzDL:
|
||||
from pick import pick
|
||||
except (ImportError, ModuleNotFoundError):
|
||||
if os.name == "nt":
|
||||
print('Please install curses with "pip3 install windows-curses"')
|
||||
return
|
||||
sys.exit(
|
||||
'Please install curses with "pip3 install windows-curses" to continue'
|
||||
)
|
||||
raise
|
||||
|
||||
qualities = [
|
||||
@ -282,22 +283,22 @@ class QobuzDL:
|
||||
selected_type = pick(item_types, "I'll search for:\n[press Intro]")[0][
|
||||
:-1
|
||||
].lower()
|
||||
print("Ok, we'll search for " + selected_type + "s")
|
||||
logger.info(f"{YELLOW}Ok, we'll search for {selected_type}s")
|
||||
final_url_list = []
|
||||
while True:
|
||||
query = input("\nEnter your search: [Ctrl + c to quit]\n- ")
|
||||
print("Searching...")
|
||||
query = input(f"{MAGENTA}Enter your search: [Ctrl + c to quit]\n-{DF} ")
|
||||
logger.info(f"{YELLOW}Searching...")
|
||||
options = self.search_by_type(
|
||||
query, selected_type, self.interactive_limit
|
||||
)
|
||||
if not options:
|
||||
print("Nothing found!")
|
||||
logger.info(f"{OFF}Nothing found")
|
||||
continue
|
||||
title = (
|
||||
'*** RESULTS FOR "{}" ***\n\n'
|
||||
f'*** RESULTS FOR "{query.title()}" ***\n\n'
|
||||
"Select [space] the item(s) you want to download "
|
||||
"(one or more)\nPress Ctrl + c to quit\n"
|
||||
"Don't select anything to try another search".format(query.title())
|
||||
"Don't select anything to try another search"
|
||||
)
|
||||
selected_items = pick(
|
||||
options,
|
||||
@ -315,12 +316,12 @@ class QobuzDL:
|
||||
if y_n[0][0] == "N":
|
||||
break
|
||||
else:
|
||||
print("\nOk, try again...")
|
||||
logger.info(f"{YELLOW}Ok, try again...")
|
||||
continue
|
||||
if final_url_list:
|
||||
desc = (
|
||||
"Select [intro] the quality (the quality will be automat"
|
||||
"ically\ndowngraded if the selected is not found)"
|
||||
"Select [intro] the quality (the quality will "
|
||||
"be automatically\ndowngraded if the selected is not found)"
|
||||
)
|
||||
self.quality = pick(
|
||||
qualities,
|
||||
@ -334,29 +335,36 @@ class QobuzDL:
|
||||
|
||||
return final_url_list
|
||||
except KeyboardInterrupt:
|
||||
print("\nBye")
|
||||
logger.info(f"{YELLOW}Bye")
|
||||
return
|
||||
|
||||
def download_lastfm_pl(self, playlist_url):
|
||||
# Apparently, last fm API doesn't have a playlist endpoint. If you
|
||||
# find out that it has, please fix this!
|
||||
r = requests.get(playlist_url)
|
||||
try:
|
||||
r = requests.get(playlist_url, timeout=10)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"{RED}Playlist download failed: {e}")
|
||||
return
|
||||
soup = bso(r.content, "html.parser")
|
||||
artists = [artist.text for artist in soup.select(ARTISTS_SELECTOR)]
|
||||
titles = [title.text for title in soup.select(TITLE_SELECTOR)]
|
||||
|
||||
track_list = []
|
||||
if len(artists) == len(titles) and artists:
|
||||
track_list = [
|
||||
artist + " " + title for artist, title in zip(artists, titles)
|
||||
]
|
||||
|
||||
if not track_list:
|
||||
print("Nothing found")
|
||||
logger.info(f"{OFF}Nothing found")
|
||||
return
|
||||
|
||||
pl_title = sanitize_filename(soup.select_one("h1").text)
|
||||
pl_directory = os.path.join(self.directory, pl_title)
|
||||
print("Downloading playlist: {} ({} tracks)".format(pl_title, len(track_list)))
|
||||
logger.info(
|
||||
f"{YELLOW}Downloading playlist: {pl_title} ({len(track_list)} tracks)"
|
||||
)
|
||||
|
||||
for i in track_list:
|
||||
track_id = self.get_id(self.search_by_type(i, "track", 1, lucky=True)[0])
|
||||
|
@ -1,3 +1,4 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
import requests
|
||||
@ -5,8 +6,10 @@ from pathvalidate import sanitize_filename
|
||||
from tqdm import tqdm
|
||||
|
||||
import qobuz_dl.metadata as metadata
|
||||
from qobuz_dl.color import OFF, GREEN, RED, YELLOW, CYAN
|
||||
|
||||
QL_DOWNGRADE = "FormatRestrictedByFormatAvailability"
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def tqdm_download(url, fname, track_name):
|
||||
@ -18,7 +21,7 @@ def tqdm_download(url, fname, track_name):
|
||||
unit_scale=True,
|
||||
unit_divisor=1024,
|
||||
desc=track_name,
|
||||
bar_format="{n_fmt}/{total_fmt} /// {desc}",
|
||||
bar_format=CYAN + "{n_fmt}/{total_fmt} /// {desc}",
|
||||
) as bar:
|
||||
for data in r.iter_content(chunk_size=1024):
|
||||
size = file.write(data)
|
||||
@ -87,12 +90,12 @@ def get_title(item_dict):
|
||||
def get_extra(i, dirn, extra="cover.jpg"):
|
||||
extra_file = os.path.join(dirn, extra)
|
||||
if os.path.isfile(extra_file):
|
||||
print(extra.split(".")[0].title() + " already downloaded")
|
||||
logger.info(f"{OFF}{extra} was already downloaded")
|
||||
return
|
||||
tqdm_download(
|
||||
i.replace("_600.", "_org."),
|
||||
extra_file,
|
||||
"Downloading " + extra.split(".")[0],
|
||||
extra,
|
||||
)
|
||||
|
||||
|
||||
@ -127,7 +130,7 @@ def download_and_tag(
|
||||
try:
|
||||
url = track_url_dict["url"]
|
||||
except KeyError:
|
||||
print("Track not available for download")
|
||||
logger.info(f"{OFF}Track not available for download")
|
||||
return
|
||||
|
||||
if multiple:
|
||||
@ -142,7 +145,7 @@ def download_and_tag(
|
||||
)
|
||||
final_file = os.path.join(root_dir, track_file)
|
||||
if os.path.isfile(final_file):
|
||||
print(track_metadata["title"] + " was already downloaded. Skipping...")
|
||||
logger.info(f'{OFF}{track_metadata["title"]}was already downloaded')
|
||||
return
|
||||
|
||||
desc = get_description(track_url_dict, track_metadata, multiple)
|
||||
@ -159,8 +162,7 @@ def download_and_tag(
|
||||
embed_art,
|
||||
)
|
||||
except Exception as e:
|
||||
print("Error tagging the file: " + str(e))
|
||||
os.remove(filename)
|
||||
logger.error(f"{RED}Error tagging the file: {e}")
|
||||
|
||||
|
||||
def download_id_by_type(
|
||||
@ -194,16 +196,18 @@ def download_id_by_type(
|
||||
meta.get("release_type") != "album"
|
||||
or meta.get("artist").get("name") == "Various Artists"
|
||||
):
|
||||
print("Ignoring Single/EP/VA: " + meta.get("title", ""))
|
||||
logger.info(f'{OFF}Ignoring Single/EP/VA: {meta.get("title", "")}')
|
||||
return
|
||||
|
||||
album_title = get_title(meta)
|
||||
album_format, quality_met = get_format(client, meta, quality)
|
||||
if not downgrade_quality and not quality_met:
|
||||
print("Skipping release as doesn't met quality requirement")
|
||||
logger.info(
|
||||
f"{OFF}Skipping {album_title} as doesn't met quality requirement"
|
||||
)
|
||||
return
|
||||
|
||||
print("\nDownloading: {}\n".format(album_title))
|
||||
logger.info(f"\n{YELLOW}Downloading: {album_title} [{album_format}]\n")
|
||||
dirT = (
|
||||
meta["artist"]["name"],
|
||||
album_title,
|
||||
@ -225,7 +229,7 @@ def download_id_by_type(
|
||||
try:
|
||||
parse = client.get_track_url(i["id"], quality)
|
||||
except requests.exceptions.HTTPError:
|
||||
print("Nothing found")
|
||||
logger.info(f"{OFF}Nothing found")
|
||||
continue
|
||||
if "sample" not in parse and parse["sampling_rate"]:
|
||||
is_mp3 = True if int(quality) == 5 else False
|
||||
@ -241,22 +245,24 @@ def download_id_by_type(
|
||||
i["media_number"] if is_multiple else None,
|
||||
)
|
||||
else:
|
||||
print("Demo. Skipping")
|
||||
logger.info(f"{OFF}Demo. Skipping")
|
||||
count = count + 1
|
||||
else:
|
||||
try:
|
||||
parse = client.get_track_url(item_id, quality)
|
||||
except requests.exceptions.HTTPError:
|
||||
print("Nothing found")
|
||||
logger.info(f"{OFF}Nothing found")
|
||||
return
|
||||
|
||||
if "sample" not in parse and parse["sampling_rate"]:
|
||||
meta = client.get_track_meta(item_id)
|
||||
track_title = get_title(meta)
|
||||
print("\nDownloading: {}\n".format(track_title))
|
||||
logger.info(f"\n{YELLOW}Downloading: {track_title}")
|
||||
track_format, quality_met = get_format(client, meta, quality, True, parse)
|
||||
if not downgrade_quality and not quality_met:
|
||||
print("Skipping track as doesn't met quality requirement")
|
||||
logger.info(
|
||||
f"{OFF}Skipping {track_title} as doesn't met quality requirement"
|
||||
)
|
||||
return
|
||||
dirT = (
|
||||
meta["album"]["artist"]["name"],
|
||||
@ -271,5 +277,5 @@ def download_id_by_type(
|
||||
is_mp3 = True if int(quality) == 5 else False
|
||||
download_and_tag(dirn, count, parse, meta, meta, True, is_mp3, embed_art)
|
||||
else:
|
||||
print("Demo. Skipping")
|
||||
print("\nCompleted\n")
|
||||
logger.info(f"{OFF}Demo. Skipping")
|
||||
logger.info(f"{GREEN}Completed")
|
||||
|
@ -12,3 +12,7 @@ class InvalidAppIdError(Exception):
|
||||
|
||||
class InvalidAppSecretError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidQuality(Exception):
|
||||
pass
|
||||
|
@ -1,8 +1,11 @@
|
||||
import os
|
||||
import logging
|
||||
|
||||
from mutagen.flac import FLAC, Picture
|
||||
from mutagen.mp3 import EasyMP3
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_title(track_dict):
|
||||
try:
|
||||
@ -38,6 +41,7 @@ def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=Fa
|
||||
audio["TITLE"] = get_title(d)
|
||||
|
||||
audio["TRACKNUMBER"] = str(d["track_number"]) # TRACK NUMBER
|
||||
audio["DISCNUMBER"] = str(d["media_number"])
|
||||
|
||||
try:
|
||||
audio["COMPOSER"] = d["composer"]["name"] # COMPOSER
|
||||
@ -80,7 +84,7 @@ def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=Fa
|
||||
image.data = img.read()
|
||||
audio.add_picture(image)
|
||||
except Exception as e:
|
||||
print("Error embedding image: " + str(e))
|
||||
logger.error(f"Error embedding image: {e}", exc_info=True)
|
||||
|
||||
audio.save()
|
||||
os.rename(filename, final_name)
|
||||
|
@ -3,6 +3,7 @@
|
||||
# original author.
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import time
|
||||
|
||||
import requests
|
||||
@ -12,14 +13,18 @@ from qobuz_dl.exceptions import (
|
||||
IneligibleError,
|
||||
InvalidAppIdError,
|
||||
InvalidAppSecretError,
|
||||
InvalidQuality,
|
||||
)
|
||||
from qobuz_dl.color import GREEN, YELLOW
|
||||
|
||||
RESET = "Reset your credentials with 'qobuz-dl -r'"
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Client:
|
||||
def __init__(self, email, pwd, app_id, secrets):
|
||||
print("Logging...")
|
||||
logger.info(f"{YELLOW}Logging...")
|
||||
self.secrets = secrets
|
||||
self.id = app_id
|
||||
self.session = requests.Session()
|
||||
@ -80,6 +85,8 @@ class Client:
|
||||
unix = time.time()
|
||||
track_id = kwargs["id"]
|
||||
fmt_id = kwargs["fmt_id"]
|
||||
if int(fmt_id) not in (5, 6, 7, 27):
|
||||
raise InvalidQuality("Invalid quality id: choose between 5, 6, 7 or 27")
|
||||
r_sig = "trackgetFileUrlformat_id{}intentstreamtrack_id{}{}{}".format(
|
||||
fmt_id, track_id, unix, self.sec
|
||||
)
|
||||
@ -94,14 +101,13 @@ class Client:
|
||||
else:
|
||||
params = kwargs
|
||||
r = self.session.get(self.base + epoint, params=params)
|
||||
# Do ref header.
|
||||
if epoint == "user/login":
|
||||
if r.status_code == 401:
|
||||
raise AuthenticationError("Invalid credentials.\n" + RESET)
|
||||
elif r.status_code == 400:
|
||||
raise InvalidAppIdError("Invalid app id.\n" + RESET)
|
||||
else:
|
||||
print("Logged: OK")
|
||||
logger.info(f"{GREEN}Logged: OK")
|
||||
elif epoint in ["track/getFileUrl", "userLibrary/getAlbumsList"]:
|
||||
if r.status_code == 400:
|
||||
raise InvalidAppSecretError("Invalid app secret.\n" + RESET)
|
||||
@ -115,7 +121,7 @@ class Client:
|
||||
self.uat = usr_info["user_auth_token"]
|
||||
self.session.headers.update({"X-User-Auth-Token": self.uat})
|
||||
self.label = usr_info["user"]["credential"]["parameters"]["short_label"]
|
||||
print("Membership: {}\n".format(self.label))
|
||||
logger.info(f"{GREEN}Membership: {self.label}")
|
||||
|
||||
def multi_meta(self, epoint, key, id, type):
|
||||
total = 1
|
||||
|
@ -4,3 +4,4 @@ mutagen
|
||||
tqdm
|
||||
pick
|
||||
beautifulsoup4
|
||||
colorama
|
||||
|
Loading…
Reference in New Issue
Block a user