Merge remote-tracking branch 'andrzejc/metadata' into develop

This commit is contained in:
rembo10
2016-04-05 12:03:15 +01:00
3 changed files with 534 additions and 151 deletions

352
headphones/metadata.py Normal file
View File

@@ -0,0 +1,352 @@
# encoding=utf8
# This file is part of Headphones.
#
# Headphones is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Headphones is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Headphones. If not, see <http://www.gnu.org/licenses/>.
"""
Track/album metadata handling routines.
"""
from __future__ import print_function
from beets.mediafile import MediaFile, UnreadableFileError
import headphones
from headphones import logger
import os.path
import datetime
__author__ = "Andrzej Ciarkowski <andrzej.ciarkowski@gmail.com>"
class MetadataDict(dict):
    """
    Dictionary which allows for case-insensitive, but case-preserving lookup,
    allowing to put different values under $Album and $album, but still
    finding some value if only single key is present and called with any
    variation of the name's case.

    Keeps case-sensitive mapping in superclass dict, and case-insensitive
    (lowercase) in member variable self._lower. If case-sensitive lookup
    fails, another case-insensitive attempt is made.

    Keys are expected to be strings. Removing entries (``del``, ``pop``, ...)
    is not mirrored in the case-insensitive index.
    """

    def __init__(self, seq=None, **kwargs):
        """
        :param seq: optional initial content: another MetadataDict, a
            mapping, an object exposing keys() and item access, or an
            iterable of (key, value) pairs.
        :param kwargs: additional items given as keyword arguments.
        """
        if isinstance(seq, MetadataDict):
            # plain copy of both levels, no need to re-fold the keys
            super(MetadataDict, self).__init__(seq)
            self._lower = dict(seq._lower)
        else:
            super(MetadataDict, self).__init__()
            self._lower = {}
            self._add_source(seq)
        self.add_items(kwargs.items())

    def _add_source(self, source):
        """
        Add content of a mapping-like object or of an iterable of pairs.
        :param source: input object, None is silently ignored.
        """
        if source is None:
            return
        if hasattr(source, 'items'):
            self.add_items(source.items())
        elif hasattr(source, 'keys'):
            # same protocol as accepted by dict(): keys() + item access
            self.add_items((key, source[key]) for key in source.keys())
        else:
            self.add_items(source)

    def __setitem__(self, key, value):
        super(MetadataDict, self).__setitem__(key, value)
        self._lower[key.lower()] = value

    def add_items(self, items):
        # type: (Iterable[Tuple[Any,Any]])->None
        """
        Add (key,value) pairs to this dictionary using iterable as an input.
        :param items: input items.
        """
        for key, value in items:
            self[key] = value

    def update(self, *args, **kwargs):
        """
        Same contract as dict.update(), but routed through __setitem__ so
        that the case-insensitive index follows the new content.
        """
        if len(args) > 1:
            raise TypeError(
                'update expected at most 1 arguments, got %d' % len(args))
        if args:
            self._add_source(args[0])
        self.add_items(kwargs.items())

    def __getitem__(self, item):
        try:
            return super(MetadataDict, self).__getitem__(item)
        except KeyError:
            return self._lower[item.lower()]

    def get(self, key, default=None):
        """
        Same contract as dict.get(), using the case-insensitive fallback of
        __getitem__ (dict.get() does not call an overridden __getitem__).
        """
        try:
            return self[key]
        except (KeyError, AttributeError):
            # AttributeError: key without lower(), it can't be present
            return default

    def __contains__(self, item):
        return item.lower() in self._lower
class Vars:
    """
    Names of the metadata $variables which headphones sets explicitly
    (as opposed to the ones derived from tags or database columns).
    """
    # variables available in their capitalized form only
    DISC = '$Disc'
    TRACK = '$Track'
    YEAR = '$Year'
    DATE = '$Date'
    EXTENSION = '$Extension'

    # variables having a lowercase twin which yields a lowercased value
    TITLE = '$Title'
    TITLE_LOWER = TITLE.lower()
    ARTIST = '$Artist'
    ARTIST_LOWER = ARTIST.lower()
    SORT_ARTIST = '$SortArtist'
    SORT_ARTIST_LOWER = SORT_ARTIST.lower()
    ALBUM = '$Album'
    ALBUM_LOWER = ALBUM.lower()
    ORIGINAL_FOLDER = '$OriginalFolder'
    ORIGINAL_FOLDER_LOWER = ORIGINAL_FOLDER.lower()
    FIRST_LETTER = '$First'
    FIRST_LETTER_LOWER = FIRST_LETTER.lower()
    TYPE = '$Type'
    TYPE_LOWER = TYPE.lower()
def _verify_var_type(val):
"""
Check if type of value is allowed as a variable in pathname substitution.
"""
return isinstance(val, (basestring, int, float, datetime.date))
def _as_str(val):
if isinstance(val, basestring):
return val
else:
return str(val)
def _media_file_to_dict(mf, d):
    """
    Copy tags of a media file into dictionary as '$' + field name entries.
    :param mf: media file object exposing readable_fields() and one
        attribute per field.
    :param d: dictionary to populate (modified in place).
    """
    for field in mf.readable_fields():
        # embedded artwork is a BLOB, nothing to substitute with
        if field == 'art':
            continue
        value = getattr(mf, field)
        # missing tag becomes an empty string
        value = '' if value is None else value
        # only the types with meaningful string representation are kept
        if not _verify_var_type(value):
            continue
        d['$' + field] = _as_str(value)
def _row_to_dict(row, d):
    """
    Copy fields of a database row into dictionary as '$' + column name
    entries.
    :param row: row-like object exposing keys() and item access.
    :param d: dictionary to populate (modified in place).
    """
    for column in row.keys():
        value = row[column]
        if value is None:
            # NULL column becomes an empty string
            value = ''
        elif not _verify_var_type(value):
            # no meaningful string representation, leave it out
            continue
        d['$' + column] = _as_str(value)
def _date_year(release):
# type: (sqlite3.Row)->Tuple[str,str]
"""
Extract release date and year from database row
"""
try:
date = release['ReleaseDate']
except TypeError:
date = ''
if date is not None:
year = date[:4]
else:
year = ''
return date, year
def file_metadata(path, release):
    # type: (str,sqlite3.Row)->Tuple[Mapping[str,str],bool]
    """
    Build the substitution dictionary used for renaming a single media file.
    Values come from the tags stored in the file, the release row from the
    db and, as a last resort for the title, from the file name.

    :param path: media file path
    :param release: database row with release info
    :return: pair (dict, flag). The flag is True if Vars.TITLE is taken from
        tags, False if it is taken from the file name. (None,None) if unable
        to parse the media file.
    """
    try:
        media = MediaFile(path)
    except UnreadableFileError as ex:
        logger.info("MediaFile couldn't parse: %s (%s)",
                    path.decode(headphones.SYS_ENCODING, 'replace'),
                    str(ex))
        return None, None

    res = MetadataDict()
    # lowest priority: tags already present in the file
    _media_file_to_dict(media, res)
    # then raw database fields, replacing tags of the same name
    _row_to_dict(release, res)

    date, year = _date_year(release)
    disc_number = '%d' % media.disc if media.disc else ''
    track_number = '%02d' % media.track if media.track else ''

    from_metadata = bool(media.title)
    if from_metadata:
        title = media.title
    else:
        # untitled track: use the file name without its extension
        basename = os.path.basename(
            path.decode(headphones.SYS_ENCODING, 'replace'))
        title = os.path.splitext(basename)[0]
    ext = os.path.splitext(path)[1]

    # compilations keep the artist of the track, if the track has one
    artist_name = release['ArtistName']
    if artist_name == "Various Artists" and media.artist:
        artist_name = media.artist

    sort_name = artist_name
    if artist_name.startswith('The '):
        sort_name = artist_name[4:] + ", The"

    album_title = release['AlbumTitle']

    # highest priority: the values computed above
    override_values = {
        Vars.DISC: disc_number,
        Vars.TRACK: track_number,
        Vars.TITLE: title,
        Vars.ARTIST: artist_name,
        Vars.SORT_ARTIST: sort_name,
        Vars.ALBUM: album_title,
        Vars.YEAR: year,
        Vars.DATE: date,
        Vars.EXTENSION: ext,
        Vars.TITLE_LOWER: title.lower(),
        Vars.ARTIST_LOWER: artist_name.lower(),
        Vars.SORT_ARTIST_LOWER: sort_name.lower(),
        Vars.ALBUM_LOWER: album_title.lower(),
    }
    res.add_items(override_values.iteritems())
    return res, from_metadata
def _intersect(d1, d2):
# type: (Mapping,Mapping)->Mapping
"""
Create intersection (common part) of two dictionaries.
"""
res = {}
for key, val in d1.iteritems():
if key in d2 and d2[key] == val:
res[key] = val
return res
def album_metadata(path, release, common_tags):
    # type: (str,sqlite3.Row,Mapping[str,str])->Mapping[str,str]
    """
    Prepare metadata dictionary for path substitution of album folder.

    :param path: album path to prepare metadata for.
    :param release: database row with release properties.
    :param common_tags: common set of tags gathered from media files.
    :return: metadata dictionary with substitution variables for rendering path.
    """
    date, year = _date_year(release)
    # '/' would introduce an unwanted directory level
    artist = release['ArtistName'].replace('/', '_')
    album = release['AlbumTitle'].replace('/', '_')
    release_type = release['Type'].replace('/', '_')

    if release['ArtistName'].startswith('The '):
        sort_name = release['ArtistName'][4:] + ", The"
    else:
        sort_name = release['ArtistName']

    # slice instead of indexing: an empty artist name gives an empty
    # first letter rather than an IndexError
    initial = sort_name[:1]
    first_char = u'0-9' if initial.isdigit() else initial

    # Defined up front: os.walk() yields nothing for a missing or unreadable
    # path, which used to leave the name unbound. As before, the value comes
    # from the last directory visited by the walk.
    orig_folder = u''
    for root, _dirs, _files in os.walk(path):
        try:
            orig_folder = os.path.basename(
                os.path.normpath(root).decode(
                    headphones.SYS_ENCODING, 'replace'))
        except Exception:
            # best effort only, the folder name is not essential
            orig_folder = u''

    override_values = {
        Vars.ARTIST: artist,
        Vars.SORT_ARTIST: sort_name,
        Vars.ALBUM: album,
        Vars.YEAR: year,
        Vars.DATE: date,
        Vars.TYPE: release_type,
        Vars.ORIGINAL_FOLDER: orig_folder,
        Vars.FIRST_LETTER: first_char.upper(),
        Vars.ARTIST_LOWER: artist.lower(),
        Vars.SORT_ARTIST_LOWER: sort_name.lower(),
        Vars.ALBUM_LOWER: album.lower(),
        Vars.TYPE_LOWER: release_type.lower(),
        Vars.FIRST_LETTER_LOWER: first_char.lower(),
        Vars.ORIGINAL_FOLDER_LOWER: orig_folder.lower()
    }
    # tags common to all the files first, explicit values take precedence
    res = MetadataDict(common_tags)
    res.add_items(override_values.iteritems())
    return res
def albumart_metadata(release, common_tags):
    # type: (sqlite3.Row,Mapping)->Mapping
    """
    Prepare metadata dictionary for path substitution of album art file.

    :param release: database row with release properties.
    :param common_tags: common set of tags gathered from media files.
    :return: metadata dictionary with substitution variables for rendering path.
    """
    date, year = _date_year(release)
    artist_name = release['ArtistName']
    album_title = release['AlbumTitle']

    # tags common to all the files come first...
    res = MetadataDict(common_tags)
    # ...and get replaced by the values taken from the release
    override_values = {
        Vars.ARTIST: artist_name,
        Vars.ALBUM: album_title,
        Vars.YEAR: year,
        Vars.DATE: date,
        Vars.ARTIST_LOWER: artist_name.lower(),
        Vars.ALBUM_LOWER: album_title.lower()
    }
    res.add_items(override_values.iteritems())
    return res
class AlbumMetadataBuilder(object):
    """
    Gathers album metadata as the set of tags which are identical in all
    the media files added to the builder.
    """

    def __init__(self):
        # None until the first file is added, then dict of common tags
        self._common = None

    def add_media_file(self, mf):
        # type: (MediaFile)->None
        """
        Take tags of another media file into account.
        :param mf: MediaFile
        """
        tags = {}
        _media_file_to_dict(mf, tags)
        if self._common is not None:
            # keep only what the new file agrees upon
            tags = _intersect(self._common, tags)
        self._common = tags

    def build(self):
        # type: ()->Mapping
        """
        Build case-insensitive, case-preserving dict from gathered metadata
        tags.
        :return: dictionary-like object filled with $variables based on
            common tags.
        """
        return MetadataDict(self._common)

148
headphones/metadata_test.py Normal file
View File

@@ -0,0 +1,148 @@
# encoding=utf8
# This file is part of Headphones.
#
# Headphones is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Headphones is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Headphones. If not, see <http://www.gnu.org/licenses/>.
"""
Test module for metadata.
"""
import headphones.metadata as _md
from headphones.metadata import MetadataDict
import datetime
from unittestcompat import TestCase
__author__ = "Andrzej Ciarkowski <andrzej.ciarkowski@gmail.com>"
class _MockMediaFile(object):
def __init__(self, artist, album, year, track, title, label):
self.artist = artist
self.album = album
self.year = year
self.track = track
self.title = title
self.label = label
self.art = 'THIS IS ART BLOB'
@classmethod
def readable_fields(cls):
return 'artist', 'album', 'year', 'track', 'title', 'label', 'art'
class _MockDatabaseRow(object):
def __init__(self, d):
self._dict = dict(d)
def keys(self):
return self._dict.iterkeys()
def __getitem__(self, item):
return self._dict[item]
class MetadataTest(TestCase):
    """
    Tests for metadata module.

    Dictionaries are compared with assertEqual(): assertItemsEqual() iterates
    its arguments, which for a dict means the keys only, so the values would
    never be verified.
    """

    def test_metadata_dict_ci(self):
        """MetadataDict: case-insensitive lookup"""
        expected = u'naïve'
        key_var = '$TitlE'
        m = MetadataDict({key_var.lower(): u'naïve'})
        self.assertFalse('$track' in m)
        self.assertTrue('$tITLe' in m, "cross-case lookup with 'in'")
        self.assertEqual(m[key_var], expected, "cross-case lookup success")
        self.assertEqual(m[key_var.lower()], expected,
                         "same-case lookup success")

    def test_metadata_dict_cs(self):
        """MetadataDict: case-preserving lookup"""
        expected_var = u'NaïVe'
        key_var = '$TitlE'
        m = MetadataDict({
            key_var.lower(): expected_var.lower(),
            key_var: expected_var
        })
        self.assertFalse('$track' in m)
        self.assertTrue('$tITLe' in m, "cross-case lookup with 'in'")
        self.assertEqual(m[key_var.lower()], expected_var.lower(),
                         "case-preserving lookup lower")
        self.assertEqual(m[key_var], expected_var,
                         "case-preserving lookup variable")

    def test_dict_intersect(self):
        """metadata: check dictionary intersect function validity"""
        d1 = {
            'one': 'one',
            'two': 'two',
            'three': 'zonk'
        }
        d2 = {
            'two': 'two',
            'three': 'three'
        }
        expected = {
            'two': 'two'
        }
        self.assertEqual(
            expected, _md._intersect(d1, d2),
            "check dictionary intersection is common part indeed"
        )
        del d1['two']
        expected = {}
        self.assertEqual(
            expected, _md._intersect(d1, d2), "check intersection empty"
        )

    def test_album_metadata_builder(self):
        """AlbumMetadataBuilder: check validity"""
        mb = _md.AlbumMetadataBuilder()
        f1 = _MockMediaFile('artist', 'album', 2000, 1, 'track1', 'Ant-Zen')
        mb.add_media_file(f1)
        f2 = _MockMediaFile('artist', 'album', 2000, 2, 'track2', 'Ant-Zen')
        mb.add_media_file(f2)
        md = mb.build()
        # track and title differ between the files, so they are dropped;
        # values are stored in their string form, hence '2000'
        expected = {
            _md.Vars.ARTIST_LOWER: 'artist',
            _md.Vars.ALBUM_LOWER: 'album',
            _md.Vars.YEAR.lower(): '2000',
            '$label': 'Ant-Zen'
        }
        self.assertEqual(
            expected, dict(md), "check AlbumMetadataBuilder validity"
        )

    def test_populate_from_row(self):
        """metadata: check populating metadata from database row"""
        row = _MockDatabaseRow({
            'ArtistName': 'artist',
            'AlbumTitle': 'album',
            'ReleaseDate': datetime.date(2004, 11, 28),
            'Variation': 5,
            'WrongTyped': complex(1, -1)
        })
        md = _md.MetadataDict()
        _md._row_to_dict(row, md)
        # complex has no meaningful path representation: WrongTyped is left out
        expected = {
            '$ArtistName': 'artist',
            '$AlbumTitle': 'album',
            '$ReleaseDate': '2004-11-28',
            '$Variation': '5'
        }
        self.assertEqual(expected, dict(md), "check _row_to_dict() valid")

View File

@@ -30,6 +30,7 @@ from beetsplug import lyrics as beetslyrics
from headphones import notifiers, utorrent, transmission, deluge
from headphones import db, albumart, librarysync
from headphones import logger, helpers, request, mb, music_encoder
from headphones import metadata
postprocessor_lock = threading.Lock()
@@ -339,14 +340,13 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list,
if any(files.lower().endswith('.' + x.lower()) for x in headphones.MEDIA_FORMATS):
downloaded_track_list.append(os.path.join(r, files))
builder = metadata.AlbumMetadataBuilder()
# Check if files are valid media files and are writable, before the steps
# below are executed. This simplifies errors and prevents unfinished steps.
for downloaded_track in downloaded_track_list:
try:
f = MediaFile(downloaded_track)
if f is None:
# this test is just to keep pyflakes from complaining about an unused variable
return
builder.add_media_file(f)
except (FileTypeError, UnreadableFileError):
logger.error("Track file is not a valid media file: %s. Not "
"continuing.", downloaded_track.decode(
@@ -378,6 +378,7 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list,
shutil.rmtree(new_folder)
return
metadata_dict = builder.build()
# start encoding
if headphones.CONFIG.MUSIC_ENCODER:
downloaded_track_list = music_encoder.encode(albumpath)
@@ -413,7 +414,7 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list,
renameNFO(albumpath)
if headphones.CONFIG.ADD_ALBUM_ART and artwork:
addAlbumArt(artwork, albumpath, release)
addAlbumArt(artwork, albumpath, release, metadata_dict)
if headphones.CONFIG.CORRECT_METADATA:
correctedMetadata = correctMetadata(albumid, release, downloaded_track_list)
@@ -433,7 +434,7 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list,
'No DESTINATION_DIR has been set. Set "Destination Directory" to the parent directory you want to move the files to')
albumpaths = [albumpath]
elif headphones.CONFIG.MOVE_FILES and headphones.CONFIG.DESTINATION_DIR:
albumpaths = moveFiles(albumpath, release, tracks)
albumpaths = moveFiles(albumpath, release, metadata_dict)
else:
albumpaths = [albumpath]
@@ -606,34 +607,20 @@ def embedAlbumArt(artwork, downloaded_track_list):
continue
def addAlbumArt(artwork, albumpath, release):
def addAlbumArt(artwork, albumpath, release, metadata_dict):
logger.info('Adding album art to folder')
md = metadata.album_metadata(albumpath, release, metadata_dict)
try:
date = release['ReleaseDate']
except TypeError:
date = u''
ext = ".jpg"
# PNGs are possibe here too
if artwork[:4] == '\x89PNG':
ext = ".png"
if date is not None:
year = date[:4]
else:
year = u''
album_art_name = helpers.replace_all(
headphones.CONFIG.ALBUM_ART_FORMAT.strip(), md) + ext
values = {'$Artist': release['ArtistName'],
'$Album': release['AlbumTitle'],
'$Year': year,
'$Date': date,
'$artist': release['ArtistName'].lower(),
'$album': release['AlbumTitle'].lower(),
'$year': year,
'$date': date
}
album_art_name = helpers.replace_all(headphones.CONFIG.ALBUM_ART_FORMAT.strip(),
values) + ".jpg"
album_art_name = helpers.replace_illegal_chars(album_art_name).encode(headphones.SYS_ENCODING,
'replace')
album_art_name = helpers.replace_illegal_chars(album_art_name).encode(
headphones.SYS_ENCODING, 'replace')
if headphones.CONFIG.FILE_UNDERSCORES:
album_art_name = album_art_name.replace(' ', '_')
@@ -680,62 +667,15 @@ def renameNFO(albumpath):
os.path.join(r, file).decode(headphones.SYS_ENCODING, 'replace'), e))
def moveFiles(albumpath, release, tracks):
def moveFiles(albumpath, release, metadata_dict):
logger.info("Moving files: %s" % albumpath)
try:
date = release['ReleaseDate']
except TypeError:
date = u''
if date is not None:
year = date[:4]
else:
year = u''
md = metadata.album_metadata(albumpath, release, metadata_dict)
folder = helpers.replace_all(
headphones.CONFIG.FOLDER_FORMAT.strip(), md, normalize=True)
artist = release['ArtistName'].replace('/', '_')
album = release['AlbumTitle'].replace('/', '_')
if headphones.CONFIG.FILE_UNDERSCORES:
artist = artist.replace(' ', '_')
album = album.replace(' ', '_')
releasetype = release['Type'].replace('/', '_')
if release['ArtistName'].startswith('The '):
sortname = release['ArtistName'][4:] + ", The"
else:
sortname = release['ArtistName']
if sortname[0].isdigit():
firstchar = u'0-9'
else:
firstchar = sortname[0]
for r, d, f in os.walk(albumpath):
try:
origfolder = os.path.basename(
os.path.normpath(r).decode(headphones.SYS_ENCODING, 'replace'))
except:
origfolder = u''
values = {'$Artist': artist,
'$SortArtist': sortname,
'$Album': album,
'$Year': year,
'$Date': date,
'$Type': releasetype,
'$OriginalFolder': origfolder,
'$First': firstchar.upper(),
'$artist': artist.lower(),
'$sortartist': sortname.lower(),
'$album': album.lower(),
'$year': year,
'$date': date,
'$type': releasetype.lower(),
'$first': firstchar.lower(),
'$originalfolder': origfolder.lower()
}
folder = helpers.replace_all(headphones.CONFIG.FOLDER_FORMAT.strip(), values, normalize=True)
folder = folder.replace(' ', '_')
folder = helpers.replace_illegal_chars(folder, type="folder")
folder = folder.replace('./', '_/').replace('/.', '/_')
@@ -1080,82 +1020,25 @@ def embedLyrics(downloaded_track_list):
def renameFiles(albumpath, downloaded_track_list, release):
logger.info('Renaming files')
try:
date = release['ReleaseDate']
except TypeError:
date = u''
if date is not None:
year = date[:4]
else:
year = u''
# Until tagging works better I'm going to rely on the already provided metadata
for downloaded_track in downloaded_track_list:
try:
f = MediaFile(downloaded_track)
except:
logger.info("MediaFile couldn't parse: %s",
downloaded_track.decode(headphones.SYS_ENCODING, 'replace'))
md, from_metadata = metadata.file_metadata(downloaded_track, release)
if md is None:
# unable to parse media file, skip file
continue
if not f.disc:
discnumber = ''
else:
discnumber = '%d' % f.disc
if not f.track:
tracknumber = ''
else:
tracknumber = '%02d' % f.track
if not f.title:
basename = os.path.basename(downloaded_track.decode(headphones.SYS_ENCODING, 'replace'))
title = os.path.splitext(basename)[0]
ext = os.path.splitext(basename)[1]
ext = md[metadata.Vars.EXTENSION]
if not from_metadata:
title = md[metadata.Vars.TITLE]
new_file_name = helpers.cleanTitle(title) + ext
else:
title = f.title
new_file_name = helpers.replace_all(
headphones.CONFIG.FILE_FORMAT.strip(), md
).replace('/', '_') + ext
if release['ArtistName'] == "Various Artists" and f.artist:
artistname = f.artist
else:
artistname = release['ArtistName']
if artistname.startswith('The '):
sortname = artistname[4:] + ", The"
else:
sortname = artistname
values = {'$Disc': discnumber,
'$Track': tracknumber,
'$Title': title,
'$Artist': artistname,
'$SortArtist': sortname,
'$Album': release['AlbumTitle'],
'$Year': year,
'$Date': date,
'$disc': discnumber,
'$track': tracknumber,
'$title': title.lower(),
'$artist': artistname.lower(),
'$sortartist': sortname.lower(),
'$album': release['AlbumTitle'].lower(),
'$year': year,
'$date': date
}
ext = os.path.splitext(downloaded_track)[1]
new_file_name = helpers.replace_all(headphones.CONFIG.FILE_FORMAT.strip(),
values).replace('/', '_') + ext
new_file_name = helpers.replace_illegal_chars(new_file_name).encode(headphones.SYS_ENCODING,
'replace')
new_file_name = helpers.replace_illegal_chars(new_file_name).encode(
headphones.SYS_ENCODING, 'replace')
if headphones.CONFIG.FILE_UNDERSCORES:
new_file_name = new_file_name.replace(' ', '_')
@@ -1166,8 +1049,8 @@ def renameFiles(albumpath, downloaded_track_list, release):
new_file = os.path.join(albumpath, new_file_name)
if downloaded_track == new_file_name:
logger.debug("Renaming for: " + downloaded_track.decode(headphones.SYS_ENCODING,
'replace') + " is not neccessary")
logger.debug("Renaming for: " + downloaded_track.decode(
headphones.SYS_ENCODING, 'replace') + " is not neccessary")
continue
logger.debug('Renaming %s ---> %s',