mirror of
https://github.com/rembo10/headphones.git
synced 2026-05-10 05:39:35 +01:00
350 lines
10 KiB
Python
350 lines
10 KiB
Python
# Copyright 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

"""Utility classes for Mutagen.

You should not rely on the interfaces here being stable. They are
intended for internal use in Mutagen only.
"""
|
|
|
|
import struct
|
|
|
|
from fnmatch import fnmatchcase
|
|
|
|
|
|
class DictMixin(object):
    """Implement the dict API using keys() and __*item__ methods.

    Similar to UserDict.DictMixin, this takes a class that defines
    __getitem__, __setitem__, __delitem__, and keys(), and turns it
    into a full dict-like object.

    UserDict.DictMixin is not suitable for this purpose because it's
    an old-style class.

    This class is not optimized for very large dictionaries; many
    functions have linear memory requirements. I recommend you
    override some of these functions if speed is required.
    """

    def __iter__(self):
        """Iterate over whatever keys() returns."""
        return iter(self.keys())

    def has_key(self, key):
        """Return True if self[key] succeeds, False if it raises KeyError."""
        try:
            self[key]
        except KeyError:
            return False
        else:
            return True
    __contains__ = has_key

    iterkeys = lambda self: iter(self.keys())

    def values(self):
        """Return a list of the values, in the order given by keys()."""
        return [self[key] for key in self.keys()]
    itervalues = lambda self: iter(self.values())

    def items(self):
        """Return a list of (key, value) pairs built from keys() and values()."""
        return list(zip(self.keys(), self.values()))
    iteritems = lambda s: iter(s.items())

    def clear(self):
        """Delete every key currently reported by keys()."""
        # Snapshot the keys first so deleting cannot disturb the iteration,
        # and use a real loop so the deletions always run.
        for key in list(self.keys()):
            del self[key]

    def pop(self, key, *args):
        """Remove key and return its value.

        If the key is missing, return the optional default (the single
        extra positional argument) or re-raise the KeyError when no
        default was given. More than one extra argument is a TypeError.
        """
        if len(args) > 1:
            raise TypeError("pop takes at most two arguments")
        try:
            value = self[key]
        except KeyError:
            if args:
                return args[0]
            else:
                raise
        del self[key]
        return value

    def popitem(self):
        """Remove and return the first (key, value) pair from keys().

        Raises KeyError("dictionary is empty") when there are no keys.
        """
        # Take the first key without indexing, so keys() may return any
        # iterable; we return immediately after mutating.
        for key in self.keys():
            return key, self.pop(key)
        raise KeyError("dictionary is empty")

    def update(self, other=None, **kwargs):
        """Store the entries of other, then the keyword arguments.

        other may be an object with a keys() method (its values are read
        with other[key]) or an iterable of (key, value) pairs. Keyword
        arguments are applied after other, so they win on conflicts.
        """
        if other is not None:
            if hasattr(other, "keys"):
                for key in other.keys():
                    self[key] = other[key]
            else:
                for key, value in other:
                    self[key] = value
        # Applied unconditionally: keywords must not be dropped just because
        # a positional mapping was also supplied.
        for key, value in kwargs.items():
            self[key] = value

    def setdefault(self, key, default=None):
        """Return self[key], storing and returning default if it is missing."""
        try:
            return self[key]
        except KeyError:
            self[key] = default
            return default

    def get(self, key, default=None):
        """Return self[key], or default if the lookup raises KeyError."""
        try:
            return self[key]
        except KeyError:
            return default

    def __repr__(self):
        """Return the repr of a plain dict holding the same items."""
        return repr(dict(self.items()))

    def __cmp__(self, other):
        """Compare a plain-dict copy of self with other using cmp().

        Always sorts after None.
        """
        if other is None:
            return 1
        else:
            return cmp(dict(self.items()), other)

    # Keep identity-based hashing even though comparison is by content.
    __hash__ = object.__hash__

    def __len__(self):
        """Return the number of keys reported by keys()."""
        return len(self.keys())
|
|
|
|
|
|
class DictProxy(DictMixin):
    """A DictMixin whose four required primitives delegate to a private dict."""

    def __init__(self, *args, **kwargs):
        # Create the backing store before handing off to the parent
        # initializer, so it exists whatever the parent does.
        self.__dict = {}
        super(DictProxy, self).__init__(*args, **kwargs)

    def keys(self):
        """Return the keys of the backing dict."""
        backing = self.__dict
        return backing.keys()

    def __getitem__(self, key):
        """Look key up in the backing dict."""
        backing = self.__dict
        return backing[key]

    def __setitem__(self, key, value):
        """Store value under key in the backing dict."""
        backing = self.__dict
        backing[key] = value

    def __delitem__(self, key):
        """Remove key from the backing dict."""
        backing = self.__dict
        del backing[key]
|
|
|
|
|
|
class cdata(object):
    """C character buffer to Python numeric type conversions.

    Each ``<type>_<le|be>`` static method unpacks one value of that type
    from a byte string using the little- or big-endian struct format shown
    in its definition; each ``to_<type>_<le|be>`` method packs a number
    back into bytes. A buffer of the wrong length, or a number the format
    cannot pack, raises ``cdata.error``.
    """

    # Expose struct's exception type as an attribute so callers can catch
    # cdata.error.
    error = struct.error

    short_le = staticmethod(lambda data: struct.unpack('<h', data)[0])
    ushort_le = staticmethod(lambda data: struct.unpack('<H', data)[0])

    short_be = staticmethod(lambda data: struct.unpack('>h', data)[0])
    ushort_be = staticmethod(lambda data: struct.unpack('>H', data)[0])

    int_le = staticmethod(lambda data: struct.unpack('<i', data)[0])
    uint_le = staticmethod(lambda data: struct.unpack('<I', data)[0])

    int_be = staticmethod(lambda data: struct.unpack('>i', data)[0])
    uint_be = staticmethod(lambda data: struct.unpack('>I', data)[0])

    longlong_le = staticmethod(lambda data: struct.unpack('<q', data)[0])
    ulonglong_le = staticmethod(lambda data: struct.unpack('<Q', data)[0])

    longlong_be = staticmethod(lambda data: struct.unpack('>q', data)[0])
    ulonglong_be = staticmethod(lambda data: struct.unpack('>Q', data)[0])

    to_short_le = staticmethod(lambda data: struct.pack('<h', data))
    to_ushort_le = staticmethod(lambda data: struct.pack('<H', data))

    to_short_be = staticmethod(lambda data: struct.pack('>h', data))
    to_ushort_be = staticmethod(lambda data: struct.pack('>H', data))

    to_int_le = staticmethod(lambda data: struct.pack('<i', data))
    to_uint_le = staticmethod(lambda data: struct.pack('<I', data))

    to_int_be = staticmethod(lambda data: struct.pack('>i', data))
    to_uint_be = staticmethod(lambda data: struct.pack('>I', data))

    to_longlong_le = staticmethod(lambda data: struct.pack('<q', data))
    to_ulonglong_le = staticmethod(lambda data: struct.pack('<Q', data))

    to_longlong_be = staticmethod(lambda data: struct.pack('>q', data))
    to_ulonglong_be = staticmethod(lambda data: struct.pack('>Q', data))

    # 256-entry byte table: entry N is N with its eight bits in reverse
    # order. Generator expressions keep the loop variables out of the class
    # namespace, so no cleanup with del is needed afterwards.
    bitswap = bytes(bytearray(
        sum(((val >> i) & 1) << (7 - i) for i in range(8))
        for val in range(256)))

    # True if bit n (0 = least significant) of value is set.
    test_bit = staticmethod(lambda value, n: bool((value >> n) & 1))
|
|
|
|
|
|
def lock(fileobj):
    """Take an exclusive lock on a file object, tolerating missing support.

    If the fcntl module cannot be imported, or the lockf call raises
    IOError, the lock is simply reported as not taken instead of being
    treated as a failure. The lockf call is made without a non-blocking
    flag.

    Returns True if the lock was taken and False otherwise. Exceptions
    other than ImportError and IOError are not caught here.
    """
    try:
        import fcntl
    except ImportError:
        # No fcntl on this platform: nothing to lock with.
        return False

    try:
        fcntl.lockf(fileobj, fcntl.LOCK_EX)
    except IOError:
        # FIXME: There's possibly a lot of complicated
        # logic that needs to go here in case the IOError
        # is EACCES or EAGAIN.
        return False

    return True
|
|
|
|
|
|
def unlock(fileobj):
    """Release the lock held on a file object.

    Only call this for a file object on which lock() returned true.
    """
    # Errors are deliberately left to propagate: a failure here means a
    # lock/unlock pair is mismatched, which must not be hidden.
    from fcntl import LOCK_UN, lockf
    lockf(fileobj, LOCK_UN)
|
|
|
|
|
|
def insert_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
    """Insert size bytes of empty space starting at offset.

    fobj must be an open file object, open rb+ or
    equivalent. Mutagen tries to use mmap to resize the file, but
    falls back to a significantly slower method if mmap fails.

    The file grows by size bytes and everything from offset onwards is
    moved size bytes towards the end. The bytes left in the gap are not
    cleared: they keep whatever was there before the move.

    size must be positive and offset must lie between 0 and the current
    end of the file; otherwise an AssertionError is raised before the
    file is touched. BUFFER_SIZE is the chunk size used by the fallback
    copy.
    """

    assert 0 < size
    assert 0 <= offset
    locked = False
    fobj.seek(0, 2)
    filesize = fobj.tell()
    movesize = filesize - offset
    # An offset past the end would give a negative move size, which the
    # fallback loop below cannot handle; reject it up front.
    assert 0 <= movesize
    # Grow the file first so the mapping below can cover the new length.
    # Byte literals keep this working on files opened in binary mode.
    fobj.write(b'\x00' * size)
    fobj.flush()
    try:
        try:
            import mmap
            region = mmap.mmap(fobj.fileno(), filesize + size)
            try:
                region.move(offset + size, offset, movesize)
            finally:
                region.close()
        except (ValueError, EnvironmentError, ImportError):
            # handle broken mmap scenarios
            locked = lock(fobj)
            fobj.truncate(filesize)

            fobj.seek(0, 2)
            padsize = size
            # Don't generate an enormous string if we need to pad
            # the file out several megs.
            while padsize:
                addsize = min(BUFFER_SIZE, padsize)
                fobj.write(b"\x00" * addsize)
                padsize -= addsize

            fobj.seek(filesize, 0)
            while movesize:
                # At the start of this loop, fobj is pointing at the end
                # of the data we need to move, which is of movesize length.
                thismove = min(BUFFER_SIZE, movesize)
                # Seek back however much we're going to read this frame.
                fobj.seek(-thismove, 1)
                nextpos = fobj.tell()
                # Read it, so we're back at the end.
                data = fobj.read(thismove)
                # Seek back to where we need to write it.
                fobj.seek(-thismove + size, 1)
                # Write it.
                fobj.write(data)
                # And seek back to the end of the unmoved data.
                fobj.seek(nextpos)
                movesize -= thismove

            fobj.flush()
    finally:
        if locked:
            unlock(fobj)
|
|
|
|
|
|
def delete_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
    """Remove size bytes from the file, starting at offset.

    fobj must be an open file object, open rb+ or equivalent. The data
    after the removed span is shifted down with mmap when possible;
    if mmap fails, it is copied in BUFFER_SIZE chunks instead, which is
    significantly slower. The file is then truncated by size bytes.

    size must be positive, offset non-negative, and the span must end at
    or before the end of the file; these are checked with assertions.
    """

    locked = False
    assert 0 < size
    assert 0 <= offset
    fobj.seek(0, 2)
    filesize = fobj.tell()
    # Number of bytes that follow the deleted span and must be kept.
    tail_len = filesize - offset - size
    assert 0 <= tail_len
    try:
        # Deleting right up to the end needs no move, only the truncate.
        if tail_len > 0:
            fobj.flush()
            try:
                import mmap
                view = mmap.mmap(fobj.fileno(), filesize)
                try:
                    view.move(offset, offset + size, tail_len)
                finally:
                    view.close()
            except (ValueError, EnvironmentError, ImportError):
                # handle broken mmap scenarios
                locked = lock(fobj)
                write_pos = offset
                while True:
                    # The next unread chunk always sits size bytes ahead
                    # of where it has to be written.
                    fobj.seek(write_pos + size)
                    chunk = fobj.read(BUFFER_SIZE)
                    if not chunk:
                        break
                    fobj.seek(write_pos)
                    fobj.write(chunk)
                    write_pos += len(chunk)
        fobj.truncate(filesize - size)
        fobj.flush()
    finally:
        if locked:
            unlock(fobj)
|
|
|
|
|
|
def utf8(data):
    """Convert a byte string or text string to a valid UTF-8 byte string.

    A byte string is decoded as UTF-8 with invalid sequences replaced by
    U+FFFD and then re-encoded, so the result is always valid UTF-8. A
    text string is simply encoded as UTF-8.

    Raises TypeError for any other type.
    """

    # bytes and type(u"") are str and unicode on Python 2, so the checks
    # are unchanged there, and they still resolve on Python 3.
    if isinstance(data, bytes):
        return data.decode("utf-8", "replace").encode("utf-8")
    elif isinstance(data, type(u"")):
        return data.encode("utf-8")
    else:
        raise TypeError("only unicode/str types can be converted to UTF-8")
|
|
|
|
|
|
def dict_match(d, key, default=None):
    """Look key up in d, treating d's keys as fnmatch patterns on a miss.

    An exact d[key] hit is returned directly. If that raises KeyError,
    the value of the first pattern (in d's iteration order) that matches
    key case-sensitively is returned, or default if none matches.
    """
    try:
        return d[key]
    except KeyError:
        # Use iteritems() where the mapping provides it and fall back to
        # items() for mappings that do not.
        if hasattr(d, "iteritems"):
            pairs = d.iteritems()
        else:
            pairs = d.items()
        for pattern, value in pairs:
            if fnmatchcase(key, pattern):
                return value
    return default
|