From 5625a2f1346b6efd1223137a1721899d4c2dc2b7 Mon Sep 17 00:00:00 2001 From: Remy Varma Date: Thu, 3 Nov 2011 22:12:28 +0000 Subject: [PATCH] Deleted some superfluous .py~ files --- lib/beets/__init__.py~ | 19 - lib/beets/autotag/__init__.py~ | 109 --- lib/beets/autotag/art.py~ | 113 --- lib/beets/autotag/hooks.py~ | 125 ---- lib/beets/autotag/match.py~ | 490 ------------- lib/beets/autotag/mb.py~ | 171 ----- lib/beets/importer.py~ | 811 -------------------- lib/beets/library.py~ | 1260 -------------------------------- lib/beets/mediafile.py~ | 885 ---------------------- lib/beets/ui/__init__.py~ | 632 ---------------- lib/beets/ui/commands.py~ | 1036 -------------------------- lib/beets/vfs.py~ | 48 -- 12 files changed, 5699 deletions(-) delete mode 100644 lib/beets/__init__.py~ delete mode 100644 lib/beets/autotag/__init__.py~ delete mode 100644 lib/beets/autotag/art.py~ delete mode 100644 lib/beets/autotag/hooks.py~ delete mode 100644 lib/beets/autotag/match.py~ delete mode 100644 lib/beets/autotag/mb.py~ delete mode 100644 lib/beets/importer.py~ delete mode 100644 lib/beets/library.py~ delete mode 100644 lib/beets/mediafile.py~ delete mode 100644 lib/beets/ui/__init__.py~ delete mode 100755 lib/beets/ui/commands.py~ delete mode 100644 lib/beets/vfs.py~ diff --git a/lib/beets/__init__.py~ b/lib/beets/__init__.py~ deleted file mode 100644 index 84907de8..00000000 --- a/lib/beets/__init__.py~ +++ /dev/null @@ -1,19 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -__version__ = '1.0b11' -__author__ = 'Adrian Sampson ' - -import lib.beets.library as beetslibrary -Library = beetslibrary.Library diff --git a/lib/beets/autotag/__init__.py~ b/lib/beets/autotag/__init__.py~ deleted file mode 100644 index 2388ad22..00000000 --- a/lib/beets/autotag/__init__.py~ +++ /dev/null @@ -1,109 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -"""Facilities for automatically determining files' correct metadata. -""" -import os -import logging - -from lib.beets import library, mediafile -from lib.beets.util import sorted_walk - -# Parts of external interface. -from .hooks import AlbumInfo, TrackInfo -from .match import AutotagError -from .match import tag_item, tag_album -from .match import RECOMMEND_STRONG, RECOMMEND_MEDIUM, RECOMMEND_NONE -from .match import STRONG_REC_THRESH, MEDIUM_REC_THRESH, REC_GAP_THRESH - -# Global logger. -log = logging.getLogger('beets') - - -# Additional utilities for the main interface. - -def albums_in_dir(path): - """Recursively searches the given directory and returns an iterable - of (path, items) where path is a containing directory and items is - a list of Items that is probably an album. Specifically, any folder - containing any media files is an album. - """ - for root, dirs, files in sorted_walk(path): - # Get a list of items in the directory. - items = [] - for filename in files: - try: - i = library.Item.from_path(os.path.join(root, filename)) - except mediafile.FileTypeError: - pass - except mediafile.UnreadableFileError: - log.warn('unreadable file: ' + filename) - else: - items.append(i) - - # If it's nonempty, yield it. - if items: - yield root, items - -def apply_item_metadata(item, track_info): - """Set an item's metadata from its matched TrackInfo object. - """ - item.artist = track_info.artist - item.title = track_info.title - item.mb_trackid = track_info.track_id - if track_info.artist_id: - item.mb_artistid = track_info.artist_id - # At the moment, the other metadata is left intact (including album - # and track number). Perhaps these should be emptied? - -def apply_metadata(items, album_info): - """Set the items' metadata to match an AlbumInfo object. The list - of items must be ordered. - """ - for index, (item, track_info) in enumerate(zip(items, album_info.tracks)): - # Album, artist, track count. - if track_info.artist: - item.artist = track_info.artist - else: - item.artist = album_info.artist - item.albumartist = album_info.artist - item.album = album_info.album - item.tracktotal = len(items) - - # Release date. - if album_info.year: - item.year = album_info.year - if album_info.month: - item.month = album_info.month - if album_info.day: - item.day = album_info.day - - # Title and track index. - item.title = track_info.title - item.track = index + 1 - - # MusicBrainz IDs. - item.mb_trackid = track_info.track_id - item.mb_albumid = album_info.album_id - if track_info.artist_id: - item.mb_artistid = track_info.artist_id - else: - item.mb_artistid = album_info.artist_id - item.mb_albumartistid = album_info.artist_id - item.albumtype = album_info.albumtype - if album_info.label: - item.label = album_info.label - - # Compilation flag. - item.comp = album_info.va diff --git a/lib/beets/autotag/art.py~ b/lib/beets/autotag/art.py~ deleted file mode 100644 index 768704b3..00000000 --- a/lib/beets/autotag/art.py~ +++ /dev/null @@ -1,113 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -"""Finding album art for tagged albums.""" - -import urllib -import sys -import logging -import os - -from beets.autotag.mb import album_for_id - -IMAGE_EXTENSIONS = ['png', 'jpg', 'jpeg'] -COVER_NAMES = ['cover', 'front', 'art', 'album', 'folder'] - -# The common logger. -log = logging.getLogger('beets') - - -# Art from Amazon. - -AMAZON_URL = 'http://images.amazon.com/images/P/%s.%02i.LZZZZZZZ.jpg' -AMAZON_INDICES = (1,2) -AMAZON_CONTENT_TYPE = 'image/jpeg' -def art_for_asin(asin): - """Fetches art for an Amazon ID (ASIN) string.""" - for index in AMAZON_INDICES: - # Fetch the image. - url = AMAZON_URL % (asin, index) - try: - log.debug('Downloading art: %s' % url) - fn, headers = urllib.urlretrieve(url) - except IOError: - log.debug('error fetching art at URL %s' % url) - continue - - # Make sure it's actually an image. - if headers.gettype() == AMAZON_CONTENT_TYPE: - log.debug('Downloaded art to: %s' % fn) - return fn - - -# Art from the filesystem. - -def art_in_path(path): - """Look for album art files in a specified directory.""" - if not os.path.isdir(path): - return - - # Find all files that look like images in the directory. - images = [] - for fn in os.listdir(path): - for ext in IMAGE_EXTENSIONS: - if fn.lower().endswith('.' + ext): - images.append(fn) - - # Look for "preferred" filenames. - for fn in images: - for name in COVER_NAMES: - if fn.lower().startswith(name): - log.debug('Using well-named art file %s' % fn) - return os.path.join(path, fn) - - # Fall back to any image in the folder. - if images: - log.debug('Using fallback art file %s' % images[0]) - return os.path.join(path, images[0]) - - -# Main interface. - -def art_for_album(album, path): - """Given an album info dictionary from MusicBrainz, returns a path - to downloaded art for the album (or None if no art is found). - """ - if isinstance(path, basestring): - out = art_in_path(path) - if out: - return out - - if album.asin: - log.debug('Fetching album art for ASIN %s.' % album.asin) - return art_for_asin(album.asin) - else: - log.debug('No ASIN available: no art found.') - return None - - -# Smoke test. - -if __name__ == '__main__': - aid = sys.argv[1] - album = album_for_id(aid) - if not album: - print 'album not found' - else: - fn = art_for_album(album, None) - if fn: - print fn - print len(open(fn).read())/1024 - else: - print 'no art found' diff --git a/lib/beets/autotag/hooks.py~ b/lib/beets/autotag/hooks.py~ deleted file mode 100644 index f587b35e..00000000 --- a/lib/beets/autotag/hooks.py~ +++ /dev/null @@ -1,125 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -"""Glue between metadata sources and the matching logic.""" - -from beets import plugins -from beets.autotag import mb - -# Classes used to represent candidate options. - -class AlbumInfo(object): - """Describes a canonical release that may be used to match a release - in the library. Consists of these data members: - - - ``album``: the release title - - ``album_id``: MusicBrainz ID; UUID fragment only - - ``artist``: name of the release's primary artist - - ``artist_id`` - - ``tracks``: list of TrackInfo objects making up the release - - ``asin``: Amazon ASIN - - ``albumtype``: string describing the kind of release - - ``va``: boolean: whether the release has "various artists" - - ``year``: release year - - ``month``: release month - - ``day``: release day - - ``label``: music label responsible for the release - - The fields up through ``tracks`` are required. The others are - optional and may be None. - """ - def __init__(self, album, album_id, artist, artist_id, tracks, asin=None, - albumtype=None, va=False, year=None, month=None, day=None, - label=None): - self.album = album - self.album_id = album_id - self.artist = artist - self.artist_id = artist_id - self.tracks = tracks - self.asin = asin - self.albumtype = albumtype - self.va = va - self.year = year - self.month = month - self.day = day - self.label = label - -class TrackInfo(object): - """Describes a canonical track present on a release. Appears as part - of an AlbumInfo's ``tracks`` list. Consists of these data members: - - - ``title``: name of the track - - ``track_id``: MusicBrainz ID; UUID fragment only - - ``artist``: individual track artist name - - ``artist_id`` - - ``length``: float: duration of the track in seconds - - Only ``title`` and ``track_id`` are required. The rest of the fields - may be None. - """ - def __init__(self, title, track_id, artist=None, artist_id=None, - length=None): - self.title = title - self.track_id = track_id - self.artist = artist - self.artist_id = artist_id - self.length = length - - -# Aggregation of sources. - -def _album_for_id(album_id): - """Get an album corresponding to a MusicBrainz release ID.""" - return mb.album_for_id(album_id) - -def _track_for_id(track_id): - """Get an item for a recording MBID.""" - return mb.track_for_id(track_id) - -def _album_candidates(items, artist, album, va_likely): - """Search for album matches. ``items`` is a list of Item objects - that make up the album. ``artist`` and ``album`` are the respective - names (strings), which may be derived from the item list or may be - entered by the user. ``va_likely`` is a boolean indicating whether - the album is likely to be a "various artists" release. - """ - out = [] - - # Base candidates if we have album and artist to match. - if artist and album: - out.extend(mb.match_album(artist, album, len(items))) - - # Also add VA matches from MusicBrainz where appropriate. - if va_likely and album: - out.extend(mb.match_album(None, album, len(items))) - - # Candidates from plugins. - out.extend(plugins.candidates(items)) - - return out - -def _item_candidates(item, artist, title): - """Search for item matches. ``item`` is the Item to be matched. - ``artist`` and ``title`` are strings and either reflect the item or - are specified by the user. - """ - out = [] - - # MusicBrainz candidates. - out.extend(mb.match_track(artist, title)) - - # Plugin candidates. - out.extend(plugins.item_candidates(item)) - - return out diff --git a/lib/beets/autotag/match.py~ b/lib/beets/autotag/match.py~ deleted file mode 100644 index 435e6fb0..00000000 --- a/lib/beets/autotag/match.py~ +++ /dev/null @@ -1,490 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -"""Matches existing metadata with canonical information to identify -releases and tracks. -""" -import logging -import re -from munkres import Munkres -from unidecode import unidecode - -from beets import plugins -from beets.util import levenshtein, plurality -from beets.autotag import hooks - -# Distance parameters. -# Text distance weights: proportions on the normalized intuitive edit -# distance. -ARTIST_WEIGHT = 3.0 -ALBUM_WEIGHT = 3.0 -# The weight of the entire distance calculated for a given track. -TRACK_WEIGHT = 1.0 -# These distances are components of the track distance (that is, they -# compete against each other but not ARTIST_WEIGHT and ALBUM_WEIGHT; -# the overall TRACK_WEIGHT does that). -TRACK_TITLE_WEIGHT = 3.0 -# Used instead of a global artist penalty for various-artist matches. -TRACK_ARTIST_WEIGHT = 2.0 -# Added when the indices of tracks don't match. -TRACK_INDEX_WEIGHT = 1.0 -# Track length weights: no penalty before GRACE, maximum (WEIGHT) -# penalty at GRACE+MAX discrepancy. -TRACK_LENGTH_GRACE = 10 -TRACK_LENGTH_MAX = 30 -TRACK_LENGTH_WEIGHT = 2.0 -# MusicBrainz track ID matches. -TRACK_ID_WEIGHT = 5.0 - -# Parameters for string distance function. -# Words that can be moved to the end of a string using a comma. -SD_END_WORDS = ['the', 'a', 'an'] -# Reduced weights for certain portions of the string. -SD_PATTERNS = [ - (r'^the ', 0.1), - (r'[\[\(]?(ep|single)[\]\)]?', 0.0), - (r'[\[\(]?(featuring|feat|ft)[\. :].+', 0.1), - (r'\(.*?\)', 0.3), - (r'\[.*?\]', 0.3), - (r'(, )?(pt\.|part) .+', 0.2), -] -# Replacements to use before testing distance. -SD_REPLACE = [ - (r'&', 'and'), -] - -# Recommendation constants. -RECOMMEND_STRONG = 'RECOMMEND_STRONG' -RECOMMEND_MEDIUM = 'RECOMMEND_MEDIUM' -RECOMMEND_NONE = 'RECOMMEND_NONE' -# Thresholds for recommendations. -STRONG_REC_THRESH = 0.04 -MEDIUM_REC_THRESH = 0.25 -REC_GAP_THRESH = 0.25 - -# Artist signals that indicate "various artists". -VA_ARTISTS = (u'', u'various artists', u'va', u'unknown') - -# Autotagging exceptions. -class AutotagError(Exception): - pass - -# Global logger. -log = logging.getLogger('beets') - - -# Primary matching functionality. - -def _string_dist_basic(str1, str2): - """Basic edit distance between two strings, ignoring - non-alphanumeric characters and case. Comparisons are based on a - transliteration/lowering to ASCII characters. Normalized by string - length. - """ - str1 = unidecode(str1) - str2 = unidecode(str2) - str1 = re.sub(r'[^a-z0-9]', '', str1.lower()) - str2 = re.sub(r'[^a-z0-9]', '', str2.lower()) - if not str1 and not str2: - return 0.0 - return levenshtein(str1, str2) / float(max(len(str1), len(str2))) - -def string_dist(str1, str2): - """Gives an "intuitive" edit distance between two strings. This is - an edit distance, normalized by the string length, with a number of - tweaks that reflect intuition about text. - """ - str1 = str1.lower() - str2 = str2.lower() - - # Don't penalize strings that move certain words to the end. For - # example, "the something" should be considered equal to - # "something, the". - for word in SD_END_WORDS: - if str1.endswith(', %s' % word): - str1 = '%s %s' % (word, str1[:-len(word)-2]) - if str2.endswith(', %s' % word): - str2 = '%s %s' % (word, str2[:-len(word)-2]) - - # Perform a couple of basic normalizing substitutions. - for pat, repl in SD_REPLACE: - str1 = re.sub(pat, repl, str1) - str2 = re.sub(pat, repl, str2) - - # Change the weight for certain string portions matched by a set - # of regular expressions. We gradually change the strings and build - # up penalties associated with parts of the string that were - # deleted. - base_dist = _string_dist_basic(str1, str2) - penalty = 0.0 - for pat, weight in SD_PATTERNS: - # Get strings that drop the pattern. - case_str1 = re.sub(pat, '', str1) - case_str2 = re.sub(pat, '', str2) - - if case_str1 != str1 or case_str2 != str2: - # If the pattern was present (i.e., it is deleted in the - # the current case), recalculate the distances for the - # modified strings. - case_dist = _string_dist_basic(case_str1, case_str2) - case_delta = max(0.0, base_dist - case_dist) - if case_delta == 0.0: - continue - - # Shift our baseline strings down (to avoid rematching the - # same part of the string) and add a scaled distance - # amount to the penalties. - str1 = case_str1 - str2 = case_str2 - base_dist = case_dist - penalty += weight * case_delta - dist = base_dist + penalty - - return dist - -def current_metadata(items): - """Returns the most likely artist and album for a set of Items. - Each is determined by tag reflected by the plurality of the Items. - """ - keys = 'artist', 'album' - likelies = {} - consensus = {} - for key in keys: - values = [getattr(item, key) for item in items] - likelies[key], freq = plurality(values) - consensus[key] = (freq == len(values)) - return likelies['artist'], likelies['album'], consensus['artist'] - -def order_items(items, trackinfo): - """Orders the items based on how they match some canonical track - information. This always produces a result if the numbers of tracks - match. - """ - # Make sure lengths match. - if len(items) != len(trackinfo): - return None - - # Construct the cost matrix. - costs = [] - for cur_item in items: - row = [] - for i, canon_item in enumerate(trackinfo): - row.append(track_distance(cur_item, canon_item, i+1)) - costs.append(row) - - # Find a minimum-cost bipartite matching. - matching = Munkres().compute(costs) - - # Order items based on the matching. - ordered_items = [None]*len(items) - for cur_idx, canon_idx in matching: - ordered_items[canon_idx] = items[cur_idx] - return ordered_items - -def track_distance(item, track_info, track_index=None, incl_artist=False): - """Determines the significance of a track metadata change. Returns - a float in [0.0,1.0]. `track_index` is the track number of the - `track_info` metadata set. If `track_index` is provided and - item.track is set, then these indices are used as a component of - the distance calculation. `incl_artist` indicates that a distance - component should be included for the track artist (i.e., for - various-artist releases). - """ - # Distance and normalization accumulators. - dist, dist_max = 0.0, 0.0 - - # Check track length. - if not track_info.length: - # If there's no length to check, assume the worst. - dist += TRACK_LENGTH_WEIGHT - else: - diff = abs(item.length - track_info.length) - diff = max(diff - TRACK_LENGTH_GRACE, 0.0) - diff = min(diff, TRACK_LENGTH_MAX) - dist += (diff / TRACK_LENGTH_MAX) * TRACK_LENGTH_WEIGHT - dist_max += TRACK_LENGTH_WEIGHT - - # Track title. - dist += string_dist(item.title, track_info.title) * TRACK_TITLE_WEIGHT - dist_max += TRACK_TITLE_WEIGHT - - # Track artist, if included. - # Attention: MB DB does not have artist info for all compilations, - # so only check artist distance if there is actually an artist in - # the MB track data. - if incl_artist and track_info.artist: - dist += string_dist(item.artist, track_info.artist) * \ - TRACK_ARTIST_WEIGHT - dist_max += TRACK_ARTIST_WEIGHT - - # Track index. - if track_index and item.track: - if track_index != item.track: - dist += TRACK_INDEX_WEIGHT - dist_max += TRACK_INDEX_WEIGHT - - # MusicBrainz track ID. - if item.mb_trackid: - if item.mb_trackid != track_info.track_id: - dist += TRACK_ID_WEIGHT - dist_max += TRACK_ID_WEIGHT - - # Plugin distances. - plugin_d, plugin_dm = plugins.track_distance(item, track_info) - dist += plugin_d - dist_max += plugin_dm - - return dist / dist_max - -def distance(items, album_info): - """Determines how "significant" an album metadata change would be. - Returns a float in [0.0,1.0]. The list of items must be ordered. - """ - cur_artist, cur_album, _ = current_metadata(items) - cur_artist = cur_artist or '' - cur_album = cur_album or '' - - # These accumulate the possible distance components. The final - # distance will be dist/dist_max. - dist = 0.0 - dist_max = 0.0 - - # Artist/album metadata. - if not album_info.va: - dist += string_dist(cur_artist, album_info.artist) * ARTIST_WEIGHT - dist_max += ARTIST_WEIGHT - dist += string_dist(cur_album, album_info.album) * ALBUM_WEIGHT - dist_max += ALBUM_WEIGHT - - # Track distances. - for i, (item, track_info) in enumerate(zip(items, album_info.tracks)): - dist += track_distance(item, track_info, i+1, album_info.va) * \ - TRACK_WEIGHT - dist_max += TRACK_WEIGHT - - # Plugin distances. - plugin_d, plugin_dm = plugins.album_distance(items, album_info) - dist += plugin_d - dist_max += plugin_dm - - # Normalize distance, avoiding divide-by-zero. - if dist_max == 0.0: - return 0.0 - else: - return dist/dist_max - -def match_by_id(items): - """If the items are tagged with a MusicBrainz album ID, returns an - info dict for the corresponding album. Otherwise, returns None. - """ - # Is there a consensus on the MB album ID? - albumids = [item.mb_albumid for item in items if item.mb_albumid] - if not albumids: - log.debug('No album IDs found.') - return None - - # If all album IDs are equal, look up the album. - if bool(reduce(lambda x,y: x if x==y else (), albumids)): - albumid = albumids[0] - log.debug('Searching for discovered album ID: ' + albumid) - return hooks._album_for_id(albumid) - else: - log.debug('No album ID consensus.') - return None - - #fixme In the future, at the expense of performance, we could use - # other IDs (i.e., track and artist) in case the album tag isn't - # present, but that event seems very unlikely. - -def recommendation(results): - """Given a sorted list of result tuples, returns a recommendation - flag (RECOMMEND_STRONG, RECOMMEND_MEDIUM, RECOMMEND_NONE) based - on the results' distances. - """ - if not results: - # No candidates: no recommendation. - rec = RECOMMEND_NONE - else: - min_dist = results[0][0] - if min_dist < STRONG_REC_THRESH: - # Strong recommendation level. - rec = RECOMMEND_STRONG - elif len(results) == 1: - # Only a single candidate. Medium recommendation. - rec = RECOMMEND_MEDIUM - elif min_dist <= MEDIUM_REC_THRESH: - # Medium recommendation level. - rec = RECOMMEND_MEDIUM - elif results[1][0] - min_dist >= REC_GAP_THRESH: - # Gap between first two candidates is large. - rec = RECOMMEND_MEDIUM - else: - # No conclusion. - rec = RECOMMEND_NONE - return rec - -def validate_candidate(items, tuple_dict, info): - """Given a candidate info dict, attempt to add the candidate to - the output dictionary of result tuples. This involves checking - the track count, ordering the items, checking for duplicates, and - calculating the distance. - """ - log.debug('Candidate: %s - %s' % (info.artist, info.album)) - - # Don't duplicate. - if info.album_id in tuple_dict: - log.debug('Duplicate.') - return - - # Make sure the album has the correct number of tracks. - if len(items) != len(info.tracks): - log.debug('Track count mismatch.') - return - - # Put items in order. - ordered = order_items(items, info.tracks) - if not ordered: - log.debug('Not orderable.') - return - - # Get the change distance. - dist = distance(ordered, info) - log.debug('Success. Distance: %f' % dist) - - tuple_dict[info.album_id] = dist, ordered, info - -def tag_album(items, timid=False, search_artist=None, search_album=None, - search_id=None): - """Bundles together the functionality used to infer tags for a - set of items comprised by an album. Returns everything relevant: - - The current artist. - - The current album. - - A list of (distance, items, info) tuples where info is a - dictionary containing the inferred tags and items is a - reordered version of the input items list. The candidates are - sorted by distance (i.e., best match first). - - A recommendation, one of RECOMMEND_STRONG, RECOMMEND_MEDIUM, - or RECOMMEND_NONE; indicating that the first candidate is - very likely, it is somewhat likely, or no conclusion could - be reached. - If search_artist and search_album or search_id are provided, then - they are used as search terms in place of the current metadata. - May raise an AutotagError if existing metadata is insufficient. - """ - # Get current metadata. - cur_artist, cur_album, artist_consensus = current_metadata(items) - log.debug('Tagging %s - %s' % (cur_artist, cur_album)) - - # The output result tuples (keyed by MB album ID). - out_tuples = {} - - # Try to find album indicated by MusicBrainz IDs. - if search_id: - log.debug('Searching for album ID: ' + search_id) - id_info = hooks._album_for_id(search_id) - else: - id_info = match_by_id(items) - if id_info: - validate_candidate(items, out_tuples, id_info) - rec = recommendation(out_tuples.values()) - log.debug('Album ID match recommendation is ' + str(rec)) - if out_tuples and not timid: - # If we have a very good MBID match, return immediately. - # Otherwise, this match will compete against metadata-based - # matches. - if rec == RECOMMEND_STRONG: - log.debug('ID match.') - return cur_artist, cur_album, out_tuples.values(), rec - - # If searching by ID, don't continue to metadata search. - if search_id is not None: - if out_tuples: - return cur_artist, cur_album, out_tuples.values(), rec - else: - return cur_artist, cur_album, [], RECOMMEND_NONE - - # Search terms. - if not (search_artist and search_album): - # No explicit search terms -- use current metadata. - search_artist, search_album = cur_artist, cur_album - log.debug(u'Search terms: %s - %s' % (search_artist, search_album)) - - # Is this album likely to be a "various artist" release? - va_likely = ((not artist_consensus) or - (search_artist.lower() in VA_ARTISTS) or - any(item.comp for item in items)) - log.debug(u'Album might be VA: %s' % str(va_likely)) - - # Get the results from the data sources. - candidates = hooks._album_candidates(items, search_artist, search_album, - va_likely) - - # Get the distance to each candidate. - log.debug(u'Evaluating %i candidates.' % len(candidates)) - for info in candidates: - validate_candidate(items, out_tuples, info) - - # Sort by distance. - out_tuples = out_tuples.values() - out_tuples.sort() - - rec = recommendation(out_tuples) - return cur_artist, cur_album, out_tuples, rec - -def tag_item(item, timid=False, search_artist=None, search_title=None, - search_id=None): - """Attempts to find metadata for a single track. Returns a - `(candidates, recommendation)` pair where `candidates` is a list - of `(distance, track_info)` pairs. `search_artist` and - `search_title` may be used to override the current metadata for - the purposes of the MusicBrainz title; likewise `search_id`. - """ - candidates = [] - - # First, try matching by MusicBrainz ID. - trackid = search_id or item.mb_trackid - if trackid: - log.debug('Searching for track ID: ' + trackid) - track_info = hooks._track_for_id(trackid) - if track_info: - dist = track_distance(item, track_info, incl_artist=True) - candidates.append((dist, track_info)) - # If this is a good match, then don't keep searching. - rec = recommendation(candidates) - if rec == RECOMMEND_STRONG and not timid: - log.debug('Track ID match.') - return candidates, rec - - # If we're searching by ID, don't proceed. - if search_id is not None: - if candidates: - return candidates, rec - else: - return [], RECOMMEND_NONE - - # Search terms. - if not (search_artist and search_title): - search_artist, search_title = item.artist, item.title - log.debug(u'Item search terms: %s - %s' % (search_artist, search_title)) - - # Get and evaluate candidate metadata. - for track_info in hooks._item_candidates(item, search_artist, search_title): - dist = track_distance(item, track_info, incl_artist=True) - candidates.append((dist, track_info)) - - # Sort by distance and return with recommendation. - log.debug('Found %i candidates.' % len(candidates)) - candidates.sort() - rec = recommendation(candidates) - return candidates, rec diff --git a/lib/beets/autotag/mb.py~ b/lib/beets/autotag/mb.py~ deleted file mode 100644 index a1e7a881..00000000 --- a/lib/beets/autotag/mb.py~ +++ /dev/null @@ -1,171 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -"""Searches for albums in the MusicBrainz database. -""" -import logging - -from . import musicbrainz3 -import beets.autotag.hooks -import beets - -SEARCH_LIMIT = 5 -VARIOUS_ARTISTS_ID = '89ad4ac3-39f7-470e-963a-56509c546377' - -musicbrainz3._useragent = 'beets/%s' % beets.__version__ - -class ServerBusyError(Exception): pass -class BadResponseError(Exception): pass - -log = logging.getLogger('beets') - -# We hard-code IDs for artists that can't easily be searched for. -SPECIAL_CASE_ARTISTS = { - '!!!': 'f26c72d3-e52c-467b-b651-679c73d8e1a7', -} - -RELEASE_INCLUDES = ['artists', 'media', 'recordings', 'release-groups', - 'labels'] -TRACK_INCLUDES = ['artists'] - -def _adapt_criteria(criteria): - """Special-case artists in a criteria dictionary before it is passed - to the MusicBrainz search server. The dictionary supplied is - mutated; nothing is returned. - """ - if 'artist' in criteria: - for artist, artist_id in SPECIAL_CASE_ARTISTS.items(): - if criteria['artist'] == artist: - criteria['arid'] = artist_id - del criteria['artist'] - break - -def track_info(recording): - """Translates a MusicBrainz recording result dictionary into a beets - ``TrackInfo`` object. - """ - info = beets.autotag.hooks.TrackInfo(recording['title'], - recording['id']) - - if 'artist-credit' in recording: # XXX: when is this not included? - artist = recording['artist-credit'][0]['artist'] - info.artist = artist['name'] - info.artist_id = artist['id'] - - if recording.get('length'): - info.length = int(recording['length'])/(1000.0) - - return info - -def album_info(release): - """Takes a MusicBrainz release result dictionary and returns a beets - AlbumInfo object containing the interesting data about that release. - """ - # Basic info. - artist = release['artist-credit'][0]['artist'] - tracks = [] - for medium in release['medium-list']: - tracks.extend(i['recording'] for i in medium['track-list']) - info = beets.autotag.hooks.AlbumInfo( - release['title'], - release['id'], - artist['name'], - artist['id'], - [track_info(track) for track in tracks], - ) - info.va = info.artist_id == VARIOUS_ARTISTS_ID - if 'asin' in release: - info.asin = release['asin'] - - # Release type not always populated. - reltype = release['release-group']['type'] - if reltype: - info.albumtype = reltype.lower() - - # Release date. - if 'date' in release: # XXX: when is this not included? - date_str = release['date'] - if date_str: - date_parts = date_str.split('-') - for key in ('year', 'month', 'day'): - if date_parts: - setattr(info, key, int(date_parts.pop(0))) - - # Label name. - if release.get('label-info-list'): - label = release['label-info-list'][0]['label']['name'] - if label != '[no label]': - info.label = label - - return info - -def match_album(artist, album, tracks=None, limit=SEARCH_LIMIT): - """Searches for a single album ("release" in MusicBrainz parlance) - and returns an iterator over AlbumInfo objects. - - The query consists of an artist name, an album name, and, - optionally, a number of tracks on the album. - """ - # Build search criteria. - criteria = {'release': album} - if artist is not None: - criteria['artist'] = artist - else: - # Various Artists search. - criteria['arid'] = VARIOUS_ARTISTS_ID - if tracks is not None: - criteria['tracks'] = str(tracks) - - _adapt_criteria(criteria) - res = musicbrainz3.release_search(limit=limit, **criteria) - for release in res['release-list']: - # The search result is missing some data (namely, the tracks), - # so we just use the ID and fetch the rest of the information. - yield album_for_id(release['id']) - -def match_track(artist, title, limit=SEARCH_LIMIT): - """Searches for a single track and returns an iterable of TrackInfo - objects. - """ - criteria = { - 'artist': artist, - 'recording': title, - } - - _adapt_criteria(criteria) - res = musicbrainz3.recording_search(limit=limit, **criteria) - for recording in res['recording-list']: - yield track_info(recording) - -def album_for_id(albumid): - """Fetches an album by its MusicBrainz ID and returns an AlbumInfo - object or None if the album is not found. - """ - try: - res = musicbrainz3.get_release_by_id(albumid, RELEASE_INCLUDES) - except musicbrainz3.ResponseError: - log.debug('Album ID match failed.') - return None - return album_info(res['release']) - -def track_for_id(trackid): - """Fetches a track by its MusicBrainz ID. Returns a TrackInfo object - or None if no track is found. - """ - try: - res = musicbrainz3.get_recording_by_id(trackid, TRACK_INCLUDES) - except musicbrainz3.ResponseError: - log.debug('Track ID match failed.') - return None - return track_info(res['recording']) diff --git a/lib/beets/importer.py~ b/lib/beets/importer.py~ deleted file mode 100644 index 9ce9e07a..00000000 --- a/lib/beets/importer.py~ +++ /dev/null @@ -1,811 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -"""Provides the basic, interface-agnostic workflow for importing and -autotagging music files. -""" -from __future__ import with_statement # Python 2.5 -import os -import logging -import pickle -from collections import defaultdict - -from beets import autotag -from beets import library -import beets.autotag.art -from beets import plugins -from beets.util import pipeline -from beets.util import syspath, normpath, plurality -from beets.util.enumeration import enum - -action = enum( - 'SKIP', 'ASIS', 'TRACKS', 'MANUAL', 'APPLY', 'MANUAL_ID', - name='action' -) - -QUEUE_SIZE = 128 -STATE_FILE = os.path.expanduser('~/.beetsstate') -SINGLE_ARTIST_THRESH = 0.25 -VARIOUS_ARTISTS = u'Various Artists' - -# Global logger. -log = logging.getLogger('beets') - -class ImportAbort(Exception): - """Raised when the user aborts the tagging operation. - """ - pass - - -# Utilities. - -def tag_log(logfile, status, path): - """Log a message about a given album to logfile. The status should - reflect the reason the album couldn't be tagged. - """ - if logfile: - print >>logfile, '%s %s' % (status, path) - -def log_choice(config, task): - """Logs the task's current choice if it should be logged. - """ - path = task.path if task.is_album else task.item.path - if task.choice_flag is action.ASIS: - tag_log(config.logfile, 'asis', path) - elif task.choice_flag is action.SKIP: - tag_log(config.logfile, 'skip', path) - -def _reopen_lib(lib): - """Because of limitations in SQLite, a given Library is bound to - the thread in which it was created. This function reopens Library - objects so that they can be used from separate threads. - """ - if isinstance(lib, library.Library): - return library.Library( - lib.path, - lib.directory, - lib.path_formats, - lib.art_filename, - ) - else: - return lib - -def _duplicate_check(lib, task, recent=None): - """Check whether an album already exists in the library. `recent` - should be a set of (artist, album) pairs that will be built up - with every call to this function and checked along with the - library. - """ - if task.choice_flag is action.ASIS: - artist = task.cur_artist - album = task.cur_album - elif task.choice_flag is action.APPLY: - artist = task.info.artist - album = task.info.album - else: - return False - - if artist is None: - # As-is import with no artist. Skip check. - return False - - # Try the recent albums. - if recent is not None: - if (artist, album) in recent: - return True - recent.add((artist, album)) - - # Look in the library. - cur_paths = set(i.path for i in task.items) - for album_cand in lib.albums(artist=artist): - if album_cand.album == album: - # Check whether the album is identical in contents, in which - # case it is not a duplicate (will be replaced). - other_paths = set(i.path for i in album_cand.items()) - if other_paths == cur_paths: - continue - return True - - return False - -def _item_duplicate_check(lib, task, recent=None): - """Check whether an item already exists in the library.""" - if task.choice_flag is action.ASIS: - artist = task.item.artist - title = task.item.title - elif task.choice_flag is action.APPLY: - artist = task.info.artist - title = task.info.title - else: - return False - - # Try recent items. - if recent is not None: - if (artist, title) in recent: - return True - recent.add((artist, title)) - - # Check the library. - item_iter = lib.items(artist=artist, title=title) - try: - for other_item in item_iter: - # Existing items not considered duplicates. - if other_item.path == task.item.path: - continue - return True - finally: - item_iter.close() - return False - -def _infer_album_fields(task): - """Given an album and an associated import task, massage the - album-level metadata. This ensures that the album artist is set - and that the "compilation" flag is set automatically. - """ - assert task.is_album - assert task.items - - changes = {} - - if task.choice_flag == action.ASIS: - # Taking metadata "as-is". Guess whether this album is VA. - plur_artist, freq = plurality([i.artist for i in task.items]) - if freq == len(task.items) or (freq > 1 and - float(freq) / len(task.items) >= SINGLE_ARTIST_THRESH): - # Single-artist album. - changes['albumartist'] = plur_artist - changes['comp'] = False - else: - # VA. - changes['albumartist'] = VARIOUS_ARTISTS - changes['comp'] = True - - elif task.choice_flag == action.APPLY: - # Applying autotagged metadata. Just get AA from the first - # item. - if not task.items[0].albumartist: - changes['albumartist'] = task.items[0].artist - if not task.items[0].mb_albumartistid: - changes['mb_albumartistid'] = task.items[0].mb_artistid - - else: - assert False - - # Apply new metadata. - for item in task.items: - for k, v in changes.iteritems(): - setattr(item, k, v) - -def _open_state(): - """Reads the state file, returning a dictionary.""" - try: - with open(STATE_FILE) as f: - return pickle.load(f) - except IOError: - return {} -def _save_state(state): - """Writes the state dictionary out to disk.""" - with open(STATE_FILE, 'w') as f: - pickle.dump(state, f) - - -# Utilities for reading and writing the beets progress file, which -# allows long tagging tasks to be resumed when they pause (or crash). -PROGRESS_KEY = 'tagprogress' -def progress_set(toppath, path): - """Record that tagging for the given `toppath` was successful up to - `path`. If path is None, then clear the progress value (indicating - that the tagging completed). - """ - state = _open_state() - if PROGRESS_KEY not in state: - state[PROGRESS_KEY] = {} - - if path is None: - # Remove progress from file. - if toppath in state[PROGRESS_KEY]: - del state[PROGRESS_KEY][toppath] - else: - state[PROGRESS_KEY][toppath] = path - - _save_state(state) -def progress_get(toppath): - """Get the last successfully tagged subpath of toppath. If toppath - has no progress information, returns None. - """ - state = _open_state() - if PROGRESS_KEY not in state: - return None - return state[PROGRESS_KEY].get(toppath) - - -# Similarly, utilities for manipulating the "incremental" import log. -# This keeps track of all directories that were ever imported, which -# allows the importer to only import new stuff. -HISTORY_KEY = 'taghistory' -def history_add(path): - """Indicate that the import of `path` is completed and should not - be repeated in incremental imports. - """ - state = _open_state() - if HISTORY_KEY not in state: - state[HISTORY_KEY] = set() - - state[HISTORY_KEY].add(path) - - _save_state(state) -def history_get(): - """Get the set of completed paths in incremental imports. - """ - state = _open_state() - if HISTORY_KEY not in state: - return set() - return state[HISTORY_KEY] - - -# The configuration structure. - -class ImportConfig(object): - """Contains all the settings used during an import session. Should - be used in a "write-once" way -- everything is set up initially and - then never touched again. - """ - _fields = ['lib', 'paths', 'resume', 'logfile', 'color', 'quiet', - 'quiet_fallback', 'copy', 'write', 'art', 'delete', - 'choose_match_func', 'should_resume_func', 'threaded', - 'autot', 'singletons', 'timid', 'choose_item_func', - 'query', 'incremental'] - def __init__(self, **kwargs): - for slot in self._fields: - setattr(self, slot, kwargs[slot]) - - # Normalize the paths. - if self.paths: - self.paths = map(normpath, self.paths) - - # Incremental and progress are mutually exclusive. - if self.incremental: - self.resume = False - - # When based on a query instead of directories, never - # save progress or try to resume. - if self.query is not None: - self.paths = None - self.resume = False - self.incremental = False - - -# The importer task class. - -class ImportTask(object): - """Represents a single set of items to be imported along with its - intermediate state. May represent an album or a single item. - """ - def __init__(self, toppath=None, path=None, items=None): - self.toppath = toppath - self.path = path - self.items = items - self.sentinel = False - - @classmethod - def done_sentinel(cls, toppath): - """Create an ImportTask that indicates the end of a top-level - directory import. - """ - obj = cls(toppath) - obj.sentinel = True - return obj - - @classmethod - def progress_sentinel(cls, toppath, path): - """Create a task indicating that a single directory in a larger - import has finished. This is only required for singleton - imports; progress is implied for album imports. - """ - obj = cls(toppath, path) - obj.sentinel = True - return obj - - @classmethod - def item_task(cls, item): - """Creates an ImportTask for a single item.""" - obj = cls() - obj.item = item - obj.is_album = False - return obj - - def set_match(self, cur_artist, cur_album, candidates, rec): - """Sets the candidates for this album matched by the - `autotag.tag_album` method. - """ - assert not self.sentinel - self.cur_artist = cur_artist - self.cur_album = cur_album - self.candidates = candidates - self.rec = rec - self.is_album = True - - def set_null_match(self): - """Set the candidates to indicate no album match was found. - """ - self.set_match(None, None, None, None) - - def set_item_match(self, candidates, rec): - """Set the match for a single-item task.""" - assert not self.is_album - assert self.item is not None - self.item_match = (candidates, rec) - - def set_null_item_match(self): - """For single-item tasks, mark the item as having no matches. - """ - assert not self.is_album - assert self.item is not None - self.item_match = None - - def set_choice(self, choice): - """Given either an (info, items) tuple or an action constant, - indicates that an action has been selected by the user (or - automatically). - """ - assert not self.sentinel - # Not part of the task structure: - assert choice not in (action.MANUAL, action.MANUAL_ID) - assert choice != action.APPLY # Only used internally. - if choice in (action.SKIP, action.ASIS, action.TRACKS): - self.choice_flag = choice - self.info = None - else: - assert not isinstance(choice, action) - if self.is_album: - info, items = choice - self.items = items # Reordered items list. - else: - info = choice - self.info = info - self.choice_flag = action.APPLY # Implicit choice. - - def save_progress(self): - """Updates the progress state to indicate that this album has - finished. - """ - if self.sentinel and self.path is None: - # "Done" sentinel. - progress_set(self.toppath, None) - elif self.sentinel or self.is_album: - # "Directory progress" sentinel for singletons or a real - # album task, which implies the same. - progress_set(self.toppath, self.path) - - def save_history(self): - """Save the directory in the history for incremental imports. - """ - if self.sentinel or self.is_album: - history_add(self.path) - - # Logical decisions. - def should_write_tags(self): - """Should new info be written to the files' metadata?""" - if self.choice_flag == action.APPLY: - return True - elif self.choice_flag in (action.ASIS, action.TRACKS, action.SKIP): - return False - else: - assert False - def should_fetch_art(self): - """Should album art be downloaded for this album?""" - return self.should_write_tags() and self.is_album - def should_skip(self): - """After a choice has been made, returns True if this is a - sentinel or it has been marked for skipping. - """ - return self.sentinel or self.choice_flag == action.SKIP - - -# Full-album pipeline stages. - -def read_tasks(config): - """A generator yielding all the albums (as ImportTask objects) found - in the user-specified list of paths. In the case of a singleton - import, yields single-item tasks instead. - """ - # Look for saved progress. - progress = config.resume is not False - if progress: - resume_dirs = {} - for path in config.paths: - resume_dir = progress_get(path) - if resume_dir: - - # Either accept immediately or prompt for input to decide. - if config.resume: - do_resume = True - log.warn('Resuming interrupted import of %s' % path) - else: - do_resume = config.should_resume_func(config, path) - - if do_resume: - resume_dirs[path] = resume_dir - else: - # Clear progress; we're starting from the top. - progress_set(path, None) - - # Look for saved incremental directories. - if config.incremental: - history_dirs = history_get() - - for toppath in config.paths: - # Check whether the path is to a file. - if config.singletons and not os.path.isdir(syspath(toppath)): - item = library.Item.from_path(toppath) - yield ImportTask.item_task(item) - continue - - # Produce paths under this directory. - if progress: - resume_dir = resume_dirs.get(toppath) - for path, items in autotag.albums_in_dir(toppath): - # Skip according to progress. - if progress and resume_dir: - # We're fast-forwarding to resume a previous tagging. - if path == resume_dir: - # We've hit the last good path! Turn off the - # fast-forwarding. - resume_dir = None - continue - - # When incremental, skip paths in the history. - if config.incremental and path in history_dirs: - continue - - # Yield all the necessary tasks. - if config.singletons: - for item in items: - yield ImportTask.item_task(item) - yield ImportTask.progress_sentinel(toppath, path) - else: - yield ImportTask(toppath, path, items) - - # Indicate the directory is finished. - yield ImportTask.done_sentinel(toppath) - -def query_tasks(config): - """A generator that works as a drop-in-replacement for read_tasks. - Instead of finding files from the filesystem, a query is used to - match items from the library. - """ - lib = _reopen_lib(config.lib) - - if config.singletons: - # Search for items. - items = list(lib.items(config.query)) - for item in items: - yield ImportTask.item_task(item) - - else: - # Search for albums. - albums = lib.albums(config.query) - for album in albums: - log.debug('yielding album %i: %s - %s' % - (album.id, album.albumartist, album.album)) - items = list(album.items()) - yield ImportTask(None, album.item_dir(), items) - -def initial_lookup(config): - """A coroutine for performing the initial MusicBrainz lookup for an - album. It accepts lists of Items and yields - (items, cur_artist, cur_album, candidates, rec) tuples. If no match - is found, all of the yielded parameters (except items) are None. - """ - task = None - while True: - task = yield task - if task.sentinel: - continue - - log.debug('Looking up: %s' % task.path) - try: - task.set_match(*autotag.tag_album(task.items, config.timid)) - except autotag.AutotagError: - task.set_null_match() - -def user_query(config): - """A coroutine for interfacing with the user about the tagging - process. lib is the Library to import into and logfile may be - a file-like object for logging the import process. The coroutine - accepts and yields ImportTask objects. - """ - lib = _reopen_lib(config.lib) - recent = set() - task = None - while True: - task = yield task - if task.sentinel: - continue - - # Ask the user for a choice. - choice = config.choose_match_func(task, config) - task.set_choice(choice) - log_choice(config, task) - - # As-tracks: transition to singleton workflow. - if choice is action.TRACKS: - # Set up a little pipeline for dealing with the singletons. - item_tasks = [] - def emitter(): - for item in task.items: - yield ImportTask.item_task(item) - yield ImportTask.progress_sentinel(task.toppath, task.path) - def collector(): - while True: - item_task = yield - item_tasks.append(item_task) - ipl = pipeline.Pipeline((emitter(), item_lookup(config), - item_query(config), collector())) - ipl.run_sequential() - task = pipeline.multiple(item_tasks) - continue - - # Check for duplicates if we have a match (or ASIS). - if _duplicate_check(lib, task, recent): - tag_log(config.logfile, 'duplicate', task.path) - log.warn("This album is already in the library!") - task.set_choice(action.SKIP) - -def show_progress(config): - """This stage replaces the initial_lookup and user_query stages - when the importer is run without autotagging. It displays the album - name and artist as the files are added. - """ - task = None - while True: - task = yield task - if task.sentinel: - continue - - log.info(task.path) - - # Behave as if ASIS were selected. - task.set_null_match() - task.set_choice(action.ASIS) - -def apply_choices(config): - """A coroutine for applying changes to albums during the autotag - process. - """ - lib = _reopen_lib(config.lib) - task = None - while True: - task = yield task - if task.should_skip(): - continue - - items = task.items if task.is_album else [task.item] - # Clear IDs in case the items are being re-tagged. - for item in items: - item.id = None - item.album_id = None - - # Change metadata. - if task.should_write_tags(): - if task.is_album: - autotag.apply_metadata(task.items, task.info) - else: - autotag.apply_item_metadata(task.item, task.info) - - # Infer album-level fields. - if task.is_album: - _infer_album_fields(task) - - # Find existing item entries that these are replacing. Old - # album structures are automatically cleaned up when the - # last item is removed. - replaced_items = defaultdict(list) - for item in items: - dup_items = list(lib.items( - library.MatchQuery('path', item.path) - )) - for dup_item in dup_items: - replaced_items[item].append(dup_item) - log.debug('replacing item %i: %s' % (dup_item.id, item.path)) - log.debug('%i of %i items replaced' % (len(replaced_items), - len(items))) - - # Move/copy files. - task.old_paths = [item.path for item in items] - for item in items: - if config.copy: - # If we're replacing an item, then move rather than - # copying. - do_copy = not bool(replaced_items[item]) - lib.move(item, do_copy, task.is_album) - if config.write and task.should_write_tags(): - item.write() - - # Add items to library. We consolidate this at the end to avoid - # locking while we do the copying and tag updates. - try: - # Remove old items. - for replaced in replaced_items.itervalues(): - for item in replaced: - lib.remove(item) - - # Add new ones. - if task.is_album: - # Add an album. - album = lib.add_album(task.items) - task.album_id = album.id - else: - # Add tracks. - for item in items: - lib.add(item) - finally: - lib.save() - -def fetch_art(config): - """A coroutine that fetches and applies album art for albums where - appropriate. - """ - lib = _reopen_lib(config.lib) - task = None - while True: - task = yield task - if task.should_skip(): - continue - - if task.should_fetch_art(): - artpath = beets.autotag.art.art_for_album(task.info, task.path) - - # Save the art if any was found. - if artpath: - try: - album = lib.get_album(task.album_id) - album.set_art(artpath) - finally: - lib.save(False) - -def finalize(config): - """A coroutine that finishes up importer tasks. In particular, the - coroutine sends plugin events, deletes old files, and saves - progress. This is a "terminal" coroutine (it yields None). - """ - lib = _reopen_lib(config.lib) - while True: - task = yield - if task.should_skip(): - if config.resume is not False: - task.save_progress() - if config.incremental: - task.save_history() - continue - - items = task.items if task.is_album else [task.item] - - # Announce that we've added an album. - if task.is_album: - album = lib.get_album(task.album_id) - plugins.send('album_imported', lib=lib, album=album) - else: - for item in items: - plugins.send('item_imported', lib=lib, item=item) - - # Finally, delete old files. - if config.copy and config.delete: - new_paths = [os.path.realpath(item.path) for item in items] - for old_path in task.old_paths: - # Only delete files that were actually moved. - if old_path not in new_paths: - os.remove(syspath(old_path)) - - # Update progress. - if config.resume is not False: - task.save_progress() - if config.incremental: - task.save_history() - - -# Singleton pipeline stages. - -def item_lookup(config): - """A coroutine used to perform the initial MusicBrainz lookup for - an item task. - """ - task = None - while True: - task = yield task - if task.sentinel: - continue - - task.set_item_match(*autotag.tag_item(task.item, config.timid)) - -def item_query(config): - """A coroutine that queries the user for input on single-item - lookups. - """ - lib = _reopen_lib(config.lib) - task = None - recent = set() - while True: - task = yield task - if task.sentinel: - continue - - choice = config.choose_item_func(task, config) - task.set_choice(choice) - log_choice(config, task) - - # Duplicate check. - if _item_duplicate_check(lib, task, recent): - tag_log(config.logfile, 'duplicate', task.item.path) - log.warn("This item is already in the library!") - task.set_choice(action.SKIP) - -def item_progress(config): - """Skips the lookup and query stages in a non-autotagged singleton - import. Just shows progress. - """ - task = None - log.info('Importing items:') - while True: - task = yield task - if task.sentinel: - continue - - log.info(task.item.path) - task.set_null_item_match() - task.set_choice(action.ASIS) - - -# Main driver. - -def run_import(**kwargs): - """Run an import. The keyword arguments are the same as those to - ImportConfig. - """ - config = ImportConfig(**kwargs) - - # Set up the pipeline. - if config.query is None: - stages = [read_tasks(config)] - else: - stages = [query_tasks(config)] - if config.singletons: - # Singleton importer. - if config.autot: - stages += [item_lookup(config), item_query(config)] - else: - stages += [item_progress(config)] - else: - # Whole-album importer. - if config.autot: - # Only look up and query the user when autotagging. - stages += [initial_lookup(config), user_query(config)] - else: - # When not autotagging, just display progress. - stages += [show_progress(config)] - stages += [apply_choices(config)] - if config.art: - stages += [fetch_art(config)] - stages += [finalize(config)] - pl = pipeline.Pipeline(stages) - - # Run the pipeline. - try: - if config.threaded: - pl.run_parallel(QUEUE_SIZE) - else: - pl.run_sequential() - except ImportAbort: - # User aborted operation. Silently stop. - pass diff --git a/lib/beets/library.py~ b/lib/beets/library.py~ deleted file mode 100644 index 54609956..00000000 --- a/lib/beets/library.py~ +++ /dev/null @@ -1,1260 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -import sqlite3 -import os -import re -import sys -from string import Template -import logging -from beets.mediafile import MediaFile -from beets import plugins -from beets import util -from beets.util import bytestring_path, syspath, normpath, samefile - -MAX_FILENAME_LENGTH = 200 - -# Fields in the "items" database table; all the metadata available for -# items in the library. These are used directly in SQL; they are -# vulnerable to injection if accessible to the user. -# Each tuple has the following values: -# - The name of the field. -# - The (SQLite) type of the field. -# - Is the field writable? -# - Does the field reflect an attribute of a MediaFile? -ITEM_FIELDS = [ - ('id', 'integer primary key', False, False), - ('path', 'blob', False, False), - ('album_id', 'int', False, False), - - ('title', 'text', True, True), - ('artist', 'text', True, True), - ('album', 'text', True, True), - ('albumartist', 'text', True, True), - ('genre', 'text', True, True), - ('composer', 'text', True, True), - ('grouping', 'text', True, True), - ('year', 'int', True, True), - ('month', 'int', True, True), - ('day', 'int', True, True), - ('track', 'int', True, True), - ('tracktotal', 'int', True, True), - ('disc', 'int', True, True), - ('disctotal', 'int', True, True), - ('lyrics', 'text', True, True), - ('comments', 'text', True, True), - ('bpm', 'int', True, True), - ('comp', 'bool', True, True), - ('mb_trackid', 'text', True, True), - ('mb_albumid', 'text', True, True), - ('mb_artistid', 'text', True, True), - ('mb_albumartistid', 'text', True, True), - ('albumtype', 'text', True, True), - ('label', 'text', True, True), - - ('length', 'real', False, True), - ('bitrate', 'int', False, True), - ('format', 'text', False, True), -] -ITEM_KEYS_WRITABLE = [f[0] for f in ITEM_FIELDS if f[3] and f[2]] -ITEM_KEYS_META = [f[0] for f in ITEM_FIELDS if f[3]] -ITEM_KEYS = [f[0] for f in ITEM_FIELDS] - -# Database fields for the "albums" table. -# The third entry in each tuple indicates whether the field reflects an -# identically-named field in the items table. -ALBUM_FIELDS = [ - ('id', 'integer primary key', False), - ('artpath', 'blob', False), - - ('albumartist', 'text', True), - ('album', 'text', True), - ('genre', 'text', True), - ('year', 'int', True), - ('month', 'int', True), - ('day', 'int', True), - ('tracktotal', 'int', True), - ('disctotal', 'int', True), - ('comp', 'bool', True), - ('mb_albumid', 'text', True), - ('mb_albumartistid', 'text', True), - ('albumtype', 'text', True), - ('label', 'text', True), -] -ALBUM_KEYS = [f[0] for f in ALBUM_FIELDS] -ALBUM_KEYS_ITEM = [f[0] for f in ALBUM_FIELDS if f[2]] - -# Default search fields for various granularities. -ARTIST_DEFAULT_FIELDS = ('artist',) -ALBUM_DEFAULT_FIELDS = ('album', 'albumartist', 'genre') -ITEM_DEFAULT_FIELDS = ARTIST_DEFAULT_FIELDS + ALBUM_DEFAULT_FIELDS + \ - ('title', 'comments') - -# Logger. -log = logging.getLogger('beets') -if not log.handlers: - log.addHandler(logging.StreamHandler()) - - -# Exceptions. - -class InvalidFieldError(Exception): - pass - - -# Library items (songs). - -class Item(object): - def __init__(self, values): - self.dirty = {} - self._fill_record(values) - self._clear_dirty() - - @classmethod - def from_path(cls, path): - """Creates a new item from the media file at the specified path. - """ - # Initiate with values that aren't read from files. - i = cls({ - 'album_id': None, - }) - i.read(path) - return i - - def _fill_record(self, values): - self.record = {} - for key in ITEM_KEYS: - try: - setattr(self, key, values[key]) - except KeyError: - setattr(self, key, None) - - def _clear_dirty(self): - self.dirty = {} - for key in ITEM_KEYS: - self.dirty[key] = False - - def __repr__(self): - return 'Item(' + repr(self.record) + ')' - - - # Item field accessors. - - def __getattr__(self, key): - """If key is an item attribute (i.e., a column in the database), - returns the record entry for that key. - """ - if key in ITEM_KEYS: - return self.record[key] - else: - raise AttributeError(key + ' is not a valid item field') - - def __setattr__(self, key, value): - """If key is an item attribute (i.e., a column in the database), - sets the record entry for that key to value. Note that to change - the attribute in the database or in the file's tags, one must - call store() or write(). - - Otherwise, performs an ordinary setattr. - """ - # Encode unicode paths and read buffers. - if key == 'path': - if isinstance(value, unicode): - value = bytestring_path(value) - elif isinstance(value, buffer): - value = str(value) - - if key in ITEM_KEYS: - if (not (key in self.record)) or (self.record[key] != value): - # don't dirty if value unchanged - self.record[key] = value - self.dirty[key] = True - else: - super(Item, self).__setattr__(key, value) - - - # Interaction with file metadata. - - def read(self, read_path=None): - """Read the metadata from the associated file. If read_path is - specified, read metadata from that file instead. - """ - if read_path is None: - read_path = self.path - else: - read_path = normpath(read_path) - f = MediaFile(syspath(read_path)) - - for key in ITEM_KEYS_META: - setattr(self, key, getattr(f, key)) - self.path = read_path - - def write(self): - """Writes the item's metadata to the associated file. - """ - f = MediaFile(syspath(self.path)) - for key in ITEM_KEYS_WRITABLE: - setattr(f, key, getattr(self, key)) - f.save() - - - # Files themselves. - - def move(self, dest, copy=False): - """Moves or copies the item's file, updating the path value if - the move succeeds. - """ - if copy: - util.copy(self.path, dest) - else: - util.move(self.path, dest) - - # Either copying or moving succeeded, so update the stored path. - self.path = dest - - -# Library queries. - -class Query(object): - """An abstract class representing a query into the item database. - """ - def clause(self): - """Returns (clause, subvals) where clause is a valid sqlite - WHERE clause implementing the query and subvals is a list of - items to be substituted for ?s in the clause. - """ - raise NotImplementedError - - def match(self, item): - """Check whether this query matches a given Item. Can be used to - perform queries on arbitrary sets of Items. - """ - raise NotImplementedError - - def statement(self, columns='*'): - """Returns (query, subvals) where clause is a sqlite SELECT - statement to enact this query and subvals is a list of values - to substitute in for ?s in the query. - """ - clause, subvals = self.clause() - return ('SELECT ' + columns + ' FROM items WHERE ' + clause, subvals) - - def count(self, library): - """Returns `(num, length)` where `num` is the number of items in - the library matching this query and `length` is their total - length in seconds. - """ - clause, subvals = self.clause() - statement = 'SELECT COUNT(id), SUM(length) FROM items WHERE ' + clause - c = library.conn.execute(statement, subvals) - result = c.fetchone() - c.close() - return (result[0], result[1] or 0.0) - - def execute(self, library): - """Runs the query in the specified library, returning a - ResultIterator. - """ - c = library.conn.cursor() - stmt, subs = self.statement() - log.debug('Executing query: %s' % stmt) - c.execute(stmt, subs) - return ResultIterator(c, library) - -class FieldQuery(Query): - """An abstract query that searches in a specific field for a - pattern. - """ - def __init__(self, field, pattern): - if field not in ITEM_KEYS: - raise InvalidFieldError(field + ' is not an item key') - self.field = field - self.pattern = pattern - -class MatchQuery(FieldQuery): - """A query that looks for exact matches in an item field.""" - def clause(self): - pattern = self.pattern - if self.field == 'path' and isinstance(pattern, str): - pattern = buffer(pattern) - return self.field + " = ?", [pattern] - - def match(self, item): - return self.pattern == getattr(item, self.field) - -class SubstringQuery(FieldQuery): - """A query that matches a substring in a specific item field.""" - def clause(self): - search = '%' + (self.pattern.replace('\\','\\\\').replace('%','\\%') - .replace('_','\\_')) + '%' - clause = self.field + " like ? escape '\\'" - subvals = [search] - return clause, subvals - - def match(self, item): - return self.pattern.lower() in getattr(item, self.field).lower() - -class BooleanQuery(MatchQuery): - """Matches a boolean field. Pattern should either be a boolean or a - string reflecting a boolean. - """ - def __init__(self, field, pattern): - super(BooleanQuery, self).__init__(field, pattern) - if isinstance(pattern, basestring): - self.pattern = util.str2bool(pattern) - self.pattern = int(self.pattern) - -class SingletonQuery(Query): - """Matches either singleton or non-singleton items.""" - def __init__(self, sense): - self.sense = sense - - def clause(self): - if self.sense: - return "album_id ISNULL", () - else: - return "NOT album_id ISNULL", () - - def match(self, item): - return (not item.album_id) == self.sense - -class CollectionQuery(Query): - """An abstract query class that aggregates other queries. Can be - indexed like a list to access the sub-queries. - """ - def __init__(self, subqueries = ()): - self.subqueries = subqueries - - # is there a better way to do this? - def __len__(self): return len(self.subqueries) - def __getitem__(self, key): return self.subqueries[key] - def __iter__(self): iter(self.subqueries) - def __contains__(self, item): item in self.subqueries - - def clause_with_joiner(self, joiner): - """Returns a clause created by joining together the clauses of - all subqueries with the string joiner (padded by spaces). - """ - clause_parts = [] - subvals = [] - for subq in self.subqueries: - subq_clause, subq_subvals = subq.clause() - clause_parts.append('(' + subq_clause + ')') - subvals += subq_subvals - clause = (' ' + joiner + ' ').join(clause_parts) - return clause, subvals - - # regular expression for _parse_query_part, below - _pq_regex = re.compile(# non-grouping optional segment for the keyword - r'(?:' - r'(\S+?)' # the keyword - r'(?>> f = MediaFile('Lucy.mp3') - >>> f.title - u'Lucy in the Sky with Diamonds' - >>> f.artist = 'The Beatles' - >>> f.save() - -A field will always return a reasonable value of the correct type, even -if no tag is present. If no value is available, the value will be false -(e.g., zero or the empty string). -""" -import mutagen -import mutagen.mp3 -import mutagen.oggvorbis -import mutagen.mp4 -import mutagen.flac -import mutagen.monkeysaudio -import datetime -import re -import base64 -import imghdr -import os -import logging -import traceback -from beets.util.enumeration import enum - -__all__ = ['UnreadableFileError', 'FileTypeError', 'MediaFile'] - - -# Logger. -log = logging.getLogger('beets') - - -# Exceptions. - -# Raised for any file MediaFile can't read. -class UnreadableFileError(IOError): - pass - -# Raised for files that don't seem to have a type MediaFile supports. -class FileTypeError(UnreadableFileError): - pass - - -# Constants. - -# Human-readable type names. -TYPES = { - 'mp3': 'MP3', - 'mp4': 'AAC', - 'ogg': 'OGG', - 'flac': 'FLAC', - 'ape': 'APE', - 'wv': 'WavPack', - 'mpc': 'Musepack', -} - - -# Utility. - -def _safe_cast(out_type, val): - """Tries to covert val to out_type but will never raise an - exception. If the value can't be converted, then a sensible - default value is returned. out_type should be bool, int, or - unicode; otherwise, the value is just passed through. - """ - if out_type == int: - if val is None: - return 0 - elif isinstance(val, int) or isinstance(val, float): - # Just a number. - return int(val) - else: - # Process any other type as a string. - if not isinstance(val, basestring): - val = unicode(val) - # Get a number from the front of the string. - val = re.match('[0-9]*', val.strip()).group(0) - if not val: - return 0 - else: - return int(val) - - elif out_type == bool: - if val is None: - return False - else: - try: - # Should work for strings, bools, ints: - return bool(int(val)) - except ValueError: - return False - - elif out_type == unicode: - if val is None: - return u'' - else: - return unicode(val) - - else: - return val - - -# Flags for encoding field behavior. - -# Determine style of packing, if any. -packing = enum('SLASHED', # pair delimited by / - 'TUPLE', # a python tuple of 2 items - 'DATE', # YYYY-MM-DD - name='packing') - -class StorageStyle(object): - """Parameterizes the storage behavior of a single field for a - certain tag format. - - key: The Mutagen key used to access the field's data. - - list_elem: Store item as a single object or as first element - of a list. - - as_type: Which type the value is stored as (unicode, int, - bool, or str). - - packing: If this value is packed in a multiple-value storage - unit, which type of packing (in the packing enum). Otherwise, - None. (Makes as_type irrelevant). - - pack_pos: If the value is packed, in which position it is - stored. - - ID3 storage only: match against this 'desc' field as well - as the key. - """ - def __init__(self, key, list_elem = True, as_type = unicode, - packing = None, pack_pos = 0, id3_desc = None, - id3_frame_field = 'text'): - self.key = key - self.list_elem = list_elem - self.as_type = as_type - self.packing = packing - self.pack_pos = pack_pos - self.id3_desc = id3_desc - self.id3_frame_field = id3_frame_field - - -# Dealing with packings. - -class Packed(object): - """Makes a packed list of values subscriptable. To access the packed - output after making changes, use packed_thing.items. - """ - - def __init__(self, items, packstyle, none_val=0, out_type=int): - """Create a Packed object for subscripting the packed values in - items. The items are packed using packstyle, which is a value - from the packing enum. none_val is returned from a request when - no suitable value is found in the items. Vales are converted to - out_type before they are returned. - """ - self.items = items - self.packstyle = packstyle - self.none_val = none_val - self.out_type = out_type - - def __getitem__(self, index): - if not isinstance(index, int): - raise TypeError('index must be an integer') - - if self.items is None: - return self.none_val - - items = self.items - if self.packstyle == packing.DATE: - # Remove time information from dates. Usually delimited by - # a "T" or a space. - items = re.sub(r'[Tt ].*$', '', unicode(items)) - - # transform from a string packing into a list we can index into - if self.packstyle == packing.SLASHED: - seq = unicode(items).split('/') - elif self.packstyle == packing.DATE: - seq = unicode(items).split('-') - elif self.packstyle == packing.TUPLE: - seq = items # tuple: items is already indexable - - try: - out = seq[index] - except: - out = None - - if out is None or out == self.none_val or out == '': - return _safe_cast(self.out_type, self.none_val) - else: - return _safe_cast(self.out_type, out) - - def __setitem__(self, index, value): - if self.packstyle in (packing.SLASHED, packing.TUPLE): - # SLASHED and TUPLE are always two-item packings - length = 2 - else: - # DATE can have up to three fields - length = 3 - - # make a list of the items we'll pack - new_items = [] - for i in range(length): - if i == index: - next_item = value - else: - next_item = self[i] - new_items.append(next_item) - - if self.packstyle == packing.DATE: - # Truncate the items wherever we reach an invalid (none) - # entry. This prevents dates like 2008-00-05. - for i, item in enumerate(new_items): - if item == self.none_val or item is None: - del(new_items[i:]) # truncate - break - - if self.packstyle == packing.SLASHED: - self.items = '/'.join(map(unicode, new_items)) - elif self.packstyle == packing.DATE: - field_lengths = [4, 2, 2] # YYYY-MM-DD - elems = [] - for i, item in enumerate(new_items): - elems.append( ('%0' + str(field_lengths[i]) + 'i') % item ) - self.items = '-'.join(elems) - elif self.packstyle == packing.TUPLE: - self.items = new_items - - -# The field itself. - -class MediaField(object): - """A descriptor providing access to a particular (abstract) metadata - field. out_type is the type that users of MediaFile should see and - can be unicode, int, or bool. id3, mp4, and flac are StorageStyle - instances parameterizing the field's storage for each type. - """ - - def __init__(self, out_type = unicode, **kwargs): - """Creates a new MediaField. - - out_type: The field's semantic (exterior) type. - - kwargs: A hash whose keys are 'mp3', 'mp4', and 'etc' - and whose values are StorageStyle instances - parameterizing the field's storage for each type. - """ - self.out_type = out_type - if not set(['mp3', 'mp4', 'etc']) == set(kwargs): - raise TypeError('MediaField constructor must have keyword ' - 'arguments mp3, mp4, and etc') - self.styles = kwargs - - def _fetchdata(self, obj, style): - """Get the value associated with this descriptor's field stored - with the given StorageStyle. Unwraps from a list if necessary. - """ - # fetch the value, which may be a scalar or a list - if obj.type == 'mp3': - if style.id3_desc is not None: # also match on 'desc' field - frames = obj.mgfile.tags.getall(style.key) - entry = None - for frame in frames: - if frame.desc == style.id3_desc: - entry = getattr(frame, style.id3_frame_field) - break - if entry is None: # no desc match - return None - else: - # Get the metadata frame object. - try: - frame = obj.mgfile[style.key] - except KeyError: - return None - - entry = getattr(frame, style.id3_frame_field) - - else: # Not MP3. - try: - entry = obj.mgfile[style.key] - except KeyError: - return None - - # possibly index the list - if style.list_elem: - if entry: # List must have at least one value. - return entry[0] - else: - return None - else: - return entry - - def _storedata(self, obj, val, style): - """Store val for this descriptor's field in the tag dictionary - according to the provided StorageStyle. Store it as a - single-item list if necessary. - """ - # wrap as a list if necessary - if style.list_elem: out = [val] - else: out = val - - if obj.type == 'mp3': - # Try to match on "desc" field. - if style.id3_desc is not None: - frames = obj.mgfile.tags.getall(style.key) - - # try modifying in place - found = False - for frame in frames: - if frame.desc == style.id3_desc: - setattr(frame, style.id3_frame_field, out) - found = True - break - - # need to make a new frame? - if not found: - assert isinstance(style.id3_frame_field, str) # Keyword. - frame = mutagen.id3.Frames[style.key]( - encoding=3, - desc=style.id3_desc, - **{style.id3_frame_field: val} - ) - obj.mgfile.tags.add(frame) - - # Try to match on "owner" field. - elif style.key.startswith('UFID:'): - owner = style.key.split(':', 1)[1] - frames = obj.mgfile.tags.getall(style.key) - - for frame in frames: - # Replace existing frame data. - if frame.owner == owner: - setattr(frame, style.id3_frame_field, val) - else: - # New frame. - assert isinstance(style.id3_frame_field, str) # Keyword. - frame = mutagen.id3.UFID(owner=owner, - **{style.id3_frame_field: val}) - obj.mgfile.tags.setall('UFID', [frame]) - - # Just replace based on key. - else: - assert isinstance(style.id3_frame_field, str) # Keyword. - frame = mutagen.id3.Frames[style.key](encoding = 3, - **{style.id3_frame_field: val}) - obj.mgfile.tags.setall(style.key, [frame]) - - else: # Not MP3. - obj.mgfile[style.key] = out - - def _styles(self, obj): - if obj.type in ('mp3', 'mp4'): - styles = self.styles[obj.type] - else: - styles = self.styles['etc'] # sane styles - - # Make sure we always return a list of styles, even when given - # a single style for convenience. - if isinstance(styles, StorageStyle): - return [styles] - else: - return styles - - def __get__(self, obj, owner): - """Retrieve the value of this metadata field. - """ - # Fetch the data using the various StorageStyles. - styles = self._styles(obj) - for style in styles: - # Use the first style that returns a reasonable value. - out = self._fetchdata(obj, style) - if out: - break - - if style.packing: - out = Packed(out, style.packing)[style.pack_pos] - - # MPEG-4 freeform frames are (should be?) encoded as UTF-8. - if obj.type == 'mp4' and style.key.startswith('----:') and \ - isinstance(out, str): - out = out.decode('utf8') - - return _safe_cast(self.out_type, out) - - def __set__(self, obj, val): - """Set the value of this metadata field. - """ - # Store using every StorageStyle available. - styles = self._styles(obj) - for style in styles: - - if style.packing: - p = Packed(self._fetchdata(obj, style), style.packing) - p[style.pack_pos] = val - out = p.items - - else: # unicode, integer, or boolean scalar - out = val - - # deal with Nones according to abstract type if present - if out is None: - if self.out_type == int: - out = 0 - elif self.out_type == bool: - out = False - elif self.out_type == unicode: - out = u'' - # We trust that packed values are handled above. - - # Convert to correct storage type (irrelevant for - # packed values). - if style.as_type == unicode: - if out is None: - out = u'' - else: - if self.out_type == bool: - # store bools as 1,0 instead of True,False - out = unicode(int(out)) - else: - out = unicode(out) - elif style.as_type == int: - if out is None: - out = 0 - else: - out = int(out) - elif style.as_type in (bool, str): - out = style.as_type(out) - - # MPEG-4 "freeform" (----) frames must be encoded as UTF-8 - # byte strings. - if obj.type == 'mp4' and style.key.startswith('----:') and \ - isinstance(out, unicode): - out = out.encode('utf8') - - # Store the data. - self._storedata(obj, out, style) - -class CompositeDateField(object): - """A MediaFile field for conveniently accessing the year, month, and - day fields as a datetime.date object. Allows both getting and - setting of the component fields. - """ - def __init__(self, year_field, month_field, day_field): - """Create a new date field from the indicated MediaFields for - the component values. - """ - self.year_field = year_field - self.month_field = month_field - self.day_field = day_field - - def __get__(self, obj, owner): - """Return a datetime.date object whose components indicating the - smallest valid date whose components are at least as large as - the three component fields (that is, if year == 1999, month == 0, - and day == 0, then date == datetime.date(1999, 1, 1)). If the - components indicate an invalid date (e.g., if month == 47), - datetime.date.min is returned. - """ - try: - return datetime.date( - max(self.year_field.__get__(obj, owner), datetime.MINYEAR), - max(self.month_field.__get__(obj, owner), 1), - max(self.day_field.__get__(obj, owner), 1) - ) - except ValueError: # Out of range values. - return datetime.date.min - - def __set__(self, obj, val): - """Set the year, month, and day fields to match the components of - the provided datetime.date object. - """ - self.year_field.__set__(obj, val.year) - self.month_field.__set__(obj, val.month) - self.day_field.__set__(obj, val.day) - -class ImageField(object): - """A descriptor providing access to a file's embedded album art. - Holds a bytestring reflecting the image data. The image should - either be a JPEG or a PNG for cross-format compatibility. It's - probably a bad idea to use anything but these two formats. - """ - @classmethod - def _mime(cls, data): - """Return the MIME type (either image/png or image/jpeg) of the - image data (a bytestring). - """ - kind = imghdr.what(None, h=data) - if kind == 'png': - return 'image/png' - else: - # Currently just fall back to JPEG. - return 'image/jpeg' - - @classmethod - def _mp4kind(cls, data): - """Return the MPEG-4 image type code of the data. If the image - is not a PNG or JPEG, JPEG is assumed. - """ - kind = imghdr.what(None, h=data) - if kind == 'png': - return mutagen.mp4.MP4Cover.FORMAT_PNG - else: - return mutagen.mp4.MP4Cover.FORMAT_JPEG - - def __get__(self, obj, owner): - if obj.type == 'mp3': - # Look for APIC frames. - for frame in obj.mgfile.tags.values(): - if frame.FrameID == 'APIC': - picframe = frame - break - else: - # No APIC frame. - return None - - return picframe.data - - elif obj.type == 'mp4': - if 'covr' in obj.mgfile: - covers = obj.mgfile['covr'] - if covers: - cover = covers[0] - # cover is an MP4Cover, which is a subclass of str. - return cover - - # No cover found. - return None - - else: - # Here we're assuming everything but MP3 and MPEG-4 uses - # the Xiph/Vorbis Comments standard. This may not be valid. - # http://wiki.xiph.org/VorbisComment#Cover_art - - if 'metadata_block_picture' not in obj.mgfile: - # Try legacy COVERART tags. - if 'coverart' in obj.mgfile and obj.mgfile['coverart']: - return base64.b64decode(obj.mgfile['coverart'][0]) - return None - - for data in obj.mgfile["metadata_block_picture"]: - try: - pic = mutagen.flac.Picture(base64.b64decode(data)) - break - except TypeError: - pass - else: - return None - - return pic.data - - def __set__(self, obj, val): - if val is not None: - if not isinstance(val, str): - raise ValueError('value must be a byte string or None') - - if obj.type == 'mp3': - # Clear all APIC frames. - obj.mgfile.tags.delall('APIC') - if val is None: - # If we're clearing the image, we're done. - return - - picframe = mutagen.id3.APIC( - encoding = 3, - mime = self._mime(val), - type = 3, # front cover - desc = u'', - data = val, - ) - obj.mgfile['APIC'] = picframe - - elif obj.type == 'mp4': - if val is None: - if 'covr' in obj.mgfile: - del obj.mgfile['covr'] - else: - cover = mutagen.mp4.MP4Cover(val, self._mp4kind(val)) - obj.mgfile['covr'] = [cover] - - else: - # Again, assuming Vorbis Comments standard. - - # Strip all art, including legacy COVERART. - if 'metadata_block_picture' in obj.mgfile: - if 'metadata_block_picture' in obj.mgfile: - del obj.mgfile['metadata_block_picture'] - if 'coverart' in obj.mgfile: - del obj.mgfile['coverart'] - if 'coverartmime' in obj.mgfile: - del obj.mgfile['coverartmime'] - - # Add new art if provided. - if val is not None: - pic = mutagen.flac.Picture() - pic.data = val - pic.mime = self._mime(val) - obj.mgfile['metadata_block_picture'] = [ - base64.b64encode(pic.write()) - ] - - -# The file (a collection of fields). - -class MediaFile(object): - """Represents a multimedia file on disk and provides access to its - metadata. - """ - - def __init__(self, path): - """Constructs a new MediaFile reflecting the file at path. May - throw UnreadableFileError. - """ - self.path = path - - unreadable_exc = ( - mutagen.mp3.HeaderNotFoundError, - mutagen.flac.FLACNoHeaderError, - mutagen.monkeysaudio.MonkeysAudioHeaderError, - mutagen.mp4.MP4StreamInfoError, - mutagen.oggvorbis.OggVorbisHeaderError, - ) - try: - self.mgfile = mutagen.File(path) - except unreadable_exc: - log.warn('header parsing failed') - raise UnreadableFileError('Mutagen could not read file') - except IOError: - raise UnreadableFileError('could not read file') - except: - # Hide bugs in Mutagen. - log.error('uncaught Mutagen exception:\n' + traceback.format_exc()) - raise UnreadableFileError('Mutagen raised an exception') - - if self.mgfile is None: # Mutagen couldn't guess the type - raise FileTypeError('file type unsupported by Mutagen') - elif type(self.mgfile).__name__ == 'M4A' or \ - type(self.mgfile).__name__ == 'MP4': - self.type = 'mp4' - elif type(self.mgfile).__name__ == 'ID3' or \ - type(self.mgfile).__name__ == 'MP3': - self.type = 'mp3' - elif type(self.mgfile).__name__ == 'FLAC': - self.type = 'flac' - elif type(self.mgfile).__name__ == 'OggVorbis': - self.type = 'ogg' - elif type(self.mgfile).__name__ == 'MonkeysAudio': - self.type = 'ape' - elif type(self.mgfile).__name__ == 'WavPack': - self.type = 'wv' - elif type(self.mgfile).__name__ == 'Musepack': - self.type = 'mpc' - else: - raise FileTypeError('file type %s unsupported by MediaFile' % - type(self.mgfile).__name__) - - # add a set of tags if it's missing - if self.mgfile.tags is None: - self.mgfile.add_tags() - - def save(self): - self.mgfile.save() - - - #### field definitions #### - - title = MediaField( - mp3 = StorageStyle('TIT2'), - mp4 = StorageStyle("\xa9nam"), - etc = StorageStyle('title'), - ) - artist = MediaField( - mp3 = StorageStyle('TPE1'), - mp4 = StorageStyle("\xa9ART"), - etc = StorageStyle('artist'), - ) - album = MediaField( - mp3 = StorageStyle('TALB'), - mp4 = StorageStyle("\xa9alb"), - etc = StorageStyle('album'), - ) - genre = MediaField( - mp3 = StorageStyle('TCON'), - mp4 = StorageStyle("\xa9gen"), - etc = StorageStyle('genre'), - ) - composer = MediaField( - mp3 = StorageStyle('TCOM'), - mp4 = StorageStyle("\xa9wrt"), - etc = StorageStyle('composer'), - ) - grouping = MediaField( - mp3 = StorageStyle('TIT1'), - mp4 = StorageStyle("\xa9grp"), - etc = StorageStyle('grouping'), - ) - year = MediaField(out_type=int, - mp3 = StorageStyle('TDRC', - packing = packing.DATE, - pack_pos = 0), - mp4 = StorageStyle("\xa9day", - packing = packing.DATE, - pack_pos = 0), - etc = [StorageStyle('date', - packing = packing.DATE, - pack_pos = 0), - StorageStyle('year')] - ) - month = MediaField(out_type=int, - mp3 = StorageStyle('TDRC', - packing = packing.DATE, - pack_pos = 1), - mp4 = StorageStyle("\xa9day", - packing = packing.DATE, - pack_pos = 1), - etc = StorageStyle('date', - packing = packing.DATE, - pack_pos = 1) - ) - day = MediaField(out_type=int, - mp3 = StorageStyle('TDRC', - packing = packing.DATE, - pack_pos = 2), - mp4 = StorageStyle("\xa9day", - packing = packing.DATE, - pack_pos = 2), - etc = StorageStyle('date', - packing = packing.DATE, - pack_pos = 2) - ) - date = CompositeDateField(year, month, day) - track = MediaField(out_type = int, - mp3 = StorageStyle('TRCK', - packing = packing.SLASHED, - pack_pos = 0), - mp4 = StorageStyle('trkn', - packing = packing.TUPLE, - pack_pos = 0), - etc = [StorageStyle('track'), - StorageStyle('tracknumber')] - ) - tracktotal = MediaField(out_type = int, - mp3 = StorageStyle('TRCK', - packing = packing.SLASHED, - pack_pos = 1), - mp4 = StorageStyle('trkn', - packing = packing.TUPLE, - pack_pos = 1), - etc = [StorageStyle('tracktotal'), - StorageStyle('trackc'), - StorageStyle('totaltracks')] - ) - disc = MediaField(out_type = int, - mp3 = StorageStyle('TPOS', - packing = packing.SLASHED, - pack_pos = 0), - mp4 = StorageStyle('disk', - packing = packing.TUPLE, - pack_pos = 0), - etc = [StorageStyle('disc'), - StorageStyle('discnumber')] - ) - disctotal = MediaField(out_type = int, - mp3 = StorageStyle('TPOS', - packing = packing.SLASHED, - pack_pos = 1), - mp4 = StorageStyle('disk', - packing = packing.TUPLE, - pack_pos = 1), - etc = [StorageStyle('disctotal'), - StorageStyle('discc'), - StorageStyle('totaldiscs')] - ) - lyrics = MediaField( - mp3 = StorageStyle('USLT', - list_elem = False, - id3_desc = u''), - mp4 = StorageStyle("\xa9lyr"), - etc = StorageStyle('lyrics') - ) - comments = MediaField( - mp3 = StorageStyle('COMM', id3_desc = u''), - mp4 = StorageStyle("\xa9cmt"), - etc = [StorageStyle('description'), - StorageStyle('comment')] - ) - bpm = MediaField(out_type = int, - mp3 = StorageStyle('TBPM'), - mp4 = StorageStyle('tmpo', as_type = int), - etc = StorageStyle('bpm') - ) - comp = MediaField(out_type = bool, - mp3 = StorageStyle('TCMP'), - mp4 = StorageStyle('cpil', - list_elem = False, - as_type = bool), - etc = StorageStyle('compilation') - ) - albumartist = MediaField( - mp3 = StorageStyle('TPE2'), - mp4 = StorageStyle('aART'), - etc = [StorageStyle('album artist'), - StorageStyle('albumartist')] - ) - albumtype = MediaField( - mp3 = StorageStyle('TXXX', id3_desc=u'MusicBrainz Album Type'), - mp4 = StorageStyle( - '----:com.apple.iTunes:MusicBrainz Album Type'), - etc = StorageStyle('musicbrainz_albumtype') - ) - label = MediaField( - mp3 = StorageStyle('TPUB'), - mp4 = [StorageStyle('----:com.apple.iTunes:Label'), - StorageStyle('----:com.apple.iTunes:publisher')], - etc = [StorageStyle('label'), - StorageStyle('publisher')] # Traktor - ) - - # Album art. - art = ImageField() - - # MusicBrainz IDs. - mb_trackid = MediaField( - mp3 = StorageStyle('UFID:http://musicbrainz.org', - list_elem = False, - id3_frame_field = 'data'), - mp4 = StorageStyle( - '----:com.apple.iTunes:MusicBrainz Track Id', - as_type=str), - etc = StorageStyle('musicbrainz_trackid') - ) - mb_albumid = MediaField( - mp3 = StorageStyle('TXXX', id3_desc=u'MusicBrainz Album Id'), - mp4 = StorageStyle( - '----:com.apple.iTunes:MusicBrainz Album Id', - as_type=str), - etc = StorageStyle('musicbrainz_albumid') - ) - mb_artistid = MediaField( - mp3 = StorageStyle('TXXX', id3_desc=u'MusicBrainz Artist Id'), - mp4 = StorageStyle( - '----:com.apple.iTunes:MusicBrainz Artist Id', - as_type=str), - etc = StorageStyle('musicbrainz_artistid') - ) - mb_albumartistid = MediaField( - mp3 = StorageStyle('TXXX', - id3_desc=u'MusicBrainz Album Artist Id'), - mp4 = StorageStyle( - '----:com.apple.iTunes:MusicBrainz Album Artist Id', - as_type=str), - etc = StorageStyle('musicbrainz_albumartistid') - ) - - @property - def length(self): - return self.mgfile.info.length - - @property - def bitrate(self): - if hasattr(self.mgfile.info, 'bitrate'): - # Many formats provide it explicitly. - return self.mgfile.info.bitrate - else: - # Otherwise, we calculate bitrate from the file size. (This - # is the case for all of the lossless formats.) - size = os.path.getsize(self.path) - return int(size * 8 / self.length) - - @property - def format(self): - return TYPES[self.type] diff --git a/lib/beets/ui/__init__.py~ b/lib/beets/ui/__init__.py~ deleted file mode 100644 index 4ae9900e..00000000 --- a/lib/beets/ui/__init__.py~ +++ /dev/null @@ -1,632 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -"""This module contains all of the core logic for beets' command-line -interface. To invoke the CLI, just call beets.ui.main(). The actual -CLI commands are implemented in the ui.commands module. -""" -import os -import locale -import optparse -import textwrap -import ConfigParser -import sys -from difflib import SequenceMatcher -import logging -import sqlite3 -import errno - -from beets import library -from beets import plugins -from beets import util - -# Constants. -CONFIG_PATH_VAR = 'BEETSCONFIG' -DEFAULT_CONFIG_FILE = os.path.expanduser('~/.beetsconfig') -DEFAULT_LIBRARY = '~/.beetsmusic.blb' -DEFAULT_DIRECTORY = '~/Music' -DEFAULT_PATH_FORMATS = { - 'default': '$albumartist/$album/$track $title', - 'comp': 'Compilations/$album/$track $title', - 'singleton': 'Non-Album/$artist/$title', -} -DEFAULT_ART_FILENAME = 'cover' - - -# UI exception. Commands should throw this in order to display -# nonrecoverable errors to the user. -class UserError(Exception): - pass - - -# Utilities. - -def _encoding(): - """Tries to guess the encoding uses by the terminal.""" - try: - return locale.getdefaultlocale()[1] or 'utf8' - except ValueError: - # Invalid locale environment variable setting. To avoid - # failing entirely for no good reason, assume UTF-8. - return 'utf8' - -def decargs(arglist): - """Given a list of command-line argument bytestrings, attempts to - decode them to Unicode strings. - """ - return [s.decode(_encoding()) for s in arglist] - -def print_(*strings): - """Like print, but rather than raising an error when a character - is not in the terminal's encoding's character set, just silently - replaces it. - """ - if strings: - if isinstance(strings[0], unicode): - txt = u' '.join(strings) - else: - txt = ' '.join(strings) - else: - txt = u'' - if isinstance(txt, unicode): - txt = txt.encode(_encoding(), 'replace') - print txt - -def input_options(options, require=False, prompt=None, fallback_prompt=None, - numrange=None, default=None, color=False, max_width=72): - """Prompts a user for input. The sequence of `options` defines the - choices the user has. A single-letter shortcut is inferred for each - option; the user's choice is returned as that single, lower-case - letter. The options should be provided as lower-case strings unless - a particular shortcut is desired; in that case, only that letter - should be capitalized. - - By default, the first option is the default. If `require` is - provided, then there is no default. `default` can be provided to - override this. The prompt and fallback prompt are also inferred but - can be overridden. - - If numrange is provided, it is a pair of `(high, low)` (both ints) - indicating that, in addition to `options`, the user may enter an - integer in that inclusive range. - - `max_width` specifies the maximum number of columns in the - automatically generated prompt string. - """ - # Assign single letters to each option. Also capitalize the options - # to indicate the letter. - letters = {} - display_letters = [] - capitalized = [] - first = True - for option in options: - # Is a letter already capitalized? - for letter in option: - if letter.isalpha() and letter.upper() == letter: - found_letter = letter - break - else: - # Infer a letter. - for letter in option: - if not letter.isalpha(): - continue # Don't use punctuation. - if letter not in letters: - found_letter = letter - break - else: - raise ValueError('no unambiguous lettering found') - - letters[found_letter.lower()] = option - index = option.index(found_letter) - - # Mark the option's shortcut letter for display. - if (default is None and not numrange and first) \ - or (isinstance(default, basestring) and - found_letter.lower() == default.lower()): - # The first option is the default; mark it. - show_letter = '[%s]' % found_letter.upper() - is_default = True - else: - show_letter = found_letter.upper() - is_default = False - - # Possibly colorize the letter shortcut. - if color: - color = 'turquoise' if is_default else 'blue' - show_letter = colorize(color, show_letter) - - # Insert the highlighted letter back into the word. - capitalized.append( - option[:index] + show_letter + option[index+1:] - ) - display_letters.append(found_letter.upper()) - - first = False - - # The default is just the first option if unspecified. - if default is None: - if require: - default = None - elif numrange: - default = numrange[0] - else: - default = display_letters[0].lower() - - # Make a prompt if one is not provided. - if not prompt: - prompt_parts = [] - prompt_part_lengths = [] - if numrange: - if isinstance(default, int): - default_name = str(default) - if color: - default_name = colorize('turquoise', default_name) - tmpl = '# selection (default %s)' - prompt_parts.append(tmpl % default_name) - prompt_part_lengths.append(len(tmpl % str(default))) - else: - prompt_parts.append('# selection') - prompt_part_lengths.append(prompt_parts[-1]) - prompt_parts += capitalized - prompt_part_lengths += [len(s) for s in options] - - # Wrap the query text. - prompt = '' - line_length = 0 - for i, (part, length) in enumerate(zip(prompt_parts, - prompt_part_lengths)): - # Add punctuation. - if i == len(prompt_parts) - 1: - part += '?' - else: - part += ',' - length += 1 - - # Choose either the current line or the beginning of the next. - if line_length + length + 1 > max_width: - prompt += '\n' - line_length = 0 - - if line_length != 0: - # Not the beginning of the line; need a space. - part = ' ' + part - length += 1 - - prompt += part - line_length += length - - # Make a fallback prompt too. This is displayed if the user enters - # something that is not recognized. - if not fallback_prompt: - fallback_prompt = 'Enter one of ' - if numrange: - fallback_prompt += '%i-%i, ' % numrange - fallback_prompt += ', '.join(display_letters) + ':' - - # (raw_input(prompt) was causing problems with colors.) - print prompt, - resp = raw_input() - while True: - resp = resp.strip().lower() - - # Try default option. - if default is not None and not resp: - resp = default - - # Try an integer input if available. - if numrange: - try: - resp = int(resp) - except ValueError: - pass - else: - low, high = numrange - if low <= resp <= high: - return resp - else: - resp = None - - # Try a normal letter input. - if resp: - resp = resp[0] - if resp in letters: - return resp - - # Prompt for new input. - print fallback_prompt, - resp = raw_input() - -def input_yn(prompt, require=False, color=False): - """Prompts the user for a "yes" or "no" response. The default is - "yes" unless `require` is `True`, in which case there is no default. - """ - sel = input_options( - ('y', 'n'), require, prompt, 'Enter Y or N:', color=color - ) - return sel == 'y' - -def config_val(config, section, name, default, vtype=None): - """Queries the configuration file for a value (given by the - section and name). If no value is present, returns default. - vtype optionally specifies the return type (although only bool - is supported for now). - """ - if not config.has_section(section): - config.add_section(section) - - try: - if vtype is bool: - return config.getboolean(section, name) - else: - return config.get(section, name) - except ConfigParser.NoOptionError: - return default - -def human_bytes(size): - """Formats size, a number of bytes, in a human-readable way.""" - suffices = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB', 'HB'] - for suffix in suffices: - if size < 1024: - return "%3.1f %s" % (size, suffix) - size /= 1024.0 - return "big" - -def human_seconds(interval): - """Formats interval, a number of seconds, as a human-readable time - interval. - """ - units = [ - (1, 'second'), - (60, 'minute'), - (60, 'hour'), - (24, 'day'), - (7, 'week'), - (52, 'year'), - (10, 'decade'), - ] - for i in range(len(units)-1): - increment, suffix = units[i] - next_increment, _ = units[i+1] - interval /= float(increment) - if interval < next_increment: - break - else: - # Last unit. - increment, suffix = units[-1] - interval /= float(increment) - - return "%3.1f %ss" % (interval, suffix) - -# ANSI terminal colorization code heavily inspired by pygments: -# http://dev.pocoo.org/hg/pygments-main/file/b2deea5b5030/pygments/console.py -# (pygments is by Tim Hatch, Armin Ronacher, et al.) -COLOR_ESCAPE = "\x1b[" -DARK_COLORS = ["black", "darkred", "darkgreen", "brown", "darkblue", - "purple", "teal", "lightgray"] -LIGHT_COLORS = ["darkgray", "red", "green", "yellow", "blue", - "fuchsia", "turquoise", "white"] -RESET_COLOR = COLOR_ESCAPE + "39;49;00m" -def colorize(color, text): - """Returns a string that prints the given text in the given color - in a terminal that is ANSI color-aware. The color must be something - in DARK_COLORS or LIGHT_COLORS. - """ - if color in DARK_COLORS: - escape = COLOR_ESCAPE + "%im" % (DARK_COLORS.index(color) + 30) - elif color in LIGHT_COLORS: - escape = COLOR_ESCAPE + "%i;01m" % (LIGHT_COLORS.index(color) + 30) - else: - raise ValueError('no such color %s', color) - return escape + text + RESET_COLOR - -def colordiff(a, b, highlight='red'): - """Given two values, return the same pair of strings except with - their differences highlighted in the specified color. Strings are - highlighted intelligently to show differences; other values are - stringified and highlighted in their entirety. - """ - if not isinstance(a, basestring) or not isinstance(b, basestring): - # Non-strings: use ordinary equality. - a = unicode(a) - b = unicode(b) - if a == b: - return a, b - else: - return colorize(highlight, a), colorize(highlight, b) - - a_out = [] - b_out = [] - - matcher = SequenceMatcher(lambda x: False, a, b) - for op, a_start, a_end, b_start, b_end in matcher.get_opcodes(): - if op == 'equal': - # In both strings. - a_out.append(a[a_start:a_end]) - b_out.append(b[b_start:b_end]) - elif op == 'insert': - # Right only. - b_out.append(colorize(highlight, b[b_start:b_end])) - elif op == 'delete': - # Left only. - a_out.append(colorize(highlight, a[a_start:a_end])) - elif op == 'replace': - # Right and left differ. - a_out.append(colorize(highlight, a[a_start:a_end])) - b_out.append(colorize(highlight, b[b_start:b_end])) - else: - assert(False) - - return u''.join(a_out), u''.join(b_out) - - -# Subcommand parsing infrastructure. - -# This is a fairly generic subcommand parser for optparse. It is -# maintained externally here: -# http://gist.github.com/462717 -# There you will also find a better description of the code and a more -# succinct example program. - -class Subcommand(object): - """A subcommand of a root command-line application that may be - invoked by a SubcommandOptionParser. - """ - def __init__(self, name, parser=None, help='', aliases=()): - """Creates a new subcommand. name is the primary way to invoke - the subcommand; aliases are alternate names. parser is an - OptionParser responsible for parsing the subcommand's options. - help is a short description of the command. If no parser is - given, it defaults to a new, empty OptionParser. - """ - self.name = name - self.parser = parser or optparse.OptionParser() - self.aliases = aliases - self.help = help - -class SubcommandsOptionParser(optparse.OptionParser): - """A variant of OptionParser that parses subcommands and their - arguments. - """ - # A singleton command used to give help on other subcommands. - _HelpSubcommand = Subcommand('help', optparse.OptionParser(), - help='give detailed help on a specific sub-command', - aliases=('?',)) - - def __init__(self, *args, **kwargs): - """Create a new subcommand-aware option parser. All of the - options to OptionParser.__init__ are supported in addition - to subcommands, a sequence of Subcommand objects. - """ - # The subcommand array, with the help command included. - self.subcommands = list(kwargs.pop('subcommands', [])) - self.subcommands.append(self._HelpSubcommand) - - # A more helpful default usage. - if 'usage' not in kwargs: - kwargs['usage'] = """ - %prog COMMAND [ARGS...] - %prog help COMMAND""" - - # Super constructor. - optparse.OptionParser.__init__(self, *args, **kwargs) - - # Adjust the help-visible name of each subcommand. - for subcommand in self.subcommands: - subcommand.parser.prog = '%s %s' % \ - (self.get_prog_name(), subcommand.name) - - # Our root parser needs to stop on the first unrecognized argument. - self.disable_interspersed_args() - - def add_subcommand(self, cmd): - """Adds a Subcommand object to the parser's list of commands. - """ - self.subcommands.append(cmd) - - # Add the list of subcommands to the help message. - def format_help(self, formatter=None): - # Get the original help message, to which we will append. - out = optparse.OptionParser.format_help(self, formatter) - if formatter is None: - formatter = self.formatter - - # Subcommands header. - result = ["\n"] - result.append(formatter.format_heading('Commands')) - formatter.indent() - - # Generate the display names (including aliases). - # Also determine the help position. - disp_names = [] - help_position = 0 - for subcommand in self.subcommands: - name = subcommand.name - if subcommand.aliases: - name += ' (%s)' % ', '.join(subcommand.aliases) - disp_names.append(name) - - # Set the help position based on the max width. - proposed_help_position = len(name) + formatter.current_indent + 2 - if proposed_help_position <= formatter.max_help_position: - help_position = max(help_position, proposed_help_position) - - # Add each subcommand to the output. - for subcommand, name in zip(self.subcommands, disp_names): - # Lifted directly from optparse.py. - name_width = help_position - formatter.current_indent - 2 - if len(name) > name_width: - name = "%*s%s\n" % (formatter.current_indent, "", name) - indent_first = help_position - else: - name = "%*s%-*s " % (formatter.current_indent, "", - name_width, name) - indent_first = 0 - result.append(name) - help_width = formatter.width - help_position - help_lines = textwrap.wrap(subcommand.help, help_width) - result.append("%*s%s\n" % (indent_first, "", help_lines[0])) - result.extend(["%*s%s\n" % (help_position, "", line) - for line in help_lines[1:]]) - formatter.dedent() - - # Concatenate the original help message with the subcommand - # list. - return out + "".join(result) - - def _subcommand_for_name(self, name): - """Return the subcommand in self.subcommands matching the - given name. The name may either be the name of a subcommand or - an alias. If no subcommand matches, returns None. - """ - for subcommand in self.subcommands: - if name == subcommand.name or \ - name in subcommand.aliases: - return subcommand - return None - - def parse_args(self, a=None, v=None): - """Like OptionParser.parse_args, but returns these four items: - - options: the options passed to the root parser - - subcommand: the Subcommand object that was invoked - - suboptions: the options passed to the subcommand parser - - subargs: the positional arguments passed to the subcommand - """ - options, args = optparse.OptionParser.parse_args(self, a, v) - - if not args: - # No command given. - self.print_help() - self.exit() - else: - cmdname = args.pop(0) - subcommand = self._subcommand_for_name(cmdname) - if not subcommand: - self.error('unknown command ' + cmdname) - - suboptions, subargs = subcommand.parser.parse_args(args) - - if subcommand is self._HelpSubcommand: - if subargs: - # particular - cmdname = subargs[0] - helpcommand = self._subcommand_for_name(cmdname) - helpcommand.parser.print_help() - self.exit() - else: - # general - self.print_help() - self.exit() - - return options, subcommand, suboptions, subargs - - -# The root parser and its main function. - -def main(args=None, configfh=None): - """Run the main command-line interface for beets.""" - # Get the default subcommands. - from beets.ui.commands import default_commands - - # Read defaults from config file. - config = ConfigParser.SafeConfigParser() - if configfh: - configpath = None - elif CONFIG_PATH_VAR in os.environ: - configpath = os.path.expanduser(os.environ[CONFIG_PATH_VAR]) - else: - configpath = DEFAULT_CONFIG_FILE - if configpath: - configpath = util.syspath(configpath) - if os.path.exists(util.syspath(configpath)): - configfh = open(configpath) - else: - configfh = None - if configfh: - config.readfp(configfh) - - # Add plugin paths. - plugpaths = config_val(config, 'beets', 'pluginpath', '') - for plugpath in plugpaths.split(':'): - sys.path.append(os.path.expanduser(plugpath)) - # Load requested plugins. - plugnames = config_val(config, 'beets', 'plugins', '') - plugins.load_plugins(plugnames.split()) - plugins.load_listeners() - plugins.send("pluginload") - plugins.configure(config) - - # Construct the root parser. - commands = list(default_commands) - commands += plugins.commands() - parser = SubcommandsOptionParser(subcommands=commands) - parser.add_option('-l', '--library', dest='libpath', - help='library database file to use') - parser.add_option('-d', '--directory', dest='directory', - help="destination music directory") - parser.add_option('-p', '--pathformat', dest='path_format', - help="destination path format string") - parser.add_option('-v', '--verbose', dest='verbose', action='store_true', - help='print debugging information') - - # Parse the command-line! - options, subcommand, suboptions, subargs = parser.parse_args(args) - - # Open library file. - libpath = options.libpath or \ - config_val(config, 'beets', 'library', DEFAULT_LIBRARY) - directory = options.directory or \ - config_val(config, 'beets', 'directory', DEFAULT_DIRECTORY) - legacy_path_format = config_val(config, 'beets', 'path_format', None) - if options.path_format: - # If given, -p overrides all path format settings - path_formats = {'default': options.path_format} - else: - if legacy_path_format: - # Old path formats override the default values. - path_formats = {'default': legacy_path_format} - else: - # If no legacy path format, use the defaults instead. - path_formats = DEFAULT_PATH_FORMATS - if config.has_section('paths'): - path_formats.update(config.items('paths')) - art_filename = \ - config_val(config, 'beets', 'art_filename', DEFAULT_ART_FILENAME) - db_path = os.path.expanduser(libpath) - try: - lib = library.Library(db_path, - directory, - path_formats, - art_filename) - except sqlite3.OperationalError: - raise UserError("database file %s could not be opened" % db_path) - - # Configure the logger. - log = logging.getLogger('beets') - if options.verbose: - log.setLevel(logging.DEBUG) - else: - log.setLevel(logging.INFO) - - # Invoke the subcommand. - try: - subcommand.func(lib, config, suboptions, subargs) - except UserError, exc: - message = exc.args[0] if exc.args else None - subcommand.parser.error(message) - except IOError, exc: - if exc.errno == errno.EPIPE: - # "Broken pipe". End silently. - pass - else: - raise diff --git a/lib/beets/ui/commands.py~ b/lib/beets/ui/commands.py~ deleted file mode 100755 index cf8ea9a9..00000000 --- a/lib/beets/ui/commands.py~ +++ /dev/null @@ -1,1036 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -"""This module provides the default commands for beets' command-line -interface. -""" -from __future__ import with_statement # Python 2.5 -import logging -import sys -import os -import time -import itertools -import re - -from beets import ui -from beets.ui import print_, decargs -from beets import autotag -import beets.autotag.art -from beets import plugins -from beets import importer -from beets.util import syspath, normpath, ancestry -from beets import library - -# Global logger. -log = logging.getLogger('beets') - -# The list of default subcommands. This is populated with Subcommand -# objects that can be fed to a SubcommandsOptionParser. -default_commands = [] - -# Utility. - -def _do_query(lib, query, album, also_items=True): - """For commands that operate on matched items, performs a query - and returns a list of matching items and a list of matching - albums. (The latter is only nonempty when album is True.) Raises - a UserError if no items match. also_items controls whether, when - fetching albums, the associated items should be fetched also. - """ - if album: - albums = list(lib.albums(query)) - items = [] - if also_items: - for al in albums: - items += al.items() - - else: - albums = [] - items = list(lib.items(query)) - - if album and not albums: - raise ui.UserError('No matching albums found.') - elif not album and not items: - raise ui.UserError('No matching items found.') - - return items, albums - -FLOAT_EPSILON = 0.01 -def _showdiff(field, oldval, newval, color): - """Prints out a human-readable field difference line.""" - # Considering floats incomparable for perfect equality, introduce - # an epsilon tolerance. - if isinstance(oldval, float) and isinstance(newval, float) and \ - abs(oldval - newval) < FLOAT_EPSILON: - return - - if newval != oldval: - if color: - oldval, newval = ui.colordiff(oldval, newval) - else: - oldval, newval = unicode(oldval), unicode(newval) - print_(u' %s: %s -> %s' % (field, oldval, newval)) - - -# import: Autotagger and importer. - -DEFAULT_IMPORT_COPY = True -DEFAULT_IMPORT_WRITE = True -DEFAULT_IMPORT_DELETE = False -DEFAULT_IMPORT_AUTOT = True -DEFAULT_IMPORT_TIMID = False -DEFAULT_IMPORT_ART = True -DEFAULT_IMPORT_QUIET = False -DEFAULT_IMPORT_QUIET_FALLBACK = 'skip' -DEFAULT_IMPORT_RESUME = None # "ask" -DEFAULT_IMPORT_INCREMENTAL = False -DEFAULT_THREADED = True -DEFAULT_COLOR = True - -VARIOUS_ARTISTS = u'Various Artists' - -# Importer utilities and support. - -def dist_string(dist, color): - """Formats a distance (a float) as a similarity percentage string. - The string is colorized if color is True. - """ - out = '%.1f%%' % ((1 - dist) * 100) - if color: - if dist <= autotag.STRONG_REC_THRESH: - out = ui.colorize('green', out) - elif dist <= autotag.MEDIUM_REC_THRESH: - out = ui.colorize('yellow', out) - else: - out = ui.colorize('red', out) - return out - -def show_change(cur_artist, cur_album, items, info, dist, color=True): - """Print out a representation of the changes that will be made if - tags are changed from (cur_artist, cur_album, items) to info with - distance dist. - """ - def show_album(artist, album): - if artist: - print_(' %s - %s' % (artist, album)) - elif album: - print_(' %s' % album) - else: - print_(' (unknown album)') - - # Identify the album in question. - if cur_artist != info.artist or \ - (cur_album != info.album and info.album != VARIOUS_ARTISTS): - artist_l, artist_r = cur_artist or '', info.artist - album_l, album_r = cur_album or '', info.album - if artist_r == VARIOUS_ARTISTS: - # Hide artists for VA releases. - artist_l, artist_r = u'', u'' - - if color: - artist_l, artist_r = ui.colordiff(artist_l, artist_r) - album_l, album_r = ui.colordiff(album_l, album_r) - - print_("Correcting tags from:") - show_album(artist_l, album_l) - print_("To:") - show_album(artist_r, album_r) - else: - print_("Tagging: %s - %s" % (info.artist, info.album)) - - # Distance/similarity. - print_('(Similarity: %s)' % dist_string(dist, color)) - - # Tracks. - for i, (item, track_info) in enumerate(zip(items, info.tracks)): - cur_track = str(item.track) - new_track = str(i+1) - cur_title = item.title - new_title = track_info.title - - # Possibly colorize changes. - if color: - cur_title, new_title = ui.colordiff(cur_title, new_title) - if cur_track != new_track: - cur_track = ui.colorize('red', cur_track) - new_track = ui.colorize('red', new_track) - - # Show filename (non-colorized) when title is not set. - if not item.title.strip(): - cur_title = os.path.basename(item.path) - - if cur_title != new_title and cur_track != new_track: - print_(" * %s (%s) -> %s (%s)" % ( - cur_title, cur_track, new_title, new_track - )) - elif cur_title != new_title: - print_(" * %s -> %s" % (cur_title, new_title)) - elif cur_track != new_track: - print_(" * %s (%s -> %s)" % (item.title, cur_track, new_track)) - -def show_item_change(item, info, dist, color): - """Print out the change that would occur by tagging `item` with the - metadata from `info`. - """ - cur_artist, new_artist = item.artist, info.artist - cur_title, new_title = item.title, info.title - - if cur_artist != new_artist or cur_title != new_title: - if color: - cur_artist, new_artist = ui.colordiff(cur_artist, new_artist) - cur_title, new_title = ui.colordiff(cur_title, new_title) - - print_("Correcting track tags from:") - print_(" %s - %s" % (cur_artist, cur_title)) - print_("To:") - print_(" %s - %s" % (new_artist, new_title)) - - else: - print_("Tagging track: %s - %s" % (cur_artist, cur_title)) - - print_('(Similarity: %s)' % dist_string(dist, color)) - -def should_resume(config, path): - return ui.input_yn("Import of the directory:\n%s" - "\nwas interrupted. Resume (Y/n)?" % path) - -def _quiet_fall_back(config): - """Show the user that the default action is being taken because - we're in quiet mode and the recommendation is not strong. - """ - if config.quiet_fallback == importer.action.SKIP: - print_('Skipping.') - elif config.quiet_fallback == importer.action.ASIS: - print_('Importing as-is.') - else: - assert(False) - return config.quiet_fallback - -def choose_candidate(candidates, singleton, rec, color, timid, - cur_artist=None, cur_album=None, item=None): - """Given a sorted list of candidates, ask the user for a selection - of which candidate to use. Applies to both full albums and - singletons (tracks). For albums, the candidates are `(dist, items, - info)` triples and `cur_artist` and `cur_album` must be provided. - For singletons, the candidates are `(dist, info)` pairs and `item` - must be provided. - - Returns the result of the choice, which may SKIP, ASIS, TRACKS, or - MANUAL or a candidate. For albums, a candidate is a `(info, items)` - pair; for items, it is just a TrackInfo object. - """ - # Sanity check. - if singleton: - assert item is not None - else: - assert cur_artist is not None - assert cur_album is not None - - # Zero candidates. - if not candidates: - print_("No match found.") - if singleton: - opts = ('Use as-is', 'Skip', 'Enter search', 'enter Id', - 'aBort') - else: - opts = ('Use as-is', 'as Tracks', 'Skip', 'Enter search', - 'enter Id', 'aBort') - sel = ui.input_options(opts, color=color) - if sel == 'u': - return importer.action.ASIS - elif sel == 't': - assert not singleton - return importer.action.TRACKS - elif sel == 'e': - return importer.action.MANUAL - elif sel == 's': - return importer.action.SKIP - elif sel == 'b': - raise importer.ImportAbort() - elif sel == 'i': - return importer.action.MANUAL_ID - else: - assert False - - # Is the change good enough? - bypass_candidates = False - if rec != autotag.RECOMMEND_NONE: - if singleton: - dist, info = candidates[0] - else: - dist, items, info = candidates[0] - bypass_candidates = True - - while True: - # Display and choose from candidates. - if not bypass_candidates: - # Display list of candidates. - if singleton: - print_('Finding tags for track "%s - %s".' % - (item.artist, item.title)) - print_('Candidates:') - for i, (dist, info) in enumerate(candidates): - print_('%i. %s - %s (%s)' % (i+1, info['artist'], - info['title'], dist_string(dist, color))) - else: - print_('Finding tags for album "%s - %s".' % - (cur_artist, cur_album)) - print_('Candidates:') - for i, (dist, items, info) in enumerate(candidates): - line = '%i. %s - %s' % (i+1, info['artist'], - info['album']) - - # Label and year disambiguation, if available. - label, year = None, None - if 'label' in info: - label = info['label'] - if 'year' in info and info['year']: - year = unicode(info['year']) - if label and year: - line += u' [%s, %s]' % (label, year) - elif label: - line += u' [%s]' % label - elif year: - line += u' [%s]' % year - - line += ' (%s)' % dist_string(dist, color) - print_(line) - - # Ask the user for a choice. - if singleton: - opts = ('Skip', 'Use as-is', 'Enter search', 'enter Id', - 'aBort') - else: - opts = ('Skip', 'Use as-is', 'as Tracks', 'Enter search', - 'enter Id', 'aBort') - sel = ui.input_options(opts, numrange=(1, len(candidates)), - color=color) - if sel == 's': - return importer.action.SKIP - elif sel == 'u': - return importer.action.ASIS - elif sel == 'e': - return importer.action.MANUAL - elif sel == 't': - assert not singleton - return importer.action.TRACKS - elif sel == 'b': - raise importer.ImportAbort() - elif sel == 'i': - return importer.action.MANUAL_ID - else: # Numerical selection. - if singleton: - dist, info = candidates[sel-1] - else: - dist, items, info = candidates[sel-1] - bypass_candidates = False - - # Show what we're about to do. - if singleton: - show_item_change(item, info, dist, color) - else: - show_change(cur_artist, cur_album, items, info, dist, color) - - # Exact match => tag automatically if we're not in timid mode. - if rec == autotag.RECOMMEND_STRONG and not timid: - if singleton: - return info - else: - return info, items - - # Ask for confirmation. - if singleton: - opts = ('Apply', 'More candidates', 'Skip', 'Use as-is', - 'Enter search', 'enter Id', 'aBort') - else: - opts = ('Apply', 'More candidates', 'Skip', 'Use as-is', - 'as Tracks', 'Enter search', 'enter Id', 'aBort') - sel = ui.input_options(opts, color=color) - if sel == 'a': - if singleton: - return info - else: - return info, items - elif sel == 'm': - pass - elif sel == 's': - return importer.action.SKIP - elif sel == 'u': - return importer.action.ASIS - elif sel == 't': - assert not singleton - return importer.action.TRACKS - elif sel == 'e': - return importer.action.MANUAL - elif sel == 'b': - raise importer.ImportAbort() - elif sel == 'i': - return importer.action.MANUAL_ID - -def manual_search(singleton): - """Input either an artist and album (for full albums) or artist and - track name (for singletons) for manual search. - """ - artist = raw_input('Artist: ').decode(sys.stdin.encoding) - name = raw_input('Track: ' if singleton else 'Album: ') \ - .decode(sys.stdin.encoding) - return artist.strip(), name.strip() - -def manual_id(singleton): - """Input a MusicBrainz ID, either for an album ("release") or a - track ("recording"). If no valid ID is entered, returns None. - """ - prompt = 'Enter MusicBrainz %s ID: ' % \ - ('recording' if singleton else 'release') - entry = raw_input(prompt).decode(sys.stdin.encoding).strip() - - # Find the first thing that looks like a UUID/MBID. - match = re.search('[a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}', entry) - if match: - return match.group() - else: - log.error('Invalid MBID.') - return None - -def choose_match(task, config): - """Given an initial autotagging of items, go through an interactive - dance with the user to ask for a choice of metadata. Returns an - (info, items) pair, ASIS, or SKIP. - """ - # Show what we're tagging. - print_() - print_(task.path) - - if config.quiet: - # No input; just make a decision. - if task.rec == autotag.RECOMMEND_STRONG: - dist, items, info = task.candidates[0] - show_change(task.cur_artist, task.cur_album, items, info, dist, - config.color) - return info, items - else: - return _quiet_fall_back(config) - - # Loop until we have a choice. - candidates, rec = task.candidates, task.rec - while True: - # Ask for a choice from the user. - choice = choose_candidate(candidates, False, rec, config.color, - config.timid, task.cur_artist, - task.cur_album) - - # Choose which tags to use. - if choice in (importer.action.SKIP, importer.action.ASIS, - importer.action.TRACKS): - # Pass selection to main control flow. - return choice - elif choice is importer.action.MANUAL: - # Try again with manual search terms. - search_artist, search_album = manual_search(False) - try: - _, _, candidates, rec = \ - autotag.tag_album(task.items, config.timid, search_artist, - search_album) - except autotag.AutotagError: - candidates, rec = None, None - elif choice is importer.action.MANUAL_ID: - # Try a manually-entered ID. - search_id = manual_id(False) - if search_id: - try: - _, _, candidates, rec = \ - autotag.tag_album(task.items, config.timid, - search_id=search_id) - except autotag.AutotagError: - candidates, rec = None, None - else: - # We have a candidate! Finish tagging. Here, choice is - # an (info, items) pair as desired. - assert not isinstance(choice, importer.action) - return choice - -def choose_item(task, config): - """Ask the user for a choice about tagging a single item. Returns - either an action constant or a TrackInfo object. - """ - print_() - print_(task.item.path) - candidates, rec = task.item_match - - if config.quiet: - # Quiet mode; make a decision. - if rec == autotag.RECOMMEND_STRONG: - dist, track_info = candidates[0] - show_item_change(task.item, track_info, dist, config.color) - return track_info - else: - return _quiet_fall_back(config) - - while True: - # Ask for a choice. - choice = choose_candidate(candidates, True, rec, config.color, - config.timid, item=task.item) - - if choice in (importer.action.SKIP, importer.action.ASIS): - return choice - elif choice == importer.action.TRACKS: - assert False # TRACKS is only legal for albums. - elif choice == importer.action.MANUAL: - # Continue in the loop with a new set of candidates. - search_artist, search_title = manual_search(True) - candidates, rec = autotag.tag_item(task.item, config.timid, - search_artist, search_title) - elif choice == importer.action.MANUAL_ID: - # Ask for a track ID. - search_id = manual_id(True) - if search_id: - candidates, rec = autotag.tag_item(task.item, config.timid, - search_id=search_id) - else: - # Chose a candidate. - assert not isinstance(choice, importer.action) - return choice - -# The import command. - -def import_files(lib, paths, copy, write, autot, logpath, art, threaded, - color, delete, quiet, resume, quiet_fallback, singletons, - timid, query, incremental): - """Import the files in the given list of paths, tagging each leaf - directory as an album. If copy, then the files are copied into - the library folder. If write, then new metadata is written to the - files themselves. If not autot, then just import the files - without attempting to tag. If logpath is provided, then untaggable - albums will be logged there. If art, then attempt to download - cover art for each album. If threaded, then accelerate autotagging - imports by running them in multiple threads. If color, then - ANSI-colorize some terminal output. If delete, then old files are - deleted when they are copied. If quiet, then the user is - never prompted for input; instead, the tagger just skips anything - it is not confident about. resume indicates whether interrupted - imports can be resumed and is either a boolean or None. - quiet_fallback should be either ASIS or SKIP and indicates what - should happen in quiet mode when the recommendation is not strong. - """ - # Check the user-specified directories. - for path in paths: - if not singletons and not os.path.isdir(syspath(path)): - raise ui.UserError('not a directory: ' + path) - elif singletons and not os.path.exists(syspath(path)): - raise ui.UserError('no such file: ' + path) - - # Check parameter consistency. - if quiet and timid: - raise ui.UserError("can't be both quiet and timid") - - # Open the log. - if logpath: - logpath = normpath(logpath) - logfile = open(syspath(logpath), 'a') - print >>logfile, 'import started', time.asctime() - else: - logfile = None - - # Never ask for input in quiet mode. - if resume is None and quiet: - resume = False - - # Perform the import. - importer.run_import( - lib = lib, - paths = paths, - resume = resume, - logfile = logfile, - color = color, - quiet = quiet, - quiet_fallback = quiet_fallback, - copy = copy, - write = write, - art = art, - delete = delete, - threaded = threaded, - autot = autot, - choose_match_func = choose_match, - should_resume_func = should_resume, - singletons = singletons, - timid = timid, - choose_item_func = choose_item, - query = query, - incremental = incremental, - ) - - # If we were logging, close the file. - if logfile: - print >>logfile, '' - logfile.close() - - # Emit event. - plugins.send('import', lib=lib, paths=paths) - -import_cmd = ui.Subcommand('import', help='import new music', - aliases=('imp', 'im')) -import_cmd.parser.add_option('-c', '--copy', action='store_true', - default=None, help="copy tracks into library directory (default)") -import_cmd.parser.add_option('-C', '--nocopy', action='store_false', - dest='copy', help="don't copy tracks (opposite of -c)") -import_cmd.parser.add_option('-w', '--write', action='store_true', - default=None, help="write new metadata to files' tags (default)") -import_cmd.parser.add_option('-W', '--nowrite', action='store_false', - dest='write', help="don't write metadata (opposite of -w)") -import_cmd.parser.add_option('-a', '--autotag', action='store_true', - dest='autotag', help="infer tags for imported files (default)") -import_cmd.parser.add_option('-A', '--noautotag', action='store_false', - dest='autotag', - help="don't infer tags for imported files (opposite of -a)") -import_cmd.parser.add_option('-p', '--resume', action='store_true', - default=None, help="resume importing if interrupted") -import_cmd.parser.add_option('-P', '--noresume', action='store_false', - dest='resume', help="do not try to resume importing") -import_cmd.parser.add_option('-r', '--art', action='store_true', - default=None, help="try to download album art") -import_cmd.parser.add_option('-R', '--noart', action='store_false', - dest='art', help="don't album art (opposite of -r)") -import_cmd.parser.add_option('-q', '--quiet', action='store_true', - dest='quiet', help="never prompt for input: skip albums instead") -import_cmd.parser.add_option('-l', '--log', dest='logpath', - help='file to log untaggable albums for later review') -import_cmd.parser.add_option('-s', '--singletons', action='store_true', - help='import individual tracks instead of full albums') -import_cmd.parser.add_option('-t', '--timid', dest='timid', - action='store_true', help='always confirm all actions') -import_cmd.parser.add_option('-L', '--library', dest='library', - action='store_true', help='retag items matching a query') -import_cmd.parser.add_option('-i', '--incremental', dest='incremental', - action='store_true', help='skip already-imported directories') -def import_func(lib, config, opts, args): - copy = opts.copy if opts.copy is not None else \ - ui.config_val(config, 'beets', 'import_copy', - DEFAULT_IMPORT_COPY, bool) - write = opts.write if opts.write is not None else \ - ui.config_val(config, 'beets', 'import_write', - DEFAULT_IMPORT_WRITE, bool) - delete = ui.config_val(config, 'beets', 'import_delete', - DEFAULT_IMPORT_DELETE, bool) - autot = opts.autotag if opts.autotag is not None else DEFAULT_IMPORT_AUTOT - art = opts.art if opts.art is not None else \ - ui.config_val(config, 'beets', 'import_art', - DEFAULT_IMPORT_ART, bool) - threaded = ui.config_val(config, 'beets', 'threaded', - DEFAULT_THREADED, bool) - color = ui.config_val(config, 'beets', 'color', DEFAULT_COLOR, bool) - quiet = opts.quiet if opts.quiet is not None else DEFAULT_IMPORT_QUIET - quiet_fallback_str = ui.config_val(config, 'beets', 'import_quiet_fallback', - DEFAULT_IMPORT_QUIET_FALLBACK) - singletons = opts.singletons - timid = opts.timid if opts.timid is not None else \ - ui.config_val(config, 'beets', 'import_timid', - DEFAULT_IMPORT_TIMID, bool) - logpath = opts.logpath if opts.logpath is not None else \ - ui.config_val(config, 'beets', 'import_log', None) - incremental = opts.incremental if opts.incremental is not None else \ - ui.config_val(config, 'beets', 'import_incremental', - DEFAULT_IMPORT_INCREMENTAL, bool) - - # Resume has three options: yes, no, and "ask" (None). - resume = opts.resume if opts.resume is not None else \ - ui.config_val(config, 'beets', 'import_resume', DEFAULT_IMPORT_RESUME) - if isinstance(resume, basestring): - if resume.lower() in ('yes', 'true', 't', 'y', '1'): - resume = True - elif resume.lower() in ('no', 'false', 'f', 'n', '0'): - resume = False - else: - resume = None - - if quiet_fallback_str == 'asis': - quiet_fallback = importer.action.ASIS - else: - quiet_fallback = importer.action.SKIP - - if opts.library: - query = args - paths = [] - else: - query = None - paths = args - - import_files(lib, paths, copy, write, autot, logpath, art, threaded, - color, delete, quiet, resume, quiet_fallback, singletons, - timid, query, incremental) -import_cmd.func = import_func -default_commands.append(import_cmd) - - -# list: Query and show library contents. - -def list_items(lib, query, album, path): - """Print out items in lib matching query. If album, then search for - albums instead of single items. If path, print the matched objects' - paths instead of human-readable information about them. - """ - if album: - for album in lib.albums(query): - if path: - print_(album.item_dir()) - else: - print_(album.albumartist + u' - ' + album.album) - else: - for item in lib.items(query): - if path: - print_(item.path) - else: - print_(item.artist + u' - ' + item.album + u' - ' + item.title) - -list_cmd = ui.Subcommand('list', help='query the library', aliases=('ls',)) -list_cmd.parser.add_option('-a', '--album', action='store_true', - help='show matching albums instead of tracks') -list_cmd.parser.add_option('-p', '--path', action='store_true', - help='print paths for matched items or albums') -def list_func(lib, config, opts, args): - list_items(lib, decargs(args), opts.album, opts.path) -list_cmd.func = list_func -default_commands.append(list_cmd) - - -# update: Update library contents according to on-disk tags. - -def update_items(lib, query, album, move, color, pretend): - """For all the items matched by the query, update the library to - reflect the item's embedded tags. - """ - items, _ = _do_query(lib, query, album) - - # Walk through the items and pick up their changes. - affected_albums = set() - for item in items: - # Item deleted? - if not os.path.exists(syspath(item.path)): - print_(u'X %s - %s' % (item.artist, item.title)) - if not pretend: - lib.remove(item, True) - affected_albums.add(item.album_id) - continue - - # Read new data. - old_data = dict(item.record) - item.read() - - # Special-case album artist when it matches track artist. (Hacky - # but necessary for preserving album-level metadata for non- - # autotagged imports.) - if not item.albumartist and \ - old_data['albumartist'] == old_data['artist'] == item.artist: - item.albumartist = old_data['albumartist'] - item.dirty['albumartist'] = False - - # Get and save metadata changes. - changes = {} - for key in library.ITEM_KEYS_META: - if item.dirty[key]: - changes[key] = old_data[key], getattr(item, key) - if changes: - # Something changed. - print_(u'* %s - %s' % (item.artist, item.title)) - for key, (oldval, newval) in changes.iteritems(): - _showdiff(key, oldval, newval, color) - - # If we're just pretending, then don't move or save. - if pretend: - continue - - # Move the item if it's in the library. - if move and lib.directory in ancestry(item.path): - lib.move(item) - - lib.store(item) - affected_albums.add(item.album_id) - - # Skip album changes while pretending. - if pretend: - return - - # Modify affected albums to reflect changes in their items. - for album_id in affected_albums: - if album_id is None: # Singletons. - continue - album = lib.get_album(album_id) - if not album: # Empty albums have already been removed. - log.debug('emptied album %i' % album_id) - continue - al_items = list(album.items()) - - # Update album structure to reflect an item in it. - for key in library.ALBUM_KEYS_ITEM: - setattr(album, key, getattr(al_items[0], key)) - - # Move album art (and any inconsistent items). - if move and lib.directory in ancestry(al_items[0].path): - log.debug('moving album %i' % album_id) - album.move() - - lib.save() - -update_cmd = ui.Subcommand('update', - help='update the library', aliases=('upd','up',)) -update_cmd.parser.add_option('-a', '--album', action='store_true', - help='show matching albums instead of tracks') -update_cmd.parser.add_option('-M', '--nomove', action='store_false', - default=True, dest='move', help="don't move files in library") -update_cmd.parser.add_option('-p', '--pretend', action='store_true', - help="show all changes but do nothing") -def update_func(lib, config, opts, args): - color = ui.config_val(config, 'beets', 'color', DEFAULT_COLOR, bool) - update_items(lib, decargs(args), opts.album, opts.move, color, opts.pretend) -update_cmd.func = update_func -default_commands.append(update_cmd) - - -# remove: Remove items from library, delete files. - -def remove_items(lib, query, album, delete=False): - """Remove items matching query from lib. If album, then match and - remove whole albums. If delete, also remove files from disk. - """ - # Get the matching items. - items, albums = _do_query(lib, query, album) - - # Show all the items. - for item in items: - print_(item.artist + ' - ' + item.album + ' - ' + item.title) - - # Confirm with user. - print_() - if delete: - prompt = 'Really DELETE %i files (y/n)?' % len(items) - else: - prompt = 'Really remove %i items from the library (y/n)?' % \ - len(items) - if not ui.input_yn(prompt, True): - return - - # Remove (and possibly delete) items. - if album: - for al in albums: - al.remove(delete) - else: - for item in items: - lib.remove(item, delete) - - lib.save() - -remove_cmd = ui.Subcommand('remove', - help='remove matching items from the library', aliases=('rm',)) -remove_cmd.parser.add_option("-d", "--delete", action="store_true", - help="also remove files from disk") -remove_cmd.parser.add_option('-a', '--album', action='store_true', - help='match albums instead of tracks') -def remove_func(lib, config, opts, args): - remove_items(lib, decargs(args), opts.album, opts.delete) -remove_cmd.func = remove_func -default_commands.append(remove_cmd) - - -# stats: Show library/query statistics. - -def show_stats(lib, query): - """Shows some statistics about the matched items.""" - items = lib.items(query) - - total_size = 0 - total_time = 0.0 - total_items = 0 - artists = set() - albums = set() - - for item in items: - #fixme This is approximate, so people might complain that - # this total size doesn't match "du -sh". Could fix this - # by putting total file size in the database. - total_size += int(item.length * item.bitrate / 8) - total_time += item.length - total_items += 1 - artists.add(item.artist) - albums.add(item.album) - - print_("""Tracks: %i -Total time: %s -Total size: %s -Artists: %i -Albums: %i""" % ( - total_items, - ui.human_seconds(total_time), - ui.human_bytes(total_size), - len(artists), len(albums) - )) - -stats_cmd = ui.Subcommand('stats', - help='show statistics about the library or a query') -def stats_func(lib, config, opts, args): - show_stats(lib, decargs(args)) -stats_cmd.func = stats_func -default_commands.append(stats_cmd) - - -# version: Show current beets version. - -def show_version(lib, config, opts, args): - print 'beets version %s' % beets.__version__ - # Show plugins. - names = [] - for plugin in plugins.find_plugins(): - modname = plugin.__module__ - names.append(modname.split('.')[-1]) - if names: - print 'plugins:', ', '.join(names) - else: - print 'no plugins loaded' -version_cmd = ui.Subcommand('version', - help='output version information') -version_cmd.func = show_version -default_commands.append(version_cmd) - - -# modify: Declaratively change metadata. - -def modify_items(lib, mods, query, write, move, album, color, confirm): - """Modifies matching items according to key=value assignments.""" - # Parse key=value specifications into a dictionary. - allowed_keys = library.ALBUM_KEYS if album else library.ITEM_KEYS_WRITABLE - fsets = {} - for mod in mods: - key, value = mod.split('=', 1) - if key not in allowed_keys: - raise ui.UserError('"%s" is not a valid field' % key) - fsets[key] = value - - # Get the items to modify. - items, albums = _do_query(lib, query, album, False) - objs = albums if album else items - - # Preview change. - print_('Modifying %i %ss.' % (len(objs), 'album' if album else 'item')) - for obj in objs: - # Identify the changed object. - if album: - print_(u'* %s - %s' % (obj.albumartist, obj.album)) - else: - print_(u'* %s - %s' % (obj.artist, obj.title)) - - # Show each change. - for field, value in fsets.iteritems(): - curval = getattr(obj, field) - _showdiff(field, curval, value, color) - - # Confirm. - if confirm: - extra = ' and write tags' if write else '' - if not ui.input_yn('Really modify%s (Y/n)?' % extra): - return - - # Apply changes to database. - for obj in objs: - for field, value in fsets.iteritems(): - setattr(obj, field, value) - - if move: - cur_path = obj.item_dir() if album else obj.path - if lib.directory in ancestry(cur_path): # In library? - log.debug('moving object %s' % cur_path) - if album: - obj.move() - else: - lib.move(obj) - - # When modifying items, we have to store them to the database. - if not album: - lib.store(obj) - lib.save() - - # Apply tags if requested. - if write: - if album: - items = itertools.chain(*(a.items() for a in albums)) - for item in items: - item.write() - -modify_cmd = ui.Subcommand('modify', - help='change metadata fields', aliases=('mod',)) -modify_cmd.parser.add_option('-M', '--nomove', action='store_false', - default=True, dest='move', help="don't move files in library") -modify_cmd.parser.add_option('-w', '--write', action='store_true', - default=None, help="write new metadata to files' tags (default)") -modify_cmd.parser.add_option('-W', '--nowrite', action='store_false', - dest='write', help="don't write metadata (opposite of -w)") -modify_cmd.parser.add_option('-a', '--album', action='store_true', - help='modify whole albums instead of tracks') -modify_cmd.parser.add_option('-y', '--yes', action='store_true', - help='skip confirmation') -def modify_func(lib, config, opts, args): - args = decargs(args) - mods = [a for a in args if '=' in a] - query = [a for a in args if '=' not in a] - if not mods: - raise ui.UserError('no modifications specified') - write = opts.write if opts.write is not None else \ - ui.config_val(config, 'beets', 'import_write', - DEFAULT_IMPORT_WRITE, bool) - color = ui.config_val(config, 'beets', 'color', DEFAULT_COLOR, bool) - modify_items(lib, mods, query, write, opts.move, opts.album, color, - not opts.yes) -modify_cmd.func = modify_func -default_commands.append(modify_cmd) - - -# move: Move/copy files to the library or a new base directory. - -def move_items(lib, dest, query, copy, album): - """Moves or copies items to a new base directory, given by dest. If - dest is None, then the library's base directory is used, making the - command "consolidate" files. - """ - items, albums = _do_query(lib, query, album, False) - objs = albums if album else items - - action = 'Copying' if copy else 'Moving' - entity = 'album' if album else 'item' - logging.info('%s %i %ss.' % (action, len(objs), entity)) - for obj in objs: - old_path = obj.item_dir() if album else obj.path - logging.debug('moving: %s' % old_path) - - if album: - obj.move(copy, basedir=dest) - else: - lib.move(obj, copy, basedir=dest) - lib.store(obj) - lib.save() - -move_cmd = ui.Subcommand('move', - help='move or copy items', aliases=('mv',)) -move_cmd.parser.add_option('-d', '--dest', metavar='DIR', dest='dest', - help='destination directory') -move_cmd.parser.add_option('-c', '--copy', default=False, action='store_true', - help='copy instead of moving') -move_cmd.parser.add_option('-a', '--album', default=False, action='store_true', - help='match whole albums instead of tracks') -def move_func(lib, config, opts, args): - dest = opts.dest - if dest is not None: - dest = normpath(dest) - if not os.path.isdir(dest): - raise ui.UserError('no such directory: %s' % dest) - - move_items(lib, dest, decargs(args), opts.copy, opts.album) -move_cmd.func = move_func -default_commands.append(move_cmd) diff --git a/lib/beets/vfs.py~ b/lib/beets/vfs.py~ deleted file mode 100644 index 09f86575..00000000 --- a/lib/beets/vfs.py~ +++ /dev/null @@ -1,48 +0,0 @@ -# This file is part of beets. -# Copyright 2011, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -"""A simple utility for constructing filesystem-like trees from beets -libraries. -""" -from collections import namedtuple -from beets import util - -Node = namedtuple('Node', ['files', 'dirs']) - -def _insert(node, path, itemid): - """Insert an item into a virtual filesystem node.""" - if len(path) == 1: - # Last component. Insert file. - node.files[path[0]] = itemid - else: - # In a directory. - dirname = path[0] - rest = path[1:] - if dirname not in node.dirs: - node.dirs[dirname] = Node({}, {}) - _insert(node.dirs[dirname], rest, itemid) - -def libtree(lib): - """Generates a filesystem-like directory tree for the files - contained in `lib`. Filesystem nodes are (files, dirs) named - tuples in which both components are dictionaries. The first - maps filenames to Item ids. The second maps directory names to - child node tuples. - """ - root = Node({}, {}) - for item in lib.items(): - dest = lib.destination(item, fragment=True) - parts = util.components(dest) - _insert(root, parts, item.id) - return root