Added a music scanner to read metadata from audio files rather than using folder names/xml

This commit is contained in:
Remy
2011-07-11 14:23:11 -07:00
parent fd98828cd1
commit 8f999111c5
42 changed files with 15334 additions and 14 deletions

View File

@@ -0,0 +1,599 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Facilities for automatically determining files' correct metadata.
"""
import os
import logging
from collections import defaultdict
import re
from lib.munkres import Munkres
from unidecode import unidecode
from lib.beets.autotag import mb
from lib.beets import library, mediafile, plugins
from lib.beets.util import levenshtein, sorted_walk
# Try 5 releases. In the future, this should be more dynamic: let the
# probability of continuing to the next release be inversely
# proportional to how good our current best is and how long we've
# already taken.
MAX_CANDIDATES = 5

# Distance parameters.
# Text distance weights: proportions on the normalized intuitive edit
# distance.
ARTIST_WEIGHT = 3.0
ALBUM_WEIGHT = 3.0
# The weight of the entire distance calculated for a given track.
TRACK_WEIGHT = 1.0
# These distances are components of the track distance (that is, they
# compete against each other but not ARTIST_WEIGHT and ALBUM_WEIGHT;
# the overall TRACK_WEIGHT does that).
TRACK_TITLE_WEIGHT = 3.0
# Used instead of a global artist penalty for various-artist matches.
TRACK_ARTIST_WEIGHT = 2.0
# Added when the indices of tracks don't match.
TRACK_INDEX_WEIGHT = 1.0
# Track length weights: no penalty before GRACE, maximum (WEIGHT)
# penalty at GRACE+MAX discrepancy. Values are in seconds (MB track
# durations are converted from milliseconds in mb.track_dict).
TRACK_LENGTH_GRACE = 10
TRACK_LENGTH_MAX = 30
TRACK_LENGTH_WEIGHT = 2.0
# MusicBrainz track ID matches.
TRACK_ID_WEIGHT = 5.0

# Recommendation constants: how confident we are in the best candidate.
RECOMMEND_STRONG = 'RECOMMEND_STRONG'
RECOMMEND_MEDIUM = 'RECOMMEND_MEDIUM'
RECOMMEND_NONE = 'RECOMMEND_NONE'
# Thresholds for recommendations (normalized distances in [0, 1]).
STRONG_REC_THRESH = 0.04
MEDIUM_REC_THRESH = 0.25
REC_GAP_THRESH = 0.25

# Parameters for string distance function.
# Words that can be moved to the end of a string using a comma.
SD_END_WORDS = ['the', 'a', 'an']
# Reduced weights for certain portions of the string.
SD_PATTERNS = [
    (r'^the ', 0.1),
    (r'[\[\(]?(ep|single)[\]\)]?', 0.0),
    (r'[\[\(]?(featuring|feat|ft)[\. :].+', 0.1),
    (r'\(.*?\)', 0.3),
    (r'\[.*?\]', 0.3),
    (r'(, )?(pt\.|part) .+', 0.2),
]
# Replacements to use before testing distance.
SD_REPLACE = [
    (r'&', 'and'),
]

# Artist signals that indicate "various artists".
VA_ARTISTS = (u'', u'various artists', u'va', u'unknown')

# Autotagging exceptions.
class AutotagError(Exception):
    """Base class for errors raised by the autotagger."""
    pass

# Global logger.
log = logging.getLogger('beets')
def albums_in_dir(path):
    """Walk `path` recursively, yielding (directory, items) pairs for
    every directory that contains at least one readable media file.
    Each such directory is treated as a probable album.
    """
    for root, dirs, files in sorted_walk(path):
        # Collect every readable media file found in this directory.
        found = []
        for name in files:
            full_path = os.path.join(root, name)
            try:
                item = library.Item.from_path(full_path)
            except mediafile.FileTypeError:
                # Not an audio file; silently ignore it.
                continue
            except mediafile.UnreadableFileError:
                log.warn('unreadable file: ' + name)
                continue
            found.append(item)
        # Only directories that actually contain media are albums.
        if found:
            yield root, found
def _string_dist_basic(str1, str2):
    """Length-normalized edit distance between two strings, computed
    after transliterating to ASCII, lowercasing, and stripping every
    non-alphanumeric character. Returns a float in [0, 1]; two empty
    (post-normalization) strings are considered identical.
    """
    a = re.sub(r'[^a-z0-9]', '', unidecode(str1).lower())
    b = re.sub(r'[^a-z0-9]', '', unidecode(str2).lower())
    if not (a or b):
        return 0.0
    return levenshtein(a, b) / float(max(len(a), len(b)))
def string_dist(str1, str2):
    """Gives an "intuitive" edit distance between two strings. This is
    an edit distance, normalized by the string length, with a number of
    tweaks that reflect intuition about text.

    The result is the basic normalized distance plus penalties, so it
    may exceed the raw _string_dist_basic value when discounted
    portions still differ.
    """
    str1 = str1.lower()
    str2 = str2.lower()
    # Don't penalize strings that move certain words to the end. For
    # example, "the something" should be considered equal to
    # "something, the".
    for word in SD_END_WORDS:
        if str1.endswith(', %s' % word):
            # Strip ", word" (len(word)+2 chars) and prepend the word.
            str1 = '%s %s' % (word, str1[:-len(word)-2])
        if str2.endswith(', %s' % word):
            str2 = '%s %s' % (word, str2[:-len(word)-2])
    # Perform a couple of basic normalizing substitutions.
    for pat, repl in SD_REPLACE:
        str1 = re.sub(pat, repl, str1)
        str2 = re.sub(pat, repl, str2)
    # Change the weight for certain string portions matched by a set
    # of regular expressions. We gradually change the strings and build
    # up penalties associated with parts of the string that were
    # deleted.
    base_dist = _string_dist_basic(str1, str2)
    penalty = 0.0
    for pat, weight in SD_PATTERNS:
        # Get strings that drop the pattern.
        case_str1 = re.sub(pat, '', str1)
        case_str2 = re.sub(pat, '', str2)
        if case_str1 != str1 or case_str2 != str2:
            # If the pattern was present (i.e., it is deleted in the
            # the current case), recalculate the distances for the
            # modified strings.
            case_dist = _string_dist_basic(case_str1, case_str2)
            case_delta = max(0.0, base_dist - case_dist)
            if case_delta == 0.0:
                # Removing the pattern didn't bring the strings any
                # closer; leave the baseline untouched.
                continue
            # Shift our baseline strings down (to avoid rematching the
            # same part of the string) and add a scaled distance
            # amount to the penalties.
            str1 = case_str1
            str2 = case_str2
            base_dist = case_dist
            penalty += weight * case_delta
    dist = base_dist + penalty
    return dist
def _plurality(objs):
"""Given a sequence of comparable objects, returns the object that
is most common in the set and if it is the only object is the set.
"""
# Calculate frequencies.
freqs = defaultdict(int)
for obj in objs:
freqs[obj] += 1
# Find object with maximum frequency.
max_freq = 0
res = None
for obj, freq in freqs.items():
if freq > max_freq:
max_freq = freq
res = obj
return res, len(freqs) <= 1
def current_metadata(items):
    """Return (artist, album, artist_consensus) for a set of Items.
    The artist and album are whichever tag values hold the plurality
    among the items; artist_consensus reports whether every item
    agrees on the artist tag.
    """
    likely = {}
    agreed = {}
    for field in ('artist', 'album'):
        values = [getattr(item, field) for item in items]
        likely[field], agreed[field] = _plurality(values)
    return likely['artist'], likely['album'], agreed['artist']
def order_items(items, trackinfo):
    """Reorder `items` to match the canonical `trackinfo` sequence
    using a minimum-cost assignment over pairwise track distances.
    Returns the reordered list, or None when the counts differ.
    """
    if len(items) != len(trackinfo):
        return None
    # Cost matrix: one row per current item, one column per canonical
    # track (track numbers are 1-based).
    costs = [
        [track_distance(item, canon, pos + 1)
         for pos, canon in enumerate(trackinfo)]
        for item in items
    ]
    # Solve the assignment problem (Hungarian algorithm).
    assignment = Munkres().compute(costs)
    ordered = [None] * len(items)
    for item_idx, canon_idx in assignment:
        ordered[canon_idx] = items[item_idx]
    return ordered
def track_distance(item, track_data, track_index=None, incl_artist=False):
    """Determines the significance of a track metadata change. Returns
    a float in [0.0,1.0]. `track_index` is the track number of the
    `track_data` metadata set. If `track_index` is provided and
    item.track is set, then these indices are used as a component of
    the distance calculation. `incl_artist` indicates that a distance
    component should be included for the track artist (i.e., for
    various-artist releases).

    Each component only contributes its weight to `dist_max` when it
    is actually evaluated, so the final ratio stays normalized.
    """
    # Distance and normalization accumulators.
    dist, dist_max = 0.0, 0.0
    # Check track length.
    if 'length' not in track_data:
        # If there's no length to check, assume the worst.
        dist += TRACK_LENGTH_WEIGHT
    else:
        # Lengths are compared in seconds; discrepancies up to GRACE
        # are free, then the penalty ramps linearly up to MAX.
        diff = abs(item.length - track_data['length'])
        diff = max(diff - TRACK_LENGTH_GRACE, 0.0)
        diff = min(diff, TRACK_LENGTH_MAX)
        dist += (diff / TRACK_LENGTH_MAX) * TRACK_LENGTH_WEIGHT
    dist_max += TRACK_LENGTH_WEIGHT
    # Track title.
    dist += string_dist(item.title, track_data['title']) * TRACK_TITLE_WEIGHT
    dist_max += TRACK_TITLE_WEIGHT
    # Track artist, if included.
    # Attention: MB DB does not have artist info for all compilations,
    # so only check artist distance if there is actually an artist in
    # the MB track data.
    if incl_artist and 'artist' in track_data:
        dist += string_dist(item.artist, track_data['artist']) * \
                TRACK_ARTIST_WEIGHT
        dist_max += TRACK_ARTIST_WEIGHT
    # Track index.
    if track_index and item.track:
        if track_index != item.track:
            dist += TRACK_INDEX_WEIGHT
        dist_max += TRACK_INDEX_WEIGHT
    # MusicBrainz track ID.
    if item.mb_trackid:
        if item.mb_trackid != track_data['id']:
            dist += TRACK_ID_WEIGHT
        dist_max += TRACK_ID_WEIGHT
    # Plugin distances.
    plugin_d, plugin_dm = plugins.track_distance(item, track_data)
    dist += plugin_d
    dist_max += plugin_dm
    return dist / dist_max
def distance(items, info):
    """Determines how "significant" an album metadata change would be.
    Returns a float in [0.0,1.0]. The list of items must be ordered.
    """
    cur_artist, cur_album, _ = current_metadata(items)
    cur_artist = cur_artist or ''
    cur_album = cur_album or ''
    # These accumulate the possible distance components. The final
    # distance will be dist/dist_max.
    dist = 0.0
    dist_max = 0.0
    # Artist/album metadata.
    if not info['va']:
        # Skip the album-artist comparison for various-artist releases,
        # where per-item artists legitimately differ.
        dist += string_dist(cur_artist, info['artist']) * ARTIST_WEIGHT
        dist_max += ARTIST_WEIGHT
    dist += string_dist(cur_album, info['album']) * ALBUM_WEIGHT
    dist_max += ALBUM_WEIGHT
    # Track distances.
    for i, (item, track_data) in enumerate(zip(items, info['tracks'])):
        dist += track_distance(item, track_data, i+1, info['va']) * \
                TRACK_WEIGHT
        dist_max += TRACK_WEIGHT
    # Plugin distances.
    plugin_d, plugin_dm = plugins.album_distance(items, info)
    dist += plugin_d
    dist_max += plugin_dm
    # Normalize distance, avoiding divide-by-zero.
    if dist_max == 0.0:
        return 0.0
    else:
        return dist/dist_max
def apply_item_metadata(item, track_data):
    """Copy singleton-track metadata from a matched info dictionary
    onto `item`: title, artist, the MB track ID, and -- when present
    in the dictionary -- the MB artist ID. All other fields are left
    as-is.
    """
    item.title = track_data['title']
    item.artist = track_data['artist']
    item.mb_trackid = track_data['id']
    if 'artist_id' in track_data:
        item.mb_artistid = track_data['artist_id']
    # Album-level fields (album name, track number, ...) are
    # deliberately left intact for now. Perhaps these should be
    # emptied?
def apply_metadata(items, info):
    """Overwrite each item's tags with the matched album data in
    `info`. The items must already be in canonical track order.
    """
    total = len(items)
    for position, (item, track_data) in enumerate(
            zip(items, info['tracks']), start=1):
        # Artist: prefer the per-track artist (present on multi-artist
        # releases), falling back to the release artist.
        item.artist = track_data.get('artist', info['artist'])
        item.albumartist = info['artist']
        item.album = info['album']
        item.tracktotal = total
        # Release date components are only set when known.
        for field in ('year', 'month', 'day'):
            if field in info:
                setattr(item, field, info[field])
        # Title and 1-based track index.
        item.title = track_data['title']
        item.track = position
        # MusicBrainz identifiers.
        item.mb_trackid = track_data['id']
        item.mb_albumid = info['album_id']
        item.mb_artistid = track_data.get('artist_id', info['artist_id'])
        item.mb_albumartistid = info['artist_id']
        item.albumtype = info['albumtype']
        # Compilation flag.
        item.comp = info['va']
def match_by_id(items):
    """If the items are tagged with a consistent MusicBrainz album ID,
    return an info dict for the corresponding album. Otherwise, return
    None.
    """
    # Collect the non-empty album IDs present on the items.
    albumids = [item.mb_albumid for item in items if item.mb_albumid]
    if not albumids:
        log.debug('No album IDs found.')
        return None
    # Require consensus: every tagged item must carry the same ID.
    # (Replaces an obscure reduce() trick; reduce is also no longer a
    # builtin in Python 3.)
    if len(set(albumids)) == 1:
        albumid = albumids[0]
        log.debug('Searching for discovered album ID: ' + albumid)
        return mb.album_for_id(albumid)
    else:
        log.debug('No album ID consensus.')
        return None
    # FIXME: In the future, at the expense of performance, we could use
    # other IDs (i.e., track and artist) in case the album tag isn't
    # present, but that event seems very unlikely.
def recommendation(results):
    """Map a distance-sorted list of result tuples to one of the
    RECOMMEND_* flags, judged from the best (and, when available,
    second-best) candidate distances.
    """
    if not results:
        # No candidates at all: nothing to recommend.
        return RECOMMEND_NONE
    best = results[0][0]
    # A very close match is always a strong recommendation.
    if best < STRONG_REC_THRESH:
        return RECOMMEND_STRONG
    # Medium confidence when there is only one candidate, the best is
    # reasonably close, or the runner-up trails by a wide margin.
    if len(results) == 1:
        return RECOMMEND_MEDIUM
    if best <= MEDIUM_REC_THRESH:
        return RECOMMEND_MEDIUM
    if results[1][0] - best >= REC_GAP_THRESH:
        return RECOMMEND_MEDIUM
    # No conclusion.
    return RECOMMEND_NONE
def validate_candidate(items, tuple_dict, info):
    """Given a candidate info dict, attempt to add the candidate to
    the output dictionary of result tuples. This involves checking
    the track count, ordering the items, checking for duplicates, and
    calculating the distance.

    On success, stores a (distance, ordered_items, info) tuple in
    `tuple_dict` keyed by the MB album ID; rejected candidates leave
    `tuple_dict` untouched.
    """
    log.debug('Candidate: %s - %s' % (info['artist'], info['album']))
    # Don't duplicate.
    if info['album_id'] in tuple_dict:
        log.debug('Duplicate.')
        return
    # Make sure the album has the correct number of tracks.
    if len(items) != len(info['tracks']):
        log.debug('Track count mismatch.')
        return
    # Put items in order.
    ordered = order_items(items, info['tracks'])
    if not ordered:
        log.debug('Not orderable.')
        return
    # Get the change distance.
    dist = distance(ordered, info)
    log.debug('Success. Distance: %f' % dist)
    tuple_dict[info['album_id']] = dist, ordered, info
def tag_album(items, timid=False, search_artist=None, search_album=None,
              search_id=None):
    """Bundles together the functionality used to infer tags for a
    set of items comprised by an album. Returns everything relevant:
    - The current artist.
    - The current album.
    - A list of (distance, items, info) tuples where info is a
      dictionary containing the inferred tags and items is a
      reordered version of the input items list. The candidates are
      sorted by distance (i.e., best match first).
    - A recommendation, one of RECOMMEND_STRONG, RECOMMEND_MEDIUM,
      or RECOMMEND_NONE; indicating that the first candidate is
      very likely, it is somewhat likely, or no conclusion could
      be reached.
    If search_artist and search_album or search_id are provided, then
    they are used as search terms in place of the current metadata.
    May raise an AutotagError if existing metadata is insufficient.
    """
    # Get current metadata.
    cur_artist, cur_album, artist_consensus = current_metadata(items)
    log.debug('Tagging %s - %s' % (cur_artist, cur_album))
    # The output result tuples (keyed by MB album ID).
    out_tuples = {}
    # Try to find album indicated by MusicBrainz IDs.
    if search_id:
        log.debug('Searching for album ID: ' + search_id)
        id_info = mb.album_for_id(search_id)
    else:
        id_info = match_by_id(items)
    if id_info:
        validate_candidate(items, out_tuples, id_info)
        rec = recommendation(out_tuples.values())
        log.debug('Album ID match recommendation is ' + str(rec))
        if out_tuples and not timid:
            # If we have a very good MBID match, return immediately.
            # Otherwise, this match will compete against metadata-based
            # matches.
            if rec == RECOMMEND_STRONG:
                log.debug('ID match.')
                return cur_artist, cur_album, out_tuples.values(), rec
    # If searching by ID, don't continue to metadata search.
    # NOTE(review): `rec` is only bound when id_info validated above;
    # when out_tuples is non-empty that path must have run, so the
    # reference below is safe.
    if search_id is not None:
        if out_tuples:
            return cur_artist, cur_album, out_tuples.values(), rec
        else:
            return cur_artist, cur_album, [], RECOMMEND_NONE
    # Search terms.
    if not (search_artist and search_album):
        # No explicit search terms -- use current metadata.
        search_artist, search_album = cur_artist, cur_album
    log.debug(u'Search terms: %s - %s' % (search_artist, search_album))
    # Get candidate metadata from search.
    if search_artist and search_album:
        candidates = mb.match_album(search_artist, search_album,
                                    len(items), MAX_CANDIDATES)
        candidates = list(candidates)
    else:
        candidates = []
    # Possibly add "various artists" search.
    if search_album and ((not artist_consensus) or \
                         (search_artist.lower() in VA_ARTISTS) or \
                         any(item.comp for item in items)):
        log.debug(u'Possibly Various Artists; adding matches.')
        candidates.extend(mb.match_album(None, search_album, len(items),
                                         MAX_CANDIDATES))
    # Get candidates from plugins.
    candidates.extend(plugins.candidates(items))
    # Get the distance to each candidate.
    log.debug(u'Evaluating %i candidates.' % len(candidates))
    for info in candidates:
        validate_candidate(items, out_tuples, info)
    # Sort by distance.
    # NOTE: relies on Python 2 dict.values() returning a sortable
    # list; under Python 3 a view object would need list() first.
    out_tuples = out_tuples.values()
    out_tuples.sort()
    rec = recommendation(out_tuples)
    return cur_artist, cur_album, out_tuples, rec
def tag_item(item, timid=False, search_artist=None, search_title=None,
             search_id=None):
    """Attempts to find metadata for a single track. Returns a
    `(candidates, recommendation)` pair where `candidates` is a list
    of `(distance, track_info)` pairs. `search_artist` and
    `search_title` may be used to override the current metadata for
    the purposes of the MusicBrainz title; likewise `search_id`.
    """
    candidates = []
    # First, try matching by MusicBrainz ID.
    trackid = search_id or item.mb_trackid
    if trackid:
        log.debug('Searching for track ID: ' + trackid)
        track_info = mb.track_for_id(trackid)
        if track_info:
            dist = track_distance(item, track_info, incl_artist=True)
            candidates.append((dist, track_info))
            # If this is a good match, then don't keep searching.
            rec = recommendation(candidates)
            if rec == RECOMMEND_STRONG and not timid:
                log.debug('Track ID match.')
                return candidates, rec
    # If we're searching by ID, don't proceed.
    # NOTE(review): `rec` is only bound when the ID lookup above
    # succeeded; when `candidates` is non-empty that path must have
    # run, so the reference below is safe.
    if search_id is not None:
        if candidates:
            return candidates, rec
        else:
            return [], RECOMMEND_NONE
    # Search terms.
    if not (search_artist and search_title):
        # Fall back to the item's current tags.
        search_artist, search_title = item.artist, item.title
    log.debug(u'Item search terms: %s - %s' % (search_artist, search_title))
    # Candidate metadata from search.
    for track_info in mb.match_track(search_artist, search_title):
        dist = track_distance(item, track_info, incl_artist=True)
        candidates.append((dist, track_info))
    # Add candidates from plugins.
    for track_info in plugins.item_candidates(item):
        dist = track_distance(item, track_info, incl_artist=True)
        candidates.append((dist, track_info))
    # Sort by distance and return with recommendation.
    log.debug('Found %i candidates.' % len(candidates))
    candidates.sort()
    rec = recommendation(candidates)
    return candidates, rec

77
lib/beets/autotag/art.py Normal file
View File

@@ -0,0 +1,77 @@
# This file is part of beets.
# Copyright 2010, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Finding album art for tagged albums."""
import urllib
import sys
import logging
from lib.beets.autotag.mb import album_for_id
# The common logger.
log = logging.getLogger('beets')
# Art from Amazon.
# URL template takes an ASIN and an image "index" (%02i); several
# indices are tried because alternate shots live at different indices.
AMAZON_URL = 'http://images.amazon.com/images/P/%s.%02i.LZZZZZZZ.jpg'
AMAZON_INDICES = (1,2)
# Responses with any other content type are rejected as non-images.
AMAZON_CONTENT_TYPE = 'image/jpeg'
def art_for_asin(asin):
    """Fetch cover art for an Amazon ID (ASIN) string. Returns the
    path of a downloaded image file, or None when no usable image can
    be retrieved at any of the known indices.
    """
    for index in AMAZON_INDICES:
        url = AMAZON_URL % (asin, index)
        try:
            log.debug('Downloading art: %s' % url)
            fn, headers = urllib.urlretrieve(url)
        except IOError:
            log.debug('error fetching art at URL %s' % url)
        else:
            # Only accept responses that are actually JPEG images.
            if headers.gettype() == AMAZON_CONTENT_TYPE:
                log.debug('Downloaded art to: %s' % fn)
                return fn
# Main interface.
def art_for_album(album):
    """Given an album info dictionary from MusicBrainz, return a path
    to downloaded art for the album, or None when the album has no
    ASIN or no art could be fetched.
    """
    asin = album['asin']
    if not asin:
        log.debug('No ASIN available: no art found.')
        return None
    log.debug('Fetching album art for ASIN %s.' % asin)
    return art_for_asin(asin)
# Smoke test.
# Smoke test: fetch art for an album ID given on the command line and
# report the downloaded file's path and size in KiB.
# NOTE: Python 2 print statements; this script will not run under
# Python 3 without modification.
if __name__ == '__main__':
    aid = sys.argv[1]
    album = album_for_id(aid)
    if not album:
        print 'album not found'
    else:
        fn = art_for_album(album)
        if fn:
            print fn
            # Size of the downloaded image, in KiB.
            print len(open(fn).read())/1024
        else:
            print 'no art found'

333
lib/beets/autotag/mb.py Normal file
View File

@@ -0,0 +1,333 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Searches for albums in the MusicBrainz database.
This is a thin layer over the official `python-musicbrainz2` module. It
abstracts away that module's object model, the server's Lucene query
syntax, and other uninteresting parts of using musicbrainz2. The
principal interface is the function `match_album`.
"""
from __future__ import with_statement # for Python 2.5
import re
import time
import logging
import lib.musicbrainz2.webservice as mbws
from lib.musicbrainz2.model import Release
from threading import Lock
from lib.musicbrainz2.model import VARIOUS_ARTISTS_ID
# Default cap on the number of search results requested per query.
SEARCH_LIMIT = 10
# python-musicbrainz2 exposes the Various Artists ID as a full URI;
# keep just the trailing UUID for comparisons against stripped IDs.
VARIOUS_ARTISTS_ID = VARIOUS_ARTISTS_ID.rsplit('/', 1)[1]
# Raised when the server stays busy through all retries.
class ServerBusyError(Exception): pass
# Raised when the server returns a malformed response.
class BadResponseError(Exception): pass
log = logging.getLogger('beets')
# We hard-code IDs for artists that can't easily be searched for.
SPECIAL_CASE_ARTISTS = {
    '!!!': 'f26c72d3-e52c-467b-b651-679c73d8e1a7',
}
# Release types we recognize when summarizing a release.
RELEASE_TYPES = [
    Release.TYPE_ALBUM,
    Release.TYPE_SINGLE,
    Release.TYPE_EP,
    Release.TYPE_COMPILATION,
    Release.TYPE_SOUNDTRACK,
    Release.TYPE_SPOKENWORD,
    Release.TYPE_INTERVIEW,
    Release.TYPE_AUDIOBOOK,
    Release.TYPE_LIVE,
    Release.TYPE_REMIX,
    Release.TYPE_OTHER
]
# Extra data requested with release and track lookups.
RELEASE_INCLUDES = mbws.ReleaseIncludes(artist=True, tracks=True,
                                        releaseEvents=True, labels=True,
                                        releaseGroup=True)
TRACK_INCLUDES = mbws.TrackIncludes(artist=True)
# MusicBrainz requires that a client does not query the server more
# than once a second. This function enforces that limit using a
# module-global variable to keep track of the last time a query was
# sent.
MAX_QUERY_RETRY = 8
QUERY_WAIT_TIME = 1.0
last_query_time = 0.0
# Serializes all MusicBrainz queries across threads.
mb_lock = Lock()
def _query_wrap(fun, *args, **kwargs):
    """Wait until at least `QUERY_WAIT_TIME` seconds have passed since
    the last invocation of this function. Then call
    fun(*args, **kwargs). If it fails due to a "server busy" message,
    then try again. Tries up to `MAX_QUERY_RETRY` times before
    giving up.

    Raises BadResponseError on a malformed server response and
    ServerBusyError when all retries are exhausted. Serialized by
    `mb_lock`, so concurrent callers queue up.
    """
    with mb_lock:
        global last_query_time
        for i in range(MAX_QUERY_RETRY):
            # Enforce the one-query-per-second rate limit.
            since_last_query = time.time() - last_query_time
            if since_last_query < QUERY_WAIT_TIME:
                time.sleep(QUERY_WAIT_TIME - since_last_query)
            last_query_time = time.time()
            try:
                # Try the function.
                res = fun(*args, **kwargs)
            except mbws.WebServiceError, e:
                # Server busy. Retry.
                message = str(e.reason)
                for errnum in (503, 504):
                    if 'Error %i' % errnum in message:
                        break
                else:
                    # This is not the error we're looking for.
                    raise
            except mbws.ConnectionError:
                # Typically a timeout.
                pass
            except mbws.ResponseError, exc:
                # Malformed response from server.
                log.error('Bad response from MusicBrainz: ' + str(exc))
                raise BadResponseError()
            else:
                # Success. Return the result.
                return res
        # Gave up.
        raise ServerBusyError()
        # FIXME exponential backoff?
def get_releases(**params):
    """Given a list of parameters to ReleaseFilter, executes the
    query and yields release dicts (complete with tracks).

    Releases whose follow-up detail lookup fails are skipped rather
    than crashing the iteration.
    """
    # Replace special cases.
    if 'artistName' in params:
        artist = params['artistName']
        if artist in SPECIAL_CASE_ARTISTS:
            del params['artistName']
            params['artistId'] = SPECIAL_CASE_ARTISTS[artist]
    # Issue query.
    filt = mbws.ReleaseFilter(**params)
    try:
        results = _query_wrap(mbws.Query().getReleases, filter=filt)
    except BadResponseError:
        results = ()
    # Construct results.
    for result in results:
        release = result.release
        # release_info() returns None when the release can't be
        # fetched; previously this crashed unpacking None into a
        # (tracks, group_id) tuple.
        info = release_info(release.id)
        if info is None:
            continue
        tracks, _ = info
        yield release_dict(release, tracks)
def release_info(release_id):
    """Fetch a MusicBrainz release by ID and return a
    (tracks, release_group_id) pair, or None when the release cannot
    be retrieved.
    """
    try:
        release = _query_wrap(mbws.Query().getReleaseById, release_id,
                              RELEASE_INCLUDES)
    except BadResponseError:
        return None
    if not release:
        return None
    return release.getTracks(), release.getReleaseGroup().getId()
def _lucene_escape(text):
"""Escapes a string so it may be used verbatim in a Lucene query
string.
"""
# Regex stolen from MusicBrainz Picard.
out = re.sub(r'([+\-&|!(){}\[\]\^"~*?:\\])', r'\\\1', text)
return out.replace('\x00', '')
def _lucene_query(criteria):
    """Render a criteria dictionary (field name -> search term) as a
    MusicBrainz Lucene query string. Terms that are empty after
    escaping and normalization are dropped.
    """
    cleaned = ((name, _lucene_escape(value).strip().lower())
               for name, value in criteria.items())
    return u' '.join(u'%s:(%s)' % (name, value)
                     for name, value in cleaned if value)
def find_releases(criteria, limit=SEARCH_LIMIT):
    """Get release dictionaries from the MusicBrainz database that
    match `criteria`. The latter is a dictionary whose keys are
    MusicBrainz field names and whose values are search terms for
    those fields.

    The field names are from MusicBrainz's Lucene query syntax, which
    is detailed here:
    http://wiki.musicbrainz.org/Text_Search_Syntax

    Returns an empty iterable when every criterion is blank. Note
    that `criteria` may be mutated (special-case artist rewriting).
    """
    # Replace special cases.
    if 'artist' in criteria:
        artist = criteria['artist']
        if artist in SPECIAL_CASE_ARTISTS:
            del criteria['artist']
            criteria['arid'] = SPECIAL_CASE_ARTISTS[artist]
    # Build the filter and send the query.
    if any(criteria.values()):
        query = _lucene_query(criteria)
        log.debug('album query: %s' % query)
        return get_releases(limit=limit, query=query)
    # Previously this fell through and implicitly returned None, which
    # crashed callers that iterate the result.
    return ()
def find_tracks(criteria, limit=SEARCH_LIMIT):
    """Yield track dictionaries from MusicBrainz matching `criteria`,
    a search-term dictionary like the one accepted by `find_releases`.
    Yields nothing when every criterion is blank.
    """
    if not any(criteria.itervalues()):
        return
    query = _lucene_query(criteria)
    log.debug('track query: %s' % query)
    filt = mbws.TrackFilter(limit=limit, query=query)
    try:
        results = _query_wrap(mbws.Query().getTracks, filter=filt)
    except BadResponseError:
        # Treat a malformed response as an empty result set.
        results = ()
    for result in results:
        yield track_dict(result.track)
def track_dict(track):
    """Summarize a MusicBrainz `Track` object as a plain dictionary
    with 'title' and 'id' keys, plus 'artist'/'artist_id' when a
    per-track artist exists and 'length' (in seconds) when a duration
    is known.
    """
    info = {
        'title': track.title,
        # Strip the URI prefix, keeping only the trailing ID segment.
        'id': track.id.rsplit('/', 1)[1],
    }
    artist = track.artist
    if artist is not None:
        # Track artists will only be present for releases with
        # multiple artists.
        info['artist'] = artist.name
        info['artist_id'] = artist.id.rsplit('/', 1)[1]
    duration = track.duration
    if duration is not None:
        # Duration not always present; convert milliseconds to seconds.
        info['length'] = duration/(1000.0)
    return info
def release_dict(release, tracks=None):
    """Takes a MusicBrainz `Release` object and returns a dictionary
    containing the interesting data about that release. A list of
    `Track` objects may also be provided as `tracks`; they are then
    included in the resulting dictionary.
    """
    # Basic info.
    out = {'album': release.title,
           'album_id': release.id.rsplit('/', 1)[1],
           'artist': release.artist.name,
           'artist_id': release.artist.id.rsplit('/', 1)[1],
           'asin': release.asin,
           'albumtype': '',
           }
    out['va'] = out['artist_id'] == VARIOUS_ARTISTS_ID
    # Release type not always populated.
    for releasetype in release.types:
        if releasetype in RELEASE_TYPES:
            # Types are URI fragments like ".../#Album"; keep the name.
            out['albumtype'] = releasetype.split('#')[1].lower()
            break
    # Release date and label.
    try:
        event = release.getEarliestReleaseEvent()
    except Exception:
        # The python-musicbrainz2 module has a bug that will raise an
        # exception when there is no release date to be found. In this
        # case, we just skip adding a release date to the dict. (Was a
        # bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit.)
        pass
    else:
        if event:
            # Release date.
            date_str = event.getDate()
            if date_str:
                # Dates may be partial: "YYYY", "YYYY-MM", or
                # "YYYY-MM-DD"; only present parts are stored.
                date_parts = date_str.split('-')
                for key in ('year', 'month', 'day'):
                    if date_parts:
                        out[key] = int(date_parts.pop(0))
            # Label name.
            label = event.getLabel()
            if label:
                out['label'] = label.getName()
    # Tracks.
    if tracks is not None:
        # List comprehension instead of map(): same result on Python 2
        # and still a list on Python 3.
        out['tracks'] = [track_dict(t) for t in tracks]
    return out
def match_album(artist, album, tracks=None, limit=SEARCH_LIMIT):
    """Searches for a single album ("release" in MusicBrainz parlance)
    and returns an iterator over dictionaries of information (as
    returned by `release_dict`).
    The query consists of an artist name, an album name, and,
    optionally, a number of tracks on the album. `limit` caps the
    number of search results requested.
    """
    # Build search criteria.
    criteria = {'release': album}
    if artist is not None:
        criteria['artist'] = artist
    else:
        # Various Artists search.
        criteria['arid'] = VARIOUS_ARTISTS_ID
    if tracks is not None:
        criteria['tracks'] = str(tracks)
    # Search for the release. Pass `limit` through: previously it was
    # accepted but silently ignored, so callers requesting fewer
    # results (e.g. MAX_CANDIDATES) always got the default.
    return find_releases(criteria, limit)
def match_track(artist, title):
    """Search for a single track by artist and title; return an
    iterable of track info dictionaries (as produced by `track_dict`).
    """
    criteria = {'artist': artist, 'track': title}
    return find_tracks(criteria)
def album_for_id(albumid):
    """Fetches an album by its MusicBrainz ID and returns an
    information dictionary. If no match is found, returns None.

    A malformed response, a missing resource, or a bad request all
    result in None rather than an exception.
    """
    query = mbws.Query()
    try:
        album = _query_wrap(query.getReleaseById, albumid, RELEASE_INCLUDES)
    except BadResponseError:
        return None
    except (mbws.ResourceNotFoundError, mbws.RequestError), exc:
        log.debug('Album ID match failed: ' + str(exc))
        return None
    return release_dict(album, album.tracks)
def track_for_id(trackid):
    """Fetches a track by its MusicBrainz ID. Returns a track info
    dictionary or None if no track is found.

    Mirrors `album_for_id`: lookup failures of any kind yield None.
    """
    query = mbws.Query()
    try:
        track = _query_wrap(query.getTrackById, trackid, TRACK_INCLUDES)
    except BadResponseError:
        return None
    except (mbws.ResourceNotFoundError, mbws.RequestError), exc:
        log.debug('Track ID match failed: ' + str(exc))
        return None
    return track_dict(track)