Lyrics mods

Switched to the beets lyrics plugin.
Fixes https://github.com/rembo10/headphones/issues/1896

Author: Ade
Date:   2014-09-22 15:38:17 +12:00
Parent: 365d4fa1bf
Commit: 394876992b

5 changed files with 1167 additions and 25 deletions


@@ -23,9 +23,11 @@ import headphones
 from beets import autotag
 from beets.mediafile import MediaFile, FileTypeError, UnreadableFileError
 from beets import plugins
+from beetsplug import lyrics as beetslyrics
 from headphones import notifiers, utorrent, transmission
-from headphones import db, albumart, librarysync, lyrics
+from headphones import db, albumart, librarysync
 from headphones import logger, helpers, request, mb, music_encoder
 
 postprocessor_lock = threading.Lock()
@@ -864,7 +866,6 @@ def correctMetadata(albumid, release, downloaded_track_list):
             else:
                 logger.warn("Skipping: %s because it is not a mutagen friendly file format", downloaded_track.decode(headphones.SYS_ENCODING, 'replace'))
         except Exception, e:
             logger.error("Beets couldn't create an Item from: %s - not a media file? %s", downloaded_track.decode(headphones.SYS_ENCODING, 'replace'), str(e))
-
     for items in [lossy_items, lossless_items]:
@@ -903,35 +904,49 @@ def correctMetadata(albumid, release, downloaded_track_list):
 def embedLyrics(downloaded_track_list):
     logger.info('Adding lyrics')
 
-    # TODO: If adding lyrics for flac & lossy, only fetch the lyrics once
-    # and apply it to both files
+    # TODO: If adding lyrics for flac & lossy, only fetch the lyrics once and apply it to both files
+    # TODO: Get beets to add automatically by enabling the plugin
+
+    lossy_items = []
+    lossless_items = []
+    lp = beetslyrics.LyricsPlugin()
 
     for downloaded_track in downloaded_track_list:
-        track_title = downloaded_track.decode(headphones.SYS_ENCODING, 'replace')
         try:
-            f = MediaFile(downloaded_track)
-        except:
-            logger.error('Could not read %s. Not checking lyrics', track_title)
+            if any(downloaded_track.lower().endswith('.' + x.lower()) for x in headphones.LOSSLESS_MEDIA_FORMATS):
+                lossless_items.append(beets.library.Item.from_path(downloaded_track))
+            elif any(downloaded_track.lower().endswith('.' + x.lower()) for x in headphones.LOSSY_MEDIA_FORMATS):
+                lossy_items.append(beets.library.Item.from_path(downloaded_track))
+            else:
+                logger.warn("Skipping: %s because it is not a mutagen friendly file format", downloaded_track.decode(headphones.SYS_ENCODING, 'replace'))
+        except Exception, e:
+            logger.error("Beets couldn't create an Item from: %s - not a media file? %s", downloaded_track.decode(headphones.SYS_ENCODING, 'replace'), str(e))
+
+    for items in [lossy_items, lossless_items]:
+        if not items:
             continue
 
-        if f.albumartist and f.title:
-            metalyrics = lyrics.getLyrics(f.albumartist, f.title)
-        elif f.artist and f.title:
-            metalyrics = lyrics.getLyrics(f.artist, f.title)
-        else:
-            logger.info('No artist/track metadata found for track: %s. Not fetching lyrics', track_title)
-            metalyrics = None
+        for item in items:
-        if metalyrics:
-            logger.debug('Adding lyrics to: %s', track_title)
-            f.lyrics = metalyrics
-            try:
-                f.save()
-            except:
-                logger.error('Cannot save lyrics to: %s. Skipping', track_title)
-                continue
-        else:
-            logger.debug('No lyrics found for track: %s', track_title)
+            lyrics = None
+            for artist, titles in beetslyrics.search_pairs(item):
+                lyrics = [lp.get_lyrics(artist, title) for title in titles]
+                if any(lyrics):
+                    break
+            lyrics = u"\n\n---\n\n".join([l for l in lyrics if l])
+            if lyrics:
+                logger.debug('Adding lyrics to: %s', item.title)
+                item.lyrics = lyrics
+                try:
+                    item.write()
+                except Exception, e:
+                    logger.error('Cannot save lyrics to: %s. Skipping', item.title)
+            else:
+                logger.debug('No lyrics found for track: %s', item.title)
 
 def renameFiles(albumpath, downloaded_track_list, release):
     logger.info('Renaming files')

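In miniature, the new flow driven by the hunk above looks like this (a sketch only; the file path is illustrative, and it assumes the bundled beets package is importable exactly as the diff does):

    from beetsplug import lyrics as beetslyrics
    import beets.library

    lp = beetslyrics.LyricsPlugin()
    item = beets.library.Item.from_path('/music/01 - Breathe.flac')  # illustrative path
    lyrics = None
    for artist, titles in beetslyrics.search_pairs(item):
        # Try every artist/title variant until some backend returns text.
        lyrics = [lp.get_lyrics(artist, title) for title in titles]
        if any(lyrics):
            break
    lyrics = u"\n\n---\n\n".join([l for l in lyrics if l])
    if lyrics:
        item.lyrics = lyrics
        item.write()  # persist the lyrics tag to the media file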
lib/beetsplug/__init__.py (new file, +19 lines)

@@ -0,0 +1,19 @@
# This file is part of beets.
# Copyright 2013, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""A namespace package for beets plugins."""
# Make this a namespace package.
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
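Because of extend_path, beetsplug is a namespace package: any other beetsplug/ directory on sys.path contributes modules to the same package. A minimal sketch (myplugin is hypothetical):

    # elsewhere/beetsplug/__init__.py contains the same two extend_path lines;
    # elsewhere/beetsplug/myplugin.py defines a plugin:
    from beets.plugins import BeetsPlugin

    class MyPlugin(BeetsPlugin):
        pass  # both bundled plugins and this one resolve under 'beetsplug'

    # and a consumer can then do:
    # from beetsplug import myplugin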

lib/beetsplug/embedart.py (new file, +194 lines)

@@ -0,0 +1,194 @@
# This file is part of beets.
# Copyright 2014, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Allows beets to embed album art into file metadata."""
import os.path
import logging
import imghdr
from beets.plugins import BeetsPlugin
from beets import mediafile
from beets import ui
from beets.ui import decargs
from beets.util import syspath, normpath, displayable_path
from beets.util.artresizer import ArtResizer
from beets import config
log = logging.getLogger('beets')
class EmbedCoverArtPlugin(BeetsPlugin):
"""Allows albumart to be embedded into the actual files.
"""
def __init__(self):
super(EmbedCoverArtPlugin, self).__init__()
self.config.add({
'maxwidth': 0,
'auto': True,
})
if self.config['maxwidth'].get(int) and \
not ArtResizer.shared.local:
self.config['maxwidth'] = 0
log.warn(u"embedart: ImageMagick or PIL not found; "
u"'maxwidth' option ignored")
def commands(self):
# Embed command.
embed_cmd = ui.Subcommand(
'embedart', help='embed image files into file metadata'
)
embed_cmd.parser.add_option(
'-f', '--file', metavar='PATH', help='the image file to embed'
)
maxwidth = config['embedart']['maxwidth'].get(int)
def embed_func(lib, opts, args):
if opts.file:
imagepath = normpath(opts.file)
for item in lib.items(decargs(args)):
embed_item(item, imagepath, maxwidth)
else:
for album in lib.albums(decargs(args)):
embed_album(album, maxwidth)
embed_cmd.func = embed_func
# Extract command.
extract_cmd = ui.Subcommand('extractart',
help='extract an image from file metadata')
extract_cmd.parser.add_option('-o', dest='outpath',
help='image output file')
def extract_func(lib, opts, args):
outpath = normpath(opts.outpath or 'cover')
extract(lib, outpath, decargs(args))
extract_cmd.func = extract_func
# Clear command.
clear_cmd = ui.Subcommand('clearart',
help='remove images from file metadata')
def clear_func(lib, opts, args):
clear(lib, decargs(args))
clear_cmd.func = clear_func
return [embed_cmd, extract_cmd, clear_cmd]
@EmbedCoverArtPlugin.listen('album_imported')
def album_imported(lib, album):
"""Automatically embed art into imported albums.
"""
if album.artpath and config['embedart']['auto']:
embed_album(album, config['embedart']['maxwidth'].get(int))
def embed_item(item, imagepath, maxwidth=None, itempath=None):
"""Embed an image into the item's media file.
"""
try:
item['images'] = [_mediafile_image(imagepath, maxwidth)]
item.try_write(itempath)
except IOError as exc:
log.error(u'embedart: could not read image file: {0}'.format(exc))
finally:
# We don't want to store the image in the database
del item['images']
def embed_album(album, maxwidth=None):
"""Embed album art into all of the album's items.
"""
imagepath = album.artpath
if not imagepath:
log.info(u'No album art present: {0} - {1}'.
format(album.albumartist, album.album))
return
if not os.path.isfile(imagepath):
log.error(u'Album art not found at {0}'
.format(imagepath))
return
log.info(u'Embedding album art into {0.albumartist} - {0.album}.'
.format(album))
for item in album.items():
embed_item(item, imagepath, maxwidth)
def _mediafile_image(image_path, maxwidth=None):
"""Return a `mediafile.Image` object for the path.
If maxwidth is set the image is resized if necessary.
"""
if maxwidth:
image_path = ArtResizer.shared.resize(maxwidth, syspath(image_path))
with open(syspath(image_path), 'rb') as f:
data = f.read()
return mediafile.Image(data, type=mediafile.ImageType.front)
# 'extractart' command.
def extract(lib, outpath, query):
item = lib.items(query).get()
if not item:
log.error(u'No item matches query.')
return
# Extract the art.
try:
mf = mediafile.MediaFile(syspath(item.path))
except mediafile.UnreadableFileError as exc:
log.error(u'Could not extract art from {0}: {1}'.format(
displayable_path(item.path), exc
))
return
art = mf.art
if not art:
log.error(u'No album art present in {0} - {1}.'
.format(item.artist, item.title))
return
# Add an extension to the filename.
ext = imghdr.what(None, h=art)
if not ext:
log.error(u'Unknown image type.')
return
outpath += '.' + ext
log.info(u'Extracting album art from: {0.artist} - {0.title}\n'
u'To: {1}'.format(item, displayable_path(outpath)))
with open(syspath(outpath), 'wb') as f:
f.write(art)
# 'clearart' command.
def clear(lib, query):
log.info(u'Clearing album art from items:')
for item in lib.items(query):
log.info(u'{0} - {1}'.format(item.artist, item.title))
try:
mf = mediafile.MediaFile(syspath(item.path),
config['id3v23'].get(bool))
except mediafile.UnreadableFileError as exc:
log.error(u'Could not clear art from {0}: {1}'.format(
displayable_path(item.path), exc
))
continue
mf.art = None
mf.save()
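Assuming the plugin is enabled in beets' own CLI, the three subcommands defined above would be driven roughly like this (the query string is illustrative):

    beet embedart -f cover.jpg artist:beatles   # embed a specific image file
    beet extractart -o cover artist:beatles     # write embedded art out as cover.<ext>
    beet clearart artist:beatles                # strip embedded art from matching files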

lib/beetsplug/fetchart.py (new file, +356 lines)

@@ -0,0 +1,356 @@
# This file is part of beets.
# Copyright 2013, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Fetches album art.
"""
from contextlib import closing
import logging
import os
import re
from tempfile import NamedTemporaryFile
import requests
from beets.plugins import BeetsPlugin
from beets.util.artresizer import ArtResizer
from beets import importer
from beets import ui
from beets import util
from beets import config
IMAGE_EXTENSIONS = ['png', 'jpg', 'jpeg']
CONTENT_TYPES = ('image/jpeg',)
DOWNLOAD_EXTENSION = '.jpg'
log = logging.getLogger('beets')
requests_session = requests.Session()
requests_session.headers = {'User-Agent': 'beets'}
def _fetch_image(url):
"""Downloads an image from a URL and checks whether it seems to
actually be an image. If so, returns a path to the downloaded image.
Otherwise, returns None.
"""
log.debug(u'fetchart: downloading art: {0}'.format(url))
try:
with closing(requests_session.get(url, stream=True)) as resp:
if 'Content-Type' not in resp.headers \
or resp.headers['Content-Type'] not in CONTENT_TYPES:
log.debug(u'fetchart: not an image')
return
# Generate a temporary file with the correct extension.
with NamedTemporaryFile(suffix=DOWNLOAD_EXTENSION, delete=False) \
as fh:
for chunk in resp.iter_content():
fh.write(chunk)
log.debug(u'fetchart: downloaded art to: {0}'.format(
util.displayable_path(fh.name)
))
return fh.name
except (IOError, requests.RequestException):
log.debug(u'fetchart: error fetching art')
# ART SOURCES ################################################################
# Cover Art Archive.
CAA_URL = 'http://coverartarchive.org/release/{mbid}/front-500.jpg'
CAA_GROUP_URL = 'http://coverartarchive.org/release-group/{mbid}/front-500.jpg'
def caa_art(release_id):
"""Return the Cover Art Archive URL given a MusicBrainz release ID.
"""
return CAA_URL.format(mbid=release_id)
def caa_group_art(release_group_id):
"""Return the Cover Art Archive release group URL given a MusicBrainz
release group ID.
"""
return CAA_GROUP_URL.format(mbid=release_group_id)
# Art from Amazon.
AMAZON_URL = 'http://images.amazon.com/images/P/%s.%02i.LZZZZZZZ.jpg'
AMAZON_INDICES = (1, 2)
def art_for_asin(asin):
"""Generate URLs for an Amazon ID (ASIN) string."""
for index in AMAZON_INDICES:
yield AMAZON_URL % (asin, index)
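For example, a single ASIN expands to the two indexed Amazon image URLs (the ASIN here is made up):

    list(art_for_asin('B000002L5H'))
    # ['http://images.amazon.com/images/P/B000002L5H.01.LZZZZZZZ.jpg',
    #  'http://images.amazon.com/images/P/B000002L5H.02.LZZZZZZZ.jpg']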
# AlbumArt.org scraper.
AAO_URL = 'http://www.albumart.org/index_detail.php'
AAO_PAT = r'href\s*=\s*"([^>"]*)"[^>]*title\s*=\s*"View larger image"'
def aao_art(asin):
"""Return art URL from AlbumArt.org given an ASIN."""
# Get the page from albumart.org.
try:
resp = requests_session.get(AAO_URL, params={'asin': asin})
log.debug(u'fetchart: scraped art URL: {0}'.format(resp.url))
except requests.RequestException:
log.debug(u'fetchart: error scraping art page')
return
# Search the page for the image URL.
m = re.search(AAO_PAT, resp.text)
if m:
image_url = m.group(1)
return image_url
else:
log.debug(u'fetchart: no image found on page')
# Google Images scraper.
GOOGLE_URL = 'https://ajax.googleapis.com/ajax/services/search/images'
def google_art(album):
"""Return art URL from google.org given an album title and
interpreter.
"""
search_string = (album.albumartist + ',' + album.album).encode('utf-8')
response = requests_session.get(GOOGLE_URL, params={
'v': '1.0',
'q': search_string,
'start': '0',
})
# Get results using JSON.
try:
results = response.json()
data = results['responseData']
dataInfo = data['results']
for myUrl in dataInfo:
return myUrl['unescapedUrl']
except:
log.debug(u'fetchart: error scraping art page')
return
# Art from the filesystem.
def filename_priority(filename, cover_names):
"""Sort order for image names.
Return indexes of cover names found in the image filename. This
means that images with lower-numbered and more keywords will have higher
priority.
"""
return [idx for (idx, x) in enumerate(cover_names) if x in filename]
def art_in_path(path, cover_names, cautious):
"""Look for album art files in a specified directory."""
if not os.path.isdir(path):
return
# Find all files that look like images in the directory.
images = []
for fn in os.listdir(path):
for ext in IMAGE_EXTENSIONS:
if fn.lower().endswith('.' + ext):
images.append(fn)
# Look for "preferred" filenames.
images = sorted(images, key=lambda x: filename_priority(x, cover_names))
cover_pat = r"(\b|_)({0})(\b|_)".format('|'.join(cover_names))
for fn in images:
if re.search(cover_pat, os.path.splitext(fn)[0], re.I):
log.debug(u'fetchart: using well-named art file {0}'.format(
util.displayable_path(fn)
))
return os.path.join(path, fn)
# Fall back to any image in the folder.
if images and not cautious:
log.debug(u'fetchart: using fallback art file {0}'.format(
util.displayable_path(images[0])
))
return os.path.join(path, images[0])
# Try each source in turn.
def _source_urls(album):
"""Generate possible source URLs for an album's art. The URLs are
not guaranteed to work so they each need to be attempted in turn.
This allows the main `art_for_album` function to abort iteration
through this sequence early to avoid the cost of scraping when not
necessary.
"""
# Cover Art Archive.
if album.mb_albumid:
yield caa_art(album.mb_albumid)
if album.mb_releasegroupid:
yield caa_group_art(album.mb_releasegroupid)
# Amazon and AlbumArt.org.
if album.asin:
for url in art_for_asin(album.asin):
yield url
url = aao_art(album.asin)
if url:
yield url
if config['fetchart']['google_search']:
url = google_art(album)
if url:
yield url
def art_for_album(album, paths, maxwidth=None, local_only=False):
"""Given an Album object, returns a path to downloaded art for the
album (or None if no art is found). If `maxwidth`, then images are
resized to this maximum pixel size. If `local_only`, then only local
image files from the filesystem are returned; no network requests
are made.
"""
out = None
# Local art.
cover_names = config['fetchart']['cover_names'].as_str_seq()
cover_names = map(util.bytestring_path, cover_names)
cautious = config['fetchart']['cautious'].get(bool)
if paths:
for path in paths:
out = art_in_path(path, cover_names, cautious)
if out:
break
# Web art sources.
remote_priority = config['fetchart']['remote_priority'].get(bool)
if not local_only and (remote_priority or not out):
for url in _source_urls(album):
if maxwidth:
url = ArtResizer.shared.proxy_url(maxwidth, url)
candidate = _fetch_image(url)
if candidate:
out = candidate
break
if maxwidth and out:
out = ArtResizer.shared.resize(maxwidth, out)
return out
# PLUGIN LOGIC ###############################################################
def batch_fetch_art(lib, albums, force, maxwidth=None):
"""Fetch album art for each of the albums. This implements the manual
fetchart CLI command.
"""
for album in albums:
if album.artpath and not force:
message = 'has album art'
else:
# In ordinary invocations, look for images on the
# filesystem. When forcing, however, always go to the Web
# sources.
local_paths = None if force else [album.path]
path = art_for_album(album, local_paths, maxwidth)
if path:
album.set_art(path, False)
album.store()
message = ui.colorize('green', 'found album art')
else:
message = ui.colorize('red', 'no art found')
log.info(u'{0} - {1}: {2}'.format(album.albumartist, album.album,
message))
class FetchArtPlugin(BeetsPlugin):
def __init__(self):
super(FetchArtPlugin, self).__init__()
self.config.add({
'auto': True,
'maxwidth': 0,
'remote_priority': False,
'cautious': False,
'google_search': False,
'cover_names': ['cover', 'front', 'art', 'album', 'folder'],
})
# Holds paths to downloaded images between fetching them and
# placing them in the filesystem.
self.art_paths = {}
self.maxwidth = self.config['maxwidth'].get(int)
if self.config['auto']:
# Enable two import hooks when fetching is enabled.
self.import_stages = [self.fetch_art]
self.register_listener('import_task_files', self.assign_art)
# Asynchronous; after music is added to the library.
def fetch_art(self, session, task):
"""Find art for the album being imported."""
if task.is_album: # Only fetch art for full albums.
if task.choice_flag == importer.action.ASIS:
# For as-is imports, don't search Web sources for art.
local = True
elif task.choice_flag == importer.action.APPLY:
# Search everywhere for art.
local = False
else:
# For any other choices (e.g., TRACKS), do nothing.
return
path = art_for_album(task.album, task.paths, self.maxwidth, local)
if path:
self.art_paths[task] = path
# Synchronous; after music files are put in place.
def assign_art(self, session, task):
"""Place the discovered art in the filesystem."""
if task in self.art_paths:
path = self.art_paths.pop(task)
album = task.album
src_removed = (config['import']['delete'].get(bool) or
config['import']['move'].get(bool))
album.set_art(path, not src_removed)
album.store()
if src_removed:
task.prune(path)
# Manual album art fetching.
def commands(self):
cmd = ui.Subcommand('fetchart', help='download album art')
cmd.parser.add_option('-f', '--force', dest='force',
action='store_true', default=False,
help='re-download art when already present')
def func(lib, opts, args):
batch_fetch_art(lib, lib.albums(ui.decargs(args)), opts.force,
self.maxwidth)
cmd.func = func
return [cmd]
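With the plugin enabled, manual fetching would look roughly like:

    beet fetchart       # fetch art for matching albums that lack it
    beet fetchart -f    # re-download art even when already present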

lib/beetsplug/lyrics.py (new file, +558 lines)

@@ -0,0 +1,558 @@
# This file is part of beets.
# Copyright 2014, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Fetches, embeds, and displays lyrics.
"""
from __future__ import print_function
import re
import logging
import urllib
import json
import unicodedata
import difflib
import itertools
from beets.plugins import BeetsPlugin
from beets import ui
from beets import config
# Global logger.
log = logging.getLogger('beets')
DIV_RE = re.compile(r'<(/?)div>?')
COMMENT_RE = re.compile(r'<!--.*-->', re.S)
TAG_RE = re.compile(r'<[^>]*>')
BREAK_RE = re.compile(r'<br\s*/?>')
URL_CHARACTERS = {
u'\u2018': u"'",
u'\u2019': u"'",
u'\u201c': u'"',
u'\u201d': u'"',
u'\u2010': u'-',
u'\u2011': u'-',
u'\u2012': u'-',
u'\u2013': u'-',
u'\u2014': u'-',
u'\u2015': u'-',
u'\u2016': u'-',
u'\u2026': u'...',
}
# Utilities.
def fetch_url(url):
"""Retrieve the content at a given URL, or return None if the source
is unreachable.
"""
try:
return urllib.urlopen(url).read()
except IOError as exc:
log.debug(u'failed to fetch: {0} ({1})'.format(url, unicode(exc)))
return None
def unescape(text):
"""Resolves &#xxx; HTML entities (and some others)."""
if isinstance(text, str):
text = text.decode('utf8', 'ignore')
out = text.replace(u'&nbsp;', u' ')
def replchar(m):
num = m.group(1)
return unichr(int(num))
out = re.sub(u"&#(\d+);", replchar, out)
return out
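For example (only numeric entities and &nbsp; are handled; other named entities pass through):

    unescape(u'Rock &#38; Roll&nbsp;Forever')  # -> u'Rock & Roll Forever'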
def extract_text(html, starttag):
"""Extract the text from a <DIV> tag in the HTML starting with
``starttag``. Returns None if parsing fails.
"""
# Strip off the leading text before opening tag.
try:
_, html = html.split(starttag, 1)
except ValueError:
return
# Walk through balanced DIV tags.
level = 0
parts = []
pos = 0
for match in DIV_RE.finditer(html):
if match.group(1): # Closing tag.
level -= 1
if level == 0:
pos = match.end()
else: # Opening tag.
if level == 0:
parts.append(html[pos:match.start()])
level += 1
if level == -1:
parts.append(html[pos:match.start()])
break
else:
print('no closing tag found!')
return
lyrics = ''.join(parts)
return strip_cruft(lyrics)
def strip_cruft(lyrics, wscollapse=True):
"""Clean up HTML from an extracted lyrics string. For example, <BR>
tags are replaced with newlines.
"""
lyrics = COMMENT_RE.sub('', lyrics)
lyrics = unescape(lyrics)
if wscollapse:
lyrics = re.sub(r'\s+', ' ', lyrics) # Whitespace collapse.
lyrics = re.sub(r'<(script).*?</\1>(?s)', '', lyrics) # Strip script tags.
lyrics = BREAK_RE.sub('\n', lyrics) # <BR> newlines.
lyrics = re.sub(r'\n +', '\n', lyrics)
lyrics = re.sub(r' +\n', '\n', lyrics)
lyrics = TAG_RE.sub('', lyrics) # Strip remaining HTML tags.
lyrics = lyrics.replace('\r', '\n')
lyrics = lyrics.strip()
return lyrics
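A small example of the cleanup (HTML comment stripped, <br> converted to a newline, whitespace collapsed):

    strip_cruft(u'Hello<br/>world <!-- ad -->')  # -> u'Hello\nworld'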
def search_pairs(item):
"""Yield a pairs of artists and titles to search for.
The first item in the pair is the name of the artist, the second
item is a list of song names.
In addition to the artist and title obtained from the `item` the
method tries to strip extra information like paranthesized suffixes
and featured artists from the strings and add them as caniddates.
The method also tries to split multiple titles separated with `/`.
"""
title, artist = item.title, item.artist
titles = [title]
artists = [artist]
# Remove any featuring artists from the artists name
pattern = r"(.*?) (&|\b(and|ft|feat(uring)?\b))"
match = re.search(pattern, artist, re.IGNORECASE)
if match:
artists.append(match.group(1))
# Remove a parenthesized suffix from a title string. Common
# examples include (live), (remix), and (acoustic).
pattern = r"(.+?)\s+[(].*[)]$"
match = re.search(pattern, title, re.IGNORECASE)
if match:
titles.append(match.group(1))
# Remove any featuring artists from the title
pattern = r"(.*?) \b(ft|feat(uring)?)\b"
for title in titles:
match = re.search(pattern, title, re.IGNORECASE)
if match:
titles.append(match.group(1))
# Check for a dual song (e.g. Pink Floyd - Speak to Me / Breathe)
# and, if found, add each part as a search candidate.
multi_titles = []
for title in titles:
multi_titles.append([title])
if '/' in title:
multi_titles.append([x.strip() for x in title.split('/')])
return itertools.product(artists, multi_titles)
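For instance, a dual song with a featured artist yields every combination (this assumes beets' Item accepts keyword values, as its Model constructor does):

    from beets.library import Item
    item = Item(artist=u'Alice ft. Bob', title=u'Speak to Me / Breathe')
    list(search_pairs(item))
    # [(u'Alice ft. Bob', [u'Speak to Me / Breathe']),
    #  (u'Alice ft. Bob', [u'Speak to Me', u'Breathe']),
    #  (u'Alice', [u'Speak to Me / Breathe']),
    #  (u'Alice', [u'Speak to Me', u'Breathe'])]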
def _encode(s):
"""Encode the string for inclusion in a URL (common to both
LyricsWiki and Lyrics.com).
"""
if isinstance(s, unicode):
for char, repl in URL_CHARACTERS.items():
s = s.replace(char, repl)
s = s.encode('utf8', 'ignore')
return urllib.quote(s)
# LyricsWiki.
LYRICSWIKI_URL_PATTERN = 'http://lyrics.wikia.com/%s:%s'
def _lw_encode(s):
s = re.sub(r'\s+', '_', s)
s = s.replace("<", "Less_Than")
s = s.replace(">", "Greater_Than")
s = s.replace("#", "Number_")
s = re.sub(r'[\[\{]', '(', s)
s = re.sub(r'[\]\}]', ')', s)
return _encode(s)
def fetch_lyricswiki(artist, title):
"""Fetch lyrics from LyricsWiki."""
url = LYRICSWIKI_URL_PATTERN % (_lw_encode(artist), _lw_encode(title))
html = fetch_url(url)
if not html:
return
lyrics = extract_text(html, "<div class='lyricbox'>")
if lyrics and 'Unfortunately, we are not licensed' not in lyrics:
return lyrics
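The URL construction, concretely:

    LYRICSWIKI_URL_PATTERN % (_lw_encode(u'Pink Floyd'), _lw_encode(u'Speak to Me'))
    # -> 'http://lyrics.wikia.com/Pink_Floyd:Speak_to_Me'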
# Lyrics.com.
LYRICSCOM_URL_PATTERN = 'http://www.lyrics.com/%s-lyrics-%s.html'
LYRICSCOM_NOT_FOUND = (
'Sorry, we do not have the lyric',
'Submit Lyrics',
)
def _lc_encode(s):
s = re.sub(r'[^\w\s-]', '', s)
s = re.sub(r'\s+', '-', s)
return _encode(s).lower()
def fetch_lyricscom(artist, title):
"""Fetch lyrics from Lyrics.com."""
url = LYRICSCOM_URL_PATTERN % (_lc_encode(title), _lc_encode(artist))
html = fetch_url(url)
if not html:
return
lyrics = extract_text(html, '<div id="lyric_space">')
if not lyrics:
return
for not_found_str in LYRICSCOM_NOT_FOUND:
if not_found_str in lyrics:
return
parts = lyrics.split('\n---\nLyrics powered by', 1)
if parts:
return parts[0]
# Optional Google custom search API backend.
def slugify(text):
"""Normalize a string and remove non-alphanumeric characters.
"""
text = re.sub(r"[-'_\s]", '_', text)
text = re.sub(r"_+", '_', text).strip('_')
pat = "([^,\(]*)\((.*?)\)" # Remove content within parentheses
text = re.sub(pat, '\g<1>', text).strip()
try:
text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore')
text = unicode(re.sub('[-\s]+', ' ', text))
except UnicodeDecodeError:
log.exception(u"Failing to normalize '{0}'".format(text))
return text
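An example of the normalization (accents stripped, separators collapsed to underscores):

    slugify(u'Café Tacuba')  # -> u'Cafe_Tacuba'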
BY_TRANS = ['by', 'par', 'de', 'von']
LYRICS_TRANS = ['lyrics', 'paroles', 'letras', 'liedtexte']
def is_page_candidate(urlLink, urlTitle, title, artist):
"""Return True if the URL title makes it a good candidate to be a
page that contains lyrics of title by artist.
"""
title = slugify(title.lower())
artist = slugify(artist.lower())
sitename = re.search(u"//([^/]+)/.*", slugify(urlLink.lower())).group(1)
urlTitle = slugify(urlTitle.lower())
# Check if URL title contains song title (exact match)
if urlTitle.find(title) != -1:
return True
# or try extracting song title from URL title and check if
# they are close enough
tokens = [by + '_' + artist for by in BY_TRANS] + \
[artist, sitename, sitename.replace('www.', '')] + LYRICS_TRANS
songTitle = re.sub(u'(%s)' % u'|'.join(tokens), u'', urlTitle)
typoRatio = .8
return difflib.SequenceMatcher(None, songTitle, title).ratio() >= typoRatio
def insert_line_feeds(text):
"""Insert newlines before upper-case characters.
"""
tokensStr = re.split("([a-z][A-Z])", text)
for idx in range(1, len(tokensStr), 2):
ltoken = list(tokensStr[idx])
tokensStr[idx] = ltoken[0] + '\n' + ltoken[1]
return ''.join(tokensStr)
def sanitize_lyrics(text):
"""Clean text, returning raw lyrics as output or None if it happens
that input text is actually not lyrics content. Clean (x)html tags
in text, correct layout and syntax...
"""
text = strip_cruft(text, False)
# Restore \n in input text
if '\n' not in text:
text = insert_line_feeds(text)
while text.count('\n\n') > text.count('\n') // 4:
# Remove first occurrence of \n for each sequence of \n
text = re.sub(r'\n(\n+)', '\g<1>', text)
text = re.sub(r'\n\n+', '\n\n', text) # keep at most two \n in a row
return text
def remove_credits(text):
"""Remove first/last line of text if it contains the word 'lyrics'
eg 'Lyrics by songsdatabase.com'
"""
textlines = text.split('\n')
credits = None
for i in (0, -1):
if textlines and 'lyrics' in textlines[i].lower():
credits = textlines.pop(i)
if credits:
text = '\n'.join(textlines)
return text
def is_lyrics(text, artist=None):
"""Determine whether the text seems to be valid lyrics.
"""
if not text:
return
badTriggersOcc = []
nbLines = text.count('\n')
if nbLines <= 1:
log.debug(u"Ignoring too short lyrics '{0}'".format(text))
return 0
elif nbLines < 5:
badTriggersOcc.append('too_short')
else:
# Lyrics look legit, remove credits to avoid being penalized further
# down
text = remove_credits(text)
badTriggers = ['lyrics', 'copyright', 'property']
if artist:
badTriggersOcc += [artist]
for item in badTriggers:
badTriggersOcc += [item] * len(re.findall(r'\W%s\W' % item,
text, re.I))
if badTriggersOcc:
log.debug(u'Bad triggers detected: {0}'.format(badTriggersOcc))
return len(badTriggersOcc) < 2
def scrape_lyrics_from_url(url):
"""Scrape lyrics from a URL. If no lyrics can be found, return None
instead.
"""
from bs4 import BeautifulSoup, Comment
html = fetch_url(url)
if not html:
return None
soup = BeautifulSoup(html)
for tag in soup.findAll('br'):
tag.replaceWith('\n')
# Remove non-relevant HTML parts
[s.extract() for s in soup(['head', 'script'])]
comments = soup.findAll(text=lambda text: isinstance(text, Comment))
[s.extract() for s in comments]
try:
for tag in soup.findAll(True):
tag.name = 'p' # keep tag contents
except Exception, e:
log.debug(u'Error {0} when replacing containing marker by p marker'
.format(e, exc_info=True))
# Make better soup from current soup! The previous unclosed <p> sections
# are now closed. Use str() rather than prettify() as it's more
# conservative concerning EOL
soup = BeautifulSoup(str(soup))
# In case lyrics are nested in no markup but <body>
# Insert the whole body in a <p>
bodyTag = soup.find('body')
if bodyTag:
pTag = soup.new_tag("p")
bodyTag.parent.insert(0, pTag)
pTag.insert(0, bodyTag)
tagTokens = []
for tag in soup.findAll('p'):
soup2 = BeautifulSoup(str(tag))
# Extract all text of <p> section.
tagTokens += soup2.findAll(text=True)
if tagTokens:
# Lyrics are expected to be the longest paragraph
tagTokens = sorted(tagTokens, key=len, reverse=True)
soup = BeautifulSoup(tagTokens[0])
return unescape(tagTokens[0].strip("\n\r: "))
def fetch_google(artist, title):
"""Fetch lyrics from Google search results.
"""
query = u"%s %s" % (artist, title)
api_key = config['lyrics']['google_API_key'].get(unicode)
engine_id = config['lyrics']['google_engine_ID'].get(unicode)
url = u'https://www.googleapis.com/customsearch/v1?key=%s&cx=%s&q=%s' % \
(api_key, engine_id, urllib.quote(query.encode('utf8')))
data = urllib.urlopen(url)
data = json.load(data)
if 'error' in data:
reason = data['error']['errors'][0]['reason']
log.debug(u'google lyrics backend error: {0}'.format(reason))
return
if 'items' in data.keys():
for item in data['items']:
urlLink = item['link']
urlTitle = item['title']
if not is_page_candidate(urlLink, urlTitle, title, artist):
continue
lyrics = scrape_lyrics_from_url(urlLink)
if not lyrics:
continue
lyrics = sanitize_lyrics(lyrics)
if is_lyrics(lyrics, artist):
log.debug(u'got lyrics from {0}'.format(item['displayLink']))
return lyrics
# Plugin logic.
class LyricsPlugin(BeetsPlugin):
def __init__(self):
super(LyricsPlugin, self).__init__()
self.import_stages = [self.imported]
self.config.add({
'auto': True,
'google_API_key': None,
'google_engine_ID': u'009217259823014548361:lndtuqkycfu',
'fallback': None,
})
self.backends = [fetch_lyricswiki, fetch_lyricscom]
if self.config['google_API_key'].get():
self.backends.insert(0, fetch_google)
def commands(self):
cmd = ui.Subcommand('lyrics', help='fetch song lyrics')
cmd.parser.add_option('-p', '--print', dest='printlyr',
action='store_true', default=False,
help='print lyrics to console')
cmd.parser.add_option('-f', '--force', dest='force_refetch',
action='store_true', default=False,
help='always re-download lyrics')
def func(lib, opts, args):
# The "write to files" option corresponds to the
# import_write config value.
write = config['import']['write'].get(bool)
for item in lib.items(ui.decargs(args)):
self.fetch_item_lyrics(lib, logging.INFO, item, write,
opts.force_refetch)
if opts.printlyr and item.lyrics:
ui.print_(item.lyrics)
cmd.func = func
return [cmd]
def imported(self, session, task):
"""Import hook for fetching lyrics automatically.
"""
if self.config['auto']:
for item in task.imported_items():
self.fetch_item_lyrics(session.lib, logging.DEBUG, item,
False, False)
def fetch_item_lyrics(self, lib, loglevel, item, write, force):
"""Fetch and store lyrics for a single item. If ``write``, then the
lyrics will also be written to the file itself. The ``loglevel``
parameter controls the visibility of the function's status log
messages.
"""
# Skip if the item already has lyrics.
if not force and item.lyrics:
log.log(loglevel, u'lyrics already present: {0} - {1}'
.format(item.artist, item.title))
return
lyrics = None
for artist, titles in search_pairs(item):
lyrics = [self.get_lyrics(artist, title) for title in titles]
if any(lyrics):
break
lyrics = u"\n\n---\n\n".join([l for l in lyrics if l])
if lyrics:
log.log(loglevel, u'fetched lyrics: {0} - {1}'
.format(item.artist, item.title))
else:
log.log(loglevel, u'lyrics not found: {0} - {1}'
.format(item.artist, item.title))
fallback = self.config['fallback'].get()
if fallback:
lyrics = fallback
else:
return
item.lyrics = lyrics
if write:
item.try_write()
item.store()
def get_lyrics(self, artist, title):
"""Fetch lyrics, trying each source in turn. Return a string or
None if no lyrics were found.
"""
for backend in self.backends:
lyrics = backend(artist, title)
if lyrics:
if isinstance(lyrics, str):
lyrics = lyrics.decode('utf8', 'ignore')
log.debug(u'got lyrics from backend: {0}'
.format(backend.__name__))
return lyrics.strip()
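Standalone, the backend chain can be exercised directly (requires network access; the artist and title are illustrative):

    from beetsplug.lyrics import LyricsPlugin

    lp = LyricsPlugin()
    text = lp.get_lyrics(u'Pink Floyd', u'Breathe')  # tries LyricsWiki, then Lyrics.com
    if text:
        print(text)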