mirror of
https://github.com/rembo10/headphones.git
synced 2026-05-02 09:49:36 +01:00
update Beets
This commit is contained in:
@@ -16,32 +16,66 @@
|
||||
"""
|
||||
|
||||
import collections
|
||||
import time
|
||||
|
||||
import unidecode
|
||||
import requests
|
||||
import unidecode
|
||||
|
||||
from beets import ui
|
||||
from beets.autotag import AlbumInfo, TrackInfo
|
||||
from beets.plugins import MetadataSourcePlugin, BeetsPlugin
|
||||
from beets.dbcore import types
|
||||
from beets.library import DateType
|
||||
from beets.plugins import BeetsPlugin, MetadataSourcePlugin
|
||||
from beets.util.id_extractors import deezer_id_regex
|
||||
|
||||
|
||||
class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
||||
data_source = 'Deezer'
|
||||
data_source = "Deezer"
|
||||
|
||||
item_types = {
|
||||
"deezer_track_rank": types.INTEGER,
|
||||
"deezer_track_id": types.INTEGER,
|
||||
"deezer_updated": DateType(),
|
||||
}
|
||||
|
||||
# Base URLs for the Deezer API
|
||||
# Documentation: https://developers.deezer.com/api/
|
||||
search_url = 'https://api.deezer.com/search/'
|
||||
album_url = 'https://api.deezer.com/album/'
|
||||
track_url = 'https://api.deezer.com/track/'
|
||||
search_url = "https://api.deezer.com/search/"
|
||||
album_url = "https://api.deezer.com/album/"
|
||||
track_url = "https://api.deezer.com/track/"
|
||||
|
||||
id_regex = {
|
||||
'pattern': r'(^|deezer\.com/)([a-z]*/)?({}/)?(\d+)',
|
||||
'match_group': 4,
|
||||
}
|
||||
id_regex = deezer_id_regex
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def commands(self):
|
||||
"""Add beet UI commands to interact with Deezer."""
|
||||
deezer_update_cmd = ui.Subcommand(
|
||||
"deezerupdate", help=f"Update {self.data_source} rank"
|
||||
)
|
||||
|
||||
def func(lib, opts, args):
|
||||
items = lib.items(ui.decargs(args))
|
||||
self.deezerupdate(items, ui.should_write())
|
||||
|
||||
deezer_update_cmd.func = func
|
||||
|
||||
return [deezer_update_cmd]
|
||||
|
||||
def fetch_data(self, url):
|
||||
try:
|
||||
response = requests.get(url, timeout=10)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
except requests.exceptions.RequestException as e:
|
||||
self._log.error("Error fetching data from {}\n Error: {}", url, e)
|
||||
return None
|
||||
if "error" in data:
|
||||
self._log.error("Deezer API error: {}", data["error"]["message"])
|
||||
return None
|
||||
return data
|
||||
|
||||
def album_for_id(self, album_id):
|
||||
"""Fetch an album by its Deezer ID or URL and return an
|
||||
AlbumInfo object or None if the album is not found.
|
||||
@@ -51,15 +85,20 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
||||
:return: AlbumInfo object for album.
|
||||
:rtype: beets.autotag.hooks.AlbumInfo or None
|
||||
"""
|
||||
deezer_id = self._get_id('album', album_id)
|
||||
deezer_id = self._get_id("album", album_id, self.id_regex)
|
||||
if deezer_id is None:
|
||||
return None
|
||||
album_data = self.fetch_data(self.album_url + deezer_id)
|
||||
if album_data is None:
|
||||
return None
|
||||
contributors = album_data.get("contributors")
|
||||
if contributors is not None:
|
||||
artist, artist_id = self.get_artist(contributors)
|
||||
else:
|
||||
artist, artist_id = None, None
|
||||
|
||||
album_data = requests.get(self.album_url + deezer_id).json()
|
||||
artist, artist_id = self.get_artist(album_data['contributors'])
|
||||
|
||||
release_date = album_data['release_date']
|
||||
date_parts = [int(part) for part in release_date.split('-')]
|
||||
release_date = album_data["release_date"]
|
||||
date_parts = [int(part) for part in release_date.split("-")]
|
||||
num_date_parts = len(date_parts)
|
||||
|
||||
if num_date_parts == 3:
|
||||
@@ -76,12 +115,23 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
||||
"Invalid `release_date` returned "
|
||||
"by {} API: '{}'".format(self.data_source, release_date)
|
||||
)
|
||||
|
||||
tracks_data = requests.get(
|
||||
self.album_url + deezer_id + '/tracks'
|
||||
).json()['data']
|
||||
tracks_obj = self.fetch_data(self.album_url + deezer_id + "/tracks")
|
||||
if tracks_obj is None:
|
||||
return None
|
||||
try:
|
||||
tracks_data = tracks_obj["data"]
|
||||
except KeyError:
|
||||
self._log.debug("Error fetching album tracks for {}", deezer_id)
|
||||
tracks_data = None
|
||||
if not tracks_data:
|
||||
return None
|
||||
while "next" in tracks_obj:
|
||||
tracks_obj = requests.get(
|
||||
tracks_obj["next"],
|
||||
timeout=10,
|
||||
).json()
|
||||
tracks_data.extend(tracks_obj["data"])
|
||||
|
||||
tracks = []
|
||||
medium_totals = collections.defaultdict(int)
|
||||
for i, track_data in enumerate(tracks_data, start=1):
|
||||
@@ -93,22 +143,24 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
||||
track.medium_total = medium_totals[track.medium]
|
||||
|
||||
return AlbumInfo(
|
||||
album=album_data['title'],
|
||||
album=album_data["title"],
|
||||
album_id=deezer_id,
|
||||
deezer_album_id=deezer_id,
|
||||
artist=artist,
|
||||
artist_credit=self.get_artist([album_data['artist']])[0],
|
||||
artist_credit=self.get_artist([album_data["artist"]])[0],
|
||||
artist_id=artist_id,
|
||||
tracks=tracks,
|
||||
albumtype=album_data['record_type'],
|
||||
va=len(album_data['contributors']) == 1
|
||||
and artist.lower() == 'various artists',
|
||||
albumtype=album_data["record_type"],
|
||||
va=len(album_data["contributors"]) == 1
|
||||
and artist.lower() == "various artists",
|
||||
year=year,
|
||||
month=month,
|
||||
day=day,
|
||||
label=album_data['label'],
|
||||
label=album_data["label"],
|
||||
mediums=max(medium_totals.keys()),
|
||||
data_source=self.data_source,
|
||||
data_url=album_data['link'],
|
||||
data_url=album_data["link"],
|
||||
cover_art_url=album_data.get("cover_xl"),
|
||||
)
|
||||
|
||||
def _get_track(self, track_data):
|
||||
@@ -120,19 +172,23 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
||||
:rtype: beets.autotag.hooks.TrackInfo
|
||||
"""
|
||||
artist, artist_id = self.get_artist(
|
||||
track_data.get('contributors', [track_data['artist']])
|
||||
track_data.get("contributors", [track_data["artist"]])
|
||||
)
|
||||
return TrackInfo(
|
||||
title=track_data['title'],
|
||||
track_id=track_data['id'],
|
||||
title=track_data["title"],
|
||||
track_id=track_data["id"],
|
||||
deezer_track_id=track_data["id"],
|
||||
isrc=track_data.get("isrc"),
|
||||
artist=artist,
|
||||
artist_id=artist_id,
|
||||
length=track_data['duration'],
|
||||
index=track_data['track_position'],
|
||||
medium=track_data['disk_number'],
|
||||
medium_index=track_data['track_position'],
|
||||
length=track_data["duration"],
|
||||
index=track_data.get("track_position"),
|
||||
medium=track_data.get("disk_number"),
|
||||
deezer_track_rank=track_data.get("rank"),
|
||||
medium_index=track_data.get("track_position"),
|
||||
data_source=self.data_source,
|
||||
data_url=track_data['link'],
|
||||
data_url=track_data["link"],
|
||||
deezer_updated=time.time(),
|
||||
)
|
||||
|
||||
def track_for_id(self, track_id=None, track_data=None):
|
||||
@@ -149,29 +205,40 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
||||
:rtype: beets.autotag.hooks.TrackInfo or None
|
||||
"""
|
||||
if track_data is None:
|
||||
deezer_id = self._get_id('track', track_id)
|
||||
deezer_id = self._get_id("track", track_id, self.id_regex)
|
||||
if deezer_id is None:
|
||||
return None
|
||||
track_data = requests.get(self.track_url + deezer_id).json()
|
||||
track_data = self.fetch_data(self.track_url + deezer_id)
|
||||
if track_data is None:
|
||||
return None
|
||||
track = self._get_track(track_data)
|
||||
|
||||
# Get album's tracks to set `track.index` (position on the entire
|
||||
# release) and `track.medium_total` (total number of tracks on
|
||||
# the track's disc).
|
||||
album_tracks_data = requests.get(
|
||||
self.album_url + str(track_data['album']['id']) + '/tracks'
|
||||
).json()['data']
|
||||
album_tracks_obj = self.fetch_data(
|
||||
self.album_url + str(track_data["album"]["id"]) + "/tracks"
|
||||
)
|
||||
if album_tracks_obj is None:
|
||||
return None
|
||||
try:
|
||||
album_tracks_data = album_tracks_obj["data"]
|
||||
except KeyError:
|
||||
self._log.debug(
|
||||
"Error fetching album tracks for {}", track_data["album"]["id"]
|
||||
)
|
||||
return None
|
||||
medium_total = 0
|
||||
for i, track_data in enumerate(album_tracks_data, start=1):
|
||||
if track_data['disk_number'] == track.medium:
|
||||
if track_data["disk_number"] == track.medium:
|
||||
medium_total += 1
|
||||
if track_data['id'] == track.track_id:
|
||||
if track_data["id"] == track.track_id:
|
||||
track.index = i
|
||||
track.medium_total = medium_total
|
||||
return track
|
||||
|
||||
@staticmethod
|
||||
def _construct_search_query(filters=None, keywords=''):
|
||||
def _construct_search_query(filters=None, keywords=""):
|
||||
"""Construct a query string with the specified filters and keywords to
|
||||
be provided to the Deezer Search API
|
||||
(https://developers.deezer.com/api/search).
|
||||
@@ -185,14 +252,14 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
||||
"""
|
||||
query_components = [
|
||||
keywords,
|
||||
' '.join(f'{k}:"{v}"' for k, v in filters.items()),
|
||||
" ".join(f'{k}:"{v}"' for k, v in filters.items()),
|
||||
]
|
||||
query = ' '.join([q for q in query_components if q])
|
||||
query = " ".join([q for q in query_components if q])
|
||||
if not isinstance(query, str):
|
||||
query = query.decode('utf8')
|
||||
query = query.decode("utf8")
|
||||
return unidecode.unidecode(query)
|
||||
|
||||
def _search_api(self, query_type, filters=None, keywords=''):
|
||||
def _search_api(self, query_type, filters=None, keywords=""):
|
||||
"""Query the Deezer Search API for the specified ``keywords``, applying
|
||||
the provided ``filters``.
|
||||
|
||||
@@ -208,19 +275,17 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
||||
if no search results are returned.
|
||||
:rtype: dict or None
|
||||
"""
|
||||
query = self._construct_search_query(
|
||||
keywords=keywords, filters=filters
|
||||
)
|
||||
query = self._construct_search_query(keywords=keywords, filters=filters)
|
||||
if not query:
|
||||
return None
|
||||
self._log.debug(
|
||||
f"Searching {self.data_source} for '{query}'"
|
||||
)
|
||||
self._log.debug(f"Searching {self.data_source} for '{query}'")
|
||||
response = requests.get(
|
||||
self.search_url + query_type, params={'q': query}
|
||||
self.search_url + query_type,
|
||||
params={"q": query},
|
||||
timeout=10,
|
||||
)
|
||||
response.raise_for_status()
|
||||
response_data = response.json().get('data', [])
|
||||
response_data = response.json().get("data", [])
|
||||
self._log.debug(
|
||||
"Found {} result(s) from {} for '{}'",
|
||||
len(response_data),
|
||||
@@ -228,3 +293,30 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
||||
query,
|
||||
)
|
||||
return response_data
|
||||
|
||||
def deezerupdate(self, items, write):
|
||||
"""Obtain rank information from Deezer."""
|
||||
for index, item in enumerate(items, start=1):
|
||||
self._log.info(
|
||||
"Processing {}/{} tracks - {} ", index, len(items), item
|
||||
)
|
||||
try:
|
||||
deezer_track_id = item.deezer_track_id
|
||||
except AttributeError:
|
||||
self._log.debug("No deezer_track_id present for: {}", item)
|
||||
continue
|
||||
try:
|
||||
rank = self.fetch_data(
|
||||
f"{self.track_url}{deezer_track_id}"
|
||||
).get("rank")
|
||||
self._log.debug(
|
||||
"Deezer track: {} has {} rank", deezer_track_id, rank
|
||||
)
|
||||
except Exception as e:
|
||||
self._log.debug("Invalid Deezer track_id: {}", e)
|
||||
continue
|
||||
item.deezer_track_rank = int(rank)
|
||||
item.store()
|
||||
item.deezer_updated = time.time()
|
||||
if write:
|
||||
item.try_write()
|
||||
|
||||
Reference in New Issue
Block a user