# Copyright 2006 Joe Wreschnig # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. """Utility classes for Mutagen. You should not rely on the interfaces here being stable. They are intended for internal use in Mutagen only. """ import struct from fnmatch import fnmatchcase class DictMixin(object): """Implement the dict API using keys() and __*item__ methods. Similar to UserDict.DictMixin, this takes a class that defines __getitem__, __setitem__, __delitem__, and keys(), and turns it into a full dict-like object. UserDict.DictMixin is not suitable for this purpose because it's an old-style class. This class is not optimized for very large dictionaries; many functions have linear memory requirements. I recommend you override some of these functions if speed is required. """ def __iter__(self): return iter(self.keys()) def has_key(self, key): try: self[key] except KeyError: return False else: return True __contains__ = has_key iterkeys = lambda self: iter(self.keys()) def values(self): return map(self.__getitem__, self.keys()) itervalues = lambda self: iter(self.values()) def items(self): return zip(self.keys(), self.values()) iteritems = lambda s: iter(s.items()) def clear(self): map(self.__delitem__, self.keys()) def pop(self, key, *args): if len(args) > 1: raise TypeError("pop takes at most two arguments") try: value = self[key] except KeyError: if args: return args[0] else: raise del(self[key]) return value def popitem(self): try: key = self.keys()[0] return key, self.pop(key) except IndexError: raise KeyError("dictionary is empty") def update(self, other=None, **kwargs): if other is None: self.update(kwargs) other = {} try: map(self.__setitem__, other.keys(), other.values()) except AttributeError: for key, value in other: self[key] = value def setdefault(self, key, default=None): try: return self[key] except KeyError: self[key] = default return default def get(self, key, default=None): try: return self[key] except KeyError: return default def __repr__(self): return repr(dict(self.items())) def __cmp__(self, other): if other is None: return 1 else: return cmp(dict(self.items()), other) __hash__ = object.__hash__ def __len__(self): return len(self.keys()) class DictProxy(DictMixin): def __init__(self, *args, **kwargs): self.__dict = {} super(DictProxy, self).__init__(*args, **kwargs) def __getitem__(self, key): return self.__dict[key] def __setitem__(self, key, value): self.__dict[key] = value def __delitem__(self, key): del(self.__dict[key]) def keys(self): return self.__dict.keys() class cdata(object): """C character buffer to Python numeric type conversions.""" from struct import error error = error short_le = staticmethod(lambda data: struct.unpack('h', data)[0]) ushort_be = staticmethod(lambda data: struct.unpack('>H', data)[0]) int_le = staticmethod(lambda data: struct.unpack('i', data)[0]) uint_be = staticmethod(lambda data: struct.unpack('>I', data)[0]) longlong_le = staticmethod(lambda data: struct.unpack('q', data)[0]) ulonglong_be = staticmethod(lambda data: struct.unpack('>Q', data)[0]) to_short_le = staticmethod(lambda data: struct.pack('h', data)) to_ushort_be = staticmethod(lambda data: struct.pack('>H', data)) to_int_le = staticmethod(lambda data: struct.pack('i', data)) to_uint_be = staticmethod(lambda data: struct.pack('>I', data)) to_longlong_le = staticmethod(lambda data: struct.pack('q', data)) to_ulonglong_be = staticmethod(lambda data: struct.pack('>Q', data)) bitswap = ''.join([chr(sum([((val >> i) & 1) << (7-i) for i in range(8)])) for val in range(256)]) del(i) del(val) test_bit = staticmethod(lambda value, n: bool((value >> n) & 1)) def lock(fileobj): """Lock a file object 'safely'. That means a failure to lock because the platform doesn't support fcntl or filesystem locks is not considered a failure. This call does block. Returns whether or not the lock was successful, or raises an exception in more extreme circumstances (full lock table, invalid file). """ try: import fcntl except ImportError: return False else: try: fcntl.lockf(fileobj, fcntl.LOCK_EX) except IOError: # FIXME: There's possibly a lot of complicated # logic that needs to go here in case the IOError # is EACCES or EAGAIN. return False else: return True def unlock(fileobj): """Unlock a file object. Don't call this on a file object unless a call to lock() returned true. """ # If this fails there's a mismatched lock/unlock pair, # so we definitely don't want to ignore errors. import fcntl fcntl.lockf(fileobj, fcntl.LOCK_UN) def insert_bytes(fobj, size, offset, BUFFER_SIZE=2**16): """Insert size bytes of empty space starting at offset. fobj must be an open file object, open rb+ or equivalent. Mutagen tries to use mmap to resize the file, but falls back to a significantly slower method if mmap fails. """ assert 0 < size assert 0 <= offset locked = False fobj.seek(0, 2) filesize = fobj.tell() movesize = filesize - offset fobj.write('\x00' * size) fobj.flush() try: try: import mmap map = mmap.mmap(fobj.fileno(), filesize + size) try: map.move(offset + size, offset, movesize) finally: map.close() except (ValueError, EnvironmentError, ImportError): # handle broken mmap scenarios locked = lock(fobj) fobj.truncate(filesize) fobj.seek(0, 2) padsize = size # Don't generate an enormous string if we need to pad # the file out several megs. while padsize: addsize = min(BUFFER_SIZE, padsize) fobj.write("\x00" * addsize) padsize -= addsize fobj.seek(filesize, 0) while movesize: # At the start of this loop, fobj is pointing at the end # of the data we need to move, which is of movesize length. thismove = min(BUFFER_SIZE, movesize) # Seek back however much we're going to read this frame. fobj.seek(-thismove, 1) nextpos = fobj.tell() # Read it, so we're back at the end. data = fobj.read(thismove) # Seek back to where we need to write it. fobj.seek(-thismove + size, 1) # Write it. fobj.write(data) # And seek back to the end of the unmoved data. fobj.seek(nextpos) movesize -= thismove fobj.flush() finally: if locked: unlock(fobj) def delete_bytes(fobj, size, offset, BUFFER_SIZE=2**16): """Delete size bytes of empty space starting at offset. fobj must be an open file object, open rb+ or equivalent. Mutagen tries to use mmap to resize the file, but falls back to a significantly slower method if mmap fails. """ locked = False assert 0 < size assert 0 <= offset fobj.seek(0, 2) filesize = fobj.tell() movesize = filesize - offset - size assert 0 <= movesize try: if movesize > 0: fobj.flush() try: import mmap map = mmap.mmap(fobj.fileno(), filesize) try: map.move(offset, offset + size, movesize) finally: map.close() except (ValueError, EnvironmentError, ImportError): # handle broken mmap scenarios locked = lock(fobj) fobj.seek(offset + size) buf = fobj.read(BUFFER_SIZE) while buf: fobj.seek(offset) fobj.write(buf) offset += len(buf) fobj.seek(offset + size) buf = fobj.read(BUFFER_SIZE) fobj.truncate(filesize - size) fobj.flush() finally: if locked: unlock(fobj) def utf8(data): """Convert a basestring to a valid UTF-8 str.""" if isinstance(data, str): return data.decode("utf-8", "replace").encode("utf-8") elif isinstance(data, unicode): return data.encode("utf-8") else: raise TypeError("only unicode/str types can be converted to UTF-8") def dict_match(d, key, default=None): try: return d[key] except KeyError: for pattern, value in d.iteritems(): if fnmatchcase(key, pattern): return value return default