clean code; fix time format

This commit is contained in:
vitiko98 2020-10-17 12:52:35 -04:00
parent a6ad33ad7d
commit 68caf02c31
8 changed files with 238 additions and 205 deletions

106
main.py
View File

@ -1,35 +1,46 @@
from qo_utils.search import Search
from qo_utils import downloader
from pick import pick
import argparse import argparse
import itertools import itertools
import re
import os
import sys
import json import json
import os
import re
import sys
from pick import pick
import qopy import qopy
from qo_utils import downloader
from qo_utils.search import Search
def getArgs(): def getArgs():
parser = argparse.ArgumentParser(prog='python3 main.py') parser = argparse.ArgumentParser(prog="python3 main.py")
parser.add_argument("-a", action="store_true", parser.add_argument("-a", action="store_true", help="enable albums-only search")
help="enable albums-only search") parser.add_argument(
parser.add_argument("-i", action="store_true", "-i", action="store_true", help="run Qo-Dl-curses on URL input mode"
help="run Qo-Dl-curses on URL input mode") )
parser.add_argument("-q", metavar="int", default=6, parser.add_argument(
help="quality (5, 6, 7, 27) (default: 6)") "-q", metavar="int", default=6, help="quality (5, 6, 7, 27) (default: 6)"
parser.add_argument("-l", metavar="int", default=10, )
help="limit of search results by type (default: 10)") parser.add_argument(
parser.add_argument("-d", metavar="PATH", default='Qobuz Downloads', "-l",
help="custom directory for downloads") metavar="int",
default=10,
help="limit of search results by type (default: 10)",
)
parser.add_argument(
"-d",
metavar="PATH",
default="Qobuz Downloads",
help="custom directory for downloads",
)
return parser.parse_args() return parser.parse_args()
def getSession(): def getSession():
print('Logging...') print("Logging...")
with open('config.json') as f: with open("config.json") as f:
config = json.load(f) config = json.load(f)
return qopy.Client(config['email'], config['password']) return qopy.Client(config["email"], config["password"])
def musicDir(dir): def musicDir(dir):
@ -40,13 +51,16 @@ def musicDir(dir):
def get_id(url): def get_id(url):
return re.match(r'https?://(?:w{0,3}|play|open)\.qobuz\.com/(?:(?' return re.match(
':album|track)/|[a-z]{2}-[a-z]{2}/album/-?\w+(?:-\w+)' r"https?://(?:w{0,3}|play|open)\.qobuz\.com/(?:(?"
'*-?/|user/library/favorites/)(\w+)', url).group(1) ":album|track)/|[a-z]{2}-[a-z]{2}/album/-?\w+(?:-\w+)"
"*-?/|user/library/favorites/)(\w+)",
url,
).group(1)
def searchSelected(Qz, path, albums, ids, types, quality): def searchSelected(Qz, path, albums, ids, types, quality):
q = ['5', '6', '7', '27'] q = ["5", "6", "7", "27"]
quality = q[quality[1]] quality = q[quality[1]]
for alb, id_, type_ in zip(albums, ids, types): for alb, id_, type_ in zip(albums, ids, types):
for al in alb: for al in alb:
@ -57,7 +71,7 @@ def searchSelected(Qz, path, albums, ids, types, quality):
def fromUrl(Qz, path, link, quality): def fromUrl(Qz, path, link, quality):
if '/track/' in link: if "/track/" in link:
id = get_id(link) id = get_id(link)
downloader.iterateIDs(Qz, id, path, quality, False) downloader.iterateIDs(Qz, id, path, quality, False)
else: else:
@ -71,32 +85,37 @@ def interactive(Qz, path, limit, tracks=True):
try: try:
while True: while True:
query = input("\nEnter your search: [Ctrl + c to quit]\n- ") query = input("\nEnter your search: [Ctrl + c to quit]\n- ")
print('Searching...') print("Searching...")
start = Search(Qz, query, limit) start = Search(Qz, query, limit)
start.getResults(tracks) start.getResults(tracks)
Types.append(start.Types) Types.append(start.Types)
IDs.append(start.IDs) IDs.append(start.IDs)
title = ('Select [space] the item(s) you want to download ' title = (
'(one or more)\nPress Ctrl + c to quit\n') "Select [space] the item(s) you want to download "
Selected = pick(start.Total, title, "(one or more)\nPress Ctrl + c to quit\n"
multiselect=True, min_selection_count=1) )
Selected = pick(
start.Total, title, multiselect=True, min_selection_count=1
)
Albums.append(Selected) Albums.append(Selected)
y_n = pick(['Yes', 'No'], 'Items were added to queue to ' y_n = pick(
'be downloaded. Keep searching?') ["Yes", "No"],
if y_n[0][0] == 'N': "Items were added to queue to be downloaded. Keep searching?",
)
if y_n[0][0] == "N":
break break
else:
pass
desc = ('Select [intro] the quality (the quality will be automat' desc = (
'ically\ndowngraded if the selected is not found)') "Select [intro] the quality (the quality will be automat"
Qualits = ['320', 'Lossless', 'Hi-res =< 96kHz', 'Hi-Res > 96 kHz'] "ically\ndowngraded if the selected is not found)"
)
Qualits = ["320", "Lossless", "Hi-res =< 96kHz", "Hi-Res > 96 kHz"]
quality = pick(Qualits, desc) quality = pick(Qualits, desc)
searchSelected(Qz, path, Albums, IDs, Types, quality) searchSelected(Qz, path, Albums, IDs, Types, quality)
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit('\nBye') sys.exit("\nBye")
def inputMode(Qz, path, quality): def inputMode(Qz, path, quality):
@ -105,18 +124,15 @@ def inputMode(Qz, path, quality):
link = input("\nAlbum/track URL: [Ctrl + c to quit]\n- ") link = input("\nAlbum/track URL: [Ctrl + c to quit]\n- ")
fromUrl(Qz, path, link, quality) fromUrl(Qz, path, link, quality)
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit('\nBye') sys.exit("\nBye")
def main(): def main():
arguments = getArgs() arguments = getArgs()
directory = musicDir(arguments.d) + '/' directory = musicDir(arguments.d) + "/"
Qz = getSession() Qz = getSession()
if not arguments.i: if not arguments.i:
if arguments.a: interactive(Qz, directory, arguments.l, not arguments.a)
interactive(Qz, directory, arguments.l, False)
else:
interactive(Qz, directory, arguments.l, True)
else: else:
inputMode(Qz, directory, arguments.q) inputMode(Qz, directory, arguments.q)

View File

@ -1,20 +1,22 @@
import os import os
import requests import requests
from qo_utils import metadata
from pathvalidate import sanitize_filename from pathvalidate import sanitize_filename
from tqdm import tqdm from tqdm import tqdm
from qo_utils import metadata
def req_tqdm(url, fname, track_name): def req_tqdm(url, fname, track_name):
r = requests.get(url, allow_redirects=True, stream=True) r = requests.get(url, allow_redirects=True, stream=True)
total = int(r.headers.get('content-length', 0)) total = int(r.headers.get("content-length", 0))
with open(fname, 'wb') as file, tqdm( with open(fname, "wb") as file, tqdm(
total=total, total=total,
unit='iB', unit="iB",
unit_scale=True, unit_scale=True,
unit_divisor=1024, unit_divisor=1024,
desc=track_name, desc=track_name,
bar_format='{n_fmt}/{total_fmt} /// {desc}', bar_format="{n_fmt}/{total_fmt} /// {desc}",
) as bar: ) as bar:
for data in r.iter_content(chunk_size=1024): for data in r.iter_content(chunk_size=1024):
size = file.write(data) size = file.write(data)
@ -25,25 +27,25 @@ def mkDir(dirn):
try: try:
os.mkdir(dirn) os.mkdir(dirn)
except FileExistsError: except FileExistsError:
print('Warning: folder already exists. Overwriting...') print("Warning: folder already exists. Overwriting...")
def getDesc(u, mt): def getDesc(u, mt):
return '{} [{}/{}]'.format(mt['title'], u['bit_depth'], u['sampling_rate']) return "{} [{}/{}]".format(mt["title"], u["bit_depth"], u["sampling_rate"])
def getCover(i, dirn): def getCover(i, dirn):
req_tqdm(i, dirn + '/cover.jpg', 'Downloading cover art') req_tqdm(i, dirn + "/cover.jpg", "Downloading cover art")
# Download and tag a file # Download and tag a file
def downloadItem(dirn, count, parse, meta, album, url, is_track, mp3): def downloadItem(dirn, count, parse, meta, album, url, is_track, mp3):
if mp3: fname = (
fname = '{}/{:02}.mp3'.format(dirn, count) "{}/{:02}.mp3".format(dirn, count)
func = metadata.tag_mp3 if mp3
else: else "{}/{:02}.flac".format(dirn, count)
fname = '{}/{:02}.flac'.format(dirn, count) )
func = metadata.tag_flac func = metadata.tag_mp3 if mp3 else metadata.tag_flac
desc = getDesc(parse, meta) desc = getDesc(parse, meta)
req_tqdm(url, fname, desc) req_tqdm(url, fname, desc)
func(fname, dirn, meta, album, is_track) func(fname, dirn, meta, album, is_track)
@ -55,46 +57,43 @@ def iterateIDs(client, id, path, quality, album=False):
if album: if album:
meta = client.get_album_meta(id) meta = client.get_album_meta(id)
print('\nDownloading: {}\n'.format(meta['title'])) print("\nDownloading: {}\n".format(meta["title"]))
dirT = (meta['artist']['name'], dirT = (
meta['title'], meta["artist"]["name"],
meta['release_date_original'].split('-')[0]) meta["title"],
sanitized_title = sanitize_filename('{} - {} [{}]'.format(*dirT)) meta["release_date_original"].split("-")[0],
)
sanitized_title = sanitize_filename("{} - {} [{}]".format(*dirT))
dirn = path + sanitized_title dirn = path + sanitized_title
mkDir(dirn) mkDir(dirn)
getCover(meta['image']['large'], dirn) getCover(meta["image"]["large"], dirn)
for i in meta['tracks']['items']: for i in meta["tracks"]["items"]:
parse = client.get_track_url(i['id'], quality) parse = client.get_track_url(i["id"], quality)
url = parse['url'] url = parse["url"]
if "sample" not in parse:
if 'sample' not in parse: is_mp3 = True if int(quality) == 5 else False
if int(quality) == 5: downloadItem(dirn, count, parse, i, meta, url, False, is_mp3)
downloadItem(dirn, count, parse, i, meta, url, False, True)
else:
downloadItem(dirn, count, parse, i, meta, url, False, False)
else: else:
print('Demo. Skipping') print("Demo. Skipping")
count = count + 1 count = count + 1
else: else:
parse = client.get_track_url(id, quality) parse = client.get_track_url(id, quality)
url = parse['url'] url = parse["url"]
if 'sample' not in parse: if "sample" not in parse:
meta = client.get_track_meta(id) meta = client.get_track_meta(id)
print('\nDownloading: {}\n'.format(meta['title'])) print("\nDownloading: {}\n".format(meta["title"]))
dirT = (meta['album']['artist']['name'], dirT = (
meta['title'], meta["album"]["artist"]["name"],
meta['album']['release_date_original'].split('-')[0]) meta["title"],
sanitized_title = sanitize_filename('{} - {} [{}]'.format(*dirT)) meta["album"]["release_date_original"].split("-")[0],
)
sanitized_title = sanitize_filename("{} - {} [{}]".format(*dirT))
dirn = path + sanitized_title dirn = path + sanitized_title
mkDir(dirn) mkDir(dirn)
getCover(meta['album']['image']['large'], dirn) getCover(meta["album"]["image"]["large"], dirn)
if int(quality) == 5: is_mp3 = True if int(quality) == 5 else False
downloadItem(dirn, count, parse, meta, meta, url, True, True) downloadItem(dirn, count, parse, meta, meta, url, True, is_mp3)
else:
downloadItem(dirn, count, parse, meta, meta, url, True, False)
else: else:
print('Demo. Skipping') print("Demo. Skipping")
print("\nCompleted\n")
print('\nCompleted\n')

View File

@ -1,73 +1,74 @@
import os
from mutagen.flac import FLAC from mutagen.flac import FLAC
from mutagen.mp3 import EasyMP3 from mutagen.mp3 import EasyMP3
from pathvalidate import sanitize_filename from pathvalidate import sanitize_filename
import os
def tag_flac(file, path, d, album, istrack=True): def tag_flac(file, path, d, album, istrack=True):
audio = FLAC(file) audio = FLAC(file)
audio['TITLE'] = d['title'] # TRACK TITLE audio["TITLE"] = d["title"] # TRACK TITLE
audio['TRACKNUMBER'] = str(d['track_number']) # TRACK NUMBER audio["TRACKNUMBER"] = str(d["track_number"]) # TRACK NUMBER
try: try:
audio['COMPOSER'] = d['composer']['name'] # COMPOSER audio["COMPOSER"] = d["composer"]["name"] # COMPOSER
except KeyError: except KeyError:
pass pass
try: try:
audio['ARTIST'] = d['performer']['name'] # TRACK ARTIST audio["ARTIST"] = d["performer"]["name"] # TRACK ARTIST
except KeyError: except KeyError:
if istrack: if istrack:
audio['ARTIST'] = d['album']['artist']['name'] # TRACK ARTIST audio["ARTIST"] = d["album"]["artist"]["name"] # TRACK ARTIST
else: else:
audio['ARTIST'] = album['artist']['name'] audio["ARTIST"] = album["artist"]["name"]
if istrack: if istrack:
audio['GENRE'] = ', '.join(d['album']['genres_list']) # GENRE audio["GENRE"] = ", ".join(d["album"]["genres_list"]) # GENRE
audio['ALBUMARTIST'] = d['album']['artist']['name'] # ALBUM ARTIST audio["ALBUMARTIST"] = d["album"]["artist"]["name"] # ALBUM ARTIST
audio['TRACKTOTAL'] = str(d['album']['tracks_count']) # TRACK TOTAL audio["TRACKTOTAL"] = str(d["album"]["tracks_count"]) # TRACK TOTAL
audio['ALBUM'] = d['album']['title'] # ALBUM TITLE audio["ALBUM"] = d["album"]["title"] # ALBUM TITLE
audio['YEAR'] = d['album']['release_date_original'].split('-')[0] audio["YEAR"] = d["album"]["release_date_original"].split("-")[0]
else: else:
audio['GENRE'] = ', '.join(album['genres_list']) # GENRE audio["GENRE"] = ", ".join(album["genres_list"]) # GENRE
audio['ALBUMARTIST'] = album['artist']['name'] # ALBUM ARTIST audio["ALBUMARTIST"] = album["artist"]["name"] # ALBUM ARTIST
audio['TRACKTOTAL'] = str(album['tracks_count']) # TRACK TOTAL audio["TRACKTOTAL"] = str(album["tracks_count"]) # TRACK TOTAL
audio['ALBUM'] = album['title'] # ALBUM TITLE audio["ALBUM"] = album["title"] # ALBUM TITLE
audio['YEAR'] = album['release_date_original'].split('-')[0] # YEAR audio["YEAR"] = album["release_date_original"].split("-")[0] # YEAR
audio.save() audio.save()
title = sanitize_filename(d['title']) title = sanitize_filename(d["title"])
os.rename(file, '{}/{:02}. {}.flac'.format(path, d['track_number'], title)) os.rename(file, "{}/{:02}. {}.flac".format(path, d["track_number"], title))
def tag_mp3(file, path, d, album, istrack=True): def tag_mp3(file, path, d, album, istrack=True):
audio = EasyMP3(file) audio = EasyMP3(file)
audio['title'] = d['title'] audio["title"] = d["title"]
audio['tracknumber'] = str(d['track_number']) audio["tracknumber"] = str(d["track_number"])
try: try:
audio['composer'] = d['composer']['name'] audio["composer"] = d["composer"]["name"]
except KeyError: except KeyError:
pass pass
try: try:
audio['artist'] = d['performer']['name'] # TRACK ARTIST audio["artist"] = d["performer"]["name"] # TRACK ARTIST
except KeyError: except KeyError:
if istrack: if istrack:
audio['artist'] = d['album']['artist']['name'] # TRACK ARTIST audio["artist"] = d["album"]["artist"]["name"] # TRACK ARTIST
else: else:
audio['artist'] = album['artist']['name'] audio["artist"] = album["artist"]["name"]
if istrack: if istrack:
audio['genre'] = ', '.join(d['album']['genres_list']) # GENRE audio["genre"] = ", ".join(d["album"]["genres_list"]) # GENRE
audio['albumartist'] = d['album']['artist']['name'] # ALBUM ARTIST audio["albumartist"] = d["album"]["artist"]["name"] # ALBUM ARTIST
audio['album'] = d['album']['title'] # ALBUM TITLE audio["album"] = d["album"]["title"] # ALBUM TITLE
audio['date'] = d['album']['release_date_original'].split('-')[0] audio["date"] = d["album"]["release_date_original"].split("-")[0]
else: else:
audio['GENRE'] = ', '.join(album['genres_list']) # GENRE audio["GENRE"] = ", ".join(album["genres_list"]) # GENRE
audio['albumartist'] = album['artist']['name'] # ALBUM ARTIST audio["albumartist"] = album["artist"]["name"] # ALBUM ARTIST
audio['album'] = album['title'] # ALBUM TITLE audio["album"] = album["title"] # ALBUM TITLE
audio['date'] = album['release_date_original'].split('-')[0] # YEAR audio["date"] = album["release_date_original"].split("-")[0] # YEAR
audio.save() audio.save()
title = sanitize_filename(d['title']) title = sanitize_filename(d["title"])
os.rename(file, '{}/{:02}. {}.mp3'.format(path, d['track_number'], title)) os.rename(file, "{}/{:02}. {}.mp3".format(path, d["track_number"], title))

View File

@ -6,33 +6,35 @@ class Search:
self.Total = [] self.Total = []
self.IDs = [] self.IDs = []
self.Types = [] self.Types = []
self.Tracks = Qz.search_tracks(query, limit)['tracks']['items'] self.Tracks = Qz.search_tracks(query, limit)["tracks"]["items"]
self.Albums = Qz.search_albums(query, limit)['albums']['items'] self.Albums = Qz.search_albums(query, limit)["albums"]["items"]
def seconds(self, duration): def seconds(self, duration):
return time.strftime("%M:%S", time.gmtime(duration)) return time.strftime("%H:%M:%S", time.gmtime(duration))
def isHRes(self, item):
if item:
return 'HI-RES'
else:
return 'Lossless'
def appendInfo(self, i, bool): def appendInfo(self, i, bool):
self.IDs.append(i['id']) self.IDs.append(i["id"])
self.Types.append(bool) self.Types.append(bool)
def itResults(self, iterable): def itResults(self, iterable):
for i in iterable: for i in iterable:
try: try:
items = (i['artist']['name'], i['title'], items = (
self.seconds(i['duration']), self.isHRes(i['hires'])) i["artist"]["name"],
self.Total.append('[RELEASE] {} - {} - {} [{}]'.format(*items)) i["title"],
self.seconds(i["duration"]),
"HI-RES" if i["hires"] else "Lossless",
)
self.Total.append("[RELEASE] {} - {} - {} [{}]".format(*items))
self.appendInfo(i, True) self.appendInfo(i, True)
except KeyError: except KeyError:
items = (i['performer']['name'], i['title'], items = (
self.seconds(i['duration']), self.isHRes(i['hires'])) i["performer"]["name"],
self.Total.append('[TRACK] {} - {} - {} [{}]'.format(*items)) i["title"],
self.seconds(i["duration"]),
"HI-RES" if i["hires"] else "Lossless",
)
self.Total.append("[TRACK] {} - {} - {} [{}]".format(*items))
self.appendInfo(i, False) self.appendInfo(i, False)
def getResults(self, tracks=False): def getResults(self, tracks=False):

View File

@ -1,23 +1,26 @@
import requests
import base64 import base64
import re import re
from collections import OrderedDict from collections import OrderedDict
class Spoofer: import requests
class Spoofer:
def __init__(self): def __init__(self):
self.seed_timezone_regex = r'[a-z]\.initialSeed\("(?P<seed>[\w=]+)",window\.utimezone\.(?P<timezone>[a-z]+)\)' self.seed_timezone_regex = r'[a-z]\.initialSeed\("(?P<seed>[\w=]+)",window\.utimezone\.(?P<timezone>[a-z]+)\)'
# note: {timezones} should be replaced with every capitalized timezone joined by a | # note: {timezones} should be replaced with every capitalized timezone joined by a |
self.info_extras_regex = r'name:"\w+/(?P<timezone>{timezones})",info:"(?P<info>[\w=]+)",extras:"(?P<extras>[\w=]+)"' self.info_extras_regex = r'name:"\w+/(?P<timezone>{timezones})",info:"(?P<info>[\w=]+)",extras:"(?P<extras>[\w=]+)"'
self.appId_regex = r'{app_id:"(?P<app_id>\d{9})",app_secret:"\w{32}",base_port:"80",base_url:"https://www\.qobuz\.com",base_method:"/api\.json/0\.2/"},n\.base_url="https://play\.qobuz\.com"' self.appId_regex = r'{app_id:"(?P<app_id>\d{9})",app_secret:"\w{32}",base_port:"80",base_url:"https://www\.qobuz\.com",base_method:"/api\.json/0\.2/"},n\.base_url="https://play\.qobuz\.com"'
login_page_request = requests.get("https://play.qobuz.com/login") login_page_request = requests.get("https://play.qobuz.com/login")
login_page = login_page_request.text login_page = login_page_request.text
bundle_url_match = re.search(r'<script src="(/resources/\d+\.\d+\.\d+-[a-z]\d{3}/bundle\.js)"></script>', bundle_url_match = re.search(
login_page) r'<script src="(/resources/\d+\.\d+\.\d+-[a-z]\d{3}/bundle\.js)"></script>',
login_page,
)
bundle_url = bundle_url_match.group(1) bundle_url = bundle_url_match.group(1)
bundle_req = requests.get("https://play.qobuz.com" + bundle_url) bundle_req = requests.get("https://play.qobuz.com" + bundle_url)
self.bundle = bundle_req.text self.bundle = bundle_req.text
def getAppId(self): def getAppId(self):
return re.search(self.appId_regex, self.bundle).group("app_id") return re.search(self.appId_regex, self.bundle).group("app_id")
@ -34,7 +37,9 @@ class Spoofer:
""" """
keypairs = list(secrets.items()) keypairs = list(secrets.items())
secrets.move_to_end(keypairs[1][0], last=False) secrets.move_to_end(keypairs[1][0], last=False)
info_extras_regex = self.info_extras_regex.format(timezones="|".join([timezone.capitalize() for timezone in secrets])) info_extras_regex = self.info_extras_regex.format(
timezones="|".join([timezone.capitalize() for timezone in secrets])
)
info_extras_matches = re.finditer(info_extras_regex, self.bundle) info_extras_matches = re.finditer(info_extras_regex, self.bundle)
for match in info_extras_matches: for match in info_extras_matches:
timezone, info, extras = match.group("timezone", "info", "extras") timezone, info, extras = match.group("timezone", "info", "extras")
@ -43,4 +48,4 @@ class Spoofer:
secrets[secret_pair] = base64.standard_b64decode( secrets[secret_pair] = base64.standard_b64decode(
"".join(secrets[secret_pair])[:-44] "".join(secrets[secret_pair])[:-44]
).decode("utf-8") ).decode("utf-8")
return secrets return secrets

View File

@ -1 +1 @@
from .qopy import Client from .qopy import Client

View File

@ -1,11 +1,14 @@
class AuthenticationError(Exception): class AuthenticationError(Exception):
pass pass
class IneligibleError(Exception): class IneligibleError(Exception):
pass pass
class InvalidAppIdError(Exception): class InvalidAppIdError(Exception):
pass pass
class InvalidAppSecretError(Exception): class InvalidAppSecretError(Exception):
pass pass

View File

@ -2,111 +2,118 @@
# of qopy, originally written by Sorrow446. All credits to the # of qopy, originally written by Sorrow446. All credits to the
# original author. # original author.
import time
import hashlib import hashlib
import requests import time
from qo_utils import spoofbuz
from qopy.exceptions import AuthenticationError, IneligibleError, InvalidAppSecretError, InvalidAppIdError import requests
from qo_utils import spoofbuz
from qopy.exceptions import (
AuthenticationError,
IneligibleError,
InvalidAppIdError,
InvalidAppSecretError,
)
class Client: class Client:
def __init__(self, email, pwd): def __init__(self, email, pwd):
print('Getting tokens...') print("Getting tokens...")
self.spoofer = spoofbuz.Spoofer() self.spoofer = spoofbuz.Spoofer()
self.id = self.spoofer.getAppId() self.id = self.spoofer.getAppId()
self.session = requests.Session() self.session = requests.Session()
self.session.headers.update({ self.session.headers.update(
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:67.0) Gecko/20100101 Firefox/67.0', {
"X-App-Id": self.id}) "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:67.0) Gecko/20100101 Firefox/67.0",
self.base = 'https://www.qobuz.com/api.json/0.2/' "X-App-Id": self.id,
}
)
self.base = "https://www.qobuz.com/api.json/0.2/"
self.auth(email, pwd) self.auth(email, pwd)
self.cfg_setup() self.cfg_setup()
def api_call(self, epoint, **kwargs): def api_call(self, epoint, **kwargs):
if epoint == "user/login?": if epoint == "user/login?":
params={ params = {
"email": kwargs['email'], "email": kwargs["email"],
"password": kwargs['pwd'], "password": kwargs["pwd"],
"app_id": self.id} "app_id": self.id,
}
elif epoint == "track/get?": elif epoint == "track/get?":
params={ params = {"track_id": kwargs["id"]}
"track_id": kwargs['id']}
elif epoint == "album/get?": elif epoint == "album/get?":
params={ params = {"album_id": kwargs["id"]}
"album_id": kwargs['id']}
elif epoint == "track/search?": elif epoint == "track/search?":
params={ params = {"query": kwargs["query"], "limit": kwargs["limit"]}
"query": kwargs['query'],
"limit": kwargs['limit']}
elif epoint == "album/search?": elif epoint == "album/search?":
params={ params = {"query": kwargs["query"], "limit": kwargs["limit"]}
"query": kwargs['query'],
"limit": kwargs['limit']}
elif epoint == "userLibrary/getAlbumsList?": elif epoint == "userLibrary/getAlbumsList?":
unix = time.time() unix = time.time()
r_sig = "userLibrarygetAlbumsList" + str(unix) + kwargs['sec'] r_sig = "userLibrarygetAlbumsList" + str(unix) + kwargs["sec"]
r_sig_hashed = hashlib.md5(r_sig.encode('utf-8')).hexdigest() r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest()
params={ params = {
"app_id": self.id, "app_id": self.id,
"user_auth_token": self.uat, "user_auth_token": self.uat,
"request_ts": unix, "request_ts": unix,
"request_sig": r_sig_hashed} "request_sig": r_sig_hashed,
}
elif epoint == "track/getFileUrl?": elif epoint == "track/getFileUrl?":
unix = time.time() unix = time.time()
track_id = kwargs['id'] track_id = kwargs["id"]
fmt_id = kwargs['fmt_id'] fmt_id = kwargs["fmt_id"]
r_sig = "trackgetFileUrlformat_id{}intentstreamtrack_id{}{}{}".format(fmt_id, track_id, unix, self.sec) r_sig = "trackgetFileUrlformat_id{}intentstreamtrack_id{}{}{}".format(
r_sig_hashed = hashlib.md5(r_sig.encode('utf-8')).hexdigest() fmt_id, track_id, unix, self.sec
params={ )
r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest()
params = {
"request_ts": unix, "request_ts": unix,
"request_sig": r_sig_hashed, "request_sig": r_sig_hashed,
"track_id": track_id, "track_id": track_id,
"format_id": fmt_id, "format_id": fmt_id,
"intent": 'stream'} "intent": "stream",
}
r = self.session.get(self.base + epoint, params=params) r = self.session.get(self.base + epoint, params=params)
# Do ref header. # Do ref header.
if epoint == "user/login?": if epoint == "user/login?":
if r.status_code == 401: if r.status_code == 401:
raise AuthenticationError('Invalid credentials.') raise AuthenticationError("Invalid credentials.")
elif r.status_code == 400: elif r.status_code == 400:
raise InvalidAppIdError('Invalid app id.') raise InvalidAppIdError("Invalid app id.")
else: else:
print('Logged: OK') print("Logged: OK")
elif epoint in ["track/getFileUrl?", "userLibrary/getAlbumsList?"]: elif epoint in ["track/getFileUrl?", "userLibrary/getAlbumsList?"]:
if r.status_code == 400: if r.status_code == 400:
raise InvalidAppSecretError('Invalid app secret.') raise InvalidAppSecretError("Invalid app secret.")
r.raise_for_status() r.raise_for_status()
return r.json() return r.json()
def auth(self, email, pwd): def auth(self, email, pwd):
usr_info = self.api_call('user/login?', email=email, pwd=pwd) usr_info = self.api_call("user/login?", email=email, pwd=pwd)
if not usr_info['user']['credential']['parameters']: if not usr_info["user"]["credential"]["parameters"]:
raise IneligibleError("Free accounts are not eligible to download tracks.") raise IneligibleError("Free accounts are not eligible to download tracks.")
self.uat = usr_info['user_auth_token'] self.uat = usr_info["user_auth_token"]
self.session.headers.update({ self.session.headers.update({"X-User-Auth-Token": self.uat})
"X-User-Auth-Token": self.uat}) self.label = usr_info["user"]["credential"]["parameters"]["short_label"]
self.label = usr_info['user']['credential']['parameters']['short_label'] print("Membership: {}".format(self.label))
print('Membership: {}'.format(self.label))
def get_album_meta(self, id): def get_album_meta(self, id):
return self.api_call('album/get?', id=id) return self.api_call("album/get?", id=id)
def get_track_meta(self, id): def get_track_meta(self, id):
return self.api_call('track/get?', id=id) return self.api_call("track/get?", id=id)
def get_track_url(self, id, fmt_id): def get_track_url(self, id, fmt_id):
return self.api_call('track/getFileUrl?', id=id, fmt_id=fmt_id) return self.api_call("track/getFileUrl?", id=id, fmt_id=fmt_id)
def search_albums(self, query, limit): def search_albums(self, query, limit):
return self.api_call('album/search?', query=query, limit=limit) return self.api_call("album/search?", query=query, limit=limit)
def search_tracks(self, query, limit): def search_tracks(self, query, limit):
return self.api_call('track/search?', query=query, limit=limit) return self.api_call("track/search?", query=query, limit=limit)
def test_secret(self, sec): def test_secret(self, sec):
try: try:
r = self.api_call('userLibrary/getAlbumsList?', sec=sec) r = self.api_call("userLibrary/getAlbumsList?", sec=sec)
return True return True
except InvalidAppSecretError: except InvalidAppSecretError:
return False return False