metadata.py: Use arbitrary variables from existing tags or database in renamer.

This commit adds the possibility of using variables taken from tags already
present in the downloaded media files in the file, folder and album art renaming
routines. The variable names translate directly to MediaFile field names, so it
is now possible to use variables like $mb_albumid, $genre, $bitrate, $samplerate,
etc. The full list can be read from MediaFile.readable_fields().
This commit is contained in:
Andrzej Ciarkowski
2016-02-27 00:37:49 +01:00
parent 7e9bd432ce
commit 97e405ee8c
3 changed files with 534 additions and 151 deletions

352
headphones/metadata.py Normal file
View File

@@ -0,0 +1,352 @@
# encoding=utf8
# This file is part of Headphones.
#
# Headphones is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Headphones is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Headphones. If not, see <http://www.gnu.org/licenses/>.
"""
Track/album metadata handling routines.
"""
from __future__ import print_function

import datetime
import numbers
import os.path

from beets.mediafile import MediaFile, UnreadableFileError

import headphones
from headphones import logger
__author__ = "Andrzej Ciarkowski <andrzej.ciarkowski@gmail.com>"
class MetadataDict(dict):
    """
    Dictionary which allows for case-insensitive, but case-preserving lookup,
    allowing to put different values under $Album and $album, but still
    finding some value if only single key is present and called with any
    variation of the name's case.

    Keeps case-sensitive mapping in superclass dict, and case-insensitive (
    lowercase) in member variable self._lower. If case-sensitive lookup
    fails, another case-insensitive attempt is made.

    Keys are expected to be strings (they must provide ``lower()``).
    """

    def __init__(self, seq=None, **kwargs):
        """
        Create the dictionary, optionally pre-filled.

        :param seq: another MetadataDict (copied together with its
            case-insensitive index), any mapping exposing ``items()``, an
            iterable of (key, value) pairs, or None for an empty dict.
        :param kwargs: additional key=value entries, added after ``seq``.
        """
        if isinstance(seq, MetadataDict):
            super(MetadataDict, self).__init__(seq)
            self._lower = dict(seq._lower)
        else:
            super(MetadataDict, self).__init__()
            self._lower = {}
            if seq is not None:
                # Mappings are read through items(); anything else is
                # treated as an iterable of (key, value) pairs.
                items = getattr(seq, 'items', None)
                self.add_items(items() if callable(items) else seq)
        if kwargs:
            self.add_items(kwargs.items())

    def __setitem__(self, key, value):
        super(MetadataDict, self).__setitem__(key, value)
        # The lowercase index always holds the most recently stored value
        # among keys differing only by case.
        self._lower[key.lower()] = value

    def add_items(self, items):
        # type: (Iterable[Tuple[Any,Any]])->None
        """
        Add (key,value) pairs to this dictionary using iterable as an input.

        :param items: input items.
        """
        for key, value in items:
            self[key] = value

    def __getitem__(self, item):
        try:
            return super(MetadataDict, self).__getitem__(item)
        except KeyError:
            # Exact-case lookup failed; fall back to the lowercase index.
            return self._lower[item.lower()]

    def get(self, key, default=None):
        """
        Return the value for ``key`` using the same case-insensitive fallback
        as ``self[key]``, or ``default`` when no variation is present.
        """
        try:
            return self[key]
        except KeyError:
            return default

    def __contains__(self, item):
        return item.lower() in self._lower
class Vars:
    """
    Names of the $variables which headphones sets explicitly when building
    metadata dictionaries. The *_LOWER constants are the all-lowercase
    spellings of the corresponding names.
    """
    # Names that also have a lowercase counterpart.
    TITLE = '$Title'
    TITLE_LOWER = TITLE.lower()
    ARTIST = '$Artist'
    ARTIST_LOWER = ARTIST.lower()
    SORT_ARTIST = '$SortArtist'
    SORT_ARTIST_LOWER = SORT_ARTIST.lower()
    ALBUM = '$Album'
    ALBUM_LOWER = ALBUM.lower()
    ORIGINAL_FOLDER = '$OriginalFolder'
    ORIGINAL_FOLDER_LOWER = ORIGINAL_FOLDER.lower()
    FIRST_LETTER = '$First'
    FIRST_LETTER_LOWER = FIRST_LETTER.lower()
    TYPE = '$Type'
    TYPE_LOWER = TYPE.lower()

    # Names defined in a single spelling only.
    DISC = '$Disc'
    TRACK = '$Track'
    YEAR = '$Year'
    DATE = '$Date'
    EXTENSION = '$Extension'
def _verify_var_type(val):
"""
Check if type of value is allowed as a variable in pathname substitution.
"""
return isinstance(val, (basestring, int, float, datetime.date))
def _as_str(val):
if isinstance(val, basestring):
return val
else:
return str(val)
def _media_file_to_dict(mf, d):
    """
    Copy the readable tags of a media file into ``d`` as '$<field>' entries.

    :param mf: object exposing readable_fields() and one attribute per field.
    :param d: dictionary to populate (modified in place).
    """
    for field_name in mf.readable_fields():
        if field_name == 'art':
            # embedded artwork is a BLOB, not usable as a variable
            continue
        value = getattr(mf, field_name)
        value = '' if value is None else value
        if not _verify_var_type(value):
            # no meaningful string representation for this type
            continue
        d['$' + field_name] = _as_str(value)
def _row_to_dict(row, d):
    """
    Copy the fields of a database row into ``d`` as '$<column>' entries.

    :param row: object exposing keys() and item access by field name.
    :param d: dictionary to populate (modified in place).
    """
    for column in row.keys():
        value = row[column]
        value = '' if value is None else value
        if not _verify_var_type(value):
            # skip values of types not allowed as variables
            continue
        d['$' + column] = _as_str(value)
def _date_year(release):
# type: (sqlite3.Row)->Tuple[str,str]
"""
Extract release date and year from database row
"""
try:
date = release['ReleaseDate']
except TypeError:
date = ''
if date is not None:
year = date[:4]
else:
year = ''
return date, year
def file_metadata(path, release):
    # type: (str,sqlite3.Row)->Tuple[Mapping[str,str],bool]
    """
    Build the substitution dictionary used for renaming a single media file.

    Values are layered: tags read from the file come first, then the raw
    database fields of the release, and finally the variables headphones
    sets explicitly (see Vars), which take precedence over both.

    :param path: media file path
    :param release: database row with release info
    :return: pair (dict, flag); the flag is True when Vars.TITLE comes from
        the file's tags and False when it was derived from the file name.
        (None, None) if the media file cannot be parsed.
    """
    try:
        media = MediaFile(path)
    except UnreadableFileError as ex:
        logger.info("MediaFile couldn't parse: %s (%s)",
                    path.decode(headphones.SYS_ENCODING, 'replace'),
                    str(ex))
        return None, None

    res = MetadataDict()
    # tags already present in the file go in first ...
    _media_file_to_dict(media, res)
    # ... followed by raw database fields
    _row_to_dict(release, res)

    date, year = _date_year(release)
    disc_number = '%d' % media.disc if media.disc else ''
    track_number = '%02d' % media.track if media.track else ''

    from_metadata = bool(media.title)
    if from_metadata:
        title = media.title
    else:
        # no title tag: fall back to the file name without its extension
        file_name = os.path.basename(
            path.decode(headphones.SYS_ENCODING, 'replace'))
        title = os.path.splitext(file_name)[0]
    ext = os.path.splitext(path)[1]

    # for compilations prefer the per-track artist stored in the file
    artist_name = release['ArtistName']
    if artist_name == "Various Artists" and media.artist:
        artist_name = media.artist

    sort_name = artist_name
    if artist_name.startswith('The '):
        sort_name = artist_name[4:] + ", The"

    album_title = release['AlbumTitle']

    override_values = {
        Vars.DISC: disc_number,
        Vars.TRACK: track_number,
        Vars.TITLE: title,
        Vars.ARTIST: artist_name,
        Vars.SORT_ARTIST: sort_name,
        Vars.ALBUM: album_title,
        Vars.YEAR: year,
        Vars.DATE: date,
        Vars.EXTENSION: ext,
        Vars.TITLE_LOWER: title.lower(),
        Vars.ARTIST_LOWER: artist_name.lower(),
        Vars.SORT_ARTIST_LOWER: sort_name.lower(),
        Vars.ALBUM_LOWER: album_title.lower(),
    }
    res.add_items(override_values.items())
    return res, from_metadata
def _intersect(d1, d2):
# type: (Mapping,Mapping)->Mapping
"""
Create intersection (common part) of two dictionaries.
"""
res = {}
for key, val in d1.iteritems():
if key in d2 and d2[key] == val:
res[key] = val
return res
def album_metadata(path, release, common_tags):
    # type: (str,sqlite3.Row,Mapping[str,str])->Mapping[str,str]
    """
    Prepare metadata dictionary for path substitution of album folder.

    The result starts from ``common_tags`` and is then overridden with the
    variables headphones sets explicitly (see Vars). '/' in artist name,
    album title and release type is replaced with '_'.

    :param path: album path to prepare metadata for.
    :param release: database row with release properties.
    :param common_tags: common set of tags gathered from media files.
    :return: metadata dictionary with substitution variables for rendering path.
    """
    date, year = _date_year(release)
    artist = release['ArtistName'].replace('/', '_')
    album = release['AlbumTitle'].replace('/', '_')
    release_type = release['Type'].replace('/', '_')

    if release['ArtistName'].startswith('The '):
        sort_name = release['ArtistName'][4:] + ", The"
    else:
        sort_name = release['ArtistName']

    # Slicing instead of indexing keeps an empty artist name from raising
    # IndexError; the first letter is then simply empty.
    initial = sort_name[:1]
    first_char = u'0-9' if initial.isdigit() else initial

    # os.walk() yields nothing when the path cannot be listed, so make sure
    # the variable is always bound.
    orig_folder = u''
    # NOTE(review): the value kept is the name of the LAST directory yielded
    # by os.walk(), not necessarily of ``path`` itself. Preserved as-is;
    # confirm whether this is intended.
    for r, d, f in os.walk(path):
        try:
            orig_folder = os.path.basename(
                os.path.normpath(r).decode(headphones.SYS_ENCODING, 'replace'))
        except Exception:
            orig_folder = u''

    override_values = {
        Vars.ARTIST: artist,
        Vars.SORT_ARTIST: sort_name,
        Vars.ALBUM: album,
        Vars.YEAR: year,
        Vars.DATE: date,
        Vars.TYPE: release_type,
        Vars.ORIGINAL_FOLDER: orig_folder,
        Vars.FIRST_LETTER: first_char.upper(),
        Vars.ARTIST_LOWER: artist.lower(),
        Vars.SORT_ARTIST_LOWER: sort_name.lower(),
        Vars.ALBUM_LOWER: album.lower(),
        Vars.TYPE_LOWER: release_type.lower(),
        Vars.FIRST_LETTER_LOWER: first_char.lower(),
        Vars.ORIGINAL_FOLDER_LOWER: orig_folder.lower()
    }
    res = MetadataDict(common_tags)
    res.add_items(override_values.items())
    return res
def albumart_metadata(release, common_tags):
    # type: (sqlite3.Row,Mapping)->Mapping
    """
    Build the substitution dictionary used for naming the album art file.

    :param release: database row with release properties.
    :param common_tags: common set of tags gathered from media files.
    :return: dictionary based on common_tags, overridden with artist, album,
        year and date variables taken from the release.
    """
    date, year = _date_year(release)
    artist_name = release['ArtistName']
    album_title = release['AlbumTitle']
    override_values = {
        Vars.ARTIST: artist_name,
        Vars.ALBUM: album_title,
        Vars.YEAR: year,
        Vars.DATE: date,
        Vars.ARTIST_LOWER: artist_name.lower(),
        Vars.ALBUM_LOWER: album_title.lower()
    }
    res = MetadataDict(common_tags)
    res.add_items(override_values.items())
    return res
class AlbumMetadataBuilder(object):
    """
    Collects the tags shared by every media file of an album: only tags
    carrying the same value in all added files are kept.
    """

    def __init__(self):
        # None until the first media file has been added
        self._common = None

    def add_media_file(self, mf):
        # type: (Mapping)->None
        """
        Narrow the common tag set down using tags of another media file.

        :param mf: MediaFile
        """
        tags = {}
        _media_file_to_dict(mf, tags)
        # the first file seeds the set, later ones can only shrink it
        self._common = (
            tags if self._common is None else _intersect(self._common, tags)
        )

    def build(self):
        # type: (None)->Mapping
        """
        Wrap the tags gathered so far in a case-insensitive,
        case-preserving dictionary.

        :return: dictionary-like object filled with $variables based on
            common tags.
        """
        return MetadataDict(self._common)

148
headphones/metadata_test.py Normal file
View File

@@ -0,0 +1,148 @@
# encoding=utf8
# This file is part of Headphones.
#
# Headphones is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Headphones is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Headphones. If not, see <http://www.gnu.org/licenses/>.
"""
Test module for metadata.
"""
import headphones.metadata as _md
from headphones.metadata import MetadataDict
import datetime
from unittestcompat import TestCase
__author__ = "Andrzej Ciarkowski <andrzej.ciarkowski@gmail.com>"
class _MockMediaFile(object):
def __init__(self, artist, album, year, track, title, label):
self.artist = artist
self.album = album
self.year = year
self.track = track
self.title = title
self.label = label
self.art = 'THIS IS ART BLOB'
@classmethod
def readable_fields(cls):
return 'artist', 'album', 'year', 'track', 'title', 'label', 'art'
class _MockDatabaseRow(object):
def __init__(self, d):
self._dict = dict(d)
def keys(self):
return self._dict.iterkeys()
def __getitem__(self, item):
return self._dict[item]
class MetadataTest(TestCase):
    """
    Tests for metadata module.

    Dictionaries are compared with assertEqual so that both keys and values
    are verified; assertItemsEqual iterates its arguments and would therefore
    compare the keys of two dicts only.
    """

    def test_metadata_dict_ci(self):
        """MetadataDict: case-insensitive lookup"""
        expected = u'naïve'
        key_var = '$TitlE'
        m = MetadataDict({key_var.lower(): u'naïve'})
        self.assertFalse('$track' in m)
        self.assertTrue('$tITLe' in m, "cross-case lookup with 'in'")
        self.assertEqual(m[key_var], expected, "cross-case lookup success")
        self.assertEqual(m[key_var.lower()], expected, "same-case lookup "
                                                       "succes")

    def test_metadata_dict_cs(self):
        """MetadataDict: case-preserving lookup"""
        expected_var = u'NaïVe'
        key_var = '$TitlE'
        m = MetadataDict({
            key_var.lower(): expected_var.lower(),
            key_var: expected_var
        })
        self.assertFalse('$track' in m)
        self.assertTrue('$tITLe' in m, "cross-case lookup with 'in'")
        self.assertEqual(m[key_var.lower()], expected_var.lower(),
                         "case-preserving lookup lower")
        self.assertEqual(m[key_var], expected_var,
                         "case-preserving lookup variable")

    def test_dict_intersect(self):
        """metadata: check dictionary intersect function validity"""
        d1 = {
            'one': 'one',
            'two': 'two',
            'three': 'zonk'
        }
        d2 = {
            'two': 'two',
            'three': 'three'
        }
        expected = {
            'two': 'two'
        }
        self.assertEqual(
            expected, _md._intersect(d1, d2), "check dictionary intersection "
                                              "is common part indeed"
        )
        del d1['two']
        expected = {}
        self.assertEqual(
            expected, _md._intersect(d1, d2), "check intersection empty"
        )

    def test_album_metadata_builder(self):
        """AlbumMetadataBuilder: check validity"""
        mb = _md.AlbumMetadataBuilder()
        f1 = _MockMediaFile('artist', 'album', 2000, 1, 'track1', 'Ant-Zen')
        mb.add_media_file(f1)
        f2 = _MockMediaFile('artist', 'album', 2000, 2, 'track2', 'Ant-Zen')
        mb.add_media_file(f2)
        md = mb.build()
        # tag values are stored as strings, hence '2000' rather than 2000
        expected = {
            _md.Vars.ARTIST_LOWER: 'artist',
            _md.Vars.ALBUM_LOWER: 'album',
            _md.Vars.YEAR.lower(): '2000',
            '$label': 'Ant-Zen'
        }
        self.assertEqual(
            expected, md, "check AlbumMetadataBuilder validity"
        )

    def test_populate_from_row(self):
        """metadata: check populating metadata from database row"""
        row = _MockDatabaseRow({
            'ArtistName': 'artist',
            'AlbumTitle': 'album',
            'ReleaseDate': datetime.date(2004, 11, 28),
            'Variation': 5,
            'WrongTyped': complex(1, -1)
        })
        md = _md.MetadataDict()
        _md._row_to_dict(row, md)
        # the complex-valued field must be left out
        expected = {
            '$ArtistName': 'artist',
            '$AlbumTitle': 'album',
            '$ReleaseDate': '2004-11-28',
            '$Variation': '5'
        }
        self.assertEqual(expected, md, "check _row_to_dict() valid")

View File

@@ -30,6 +30,7 @@ from beetsplug import lyrics as beetslyrics
from headphones import notifiers, utorrent, transmission, deluge
from headphones import db, albumart, librarysync
from headphones import logger, helpers, request, mb, music_encoder
from headphones import metadata
postprocessor_lock = threading.Lock()
@@ -339,14 +340,13 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list,
if any(files.lower().endswith('.' + x.lower()) for x in headphones.MEDIA_FORMATS):
downloaded_track_list.append(os.path.join(r, files))
builder = metadata.AlbumMetadataBuilder()
# Check if files are valid media files and are writable, before the steps
# below are executed. This simplifies errors and prevents unfinished steps.
for downloaded_track in downloaded_track_list:
try:
f = MediaFile(downloaded_track)
if f is None:
# this test is just to keep pyflakes from complaining about an unused variable
return
builder.add_media_file(f)
except (FileTypeError, UnreadableFileError):
logger.error("Track file is not a valid media file: %s. Not " \
"continuing.", downloaded_track.decode(
@@ -378,6 +378,7 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list,
shutil.rmtree(new_folder)
return
metadata_dict = builder.build()
# start encoding
if headphones.CONFIG.MUSIC_ENCODER:
downloaded_track_list = music_encoder.encode(albumpath)
@@ -413,7 +414,7 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list,
renameNFO(albumpath)
if headphones.CONFIG.ADD_ALBUM_ART and artwork:
addAlbumArt(artwork, albumpath, release)
addAlbumArt(artwork, albumpath, release, metadata_dict)
if headphones.CONFIG.CORRECT_METADATA:
correctedMetadata = correctMetadata(albumid, release, downloaded_track_list)
@@ -433,7 +434,7 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list,
'No DESTINATION_DIR has been set. Set "Destination Directory" to the parent directory you want to move the files to')
albumpaths = [albumpath]
elif headphones.CONFIG.MOVE_FILES and headphones.CONFIG.DESTINATION_DIR:
albumpaths = moveFiles(albumpath, release, tracks)
albumpaths = moveFiles(albumpath, release, metadata_dict)
else:
albumpaths = [albumpath]
@@ -606,34 +607,20 @@ def embedAlbumArt(artwork, downloaded_track_list):
continue
def addAlbumArt(artwork, albumpath, release):
def addAlbumArt(artwork, albumpath, release, metadata_dict):
logger.info('Adding album art to folder')
md = metadata.album_metadata(albumpath, release, metadata_dict)
try:
date = release['ReleaseDate']
except TypeError:
date = u''
ext = ".jpg"
# PNGs are possibe here too
if artwork[:4] == '\x89PNG':
ext = ".png"
if date is not None:
year = date[:4]
else:
year = u''
album_art_name = helpers.replace_all(
headphones.CONFIG.ALBUM_ART_FORMAT.strip(), md) + ext
values = {'$Artist': release['ArtistName'],
'$Album': release['AlbumTitle'],
'$Year': year,
'$Date': date,
'$artist': release['ArtistName'].lower(),
'$album': release['AlbumTitle'].lower(),
'$year': year,
'$date': date
}
album_art_name = helpers.replace_all(headphones.CONFIG.ALBUM_ART_FORMAT.strip(),
values) + ".jpg"
album_art_name = helpers.replace_illegal_chars(album_art_name).encode(headphones.SYS_ENCODING,
'replace')
album_art_name = helpers.replace_illegal_chars(album_art_name).encode(
headphones.SYS_ENCODING, 'replace')
if headphones.CONFIG.FILE_UNDERSCORES:
album_art_name = album_art_name.replace(' ', '_')
@@ -680,62 +667,15 @@ def renameNFO(albumpath):
os.path.join(r, file).decode(headphones.SYS_ENCODING, 'replace'), e))
def moveFiles(albumpath, release, tracks):
def moveFiles(albumpath, release, metadata_dict):
logger.info("Moving files: %s" % albumpath)
try:
date = release['ReleaseDate']
except TypeError:
date = u''
if date is not None:
year = date[:4]
else:
year = u''
md = metadata.album_metadata(albumpath, release, metadata_dict)
folder = helpers.replace_all(
headphones.CONFIG.FOLDER_FORMAT.strip(), md, normalize=True)
artist = release['ArtistName'].replace('/', '_')
album = release['AlbumTitle'].replace('/', '_')
if headphones.CONFIG.FILE_UNDERSCORES:
artist = artist.replace(' ', '_')
album = album.replace(' ', '_')
releasetype = release['Type'].replace('/', '_')
if release['ArtistName'].startswith('The '):
sortname = release['ArtistName'][4:] + ", The"
else:
sortname = release['ArtistName']
if sortname[0].isdigit():
firstchar = u'0-9'
else:
firstchar = sortname[0]
for r, d, f in os.walk(albumpath):
try:
origfolder = os.path.basename(
os.path.normpath(r).decode(headphones.SYS_ENCODING, 'replace'))
except:
origfolder = u''
values = {'$Artist': artist,
'$SortArtist': sortname,
'$Album': album,
'$Year': year,
'$Date': date,
'$Type': releasetype,
'$OriginalFolder': origfolder,
'$First': firstchar.upper(),
'$artist': artist.lower(),
'$sortartist': sortname.lower(),
'$album': album.lower(),
'$year': year,
'$date': date,
'$type': releasetype.lower(),
'$first': firstchar.lower(),
'$originalfolder': origfolder.lower()
}
folder = helpers.replace_all(headphones.CONFIG.FOLDER_FORMAT.strip(), values, normalize=True)
folder = folder.replace(' ', '_')
folder = helpers.replace_illegal_chars(folder, type="folder")
folder = folder.replace('./', '_/').replace('/.', '/_')
@@ -1080,82 +1020,25 @@ def embedLyrics(downloaded_track_list):
def renameFiles(albumpath, downloaded_track_list, release):
logger.info('Renaming files')
try:
date = release['ReleaseDate']
except TypeError:
date = u''
if date is not None:
year = date[:4]
else:
year = u''
# Until tagging works better I'm going to rely on the already provided metadata
for downloaded_track in downloaded_track_list:
try:
f = MediaFile(downloaded_track)
except:
logger.info("MediaFile couldn't parse: %s",
downloaded_track.decode(headphones.SYS_ENCODING, 'replace'))
md, from_metadata = metadata.file_metadata(downloaded_track, release)
if md is None:
# unable to parse media file, skip file
continue
if not f.disc:
discnumber = ''
else:
discnumber = '%d' % f.disc
if not f.track:
tracknumber = ''
else:
tracknumber = '%02d' % f.track
if not f.title:
basename = os.path.basename(downloaded_track.decode(headphones.SYS_ENCODING, 'replace'))
title = os.path.splitext(basename)[0]
ext = os.path.splitext(basename)[1]
ext = md[metadata.Vars.EXTENSION]
if not from_metadata:
title = md[metadata.Vars.TITLE]
new_file_name = helpers.cleanTitle(title) + ext
else:
title = f.title
new_file_name = helpers.replace_all(
headphones.CONFIG.FILE_FORMAT.strip(), md
).replace('/', '_') + ext
if release['ArtistName'] == "Various Artists" and f.artist:
artistname = f.artist
else:
artistname = release['ArtistName']
if artistname.startswith('The '):
sortname = artistname[4:] + ", The"
else:
sortname = artistname
values = {'$Disc': discnumber,
'$Track': tracknumber,
'$Title': title,
'$Artist': artistname,
'$SortArtist': sortname,
'$Album': release['AlbumTitle'],
'$Year': year,
'$Date': date,
'$disc': discnumber,
'$track': tracknumber,
'$title': title.lower(),
'$artist': artistname.lower(),
'$sortartist': sortname.lower(),
'$album': release['AlbumTitle'].lower(),
'$year': year,
'$date': date
}
ext = os.path.splitext(downloaded_track)[1]
new_file_name = helpers.replace_all(headphones.CONFIG.FILE_FORMAT.strip(),
values).replace('/', '_') + ext
new_file_name = helpers.replace_illegal_chars(new_file_name).encode(headphones.SYS_ENCODING,
'replace')
new_file_name = helpers.replace_illegal_chars(new_file_name).encode(
headphones.SYS_ENCODING, 'replace')
if headphones.CONFIG.FILE_UNDERSCORES:
new_file_name = new_file_name.replace(' ', '_')
@@ -1166,8 +1049,8 @@ def renameFiles(albumpath, downloaded_track_list, release):
new_file = os.path.join(albumpath, new_file_name)
if downloaded_track == new_file_name:
logger.debug("Renaming for: " + downloaded_track.decode(headphones.SYS_ENCODING,
'replace') + " is not neccessary")
logger.debug("Renaming for: " + downloaded_track.decode(
headphones.SYS_ENCODING, 'replace') + " is not neccessary")
continue
logger.debug('Renaming %s ---> %s',