Merge pull request #14 from fc7/master

Download booklet and other minor improvements
This commit is contained in:
Vitiko 2020-12-07 11:16:31 -04:00 committed by GitHub
commit c570c25fc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 105 additions and 59 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
'Qobuz Downloads'
Qobuz Downloads
*__pycache*
.env

View File

@ -27,11 +27,21 @@ pip3 install -r requirements.txt --user
pip3 install windows-curses
pip3 install -r requirements.txt
```
#### Add your credentials to `config.py`
```python
email = "your@email.com"
password = "your_password"
#### Add your credentials to a `.env` file
```none
QOBUZ_EMAIL=your@email.com
QOBUZ_PW=your_password
```
NB: The .env file should be in the root folder, where main.py and config.py are located.
In addition to your credentials, you can also use the `.env` file to
change other default values by means of the following environment variables:
- `QOBUZ_FOLDER` (location of the download folder)
- `QOBUZ_LIMIT` (results limit)
- `QOBUZ_QUALITY` (default quality for url input mode)
#### Run qobuz-dl
##### Linux / MAC OS
```

View File

@ -1,13 +1,16 @@
import os
from dotenv import load_dotenv
load_dotenv()
# Qobuz credentials (Don't remove the quotes!)
email = "your@email.com"
password = "your_password"
email = os.getenv('QOBUZ_EMAIL')
password = os.getenv('QOBUZ_PW')
# Default folder where the releases are downloaded
default_folder = "Qobuz Downloads"
default_folder = os.getenv('QOBUZ_FOLDER', "Qobuz Downloads")
# Default per type results limit
default_limit = 10
default_limit = os.getenv('QOBUZ_LIMIT', 10)
# Default quality for url input mode. This will be ignored in interactive mode
# (5, 6, 7, 27) [320, LOSSLESS, 24B <96KHZ, 24B >96KHZ]
default_quality = 6
default_quality = os.getenv('QOBUZ_QUALITY', 6)

40
main.py
View File

@ -56,7 +56,7 @@ def get_id(url):
).group(1)
def searchSelected(Qz, path, albums, ids, types, quality):
def processSelected(Qz, path, albums, ids, types, quality):
q = ["5", "6", "7", "27"]
quality = q[quality[1]]
for alb, id_, type_ in zip(albums, ids, types):
@ -110,34 +110,42 @@ def interactive(Qz, path, limit, tracks=True):
while True:
query = input("\nEnter your search: [Ctrl + c to quit]\n- ")
print("Searching...")
if len(query.strip())==0:
break
start = Search(Qz, query, limit)
start.getResults(tracks)
if len(start.Total)==0:
break
Types.append(start.Types)
IDs.append(start.IDs)
title = (
"Select [space] the item(s) you want to download "
"(one or more)\nPress Ctrl + c to quit\n"
"(zero or more)\nPress Ctrl + c to quit\n"
)
Selected = pick(
start.Total, title, multiselect=True, min_selection_count=1
start.Total, title, multiselect=True, min_selection_count=0
)
Albums.append(Selected)
if len(Selected) > 0:
Albums.append(Selected)
y_n = pick(
["Yes", "No"],
"Items were added to queue to be downloaded. Keep searching?",
)
if y_n[0][0] == "N":
y_n = pick(
["Yes", "No"],
"Items were added to queue to be downloaded. Keep searching?",
)
if y_n[0][0] == "N":
break
else:
break
desc = (
"Select [intro] the quality (the quality will be automat"
"ically\ndowngraded if the selected is not found)"
)
Qualits = ["320", "Lossless", "Hi-res =< 96kHz", "Hi-Res > 96 kHz"]
quality = pick(Qualits, desc)
searchSelected(Qz, path, Albums, IDs, Types, quality)
if len(Albums)>0:
desc = (
"Select [intro] the quality (the quality will be automat"
"ically\ndowngraded if the selected is not found)"
)
Qualits = ["320", "Lossless", "Hi-res =< 96kHz", "Hi-Res > 96 kHz"]
quality = pick(Qualits, desc, default_index=1)
processSelected(Qz, path, Albums, IDs, Types, quality)
except KeyboardInterrupt:
sys.exit("\nBye")

View File

@ -33,6 +33,8 @@ def mkDir(dirn):
def getDesc(u, mt):
return "{} [{}/{}]".format(mt["title"], u["bit_depth"], u["sampling_rate"])
def getBooklet(i, dirn):
req_tqdm(i, dirn + "/booklet.pdf", "Downloading booklet")
def getCover(i, dirn):
req_tqdm(i, dirn + "/cover.jpg", "Downloading cover art")
@ -67,6 +69,8 @@ def iterateIDs(client, id, path, quality, album=False):
dirn = path + sanitized_title
mkDir(dirn)
getCover(meta["image"]["large"], dirn)
if "goodies" in meta:
getBooklet(meta["goodies"][0]["url"], dirn)
for i in meta["tracks"]["items"]:
parse = client.get_track_url(i["id"], quality)
try:

View File

@ -24,7 +24,7 @@ class Client:
self.session = requests.Session()
self.session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:67.0) Gecko/20100101 Firefox/67.0",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0",
"X-App-Id": self.id,
}
)
@ -33,28 +33,24 @@ class Client:
self.cfg_setup()
def api_call(self, epoint, **kwargs):
if epoint == "user/login?":
if epoint == "user/login":
params = {
"email": kwargs["email"],
"password": kwargs["pwd"],
"app_id": self.id,
}
elif epoint == "track/get?":
elif epoint == "track/get":
params = {"track_id": kwargs["id"]}
elif epoint == "album/get?":
elif epoint == "album/get":
params = {"album_id": kwargs["id"]}
elif epoint == "track/search?":
params = {"query": kwargs["query"], "limit": kwargs["limit"]}
elif epoint == "album/search?":
params = {"query": kwargs["query"], "limit": kwargs["limit"]}
elif epoint == "playlist/get?":
elif epoint == "playlist/get":
params = {
"extra": "tracks",
"playlist_id": kwargs["id"],
"limit": 500,
"offset": kwargs["offset"],
}
elif epoint == "artist/get?":
elif epoint == "artist/get":
params = {
"app_id": self.id,
"artist_id": kwargs["id"],
@ -62,14 +58,14 @@ class Client:
"offset": kwargs["offset"],
"extra": "albums",
}
elif epoint == "label/get?":
elif epoint == "label/get":
params = {
"label_id": kwargs["id"],
"limit": 500,
"offset": kwargs["offset"],
"extra": "albums",
}
elif epoint == "userLibrary/getAlbumsList?":
elif epoint == "userLibrary/getAlbumsList":
unix = time.time()
r_sig = "userLibrarygetAlbumsList" + str(unix) + kwargs["sec"]
r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest()
@ -79,7 +75,7 @@ class Client:
"request_ts": unix,
"request_sig": r_sig_hashed,
}
elif epoint == "track/getFileUrl?":
elif epoint == "track/getFileUrl":
unix = time.time()
track_id = kwargs["id"]
fmt_id = kwargs["fmt_id"]
@ -94,23 +90,25 @@ class Client:
"format_id": fmt_id,
"intent": "stream",
}
else:
params=kwargs
r = self.session.get(self.base + epoint, params=params)
# Do ref header.
if epoint == "user/login?":
if epoint == "user/login":
if r.status_code == 401:
raise AuthenticationError("Invalid credentials.")
elif r.status_code == 400:
raise InvalidAppIdError("Invalid app id.")
else:
print("Logged: OK")
elif epoint in ["track/getFileUrl?", "userLibrary/getAlbumsList?"]:
elif epoint in ["track/getFileUrl", "userLibrary/getAlbumsList"]:
if r.status_code == 400:
raise InvalidAppSecretError("Invalid app secret.")
r.raise_for_status()
return r.json()
def auth(self, email, pwd):
usr_info = self.api_call("user/login?", email=email, pwd=pwd)
usr_info = self.api_call("user/login", email=email, pwd=pwd)
if not usr_info["user"]["credential"]["parameters"]:
raise IneligibleError("Free accounts are not eligible to download tracks.")
self.uat = usr_info["user_auth_token"]
@ -135,32 +133,44 @@ class Client:
offset += 500
def get_album_meta(self, id):
return self.api_call("album/get?", id=id)
return self.api_call("album/get", id=id)
def get_track_meta(self, id):
return self.api_call("track/get?", id=id)
return self.api_call("track/get", id=id)
def get_track_url(self, id, fmt_id):
return self.api_call("track/getFileUrl?", id=id, fmt_id=fmt_id)
return self.api_call("track/getFileUrl", id=id, fmt_id=fmt_id)
def get_artist_meta(self, id):
return self.multi_meta("artist/get?", "albums_count", id, None)
return self.multi_meta("artist/get", "albums_count", id, None)
def get_plist_meta(self, id):
return self.multi_meta("playlist/get?", "tracks_count", id, None)
return self.multi_meta("playlist/get", "tracks_count", id, None)
def get_label_meta(self, id):
return self.multi_meta("label/get?", "albums_count", id, None)
return self.multi_meta("label/get", "albums_count", id, None)
def search_albums(self, query, limit):
return self.api_call("album/search?", query=query, limit=limit)
return self.api_call("album/search", query=query, limit=limit)
def search_tracks(self, query, limit):
return self.api_call("track/search?", query=query, limit=limit)
return self.api_call("track/search", query=query, limit=limit)
def get_favorite_albums(self, offset, limit):
return self.api_call("favorite/getUserFavorites", type="albums", offset=offset, limit=limit)
def get_favorite_tracks(self, offset, limit):
return self.api_call("favorite/getUserFavorites", type="tracks", offset=offset, limit=limit)
def get_favorite_artists(self, offset, limit):
return self.api_call("favorite/getUserFavorites", type="artists", offset=offset, limit=limit)
def get_user_playlists(self, limit):
return self.api_call("playlist/getUserPlaylists", limit=limit)
def test_secret(self, sec):
try:
r = self.api_call("userLibrary/getAlbumsList?", sec=sec)
r = self.api_call("userLibrary/getAlbumsList", sec=sec)
return True
except InvalidAppSecretError:
return False

View File

@ -28,14 +28,23 @@ class Search:
self.Total.append("[RELEASE] {} - {} - {} [{}]".format(*items))
self.appendInfo(i, True)
except KeyError:
items = (
i["performer"]["name"],
i["title"],
self.seconds(i["duration"]),
"HI-RES" if i["hires"] else "Lossless",
)
self.Total.append("[TRACK] {} - {} - {} [{}]".format(*items))
self.appendInfo(i, False)
try:
items = (
i["performer"]["name"],
i["title"],
self.seconds(i["duration"]),
"HI-RES" if i["hires"] else "Lossless",
)
self.Total.append("[TRACK] {} - {} - {} [{}]".format(*items))
self.appendInfo(i, False)
except KeyError:
items = (
i["title"],
self.seconds(i["duration"]),
"HI-RES" if i["hires"] else "Lossless",
)
self.Total.append("[TRACK] {} [{}]".format(*items))
self.appendInfo(i, False)
def getResults(self, tracks=False):
self.itResults(self.Albums)

View File

@ -3,3 +3,4 @@ requests==2.24.0
mutagen==1.45.1
tqdm==4.48.2
pick==0.6.7
python-dotenv==0.15.0