Deleted some superfluous .py~ files

This commit is contained in:
Remy Varma
2011-11-03 22:12:28 +00:00
parent 5cf995b3dc
commit 5625a2f134
12 changed files with 0 additions and 5699 deletions

View File

@@ -1,19 +0,0 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
__version__ = '1.0b11'
__author__ = 'Adrian Sampson <adrian@radbox.org>'
import lib.beets.library as beetslibrary
Library = beetslibrary.Library

View File

@@ -1,109 +0,0 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Facilities for automatically determining files' correct metadata.
"""
import os
import logging
from lib.beets import library, mediafile
from lib.beets.util import sorted_walk
# Parts of external interface.
from .hooks import AlbumInfo, TrackInfo
from .match import AutotagError
from .match import tag_item, tag_album
from .match import RECOMMEND_STRONG, RECOMMEND_MEDIUM, RECOMMEND_NONE
from .match import STRONG_REC_THRESH, MEDIUM_REC_THRESH, REC_GAP_THRESH
# Global logger.
log = logging.getLogger('beets')
# Additional utilities for the main interface.
def albums_in_dir(path):
    """Recursively search `path` and yield (directory, items) pairs.

    Every directory containing at least one readable media file is
    treated as a probable album; `items` is the list of Items built
    from those files.
    """
    for root, _dirs, files in sorted_walk(path):
        found = []
        for name in files:
            filepath = os.path.join(root, name)
            try:
                found.append(library.Item.from_path(filepath))
            except mediafile.FileTypeError:
                # Not a media file at all; silently skip it.
                pass
            except mediafile.UnreadableFileError:
                log.warn('unreadable file: ' + name)
        # Only directories that actually contained media are albums.
        if found:
            yield root, found
def apply_item_metadata(item, track_info):
    """Overwrite a single item's tags from its matched TrackInfo.

    Only artist, title, and MusicBrainz track/artist IDs are written;
    album-level fields and the track number are deliberately left
    intact for now (perhaps they should be emptied?).
    """
    item.title = track_info.title
    item.artist = track_info.artist
    item.mb_trackid = track_info.track_id
    artist_id = track_info.artist_id
    if artist_id:
        item.mb_artistid = artist_id
def apply_metadata(items, album_info):
    """Write an AlbumInfo's metadata onto an ordered list of items.

    Items are paired positionally with ``album_info.tracks``, so the
    list must already be in canonical track order.
    """
    total = len(items)
    for idx, (item, track_info) in enumerate(zip(items, album_info.tracks), 1):
        # Artist: prefer the per-track artist, falling back to the
        # album-level artist.
        item.artist = track_info.artist or album_info.artist
        item.albumartist = album_info.artist
        item.album = album_info.album
        item.tracktotal = total
        # Release date components are applied only when known.
        if album_info.year:
            item.year = album_info.year
        if album_info.month:
            item.month = album_info.month
        if album_info.day:
            item.day = album_info.day
        # Title and 1-based position on the release.
        item.title = track_info.title
        item.track = idx
        # MusicBrainz identifiers.
        item.mb_trackid = track_info.track_id
        item.mb_albumid = album_info.album_id
        item.mb_artistid = track_info.artist_id or album_info.artist_id
        item.mb_albumartistid = album_info.artist_id
        item.albumtype = album_info.albumtype
        if album_info.label:
            item.label = album_info.label
        # Compilation ("various artists") flag.
        item.comp = album_info.va

View File

@@ -1,113 +0,0 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Finding album art for tagged albums."""
import urllib
import sys
import logging
import os
from beets.autotag.mb import album_for_id
IMAGE_EXTENSIONS = ['png', 'jpg', 'jpeg']
COVER_NAMES = ['cover', 'front', 'art', 'album', 'folder']
# The common logger.
log = logging.getLogger('beets')
# Art from Amazon.
AMAZON_URL = 'http://images.amazon.com/images/P/%s.%02i.LZZZZZZZ.jpg'
AMAZON_INDICES = (1,2)
AMAZON_CONTENT_TYPE = 'image/jpeg'
def art_for_asin(asin):
    """Try to download cover art for an Amazon ID (ASIN) string.

    Returns the local path of the fetched image, or None if nothing
    usable could be downloaded.
    """
    for index in AMAZON_INDICES:
        url = AMAZON_URL % (asin, index)
        try:
            log.debug('Downloading art: %s' % url)
            fn, headers = urllib.urlretrieve(url)
        except IOError:
            log.debug('error fetching art at URL %s' % url)
        else:
            # Amazon serves a non-image response for missing art, so
            # only accept a genuine JPEG content type.
            if headers.gettype() == AMAZON_CONTENT_TYPE:
                log.debug('Downloaded art to: %s' % fn)
                return fn
# Art from the filesystem.
def art_in_path(path):
    """Return the path of the best-looking album art file found
    directly inside `path`, or None when the directory holds no images
    (or is not a directory at all).
    """
    if not os.path.isdir(path):
        return
    # Collect every file bearing an image extension, in listing order.
    images = [fn for fn in os.listdir(path)
              if any(fn.lower().endswith('.' + ext)
                     for ext in IMAGE_EXTENSIONS)]
    # Prefer conventionally named files such as "cover.jpg".
    for fn in images:
        if any(fn.lower().startswith(name) for name in COVER_NAMES):
            log.debug('Using well-named art file %s' % fn)
            return os.path.join(path, fn)
    # Otherwise settle for the first image we saw.
    if images:
        log.debug('Using fallback art file %s' % images[0])
        return os.path.join(path, images[0])
# Main interface.
def art_for_album(album, path):
    """Find art for an album, given its MusicBrainz info dictionary.

    Looks first for files in the local directory `path` (when it is a
    string), then falls back to Amazon via the release's ASIN. Returns
    a filesystem path to the art, or None when nothing is found.
    """
    # Local files take precedence over downloads.
    if isinstance(path, basestring):
        local = art_in_path(path)
        if local:
            return local
    # The Amazon fallback needs an ASIN.
    if not album.asin:
        log.debug('No ASIN available: no art found.')
        return None
    log.debug('Fetching album art for ASIN %s.' % album.asin)
    return art_for_asin(album.asin)
# Smoke test.
if __name__ == '__main__':
    # Smoke test: pass a MusicBrainz album ID on the command line and
    # print the path (and size in whole KiB, via integer division) of
    # any art found for that release.
    # NOTE(review): Python 2 print statements — this script section
    # does not run under Python 3 as written.
    aid = sys.argv[1]
    album = album_for_id(aid)
    if not album:
        print 'album not found'
    else:
        # No local directory: exercises the Amazon/ASIN path only.
        fn = art_for_album(album, None)
        if fn:
            print fn
            print len(open(fn).read())/1024
        else:
            print 'no art found'

View File

@@ -1,125 +0,0 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Glue between metadata sources and the matching logic."""
from beets import plugins
from beets.autotag import mb
# Classes used to represent candidate options.
class AlbumInfo(object):
    """A canonical release, as reported by a metadata source, that
    library albums are matched against.

    Required attributes:
    - ``album``: release title
    - ``album_id``: MusicBrainz ID (UUID fragment only)
    - ``artist``: name of the release's primary artist
    - ``artist_id``: MusicBrainz artist ID
    - ``tracks``: list of TrackInfo objects making up the release

    Optional attributes (may be None):
    - ``asin``: Amazon ASIN
    - ``albumtype``: string describing the kind of release
    - ``va``: whether the release is "various artists"
    - ``year`` / ``month`` / ``day``: release date components
    - ``label``: music label responsible for the release
    """
    def __init__(self, album, album_id, artist, artist_id, tracks, asin=None,
                 albumtype=None, va=False, year=None, month=None, day=None,
                 label=None):
        self.album, self.album_id = album, album_id
        self.artist, self.artist_id = artist, artist_id
        self.tracks = tracks
        self.asin, self.albumtype, self.va = asin, albumtype, va
        self.year, self.month, self.day = year, month, day
        self.label = label
class TrackInfo(object):
    """A canonical track on a release; an entry in an AlbumInfo's
    ``tracks`` list.

    ``title`` and ``track_id`` (a MusicBrainz ID, UUID fragment only)
    are required. ``artist``, ``artist_id``, and ``length`` (track
    duration in seconds, as a float) may be None.
    """
    def __init__(self, title, track_id, artist=None, artist_id=None,
                 length=None):
        self.title, self.track_id = title, track_id
        self.artist, self.artist_id = artist, artist_id
        self.length = length
# Aggregation of sources.
def _album_for_id(album_id):
    """Get an album corresponding to a MusicBrainz release ID.

    Aggregation-layer wrapper around ``mb.album_for_id``, which
    returns an AlbumInfo or None when the release is not found.
    """
    return mb.album_for_id(album_id)
def _track_for_id(track_id):
    """Get an item for a recording MBID.

    Aggregation-layer wrapper around ``mb.track_for_id``, which
    returns a TrackInfo or None when the recording is not found.
    """
    return mb.track_for_id(track_id)
def _album_candidates(items, artist, album, va_likely):
    """Gather AlbumInfo candidates for a set of items from all sources.

    ``items`` is the list of Items making up the album. ``artist`` and
    ``album`` are the search names (strings) — derived from the items
    or supplied by the user. ``va_likely`` flags a probable
    various-artists release, which triggers an extra artist-less
    MusicBrainz search.
    """
    candidates = []
    if artist and album:
        candidates.extend(mb.match_album(artist, album, len(items)))
    if va_likely and album:
        # Various-artists search: omit the artist criterion.
        candidates.extend(mb.match_album(None, album, len(items)))
    # Whatever the plugins contribute.
    candidates.extend(plugins.candidates(items))
    return candidates
def _item_candidates(item, artist, title):
    """Gather TrackInfo candidates for a single item from all sources.

    ``artist`` and ``title`` are search strings that either reflect
    the item's tags or were specified by the user.
    """
    candidates = []
    # MusicBrainz results first, then plugin contributions.
    candidates.extend(mb.match_track(artist, title))
    candidates.extend(plugins.item_candidates(item))
    return candidates

View File

@@ -1,490 +0,0 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Matches existing metadata with canonical information to identify
releases and tracks.
"""
import logging
import re
from munkres import Munkres
from unidecode import unidecode
from beets import plugins
from beets.util import levenshtein, plurality
from beets.autotag import hooks
# Distance parameters.
# Text distance weights: proportions on the normalized intuitive edit
# distance.
ARTIST_WEIGHT = 3.0
ALBUM_WEIGHT = 3.0
# The weight of the entire distance calculated for a given track.
TRACK_WEIGHT = 1.0
# These distances are components of the track distance (that is, they
# compete against each other but not ARTIST_WEIGHT and ALBUM_WEIGHT;
# the overall TRACK_WEIGHT does that).
TRACK_TITLE_WEIGHT = 3.0
# Used instead of a global artist penalty for various-artist matches.
TRACK_ARTIST_WEIGHT = 2.0
# Added when the indices of tracks don't match.
TRACK_INDEX_WEIGHT = 1.0
# Track length weights: no penalty before GRACE, maximum (WEIGHT)
# penalty at GRACE+MAX discrepancy.
TRACK_LENGTH_GRACE = 10
TRACK_LENGTH_MAX = 30
TRACK_LENGTH_WEIGHT = 2.0
# MusicBrainz track ID matches.
TRACK_ID_WEIGHT = 5.0
# Parameters for string distance function.
# Words that can be moved to the end of a string using a comma.
SD_END_WORDS = ['the', 'a', 'an']
# Reduced weights for certain portions of the string.
SD_PATTERNS = [
(r'^the ', 0.1),
(r'[\[\(]?(ep|single)[\]\)]?', 0.0),
(r'[\[\(]?(featuring|feat|ft)[\. :].+', 0.1),
(r'\(.*?\)', 0.3),
(r'\[.*?\]', 0.3),
(r'(, )?(pt\.|part) .+', 0.2),
]
# Replacements to use before testing distance.
SD_REPLACE = [
(r'&', 'and'),
]
# Recommendation constants.
RECOMMEND_STRONG = 'RECOMMEND_STRONG'
RECOMMEND_MEDIUM = 'RECOMMEND_MEDIUM'
RECOMMEND_NONE = 'RECOMMEND_NONE'
# Thresholds for recommendations.
STRONG_REC_THRESH = 0.04
MEDIUM_REC_THRESH = 0.25
REC_GAP_THRESH = 0.25
# Artist signals that indicate "various artists".
VA_ARTISTS = (u'', u'various artists', u'va', u'unknown')
# Autotagging exceptions.
class AutotagError(Exception):
pass
# Global logger.
log = logging.getLogger('beets')
# Primary matching functionality.
def _string_dist_basic(str1, str2):
    """Normalized edit distance between two strings.

    Both strings are transliterated to ASCII, lowercased, and stripped
    of non-alphanumerics before comparison; the Levenshtein distance
    is then divided by the longer string's length. Two effectively
    empty strings compare as 0.0.
    """
    a = re.sub(r'[^a-z0-9]', '', unidecode(str1).lower())
    b = re.sub(r'[^a-z0-9]', '', unidecode(str2).lower())
    if not (a or b):
        return 0.0
    return levenshtein(a, b) / float(max(len(a), len(b)))
def string_dist(str1, str2):
    """Gives an "intuitive" edit distance between two strings. This is
    an edit distance, normalized by the string length, with a number of
    tweaks that reflect intuition about text: 0.0 means effectively
    identical; larger means more different.
    """
    str1 = str1.lower()
    str2 = str2.lower()
    # Don't penalize strings that move certain words to the end. For
    # example, "the something" should be considered equal to
    # "something, the".
    for word in SD_END_WORDS:
        if str1.endswith(', %s' % word):
            # Rotate "title, the" back into "the title".
            str1 = '%s %s' % (word, str1[:-len(word)-2])
        if str2.endswith(', %s' % word):
            str2 = '%s %s' % (word, str2[:-len(word)-2])
    # Perform a couple of basic normalizing substitutions.
    for pat, repl in SD_REPLACE:
        str1 = re.sub(pat, repl, str1)
        str2 = re.sub(pat, repl, str2)
    # Change the weight for certain string portions matched by a set
    # of regular expressions. We gradually change the strings and build
    # up penalties associated with parts of the string that were
    # deleted.
    base_dist = _string_dist_basic(str1, str2)
    penalty = 0.0
    for pat, weight in SD_PATTERNS:
        # Get strings that drop the pattern.
        case_str1 = re.sub(pat, '', str1)
        case_str2 = re.sub(pat, '', str2)
        if case_str1 != str1 or case_str2 != str2:
            # If the pattern was present (i.e., it is deleted in the
            # current case), recalculate the distances for the
            # modified strings.
            case_dist = _string_dist_basic(case_str1, case_str2)
            case_delta = max(0.0, base_dist - case_dist)
            if case_delta == 0.0:
                # Dropping the pattern didn't reduce the distance, so
                # keep the unmodified strings for later patterns.
                continue
            # Shift our baseline strings down (to avoid rematching the
            # same part of the string) and add a scaled distance
            # amount to the penalties.
            str1 = case_str1
            str2 = case_str2
            base_dist = case_dist
            penalty += weight * case_delta
    dist = base_dist + penalty
    return dist
def current_metadata(items):
    """Pick the most likely artist and album for a set of Items by
    plurality vote over their tags.

    Returns ``(artist, album, artist_consensus)`` where the consensus
    flag is True only when every item agrees on the artist.
    """
    likelies = {}
    consensus = {}
    for field in ('artist', 'album'):
        values = [getattr(item, field) for item in items]
        winner, freq = plurality(values)
        likelies[field] = winner
        # Unanimity check: did the winner account for every item?
        consensus[field] = (freq == len(values))
    return likelies['artist'], likelies['album'], consensus['artist']
def order_items(items, trackinfo):
    """Reorder `items` to best match the canonical `trackinfo` list.

    Solves a minimum-cost assignment over pairwise track distances, so
    a result is always produced when the counts match; returns None
    when they differ.
    """
    if len(items) != len(trackinfo):
        return None
    # Pairwise cost matrix: one row per item, one column per canonical
    # track (track indices are 1-based for the distance function).
    costs = [[track_distance(item, canon, pos + 1)
              for pos, canon in enumerate(trackinfo)]
             for item in items]
    # Minimum-cost bipartite matching (Hungarian algorithm).
    assignment = Munkres().compute(costs)
    ordered = [None] * len(items)
    for item_idx, canon_idx in assignment:
        ordered[canon_idx] = items[item_idx]
    return ordered
def track_distance(item, track_info, track_index=None, incl_artist=False):
    """Score how different an item's metadata is from a canonical
    track, as a float in [0.0, 1.0] (0.0 = no change).

    `track_index` is the 1-based position of `track_info` on the
    release; when both it and item.track are set, their mismatch is a
    distance component. `incl_artist` adds a track-artist component
    (used for various-artist releases, which have no album-level
    artist comparison).
    """
    # Weighted components accumulate in dist; their weights in
    # dist_max. The result is the ratio of the two.
    dist = 0.0
    dist_max = 0.0
    # Duration: a missing canonical length counts as the worst case;
    # otherwise the discrepancy beyond a grace period is scaled into
    # [0, TRACK_LENGTH_WEIGHT].
    if track_info.length:
        diff = abs(item.length - track_info.length)
        diff = min(max(diff - TRACK_LENGTH_GRACE, 0.0), TRACK_LENGTH_MAX)
        dist += (diff / TRACK_LENGTH_MAX) * TRACK_LENGTH_WEIGHT
    else:
        dist += TRACK_LENGTH_WEIGHT
    dist_max += TRACK_LENGTH_WEIGHT
    # Title similarity.
    dist += string_dist(item.title, track_info.title) * TRACK_TITLE_WEIGHT
    dist_max += TRACK_TITLE_WEIGHT
    # Track artist — only when requested AND present: the MusicBrainz
    # database lacks per-track artists for some compilations.
    if incl_artist and track_info.artist:
        dist += (string_dist(item.artist, track_info.artist)
                 * TRACK_ARTIST_WEIGHT)
        dist_max += TRACK_ARTIST_WEIGHT
    # Position on the release.
    if track_index and item.track:
        if track_index != item.track:
            dist += TRACK_INDEX_WEIGHT
        dist_max += TRACK_INDEX_WEIGHT
    # MusicBrainz track ID, when the item carries one.
    if item.mb_trackid:
        if item.mb_trackid != track_info.track_id:
            dist += TRACK_ID_WEIGHT
        dist_max += TRACK_ID_WEIGHT
    # Plugin-contributed components.
    plugin_d, plugin_dm = plugins.track_distance(item, track_info)
    dist += plugin_d
    dist_max += plugin_dm
    return dist / dist_max
def distance(items, album_info):
    """Score how "significant" the metadata change from the ordered
    `items` to `album_info` would be, as a float in [0.0, 1.0].
    """
    cur_artist, cur_album, _ = current_metadata(items)
    cur_artist = cur_artist or ''
    cur_album = cur_album or ''
    # Weighted components accumulate here; the final score is
    # dist / dist_max.
    dist = 0.0
    dist_max = 0.0
    # Album artist — skipped for various-artists releases.
    if not album_info.va:
        dist += string_dist(cur_artist, album_info.artist) * ARTIST_WEIGHT
        dist_max += ARTIST_WEIGHT
    # Album title.
    dist += string_dist(cur_album, album_info.album) * ALBUM_WEIGHT
    dist_max += ALBUM_WEIGHT
    # Per-track distances, each carrying the same overall weight.
    for pos, (item, track_info) in enumerate(zip(items, album_info.tracks)):
        dist += (track_distance(item, track_info, pos + 1, album_info.va)
                 * TRACK_WEIGHT)
        dist_max += TRACK_WEIGHT
    # Plugin-contributed components.
    plugin_d, plugin_dm = plugins.album_distance(items, album_info)
    dist += plugin_d
    dist_max += plugin_dm
    # Guard against an all-zero weight set.
    return dist / dist_max if dist_max else 0.0
def match_by_id(items):
    """Look up the album identified by the items' MusicBrainz album
    IDs. Returns an AlbumInfo when all tagged items agree on one ID;
    otherwise (no IDs, or disagreement) returns None.
    """
    albumids = [item.mb_albumid for item in items if item.mb_albumid]
    if not albumids:
        log.debug('No album IDs found.')
        return None
    # Only proceed on unanimous agreement among the tagged items.
    if len(set(albumids)) == 1:
        albumid = albumids[0]
        log.debug('Searching for discovered album ID: ' + albumid)
        return hooks._album_for_id(albumid)
    log.debug('No album ID consensus.')
    return None
#fixme In the future, at the expense of performance, we could use
# other IDs (i.e., track and artist) in case the album tag isn't
# present, but that event seems very unlikely.
def recommendation(results):
    """Map a distance-sorted list of result tuples to a recommendation
    flag: RECOMMEND_STRONG, RECOMMEND_MEDIUM, or RECOMMEND_NONE.
    """
    if not results:
        # Nothing to recommend.
        return RECOMMEND_NONE
    min_dist = results[0][0]
    # A very close best match earns a strong recommendation.
    if min_dist < STRONG_REC_THRESH:
        return RECOMMEND_STRONG
    # Medium recommendation: the only candidate, a reasonably close
    # match, or a clear gap between the best match and the runner-up.
    if len(results) == 1:
        return RECOMMEND_MEDIUM
    if min_dist <= MEDIUM_REC_THRESH:
        return RECOMMEND_MEDIUM
    if results[1][0] - min_dist >= REC_GAP_THRESH:
        return RECOMMEND_MEDIUM
    # No conclusion.
    return RECOMMEND_NONE
def validate_candidate(items, tuple_dict, info):
    """Vet one candidate AlbumInfo and, if it survives, record a
    ``(distance, ordered_items, info)`` tuple in `tuple_dict`, keyed
    by the candidate's album ID.

    Candidates are rejected when already present, when the track count
    differs from the item count, or when the items cannot be ordered
    against the candidate's tracks.
    """
    log.debug('Candidate: %s - %s' % (info.artist, info.album))
    if info.album_id in tuple_dict:
        log.debug('Duplicate.')
        return
    if len(items) != len(info.tracks):
        log.debug('Track count mismatch.')
        return
    ordered = order_items(items, info.tracks)
    if not ordered:
        log.debug('Not orderable.')
        return
    # The candidate is viable: score it and record the result.
    dist = distance(ordered, info)
    log.debug('Success. Distance: %f' % dist)
    tuple_dict[info.album_id] = dist, ordered, info
def tag_album(items, timid=False, search_artist=None, search_album=None,
              search_id=None):
    """Bundles together the functionality used to infer tags for a
    set of items comprised by an album. Returns everything relevant:
    - The current artist.
    - The current album.
    - A list of (distance, items, info) tuples where info is a
      dictionary containing the inferred tags and items is a
      reordered version of the input items list. The candidates are
      sorted by distance (i.e., best match first).
    - A recommendation, one of RECOMMEND_STRONG, RECOMMEND_MEDIUM,
      or RECOMMEND_NONE; indicating that the first candidate is
      very likely, it is somewhat likely, or no conclusion could
      be reached.
    If search_artist and search_album or search_id are provided, then
    they are used as search terms in place of the current metadata.
    May raise an AutotagError if existing metadata is insufficient.
    """
    # Get current metadata.
    cur_artist, cur_album, artist_consensus = current_metadata(items)
    log.debug('Tagging %s - %s' % (cur_artist, cur_album))
    # The output result tuples (keyed by MB album ID).
    out_tuples = {}
    # Try to find album indicated by MusicBrainz IDs: an explicit
    # search ID takes precedence over IDs already on the items.
    if search_id:
        log.debug('Searching for album ID: ' + search_id)
        id_info = hooks._album_for_id(search_id)
    else:
        id_info = match_by_id(items)
    if id_info:
        validate_candidate(items, out_tuples, id_info)
        # NOTE: Python 2 — dict.values() returns a list here.
        rec = recommendation(out_tuples.values())
        log.debug('Album ID match recommendation is ' + str(rec))
        if out_tuples and not timid:
            # If we have a very good MBID match, return immediately.
            # Otherwise, this match will compete against metadata-based
            # matches.
            if rec == RECOMMEND_STRONG:
                log.debug('ID match.')
                return cur_artist, cur_album, out_tuples.values(), rec
    # If searching by ID, don't continue to metadata search.
    if search_id is not None:
        if out_tuples:
            # `rec` was bound above: out_tuples is only populated via
            # the id_info branch, which also computes it.
            return cur_artist, cur_album, out_tuples.values(), rec
        else:
            return cur_artist, cur_album, [], RECOMMEND_NONE
    # Search terms.
    if not (search_artist and search_album):
        # No explicit search terms -- use current metadata.
        search_artist, search_album = cur_artist, cur_album
    log.debug(u'Search terms: %s - %s' % (search_artist, search_album))
    # Is this album likely to be a "various artist" release?
    va_likely = ((not artist_consensus) or
                 (search_artist.lower() in VA_ARTISTS) or
                 any(item.comp for item in items))
    log.debug(u'Album might be VA: %s' % str(va_likely))
    # Get the results from the data sources.
    candidates = hooks._album_candidates(items, search_artist, search_album,
                                         va_likely)
    # Get the distance to each candidate.
    log.debug(u'Evaluating %i candidates.' % len(candidates))
    for info in candidates:
        validate_candidate(items, out_tuples, info)
    # Sort by distance. (Python 2: values() is a sortable list.)
    out_tuples = out_tuples.values()
    out_tuples.sort()
    rec = recommendation(out_tuples)
    return cur_artist, cur_album, out_tuples, rec
def tag_item(item, timid=False, search_artist=None, search_title=None,
             search_id=None):
    """Attempts to find metadata for a single track. Returns a
    `(candidates, recommendation)` pair where `candidates` is a list
    of `(distance, track_info)` pairs. `search_artist` and
    `search_title` may be used to override the current metadata for
    the purposes of the MusicBrainz title; likewise `search_id`.
    """
    candidates = []
    # First, try matching by MusicBrainz ID.
    trackid = search_id or item.mb_trackid
    if trackid:
        log.debug('Searching for track ID: ' + trackid)
        track_info = hooks._track_for_id(trackid)
        if track_info:
            dist = track_distance(item, track_info, incl_artist=True)
            candidates.append((dist, track_info))
            # If this is a good match, then don't keep searching.
            rec = recommendation(candidates)
            if rec == RECOMMEND_STRONG and not timid:
                log.debug('Track ID match.')
                return candidates, rec
    # If we're searching by ID, don't proceed.
    if search_id is not None:
        if candidates:
            # `rec` is bound here: candidates is only non-empty when
            # the ID lookup above succeeded and computed it.
            return candidates, rec
        else:
            return [], RECOMMEND_NONE
    # Search terms.
    if not (search_artist and search_title):
        search_artist, search_title = item.artist, item.title
    log.debug(u'Item search terms: %s - %s' % (search_artist, search_title))
    # Get and evaluate candidate metadata.
    for track_info in hooks._item_candidates(item, search_artist, search_title):
        dist = track_distance(item, track_info, incl_artist=True)
        candidates.append((dist, track_info))
    # Sort by distance and return with recommendation.
    log.debug('Found %i candidates.' % len(candidates))
    candidates.sort()
    rec = recommendation(candidates)
    return candidates, rec

View File

@@ -1,171 +0,0 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Searches for albums in the MusicBrainz database.
"""
import logging
from . import musicbrainz3
import beets.autotag.hooks
import beets
SEARCH_LIMIT = 5
VARIOUS_ARTISTS_ID = '89ad4ac3-39f7-470e-963a-56509c546377'
musicbrainz3._useragent = 'beets/%s' % beets.__version__
class ServerBusyError(Exception): pass
class BadResponseError(Exception): pass
log = logging.getLogger('beets')
# We hard-code IDs for artists that can't easily be searched for.
SPECIAL_CASE_ARTISTS = {
'!!!': 'f26c72d3-e52c-467b-b651-679c73d8e1a7',
}
RELEASE_INCLUDES = ['artists', 'media', 'recordings', 'release-groups',
'labels']
TRACK_INCLUDES = ['artists']
def _adapt_criteria(criteria):
    """Special-case hard-to-search artist names in a MusicBrainz
    criteria dictionary, swapping a matching 'artist' entry for an
    explicit 'arid'. Mutates `criteria` in place; returns nothing.
    """
    if 'artist' not in criteria:
        return
    for name, artist_id in SPECIAL_CASE_ARTISTS.items():
        if criteria['artist'] == name:
            # Replace the unsearchable name with its known ID.
            criteria['arid'] = artist_id
            del criteria['artist']
            break
def track_info(recording):
    """Translate a MusicBrainz recording result dictionary into a
    beets ``TrackInfo`` object.
    """
    info = beets.autotag.hooks.TrackInfo(recording['title'],
                                         recording['id'])
    if 'artist-credit' in recording:  # XXX: when is this not included?
        credited = recording['artist-credit'][0]['artist']
        info.artist = credited['name']
        info.artist_id = credited['id']
    if recording.get('length'):
        # MusicBrainz reports milliseconds; beets stores seconds.
        info.length = int(recording['length']) / 1000.0
    return info
def album_info(release):
    """Build a beets AlbumInfo from a MusicBrainz release result
    dictionary, flattening every medium's tracks into one list.
    """
    # Primary artist credit.
    artist = release['artist-credit'][0]['artist']
    # Gather recordings across all media (discs), in order.
    recordings = []
    for medium in release['medium-list']:
        recordings.extend(entry['recording'] for entry in medium['track-list'])
    info = beets.autotag.hooks.AlbumInfo(
        release['title'],
        release['id'],
        artist['name'],
        artist['id'],
        [track_info(rec) for rec in recordings],
    )
    info.va = info.artist_id == VARIOUS_ARTISTS_ID
    if 'asin' in release:
        info.asin = release['asin']
    # Release-group type is not always populated.
    reltype = release['release-group']['type']
    if reltype:
        info.albumtype = reltype.lower()
    # Release date: apply as many of year/month/day as provided.
    if 'date' in release:  # XXX: when is this not included?
        date_str = release['date']
        if date_str:
            parts = date_str.split('-')
            for field in ('year', 'month', 'day'):
                if parts:
                    setattr(info, field, int(parts.pop(0)))
    # Label name, ignoring MusicBrainz's "[no label]" placeholder.
    if release.get('label-info-list'):
        label = release['label-info-list'][0]['label']['name']
        if label != '[no label]':
            info.label = label
    return info
def match_album(artist, album, tracks=None, limit=SEARCH_LIMIT):
    """Search MusicBrainz for a release matching an artist name, an
    album name, and (optionally) a track count; returns an iterator
    over AlbumInfo objects. Passing artist=None restricts the search
    to various-artists releases.
    """
    criteria = {'release': album}
    if artist is None:
        # Various Artists search.
        criteria['arid'] = VARIOUS_ARTISTS_ID
    else:
        criteria['artist'] = artist
    if tracks is not None:
        criteria['tracks'] = str(tracks)
    _adapt_criteria(criteria)
    res = musicbrainz3.release_search(limit=limit, **criteria)
    for release in res['release-list']:
        # Search results omit some data (namely, the tracks), so each
        # release is re-fetched in full by its ID.
        yield album_for_id(release['id'])
def match_track(artist, title, limit=SEARCH_LIMIT):
    """Search MusicBrainz for recordings matching an artist name and
    track title; yields TrackInfo objects.
    """
    criteria = {'recording': title, 'artist': artist}
    _adapt_criteria(criteria)
    res = musicbrainz3.recording_search(limit=limit, **criteria)
    for recording in res['recording-list']:
        yield track_info(recording)
def album_for_id(albumid):
    """Fetch a release by its MusicBrainz ID. Returns an AlbumInfo
    object, or None when the lookup fails.
    """
    try:
        res = musicbrainz3.get_release_by_id(albumid, RELEASE_INCLUDES)
    except musicbrainz3.ResponseError:
        log.debug('Album ID match failed.')
        return None
    else:
        return album_info(res['release'])
def track_for_id(trackid):
    """Fetch a recording by its MusicBrainz ID. Returns a TrackInfo
    object, or None when the lookup fails.
    """
    try:
        res = musicbrainz3.get_recording_by_id(trackid, TRACK_INCLUDES)
    except musicbrainz3.ResponseError:
        log.debug('Track ID match failed.')
        return None
    else:
        return track_info(res['recording'])

View File

@@ -1,811 +0,0 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Provides the basic, interface-agnostic workflow for importing and
autotagging music files.
"""
from __future__ import with_statement # Python 2.5
import os
import logging
import pickle
from collections import defaultdict
from beets import autotag
from beets import library
import beets.autotag.art
from beets import plugins
from beets.util import pipeline
from beets.util import syspath, normpath, plurality
from beets.util.enumeration import enum
# The set of choices the user (or the autotagger) can make for a task.
action = enum(
    'SKIP', 'ASIS', 'TRACKS', 'MANUAL', 'APPLY', 'MANUAL_ID',
    name='action'
)
# Maximum number of tasks buffered between threaded pipeline stages.
QUEUE_SIZE = 128
# Pickled dictionary holding resume progress and incremental history.
STATE_FILE = os.path.expanduser('~/.beetsstate')
# Minimum fraction of tracks sharing one artist for an as-is import to
# be treated as a single-artist (non-compilation) album.
SINGLE_ARTIST_THRESH = 0.25
VARIOUS_ARTISTS = u'Various Artists'
# Global logger.
log = logging.getLogger('beets')
class ImportAbort(Exception):
    """Raised when the user aborts the tagging operation.

    Caught at the top level of `run_import`, which stops the pipeline
    silently.
    """
    pass
# Utilities.
def tag_log(logfile, status, path):
    """Write a one-line record about the album at `path` to `logfile`.
    `status` should reflect the reason the album couldn't be tagged.
    Does nothing when no logfile is configured.

    Uses file.write instead of the Python 2-only `print >>` chevron so
    the helper works with any file-like object (and under Python 3);
    the bytes written are identical.
    """
    if not logfile:
        return
    logfile.write('%s %s\n' % (status, path))
def log_choice(config, task):
    """Record the task's current choice in the import log when it is a
    loggable one (as-is or skip).
    """
    if task.is_album:
        target = task.path
    else:
        target = task.item.path
    flag = task.choice_flag
    if flag is action.ASIS:
        tag_log(config.logfile, 'asis', target)
    elif flag is action.SKIP:
        tag_log(config.logfile, 'skip', target)
def _reopen_lib(lib):
    """SQLite connections are bound to the thread that created them, so
    a Library must be reconstructed before use from another thread.
    Returns a fresh Library mirroring `lib`; non-Library objects are
    passed through untouched.
    """
    if not isinstance(lib, library.Library):
        return lib
    return library.Library(
        lib.path,
        lib.directory,
        lib.path_formats,
        lib.art_filename,
    )
def _duplicate_check(lib, task, recent=None):
    """Check whether an album already exists in the library. `recent`
    should be a set of (artist, album) pairs that will be built up
    with every call to this function and checked along with the
    library. Returns True when the task's album duplicates an existing
    one; only ASIS and APPLY choices are ever considered duplicates.
    """
    # Pick the artist/album pair that will actually be stored.
    if task.choice_flag is action.ASIS:
        artist = task.cur_artist
        album = task.cur_album
    elif task.choice_flag is action.APPLY:
        artist = task.info.artist
        album = task.info.album
    else:
        # Skipped or as-tracks tasks never create an album.
        return False
    if artist is None:
        # As-is import with no artist. Skip check.
        return False
    # Try the recent albums. Note: mutates `recent` as a side effect so
    # two identical albums in one session collide with each other.
    if recent is not None:
        if (artist, album) in recent:
            return True
        recent.add((artist, album))
    # Look in the library.
    cur_paths = set(i.path for i in task.items)
    for album_cand in lib.albums(artist=artist):
        if album_cand.album == album:
            # Check whether the album is identical in contents, in which
            # case it is not a duplicate (will be replaced).
            other_paths = set(i.path for i in album_cand.items())
            if other_paths == cur_paths:
                continue
            return True
    return False
def _item_duplicate_check(lib, task, recent=None):
    """Check whether an item already exists in the library.

    Mirrors `_duplicate_check` for singleton tasks: `recent` is a
    mutable set of (artist, title) pairs accumulated across calls.
    Returns True when the item duplicates a library entry at a
    different path.
    """
    # Pick the artist/title pair that will actually be stored.
    if task.choice_flag is action.ASIS:
        artist = task.item.artist
        title = task.item.title
    elif task.choice_flag is action.APPLY:
        artist = task.info.artist
        title = task.info.title
    else:
        return False
    # Try recent items.
    if recent is not None:
        if (artist, title) in recent:
            return True
        recent.add((artist, title))
    # Check the library. The iterator is closed explicitly so the
    # underlying query is released even on early return.
    item_iter = lib.items(artist=artist, title=title)
    try:
        for other_item in item_iter:
            # Existing items not considered duplicates.
            if other_item.path == task.item.path:
                continue
            return True
    finally:
        item_iter.close()
    return False
def _infer_album_fields(task):
    """Given an album and an associated import task, massage the
    album-level metadata. This ensures that the album artist is set
    and that the "compilation" flag is set automatically. Mutates the
    task's items in place; only valid for ASIS or APPLY album tasks.
    """
    assert task.is_album
    assert task.items
    changes = {}
    if task.choice_flag == action.ASIS:
        # Taking metadata "as-is". Guess whether this album is VA.
        plur_artist, freq = plurality([i.artist for i in task.items])
        if freq == len(task.items) or (freq > 1 and
                float(freq) / len(task.items) >= SINGLE_ARTIST_THRESH):
            # Single-artist album.
            changes['albumartist'] = plur_artist
            changes['comp'] = False
        else:
            # VA.
            changes['albumartist'] = VARIOUS_ARTISTS
            changes['comp'] = True
    elif task.choice_flag == action.APPLY:
        # Applying autotagged metadata. Just get AA from the first
        # item.
        if not task.items[0].albumartist:
            changes['albumartist'] = task.items[0].artist
        if not task.items[0].mb_albumartistid:
            changes['mb_albumartistid'] = task.items[0].mb_artistid
    else:
        # Any other choice flag is a programming error here.
        assert False
    # Apply new metadata.
    for item in task.items:
        for k, v in changes.iteritems():
            setattr(item, k, v)
def _open_state():
    """Read the pickled state file, returning its dictionary. A
    missing, unreadable, or corrupt file yields an empty state rather
    than aborting the import.
    """
    try:
        with open(STATE_FILE) as f:
            return pickle.load(f)
    except (IOError, EOFError, pickle.UnpicklingError):
        # IOError: file missing or unreadable. EOFError and
        # UnpicklingError: a truncated or corrupt pickle, which
        # previously crashed the importer instead of starting fresh.
        return {}
def _save_state(state):
    """Pickle the state dictionary out to STATE_FILE, replacing any
    previous contents.
    """
    with open(STATE_FILE, 'w') as f:
        pickle.dump(state, f)
# Utilities for reading and writing the beets progress file, which
# allows long tagging tasks to be resumed when they pause (or crash).
# State-dict key under which the per-toppath progress mapping lives.
PROGRESS_KEY = 'tagprogress'
def progress_set(toppath, path):
    """Record that tagging under `toppath` has succeeded up through
    `path`. Passing None for `path` clears any stored progress,
    marking the directory as fully tagged.
    """
    state = _open_state()
    progress = state.setdefault(PROGRESS_KEY, {})
    if path is not None:
        progress[toppath] = path
    else:
        # Finished: drop any saved progress for this directory.
        progress.pop(toppath, None)
    _save_state(state)
def progress_get(toppath):
    """Return the last successfully tagged subpath of `toppath`, or
    None when no progress has been recorded for it.
    """
    return _open_state().get(PROGRESS_KEY, {}).get(toppath)
# Similarly, utilities for manipulating the "incremental" import log.
# This keeps track of all directories that were ever imported, which
# allows the importer to only import new stuff.
# State-dict key holding the set of already-imported directories.
HISTORY_KEY = 'taghistory'
def history_add(path):
    """Mark `path` as completely imported so that future incremental
    imports will not visit it again.
    """
    state = _open_state()
    state.setdefault(HISTORY_KEY, set()).add(path)
    _save_state(state)
def history_get():
    """Return the set of directories already completed by incremental
    imports (empty when none have been recorded).
    """
    return _open_state().get(HISTORY_KEY, set())
# The configuration structure.
class ImportConfig(object):
    """Contains all the settings used during an import session. Should
    be used in a "write-once" way -- everything is set up initially and
    then never touched again.

    Every name in `_fields` is a required keyword argument to the
    constructor (a missing one raises KeyError).
    """
    _fields = ['lib', 'paths', 'resume', 'logfile', 'color', 'quiet',
               'quiet_fallback', 'copy', 'write', 'art', 'delete',
               'choose_match_func', 'should_resume_func', 'threaded',
               'autot', 'singletons', 'timid', 'choose_item_func',
               'query', 'incremental']
    def __init__(self, **kwargs):
        for slot in self._fields:
            setattr(self, slot, kwargs[slot])
        # Normalize the paths.
        if self.paths:
            self.paths = map(normpath, self.paths)
        # Incremental and progress are mutually exclusive.
        if self.incremental:
            self.resume = False
        # When based on a query instead of directories, never
        # save progress or try to resume.
        if self.query is not None:
            self.paths = None
            self.resume = False
            self.incremental = False
# The importer task class.
class ImportTask(object):
    """Represents a single set of items to be imported along with its
    intermediate state. May represent an album or a single item.

    Tasks flow through the pipeline stages, which progressively attach
    match candidates (`set_match`/`set_item_match`) and the user's
    decision (`set_choice`). Sentinel tasks carry no items and only
    mark progress boundaries.
    """
    def __init__(self, toppath=None, path=None, items=None):
        # Top-level directory this task was produced from.
        self.toppath = toppath
        # Specific album directory (None for "done" sentinels).
        self.path = path
        self.items = items
        self.sentinel = False
    @classmethod
    def done_sentinel(cls, toppath):
        """Create an ImportTask that indicates the end of a top-level
        directory import.
        """
        obj = cls(toppath)
        obj.sentinel = True
        return obj
    @classmethod
    def progress_sentinel(cls, toppath, path):
        """Create a task indicating that a single directory in a larger
        import has finished. This is only required for singleton
        imports; progress is implied for album imports.
        """
        obj = cls(toppath, path)
        obj.sentinel = True
        return obj
    @classmethod
    def item_task(cls, item):
        """Creates an ImportTask for a single item."""
        obj = cls()
        obj.item = item
        obj.is_album = False
        return obj
    def set_match(self, cur_artist, cur_album, candidates, rec):
        """Sets the candidates for this album matched by the
        `autotag.tag_album` method. Also marks the task as an album
        task.
        """
        assert not self.sentinel
        self.cur_artist = cur_artist
        self.cur_album = cur_album
        self.candidates = candidates
        self.rec = rec
        self.is_album = True
    def set_null_match(self):
        """Set the candidates to indicate no album match was found.
        """
        self.set_match(None, None, None, None)
    def set_item_match(self, candidates, rec):
        """Set the match for a single-item task."""
        assert not self.is_album
        assert self.item is not None
        self.item_match = (candidates, rec)
    def set_null_item_match(self):
        """For single-item tasks, mark the item as having no matches.
        """
        assert not self.is_album
        assert self.item is not None
        self.item_match = None
    def set_choice(self, choice):
        """Given either an (info, items) tuple or an action constant,
        indicates that an action has been selected by the user (or
        automatically). A tuple/info argument implies APPLY.
        """
        assert not self.sentinel
        # Not part of the task structure:
        assert choice not in (action.MANUAL, action.MANUAL_ID)
        assert choice != action.APPLY # Only used internally.
        if choice in (action.SKIP, action.ASIS, action.TRACKS):
            self.choice_flag = choice
            self.info = None
        else:
            assert not isinstance(choice, action)
            if self.is_album:
                info, items = choice
                self.items = items # Reordered items list.
            else:
                info = choice
            self.info = info
            self.choice_flag = action.APPLY # Implicit choice.
    def save_progress(self):
        """Updates the progress state to indicate that this album has
        finished.
        """
        if self.sentinel and self.path is None:
            # "Done" sentinel.
            progress_set(self.toppath, None)
        elif self.sentinel or self.is_album:
            # "Directory progress" sentinel for singletons or a real
            # album task, which implies the same.
            progress_set(self.toppath, self.path)
    def save_history(self):
        """Save the directory in the history for incremental imports.
        """
        if self.sentinel or self.is_album:
            history_add(self.path)
    # Logical decisions.
    def should_write_tags(self):
        """Should new info be written to the files' metadata?"""
        if self.choice_flag == action.APPLY:
            return True
        elif self.choice_flag in (action.ASIS, action.TRACKS, action.SKIP):
            return False
        else:
            # Unreachable once a choice has been made.
            assert False
    def should_fetch_art(self):
        """Should album art be downloaded for this album?"""
        return self.should_write_tags() and self.is_album
    def should_skip(self):
        """After a choice has been made, returns True if this is a
        sentinel or it has been marked for skipping.
        """
        return self.sentinel or self.choice_flag == action.SKIP
# Full-album pipeline stages.
def read_tasks(config):
    """A generator yielding all the albums (as ImportTask objects) found
    in the user-specified list of paths. In the case of a singleton
    import, yields single-item tasks instead.

    `config.resume` is tri-state: False disables resuming entirely,
    True resumes automatically, and any other value (e.g. None) defers
    to `config.should_resume_func`.
    """
    # Look for saved progress.
    progress = config.resume is not False
    if progress:
        resume_dirs = {}
        for path in config.paths:
            resume_dir = progress_get(path)
            if resume_dir:
                # Either accept immediately or prompt for input to decide.
                if config.resume:
                    do_resume = True
                    log.warn('Resuming interrupted import of %s' % path)
                else:
                    do_resume = config.should_resume_func(config, path)
                if do_resume:
                    resume_dirs[path] = resume_dir
                else:
                    # Clear progress; we're starting from the top.
                    progress_set(path, None)
    # Look for saved incremental directories.
    if config.incremental:
        history_dirs = history_get()
    for toppath in config.paths:
        # Check whether the path is to a file.
        if config.singletons and not os.path.isdir(syspath(toppath)):
            item = library.Item.from_path(toppath)
            yield ImportTask.item_task(item)
            continue
        # Produce paths under this directory.
        if progress:
            resume_dir = resume_dirs.get(toppath)
        for path, items in autotag.albums_in_dir(toppath):
            # Skip according to progress.
            if progress and resume_dir:
                # We're fast-forwarding to resume a previous tagging.
                if path == resume_dir:
                    # We've hit the last good path! Turn off the
                    # fast-forwarding.
                    resume_dir = None
                continue
            # When incremental, skip paths in the history.
            if config.incremental and path in history_dirs:
                continue
            # Yield all the necessary tasks.
            if config.singletons:
                for item in items:
                    yield ImportTask.item_task(item)
                yield ImportTask.progress_sentinel(toppath, path)
            else:
                yield ImportTask(toppath, path, items)
        # Indicate the directory is finished.
        yield ImportTask.done_sentinel(toppath)
def query_tasks(config):
    """A generator that works as a drop-in-replacement for read_tasks.
    Instead of finding files from the filesystem, a query is used to
    match items from the library. Album tasks built here have no
    toppath, so no progress is saved for them.
    """
    lib = _reopen_lib(config.lib)
    if config.singletons:
        # Search for items.
        items = list(lib.items(config.query))
        for item in items:
            yield ImportTask.item_task(item)
    else:
        # Search for albums.
        albums = lib.albums(config.query)
        for album in albums:
            log.debug('yielding album %i: %s - %s' %
                      (album.id, album.albumartist, album.album))
            items = list(album.items())
            yield ImportTask(None, album.item_dir(), items)
def initial_lookup(config):
    """Pipeline coroutine performing the first MusicBrainz lookup for
    each album task. Receives ImportTask objects and forwards them with
    candidate matches attached; a failed lookup yields a null match.
    Sentinel tasks pass through untouched.
    """
    current = None
    while True:
        current = yield current
        if current.sentinel:
            continue
        log.debug('Looking up: %s' % current.path)
        try:
            match = autotag.tag_album(current.items, config.timid)
        except autotag.AutotagError:
            current.set_null_match()
        else:
            current.set_match(*match)
def user_query(config):
    """A coroutine for interfacing with the user about the tagging
    process. The coroutine accepts and yields ImportTask objects;
    `config.choose_match_func` supplies the actual decision. An
    as-tracks choice expands the album into singleton tasks via a
    nested sub-pipeline, and duplicate albums are demoted to skips.
    """
    lib = _reopen_lib(config.lib)
    recent = set()
    task = None
    while True:
        task = yield task
        if task.sentinel:
            continue
        # Ask the user for a choice.
        choice = config.choose_match_func(task, config)
        task.set_choice(choice)
        log_choice(config, task)
        # As-tracks: transition to singleton workflow.
        if choice is action.TRACKS:
            # Set up a little pipeline for dealing with the singletons.
            item_tasks = []
            def emitter():
                for item in task.items:
                    yield ImportTask.item_task(item)
                yield ImportTask.progress_sentinel(task.toppath, task.path)
            def collector():
                while True:
                    item_task = yield
                    item_tasks.append(item_task)
            ipl = pipeline.Pipeline((emitter(), item_lookup(config),
                                     item_query(config), collector()))
            ipl.run_sequential()
            # Replace this album task with all the singleton tasks.
            task = pipeline.multiple(item_tasks)
            continue
        # Check for duplicates if we have a match (or ASIS).
        if _duplicate_check(lib, task, recent):
            tag_log(config.logfile, 'duplicate', task.path)
            log.warn("This album is already in the library!")
            task.set_choice(action.SKIP)
def show_progress(config):
    """Stands in for the initial_lookup and user_query stages when
    autotagging is disabled: reports each album's path and imports it
    as-is.
    """
    current = None
    while True:
        current = yield current
        if current.sentinel:
            continue
        log.info(current.path)
        # Behave as if the user chose ASIS for every album.
        current.set_null_match()
        current.set_choice(action.ASIS)
def apply_choices(config):
    """A coroutine for applying changes to albums during the autotag
    process: writes chosen metadata, moves/copies files, replaces any
    existing library entries at the same paths, and adds the items (or
    album) to the library.
    """
    lib = _reopen_lib(config.lib)
    task = None
    while True:
        task = yield task
        if task.should_skip():
            continue
        items = task.items if task.is_album else [task.item]
        # Clear IDs in case the items are being re-tagged.
        for item in items:
            item.id = None
            item.album_id = None
        # Change metadata.
        if task.should_write_tags():
            if task.is_album:
                autotag.apply_metadata(task.items, task.info)
            else:
                autotag.apply_item_metadata(task.item, task.info)
        # Infer album-level fields.
        if task.is_album:
            _infer_album_fields(task)
        # Find existing item entries that these are replacing. Old
        # album structures are automatically cleaned up when the
        # last item is removed.
        replaced_items = defaultdict(list)
        for item in items:
            dup_items = list(lib.items(
                library.MatchQuery('path', item.path)
            ))
            for dup_item in dup_items:
                replaced_items[item].append(dup_item)
                log.debug('replacing item %i: %s' % (dup_item.id, item.path))
        log.debug('%i of %i items replaced' % (len(replaced_items),
                                               len(items)))
        # Move/copy files.
        task.old_paths = [item.path for item in items]
        for item in items:
            if config.copy:
                # If we're replacing an item, then move rather than
                # copying.
                do_copy = not bool(replaced_items[item])
                lib.move(item, do_copy, task.is_album)
            if config.write and task.should_write_tags():
                item.write()
        # Add items to library. We consolidate this at the end to avoid
        # locking while we do the copying and tag updates.
        try:
            # Remove old items.
            for replaced in replaced_items.itervalues():
                for item in replaced:
                    lib.remove(item)
            # Add new ones.
            if task.is_album:
                # Add an album.
                album = lib.add_album(task.items)
                task.album_id = album.id
            else:
                # Add tracks.
                for item in items:
                    lib.add(item)
        finally:
            # Persist even if adding fails partway through.
            lib.save()
def fetch_art(config):
    """A coroutine that fetches and applies album art for albums where
    appropriate (i.e. album tasks whose tags are being written). Reads
    `task.album_id`, which `apply_choices` sets earlier in the
    pipeline.
    """
    lib = _reopen_lib(config.lib)
    task = None
    while True:
        task = yield task
        if task.should_skip():
            continue
        if task.should_fetch_art():
            artpath = beets.autotag.art.art_for_album(task.info, task.path)
            # Save the art if any was found.
            if artpath:
                try:
                    album = lib.get_album(task.album_id)
                    album.set_art(artpath)
                finally:
                    lib.save(False)
def finalize(config):
    """A coroutine that finishes up importer tasks. In particular, the
    coroutine sends plugin events, deletes old files, and saves
    progress. This is a "terminal" coroutine (it yields None).
    """
    lib = _reopen_lib(config.lib)
    while True:
        task = yield
        if task.should_skip():
            # Even skipped/sentinel tasks update progress and history.
            if config.resume is not False:
                task.save_progress()
            if config.incremental:
                task.save_history()
            continue
        items = task.items if task.is_album else [task.item]
        # Announce that we've added an album.
        if task.is_album:
            album = lib.get_album(task.album_id)
            plugins.send('album_imported', lib=lib, album=album)
        else:
            for item in items:
                plugins.send('item_imported', lib=lib, item=item)
        # Finally, delete old files.
        if config.copy and config.delete:
            new_paths = [os.path.realpath(item.path) for item in items]
            for old_path in task.old_paths:
                # Only delete files that were actually moved.
                if old_path not in new_paths:
                    os.remove(syspath(old_path))
        # Update progress.
        if config.resume is not False:
            task.save_progress()
        if config.incremental:
            task.save_history()
# Singleton pipeline stages.
def item_lookup(config):
    """Pipeline coroutine performing the MusicBrainz lookup for each
    singleton task, attaching its candidate matches. Sentinels pass
    through untouched.
    """
    current = None
    while True:
        current = yield current
        if not current.sentinel:
            current.set_item_match(*autotag.tag_item(current.item,
                                                     config.timid))
def item_query(config):
    """Pipeline coroutine that asks the user what to do with each
    singleton task and demotes items duplicating library contents to
    skips.
    """
    lib = _reopen_lib(config.lib)
    seen = set()
    current = None
    while True:
        current = yield current
        if current.sentinel:
            continue
        current.set_choice(config.choose_item_func(current, config))
        log_choice(config, current)
        # Duplicate check.
        if _item_duplicate_check(lib, current, seen):
            tag_log(config.logfile, 'duplicate', current.item.path)
            log.warn("This item is already in the library!")
            current.set_choice(action.SKIP)
def item_progress(config):
    """Replaces the lookup and query stages for non-autotagged
    singleton imports: reports each item and imports it as-is.
    """
    current = None
    log.info('Importing items:')
    while True:
        current = yield current
        if current.sentinel:
            continue
        log.info(current.item.path)
        current.set_null_item_match()
        current.set_choice(action.ASIS)
# Main driver.
def run_import(**kwargs):
    """Drive a complete import session. The keyword arguments are the
    same as those to ImportConfig. Assembles the stage pipeline for
    the configured mode and runs it, threaded or sequentially; a user
    abort stops the run silently.
    """
    config = ImportConfig(**kwargs)
    # First stage: produce tasks from a query or from the filesystem.
    if config.query is None:
        producer = read_tasks(config)
    else:
        producer = query_tasks(config)
    stages = [producer]
    # Middle stages: lookup and user interaction, as configured.
    if config.singletons:
        if config.autot:
            stages.append(item_lookup(config))
            stages.append(item_query(config))
        else:
            stages.append(item_progress(config))
    elif config.autot:
        # Only look up and query the user when autotagging albums.
        stages.append(initial_lookup(config))
        stages.append(user_query(config))
    else:
        # When not autotagging, just display progress.
        stages.append(show_progress(config))
    # Final stages: apply the chosen metadata and clean up.
    stages.append(apply_choices(config))
    if config.art:
        stages.append(fetch_art(config))
    stages.append(finalize(config))
    workflow = pipeline.Pipeline(stages)
    try:
        if config.threaded:
            workflow.run_parallel(QUEUE_SIZE)
        else:
            workflow.run_sequential()
    except ImportAbort:
        # The user aborted the operation; stop without complaint.
        pass

File diff suppressed because it is too large Load Diff

View File

@@ -1,885 +0,0 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Handles low-level interfacing for files' tags. Wraps Mutagen to
automatically detect file types and provide a unified interface for a
useful subset of music files' tags.
Usage:
>>> f = MediaFile('Lucy.mp3')
>>> f.title
u'Lucy in the Sky with Diamonds'
>>> f.artist = 'The Beatles'
>>> f.save()
A field will always return a reasonable value of the correct type, even
if no tag is present. If no value is available, the value will be false
(e.g., zero or the empty string).
"""
import mutagen
import mutagen.mp3
import mutagen.oggvorbis
import mutagen.mp4
import mutagen.flac
import mutagen.monkeysaudio
import datetime
import re
import base64
import imghdr
import os
import logging
import traceback
from beets.util.enumeration import enum
# Public API of this module.
__all__ = ['UnreadableFileError', 'FileTypeError', 'MediaFile']
# Logger.
log = logging.getLogger('beets')
# Exceptions.
# Raised for any file MediaFile can't read.
class UnreadableFileError(IOError):
    """Raised for any file that MediaFile cannot read."""
    pass
# Raised for files that don't seem to have a type MediaFile supports.
class FileTypeError(UnreadableFileError):
    """Raised for files whose type MediaFile does not support."""
    pass
# Constants.
# Human-readable type names, keyed by the MediaFile `type` strings.
TYPES = {
    'mp3':  'MP3',
    'mp4':  'AAC',
    'ogg':  'OGG',
    'flac': 'FLAC',
    'ape':  'APE',
    'wv':   'WavPack',
    'mpc':  'Musepack',
}
# Utility.
def _safe_cast(out_type, val):
"""Tries to covert val to out_type but will never raise an
exception. If the value can't be converted, then a sensible
default value is returned. out_type should be bool, int, or
unicode; otherwise, the value is just passed through.
"""
if out_type == int:
if val is None:
return 0
elif isinstance(val, int) or isinstance(val, float):
# Just a number.
return int(val)
else:
# Process any other type as a string.
if not isinstance(val, basestring):
val = unicode(val)
# Get a number from the front of the string.
val = re.match('[0-9]*', val.strip()).group(0)
if not val:
return 0
else:
return int(val)
elif out_type == bool:
if val is None:
return False
else:
try:
# Should work for strings, bools, ints:
return bool(int(val))
except ValueError:
return False
elif out_type == unicode:
if val is None:
return u''
else:
return unicode(val)
else:
return val
# Flags for encoding field behavior.
# Determine style of packing, if any. A "packed" field shares one
# storage unit with another value (e.g. track/tracktotal).
packing = enum('SLASHED', # pair delimited by /
               'TUPLE',   # a python tuple of 2 items
               'DATE',    # YYYY-MM-DD
               name='packing')
class StorageStyle(object):
    """Describes how one abstract metadata field is stored for a
    particular tag format.

    - key: The Mutagen key used to access the field's data.
    - list_elem: Whether the value lives as the first element of a
      list rather than as a bare object.
    - as_type: The type the value is stored as (unicode, int, bool,
      or str).
    - packing: When the value shares a multiple-value storage unit,
      the packing style (a value of the `packing` enum); otherwise
      None, in which case as_type governs conversion.
    - pack_pos: Position of the value within the packed unit.
    - id3_desc: ID3 storage only: also match against this 'desc'
      field in addition to the key.
    - id3_frame_field: ID3 frame attribute that holds the data.
    """
    def __init__(self, key, list_elem = True, as_type = unicode,
                 packing = None, pack_pos = 0, id3_desc = None,
                 id3_frame_field = 'text'):
        self.key = key
        self.id3_desc = id3_desc
        self.id3_frame_field = id3_frame_field
        self.list_elem = list_elem
        self.as_type = as_type
        self.packing = packing
        self.pack_pos = pack_pos
# Dealing with packings.
class Packed(object):
    """Makes a packed list of values subscriptable. To access the packed
    output after making changes, use packed_thing.items.
    """
    def __init__(self, items, packstyle, none_val=0, out_type=int):
        """Create a Packed object for subscripting the packed values in
        items. The items are packed using packstyle, which is a value
        from the packing enum. none_val is returned from a request when
        no suitable value is found in the items. Values are converted to
        out_type before they are returned.
        """
        self.items = items
        self.packstyle = packstyle
        self.none_val = none_val
        self.out_type = out_type
    def __getitem__(self, index):
        if not isinstance(index, int):
            raise TypeError('index must be an integer')
        if self.items is None:
            return self.none_val
        items = self.items
        if self.packstyle == packing.DATE:
            # Remove time information from dates. Usually delimited by
            # a "T" or a space.
            items = re.sub(r'[Tt ].*$', '', unicode(items))
        # transform from a string packing into a list we can index into
        if self.packstyle == packing.SLASHED:
            seq = unicode(items).split('/')
        elif self.packstyle == packing.DATE:
            seq = unicode(items).split('-')
        elif self.packstyle == packing.TUPLE:
            seq = items # tuple: items is already indexable
        try:
            out = seq[index]
        # NOTE(review): bare except swallows everything; the intended
        # case seems to be an out-of-range index -- consider narrowing.
        except:
            out = None
        if out is None or out == self.none_val or out == '':
            return _safe_cast(self.out_type, self.none_val)
        else:
            return _safe_cast(self.out_type, out)
    def __setitem__(self, index, value):
        if self.packstyle in (packing.SLASHED, packing.TUPLE):
            # SLASHED and TUPLE are always two-item packings
            length = 2
        else:
            # DATE can have up to three fields
            length = 3
        # make a list of the items we'll pack
        new_items = []
        for i in range(length):
            if i == index:
                next_item = value
            else:
                # Read back the existing value for the other slots.
                next_item = self[i]
            new_items.append(next_item)
        if self.packstyle == packing.DATE:
            # Truncate the items wherever we reach an invalid (none)
            # entry. This prevents dates like 2008-00-05.
            for i, item in enumerate(new_items):
                if item == self.none_val or item is None:
                    del(new_items[i:]) # truncate
                    break
        if self.packstyle == packing.SLASHED:
            self.items = '/'.join(map(unicode, new_items))
        elif self.packstyle == packing.DATE:
            field_lengths = [4, 2, 2] # YYYY-MM-DD
            elems = []
            for i, item in enumerate(new_items):
                elems.append( ('%0' + str(field_lengths[i]) + 'i') % item )
            self.items = '-'.join(elems)
        elif self.packstyle == packing.TUPLE:
            self.items = new_items
# The field itself.
class MediaField(object):
"""A descriptor providing access to a particular (abstract) metadata
field. out_type is the type that users of MediaFile should see and
can be unicode, int, or bool. id3, mp4, and flac are StorageStyle
instances parameterizing the field's storage for each type.
"""
def __init__(self, out_type = unicode, **kwargs):
"""Creates a new MediaField.
- out_type: The field's semantic (exterior) type.
- kwargs: A hash whose keys are 'mp3', 'mp4', and 'etc'
and whose values are StorageStyle instances
parameterizing the field's storage for each type.
"""
self.out_type = out_type
if not set(['mp3', 'mp4', 'etc']) == set(kwargs):
raise TypeError('MediaField constructor must have keyword '
'arguments mp3, mp4, and etc')
self.styles = kwargs
def _fetchdata(self, obj, style):
"""Get the value associated with this descriptor's field stored
with the given StorageStyle. Unwraps from a list if necessary.
"""
# fetch the value, which may be a scalar or a list
if obj.type == 'mp3':
if style.id3_desc is not None: # also match on 'desc' field
frames = obj.mgfile.tags.getall(style.key)
entry = None
for frame in frames:
if frame.desc == style.id3_desc:
entry = getattr(frame, style.id3_frame_field)
break
if entry is None: # no desc match
return None
else:
# Get the metadata frame object.
try:
frame = obj.mgfile[style.key]
except KeyError:
return None
entry = getattr(frame, style.id3_frame_field)
else: # Not MP3.
try:
entry = obj.mgfile[style.key]
except KeyError:
return None
# possibly index the list
if style.list_elem:
if entry: # List must have at least one value.
return entry[0]
else:
return None
else:
return entry
def _storedata(self, obj, val, style):
"""Store val for this descriptor's field in the tag dictionary
according to the provided StorageStyle. Store it as a
single-item list if necessary.
"""
# wrap as a list if necessary
if style.list_elem: out = [val]
else: out = val
if obj.type == 'mp3':
# Try to match on "desc" field.
if style.id3_desc is not None:
frames = obj.mgfile.tags.getall(style.key)
# try modifying in place
found = False
for frame in frames:
if frame.desc == style.id3_desc:
setattr(frame, style.id3_frame_field, out)
found = True
break
# need to make a new frame?
if not found:
assert isinstance(style.id3_frame_field, str) # Keyword.
frame = mutagen.id3.Frames[style.key](
encoding=3,
desc=style.id3_desc,
**{style.id3_frame_field: val}
)
obj.mgfile.tags.add(frame)
# Try to match on "owner" field.
elif style.key.startswith('UFID:'):
owner = style.key.split(':', 1)[1]
frames = obj.mgfile.tags.getall(style.key)
for frame in frames:
# Replace existing frame data.
if frame.owner == owner:
setattr(frame, style.id3_frame_field, val)
else:
# New frame.
assert isinstance(style.id3_frame_field, str) # Keyword.
frame = mutagen.id3.UFID(owner=owner,
**{style.id3_frame_field: val})
obj.mgfile.tags.setall('UFID', [frame])
# Just replace based on key.
else:
assert isinstance(style.id3_frame_field, str) # Keyword.
frame = mutagen.id3.Frames[style.key](encoding = 3,
**{style.id3_frame_field: val})
obj.mgfile.tags.setall(style.key, [frame])
else: # Not MP3.
obj.mgfile[style.key] = out
def _styles(self, obj):
if obj.type in ('mp3', 'mp4'):
styles = self.styles[obj.type]
else:
styles = self.styles['etc'] # sane styles
# Make sure we always return a list of styles, even when given
# a single style for convenience.
if isinstance(styles, StorageStyle):
return [styles]
else:
return styles
def __get__(self, obj, owner):
"""Retrieve the value of this metadata field.
"""
# Fetch the data using the various StorageStyles.
styles = self._styles(obj)
for style in styles:
# Use the first style that returns a reasonable value.
out = self._fetchdata(obj, style)
if out:
break
if style.packing:
out = Packed(out, style.packing)[style.pack_pos]
# MPEG-4 freeform frames are (should be?) encoded as UTF-8.
if obj.type == 'mp4' and style.key.startswith('----:') and \
isinstance(out, str):
out = out.decode('utf8')
return _safe_cast(self.out_type, out)
    def __set__(self, obj, val):
        """Set the value of this metadata field.

        The value is written via *every* applicable StorageStyle so
        that all equivalent tags in the file stay in sync.
        """
        # Store using every StorageStyle available.
        styles = self._styles(obj)
        for style in styles:
            if style.packing:
                # Packed (multi-valued) frame: fetch the current pack,
                # replace this field's slot, and write the whole thing.
                p = Packed(self._fetchdata(obj, style), style.packing)
                p[style.pack_pos] = val
                out = p.items
            else: # unicode, integer, or boolean scalar
                out = val

                # deal with Nones according to abstract type if present
                if out is None:
                    if self.out_type == int:
                        out = 0
                    elif self.out_type == bool:
                        out = False
                    elif self.out_type == unicode:
                        out = u''
                    # We trust that packed values are handled above.

                # Convert to correct storage type (irrelevant for
                # packed values).
                if style.as_type == unicode:
                    if out is None:
                        out = u''
                    else:
                        if self.out_type == bool:
                            # store bools as 1,0 instead of True,False
                            out = unicode(int(out))
                        else:
                            out = unicode(out)
                elif style.as_type == int:
                    if out is None:
                        out = 0
                    else:
                        out = int(out)
                elif style.as_type in (bool, str):
                    out = style.as_type(out)

            # MPEG-4 "freeform" (----) frames must be encoded as UTF-8
            # byte strings.
            if obj.type == 'mp4' and style.key.startswith('----:') and \
                    isinstance(out, unicode):
                out = out.encode('utf8')

            # Store the data.
            self._storedata(obj, out, style)
class CompositeDateField(object):
    """A MediaFile field for conveniently accessing the year, month, and
    day fields as a datetime.date object. Allows both getting and
    setting of the component fields.
    """
    def __init__(self, year_field, month_field, day_field):
        """Build a composite date field from the three MediaFields that
        hold the individual components.
        """
        self.year_field = year_field
        self.month_field = month_field
        self.day_field = day_field

    def __get__(self, obj, owner):
        """Return a datetime.date assembled from the component fields.

        Each component is clamped upward to its minimum legal value
        (so year == 1999, month == 0, day == 0 yields 1999-01-01). If
        the components still form an impossible date (e.g. month == 47)
        datetime.date.min is returned instead.
        """
        try:
            return datetime.date(
                max(self.year_field.__get__(obj, owner), datetime.MINYEAR),
                max(self.month_field.__get__(obj, owner), 1),
                max(self.day_field.__get__(obj, owner), 1)
            )
        except ValueError:
            # Components are out of range for a real date.
            return datetime.date.min

    def __set__(self, obj, val):
        """Propagate the components of the datetime.date `val` into the
        three underlying fields.
        """
        for field, component in ((self.year_field, val.year),
                                 (self.month_field, val.month),
                                 (self.day_field, val.day)):
            field.__set__(obj, component)
class ImageField(object):
    """A descriptor providing access to a file's embedded album art.
    Holds a bytestring reflecting the image data. The image should
    either be a JPEG or a PNG for cross-format compatibility. It's
    probably a bad idea to use anything but these two formats.
    """
    @classmethod
    def _mime(cls, data):
        """Return the MIME type (either image/png or image/jpeg) of the
        image data (a bytestring).
        """
        kind = imghdr.what(None, h=data)
        if kind == 'png':
            return 'image/png'
        else:
            # Currently just fall back to JPEG.
            return 'image/jpeg'

    @classmethod
    def _mp4kind(cls, data):
        """Return the MPEG-4 image type code of the data. If the image
        is not a PNG or JPEG, JPEG is assumed.
        """
        kind = imghdr.what(None, h=data)
        if kind == 'png':
            return mutagen.mp4.MP4Cover.FORMAT_PNG
        else:
            return mutagen.mp4.MP4Cover.FORMAT_JPEG

    def __get__(self, obj, owner):
        """Return the embedded image data (a bytestring) or None if
        the file has no recognizable embedded art.
        """
        if obj.type == 'mp3':
            # Look for APIC frames.
            for frame in obj.mgfile.tags.values():
                if frame.FrameID == 'APIC':
                    picframe = frame
                    break
            else:
                # No APIC frame.
                return None
            return picframe.data
        elif obj.type == 'mp4':
            if 'covr' in obj.mgfile:
                covers = obj.mgfile['covr']
                if covers:
                    cover = covers[0]
                    # cover is an MP4Cover, which is a subclass of str.
                    return cover
            # No cover found.
            return None
        else:
            # Here we're assuming everything but MP3 and MPEG-4 uses
            # the Xiph/Vorbis Comments standard. This may not be valid.
            # http://wiki.xiph.org/VorbisComment#Cover_art
            if 'metadata_block_picture' not in obj.mgfile:
                # Try legacy COVERART tags.
                if 'coverart' in obj.mgfile and obj.mgfile['coverart']:
                    return base64.b64decode(obj.mgfile['coverart'][0])
                return None
            # Use the first picture block that decodes successfully.
            for data in obj.mgfile["metadata_block_picture"]:
                try:
                    pic = mutagen.flac.Picture(base64.b64decode(data))
                    break
                except TypeError:
                    pass
            else:
                return None
            return pic.data

    def __set__(self, obj, val):
        """Store `val` (a bytestring) as the file's sole piece of
        embedded art, or remove all embedded art if `val` is None.
        """
        if val is not None:
            if not isinstance(val, str):
                raise ValueError('value must be a byte string or None')
        if obj.type == 'mp3':
            # Clear all APIC frames.
            obj.mgfile.tags.delall('APIC')
            if val is None:
                # If we're clearing the image, we're done.
                return
            picframe = mutagen.id3.APIC(
                encoding = 3,
                mime = self._mime(val),
                type = 3, # front cover
                desc = u'',
                data = val,
            )
            obj.mgfile['APIC'] = picframe
        elif obj.type == 'mp4':
            if val is None:
                if 'covr' in obj.mgfile:
                    del obj.mgfile['covr']
            else:
                cover = mutagen.mp4.MP4Cover(val, self._mp4kind(val))
                obj.mgfile['covr'] = [cover]
        else:
            # Again, assuming Vorbis Comments standard.
            # Strip all art, including legacy COVERART. (The original
            # code redundantly tested 'metadata_block_picture' twice;
            # the duplicate nested check has been removed so the
            # legacy tags are always stripped as intended.)
            if 'metadata_block_picture' in obj.mgfile:
                del obj.mgfile['metadata_block_picture']
            if 'coverart' in obj.mgfile:
                del obj.mgfile['coverart']
            if 'coverartmime' in obj.mgfile:
                del obj.mgfile['coverartmime']
            # Add new art if provided.
            if val is not None:
                pic = mutagen.flac.Picture()
                pic.data = val
                pic.mime = self._mime(val)
                obj.mgfile['metadata_block_picture'] = [
                    base64.b64encode(pic.write())
                ]
# The file (a collection of fields).
class MediaFile(object):
    """Represents a multimedia file on disk and provides access to its
    metadata.

    Tag access is mediated by the MediaField descriptors declared on
    this class; each maps a single abstract field onto the appropriate
    format-specific tags.
    """
    def __init__(self, path):
        """Constructs a new MediaFile reflecting the file at path. May
        throw UnreadableFileError.
        """
        self.path = path
        # Exceptions Mutagen raises for corrupt/unparseable headers in
        # each of the supported container formats.
        unreadable_exc = (
            mutagen.mp3.HeaderNotFoundError,
            mutagen.flac.FLACNoHeaderError,
            mutagen.monkeysaudio.MonkeysAudioHeaderError,
            mutagen.mp4.MP4StreamInfoError,
            mutagen.oggvorbis.OggVorbisHeaderError,
        )
        try:
            # Let Mutagen sniff the container type from the contents.
            self.mgfile = mutagen.File(path)
        except unreadable_exc:
            log.warn('header parsing failed')
            raise UnreadableFileError('Mutagen could not read file')
        except IOError:
            raise UnreadableFileError('could not read file')
        except:
            # Hide bugs in Mutagen.
            log.error('uncaught Mutagen exception:\n' + traceback.format_exc())
            raise UnreadableFileError('Mutagen raised an exception')
        # Map the Mutagen wrapper class onto our short type code.
        if self.mgfile is None: # Mutagen couldn't guess the type
            raise FileTypeError('file type unsupported by Mutagen')
        elif type(self.mgfile).__name__ == 'M4A' or \
             type(self.mgfile).__name__ == 'MP4':
            self.type = 'mp4'
        elif type(self.mgfile).__name__ == 'ID3' or \
             type(self.mgfile).__name__ == 'MP3':
            self.type = 'mp3'
        elif type(self.mgfile).__name__ == 'FLAC':
            self.type = 'flac'
        elif type(self.mgfile).__name__ == 'OggVorbis':
            self.type = 'ogg'
        elif type(self.mgfile).__name__ == 'MonkeysAudio':
            self.type = 'ape'
        elif type(self.mgfile).__name__ == 'WavPack':
            self.type = 'wv'
        elif type(self.mgfile).__name__ == 'Musepack':
            self.type = 'mpc'
        else:
            raise FileTypeError('file type %s unsupported by MediaFile' %
                                type(self.mgfile).__name__)
        # add a set of tags if it's missing
        if self.mgfile.tags is None:
            self.mgfile.add_tags()
    def save(self):
        """Write any modified metadata back to the file on disk."""
        self.mgfile.save()
    #### field definitions ####
    title = MediaField(
        mp3 = StorageStyle('TIT2'),
        mp4 = StorageStyle("\xa9nam"),
        etc = StorageStyle('title'),
    )
    artist = MediaField(
        mp3 = StorageStyle('TPE1'),
        mp4 = StorageStyle("\xa9ART"),
        etc = StorageStyle('artist'),
    )
    album = MediaField(
        mp3 = StorageStyle('TALB'),
        mp4 = StorageStyle("\xa9alb"),
        etc = StorageStyle('album'),
    )
    genre = MediaField(
        mp3 = StorageStyle('TCON'),
        mp4 = StorageStyle("\xa9gen"),
        etc = StorageStyle('genre'),
    )
    composer = MediaField(
        mp3 = StorageStyle('TCOM'),
        mp4 = StorageStyle("\xa9wrt"),
        etc = StorageStyle('composer'),
    )
    grouping = MediaField(
        mp3 = StorageStyle('TIT1'),
        mp4 = StorageStyle("\xa9grp"),
        etc = StorageStyle('grouping'),
    )
    # Date components are stored packed in a single tag (ID3 TDRC /
    # MP4 day / Vorbis date) and unpacked by position.
    year = MediaField(out_type=int,
        mp3 = StorageStyle('TDRC',
                            packing = packing.DATE,
                            pack_pos = 0),
        mp4 = StorageStyle("\xa9day",
                            packing = packing.DATE,
                            pack_pos = 0),
        etc = [StorageStyle('date',
                             packing = packing.DATE,
                             pack_pos = 0),
               StorageStyle('year')]
    )
    month = MediaField(out_type=int,
        mp3 = StorageStyle('TDRC',
                            packing = packing.DATE,
                            pack_pos = 1),
        mp4 = StorageStyle("\xa9day",
                            packing = packing.DATE,
                            pack_pos = 1),
        etc = StorageStyle('date',
                            packing = packing.DATE,
                            pack_pos = 1)
    )
    day = MediaField(out_type=int,
        mp3 = StorageStyle('TDRC',
                            packing = packing.DATE,
                            pack_pos = 2),
        mp4 = StorageStyle("\xa9day",
                            packing = packing.DATE,
                            pack_pos = 2),
        etc = StorageStyle('date',
                            packing = packing.DATE,
                            pack_pos = 2)
    )
    date = CompositeDateField(year, month, day)
    # Track/disc numbers share a tag with their totals ("3/12").
    track = MediaField(out_type = int,
        mp3 = StorageStyle('TRCK',
                            packing = packing.SLASHED,
                            pack_pos = 0),
        mp4 = StorageStyle('trkn',
                            packing = packing.TUPLE,
                            pack_pos = 0),
        etc = [StorageStyle('track'),
               StorageStyle('tracknumber')]
    )
    tracktotal = MediaField(out_type = int,
        mp3 = StorageStyle('TRCK',
                            packing = packing.SLASHED,
                            pack_pos = 1),
        mp4 = StorageStyle('trkn',
                            packing = packing.TUPLE,
                            pack_pos = 1),
        etc = [StorageStyle('tracktotal'),
               StorageStyle('trackc'),
               StorageStyle('totaltracks')]
    )
    disc = MediaField(out_type = int,
        mp3 = StorageStyle('TPOS',
                            packing = packing.SLASHED,
                            pack_pos = 0),
        mp4 = StorageStyle('disk',
                            packing = packing.TUPLE,
                            pack_pos = 0),
        etc = [StorageStyle('disc'),
               StorageStyle('discnumber')]
    )
    disctotal = MediaField(out_type = int,
        mp3 = StorageStyle('TPOS',
                            packing = packing.SLASHED,
                            pack_pos = 1),
        mp4 = StorageStyle('disk',
                            packing = packing.TUPLE,
                            pack_pos = 1),
        etc = [StorageStyle('disctotal'),
               StorageStyle('discc'),
               StorageStyle('totaldiscs')]
    )
    lyrics = MediaField(
        mp3 = StorageStyle('USLT',
                           list_elem = False,
                           id3_desc = u''),
        mp4 = StorageStyle("\xa9lyr"),
        etc = StorageStyle('lyrics')
    )
    comments = MediaField(
        mp3 = StorageStyle('COMM', id3_desc = u''),
        mp4 = StorageStyle("\xa9cmt"),
        etc = [StorageStyle('description'),
               StorageStyle('comment')]
    )
    bpm = MediaField(out_type = int,
        mp3 = StorageStyle('TBPM'),
        mp4 = StorageStyle('tmpo', as_type = int),
        etc = StorageStyle('bpm')
    )
    comp = MediaField(out_type = bool,
        mp3 = StorageStyle('TCMP'),
        mp4 = StorageStyle('cpil',
                            list_elem = False,
                            as_type = bool),
        etc = StorageStyle('compilation')
    )
    albumartist = MediaField(
        mp3 = StorageStyle('TPE2'),
        mp4 = StorageStyle('aART'),
        etc = [StorageStyle('album artist'),
               StorageStyle('albumartist')]
    )
    albumtype = MediaField(
        mp3 = StorageStyle('TXXX', id3_desc=u'MusicBrainz Album Type'),
        mp4 = StorageStyle(
            '----:com.apple.iTunes:MusicBrainz Album Type'),
        etc = StorageStyle('musicbrainz_albumtype')
    )
    label = MediaField(
        mp3 = StorageStyle('TPUB'),
        mp4 = [StorageStyle('----:com.apple.iTunes:Label'),
               StorageStyle('----:com.apple.iTunes:publisher')],
        etc = [StorageStyle('label'),
               StorageStyle('publisher')] # Traktor
    )
    # Album art.
    art = ImageField()
    # MusicBrainz IDs.
    mb_trackid = MediaField(
        mp3 = StorageStyle('UFID:http://musicbrainz.org',
                            list_elem = False,
                            id3_frame_field = 'data'),
        mp4 = StorageStyle(
            '----:com.apple.iTunes:MusicBrainz Track Id',
            as_type=str),
        etc = StorageStyle('musicbrainz_trackid')
    )
    mb_albumid = MediaField(
        mp3 = StorageStyle('TXXX', id3_desc=u'MusicBrainz Album Id'),
        mp4 = StorageStyle(
            '----:com.apple.iTunes:MusicBrainz Album Id',
            as_type=str),
        etc = StorageStyle('musicbrainz_albumid')
    )
    mb_artistid = MediaField(
        mp3 = StorageStyle('TXXX', id3_desc=u'MusicBrainz Artist Id'),
        mp4 = StorageStyle(
            '----:com.apple.iTunes:MusicBrainz Artist Id',
            as_type=str),
        etc = StorageStyle('musicbrainz_artistid')
    )
    mb_albumartistid = MediaField(
        mp3 = StorageStyle('TXXX',
                           id3_desc=u'MusicBrainz Album Artist Id'),
        mp4 = StorageStyle(
            '----:com.apple.iTunes:MusicBrainz Album Artist Id',
            as_type=str),
        etc = StorageStyle('musicbrainz_albumartistid')
    )
    @property
    def length(self):
        """Duration of the audio in seconds (a float)."""
        return self.mgfile.info.length
    @property
    def bitrate(self):
        """Bitrate of the audio in bits per second."""
        if hasattr(self.mgfile.info, 'bitrate'):
            # Many formats provide it explicitly.
            return self.mgfile.info.bitrate
        else:
            # Otherwise, we calculate bitrate from the file size. (This
            # is the case for all of the lossless formats.)
            size = os.path.getsize(self.path)
            return int(size * 8 / self.length)
    @property
    def format(self):
        """A human-readable name for the file's format."""
        return TYPES[self.type]

View File

@@ -1,632 +0,0 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""This module contains all of the core logic for beets' command-line
interface. To invoke the CLI, just call beets.ui.main(). The actual
CLI commands are implemented in the ui.commands module.
"""
import os
import locale
import optparse
import textwrap
import ConfigParser
import sys
from difflib import SequenceMatcher
import logging
import sqlite3
import errno
from beets import library
from beets import plugins
from beets import util
# Constants.
# Environment variable that overrides the configuration file location.
CONFIG_PATH_VAR = 'BEETSCONFIG'
# Default locations of the config file, library database, and music
# directory.
DEFAULT_CONFIG_FILE = os.path.expanduser('~/.beetsconfig')
DEFAULT_LIBRARY = '~/.beetsmusic.blb'
DEFAULT_DIRECTORY = '~/Music'
# Default destination path templates, keyed by kind of release.
DEFAULT_PATH_FORMATS = {
    'default': '$albumartist/$album/$track $title',
    'comp': 'Compilations/$album/$track $title',
    'singleton': 'Non-Album/$artist/$title',
}
# Default base filename (without extension) for saved album art.
DEFAULT_ART_FILENAME = 'cover'
# UI exception. Commands should throw this in order to display
# nonrecoverable errors to the user.
class UserError(Exception):
    """A nonrecoverable, user-facing error raised by CLI commands; the
    CLI reports its message and aborts the command.
    """
# Utilities.
def _encoding():
"""Tries to guess the encoding uses by the terminal."""
try:
return locale.getdefaultlocale()[1] or 'utf8'
except ValueError:
# Invalid locale environment variable setting. To avoid
# failing entirely for no good reason, assume UTF-8.
return 'utf8'
def decargs(arglist):
    """Decode a list of command-line bytestring arguments into Unicode
    strings using the terminal's encoding.
    """
    enc = _encoding()
    return [arg.decode(enc) for arg in arglist]
def print_(*strings):
"""Like print, but rather than raising an error when a character
is not in the terminal's encoding's character set, just silently
replaces it.
"""
if strings:
if isinstance(strings[0], unicode):
txt = u' '.join(strings)
else:
txt = ' '.join(strings)
else:
txt = u''
if isinstance(txt, unicode):
txt = txt.encode(_encoding(), 'replace')
print txt
def input_options(options, require=False, prompt=None, fallback_prompt=None,
numrange=None, default=None, color=False, max_width=72):
"""Prompts a user for input. The sequence of `options` defines the
choices the user has. A single-letter shortcut is inferred for each
option; the user's choice is returned as that single, lower-case
letter. The options should be provided as lower-case strings unless
a particular shortcut is desired; in that case, only that letter
should be capitalized.
By default, the first option is the default. If `require` is
provided, then there is no default. `default` can be provided to
override this. The prompt and fallback prompt are also inferred but
can be overridden.
If numrange is provided, it is a pair of `(high, low)` (both ints)
indicating that, in addition to `options`, the user may enter an
integer in that inclusive range.
`max_width` specifies the maximum number of columns in the
automatically generated prompt string.
"""
# Assign single letters to each option. Also capitalize the options
# to indicate the letter.
letters = {}
display_letters = []
capitalized = []
first = True
for option in options:
# Is a letter already capitalized?
for letter in option:
if letter.isalpha() and letter.upper() == letter:
found_letter = letter
break
else:
# Infer a letter.
for letter in option:
if not letter.isalpha():
continue # Don't use punctuation.
if letter not in letters:
found_letter = letter
break
else:
raise ValueError('no unambiguous lettering found')
letters[found_letter.lower()] = option
index = option.index(found_letter)
# Mark the option's shortcut letter for display.
if (default is None and not numrange and first) \
or (isinstance(default, basestring) and
found_letter.lower() == default.lower()):
# The first option is the default; mark it.
show_letter = '[%s]' % found_letter.upper()
is_default = True
else:
show_letter = found_letter.upper()
is_default = False
# Possibly colorize the letter shortcut.
if color:
color = 'turquoise' if is_default else 'blue'
show_letter = colorize(color, show_letter)
# Insert the highlighted letter back into the word.
capitalized.append(
option[:index] + show_letter + option[index+1:]
)
display_letters.append(found_letter.upper())
first = False
# The default is just the first option if unspecified.
if default is None:
if require:
default = None
elif numrange:
default = numrange[0]
else:
default = display_letters[0].lower()
# Make a prompt if one is not provided.
if not prompt:
prompt_parts = []
prompt_part_lengths = []
if numrange:
if isinstance(default, int):
default_name = str(default)
if color:
default_name = colorize('turquoise', default_name)
tmpl = '# selection (default %s)'
prompt_parts.append(tmpl % default_name)
prompt_part_lengths.append(len(tmpl % str(default)))
else:
prompt_parts.append('# selection')
prompt_part_lengths.append(prompt_parts[-1])
prompt_parts += capitalized
prompt_part_lengths += [len(s) for s in options]
# Wrap the query text.
prompt = ''
line_length = 0
for i, (part, length) in enumerate(zip(prompt_parts,
prompt_part_lengths)):
# Add punctuation.
if i == len(prompt_parts) - 1:
part += '?'
else:
part += ','
length += 1
# Choose either the current line or the beginning of the next.
if line_length + length + 1 > max_width:
prompt += '\n'
line_length = 0
if line_length != 0:
# Not the beginning of the line; need a space.
part = ' ' + part
length += 1
prompt += part
line_length += length
# Make a fallback prompt too. This is displayed if the user enters
# something that is not recognized.
if not fallback_prompt:
fallback_prompt = 'Enter one of '
if numrange:
fallback_prompt += '%i-%i, ' % numrange
fallback_prompt += ', '.join(display_letters) + ':'
# (raw_input(prompt) was causing problems with colors.)
print prompt,
resp = raw_input()
while True:
resp = resp.strip().lower()
# Try default option.
if default is not None and not resp:
resp = default
# Try an integer input if available.
if numrange:
try:
resp = int(resp)
except ValueError:
pass
else:
low, high = numrange
if low <= resp <= high:
return resp
else:
resp = None
# Try a normal letter input.
if resp:
resp = resp[0]
if resp in letters:
return resp
# Prompt for new input.
print fallback_prompt,
resp = raw_input()
def input_yn(prompt, require=False, color=False):
    """Prompts the user for a "yes" or "no" response. The default is
    "yes" unless `require` is `True`, in which case there is no default.
    """
    answer = input_options(('y', 'n'), require, prompt,
                           'Enter Y or N:', color=color)
    return answer == 'y'
def config_val(config, section, name, default, vtype=None):
    """Queries the configuration file for a value (given by the
    section and name). If no value is present, returns default.
    vtype optionally specifies the return type (although only bool
    is supported for now).
    """
    # Make sure the section exists so the lookup below can only fail
    # because the option itself is missing.
    if not config.has_section(section):
        config.add_section(section)

    getter = config.getboolean if vtype is bool else config.get
    try:
        return getter(section, name)
    except ConfigParser.NoOptionError:
        return default
def human_bytes(size):
    """Formats size, a number of bytes, in a human-readable way."""
    for suffix in ('B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB',
                   'YB', 'HB'):
        if size < 1024:
            return "%3.1f %s" % (size, suffix)
        size /= 1024.0
    # Beyond every suffix we know about.
    return "big"
def human_seconds(interval):
    """Formats interval, a number of seconds, as a human-readable time
    interval.
    """
    units = [
        (1, 'second'),
        (60, 'minute'),
        (60, 'hour'),
        (24, 'day'),
        (7, 'week'),
        (52, 'year'),
        (10, 'decade'),
    ]
    # Walk up the units until the value drops below the next cutoff.
    for (increment, suffix), (cutoff, _) in zip(units[:-1], units[1:]):
        interval /= float(increment)
        if interval < cutoff:
            break
    else:
        # Fell off the end of the scale: use the largest unit.
        increment, suffix = units[-1]
        interval /= float(increment)
    return "%3.1f %ss" % (interval, suffix)
# ANSI terminal colorization code heavily inspired by pygments:
# http://dev.pocoo.org/hg/pygments-main/file/b2deea5b5030/pygments/console.py
# (pygments is by Tim Hatch, Armin Ronacher, et al.)
COLOR_ESCAPE = "\x1b["
DARK_COLORS  = ["black", "darkred", "darkgreen", "brown", "darkblue",
                "purple", "teal", "lightgray"]
LIGHT_COLORS = ["darkgray", "red", "green", "yellow", "blue",
                "fuchsia", "turquoise", "white"]
RESET_COLOR = COLOR_ESCAPE + "39;49;00m"

def colorize(color, text):
    """Returns a string that prints the given text in the given color
    in a terminal that is ANSI color-aware. The color must be something
    in DARK_COLORS or LIGHT_COLORS.

    Raises ValueError for an unknown color name.
    """
    if color in DARK_COLORS:
        escape = COLOR_ESCAPE + "%im" % (DARK_COLORS.index(color) + 30)
    elif color in LIGHT_COLORS:
        # Light colors additionally get the "bold" attribute.
        escape = COLOR_ESCAPE + "%i;01m" % (LIGHT_COLORS.index(color) + 30)
    else:
        # BUG FIX: actually interpolate the color name into the
        # message (the original passed it as a second exception arg,
        # leaving the literal '%s' in the message).
        raise ValueError('no such color %s' % color)
    return escape + text + RESET_COLOR
def colordiff(a, b, highlight='red'):
    """Given two values, return the same pair of strings except with
    their differences highlighted in the specified color. Strings are
    highlighted intelligently to show differences; other values are
    stringified and highlighted in their entirety.
    """
    if not (isinstance(a, basestring) and isinstance(b, basestring)):
        # Non-strings: use ordinary equality.
        a, b = unicode(a), unicode(b)
        if a == b:
            return a, b
        return colorize(highlight, a), colorize(highlight, b)

    left = []
    right = []
    matcher = SequenceMatcher(lambda x: False, a, b)
    for op, a_start, a_end, b_start, b_end in matcher.get_opcodes():
        a_part = a[a_start:a_end]
        b_part = b[b_start:b_end]
        if op == 'equal':
            # In both strings.
            left.append(a_part)
            right.append(b_part)
        elif op == 'insert':
            # Right only.
            right.append(colorize(highlight, b_part))
        elif op == 'delete':
            # Left only.
            left.append(colorize(highlight, a_part))
        elif op == 'replace':
            # Right and left differ.
            left.append(colorize(highlight, a_part))
            right.append(colorize(highlight, b_part))
        else:
            assert(False)
    return u''.join(left), u''.join(right)
# Subcommand parsing infrastructure.
# This is a fairly generic subcommand parser for optparse. It is
# maintained externally here:
# http://gist.github.com/462717
# There you will also find a better description of the code and a more
# succinct example program.
class Subcommand(object):
    """A subcommand of a root command-line application that may be
    invoked by a SubcommandOptionParser.
    """
    def __init__(self, name, parser=None, help='', aliases=()):
        """Create a new subcommand.

        `name` is the primary way to invoke the subcommand and
        `aliases` are alternate names. `parser` is an OptionParser
        responsible for parsing the subcommand's options (a fresh,
        empty parser is created when none is given). `help` is a short
        description of the command.
        """
        self.name = name
        self.aliases = aliases
        self.help = help
        self.parser = parser or optparse.OptionParser()
class SubcommandsOptionParser(optparse.OptionParser):
    """A variant of OptionParser that parses subcommands and their
    arguments.
    """
    # A singleton command used to give help on other subcommands.
    _HelpSubcommand = Subcommand('help', optparse.OptionParser(),
        help='give detailed help on a specific sub-command',
        aliases=('?',))

    def __init__(self, *args, **kwargs):
        """Create a new subcommand-aware option parser. All of the
        options to OptionParser.__init__ are supported in addition
        to subcommands, a sequence of Subcommand objects.
        """
        # The subcommand array, with the help command included.
        self.subcommands = list(kwargs.pop('subcommands', []))
        self.subcommands.append(self._HelpSubcommand)

        # A more helpful default usage.
        if 'usage' not in kwargs:
            kwargs['usage'] = """
  %prog COMMAND [ARGS...]
  %prog help COMMAND"""

        # Super constructor.
        optparse.OptionParser.__init__(self, *args, **kwargs)

        # Adjust the help-visible name of each subcommand.
        for subcommand in self.subcommands:
            subcommand.parser.prog = '%s %s' % \
                (self.get_prog_name(), subcommand.name)

        # Our root parser needs to stop on the first unrecognized argument.
        self.disable_interspersed_args()

    def add_subcommand(self, cmd):
        """Adds a Subcommand object to the parser's list of commands.
        """
        self.subcommands.append(cmd)

    # Add the list of subcommands to the help message.
    def format_help(self, formatter=None):
        """Extend the base OptionParser help output with an aligned,
        wrapped listing of all subcommands (including aliases).
        """
        # Get the original help message, to which we will append.
        out = optparse.OptionParser.format_help(self, formatter)
        if formatter is None:
            formatter = self.formatter

        # Subcommands header.
        result = ["\n"]
        result.append(formatter.format_heading('Commands'))
        formatter.indent()

        # Generate the display names (including aliases).
        # Also determine the help position.
        disp_names = []
        help_position = 0
        for subcommand in self.subcommands:
            name = subcommand.name
            if subcommand.aliases:
                name += ' (%s)' % ', '.join(subcommand.aliases)
            disp_names.append(name)

            # Set the help position based on the max width.
            proposed_help_position = len(name) + formatter.current_indent + 2
            if proposed_help_position <= formatter.max_help_position:
                help_position = max(help_position, proposed_help_position)

        # Add each subcommand to the output.
        for subcommand, name in zip(self.subcommands, disp_names):
            # Lifted directly from optparse.py.
            name_width = help_position - formatter.current_indent - 2
            if len(name) > name_width:
                name = "%*s%s\n" % (formatter.current_indent, "", name)
                indent_first = help_position
            else:
                name = "%*s%-*s " % (formatter.current_indent, "",
                                     name_width, name)
                indent_first = 0
            result.append(name)
            help_width = formatter.width - help_position
            # BUG FIX: wrap() returns [] for an empty help string,
            # which made help_lines[0] raise IndexError.
            help_lines = textwrap.wrap(subcommand.help, help_width) or ['']
            result.append("%*s%s\n" % (indent_first, "", help_lines[0]))
            result.extend(["%*s%s\n" % (help_position, "", line)
                           for line in help_lines[1:]])
        formatter.dedent()

        # Concatenate the original help message with the subcommand
        # list.
        return out + "".join(result)

    def _subcommand_for_name(self, name):
        """Return the subcommand in self.subcommands matching the
        given name. The name may either be the name of a subcommand or
        an alias. If no subcommand matches, returns None.
        """
        for subcommand in self.subcommands:
            if name == subcommand.name or \
               name in subcommand.aliases:
                return subcommand
        return None

    def parse_args(self, a=None, v=None):
        """Like OptionParser.parse_args, but returns these four items:
        - options: the options passed to the root parser
        - subcommand: the Subcommand object that was invoked
        - suboptions: the options passed to the subcommand parser
        - subargs: the positional arguments passed to the subcommand
        """
        options, args = optparse.OptionParser.parse_args(self, a, v)

        if not args:
            # No command given.
            self.print_help()
            self.exit()
        else:
            cmdname = args.pop(0)
            subcommand = self._subcommand_for_name(cmdname)
            if not subcommand:
                self.error('unknown command ' + cmdname)

        suboptions, subargs = subcommand.parser.parse_args(args)

        if subcommand is self._HelpSubcommand:
            if subargs:
                # particular
                cmdname = subargs[0]
                helpcommand = self._subcommand_for_name(cmdname)
                # BUG FIX: `help UNKNOWN` used to crash with an
                # AttributeError on None; report the error instead.
                if not helpcommand:
                    self.error('unknown command ' + cmdname)
                helpcommand.parser.print_help()
                self.exit()
            else:
                # general
                self.print_help()
                self.exit()

        return options, subcommand, suboptions, subargs
# The root parser and its main function.
def main(args=None, configfh=None):
"""Run the main command-line interface for beets."""
# Get the default subcommands.
from beets.ui.commands import default_commands
# Read defaults from config file.
config = ConfigParser.SafeConfigParser()
if configfh:
configpath = None
elif CONFIG_PATH_VAR in os.environ:
configpath = os.path.expanduser(os.environ[CONFIG_PATH_VAR])
else:
configpath = DEFAULT_CONFIG_FILE
if configpath:
configpath = util.syspath(configpath)
if os.path.exists(util.syspath(configpath)):
configfh = open(configpath)
else:
configfh = None
if configfh:
config.readfp(configfh)
# Add plugin paths.
plugpaths = config_val(config, 'beets', 'pluginpath', '')
for plugpath in plugpaths.split(':'):
sys.path.append(os.path.expanduser(plugpath))
# Load requested plugins.
plugnames = config_val(config, 'beets', 'plugins', '')
plugins.load_plugins(plugnames.split())
plugins.load_listeners()
plugins.send("pluginload")
plugins.configure(config)
# Construct the root parser.
commands = list(default_commands)
commands += plugins.commands()
parser = SubcommandsOptionParser(subcommands=commands)
parser.add_option('-l', '--library', dest='libpath',
help='library database file to use')
parser.add_option('-d', '--directory', dest='directory',
help="destination music directory")
parser.add_option('-p', '--pathformat', dest='path_format',
help="destination path format string")
parser.add_option('-v', '--verbose', dest='verbose', action='store_true',
help='print debugging information')
# Parse the command-line!
options, subcommand, suboptions, subargs = parser.parse_args(args)
# Open library file.
libpath = options.libpath or \
config_val(config, 'beets', 'library', DEFAULT_LIBRARY)
directory = options.directory or \
config_val(config, 'beets', 'directory', DEFAULT_DIRECTORY)
legacy_path_format = config_val(config, 'beets', 'path_format', None)
if options.path_format:
# If given, -p overrides all path format settings
path_formats = {'default': options.path_format}
else:
if legacy_path_format:
# Old path formats override the default values.
path_formats = {'default': legacy_path_format}
else:
# If no legacy path format, use the defaults instead.
path_formats = DEFAULT_PATH_FORMATS
if config.has_section('paths'):
path_formats.update(config.items('paths'))
art_filename = \
config_val(config, 'beets', 'art_filename', DEFAULT_ART_FILENAME)
db_path = os.path.expanduser(libpath)
try:
lib = library.Library(db_path,
directory,
path_formats,
art_filename)
except sqlite3.OperationalError:
raise UserError("database file %s could not be opened" % db_path)
# Configure the logger.
log = logging.getLogger('beets')
if options.verbose:
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.INFO)
# Invoke the subcommand.
try:
subcommand.func(lib, config, suboptions, subargs)
except UserError, exc:
message = exc.args[0] if exc.args else None
subcommand.parser.error(message)
except IOError, exc:
if exc.errno == errno.EPIPE:
# "Broken pipe". End silently.
pass
else:
raise

File diff suppressed because it is too large Load Diff

View File

@@ -1,48 +0,0 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""A simple utility for constructing filesystem-like trees from beets
libraries.
"""
from collections import namedtuple
from beets import util
# A virtual-filesystem node: `files` maps filenames to item IDs and
# `dirs` maps directory names to child Node instances.
Node = namedtuple('Node', ['files', 'dirs'])
def _insert(node, path, itemid):
    """Insert an item into a virtual filesystem node.

    `path` is a non-empty list of path components; intermediate
    directory nodes are created on demand.
    """
    component, rest = path[0], path[1:]
    if not rest:
        # Last component: record the file itself.
        node.files[component] = itemid
    else:
        # Descend into (or create) the subdirectory.
        child = node.dirs.get(component)
        if child is None:
            child = Node({}, {})
            node.dirs[component] = child
        _insert(child, rest, itemid)
def libtree(lib):
    """Generates a filesystem-like directory tree for the files
    contained in `lib`. Filesystem nodes are (files, dirs) named
    tuples in which both components are dictionaries. The first
    maps filenames to Item ids. The second maps directory names to
    child node tuples.
    """
    root = Node({}, {})
    for item in lib.items():
        # Place the item at its computed library destination.
        fragment = lib.destination(item, fragment=True)
        _insert(root, util.components(fragment), item.id)
    return root