diff --git a/headphones/metadata.py b/headphones/metadata.py new file mode 100644 index 00000000..3a423b32 --- /dev/null +++ b/headphones/metadata.py @@ -0,0 +1,352 @@ +# encoding=utf8 +# This file is part of Headphones. +# +# Headphones is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Headphones is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Headphones. If not, see . +""" +Track/album metadata handling routines. +""" + +from __future__ import print_function +from beets.mediafile import MediaFile, UnreadableFileError +import headphones +from headphones import logger +import os.path +import datetime + +__author__ = "Andrzej Ciarkowski " + + +class MetadataDict(dict): + """ + Dictionary which allows for case-insensitive, but case-preserving lookup, + allowing to put different values under $Album and $album, but still + finding some value if only single key is present and called with any + variation of the name's case. + + Keeps case-sensitive mapping in superclass dict, and case-insensitive ( + lowercase) in member variable self._lower. If case-sensitive lookup + fails, another case-insensitive attempt is made. + """ + def __setitem__(self, key, value): + super(MetadataDict, self).__setitem__(key, value) + self._lower.__setitem__(key.lower(), value) + + def add_items(self, items): + # type: (Iterable[Tuple[Any,Any]])->None + """ + Add (key,value) pairs to this dictionary using iterable as an input. + :param items: input items. 
+ """ + for key, value in items: + self.__setitem__(key, value) + + def __init__(self, seq=None, **kwargs): + if isinstance(seq, MetadataDict): + super(MetadataDict, self).__init__(seq) + self._lower = dict(seq._lower) + else: + super(MetadataDict, self).__init__() + self._lower = {} + if seq is not None: + try: + self.add_items(seq.iteritems()) + except KeyError: + self.add_items(seq) + + def __getitem__(self, item): + try: + return super(MetadataDict, self).__getitem__(item) + except KeyError: + return self._lower.__getitem__(item.lower()) + + def __contains__(self, item): + return self._lower.__contains__(item.lower()) + + +class Vars: + """ + Metadata $variable names (only ones set explicitly by headphones). + """ + DISC = '$Disc' + TRACK = '$Track' + TITLE = '$Title' + ARTIST = '$Artist' + SORT_ARTIST = '$SortArtist' + ALBUM = '$Album' + YEAR = '$Year' + DATE = '$Date' + EXTENSION = '$Extension' + ORIGINAL_FOLDER = '$OriginalFolder' + FIRST_LETTER = '$First' + TYPE = '$Type' + TITLE_LOWER = TITLE.lower() + ARTIST_LOWER = ARTIST.lower() + SORT_ARTIST_LOWER = SORT_ARTIST.lower() + ALBUM_LOWER = ALBUM.lower() + ORIGINAL_FOLDER_LOWER = ORIGINAL_FOLDER.lower() + FIRST_LETTER_LOWER = FIRST_LETTER.lower() + TYPE_LOWER = TYPE.lower() + + +def _verify_var_type(val): + """ + Check if type of value is allowed as a variable in pathname substitution. + """ + return isinstance(val, (basestring, int, float, datetime.date)) + + +def _as_str(val): + if isinstance(val, basestring): + return val + else: + return str(val) + + +def _media_file_to_dict(mf, d): + """ + Populate dict with tags read from media file. + """ + for fld in mf.readable_fields(): + if 'art' == fld: + # skip embedded artwork as it's a BLOB + continue + val = getattr(mf, fld) + if val is None: + val = '' + # include only types with meaningful string representation + if _verify_var_type(val): + d['$' + fld] = _as_str(val) + + +def _row_to_dict(row, d): + """ + Populate dict with database row fields. 
+ """ + for fld in row.keys(): + val = row[fld] + if val is None: + val = '' + if _verify_var_type(val): + d['$' + fld] = _as_str(val) + + +def _date_year(release): + # type: (sqlite3.Row)->Tuple[str,str] + """ + Extract release date and year from database row + """ + try: + date = release['ReleaseDate'] + except TypeError: + date = '' + + if date is not None: + year = date[:4] + else: + year = '' + return date, year + + +def file_metadata(path, release): + # type: (str,sqlite3.Row)->Tuple[Mapping[str,str],bool] + """ + Prepare metadata dictionary for path substitution, based on file name, + the tags stored within it and release info from the db. + :param path: media file path + :param release: database row with release info + :return: pair (dict,boolean indicating if Vars.TITLE is taken from tags or + file name). (None,None) if unable to parse the media file. + """ + try: + f = MediaFile(path) + except UnreadableFileError as ex: + logger.info("MediaFile couldn't parse: %s (%s)", + path.decode(headphones.SYS_ENCODING, 'replace'), + str(ex)) + return None, None + + res = MetadataDict() + # add existing tags first, these will get overwritten by musicbrainz from db + _media_file_to_dict(f, res) + # raw database fields come next + _row_to_dict(release, res) + + date, year = _date_year(release) + if not f.disc: + disc_number = '' + else: + disc_number = '%d' % f.disc + + if not f.track: + track_number = '' + else: + track_number = '%02d' % f.track + + if not f.title: + basename = os.path.basename( + path.decode(headphones.SYS_ENCODING, 'replace')) + title = os.path.splitext(basename)[0] + from_metadata = False + else: + title = f.title + from_metadata = True + + ext = os.path.splitext(path)[1] + if release['ArtistName'] == "Various Artists" and f.artist: + artist_name = f.artist + else: + artist_name = release['ArtistName'] + + if artist_name.startswith('The '): + sort_name = artist_name[4:] + ", The" + else: + sort_name = artist_name + + album_title = 
release['AlbumTitle'] + override_values = { + Vars.DISC: disc_number, + Vars.TRACK: track_number, + Vars.TITLE: title, + Vars.ARTIST: artist_name, + Vars.SORT_ARTIST: sort_name, + Vars.ALBUM: album_title, + Vars.YEAR: year, + Vars.DATE: date, + Vars.EXTENSION: ext, + Vars.TITLE_LOWER: title.lower(), + Vars.ARTIST_LOWER: artist_name.lower(), + Vars.SORT_ARTIST_LOWER: sort_name.lower(), + Vars.ALBUM_LOWER: album_title.lower(), + } + res.add_items(override_values.iteritems()) + return res, from_metadata + + +def _intersect(d1, d2): + # type: (Mapping,Mapping)->Mapping + """ + Create intersection (common part) of two dictionaries. + """ + res = {} + for key, val in d1.iteritems(): + if key in d2 and d2[key] == val: + res[key] = val + return res + + +def album_metadata(path, release, common_tags): + # type: (str,sqlite3.Row,Mapping[str,str])->Mapping[str,str] + """ + Prepare metadata dictionary for path substitution of album folder. + :param path: album path to prepare metadata for. + :param release: database row with release properties. + :param common_tags: common set of tags gathered from media files. + :return: metadata dictionary with substitution variables for rendering path. 
+ """ + date, year = _date_year(release) + artist = release['ArtistName'].replace('/', '_') + album = release['AlbumTitle'].replace('/', '_') + release_type = release['Type'].replace('/', '_') + + if release['ArtistName'].startswith('The '): + sort_name = release['ArtistName'][4:] + ", The" + else: + sort_name = release['ArtistName'] + + if sort_name[0].isdigit(): + first_char = u'0-9' + else: + first_char = sort_name[0] + + for r, d, f in os.walk(path): + try: + orig_folder = os.path.basename( + os.path.normpath(r).decode(headphones.SYS_ENCODING, 'replace')) + except: + orig_folder = u'' + + override_values = { + Vars.ARTIST: artist, + Vars.SORT_ARTIST: sort_name, + Vars.ALBUM: album, + Vars.YEAR: year, + Vars.DATE: date, + Vars.TYPE: release_type, + Vars.ORIGINAL_FOLDER: orig_folder, + Vars.FIRST_LETTER: first_char.upper(), + Vars.ARTIST_LOWER: artist.lower(), + Vars.SORT_ARTIST_LOWER: sort_name.lower(), + Vars.ALBUM_LOWER: album.lower(), + Vars.TYPE_LOWER: release_type.lower(), + Vars.FIRST_LETTER_LOWER: first_char.lower(), + Vars.ORIGINAL_FOLDER_LOWER: orig_folder.lower() + } + res = MetadataDict(common_tags) + res.add_items(override_values.iteritems()) + return res + + +def albumart_metadata(release, common_tags): + # type: (sqlite3.Row,Mapping)->Mapping + """ + Prepare metadata dictionary for path substitution of album art file. + :param release: database row with release properties. + :param common_tags: common set of tags gathered from media files. + :return: metadata dictionary with substitution variables for rendering path. 
+ """ + date, year = _date_year(release) + override_values = { + Vars.ARTIST: release['ArtistName'], + Vars.ALBUM: release['AlbumTitle'], + Vars.YEAR: year, + Vars.DATE: date, + Vars.ARTIST_LOWER: release['ArtistName'].lower(), + Vars.ALBUM_LOWER: release['AlbumTitle'].lower() + } + res = MetadataDict(common_tags) + res.add_items(override_values.iteritems()) + return res + + +class AlbumMetadataBuilder(object): + """ + Facilitates building of album metadata as a common set of tags retrieved + from media files. + """ + + def __init__(self): + self._common = None + + def add_media_file(self, mf): + # type: (Mapping)->None + """ + Add metadata tags read from media file to album metadata. + :param mf: MediaFile + """ + md = {} + _media_file_to_dict(mf, md) + if self._common is None: + self._common = md + else: + self._common = _intersect(self._common, md) + + def build(self): + # type: (None)->Mapping + """ + Build case-insensitive, case-preserving dict from gathered metadata + tags. + :return: dictionary-like object filled with $variables based on common + tags. + """ + return MetadataDict(self._common) diff --git a/headphones/metadata_test.py b/headphones/metadata_test.py new file mode 100644 index 00000000..c4e6bb47 --- /dev/null +++ b/headphones/metadata_test.py @@ -0,0 +1,148 @@ +# encoding=utf8 +# This file is part of Headphones. +# +# Headphones is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Headphones is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Headphones. If not, see . +""" +Test module for metadata. 
+""" +import headphones.metadata as _md +from headphones.metadata import MetadataDict +import datetime + +from unittestcompat import TestCase + + +__author__ = "Andrzej Ciarkowski " + + +class _MockMediaFile(object): + + def __init__(self, artist, album, year, track, title, label): + self.artist = artist + self.album = album + self.year = year + self.track = track + self.title = title + self.label = label + self.art = 'THIS IS ART BLOB' + + @classmethod + def readable_fields(cls): + return 'artist', 'album', 'year', 'track', 'title', 'label', 'art' + + +class _MockDatabaseRow(object): + + def __init__(self, d): + self._dict = dict(d) + + def keys(self): + return self._dict.iterkeys() + + def __getitem__(self, item): + return self._dict[item] + + +class MetadataTest(TestCase): + """ + Tests for metadata module. + """ + + def test_metadata_dict_ci(self): + """MetadataDict: case-insensitive lookup""" + expected = u'naïve' + key_var = '$TitlE' + m = MetadataDict({key_var.lower(): u'naïve'}) + self.assertFalse('$track' in m) + self.assertTrue('$tITLe' in m, "cross-case lookup with 'in'") + self.assertEqual(m[key_var], expected, "cross-case lookup success") + self.assertEqual(m[key_var.lower()], expected, "same-case lookup " + "succes") + + def test_metadata_dict_cs(self): + """MetadataDict: case-preserving lookup""" + expected_var = u'NaïVe' + key_var = '$TitlE' + m = MetadataDict({ + key_var.lower(): expected_var.lower(), + key_var: expected_var + }) + self.assertFalse('$track' in m) + self.assertTrue('$tITLe' in m, "cross-case lookup with 'in'") + self.assertEqual(m[key_var.lower()], expected_var.lower(), + "case-preserving lookup lower") + self.assertEqual(m[key_var], expected_var, + "case-preserving lookup variable") + + def test_dict_intersect(self): + """metadata: check dictionary intersect function validity""" + d1 = { + 'one': 'one', + 'two': 'two', + 'three': 'zonk' + } + d2 = { + 'two': 'two', + 'three': 'three' + } + expected = { + 'two': 'two' + } + 
self.assertItemsEqual( + expected, _md._intersect(d1, d2), "check dictionary intersection " + "is common part indeed" + ) + del d1['two'] + expected = {} + self.assertItemsEqual( + expected, _md._intersect(d1, d2), "check intersection empty" + ) + + def test_album_metadata_builder(self): + """AlbumMetadataBuilder: check validity""" + mb = _md.AlbumMetadataBuilder() + f1 = _MockMediaFile('artist', 'album', 2000, 1, 'track1', 'Ant-Zen') + mb.add_media_file(f1) + f2 = _MockMediaFile('artist', 'album', 2000, 2, 'track2', 'Ant-Zen') + mb.add_media_file(f2) + + md = mb.build() + expected = { + _md.Vars.ARTIST_LOWER: 'artist', + _md.Vars.ALBUM_LOWER: 'album', + _md.Vars.YEAR.lower(): 2000, + '$label': 'Ant-Zen' + } + self.assertItemsEqual( + expected, md, "check AlbumMetadataBuilder validity" + ) + + def test_populate_from_row(self): + """metadata: check populating metadata from database row""" + row = _MockDatabaseRow({ + 'ArtistName': 'artist', + 'AlbumTitle': 'album', + 'ReleaseDate': datetime.date(2004, 11, 28), + 'Variation': 5, + 'WrongTyped': complex(1, -1) + }) + md = _md.MetadataDict() + _md._row_to_dict(row, md) + expected = { + '$ArtistName': 'artist', + '$AlbumTitle': 'album', + '$ReleaseDate': '2004-11-28', + '$Variation': '5' + } + self.assertItemsEqual(expected, md, "check _row_to_dict() valid") diff --git a/headphones/postprocessor.py b/headphones/postprocessor.py index 31a758f2..dc22ef0e 100755 --- a/headphones/postprocessor.py +++ b/headphones/postprocessor.py @@ -30,6 +30,7 @@ from beetsplug import lyrics as beetslyrics from headphones import notifiers, utorrent, transmission, deluge from headphones import db, albumart, librarysync from headphones import logger, helpers, request, mb, music_encoder +from headphones import metadata postprocessor_lock = threading.Lock() @@ -339,14 +340,13 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list, if any(files.lower().endswith('.' 
+ x.lower()) for x in headphones.MEDIA_FORMATS): downloaded_track_list.append(os.path.join(r, files)) + builder = metadata.AlbumMetadataBuilder() # Check if files are valid media files and are writable, before the steps # below are executed. This simplifies errors and prevents unfinished steps. for downloaded_track in downloaded_track_list: try: f = MediaFile(downloaded_track) - if f is None: - # this test is just to keep pyflakes from complaining about an unused variable - return + builder.add_media_file(f) except (FileTypeError, UnreadableFileError): logger.error("Track file is not a valid media file: %s. Not " "continuing.", downloaded_track.decode( @@ -378,6 +378,7 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list, shutil.rmtree(new_folder) return + metadata_dict = builder.build() # start encoding if headphones.CONFIG.MUSIC_ENCODER: downloaded_track_list = music_encoder.encode(albumpath) @@ -413,7 +414,7 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list, renameNFO(albumpath) if headphones.CONFIG.ADD_ALBUM_ART and artwork: - addAlbumArt(artwork, albumpath, release) + addAlbumArt(artwork, albumpath, release, metadata_dict) if headphones.CONFIG.CORRECT_METADATA: correctedMetadata = correctMetadata(albumid, release, downloaded_track_list) @@ -433,7 +434,7 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list, 'No DESTINATION_DIR has been set. 
Set "Destination Directory" to the parent directory you want to move the files to') albumpaths = [albumpath] elif headphones.CONFIG.MOVE_FILES and headphones.CONFIG.DESTINATION_DIR: - albumpaths = moveFiles(albumpath, release, tracks) + albumpaths = moveFiles(albumpath, release, metadata_dict) else: albumpaths = [albumpath] @@ -606,34 +607,20 @@ def embedAlbumArt(artwork, downloaded_track_list): continue -def addAlbumArt(artwork, albumpath, release): +def addAlbumArt(artwork, albumpath, release, metadata_dict): logger.info('Adding album art to folder') + md = metadata.album_metadata(albumpath, release, metadata_dict) - try: - date = release['ReleaseDate'] - except TypeError: - date = u'' + ext = ".jpg" + # PNGs are possible here too + if artwork[:4] == '\x89PNG': + ext = ".png" - if date is not None: - year = date[:4] - else: - year = u'' + album_art_name = helpers.replace_all( + headphones.CONFIG.ALBUM_ART_FORMAT.strip(), md) + ext - values = {'$Artist': release['ArtistName'], - '$Album': release['AlbumTitle'], - '$Year': year, - '$Date': date, - '$artist': release['ArtistName'].lower(), - '$album': release['AlbumTitle'].lower(), - '$year': year, - '$date': date - } - - album_art_name = helpers.replace_all(headphones.CONFIG.ALBUM_ART_FORMAT.strip(), - values) + ".jpg" - - album_art_name = helpers.replace_illegal_chars(album_art_name).encode(headphones.SYS_ENCODING, - 'replace') + album_art_name = helpers.replace_illegal_chars(album_art_name).encode( + headphones.SYS_ENCODING, 'replace') if headphones.CONFIG.FILE_UNDERSCORES: album_art_name = album_art_name.replace(' ', '_') @@ -680,62 +667,15 @@ def renameNFO(albumpath): os.path.join(r, file).decode(headphones.SYS_ENCODING, 'replace'), e)) -def moveFiles(albumpath, release, tracks): +def moveFiles(albumpath, release, metadata_dict): logger.info("Moving files: %s" % albumpath) - try: - date = release['ReleaseDate'] - except TypeError: - date = u'' - if date is not None: - year = date[:4] - else: - year = u'' + md = 
metadata.album_metadata(albumpath, release, metadata_dict) + folder = helpers.replace_all( + headphones.CONFIG.FOLDER_FORMAT.strip(), md, normalize=True) - artist = release['ArtistName'].replace('/', '_') - album = release['AlbumTitle'].replace('/', '_') if headphones.CONFIG.FILE_UNDERSCORES: - artist = artist.replace(' ', '_') - album = album.replace(' ', '_') - - releasetype = release['Type'].replace('/', '_') - - if release['ArtistName'].startswith('The '): - sortname = release['ArtistName'][4:] + ", The" - else: - sortname = release['ArtistName'] - - if sortname[0].isdigit(): - firstchar = u'0-9' - else: - firstchar = sortname[0] - - for r, d, f in os.walk(albumpath): - try: - origfolder = os.path.basename( - os.path.normpath(r).decode(headphones.SYS_ENCODING, 'replace')) - except: - origfolder = u'' - - values = {'$Artist': artist, - '$SortArtist': sortname, - '$Album': album, - '$Year': year, - '$Date': date, - '$Type': releasetype, - '$OriginalFolder': origfolder, - '$First': firstchar.upper(), - '$artist': artist.lower(), - '$sortartist': sortname.lower(), - '$album': album.lower(), - '$year': year, - '$date': date, - '$type': releasetype.lower(), - '$first': firstchar.lower(), - '$originalfolder': origfolder.lower() - } - - folder = helpers.replace_all(headphones.CONFIG.FOLDER_FORMAT.strip(), values, normalize=True) + folder = folder.replace(' ', '_') folder = helpers.replace_illegal_chars(folder, type="folder") folder = folder.replace('./', '_/').replace('/.', '/_') @@ -1080,82 +1020,25 @@ def embedLyrics(downloaded_track_list): def renameFiles(albumpath, downloaded_track_list, release): logger.info('Renaming files') - try: - date = release['ReleaseDate'] - except TypeError: - date = u'' - - if date is not None: - year = date[:4] - else: - year = u'' - # Until tagging works better I'm going to rely on the already provided metadata for downloaded_track in downloaded_track_list: - try: - f = MediaFile(downloaded_track) - except: - logger.info("MediaFile 
couldn't parse: %s", - downloaded_track.decode(headphones.SYS_ENCODING, 'replace')) + md, from_metadata = metadata.file_metadata(downloaded_track, release) + if md is None: + # unable to parse media file, skip file continue - if not f.disc: - discnumber = '' - else: - discnumber = '%d' % f.disc - - if not f.track: - tracknumber = '' - else: - tracknumber = '%02d' % f.track - - if not f.title: - - basename = os.path.basename(downloaded_track.decode(headphones.SYS_ENCODING, 'replace')) - title = os.path.splitext(basename)[0] - ext = os.path.splitext(basename)[1] - + ext = md[metadata.Vars.EXTENSION] + if not from_metadata: + title = md[metadata.Vars.TITLE] new_file_name = helpers.cleanTitle(title) + ext - else: - title = f.title + new_file_name = helpers.replace_all( + headphones.CONFIG.FILE_FORMAT.strip(), md + ).replace('/', '_') + ext - if release['ArtistName'] == "Various Artists" and f.artist: - artistname = f.artist - else: - artistname = release['ArtistName'] - - if artistname.startswith('The '): - sortname = artistname[4:] + ", The" - else: - sortname = artistname - - values = {'$Disc': discnumber, - '$Track': tracknumber, - '$Title': title, - '$Artist': artistname, - '$SortArtist': sortname, - '$Album': release['AlbumTitle'], - '$Year': year, - '$Date': date, - '$disc': discnumber, - '$track': tracknumber, - '$title': title.lower(), - '$artist': artistname.lower(), - '$sortartist': sortname.lower(), - '$album': release['AlbumTitle'].lower(), - '$year': year, - '$date': date - } - - ext = os.path.splitext(downloaded_track)[1] - - new_file_name = helpers.replace_all(headphones.CONFIG.FILE_FORMAT.strip(), - values).replace('/', '_') + ext - - new_file_name = helpers.replace_illegal_chars(new_file_name).encode(headphones.SYS_ENCODING, - 'replace') + new_file_name = helpers.replace_illegal_chars(new_file_name).encode( + headphones.SYS_ENCODING, 'replace') if headphones.CONFIG.FILE_UNDERSCORES: new_file_name = new_file_name.replace(' ', '_') @@ -1166,8 +1049,8 @@ 
def renameFiles(albumpath, downloaded_track_list, release): new_file = os.path.join(albumpath, new_file_name) if downloaded_track == new_file_name: - logger.debug("Renaming for: " + downloaded_track.decode(headphones.SYS_ENCODING, - 'replace') + " is not neccessary") + logger.debug("Renaming for: " + downloaded_track.decode( + headphones.SYS_ENCODING, 'replace') + " is not neccessary") continue logger.debug('Renaming %s ---> %s',