This commit is contained in:
rembo10
2016-02-20 21:19:03 +00:00
25 changed files with 1620 additions and 172 deletions

16
.gitignore vendored
View File

@@ -1,3 +1,13 @@
[Tt]est[Rr]esult*
/cache
/logs
.project
.pydevproject
# coverage generated:
/cover-html/
.coverage
.coveralls.yml
# Compiled source #
###################
@@ -61,8 +71,4 @@ Thumbs.db
obj/
[Rr]elease*/
_ReSharper*/
[Tt]est[Rr]esult*
/cache
/logs
.project
.pydevproject
.vscode

View File

@@ -2,19 +2,36 @@
# http://about.travis-ci.org/docs/
language: python
sudo: false
cache:
pip: true
directories:
- lib
# Available Python versions:
# http://about.travis-ci.org/docs/user/ci-environment/#Python-VM-images
python:
- "2.6"
- "2.7"
matrix:
include:
- python: "2.7"
env: SENDCOVERAGE=1
# pylint 1.4 does not run under python 2.6
install:
- pip install pyOpenSSL
- pip install pylint==1.3.1
- pip install pyflakes
- pip install pep8
- pip install pyOpenSSL
- pip install pylint==1.3.1
- pip install pyflakes
- pip install pep8
# coverage stuff:
- pip install coveralls
- pip install coverage
script:
- pep8 headphones
- pyflakes headphones
- nosetests headphones
- nosetests
after_success:
# coverage stuff:
- if [ $SENDCOVERAGE ]; then coveralls; fi

View File

@@ -1,5 +1,18 @@
# Changelog
## v0.5.11
Released 20 February 2016
Highlights:
* Added: Soft chroot option
* Fixed: Post processing temporary directory fix (#2504)
* Fixed: Ubuntu init script (#2509)
* Fixed: Image cache uncaught exception (#2485)
* Improved: $Date/$date variable in folder renaming
* Improved: Reuse transmission session id
The full list of commits can be found [here](https://github.com/rembo10/headphones/compare/v0.5.10...v0.5.11).
## v0.5.10
Released 29 January 2016

View File

@@ -152,7 +152,10 @@ def main():
headphones.DB_FILE = os.path.join(headphones.DATA_DIR, 'headphones.db')
# Read config and start logging
headphones.initialize(config_file)
try:
headphones.initialize(config_file)
except headphones.exceptions.SoftChrootError as e:
raise SystemExit('FATAL ERROR')
if headphones.DAEMON:
headphones.daemonize()

View File

@@ -1095,7 +1095,7 @@
</div>
<div class="row">
<label>Plex Token</label><input type="text" name="plex_token" value="${config['plex_token']}" size="30">
<small>Plex Token (for use with Plex Home)</small>
<small>Plex Token (for use with Plex Home)</small>
</div>
<div class="checkbox row">
<input type="checkbox" name="plex_update" value="1" ${config['plex_update']} /><label>Update Plex Library</label>

View File

@@ -229,6 +229,17 @@ textarea,
button {
font: 99%;
}
select {
-moz-border-radius: 5px;
-webkit-border-radius: 5px;
border-radius: 5px;
background: #4F4F4F;
border: 0;
border-bottom: 1px solid rgba(0, 0, 0, 0.25);
color: #fff;
padding: 3px 10px;
text-shadow: 0 -1px 1px rgba(0, 0, 0, 0.25);
}
textarea {
overflow: auto;
}

View File

@@ -124,7 +124,23 @@ table {
// Forms
select, input, textarea, button { font: 99%;}
select, input, textarea, button
{
font: 99%;
}
select
{
.rounded(5px);
background: #4F4F4F;
border: 0;
border-bottom: 1px solid rgba(0, 0, 0, 0.25);
color: #fff;
padding: 3px 10px;
text-shadow: 0 -1px 1px rgba(0, 0, 0, 0.25);
}
textarea {overflow: auto;}
input { .rounded(3px);}
input:invalid, textarea:invalid {

View File

@@ -29,7 +29,8 @@ from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from headphones import versioncheck, logger
import headphones.config
from headphones.softchroot import SoftChroot
import headphones.exceptions
# (append new extras to the end)
POSSIBLE_EXTRAS = [
@@ -74,6 +75,7 @@ started = False
DATA_DIR = None
CONFIG = None
SOFT_CHROOT = None
DB_FILE = None
@@ -92,11 +94,11 @@ MIRRORLIST = ["musicbrainz.org", "headphones", "custom"]
UMASK = None
def initialize(config_file):
with INIT_LOCK:
global CONFIG
global SOFT_CHROOT
global _INITIALIZED
global CURRENT_VERSION
global LATEST_VERSION
@@ -136,6 +138,14 @@ def initialize(config_file):
logger.initLogger(console=not QUIET, log_dir=CONFIG.LOG_DIR,
verbose=VERBOSE)
try:
SOFT_CHROOT = SoftChroot(str(CONFIG.SOFT_CHROOT))
if SOFT_CHROOT.isEnabled():
logger.info("Soft-chroot enabled for dir: %s", str(CONFIG.SOFT_CHROOT))
except exceptions.SoftChrootError as e:
logger.error("SoftChroot error: %s", e)
raise e
if not CONFIG.CACHE_DIR:
# Put the cache dir in the data dir for now
CONFIG.CACHE_DIR = os.path.join(DATA_DIR, 'cache')

View File

@@ -0,0 +1,13 @@
#import unittest
#import mock
from headphones.unittestcompat import TestCase
import headphones.albumart
# no tests...
class AlbumArtTest(TestCase):
def test_nothing(self):
x = 100 - 2 * 50
if x:
headphones.albumart.getAlbumArt('asdf')
self.assertTrue(True)

View File

@@ -116,7 +116,7 @@ class Cache(object):
return None
for image in images:
if image['size'] == 'medium':
if image['size'] == 'medium' and '#text' in image:
thumb_url = image['#text']
break

View File

@@ -15,6 +15,19 @@ def bool_int(value):
value = 0
return int(bool(value))
class path(str):
"""Internal 'marker' type for paths in config."""
@staticmethod
def __call__(val):
return path(val)
def __new__(cls, *args, **kw):
hstr = str.__new__(cls, *args, **kw)
return hstr
def __repr__(self):
return 'headphones.config.path(%s)' % self
_CONFIG_DEFINITIONS = {
'ADD_ALBUM_ART': (int, 'General', 0),
@@ -31,11 +44,11 @@ _CONFIG_DEFINITIONS = {
'AUTO_ADD_ARTISTS': (int, 'General', 1),
'BITRATE': (int, 'General', 192),
'BLACKHOLE': (int, 'General', 0),
'BLACKHOLE_DIR': (str, 'General', ''),
'BLACKHOLE_DIR': (path, 'General', ''),
'BOXCAR_ENABLED': (int, 'Boxcar', 0),
'BOXCAR_ONSNATCH': (int, 'Boxcar', 0),
'BOXCAR_TOKEN': (str, 'Boxcar', ''),
'CACHE_DIR': (str, 'General', ''),
'CACHE_DIR': (path, 'General', ''),
'CACHE_SIZEMB': (int, 'Advanced', 32),
'CHECK_GITHUB': (int, 'General', 1),
'CHECK_GITHUB_INTERVAL': (int, 'General', 360),
@@ -44,8 +57,8 @@ _CONFIG_DEFINITIONS = {
'CONFIG_VERSION': (str, 'General', '0'),
'CORRECT_METADATA': (int, 'General', 0),
'CUE_SPLIT': (int, 'General', 1),
'CUE_SPLIT_FLAC_PATH': (str, 'General', ''),
'CUE_SPLIT_SHNTOOL_PATH': (str, 'General', ''),
'CUE_SPLIT_FLAC_PATH': (path, 'General', ''),
'CUE_SPLIT_SHNTOOL_PATH': (path, 'General', ''),
'CUSTOMAUTH': (int, 'General', 0),
'CUSTOMHOST': (str, 'General', 'localhost'),
'CUSTOMPASS': (str, 'General', ''),
@@ -53,12 +66,12 @@ _CONFIG_DEFINITIONS = {
'CUSTOMSLEEP': (int, 'General', 1),
'CUSTOMUSER': (str, 'General', ''),
'DELETE_LOSSLESS_FILES': (int, 'General', 1),
'DESTINATION_DIR': (str, 'General', ''),
'DESTINATION_DIR': (path, 'General', ''),
'DETECT_BITRATE': (int, 'General', 0),
'DO_NOT_PROCESS_UNMATCHED': (int, 'General', 0),
'DOWNLOAD_DIR': (str, 'General', ''),
'DOWNLOAD_DIR': (path, 'General', ''),
'DOWNLOAD_SCAN_INTERVAL': (int, 'General', 5),
'DOWNLOAD_TORRENT_DIR': (str, 'General', ''),
'DOWNLOAD_TORRENT_DIR': (path, 'General', ''),
'DO_NOT_OVERRIDE_GIT_BRANCH': (int, 'General', 0),
'EMAIL_ENABLED': (int, 'Email', 0),
'EMAIL_FROM': (str, 'Email', ''),
@@ -74,14 +87,14 @@ _CONFIG_DEFINITIONS = {
'EMBED_LYRICS': (int, 'General', 0),
'ENABLE_HTTPS': (int, 'General', 0),
'ENCODER': (str, 'General', 'ffmpeg'),
'ENCODERFOLDER': (str, 'General', ''),
'ENCODERFOLDER': (path, 'General', ''),
'ENCODERLOSSLESS': (int, 'General', 1),
'ENCODEROUTPUTFORMAT': (str, 'General', 'mp3'),
'ENCODERQUALITY': (int, 'General', 2),
'ENCODERVBRCBR': (str, 'General', 'cbr'),
'ENCODER_MULTICORE': (int, 'General', 0),
'ENCODER_MULTICORE_COUNT': (int, 'General', 0),
'ENCODER_PATH': (str, 'General', ''),
'ENCODER_PATH': (path, 'General', ''),
'EXTRAS': (str, 'General', ''),
'EXTRA_NEWZNABS': (list, 'Newznab', ''),
'EXTRA_TORZNABS': (list, 'Torznab', ''),
@@ -94,7 +107,7 @@ _CONFIG_DEFINITIONS = {
'FOLDER_PERMISSIONS': (str, 'General', '0755'),
'FREEZE_DB': (int, 'General', 0),
'GIT_BRANCH': (str, 'General', 'master'),
'GIT_PATH': (str, 'General', ''),
'GIT_PATH': (path, 'General', ''),
'GIT_USER': (str, 'General', 'rembo10'),
'GROWL_ENABLED': (int, 'Growl', 0),
'GROWL_HOST': (str, 'Growl', ''),
@@ -103,8 +116,8 @@ _CONFIG_DEFINITIONS = {
'HEADPHONES_INDEXER': (bool_int, 'General', False),
'HPPASS': (str, 'General', ''),
'HPUSER': (str, 'General', ''),
'HTTPS_CERT': (str, 'General', ''),
'HTTPS_KEY': (str, 'General', ''),
'HTTPS_CERT': (path, 'General', ''),
'HTTPS_KEY': (path, 'General', ''),
'HTTP_HOST': (str, 'General', 'localhost'),
'HTTP_PASSWORD': (str, 'General', ''),
'HTTP_PORT': (int, 'General', 8181),
@@ -114,8 +127,8 @@ _CONFIG_DEFINITIONS = {
'IDTAG': (int, 'Beets', 0),
'IGNORE_CLEAN_RELEASES': (int, 'General', 0),
'IGNORED_WORDS': (str, 'General', ''),
'IGNORED_FOLDERS': (list, 'Advanced', []),
'IGNORED_FILES': (list, 'Advanced', []),
'IGNORED_FOLDERS': (list, 'Advanced', []), # path
'IGNORED_FILES': (list, 'Advanced', []), # path
'INCLUDE_EXTRAS': (int, 'General', 0),
'INTERFACE': (str, 'General', 'default'),
'JOURNAL_MODE': (str, 'Advanced', 'wal'),
@@ -130,17 +143,17 @@ _CONFIG_DEFINITIONS = {
'LIBRARYSCAN_INTERVAL': (int, 'General', 300),
'LMS_ENABLED': (int, 'LMS', 0),
'LMS_HOST': (str, 'LMS', ''),
'LOG_DIR': (str, 'General', ''),
'LOG_DIR': (path, 'General', ''),
'LOSSLESS_BITRATE_FROM': (int, 'General', 0),
'LOSSLESS_BITRATE_TO': (int, 'General', 0),
'LOSSLESS_DESTINATION_DIR': (str, 'General', ''),
'LOSSLESS_DESTINATION_DIR': (path, 'General', ''),
'MB_IGNORE_AGE': (int, 'General', 365),
'MININOVA': (int, 'Mininova', 0),
'MININOVA_RATIO': (str, 'Mininova', ''),
'MIRROR': (str, 'General', 'musicbrainz.org'),
'MOVE_FILES': (int, 'General', 0),
'MPC_ENABLED': (bool_int, 'MPC', False),
'MUSIC_DIR': (str, 'General', ''),
'MUSIC_DIR': (path, 'General', ''),
'MUSIC_ENCODER': (int, 'General', 0),
'NEWZNAB': (int, 'Newznab', 0),
'NEWZNAB_APIKEY': (str, 'Newznab', ''),
@@ -223,6 +236,7 @@ _CONFIG_DEFINITIONS = {
'SAB_USERNAME': (str, 'SABnzbd', ''),
'SAMPLINGFREQUENCY': (int, 'General', 44100),
'SEARCH_INTERVAL': (int, 'General', 1440),
'SOFT_CHROOT': (path, 'General', ''),
'SONGKICK_APIKEY': (str, 'Songkick', 'nd1We7dFW2RqxPw8'),
'SONGKICK_ENABLED': (int, 'Songkick', 1),
'SONGKICK_FILTER_ENABLED': (int, 'Songkick', 0),
@@ -234,7 +248,7 @@ _CONFIG_DEFINITIONS = {
'SUBSONIC_PASSWORD': (str, 'Subsonic', ''),
'SUBSONIC_USERNAME': (str, 'Subsonic', ''),
'SYNOINDEX_ENABLED': (int, 'Synoindex', 0),
'TORRENTBLACKHOLE_DIR': (str, 'General', ''),
'TORRENTBLACKHOLE_DIR': (path, 'General', ''),
'TORRENT_DOWNLOADER': (int, 'General', 0),
'TORRENT_REMOVAL_INTERVAL': (int, 'General', 720),
'TORZNAB': (int, 'Torznab', 0),
@@ -295,7 +309,7 @@ class Config(object):
definition = _CONFIG_DEFINITIONS[key]
if len(definition) == 3:
definition_type, section, default = definition
else:
elif len(definition) == 4:
definition_type, section, _, default = definition
return key, definition_type, section, ini_key, default

439
headphones/config_test.py Normal file
View File

@@ -0,0 +1,439 @@
import mock
from mock import MagicMock
import headphones.config
import re
import unittestcompat
from unittestcompat import TestCase, TestArgs
class ConfigApiTest(TestCase):
""" Common tests for headphones.Config
Common tests for headphones.Config This test suite guarantees, that external
API of the Config class conforms all expectations of other modules.
"""
def _setUpConfigMock(self, mock, sections):
# every constructor `xx = ConfigObj()` in headphones.config will return
# this mock:
self.config_mock = self.config_module_mock.return_value = mock
if sections:
mock.__contains__.side_effect = sections.__contains__
mock.__getitem__.side_effect = sections.__getitem__
mock.__setitem__.side_effect = sections.__setitem__
mock.items.side_effect = sections.items
return mock
def setUp(self):
# patch for low-level ConfigObj for entire test class
# result - each test_* method will get one additional
# argument during testing
self.config_module_mock_patcher = mock.patch('headphones.config.ConfigObj', name='ConfigObjModuleMock')
self.config_module_mock = self.config_module_mock_patcher.start()
existing_sections = {'General': {}, 'Email': {}}
# every constructor `xx = ConfigObj()` in headphones.config will return
# this mock:
self._setUpConfigMock(MagicMock(), existing_sections)
def tearDown(self):
self.config_module_mock_patcher.stop()
def test_constructor(self):
""" Config : creating """
cf = headphones.config.Config('/tmp/notexist')
self.assertIsInstance(cf, headphones.config.Config)
@TestArgs(
# this sections are explicitly added in the test body:
('General', False),
('Email', False),
# this sections will not be created nor in the test, either in the
# Config module
('some_new_section_never_defined', True),
('another_new_section_never_defined', True),
)
def test_check_section(self, section_name, expected_return):
""" Config : check_section """
path = '/tmp/notexist'
# call methods
c = headphones.config.Config(path)
res = c.check_section(section_name)
res2 = c.check_section(section_name)
# assertions:
self.assertEqual(res, expected_return)
self.assertFalse(res2)
@TestArgs(
('api_enabled', 0, int),
('Api_Key', '', str),
)
def test_check_setting(self, setting_name, expected_return, expected_instance):
""" Config: check_setting , basic cases """
path = '/tmp/notexist'
# call methods
c = headphones.config.Config(path)
res = c.check_setting(setting_name)
res2 = c.check_setting(setting_name)
# assertions:
self.assertIsInstance(res, expected_instance)
self.assertEqual(res, expected_return)
self.assertEqual(res, res2)
@TestArgs(
(''),
('This_IsNew_Name'),
)
def test_check_setting_raise_on_unknown_settings(self, setting_name):
""" Config: check_setting should raise on unknown """
path = '/tmp/notexist'
exc_regex = re.compile(setting_name, re.IGNORECASE)
# call methods
c = headphones.config.Config(path)
# assertions:
with self.assertRaisesRegexp(KeyError, exc_regex):
c.check_setting(setting_name)
pass
@TestArgs(
(None)
)
def test_check_setting_raise_on_none(self, setting_name):
""" Config: check_setting shoud raise on None name """
path = '/tmp/notexist'
# call methods
c = headphones.config.Config(path)
# assertions:
with self.assertRaises(AttributeError):
c.check_setting(setting_name)
pass
def test_write(self):
""" Config : write """
path = '/tmp/notexist'
# overload mocks, defined in setUp:
old_conf_mock = self._setUpConfigMock(MagicMock(), {'a': {}})
option_name_not_from_definitions = 'some_invalid_option_with_super_uniq1_name'
option_name_not_from_definitions_value = 1
old_conf_mock['asdf'] = {option_name_not_from_definitions: option_name_not_from_definitions_value}
# call methods
cf = headphones.config.Config(path)
# overload mock-patching for NEW CONFIG
new_patcher = mock.patch('headphones.config.ConfigObj', name='NEW_ConfigObjModuleMock_FOR_WRITE')
new_conf_module_mock = new_patcher.start()
new_conf_mock = \
new_conf_module_mock.return_value = \
MagicMock()
cf.write()
new_patcher.stop()
# assertions:
self.assertFalse(old_conf_mock.write.called, 'write not called for old config')
self.assertTrue(new_conf_mock.write.called, 'write called for new config')
self.assertEqual(new_conf_mock.filename, path)
new_conf_mock['General'].__setitem__.assert_any_call('download_dir', '')
# from 3.5... new_conf_mock['asdf'].__setitem__.assert_not_called('download_dir', '')
new_conf_mock['asdf'].__setitem__.assert_any_call(option_name_not_from_definitions, option_name_not_from_definitions_value)
@unittestcompat.skip("process_kwargs should be removed")
def test_process_kwargs(self):
self.assertTrue(True)
# ===========================================================
# GET ATTR
# ===========================================================
@TestArgs(
('ADD_ALBUM_ART', True),
('ALBUM_ART_FORMAT', 'shmolder'),
('API_ENABLED', 1),
('API_KEY', 'Hello'),
)
def test__getattr__ConfValues(self, name, value):
""" Config: __getattr__ with setting value explicit """
path = '/tmp/notexist'
self.config_mock["General"] = {name.lower(): value}
# call methods
c = headphones.config.Config(path)
act = c.__getattr__(name)
# assertions:
self.assertEqual(act, value)
@TestArgs(
('ADD_ALBUM_ART', 0),
('ALBUM_ART_FORMAT', 'folder'),
('API_ENABLED', 0),
('API_KEY', ''),
)
def test__getattr__ConfValuesDefault(self, name, value):
""" Config: __getattr__ from config(by braces), default values """
path = '/tmp/notexist'
# call methods
c = headphones.config.Config(path)
res = c.__getattr__(name)
# assertions:
self.assertEqual(res, value)
def test__getattr__ConfValuesDefaultUsingDotNotation(self):
""" Config: __getattr__ from config (by dot), default values """
path = '/tmp/notexist'
# call methods
c = headphones.config.Config(path)
# assertions:
self.assertEqual(c.ALBUM_ART_FORMAT, 'folder')
self.assertEqual(c.API_ENABLED, 0)
self.assertEqual(c.API_KEY, '')
def test__getattr__OwnAttributes(self):
""" Config: __getattr__ access own attrs """
path = '/tmp/notexist'
# call methods
c = headphones.config.Config(path)
# assertions:
self.assertIsNotNone(c)
self.assertIn('<headphones.config.Config', c.__str__())
# ===========================================================
# SET ATTR
# ===========================================================
@TestArgs(
('ADD_ALBUM_ART', True),
('ALBUM_ART_FORMAT', 'shmolder'),
('API_ENABLED', 1),
('API_KEY', 'Hello'),
)
def test__setattr__ConfValuesDefault(self, name, value):
""" Config: __setattr__ with setting value explicit """
path = '/tmp/notexist'
# call methods
c = headphones.config.Config(path)
act = c.__setattr__(name, value)
# assertions:
self.assertEqual(self.config_mock["General"][name.lower()], value)
self.assertEqual(act, value)
def test__setattr__ExplicitSetUsingDotNotation(self):
""" Config: __setattr__ with setting values using dot notation """
path = '/tmp/notexist'
# call methods
c = headphones.config.Config(path)
act1 = c.ALBUM_ART_FORMAT = 'Apple'
act2 = c.API_ENABLED = True
act3 = c.API_KEY = 123
# assertions:
self.assertEqual(self.config_mock["General"]['album_art_format'], 'Apple')
self.assertEqual(self.config_mock["General"]['api_enabled'], 1)
self.assertEqual(self.config_mock["General"]['api_key'], '123')
self.assertEqual(act1, 'Apple')
self.assertEqual(act2, 1)
# TODO : check this trange behaviour. I have expected to see here '123', not 123.
self.assertEqual(act3, 123)
# ===========================================================
# NEWZNABS
#
@TestArgs(
('', []),
('ABCDEF', [('A', 'B', 'C'), ('D', 'E', 'F')]),
(['ABC', 'DEF'], []),
([1], []),
([1, 2], []),
([1, 2, 3], [(1, 2, 3)]),
([1, 2, 3, 'Aaa'], [(1, 2, 3)]),
([1, 2, 3, 'Aaa', 'Bbba'], [(1, 2, 3)]),
([1, 2, 3, 'Aaa', 'Bbba', 'Ccccc'], [(1, 2, 3), ('Aaa', 'Bbba', 'Ccccc')]),
([1, 2, 3, 'Aaa', 'Bbba', 'Ccccc', 'Ddddda'], [(1, 2, 3), ('Aaa', 'Bbba', 'Ccccc')]),
)
def test_get_extra_newznabs(self, conf_value, expected):
""" Config: get_extra_newznabs """
path = '/tmp/notexist'
#itertools.izip(*[itertools.islice('', i, None, 3) for i in range(3)])
# set up mocks:
# 'EXTRA_NEWZNABS': (list, 'Newznab', ''),
# 'EXTRA_TORZNABS': (list, 'Torznab', ''),
self.config_mock["Newznab"] = {"extra_newznabs": conf_value}
# call methods
c = headphones.config.Config(path)
res = c.get_extra_newznabs()
# assertions:
self.assertEqual(res, expected)
def test_clear_extra_newznabs(self):
""" Config: clear_extra_newznabs """
path = '/tmp/notexist'
random_value = 1827746
self.config_mock["Newznab"] = {"extra_newznabs": [1, 2, 3]}
self.config_mock["Newznab"] = {"do_not_touch": random_value}
# call methods
c = headphones.config.Config(path)
res = c.clear_extra_newznabs()
# assertions:
self.assertIsNone(res)
self.assertEqual(self.config_mock["Newznab"]["extra_newznabs"], [])
self.assertEqual(self.config_mock["Newznab"]["do_not_touch"], random_value)
@TestArgs(
([], [''], ['']),
([], 'ABCDEF', ['A', 'B', 'C', 'D', 'E', 'F']),
([1, 2, [False, True]], ['3', [0, 0]], [1, 2, [False, True], '3', [0, 0]]),
)
def test_add_extra_newznab(self, initial, added, expected):
""" Config: add_extra_newznab """
path = '/tmp/notexist'
self.config_mock["Newznab"] = {"extra_newznabs": initial}
# call methods
c = headphones.config.Config(path)
c.add_extra_newznab(added)
act = self.config_mock["Newznab"]["extra_newznabs"]
# assertions:
self.assertEqual(act, expected)
@TestArgs(
(None),
([]),
([1, 2, 3]),
([True]),
)
def test_add_extra_newznab_raise_on_none(self, initial):
""" Config: add_extra_newznab should raise on None adding"""
path = '/tmp/notexist'
self.config_mock["Newznab"] = {"extra_newznabs": initial}
# call methods
c = headphones.config.Config(path)
with self.assertRaises(TypeError):
c.add_extra_newznab(None)
pass
# ===========================================================
# TORZNABS
# TODO : here is copypaste from of NEZNABS tests. Make tests better, plz refactor them
#
@TestArgs(
('', []),
('ABCDEF', [('A', 'B', 'C'), ('D', 'E', 'F')]),
(['ABC', 'DEF'], []),
([1], []),
([1, 2], []),
([1, 2, 3], [(1, 2, 3)]),
([1, 2, 3, 'Aaa'], [(1, 2, 3)]),
([1, 2, 3, 'Aaa', 'Bbba'], [(1, 2, 3)]),
([1, 2, 3, 'Aaa', 'Bbba', 'Ccccc'], [(1, 2, 3), ('Aaa', 'Bbba', 'Ccccc')]),
([1, 2, 3, 'Aaa', 'Bbba', 'Ccccc', 'Ddddda'], [(1, 2, 3), ('Aaa', 'Bbba', 'Ccccc')]),
)
def test_get_extra_torznabs(self, conf_value, expected):
""" Config: get_extra_torznabs """
path = '/tmp/notexist'
#itertools.izip(*[itertools.islice('', i, None, 3) for i in range(3)])
# set up mocks:
# 'EXTRA_TORZNABS': (list, '', ''),
self.config_mock["Torznab"] = {"extra_torznabs": conf_value}
# call methods
c = headphones.config.Config(path)
res = c.get_extra_torznabs()
# assertions:
self.assertEqual(res, expected)
def test_clear_extra_torznabs(self):
""" Config: clear_extra_torznabs """
path = '/tmp/notexist'
random_value = -1292721
self.config_mock["Torznab"] = {"extra_torznabs": [1, 2, 3]}
self.config_mock["Torznab"] = {"do_not_touch": random_value}
# call methods
c = headphones.config.Config(path)
res = c.clear_extra_torznabs()
# assertions:
self.assertIsNone(res)
self.assertEqual(self.config_mock["Torznab"]["extra_torznabs"], [])
self.assertEqual(self.config_mock["Torznab"]["do_not_touch"], random_value)
@TestArgs(
([], [''], ['']),
([], 'ABCDEF', ['A', 'B', 'C', 'D', 'E', 'F']),
([1, 2, [False, True]], ['3', [0, 0]], [1, 2, [False, True], '3', [0, 0]]),
)
def test_add_extra_torznab(self, initial, added, expected):
""" Config: add_extra_torznab """
path = '/tmp/notexist'
self.config_mock["Torznab"] = {"extra_torznabs": initial}
# call methods
c = headphones.config.Config(path)
c.add_extra_torznab(added)
act = self.config_mock["Torznab"]["extra_torznabs"]
# assertions:
self.assertEqual(act, expected)
@TestArgs(
(None),
([]),
([1, 2, 3]),
([True]),
)
def test_add_extra_torznab_raise_on_none(self, initial):
""" Config: add_extra_torznab should raise on None adding"""
path = '/tmp/notexist'
self.config_mock["Torznab"] = {"extra_torznabs": initial}
# call methods
c = headphones.config.Config(path)
with self.assertRaises(TypeError):
c.add_extra_torznab(None)
pass

View File

@@ -24,3 +24,9 @@ class NewzbinAPIThrottled(HeadphonesException):
"""
Newzbin has throttled us, deal with it
"""
class SoftChrootError(HeadphonesException):
"""
Fatal errors in SoftChroot module
"""
pass

View File

@@ -191,11 +191,13 @@ def piratesize(size):
def replace_all(text, dic, normalize=False):
from headphones import pathrender
if not text:
return ''
for i, j in dic.iteritems():
if normalize:
if normalize:
new_dic = {}
for i, j in dic.iteritems():
try:
if sys.platform == 'darwin':
j = unicodedata.normalize('NFD', j)
@@ -203,8 +205,9 @@ def replace_all(text, dic, normalize=False):
j = unicodedata.normalize('NFC', j)
except TypeError:
j = unicodedata.normalize('NFC', j.decode(headphones.SYS_ENCODING, 'replace'))
text = text.replace(i, j)
return text
new_dic[i] = j
dic = new_dic
return pathrender.render(text, dic)[0]
def replace_illegal_chars(string, type="file"):

228
headphones/pathrender.py Normal file
View File

@@ -0,0 +1,228 @@
# encoding=utf8
# This file is part of Headphones.
#
# Headphones is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Headphones is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Headphones. If not, see <http://www.gnu.org/licenses/>.
'''Path pattern substitution module, see details below for syntax.
The pattern matching is loosely based on foobar2000 pattern syntax,
i.e. the notion of escaping characters with \' and optional elements
enclosed in square brackets [] is taken from there while the
substitution variable names are Perl-ish or sh-ish. The following
syntax elements are supported:
* escaped literal strings, that is everything that is enclosed
within single quotes (like \'this\');
* substitution variables, which start with dollar sign ($) and
extend until next non-alphanumeric+underscore character
(like $This and $5_that).
* optional elements enclosed in square brackets, which render
nonempty value only if any variable or optional inside returned
nonempty value, ignoring literals (like [\'[\'$That\']\' ]).
'''
from __future__ import print_function
from enum import Enum
__author__ = "Andrzej Ciarkowski <andrzej.ciarkowski@gmail.com>"
class _PatternElement(object):
'''ABC for hierarchy of path name renderer pattern elements.'''
def render(self, replacement):
# type: (Mapping[str,str]) -> str
'''Format this _PatternElement into string using provided substitution dictionary.'''
raise NotImplementedError()
class _Generator(_PatternElement):
# pylint: disable=abstract-method
'''Tagging interface for "content-generating" elements like replacement or optional block.'''
pass
class _Replacement(_Generator):
'''Replacement variable, eg. $title.'''
def __init__(self, pattern):
# type: (str)
self._pattern = pattern
def render(self, replacement):
# type: (Mapping[str,str]) -> str
return replacement.get(self._pattern, self._pattern)
def __str__(self):
return self._pattern
class _LiteralText(_PatternElement):
'''Just a plain piece of text to be rendered "as is".'''
def __init__(self, text):
# type: (str)
self._text = text
def render(self, replacement):
# type: (Mapping[str,str]) -> str
return self._text
def __str__(self):
return self._text
class _OptionalBlock(_Generator):
'''Optional block will render its contents only if any _Generator in its scope did return non-empty result.'''
def __init__(self, scope):
# type: ([_PatternElement])
self._scope = scope
def render(self, replacement):
# type: (Mapping[str,str]) -> str
res = [(isinstance(x, _Generator), x.render(replacement)) for x in self._scope]
if any((t[0] and len(t[1]) != 0) for t in res):
return u"".join(t[1] for t in res)
else:
return u""
_OPTIONAL_START = u'['
_OPTIONAL_END = u']'
_ESCAPE_CHAR = u'\''
_REPLACEMENT_START = u'$'
def _is_replacement_valid(c):
# type: (str) -> bool
return c.isalnum() or c == u'_'
class _State(Enum):
LITERAL = 0
ESCAPE = 1
REPLACEMENT = 2
def _append_literal(scope, text):
# type: ([_PatternElement], str) -> None
'''Append literal text to the scope BUT ONLY if it's not an empty string.'''
if len(text) == 0:
return
scope.append(_LiteralText(text))
class Warnings(Enum):
'''Pattern parsing warnings, as stored withing warnings property of Pattern object after parsing.'''
UNCLOSED_ESCAPE = 'Warnings.UNCLOSED_ESCAPE'
UNCLOSED_OPTIONAL = 'Warnings.UNCLOSED_OPTIONAL'
def _parse_pattern(pattern, warnings):
# type: (str,MutableSet[Warnings]) -> [_PatternElement]
'''Parse path pattern text into list of _PatternElements, put warnings into the provided set.'''
start = 0 # index of current state start char
root_scope = [] # here our _PatternElements will reside
scope_stack = [root_scope] # stack so that we can return to the outer scope
scope = root_scope # pointer to the current list for _OptionalBlock
inside_optional = 0 # nesting level of _OptionalBlocks
state = _State.LITERAL # current state
for i, c in enumerate(pattern):
if state is _State.ESCAPE:
if c != _ESCAPE_CHAR:
# only escape char can get us out of _State.ESCAPE
continue
_append_literal(scope, pattern[start + 1:i])
state = _State.LITERAL
start = i + 1
# after exiting _State.ESCAPE on escape char no more processing of c
continue
if state is _State.REPLACEMENT:
if _is_replacement_valid(c):
# only replacement invalid can get us out _State.REPLACEMENT
continue
scope.append(_Replacement(pattern[start:i]))
state = _State.LITERAL
start = i
# intentional fall-through to _State.LITERAL
assert state is _State.LITERAL
if c == _ESCAPE_CHAR:
_append_literal(scope, pattern[start:i])
state = _State.ESCAPE
start = i
# no more processing to escape char c
continue
if c == _REPLACEMENT_START:
_append_literal(scope, pattern[start:i])
state = _State.REPLACEMENT
start = i
# no more processing to replacement char c
continue
if c == _OPTIONAL_START:
_append_literal(scope, pattern[start:i])
inside_optional += 1
new_scope = []
scope_stack.append(new_scope)
scope = new_scope
start = i + 1
continue
if c == _OPTIONAL_END:
if inside_optional == 0:
# no optional block to end, just treat as literal text
continue
inside_optional -= 1
_append_literal(scope, pattern[start:i])
scope_stack.pop()
prev_scope = scope_stack[-1]
prev_scope.append(_OptionalBlock(scope))
scope = prev_scope
start = i + 1
# fi
# done
if state is _State.ESCAPE:
warnings.add(Warnings.UNCLOSED_ESCAPE)
if inside_optional != 0:
warnings.add(Warnings.UNCLOSED_OPTIONAL)
if state is _State.REPLACEMENT:
root_scope.append(_Replacement(pattern[start:]))
else:
# don't care about unclosed elements :P
_append_literal(root_scope, pattern[start:])
return root_scope
class Pattern(object):
'''Stores preparsed rename pattern for repeated use.
If using the same pattern repeatedly it is much more effective
to parse the pattern into Pattern object and use it instead of
parsing the textual pattern on each substitution. To use Pattern
object for substitution simply call it as it was function
providing dictionary as an argument (see __call__()).'''
def __init__(self, pattern):
# type: (str)
self._warnings = set()
self._pattern = _parse_pattern(pattern, self._warnings)
def __call__(self, replacement):
# type: (Mapping[str,str]) -> str
'''Execute path rendering/substitution based on replacement dictionary.'''
return u"".join(p.render(replacement) for p in self._pattern)
def _get_warnings(self):
# type: () -> str
'''Getter for warnings property.'''
return self._warnings
warnings = property(_get_warnings, doc="Access warnings raised during pattern parsing")
def render(pattern, replacement):
# type: (str, Mapping[str,str]) -> (str, AbstractSet[Warnings])
'''Render path name based on replacement pattern and dictionary.'''
p = Pattern(pattern)
return p(replacement), p.warnings
if __name__ == "__main__":
# primitive test ;)
p = Pattern(u"[$Disc.]$Track - $Artist - $Title[ '['$Year']'")
d = {'$Disc': '', '$Track': '05', '$Artist': u'Grzegżółka', '$Title': u'Błona kapłona', '$Year': '2019'}
print(p(d).encode('utf8'), p.warnings)

View File

@@ -314,12 +314,13 @@ def doPostProcessing(albumid, albumpath, release, tracks, downloaded_track_list,
new_folder = None
# Check to see if we're preserving the torrent dir
if (headphones.CONFIG.KEEP_TORRENT_FILES and Kind == "torrent" and 'headphones-modified' not in albumpath) or headphones.CONFIG.KEEP_ORIGINAL_FOLDER or keep_original_folder:
new_folder = os.path.join(tempfile.mkdtemp(prefix="headphones_"), "headphones")
logger.info("Copying files to " + new_folder.decode(headphones.SYS_ENCODING, 'replace') + " subfolder to preserve downloaded files for seeding")
new_folder = tempfile.mkdtemp(prefix="headphones_")
subdir = os.path.join(new_folder, "headphones")
logger.info("Copying files to " + subdir.decode(headphones.SYS_ENCODING, 'replace') + " subfolder to preserve downloaded files for seeding")
try:
shutil.copytree(albumpath, new_folder)
shutil.copytree(albumpath, subdir)
# Update the album path with the new location
albumpath = new_folder
albumpath = subdir
except Exception as e:
logger.warn("Cannot copy/move files to temp folder: " + new_folder.decode(headphones.SYS_ENCODING, 'replace') + ". Not continuing. Error: " + str(e))
shutil.rmtree(new_folder)
@@ -665,10 +666,10 @@ def renameNFO(albumpath):
def moveFiles(albumpath, release, tracks):
logger.info("Moving files: %s" % albumpath)
try:
year = release['ReleaseDate'][:4]
date = release['ReleaseDate']
except TypeError:
year = u''
date = u''
year = date[:4]
artist = release['ArtistName'].replace('/', '_')
album = release['AlbumTitle'].replace('/', '_')
if headphones.CONFIG.FILE_UNDERSCORES:
@@ -698,6 +699,7 @@ def moveFiles(albumpath, release, tracks):
'$SortArtist': sortname,
'$Album': album,
'$Year': year,
'$Date': date,
'$Type': releasetype,
'$OriginalFolder': origfolder,
'$First': firstchar.upper(),
@@ -705,6 +707,7 @@ def moveFiles(albumpath, release, tracks):
'$sortartist': sortname.lower(),
'$album': album.lower(),
'$year': year,
'$date': date,
'$type': releasetype.lower(),
'$first': firstchar.lower(),
'$originalfolder': origfolder.lower()
@@ -1056,9 +1059,11 @@ def embedLyrics(downloaded_track_list):
def renameFiles(albumpath, downloaded_track_list, release):
logger.info('Renaming files')
try:
year = release['ReleaseDate'][:4]
date = release['ReleaseDate']
except TypeError:
year = ''
date = u''
year = date[:4]
# Until tagging works better I'm going to rely on the already provided metadata
for downloaded_track in downloaded_track_list:
@@ -1107,13 +1112,15 @@ def renameFiles(albumpath, downloaded_track_list, release):
'$SortArtist': sortname,
'$Album': release['AlbumTitle'],
'$Year': year,
'$Date': date,
'$disc': discnumber,
'$track': tracknumber,
'$title': title.lower(),
'$artist': artistname.lower(),
'$sortartist': sortname.lower(),
'$album': release['AlbumTitle'].lower(),
'$year': year
'$year': year,
'$date': date
}
ext = os.path.splitext(downloaded_track)[1]

70
headphones/softchroot.py Normal file
View File

@@ -0,0 +1,70 @@
import os
from headphones.exceptions import SoftChrootError
class SoftChroot(object):
""" SoftChroot provides SOFT chrooting for UI
IMPORTANT: call methods of this class just in modules, which generates data for client UI. Try to avoid unnecessary usage.
"""
enabled = False
chroot = None
def __init__(self, path):
if not path:
#disabled
return
path = path.strip()
if not path:
return
if (not os.path.exists(path) or
not os.path.isdir(path)):
raise SoftChrootError('No such directory: %s' % path)
path = path.rstrip(os.path.sep) + os.path.sep
self.enabled = True
self.chroot = path
def isEnabled(self):
return self.enabled
def getRoot(self):
return self.chroot
def apply(self, path):
if not self.enabled:
return path
if not path:
return path
p = path.strip()
if not p:
return path
if path.startswith(self.chroot):
p = os.path.sep + path[len(self.chroot):]
else:
p = os.path.sep
return p
def revoke(self, path):
if not self.enabled:
return path
if not path:
return path
p = path.strip()
if not p:
return path
if os.path.sep == p[0]:
p = p[1:]
p = self.chroot + p
return p

View File

@@ -0,0 +1,121 @@
import os
import mock
from headphones.unittestcompat import TestCase, TestArgs
#from mock import MagicMock
from headphones.softchroot import SoftChroot
from headphones.exceptions import SoftChrootError
class SoftChrootTest(TestCase):
def test_create(self):
""" create headphones.SoftChroot """
cf = SoftChroot('/tmp/')
self.assertIsInstance(cf, SoftChroot)
self.assertTrue(cf.isEnabled())
self.assertEqual(cf.getRoot(), '/tmp/')
@TestArgs(
(None),
(''),
(' '),
)
def test_create_disabled(self, empty_path):
""" create DISABLED SoftChroot """
cf = SoftChroot(empty_path)
self.assertIsInstance(cf, SoftChroot)
self.assertFalse(cf.isEnabled())
self.assertIsNone(cf.getRoot())
def test_create_on_not_exists_dir(self):
""" create SoftChroot on non existent dir """
path = os.path.join('/tmp', 'notexist', 'asdf', '11', '12', 'np', 'itsssss')
cf = None
with self.assertRaises(SoftChrootError) as exc:
cf = SoftChroot(path)
self.assertIsNone(cf)
self.assertRegexpMatches(str(exc.exception), r'No such directory')
self.assertRegexpMatches(str(exc.exception), path)
@mock.patch('headphones.softchroot.os', wrap=os, name='OsMock')
def test_create_on_file(self, os_mock):
""" create SoftChroot on file, not a directory """
path = os.path.join('/tmp', 'notexist', 'asdf', '11', '12', 'np', 'itsssss')
os_mock.path.sep = os.path.sep
os_mock.path.isdir.side_effect = lambda x: x != path
cf = None
with self.assertRaises(SoftChrootError) as exc:
cf = SoftChroot(path)
self.assertIsNone(cf)
self.assertTrue(os_mock.path.isdir.called)
self.assertRegexpMatches(str(exc.exception), r'No such directory')
self.assertRegexpMatches(str(exc.exception), path)
@TestArgs(
(None, None),
('', ''),
(' ', ' '),
('/tmp/', '/'),
('/tmp/asdf', '/asdf'),
)
def test_apply(self, p, e):
""" apply SoftChroot """
sc = SoftChroot('/tmp/')
a = sc.apply(p)
self.assertEqual(a, e)
@TestArgs(
('/'),
('/nonch/path/asdf'),
('tmp/asdf'),
)
def test_apply_out_of_root(self, p):
""" apply SoftChroot to paths outside of the chroot """
sc = SoftChroot('/tmp/')
a = sc.apply(p)
self.assertEqual(a, '/')
@TestArgs(
(None, None),
('', ''),
(' ', ' '),
('/', '/tmp/'),
('/asdf', '/tmp/asdf'),
('/asdf/', '/tmp/asdf/'),
('localdir/adf', '/tmp/localdir/adf'),
('localdir/adf/', '/tmp/localdir/adf/'),
)
def test_revoke(self, p, e):
""" revoke SoftChroot """
sc = SoftChroot('/tmp/')
a = sc.revoke(p)
self.assertEqual(a, e)
@TestArgs(
(None),
(''),
(' '),
('/tmp'),
('/tmp/'),
('/tmp/asdf'),
('/tmp/localdir/adf'),
('localdir/adf'),
('localdir/adf/'),
)
def test_actions_on_disabled(self, p):
""" disabled SoftChroot should not change args on apply and revoke """
sc = SoftChroot(None)
a = sc.apply(p)
self.assertEqual(a, p)
r = sc.revoke(p)
self.assertEqual(r, p)

View File

@@ -25,9 +25,9 @@ import headphones
# This is just a simple script to send torrents to transmission. The
# intention is to turn this into a class where we can check the state
# of the download, set the download dir, etc.
# TODO: Store the session id so we don't need to make 2 calls
# Store torrent id so we can check up on it
# TODO: Store torrent id so we can check up on it
_session_id = None
def addTorrent(link, data=None):
method = 'torrent-add'
@@ -127,6 +127,7 @@ def removeTorrent(torrentid, remove_data=False):
def torrentAction(method, arguments):
global _session_id
host = headphones.CONFIG.TRANSMISSION_HOST
username = headphones.CONFIG.TRANSMISSION_USERNAME
password = headphones.CONFIG.TRANSMISSION_PASSWORD
@@ -148,43 +149,34 @@ def torrentAction(method, arguments):
parts[2] += "/transmission/rpc"
host = urlparse.urlunparse(parts)
# Retrieve session id
auth = (username, password) if username and password else None
response = request.request_response(host, auth=auth,
whitelist_status_code=[401, 409])
if response is None:
logger.error("Error gettings Transmission session ID")
return
# Parse response
if response.status_code == 401:
if auth:
logger.error("Username and/or password not accepted by " \
"Transmission")
else:
logger.error("Transmission authorization required")
return
elif response.status_code == 409:
session_id = response.headers['x-transmission-session-id']
if not session_id:
logger.error("Expected a Session ID from Transmission")
return
# Prepare next request
headers = {'x-transmission-session-id': session_id}
data = {'method': method, 'arguments': arguments}
data_json = json.dumps(data)
auth = (username, password) if username and password else None
for retry in range(2):
if _session_id is not None:
headers = {'x-transmission-session-id': _session_id}
response = request.request_response(host, method="POST",
data=data_json, headers=headers, auth=auth,
whitelist_status_code=[200, 401, 409])
else:
response = request.request_response(host, auth=auth,
whitelist_status_code=[401, 409])
if response.status_code == 401:
if auth:
logger.error("Username and/or password not accepted by " \
"Transmission")
else:
logger.error("Transmission authorization required")
return
elif response.status_code == 409:
_session_id = response.headers['x-transmission-session-id']
if _session_id is None:
logger.error("Expected a Session ID from Transmission, got None")
return
# retry request with new session id
logger.debug("Retrying Transmission request with new session id")
continue
response = request.request_json(host, method="POST", data=json.dumps(data),
headers=headers, auth=auth)
print response
if not response:
logger.error("Error sending torrent to Transmission")
return
return response
resp_json = response.json()
print resp_json
return resp_json

View File

@@ -0,0 +1,113 @@
import sys
if sys.version_info < (2, 7):
import unittest2 as unittest
from unittest2 import TestCase as TC
else:
import unittest
from unittest import TestCase as TC
skip = unittest.skip
_dummy = False
# less than 2.6 ...
if sys.version_info[0] == 2 and sys.version_info[1] <= 6:
_dummy = True
def _d(f):
def decorate(self, *args, **kw):
if not _dummy:
return f(self, *args, **kw)
return self.assertTrue(True)
return decorate
class TestCase(TC):
"""
Wrapper for python 2.6 stubs
"""
def assertIsInstance(self, obj, cls, msg=None):
if not _dummy:
return super(TestCase, self).assertIsInstance(obj, cls, msg)
tst = isinstance(obj, cls)
return self.assertTrue(tst, msg)
@_d
def assertNotIsInstance(self, *args, **kw):
return super(TestCase, self).assertNotIsInstance(*args, **kw)
@_d
def assertIn(self, *args, **kw):
return super(TestCase, self).assertIn(*args, **kw)
@_d
def assertRegexpMatches(self, *args, **kw):
return super(TestCase, self).assertRegexpMatches(*args, **kw)
# -----------------------------------------------------------
# NOT DUMMY ASSERTIONS
# -----------------------------------------------------------
def assertIsNone(self, val, msg=None):
if not _dummy:
return super(TestCase, self).assertIsNone(val, msg)
tst = val is None
return super(TestCase, self).assertTrue(tst, msg)
def assertIsNotNone(self, val, msg=None):
if not _dummy:
return super(TestCase, self).assertIsNotNone(val, msg)
tst = val is not None
return super(TestCase, self).assertTrue(tst, msg)
def assertRaises(self, exc, msg=None):
if not _dummy:
return super(TestCase, self).assertRaises(exc, msg)
return TestCase._TestCaseRaiseStub(self, exc, msg=msg)
def assertRaisesRegexp(self, exc, regex, msg=None):
if not _dummy:
return super(TestCase, self).assertRaises(exc, msg)
return TestCase._TestCaseRaiseStub(self, exc, regex=regex, msg=msg)
class _TestCaseRaiseStub:
""" Internal stuff for stubbing `assertRaises*` """
def __init__(self, test_case, exc, regex=None, msg=None):
self.exc = exc
self.test_case = test_case
self.regex = regex
self.msg = msg
def __enter__(self):
return self
def __exit__(self, tp, value, traceback):
tst = tp is self.exc
self.test_case.assertTrue(tst, msg=self.msg)
self.exception = value
# TODO: implement self.regex checking
# True indicates, that exception is handled
return True
def TestArgs(*parameters):
def tuplify(x):
if not isinstance(x, tuple):
return (x,)
return x
def decorator(method, parameters=parameters):
for parameter in (tuplify(x) for x in parameters):
def method_for_parameter(self, method=method, parameter=parameter):
method(self, *parameter)
args_for_parameter = ",".join(repr(v) for v in parameter)
name_for_parameter = method.__name__ + "(" + args_for_parameter + ")"
frame = sys._getframe(1) # pylint: disable-msg=W0212
frame.f_locals[name_for_parameter] = method_for_parameter
frame.f_locals[name_for_parameter].__doc__ = method.__doc__ + '(' + args_for_parameter + ')'
method_for_parameter.__name__ = name_for_parameter + '(' + args_for_parameter + ')'
return None
return decorator

View File

@@ -343,7 +343,7 @@ class WebInterface(object):
for dir in dirs:
artistfolder = os.path.join(dir, folder)
if not os.path.isdir(artistfolder):
if not os.path.isdir(artistfolder.encode(headphones.SYS_ENCODING)):
logger.debug("Cannot find directory: " + artistfolder)
continue
threading.Thread(target=librarysync.libraryScan,
@@ -1367,8 +1367,16 @@ class WebInterface(object):
"idtag": checked(headphones.CONFIG.IDTAG)
}
for k, v in config.iteritems():
if isinstance(v, headphones.config.path):
# need to apply SoftChroot to paths:
nv = headphones.SOFT_CHROOT.apply(v)
if v != nv:
config[k] = headphones.config.path(nv)
# Need to convert EXTRAS to a dictionary we can pass to the config:
# it'll come in as a string like 2,5,6,8
extra_munges = {
"dj-mix": "dj_mix",
"mixtape/street": "mixtape_street"
@@ -1435,6 +1443,17 @@ class WebInterface(object):
kwargs[plain_config] = kwargs[use_config]
del kwargs[use_config]
for k, v in kwargs.iteritems():
# TODO : HUGE crutch. It is all because there is no way to deal with options...
_conf = headphones.CONFIG._define(k)
conftype = _conf[1]
#print '===>', conftype
if conftype is headphones.config.path:
nv = headphones.SOFT_CHROOT.revoke(v)
if nv != v:
kwargs[k] = nv
# Check if encoderoutputformat is set multiple times
if len(kwargs['encoderoutputformat'][-1]) > 1:
kwargs['encoderoutputformat'] = kwargs['encoderoutputformat'][-1]

View File

@@ -164,7 +164,7 @@ start_headphones () {
handle_updates
if ! is_running; then
log_daemon_msg "Starting $DESC"
start-stop-daemon -o -d "$APP_PATH" -c "$RUN_AS" --start "$EXTRA_SSD"_OPTS --pidfile "$PID_FILE" --exec "$DAEMON" -- "$DAEMON_OPTS"
start-stop-daemon -o -d $APP_PATH -c $RUN_AS --start $EXTRA_SSD_OPTS --pidfile $PID_FILE --exec $DAEMON -- $DAEMON_OPTS
check_retval
else
log_success_msg "$DESC: already running (pid $PID)"

View File

@@ -1,6 +1,6 @@
"""Utilities for writing code that runs on Python 2 and 3"""
# Copyright (c) 2010-2014 Benjamin Peterson
# Copyright (c) 2010-2015 Benjamin Peterson
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@@ -20,17 +20,22 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import absolute_import
import functools
import itertools
import operator
import sys
import types
__author__ = "Benjamin Peterson <benjamin@python.org>"
__version__ = "1.6.1"
__version__ = "1.10.0"
# Useful for very coarse version differentiation.
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3
PY34 = sys.version_info[0:2] >= (3, 4)
if PY3:
string_types = str,
@@ -53,6 +58,7 @@ else:
else:
# It's possible to have sizeof(long) != sizeof(Py_ssize_t).
class X(object):
def __len__(self):
return 1 << 31
try:
@@ -83,14 +89,14 @@ class _LazyDescr(object):
self.name = name
def __get__(self, obj, tp):
result = self._resolve()
setattr(obj, self.name, result) # Invokes __set__.
try:
result = self._resolve()
except ImportError:
# See the nice big comment in MovedModule.__getattr__.
raise AttributeError("%s could not be imported " % self.name)
setattr(obj, self.name, result) # Invokes __set__.
# This is a bit ugly, but it avoids running this again.
delattr(obj.__class__, self.name)
# This is a bit ugly, but it avoids running this again by
# removing this descriptor.
delattr(obj.__class__, self.name)
except AttributeError:
pass
return result
@@ -109,22 +115,7 @@ class MovedModule(_LazyDescr):
return _import_module(self.mod)
def __getattr__(self, attr):
# It turns out many Python frameworks like to traverse sys.modules and
# try to load various attributes. This causes problems if this is a
# platform-specific module on the wrong platform, like _winreg on
# Unixes. Therefore, we silently pretend unimportable modules do not
# have any attributes. See issues #51, #53, #56, and #63 for the full
# tales of woe.
#
# First, if possible, avoid loading the module just to look at __file__,
# __name__, or __path__.
if (attr in ("__file__", "__name__", "__path__") and
self.mod not in sys.modules):
raise AttributeError(attr)
try:
_module = self._resolve()
except ImportError:
raise AttributeError(attr)
_module = self._resolve()
value = getattr(_module, attr)
setattr(self, attr, value)
return value
@@ -170,9 +161,75 @@ class MovedAttribute(_LazyDescr):
return getattr(module, self.attr)
class _SixMetaPathImporter(object):
"""
A meta path importer to import six.moves and its submodules.
This class implements a PEP302 finder and loader. It should be compatible
with Python 2.5 and all existing versions of Python3
"""
def __init__(self, six_module_name):
self.name = six_module_name
self.known_modules = {}
def _add_module(self, mod, *fullnames):
for fullname in fullnames:
self.known_modules[self.name + "." + fullname] = mod
def _get_module(self, fullname):
return self.known_modules[self.name + "." + fullname]
def find_module(self, fullname, path=None):
if fullname in self.known_modules:
return self
return None
def __get_module(self, fullname):
try:
return self.known_modules[fullname]
except KeyError:
raise ImportError("This loader does not know module " + fullname)
def load_module(self, fullname):
try:
# in case of a reload
return sys.modules[fullname]
except KeyError:
pass
mod = self.__get_module(fullname)
if isinstance(mod, MovedModule):
mod = mod._resolve()
else:
mod.__loader__ = self
sys.modules[fullname] = mod
return mod
def is_package(self, fullname):
"""
Return true, if the named module is a package.
We need this method to get correct spec objects with
Python 3.4 (see PEP451)
"""
return hasattr(self.__get_module(fullname), "__path__")
def get_code(self, fullname):
"""Return None
Required, if is_package is implemented"""
self.__get_module(fullname) # eventually raises ImportError
return None
get_source = get_code # same as get_code
_importer = _SixMetaPathImporter(__name__)
class _MovedItems(_LazyModule):
"""Lazy loading of moved objects"""
__path__ = [] # mark as package
_moved_attributes = [
@@ -180,26 +237,33 @@ _moved_attributes = [
MovedAttribute("filter", "itertools", "builtins", "ifilter", "filter"),
MovedAttribute("filterfalse", "itertools", "itertools", "ifilterfalse", "filterfalse"),
MovedAttribute("input", "__builtin__", "builtins", "raw_input", "input"),
MovedAttribute("intern", "__builtin__", "sys"),
MovedAttribute("map", "itertools", "builtins", "imap", "map"),
MovedAttribute("getcwd", "os", "os", "getcwdu", "getcwd"),
MovedAttribute("getcwdb", "os", "os", "getcwd", "getcwdb"),
MovedAttribute("range", "__builtin__", "builtins", "xrange", "range"),
MovedAttribute("reload_module", "__builtin__", "imp", "reload"),
MovedAttribute("reload_module", "__builtin__", "importlib" if PY34 else "imp", "reload"),
MovedAttribute("reduce", "__builtin__", "functools"),
MovedAttribute("shlex_quote", "pipes", "shlex", "quote"),
MovedAttribute("StringIO", "StringIO", "io"),
MovedAttribute("UserDict", "UserDict", "collections"),
MovedAttribute("UserList", "UserList", "collections"),
MovedAttribute("UserString", "UserString", "collections"),
MovedAttribute("xrange", "__builtin__", "builtins", "xrange", "range"),
MovedAttribute("zip", "itertools", "builtins", "izip", "zip"),
MovedAttribute("zip_longest", "itertools", "itertools", "izip_longest", "zip_longest"),
MovedModule("builtins", "__builtin__"),
MovedModule("configparser", "ConfigParser"),
MovedModule("copyreg", "copy_reg"),
MovedModule("dbm_gnu", "gdbm", "dbm.gnu"),
MovedModule("_dummy_thread", "dummy_thread", "_dummy_thread"),
MovedModule("http_cookiejar", "cookielib", "http.cookiejar"),
MovedModule("http_cookies", "Cookie", "http.cookies"),
MovedModule("html_entities", "htmlentitydefs", "html.entities"),
MovedModule("html_parser", "HTMLParser", "html.parser"),
MovedModule("http_client", "httplib", "http.client"),
MovedModule("email_mime_multipart", "email.MIMEMultipart", "email.mime.multipart"),
MovedModule("email_mime_nonmultipart", "email.MIMENonMultipart", "email.mime.nonmultipart"),
MovedModule("email_mime_text", "email.MIMEText", "email.mime.text"),
MovedModule("email_mime_base", "email.MIMEBase", "email.mime.base"),
MovedModule("BaseHTTPServer", "BaseHTTPServer", "http.server"),
@@ -233,21 +297,28 @@ _moved_attributes = [
MovedModule("urllib", __name__ + ".moves.urllib", __name__ + ".moves.urllib"),
MovedModule("urllib_robotparser", "robotparser", "urllib.robotparser"),
MovedModule("xmlrpc_client", "xmlrpclib", "xmlrpc.client"),
MovedModule("xmlrpc_server", "xmlrpclib", "xmlrpc.server"),
MovedModule("winreg", "_winreg"),
MovedModule("xmlrpc_server", "SimpleXMLRPCServer", "xmlrpc.server"),
]
# Add windows specific modules.
if sys.platform == "win32":
_moved_attributes += [
MovedModule("winreg", "_winreg"),
]
for attr in _moved_attributes:
setattr(_MovedItems, attr.name, attr)
if isinstance(attr, MovedModule):
sys.modules[__name__ + ".moves." + attr.name] = attr
_importer._add_module(attr, "moves." + attr.name)
del attr
_MovedItems._moved_attributes = _moved_attributes
moves = sys.modules[__name__ + ".moves"] = _MovedItems(__name__ + ".moves")
moves = _MovedItems(__name__ + ".moves")
_importer._add_module(moves, "moves")
class Module_six_moves_urllib_parse(_LazyModule):
"""Lazy loading of moved objects in six.moves.urllib_parse"""
@@ -268,6 +339,13 @@ _urllib_parse_moved_attributes = [
MovedAttribute("unquote_plus", "urllib", "urllib.parse"),
MovedAttribute("urlencode", "urllib", "urllib.parse"),
MovedAttribute("splitquery", "urllib", "urllib.parse"),
MovedAttribute("splittag", "urllib", "urllib.parse"),
MovedAttribute("splituser", "urllib", "urllib.parse"),
MovedAttribute("uses_fragment", "urlparse", "urllib.parse"),
MovedAttribute("uses_netloc", "urlparse", "urllib.parse"),
MovedAttribute("uses_params", "urlparse", "urllib.parse"),
MovedAttribute("uses_query", "urlparse", "urllib.parse"),
MovedAttribute("uses_relative", "urlparse", "urllib.parse"),
]
for attr in _urllib_parse_moved_attributes:
setattr(Module_six_moves_urllib_parse, attr.name, attr)
@@ -275,10 +353,12 @@ del attr
Module_six_moves_urllib_parse._moved_attributes = _urllib_parse_moved_attributes
sys.modules[__name__ + ".moves.urllib_parse"] = sys.modules[__name__ + ".moves.urllib.parse"] = Module_six_moves_urllib_parse(__name__ + ".moves.urllib_parse")
_importer._add_module(Module_six_moves_urllib_parse(__name__ + ".moves.urllib_parse"),
"moves.urllib_parse", "moves.urllib.parse")
class Module_six_moves_urllib_error(_LazyModule):
"""Lazy loading of moved objects in six.moves.urllib_error"""
@@ -293,10 +373,12 @@ del attr
Module_six_moves_urllib_error._moved_attributes = _urllib_error_moved_attributes
sys.modules[__name__ + ".moves.urllib_error"] = sys.modules[__name__ + ".moves.urllib.error"] = Module_six_moves_urllib_error(__name__ + ".moves.urllib.error")
_importer._add_module(Module_six_moves_urllib_error(__name__ + ".moves.urllib.error"),
"moves.urllib_error", "moves.urllib.error")
class Module_six_moves_urllib_request(_LazyModule):
"""Lazy loading of moved objects in six.moves.urllib_request"""
@@ -341,10 +423,12 @@ del attr
Module_six_moves_urllib_request._moved_attributes = _urllib_request_moved_attributes
sys.modules[__name__ + ".moves.urllib_request"] = sys.modules[__name__ + ".moves.urllib.request"] = Module_six_moves_urllib_request(__name__ + ".moves.urllib.request")
_importer._add_module(Module_six_moves_urllib_request(__name__ + ".moves.urllib.request"),
"moves.urllib_request", "moves.urllib.request")
class Module_six_moves_urllib_response(_LazyModule):
"""Lazy loading of moved objects in six.moves.urllib_response"""
@@ -360,10 +444,12 @@ del attr
Module_six_moves_urllib_response._moved_attributes = _urllib_response_moved_attributes
sys.modules[__name__ + ".moves.urllib_response"] = sys.modules[__name__ + ".moves.urllib.response"] = Module_six_moves_urllib_response(__name__ + ".moves.urllib.response")
_importer._add_module(Module_six_moves_urllib_response(__name__ + ".moves.urllib.response"),
"moves.urllib_response", "moves.urllib.response")
class Module_six_moves_urllib_robotparser(_LazyModule):
"""Lazy loading of moved objects in six.moves.urllib_robotparser"""
@@ -376,22 +462,25 @@ del attr
Module_six_moves_urllib_robotparser._moved_attributes = _urllib_robotparser_moved_attributes
sys.modules[__name__ + ".moves.urllib_robotparser"] = sys.modules[__name__ + ".moves.urllib.robotparser"] = Module_six_moves_urllib_robotparser(__name__ + ".moves.urllib.robotparser")
_importer._add_module(Module_six_moves_urllib_robotparser(__name__ + ".moves.urllib.robotparser"),
"moves.urllib_robotparser", "moves.urllib.robotparser")
class Module_six_moves_urllib(types.ModuleType):
"""Create a six.moves.urllib namespace that resembles the Python 3 namespace"""
parse = sys.modules[__name__ + ".moves.urllib_parse"]
error = sys.modules[__name__ + ".moves.urllib_error"]
request = sys.modules[__name__ + ".moves.urllib_request"]
response = sys.modules[__name__ + ".moves.urllib_response"]
robotparser = sys.modules[__name__ + ".moves.urllib_robotparser"]
__path__ = [] # mark as package
parse = _importer._get_module("moves.urllib_parse")
error = _importer._get_module("moves.urllib_error")
request = _importer._get_module("moves.urllib_request")
response = _importer._get_module("moves.urllib_response")
robotparser = _importer._get_module("moves.urllib_robotparser")
def __dir__(self):
return ['parse', 'error', 'request', 'response', 'robotparser']
sys.modules[__name__ + ".moves.urllib"] = Module_six_moves_urllib(__name__ + ".moves.urllib")
_importer._add_module(Module_six_moves_urllib(__name__ + ".moves.urllib"),
"moves.urllib")
def add_move(move):
@@ -418,11 +507,6 @@ if PY3:
_func_code = "__code__"
_func_defaults = "__defaults__"
_func_globals = "__globals__"
_iterkeys = "keys"
_itervalues = "values"
_iteritems = "items"
_iterlists = "lists"
else:
_meth_func = "im_func"
_meth_self = "im_self"
@@ -432,11 +516,6 @@ else:
_func_defaults = "func_defaults"
_func_globals = "func_globals"
_iterkeys = "iterkeys"
_itervalues = "itervalues"
_iteritems = "iteritems"
_iterlists = "iterlists"
try:
advance_iterator = next
@@ -459,6 +538,9 @@ if PY3:
create_bound_method = types.MethodType
def create_unbound_method(func, cls):
return func
Iterator = object
else:
def get_unbound_function(unbound):
@@ -467,6 +549,9 @@ else:
def create_bound_method(func, obj):
return types.MethodType(func, obj, obj.__class__)
def create_unbound_method(func, cls):
return types.MethodType(func, None, cls)
class Iterator(object):
def next(self):
@@ -485,66 +570,117 @@ get_function_defaults = operator.attrgetter(_func_defaults)
get_function_globals = operator.attrgetter(_func_globals)
def iterkeys(d, **kw):
"""Return an iterator over the keys of a dictionary."""
return iter(getattr(d, _iterkeys)(**kw))
if PY3:
def iterkeys(d, **kw):
return iter(d.keys(**kw))
def itervalues(d, **kw):
"""Return an iterator over the values of a dictionary."""
return iter(getattr(d, _itervalues)(**kw))
def itervalues(d, **kw):
return iter(d.values(**kw))
def iteritems(d, **kw):
"""Return an iterator over the (key, value) pairs of a dictionary."""
return iter(getattr(d, _iteritems)(**kw))
def iteritems(d, **kw):
return iter(d.items(**kw))
def iterlists(d, **kw):
"""Return an iterator over the (key, [values]) pairs of a dictionary."""
return iter(getattr(d, _iterlists)(**kw))
def iterlists(d, **kw):
return iter(d.lists(**kw))
viewkeys = operator.methodcaller("keys")
viewvalues = operator.methodcaller("values")
viewitems = operator.methodcaller("items")
else:
def iterkeys(d, **kw):
return d.iterkeys(**kw)
def itervalues(d, **kw):
return d.itervalues(**kw)
def iteritems(d, **kw):
return d.iteritems(**kw)
def iterlists(d, **kw):
return d.iterlists(**kw)
viewkeys = operator.methodcaller("viewkeys")
viewvalues = operator.methodcaller("viewvalues")
viewitems = operator.methodcaller("viewitems")
_add_doc(iterkeys, "Return an iterator over the keys of a dictionary.")
_add_doc(itervalues, "Return an iterator over the values of a dictionary.")
_add_doc(iteritems,
"Return an iterator over the (key, value) pairs of a dictionary.")
_add_doc(iterlists,
"Return an iterator over the (key, [values]) pairs of a dictionary.")
if PY3:
def b(s):
return s.encode("latin-1")
def u(s):
return s
unichr = chr
if sys.version_info[1] <= 1:
def int2byte(i):
return bytes((i,))
else:
# This is about 2x faster than the implementation above on 3.2+
int2byte = operator.methodcaller("to_bytes", 1, "big")
import struct
int2byte = struct.Struct(">B").pack
del struct
byte2int = operator.itemgetter(0)
indexbytes = operator.getitem
iterbytes = iter
import io
StringIO = io.StringIO
BytesIO = io.BytesIO
_assertCountEqual = "assertCountEqual"
if sys.version_info[1] <= 1:
_assertRaisesRegex = "assertRaisesRegexp"
_assertRegex = "assertRegexpMatches"
else:
_assertRaisesRegex = "assertRaisesRegex"
_assertRegex = "assertRegex"
else:
def b(s):
return s
# Workaround for standalone backslash
def u(s):
return unicode(s.replace(r'\\', r'\\\\'), "unicode_escape")
unichr = unichr
int2byte = chr
def byte2int(bs):
return ord(bs[0])
def indexbytes(buf, i):
return ord(buf[i])
def iterbytes(buf):
return (ord(byte) for byte in buf)
iterbytes = functools.partial(itertools.imap, ord)
import StringIO
StringIO = BytesIO = StringIO.StringIO
_assertCountEqual = "assertItemsEqual"
_assertRaisesRegex = "assertRaisesRegexp"
_assertRegex = "assertRegexpMatches"
_add_doc(b, """Byte literal""")
_add_doc(u, """Text literal""")
def assertCountEqual(self, *args, **kwargs):
return getattr(self, _assertCountEqual)(*args, **kwargs)
def assertRaisesRegex(self, *args, **kwargs):
return getattr(self, _assertRaisesRegex)(*args, **kwargs)
def assertRegex(self, *args, **kwargs):
return getattr(self, _assertRegex)(*args, **kwargs)
if PY3:
exec_ = getattr(moves.builtins, "exec")
def reraise(tp, value, tb=None):
if value is None:
value = tp()
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value
@@ -562,12 +698,26 @@ else:
_locs_ = _globs_
exec("""exec _code_ in _globs_, _locs_""")
exec_("""def reraise(tp, value, tb=None):
raise tp, value, tb
""")
if sys.version_info[:2] == (3, 2):
exec_("""def raise_from(value, from_value):
if from_value is None:
raise value
raise value from from_value
""")
elif sys.version_info[:2] > (3, 2):
exec_("""def raise_from(value, from_value):
raise value from from_value
""")
else:
def raise_from(value, from_value):
raise value
print_ = getattr(moves.builtins, "print", None)
if print_ is None:
def print_(*args, **kwargs):
@@ -575,13 +725,14 @@ if print_ is None:
fp = kwargs.pop("file", sys.stdout)
if fp is None:
return
def write(data):
if not isinstance(data, basestring):
data = str(data)
# If the file has an encoding, encode unicode with it.
if (isinstance(fp, file) and
isinstance(data, unicode) and
fp.encoding is not None):
isinstance(data, unicode) and
fp.encoding is not None):
errors = getattr(fp, "errors", None)
if errors is None:
errors = "strict"
@@ -622,25 +773,96 @@ if print_ is None:
write(sep)
write(arg)
write(end)
if sys.version_info[:2] < (3, 3):
_print = print_
def print_(*args, **kwargs):
fp = kwargs.get("file", sys.stdout)
flush = kwargs.pop("flush", False)
_print(*args, **kwargs)
if flush and fp is not None:
fp.flush()
_add_doc(reraise, """Reraise an exception.""")
if sys.version_info[0:2] < (3, 4):
def wraps(wrapped, assigned=functools.WRAPPER_ASSIGNMENTS,
updated=functools.WRAPPER_UPDATES):
def wrapper(f):
f = functools.wraps(wrapped, assigned, updated)(f)
f.__wrapped__ = wrapped
return f
return wrapper
else:
wraps = functools.wraps
def with_metaclass(meta, *bases):
"""Create a base class with a metaclass."""
return meta("NewBase", bases, {})
# This requires a bit of explanation: the basic idea is to make a dummy
# metaclass for one level of class instantiation that replaces itself with
# the actual metaclass.
class metaclass(meta):
def __new__(cls, name, this_bases, d):
return meta(name, bases, d)
return type.__new__(metaclass, 'temporary_class', (), {})
def add_metaclass(metaclass):
"""Class decorator for creating a class with a metaclass."""
def wrapper(cls):
orig_vars = cls.__dict__.copy()
orig_vars.pop('__dict__', None)
orig_vars.pop('__weakref__', None)
slots = orig_vars.get('__slots__')
if slots is not None:
if isinstance(slots, str):
slots = [slots]
for slots_var in slots:
orig_vars.pop(slots_var)
orig_vars.pop('__dict__', None)
orig_vars.pop('__weakref__', None)
return metaclass(cls.__name__, cls.__bases__, orig_vars)
return wrapper
def python_2_unicode_compatible(klass):
"""
A decorator that defines __unicode__ and __str__ methods under Python 2.
Under Python 3 it does nothing.
To support Python 2 and 3 with a single code base, define a __str__ method
returning text and apply this decorator to the class.
"""
if PY2:
if '__str__' not in klass.__dict__:
raise ValueError("@python_2_unicode_compatible cannot be applied "
"to %s because it doesn't define __str__()." %
klass.__name__)
klass.__unicode__ = klass.__str__
klass.__str__ = lambda self: self.__unicode__().encode('utf-8')
return klass
# Complete the moves implementation.
# This code is at the end of this module to speed up module loading.
# Turn this module into a package.
__path__ = [] # required for PEP 302 and PEP 451
__package__ = __name__ # see PEP 366 @ReservedAssignment
if globals().get("__spec__") is not None:
__spec__.submodule_search_locations = [] # PEP 451 @UndefinedVariable
# Remove other six meta path importers, since they cause problems. This can
# happen if six is removed from sys.modules and then reloaded. (Setuptools does
# this for some reason.)
if sys.meta_path:
for i, importer in enumerate(sys.meta_path):
# Here's some real nastiness: Another "instance" of the six module might
# be floating around. Therefore, we can't use isinstance() to check for
# the six meta path importer, since the other six instance will have
# inserted an importer with different class.
if (type(importer).__name__ == "_SixMetaPathImporter" and
importer.name == __name__):
del sys.meta_path[i]
break
del i, importer
# Finally, add the importer to the meta path import hook.
sys.meta_path.append(_importer)

113
lib/unittestcompat.py Normal file
View File

@@ -0,0 +1,113 @@
import sys
if sys.version_info < (2, 7):
import unittest2 as unittest
from unittest2 import TestCase as TC
else:
import unittest
from unittest import TestCase as TC
skip = unittest.skip
_dummy = False
# less than 2.6 ...
if sys.version_info[0] == 2 and sys.version_info[1] <= 6:
_dummy = True
def _d(f):
def decorate(self, *args, **kw):
if not _dummy:
return f(self, *args, **kw)
return self.assertTrue(True)
return decorate
class TestCase(TC):
"""
Wrapper for python 2.6 stubs
"""
def assertIsInstance(self, obj, cls, msg=None):
if not _dummy:
return super(TestCase, self).assertIsInstance(obj, cls, msg)
tst = isinstance(obj, cls)
return self.assertTrue(tst, msg)
@_d
def assertNotIsInstance(self, *args, **kw):
return super(TestCase, self).assertNotIsInstance(*args, **kw)
@_d
def assertIn(self, *args, **kw):
return super(TestCase, self).assertIn(*args, **kw)
@_d
def assertRegexpMatches(self, *args, **kw):
return super(TestCase, self).assertRegexpMatches(*args, **kw)
# -----------------------------------------------------------
# NOT DUMMY ASSERTIONS
# -----------------------------------------------------------
def assertIsNone(self, val, msg=None):
if not _dummy:
return super(TestCase, self).assertIsNone(val, msg)
tst = val is None
return super(TestCase, self).assertTrue(tst, msg)
def assertIsNotNone(self, val, msg=None):
if not _dummy:
return super(TestCase, self).assertIsNotNone(val, msg)
tst = val is not None
return super(TestCase, self).assertTrue(tst, msg)
def assertRaises(self, exc, msg=None):
if not _dummy:
return super(TestCase, self).assertRaises(exc, msg)
return TestCase._TestCaseRaiseStub(self, exc, msg=msg)
def assertRaisesRegexp(self, exc, regex, msg=None):
if not _dummy:
return super(TestCase, self).assertRaises(exc, msg)
return TestCase._TestCaseRaiseStub(self, exc, regex=regex, msg=msg)
class _TestCaseRaiseStub:
""" Internal stuff for stubbing `assertRaises*` """
def __init__(self, test_case, exc, regex=None, msg=None):
self.exc = exc
self.test_case = test_case
self.regex = regex
self.msg = msg
def __enter__(self):
return self
def __exit__(self, tp, value, traceback):
tst = tp is self.exc
self.test_case.assertTrue(tst, msg=self.msg)
self.exception = value
# TODO: implement self.regex checking
# True indicates, that exception is handled
return True
def TestArgs(*parameters):
def tuplify(x):
if not isinstance(x, tuple):
return (x,)
return x
def decorator(method, parameters=parameters):
for parameter in (tuplify(x) for x in parameters):
def method_for_parameter(self, method=method, parameter=parameter):
method(self, *parameter)
args_for_parameter = ",".join(repr(v) for v in parameter)
name_for_parameter = method.__name__ + "(" + args_for_parameter + ")"
frame = sys._getframe(1) # pylint: disable-msg=W0212
frame.f_locals[name_for_parameter] = method_for_parameter
frame.f_locals[name_for_parameter].__doc__ = method.__doc__ + '(' + args_for_parameter + ')'
method_for_parameter.__name__ = name_for_parameter + '(' + args_for_parameter + ')'
return None
return decorator

12
setup.cfg Normal file
View File

@@ -0,0 +1,12 @@
[nosetests]
verbosity=2
tests=headphones
#rednose=1
#exclude-dir=lib
with-coverage=1
cover-branches=1
cover-html=1
cover-html-dir=cover-html
cover-package=headphones