Changed findArtist to use the musicbrainzngs library, the output is identical.

(old musicbrainz 2 library is deprecated and broken)
This commit is contained in:
Patrick Speiser
2012-05-26 22:38:58 +02:00
parent c83477d541
commit f4e2d1eba0
6 changed files with 1171 additions and 994 deletions

View File

@@ -13,6 +13,8 @@ import headphones
from headphones import logger, db
from headphones.helpers import multikeysort, replace_all
import lib.musicbrainzngs as musicbrainzngs
mb_lock = threading.Lock()
@@ -20,500 +22,502 @@ mb_lock = threading.Lock()
# being used, so we can send those values to the log
def startmb(forcemb=False):
mbuser = None
mbpass = None
# Can use headphones mirror for queries
if headphones.MIRROR == "headphones" or "custom":
forcemb=False
if forcemb or headphones.MIRROR == "musicbrainz.org":
mbhost = "musicbrainz.org"
mbport = 80
sleepytime = 1
elif headphones.MIRROR == "custom":
mbhost = headphones.CUSTOMHOST
mbport = int(headphones.CUSTOMPORT)
sleepytime = int(headphones.CUSTOMSLEEP)
elif headphones.MIRROR == "headphones":
mbhost = "178.63.142.150"
mbport = 8181
mbuser = headphones.HPUSER
mbpass = headphones.HPPASS
sleepytime = 0
else:
mbhost = "tbueter.com"
mbport = 5000
sleepytime = 0
service = ws.WebService(host=mbhost, port=mbport, username=mbuser, password=mbpass, mirror=headphones.MIRROR)
q = ws.Query(service)
logger.debug('Using the following server values:\nMBHost: %s ; MBPort: %i ; Sleep Interval: %i ' % (mbhost, mbport, sleepytime))
return (q, sleepytime)
mbuser = None
mbpass = None
# Can use headphones mirror for queries
if headphones.MIRROR == "headphones" or "custom":
forcemb=False
if forcemb or headphones.MIRROR == "musicbrainz.org":
mbhost = "musicbrainz.org"
mbport = 80
sleepytime = 1
elif headphones.MIRROR == "custom":
mbhost = headphones.CUSTOMHOST
mbport = int(headphones.CUSTOMPORT)
sleepytime = int(headphones.CUSTOMSLEEP)
elif headphones.MIRROR == "headphones":
mbhost = "178.63.142.150"
mbport = 8181
mbuser = headphones.HPUSER
mbpass = headphones.HPPASS
sleepytime = 0
else:
mbhost = "tbueter.com"
mbport = 5000
sleepytime = 0
musicbrainzngs.set_useragent("headphones","0.0","https://github.com/doskir/headphones")
logger.info("set user agent")
musicbrainzngs.set_hostname(mbhost + ":" + str(mbport))
logger.info("set host and port")
#q = musicbrainzngs
service = ws.WebService(host=mbhost, port=mbport, username=mbuser, password=mbpass, mirror=headphones.MIRROR)
q = ws.Query(service)
logger.debug('Using the following server values:\nMBHost: %s ; MBPort: %i ; Sleep Interval: %i ' % (mbhost, mbport, sleepytime))
return (q, sleepytime)
def findArtist(name, limit=1):
with mb_lock:
artistlist = []
attempt = 0
artistResults = None
chars = set('!?*')
if any((c in chars) for c in name):
name = '"'+name+'"'
q, sleepytime = startmb(forcemb=True)
while attempt < 5:
try:
artistResults = q.getArtists(ws.ArtistFilter(query=name, limit=limit))
break
except WebServiceError, e:
logger.warn('Attempt to query MusicBrainz for %s failed (%s)' % (name, str(e)))
attempt += 1
time.sleep(10)
time.sleep(sleepytime)
if not artistResults:
return False
for result in artistResults:
if result.artist.name != result.artist.getUniqueName() and limit == 1:
logger.debug('Found an artist with a disambiguation: %s - doing an album based search' % name)
artistdict = findArtistbyAlbum(name)
if not artistdict:
logger.debug('Cannot determine the best match from an artist/album search. Using top match instead')
artistlist.append({
'name': result.artist.name,
'uniquename': result.artist.getUniqueName(),
'id': u.extractUuid(result.artist.id),
'url': result.artist.id,
'score': result.score
})
else:
artistlist.append(artistdict)
else:
artistlist.append({
'name': result.artist.name,
'uniquename': result.artist.getUniqueName(),
'id': u.extractUuid(result.artist.id),
'url': result.artist.id,
'score': result.score
})
return artistlist
with mb_lock:
limit = 25
artistlist = []
attempt = 0
artistResults = None
chars = set('!?*')
if any((c in chars) for c in name):
name = '"'+name+'"'
q, sleepytime = startmb(forcemb=True)
while attempt < 5:
try:
artistResults = musicbrainzngs.search_artists(query=name,limit=limit)['artist-list']
break
except WebServiceError, e:#need to update the exceptions
logger.warn('Attempt to query MusicBrainz for %s failed (%s)' % (name, str(e)))
attempt += 1
time.sleep(10)
time.sleep(sleepytime)
if not artistResults:
return False
for result in artistResults:
if 'disambiguation' in result:
uniquename = unicode(result['sort-name'] + " (" + result['disambiguation'] + ")")
else:
uniquename = unicode(result['sort-name'])
if result['name'] != uniquename and limit == 1:
logger.debug('Found an artist with a disambiguation: %s - doing an album based search' % name)
artistdict = findArtistbyAlbum(name)
if not artistdict:
logger.debug('Cannot determine the best match from an artist/album search. Using top match instead')
artistlist.append({
'name': unicode(result['sort-name']),
'uniquename': uniquename,
'id': unicode(result['id']),
'url': unicode("http://musicbrainz.org/artist/" + result['id']),#probably needs to be changed
'score': int(result['ext:score'])
})
else:
artistlist.append(artistdict)
else:
artistlist.append({
'name': unicode(result['sort-name']),
'uniquename': uniquename,
'id': unicode(result['id']),
'url': unicode("http://musicbrainz.org/artist/" + result['id']),#probably needs to be changed
'score': int(result['ext:score'])
})
return artistlist
def findRelease(name, limit=1):
with mb_lock:
releaselist = []
attempt = 0
releaseResults = None
chars = set('!?')
if any((c in chars) for c in name):
name = '"'+name+'"'
q, sleepytime = startmb(forcemb=True)
while attempt < 5:
try:
releaseResults = q.getReleases(ws.ReleaseFilter(query=name, limit=limit))
break
except WebServiceError, e:
logger.warn('Attempt to query MusicBrainz for "%s" failed: %s' % (name, str(e)))
attempt += 1
time.sleep(10)
time.sleep(sleepytime)
if not releaseResults:
return False
for result in releaseResults:
releaselist.append({
'uniquename': result.release.artist.name,
'title': result.release.title,
'id': u.extractUuid(result.release.artist.id),
'albumid': u.extractUuid(result.release.id),
'url': result.release.artist.id,
'albumurl': result.release.id,
'score': result.score
})
return releaselist
with mb_lock:
releaselist = []
attempt = 0
releaseResults = None
chars = set('!?')
if any((c in chars) for c in name):
name = '"'+name+'"'
q, sleepytime = startmb(forcemb=True)
while attempt < 5:
try:
releaseResults = q.getReleases(ws.ReleaseFilter(query=name, limit=limit))
break
except WebServiceError, e:
logger.warn('Attempt to query MusicBrainz for "%s" failed: %s' % (name, str(e)))
attempt += 1
time.sleep(10)
time.sleep(sleepytime)
if not releaseResults:
return False
for result in releaseResults:
releaselist.append({
'uniquename': result.release.artist.name,
'title': result.release.title,
'id': u.extractUuid(result.release.artist.id),
'albumid': u.extractUuid(result.release.id),
'url': result.release.artist.id,
'albumurl': result.release.id,
'score': result.score
})
return releaselist
def getArtist(artistid, extrasonly=False):
with mb_lock:
artist_dict = {}
#Get all official release groups
inc = ws.ArtistIncludes(releases=(m.Release.TYPE_OFFICIAL, m.Release.TYPE_ALBUM), releaseGroups=True)
artist = None
attempt = 0
q, sleepytime = startmb()
while attempt < 5:
try:
artist = q.getArtistById(artistid, inc)
break
except WebServiceError, e:
logger.warn('Attempt to retrieve artist information from MusicBrainz failed for artistid: %s (%s)' % (artistid, str(e)))
attempt += 1
time.sleep(5)
if not artist:
return False
time.sleep(sleepytime)
artist_dict['artist_name'] = artist.name
artist_dict['artist_sortname'] = artist.sortName
artist_dict['artist_uniquename'] = artist.getUniqueName()
artist_dict['artist_type'] = u.extractFragment(artist.type)
artist_dict['artist_begindate'] = artist.beginDate
artist_dict['artist_enddate'] = artist.endDate
releasegroups = []
if not extrasonly:
for rg in artist.getReleaseGroups():
releasegroups.append({
'title': rg.title,
'id': u.extractUuid(rg.id),
'url': rg.id,
'type': u.getReleaseTypeName(rg.type)
})
# See if we need to grab extras
myDB = db.DBConnection()
with mb_lock:
artist_dict = {}
#Get all official release groups
inc = ws.ArtistIncludes(releases=(m.Release.TYPE_OFFICIAL, m.Release.TYPE_ALBUM), releaseGroups=True)
artist = None
attempt = 0
q, sleepytime = startmb()
while attempt < 5:
try:
artist = q.getArtistById(artistid, inc)
break
except WebServiceError, e:
logger.warn('Attempt to retrieve artist information from MusicBrainz failed for artistid: %s (%s)' % (artistid, str(e)))
attempt += 1
time.sleep(5)
if not artist:
return False
time.sleep(sleepytime)
artist_dict['artist_name'] = artist.name
artist_dict['artist_sortname'] = artist.sortName
artist_dict['artist_uniquename'] = artist.getUniqueName()
artist_dict['artist_type'] = u.extractFragment(artist.type)
artist_dict['artist_begindate'] = artist.beginDate
artist_dict['artist_enddate'] = artist.endDate
releasegroups = []
if not extrasonly:
for rg in artist.getReleaseGroups():
releasegroups.append({
'title': rg.title,
'id': u.extractUuid(rg.id),
'url': rg.id,
'type': u.getReleaseTypeName(rg.type)
})
# See if we need to grab extras
myDB = db.DBConnection()
try:
includeExtras = myDB.select('SELECT IncludeExtras from artists WHERE ArtistID=?', [artistid])[0][0]
except IndexError:
includeExtras = False
if includeExtras or headphones.INCLUDE_EXTRAS:
includes = [m.Release.TYPE_COMPILATION, m.Release.TYPE_REMIX, m.Release.TYPE_SINGLE, m.Release.TYPE_LIVE, m.Release.TYPE_EP, m.Release.TYPE_SOUNDTRACK]
for include in includes:
inc = ws.ArtistIncludes(releases=(m.Release.TYPE_OFFICIAL, include), releaseGroups=True)
artist = None
attempt = 0
while attempt < 5:
try:
artist = q.getArtistById(artistid, inc)
break
except WebServiceError, e:
logger.warn('Attempt to retrieve artist information from MusicBrainz failed for artistid: %s (%s)' % (artistid, str(e)))
attempt += 1
time.sleep(5)
if not artist:
continue
for rg in artist.getReleaseGroups():
releasegroups.append({
'title': rg.title,
'id': u.extractUuid(rg.id),
'url': rg.id,
'type': u.getReleaseTypeName(rg.type)
})
artist_dict['releasegroups'] = releasegroups
return artist_dict
try:
includeExtras = myDB.select('SELECT IncludeExtras from artists WHERE ArtistID=?', [artistid])[0][0]
except IndexError:
includeExtras = False
if includeExtras or headphones.INCLUDE_EXTRAS:
includes = [m.Release.TYPE_COMPILATION, m.Release.TYPE_REMIX, m.Release.TYPE_SINGLE, m.Release.TYPE_LIVE, m.Release.TYPE_EP, m.Release.TYPE_SOUNDTRACK]
for include in includes:
inc = ws.ArtistIncludes(releases=(m.Release.TYPE_OFFICIAL, include), releaseGroups=True)
artist = None
attempt = 0
while attempt < 5:
try:
artist = q.getArtistById(artistid, inc)
break
except WebServiceError, e:
logger.warn('Attempt to retrieve artist information from MusicBrainz failed for artistid: %s (%s)' % (artistid, str(e)))
attempt += 1
time.sleep(5)
if not artist:
continue
for rg in artist.getReleaseGroups():
releasegroups.append({
'title': rg.title,
'id': u.extractUuid(rg.id),
'url': rg.id,
'type': u.getReleaseTypeName(rg.type)
})
artist_dict['releasegroups'] = releasegroups
return artist_dict
def getReleaseGroup(rgid):
"""
Returns a dictionary of the best stuff from a release group
"""
with mb_lock:
releaselist = []
inc = ws.ReleaseGroupIncludes(releases=True, artist=True)
releaseGroup = None
attempt = 0
q, sleepytime = startmb()
while attempt < 5:
try:
releaseGroup = q.getReleaseGroupById(rgid, inc)
break
except WebServiceError, e:
logger.warn('Attempt to retrieve information from MusicBrainz for release group "%s" failed (%s)' % (rgid, str(e)))
attempt += 1
time.sleep(5)
if not releaseGroup:
return False
time.sleep(sleepytime)
# I think for now we have to make separate queries for each release, in order
# to get more detailed release info (ASIN, track count, etc.)
for release in releaseGroup.releases:
inc = ws.ReleaseIncludes(tracks=True, releaseEvents=True)
releaseResult = None
attempt = 0
while attempt < 5:
try:
releaseResult = q.getReleaseById(release.id, inc)
break
except WebServiceError, e:
logger.warn('Attempt to retrieve release information for %s from MusicBrainz failed (%s)' % (releaseResult.title, str(e)))
attempt += 1
time.sleep(5)
if not releaseResult:
continue
# Release filter for non-official live albums
types = releaseResult.getTypes()
if any('Live' in type for type in types):
if not any('Official' in type for type in types):
logger.debug('%s is not an official live album. Skipping' % releaseResult.name)
continue
time.sleep(sleepytime)
formats = {
'2xVinyl': '2',
'Vinyl': '2',
'CD': '0',
'Cassette': '3',
'2xCD': '1',
'Digital Media': '0'
}
country = {
'US': '0',
'GB': '1',
'JP': '2',
}
"""
Returns a dictionary of the best stuff from a release group
"""
with mb_lock:
releaselist = []
inc = ws.ReleaseGroupIncludes(releases=True, artist=True)
releaseGroup = None
attempt = 0
q, sleepytime = startmb()
while attempt < 5:
try:
releaseGroup = q.getReleaseGroupById(rgid, inc)
break
except WebServiceError, e:
logger.warn('Attempt to retrieve information from MusicBrainz for release group "%s" failed (%s)' % (rgid, str(e)))
attempt += 1
time.sleep(5)
if not releaseGroup:
return False
time.sleep(sleepytime)
# I think for now we have to make separate queries for each release, in order
# to get more detailed release info (ASIN, track count, etc.)
for release in releaseGroup.releases:
inc = ws.ReleaseIncludes(tracks=True, releaseEvents=True)
releaseResult = None
attempt = 0
while attempt < 5:
try:
releaseResult = q.getReleaseById(release.id, inc)
break
except WebServiceError, e:
logger.warn('Attempt to retrieve release information for %s from MusicBrainz failed (%s)' % (releaseResult.title, str(e)))
attempt += 1
time.sleep(5)
if not releaseResult:
continue
# Release filter for non-official live albums
types = releaseResult.getTypes()
if any('Live' in type for type in types):
if not any('Official' in type for type in types):
logger.debug('%s is not an official live album. Skipping' % releaseResult.name)
continue
time.sleep(sleepytime)
formats = {
'2xVinyl': '2',
'Vinyl': '2',
'CD': '0',
'Cassette': '3',
'2xCD': '1',
'Digital Media': '0'
}
country = {
'US': '0',
'GB': '1',
'JP': '2',
}
try:
format = int(replace_all(u.extractFragment(releaseResult.releaseEvents[0].format), formats))
except:
format = 3
try:
country = int(replace_all(releaseResult.releaseEvents[0].country, country))
except:
country = 3
release_dict = {
'hasasin': bool(releaseResult.asin),
'asin': releaseResult.asin,
'trackscount': len(releaseResult.getTracks()),
'releaseid': u.extractUuid(releaseResult.id),
'releasedate': releaseResult.getEarliestReleaseDate(),
'format': format,
'country': country
}
tracks = []
i = 1
for track in releaseResult.tracks:
tracks.append({
'number': i,
'title': track.title,
'id': u.extractUuid(track.id),
'url': track.id,
'duration': track.duration
})
i += 1
release_dict['tracks'] = tracks
releaselist.append(release_dict)
average_tracks = sum(x['trackscount'] for x in releaselist) / float(len(releaselist))
for item in releaselist:
item['trackscount_delta'] = abs(average_tracks - item['trackscount'])
a = multikeysort(releaselist, ['-hasasin', 'country', 'format', 'trackscount_delta'])
release_dict = {'releaseid' :a[0]['releaseid'],
'releasedate' : releaselist[0]['releasedate'],
'trackcount' : a[0]['trackscount'],
'tracks' : a[0]['tracks'],
'asin' : a[0]['asin'],
'releaselist' : releaselist,
'artist_name' : releaseGroup.artist.name,
'artist_id' : u.extractUuid(releaseGroup.artist.id),
'title' : releaseGroup.title,
'type' : u.extractFragment(releaseGroup.type)
}
return release_dict
try:
format = int(replace_all(u.extractFragment(releaseResult.releaseEvents[0].format), formats))
except:
format = 3
try:
country = int(replace_all(releaseResult.releaseEvents[0].country, country))
except:
country = 3
release_dict = {
'hasasin': bool(releaseResult.asin),
'asin': releaseResult.asin,
'trackscount': len(releaseResult.getTracks()),
'releaseid': u.extractUuid(releaseResult.id),
'releasedate': releaseResult.getEarliestReleaseDate(),
'format': format,
'country': country
}
tracks = []
i = 1
for track in releaseResult.tracks:
tracks.append({
'number': i,
'title': track.title,
'id': u.extractUuid(track.id),
'url': track.id,
'duration': track.duration
})
i += 1
release_dict['tracks'] = tracks
releaselist.append(release_dict)
average_tracks = sum(x['trackscount'] for x in releaselist) / float(len(releaselist))
for item in releaselist:
item['trackscount_delta'] = abs(average_tracks - item['trackscount'])
a = multikeysort(releaselist, ['-hasasin', 'country', 'format', 'trackscount_delta'])
release_dict = {'releaseid' :a[0]['releaseid'],
'releasedate' : releaselist[0]['releasedate'],
'trackcount' : a[0]['trackscount'],
'tracks' : a[0]['tracks'],
'asin' : a[0]['asin'],
'releaselist' : releaselist,
'artist_name' : releaseGroup.artist.name,
'artist_id' : u.extractUuid(releaseGroup.artist.id),
'title' : releaseGroup.title,
'type' : u.extractFragment(releaseGroup.type)
}
return release_dict
def getRelease(releaseid):
"""
Deep release search to get track info
"""
with mb_lock:
release = {}
inc = ws.ReleaseIncludes(tracks=True, releaseEvents=True, releaseGroup=True, artist=True)
results = None
attempt = 0
q, sleepytime = startmb()
while attempt < 5:
try:
results = q.getReleaseById(releaseid, inc)
break
except WebServiceError, e:
logger.warn('Attempt to retrieve information from MusicBrainz for release "%s" failed (%s)' % (releaseid, str(e)))
attempt += 1
time.sleep(5)
if not results:
return False
time.sleep(sleepytime)
release['title'] = results.title
release['id'] = u.extractUuid(results.id)
release['asin'] = results.asin
release['date'] = results.getEarliestReleaseDate()
"""
Deep release search to get track info
"""
with mb_lock:
release = {}
inc = ws.ReleaseIncludes(tracks=True, releaseEvents=True, releaseGroup=True, artist=True)
results = None
attempt = 0
q, sleepytime = startmb()
while attempt < 5:
try:
results = q.getReleaseById(releaseid, inc)
break
except WebServiceError, e:
logger.warn('Attempt to retrieve information from MusicBrainz for release "%s" failed (%s)' % (releaseid, str(e)))
attempt += 1
time.sleep(5)
if not results:
return False
time.sleep(sleepytime)
release['title'] = results.title
release['id'] = u.extractUuid(results.id)
release['asin'] = results.asin
release['date'] = results.getEarliestReleaseDate()
rg = results.getReleaseGroup()
if rg:
release['rgid'] = u.extractUuid(rg.id)
release['rg_title'] = rg.title
release['rg_type'] = u.extractFragment(rg.type)
else:
logger.warn("Release " + releaseid + "had no ReleaseGroup associated")
#so we can start with a releaseID from anywhere and get the artist info
#it looks like MB api v1 only returns 1 artist object - 2.0 returns more...
release['artist_name'] = results.artist.name
release['artist_id'] = u.extractUuid(results.artist.id)
tracks = []
i = 1
for track in results.tracks:
tracks.append({
'number': i,
'title': track.title,
'id': u.extractUuid(track.id),
'url': track.id,
'duration': track.duration
})
i += 1
release['tracks'] = tracks
return release
rg = results.getReleaseGroup()
if rg:
release['rgid'] = u.extractUuid(rg.id)
release['rg_title'] = rg.title
release['rg_type'] = u.extractFragment(rg.type)
else:
logger.warn("Release " + releaseid + "had no ReleaseGroup associated")
#so we can start with a releaseID from anywhere and get the artist info
#it looks like MB api v1 only returns 1 artist object - 2.0 returns more...
release['artist_name'] = results.artist.name
release['artist_id'] = u.extractUuid(results.artist.id)
tracks = []
i = 1
for track in results.tracks:
tracks.append({
'number': i,
'title': track.title,
'id': u.extractUuid(track.id),
'url': track.id,
'duration': track.duration
})
i += 1
release['tracks'] = tracks
return release
# Used when there is a disambiguation
def findArtistbyAlbum(name):
myDB = db.DBConnection()
artist = myDB.action('SELECT AlbumTitle from have WHERE ArtistName=? AND AlbumTitle IS NOT NULL ORDER BY RANDOM()', [name]).fetchone()
if not artist:
return False
# Probably not neccessary but just want to double check
if not artist['AlbumTitle']:
return False
term = '"'+artist['AlbumTitle']+'" AND artist:"'+name+'"'
f = ws.ReleaseGroupFilter(query=term, limit=1)
results = None
attempt = 0
q, sleepytime = startmb(forcemb=True)
while attempt < 5:
try:
results = q.getReleaseGroups(f)
break
except WebServiceError, e:
logger.warn('Attempt to query MusicBrainz for %s failed (%s)' % (name, str(e)))
attempt += 1
time.sleep(10)
time.sleep(sleepytime)
if not results:
return False
myDB = db.DBConnection()
artist = myDB.action('SELECT AlbumTitle from have WHERE ArtistName=? AND AlbumTitle IS NOT NULL ORDER BY RANDOM()', [name]).fetchone()
if not artist:
return False
# Probably not neccessary but just want to double check
if not artist['AlbumTitle']:
return False
term = '"'+artist['AlbumTitle']+'" AND artist:"'+name+'"'
f = ws.ReleaseGroupFilter(query=term, limit=1)
results = None
attempt = 0
q, sleepytime = startmb(forcemb=True)
while attempt < 5:
try:
results = q.getReleaseGroups(f)
break
except WebServiceError, e:
logger.warn('Attempt to query MusicBrainz for %s failed (%s)' % (name, str(e)))
attempt += 1
time.sleep(10)
time.sleep(sleepytime)
if not results:
return False
artist_dict = {}
for result in results:
releaseGroup = result.releaseGroup
artist_dict['name'] = releaseGroup.artist.name
artist_dict['uniquename'] = releaseGroup.artist.getUniqueName()
artist_dict['id'] = u.extractUuid(releaseGroup.artist.id)
artist_dict['url'] = releaseGroup.artist.id
artist_dict['score'] = result.score
return artist_dict
artist_dict = {}
for result in results:
releaseGroup = result.releaseGroup
artist_dict['name'] = releaseGroup.artist.name
artist_dict['uniquename'] = releaseGroup.artist.getUniqueName()
artist_dict['id'] = u.extractUuid(releaseGroup.artist.id)
artist_dict['url'] = releaseGroup.artist.id
artist_dict['score'] = result.score
return artist_dict
def findAlbumID(artist=None, album=None):
f = ws.ReleaseGroupFilter(title=album, artistName=artist, limit=1)
results = None
attempt = 0
q, sleepytime = startmb(forcemb=True)
while attempt < 5:
try:
results = q.getReleaseGroups(f)
break
except WebServiceError, e:
logger.warn('Attempt to query MusicBrainz for %s - %s failed (%s)' % (artist, album, str(e)))
attempt += 1
time.sleep(10)
time.sleep(sleepytime)
if not results:
return False
rgid = u.extractUuid(results[0].releaseGroup.id)
return rgid
f = ws.ReleaseGroupFilter(title=album, artistName=artist, limit=1)
results = None
attempt = 0
q, sleepytime = startmb(forcemb=True)
while attempt < 5:
try:
results = q.getReleaseGroups(f)
break
except WebServiceError, e:
logger.warn('Attempt to query MusicBrainz for %s - %s failed (%s)' % (artist, album, str(e)))
attempt += 1
time.sleep(10)
time.sleep(sleepytime)
if not results:
return False
rgid = u.extractUuid(results[0].releaseGroup.id)
return rgid

View File

@@ -1 +1 @@
from musicbrainz import *
from lib.musicbrainzngs.musicbrainz import *

View File

@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2012 Kenneth Reitz.
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""
pythoncompat
"""
import sys
# -------
# Pythons
# -------
# Syntax sugar.
_ver = sys.version_info
#: Python 2.x?
is_py2 = (_ver[0] == 2)
#: Python 3.x?
is_py3 = (_ver[0] == 3)
# ---------
# Specifics
# ---------
if is_py2:
from StringIO import StringIO
from urllib2 import HTTPPasswordMgr, HTTPDigestAuthHandler, Request,\
HTTPHandler, build_opener, HTTPError, URLError,\
build_opener
from httplib import BadStatusLine, HTTPException
from urlparse import urlunparse
from urllib import urlencode
bytes = str
unicode = unicode
basestring = basestring
elif is_py3:
from io import StringIO
from urllib.request import HTTPPasswordMgr, HTTPDigestAuthHandler, Request,\
HTTPHandler, build_opener
from urllib.error import HTTPError, URLError
from http.client import HTTPException, BadStatusLine
from urllib.parse import urlunparse, urlencode
unicode = str
bytes = bytes
basestring = (str,bytes)

View File

@@ -4,9 +4,11 @@
# See the COPYING file for more information.
import xml.etree.ElementTree as ET
import string
import StringIO
import logging
from lib.musicbrainzngs import compat
from lib.musicbrainzngs import util
try:
from ET import fixtag
except:
@@ -16,7 +18,7 @@ except:
# tag and namespace declaration, if any
if isinstance(tag, ET.QName):
tag = tag.text
namespace_uri, tag = string.split(tag[1:], "}", 1)
namespace_uri, tag = tag[1:].split("}", 1)
prefix = namespaces.get(namespace_uri)
if prefix is None:
prefix = "ns%d" % len(namespaces)
@@ -29,6 +31,7 @@ except:
xmlns = None
return "%s:%s" % (prefix, tag), xmlns
NS_MAP = {"http://musicbrainz.org/ns/mmd-2.0#": "ws2",
"http://musicbrainz.org/ns/ext#-2.0": "ext"}
_log = logging.getLogger("python-musicbrainz-ngs")
@@ -113,9 +116,7 @@ def parse_inner(inner_els, element):
return result
def parse_message(message):
s = message.read()
f = StringIO.StringIO(s)
tree = ET.ElementTree(file=f)
tree = util.bytes_to_elementtree(message)
root = tree.getroot()
result = {}
valid_elements = {"artist": parse_artist,
@@ -176,7 +177,8 @@ def parse_artist_list(al):
def parse_artist(artist):
result = {}
attribs = ["id", "type", "ext:score"]
elements = ["name", "sort-name", "country", "user-rating", "disambiguation"]
elements = ["name", "sort-name", "country", "user-rating",
"disambiguation", "gender", "ipi"]
inner_els = {"life-span": parse_artist_lifespan,
"recording-list": parse_recording_list,
"release-list": parse_release_list,
@@ -199,7 +201,8 @@ def parse_label_list(ll):
def parse_label(label):
result = {}
attribs = ["id", "type", "ext:score"]
elements = ["name", "sort-name", "country", "label-code", "user-rating"]
elements = ["name", "sort-name", "country", "label-code", "user-rating",
"ipi", "disambiguation"]
inner_els = {"life-span": parse_artist_lifespan,
"release-list": parse_release_list,
"tag-list": parse_tag_list,

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,37 @@
# This file is part of the musicbrainzngs library
# Copyright (C) Alastair Porter, Adrian Sampson, and others
# This file is distributed under a BSD-2-Clause type license.
# See the COPYING file for more information.
import sys
import locale
import xml.etree.ElementTree as ET
from . import compat
def _unicode(string, encoding=None):
"""Try to decode byte strings to unicode.
This can only be a guess, but this might be better than failing.
It is safe to use this on numbers or strings that are already unicode.
"""
if isinstance(string, compat.unicode):
unicode_string = string
elif isinstance(string, compat.bytes):
# use given encoding, stdin, preferred until something != None is found
if encoding is None:
encoding = sys.stdin.encoding
if encoding is None:
encoding = locale.getpreferredencoding()
unicode_string = string.decode(encoding, "ignore")
else:
unicode_string = compat.unicode(string)
return unicode_string.replace('\x00', '').strip()
def bytes_to_elementtree(_bytes):
if compat.is_py3:
s = _unicode(_bytes.read(), "utf-8")
else:
s = _bytes.read()
f = compat.StringIO(s)
tree = ET.ElementTree(file=f)
return tree