InRough update of the beets lib to 1.0b15

This commit is contained in:
rembo10
2012-07-28 23:45:08 +05:30
parent c1edd5085a
commit d245428ca2
17 changed files with 2786 additions and 1263 deletions

View File

@@ -1,5 +1,5 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
# Copyright 2012, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
@@ -8,20 +8,102 @@
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Miscellaneous utility functions."""
from __future__ import division
import os
import sys
import re
import shutil
import fnmatch
from collections import defaultdict
import traceback
MAX_FILENAME_LENGTH = 200
class HumanReadableException(Exception):
"""An Exception that can include a human-readable error message to
be logged without a traceback. Can preserve a traceback for
debugging purposes as well.
Has at least two fields: `reason`, the underlying exception or a
string describing the problem; and `verb`, the action being
performed during the error.
If `tb` is provided, it is a string containing a traceback for the
associated exception. (Note that this is not necessary in Python 3.x
and should be removed when we make the transition.)
"""
error_kind = 'Error' # Human-readable description of error type.
def __init__(self, reason, verb, tb=None):
self.reason = reason
self.verb = verb
self.tb = tb
super(HumanReadableException, self).__init__(self.get_message())
def _gerund(self):
"""Generate a (likely) gerund form of the English verb.
"""
if ' ' in self.verb:
return self.verb
gerund = self.verb[:-1] if self.verb.endswith('e') else self.verb
gerund += 'ing'
return gerund
def _reasonstr(self):
"""Get the reason as a string."""
if isinstance(self.reason, basestring):
return self.reason
elif hasattr(self.reason, 'strerror'): # i.e., EnvironmentError
return self.reason.strerror
else:
return u'"{0}"'.format(self.reason)
def get_message(self):
"""Create the human-readable description of the error, sans
introduction.
"""
raise NotImplementedError
def log(self, logger):
"""Log to the provided `logger` a human-readable message as an
error and a verbose traceback as a debug message.
"""
if self.tb:
logger.debug(self.tb)
logger.error(u'{0}: {1}'.format(self.error_kind, self.args[0]))
class FilesystemError(HumanReadableException):
"""An error that occurred while performing a filesystem manipulation
via a function in this module. The `paths` field is a sequence of
pathnames involved in the operation.
"""
def __init__(self, reason, verb, paths, tb=None):
self.paths = paths
super(FilesystemError, self).__init__(reason, verb, tb)
def get_message(self):
# Use a nicer English phrasing for some specific verbs.
if self.verb in ('move', 'copy', 'rename'):
clause = 'while {0} {1} to {2}'.format(
self._gerund(), repr(self.paths[0]), repr(self.paths[1])
)
elif self.verb in ('delete',):
clause = 'while {0} {1}'.format(
self._gerund(), repr(self.paths[0])
)
else:
clause = 'during {0} of paths {1}'.format(
self.verb, u', '.join(repr(p) for p in self.paths)
)
return u'{0} {1}'.format(self._reasonstr(), clause)
def normpath(path):
"""Provide the canonical form of the path suitable for storing in
the database.
@@ -39,11 +121,11 @@ def ancestry(path, pathmod=None):
last_path = None
while path:
path = pathmod.dirname(path)
if path == last_path:
break
last_path = path
if path: # don't yield ''
out.insert(0, path)
return out
@@ -59,7 +141,9 @@ def sorted_walk(path, ignore=()):
# Get all the directories and files at this level.
dirs = []
files = []
for base in os.listdir(path):
for base in os.listdir(syspath(path)):
base = bytestring_path(base)
# Skip ignored filenames.
skip = False
for pat in ignore:
@@ -84,7 +168,7 @@ def sorted_walk(path, ignore=()):
# Recurse into directories.
for base in dirs:
cur = os.path.join(path, base)
# yield from _sorted_walk(cur)
# yield from sorted_walk(...)
for res in sorted_walk(cur, ignore):
yield res
@@ -149,13 +233,13 @@ def components(path, pathmod=None):
comp = pathmod.basename(anc)
if comp:
comps.append(comp)
else: # root
else: # root
comps.append(anc)
last = pathmod.basename(path)
if last:
comps.append(last)
return comps
def bytestring_path(path):
@@ -168,6 +252,13 @@ def bytestring_path(path):
# Try to encode with default encodings, but fall back to UTF8.
encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
if encoding == 'mbcs':
# On Windows, a broken encoding known to Python as "MBCS" is
# used for the filesystem. However, we only use the Unicode API
# for Windows paths, so the encoding is actually immaterial so
# we can avoid dealing with this nastiness. We arbitrarily
# choose UTF-8.
encoding = 'utf8'
try:
return path.encode(encoding)
except (UnicodeError, LookupError):
@@ -202,12 +293,16 @@ def syspath(path, pathmod=None):
return path
if not isinstance(path, unicode):
# Try to decode with default encodings, but fall back to UTF8.
encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
# Beets currently represents Windows paths internally with UTF-8
# arbitrarily. But earlier versions used MBCS because it is
# reported as the FS encoding by Windows. Try both.
try:
path = path.decode(encoding, 'replace')
path = path.decode('utf8')
except UnicodeError:
path = path.decode('utf8', 'replace')
# The encoding should always be MBCS, Windows' broken
# Unicode representation.
encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
path = path.decode(encoding, 'replace')
# Add the magic prefix if it isn't already there
if not path.startswith(u'\\\\?\\'):
@@ -219,42 +314,63 @@ def samefile(p1, p2):
"""Safer equality for paths."""
return shutil._samefile(syspath(p1), syspath(p2))
def soft_remove(path):
"""Remove the file if it exists."""
def remove(path, soft=True):
"""Remove the file. If `soft`, then no error will be raised if the
file does not exist.
"""
path = syspath(path)
if os.path.exists(path):
if soft and not os.path.exists(path):
return
try:
os.remove(path)
except (OSError, IOError) as exc:
raise FilesystemError(exc, 'delete', (path,), traceback.format_exc())
def _assert_not_exists(path, pathmod=None):
"""Raises an OSError if the path exists."""
pathmod = pathmod or os.path
if pathmod.exists(path):
raise OSError('file exists: %s' % path)
def copy(path, dest, replace=False, pathmod=None):
"""Copy a plain file. Permissions are not copied. If dest already
exists, raises an OSError unless replace is True. Has no effect if
path is the same as dest. Paths are translated to system paths
before the syscall.
def copy(path, dest, replace=False, pathmod=os.path):
"""Copy a plain file. Permissions are not copied. If `dest` already
exists, raises a FilesystemError unless `replace` is True. Has no
effect if `path` is the same as `dest`. Paths are translated to
system paths before the syscall.
"""
if samefile(path, dest):
return
path = syspath(path)
dest = syspath(dest)
_assert_not_exists(dest, pathmod)
return shutil.copyfile(path, dest)
if not replace and pathmod.exists(dest):
raise FilesystemError('file exists', 'copy', (path, dest))
try:
shutil.copyfile(path, dest)
except (OSError, IOError) as exc:
raise FilesystemError(exc, 'copy', (path, dest),
traceback.format_exc())
def move(path, dest, replace=False, pathmod=None):
"""Rename a file. dest may not be a directory. If dest already
exists, raises an OSError unless replace is True. Hos no effect if
path is the same as dest. Paths are translated to system paths.
def move(path, dest, replace=False, pathmod=os.path):
"""Rename a file. `dest` may not be a directory. If `dest` already
exists, raises an OSError unless `replace` is True. Has no effect if
`path` is the same as `dest`. If the paths are on different
filesystems (or the rename otherwise fails), a copy is attempted
instead, in which case metadata will *not* be preserved. Paths are
translated to system paths.
"""
if samefile(path, dest):
return
path = syspath(path)
dest = syspath(dest)
_assert_not_exists(dest, pathmod)
return shutil.move(path, dest)
if pathmod.exists(dest):
raise FilesystemError('file exists', 'rename', (path, dest),
traceback.format_exc())
# First, try renaming the file.
try:
os.rename(path, dest)
except OSError:
# Otherwise, copy and delete the original.
try:
shutil.copyfile(path, dest)
os.remove(path)
except (OSError, IOError) as exc:
raise FilesystemError(exc, 'move', (path, dest),
traceback.format_exc())
def unique_path(path):
"""Returns a version of ``path`` that does not exist on the
@@ -277,33 +393,33 @@ def unique_path(path):
if not os.path.exists(new_path):
return new_path
# Note: POSIX actually supports \ and : -- I just think they're
# a pain. And ? has caused problems for some.
# Note: The Windows "reserved characters" are, of course, allowed on
# Unix. They are forbidden here because they cause problems on Samba
# shares, which are sufficiently common as to cause frequent problems.
# http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247.aspx
CHAR_REPLACE = [
(re.compile(r'[\\/\?"]|^\.'), '_'),
(re.compile(r':'), '-'),
]
CHAR_REPLACE_WINDOWS = [
(re.compile(r'["\*<>\|]|^\.|\.$|\s+$'), '_'),
(re.compile(ur'[\\/]'), u'_'), # / and \ -- forbidden everywhere.
(re.compile(ur'^\.'), u'_'), # Leading dot (hidden files on Unix).
(re.compile(ur'[\x00-\x1f]'), u''), # Control characters.
(re.compile(ur'[<>:"\?\*\|]'), u'_'), # Windows "reserved characters".
(re.compile(ur'\.$'), u'_'), # Trailing dots.
(re.compile(ur'\s+$'), u''), # Trailing whitespace.
]
def sanitize_path(path, pathmod=None, replacements=None):
"""Takes a path and makes sure that it is legal. Returns a new path.
Only works with fragments; won't work reliably on Windows when a
path begins with a drive letter. Path separators (including altsep!)
should already be cleaned from the path components. If replacements
is specified, it is used *instead* of the default set of
replacements for the platform; it must be a list of (compiled regex,
replacement string) pairs.
"""Takes a path (as a Unicode string) and makes sure that it is
legal. Returns a new path. Only works with fragments; won't work
reliably on Windows when a path begins with a drive letter. Path
separators (including altsep!) should already be cleaned from the
path components. If replacements is specified, it is used *instead*
of the default set of replacements for the platform; it must be a
list of (compiled regex, replacement string) pairs.
"""
pathmod = pathmod or os.path
windows = pathmod.__name__ == 'ntpath'
# Choose the appropriate replacements.
if not replacements:
replacements = list(CHAR_REPLACE)
if windows:
replacements += CHAR_REPLACE_WINDOWS
comps = components(path, pathmod)
if not comps:
return ''
@@ -311,10 +427,10 @@ def sanitize_path(path, pathmod=None, replacements=None):
# Replace special characters.
for regex, repl in replacements:
comp = regex.sub(repl, comp)
# Truncate each component.
comp = comp[:MAX_FILENAME_LENGTH]
comps[i] = comp
return pathmod.join(*comps)
@@ -336,10 +452,10 @@ def sanitize_for_path(value, pathmod, key=None):
value = u'%02i' % (value or 0)
elif key == 'bitrate':
# Bitrate gets formatted as kbps.
value = u'%ikbps' % ((value or 0) / 1000)
value = u'%ikbps' % ((value or 0) // 1000)
elif key == 'samplerate':
# Sample rate formatted as kHz.
value = u'%ikHz' % ((value or 0) / 1000)
value = u'%ikHz' % ((value or 0) // 1000)
else:
value = unicode(value)
return value
@@ -360,7 +476,7 @@ def levenshtein(s1, s2):
return levenshtein(s2, s1)
if not s1:
return len(s2)
previous_row = xrange(len(s2) + 1)
for i, c1 in enumerate(s1):
current_row = [i + 1]
@@ -370,7 +486,7 @@ def levenshtein(s1, s2):
substitutions = previous_row[j] + (c1 != c2)
current_row.append(min(insertions, deletions, substitutions))
previous_row = current_row
return previous_row[-1]
def plurality(objs):