added sabnzbd & nzbmatrix support

This commit is contained in:
Remy
2011-05-23 20:58:42 -07:00
parent 77fdbe4cee
commit 742176cb90
27 changed files with 6068 additions and 0 deletions

14
.gitignore vendored Normal file
View File

@@ -0,0 +1,14 @@
# Compiled source #
###################
*.pyc
# Logs & Databases #
####################
*.db
*.log
# OS generated files #
######################
.DS_Store?
Thumbs.db

3
apscheduler/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
version_info = (2, 0, 0, 'rc', 2)
version = '.'.join(str(n) for n in version_info[:3])
release = version + ''.join(str(n) for n in version_info[3:])

64
apscheduler/events.py Normal file
View File

@@ -0,0 +1,64 @@
__all__ = ('EVENT_SCHEDULER_START', 'EVENT_SCHEDULER_SHUTDOWN',
'EVENT_JOBSTORE_ADDED', 'EVENT_JOBSTORE_REMOVED',
'EVENT_JOBSTORE_JOB_ADDED', 'EVENT_JOBSTORE_JOB_REMOVED',
'EVENT_JOB_EXECUTED', 'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED',
'EVENT_ALL', 'SchedulerEvent', 'JobStoreEvent', 'JobEvent')
EVENT_SCHEDULER_START = 1 # The scheduler was started
EVENT_SCHEDULER_SHUTDOWN = 2 # The scheduler was shut down
EVENT_JOBSTORE_ADDED = 4 # A job store was added to the scheduler
EVENT_JOBSTORE_REMOVED = 8 # A job store was removed from the scheduler
EVENT_JOBSTORE_JOB_ADDED = 16 # A job was added to a job store
EVENT_JOBSTORE_JOB_REMOVED = 32 # A job was removed from a job store
EVENT_JOB_EXECUTED = 64 # A job was executed successfully
EVENT_JOB_ERROR = 128 # A job raised an exception during execution
EVENT_JOB_MISSED = 256 # A job's execution was missed
EVENT_ALL = (EVENT_SCHEDULER_START | EVENT_SCHEDULER_SHUTDOWN |
EVENT_JOBSTORE_ADDED | EVENT_JOBSTORE_REMOVED |
EVENT_JOBSTORE_JOB_ADDED | EVENT_JOBSTORE_JOB_REMOVED |
EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
class SchedulerEvent(object):
"""
An event that concerns the scheduler itself.
:var code: the type code of this event
"""
def __init__(self, code):
self.code = code
class JobStoreEvent(SchedulerEvent):
"""
An event that concerns job stores.
:var alias: the alias of the job store involved
:var job: the new job if a job was added
"""
def __init__(self, code, alias, job=None):
SchedulerEvent.__init__(self, code)
self.alias = alias
if job:
self.job = job
class JobEvent(SchedulerEvent):
"""
An event that concerns the execution of individual jobs.
:var job: the job instance in question
:var scheduled_run_time: the time when the job was scheduled to be run
:var retval: the return value of the successfully executed job
:var exception: the exception raised by the job
:var traceback: the traceback object associated with the exception
"""
def __init__(self, code, job, scheduled_run_time, retval=None,
exception=None, traceback=None):
SchedulerEvent.__init__(self, code)
self.job = job
self.scheduled_run_time = scheduled_run_time
self.retval = retval
self.exception = exception
self.traceback = traceback

134
apscheduler/job.py Normal file
View File

@@ -0,0 +1,134 @@
"""
Jobs represent scheduled tasks.
"""
from threading import Lock
from datetime import timedelta
from apscheduler.util import to_unicode, ref_to_obj, get_callable_name,\
obj_to_ref
class MaxInstancesReachedError(Exception):
pass
class Job(object):
"""
Encapsulates the actual Job along with its metadata. Job instances
are created by the scheduler when adding jobs, and it should not be
directly instantiated.
:param trigger: trigger that determines the execution times
:param func: callable to call when the trigger is triggered
:param args: list of positional arguments to call func with
:param kwargs: dict of keyword arguments to call func with
:param name: name of the job (optional)
:param misfire_grace_time: seconds after the designated run time that
the job is still allowed to be run
:param coalesce: run once instead of many times if the scheduler determines
that the job should be run more than once in succession
:param max_runs: maximum number of times this job is allowed to be
triggered
:param max_instances: maximum number of concurrently running
instances allowed for this job
"""
id = None
next_run_time = None
def __init__(self, trigger, func, args, kwargs, misfire_grace_time,
coalesce, name=None, max_runs=None, max_instances=1):
if not trigger:
raise ValueError('The trigger must not be None')
if not hasattr(func, '__call__'):
raise TypeError('func must be callable')
if not hasattr(args, '__getitem__'):
raise TypeError('args must be a list-like object')
if not hasattr(kwargs, '__getitem__'):
raise TypeError('kwargs must be a dict-like object')
if misfire_grace_time <= 0:
raise ValueError('misfire_grace_time must be a positive value')
if max_runs is not None and max_runs <= 0:
raise ValueError('max_runs must be a positive value')
if max_instances <= 0:
raise ValueError('max_instances must be a positive value')
self._lock = Lock()
self.trigger = trigger
self.func = func
self.args = args
self.kwargs = kwargs
self.name = to_unicode(name or get_callable_name(func))
self.misfire_grace_time = misfire_grace_time
self.coalesce = coalesce
self.max_runs = max_runs
self.max_instances = max_instances
self.runs = 0
self.instances = 0
def compute_next_run_time(self, now):
if self.runs == self.max_runs:
self.next_run_time = None
else:
self.next_run_time = self.trigger.get_next_fire_time(now)
return self.next_run_time
def get_run_times(self, now):
"""
Computes the scheduled run times between ``next_run_time`` and ``now``.
"""
run_times = []
run_time = self.next_run_time
increment = timedelta(microseconds=1)
while ((not self.max_runs or self.runs < self.max_runs) and
run_time and run_time <= now):
run_times.append(run_time)
run_time = self.trigger.get_next_fire_time(run_time + increment)
return run_times
def add_instance(self):
self._lock.acquire()
try:
if self.instances == self.max_instances:
raise MaxInstancesReachedError
self.instances += 1
finally:
self._lock.release()
def remove_instance(self):
self._lock.acquire()
try:
assert self.instances > 0, 'Already at 0 instances'
self.instances -= 1
finally:
self._lock.release()
def __getstate__(self):
# Prevents the unwanted pickling of transient or unpicklable variables
state = self.__dict__.copy()
state.pop('instances', None)
state.pop('func', None)
state.pop('_lock', None)
state['func_ref'] = obj_to_ref(self.func)
return state
def __setstate__(self, state):
state['instances'] = 0
state['func'] = ref_to_obj(state.pop('func_ref'))
state['_lock'] = Lock()
self.__dict__ = state
def __eq__(self, other):
if isinstance(other, Job):
return self.id is not None and other.id == self.id or self is other
return NotImplemented
def __repr__(self):
return '<Job (name=%s, trigger=%s)>' % (self.name, repr(self.trigger))
def __str__(self):
return '%s (trigger: %s, next run at: %s)' % (self.name,
str(self.trigger), str(self.next_run_time))

View File

View File

@@ -0,0 +1,25 @@
"""
Abstract base class that provides the interface needed by all job stores.
Job store methods are also documented here.
"""
class JobStore(object):
def add_job(self, job):
"""Adds the given job from this store."""
raise NotImplementedError
def update_job(self, job):
"""Persists the running state of the given job."""
raise NotImplementedError
def remove_job(self, job):
"""Removes the given jobs from this store."""
raise NotImplementedError
def load_jobs(self):
"""Loads jobs from this store into memory."""
raise NotImplementedError
def close(self):
"""Frees any resources still bound to this job store."""

View File

@@ -0,0 +1,84 @@
"""
Stores jobs in a MongoDB database.
"""
import logging
from apscheduler.jobstores.base import JobStore
from apscheduler.job import Job
try:
import cPickle as pickle
except ImportError: # pragma: nocover
import pickle
try:
from bson.binary import Binary
from pymongo.connection import Connection
except ImportError: # pragma: nocover
raise ImportError('MongoDBJobStore requires PyMongo installed')
logger = logging.getLogger(__name__)
class MongoDBJobStore(JobStore):
def __init__(self, database='apscheduler', collection='jobs',
connection=None, pickle_protocol=pickle.HIGHEST_PROTOCOL,
**connect_args):
self.jobs = []
self.pickle_protocol = pickle_protocol
if not database:
raise ValueError('The "database" parameter must not be empty')
if not collection:
raise ValueError('The "collection" parameter must not be empty')
if connection:
self.connection = connection
else:
self.connection = Connection(**connect_args)
self.collection = self.connection[database][collection]
def add_job(self, job):
job_dict = job.__getstate__()
job_dict['trigger'] = Binary(pickle.dumps(job.trigger,
self.pickle_protocol))
job_dict['args'] = Binary(pickle.dumps(job.args,
self.pickle_protocol))
job_dict['kwargs'] = Binary(pickle.dumps(job.kwargs,
self.pickle_protocol))
job.id = self.collection.insert(job_dict)
self.jobs.append(job)
def remove_job(self, job):
self.collection.remove(job.id)
self.jobs.remove(job)
def load_jobs(self):
jobs = []
for job_dict in self.collection.find():
try:
job = Job.__new__(Job)
job_dict['id'] = job_dict.pop('_id')
job_dict['trigger'] = pickle.loads(job_dict['trigger'])
job_dict['args'] = pickle.loads(job_dict['args'])
job_dict['kwargs'] = pickle.loads(job_dict['kwargs'])
job.__setstate__(job_dict)
jobs.append(job)
except Exception:
job_name = job_dict.get('name', '(unknown)')
logger.exception('Unable to restore job "%s"', job_name)
self.jobs = jobs
def update_job(self, job):
spec = {'_id': job.id}
document = {'$set': {'next_run_time': job.next_run_time},
'$inc': {'runs': 1}}
self.collection.update(spec, document)
def close(self):
self.connection.disconnect()
def __repr__(self):
connection = self.collection.database.connection
return '<%s (connection=%s)>' % (self.__class__.__name__, connection)

View File

@@ -0,0 +1,25 @@
"""
Stores jobs in an array in RAM. Provides no persistence support.
"""
from apscheduler.jobstores.base import JobStore
class RAMJobStore(JobStore):
def __init__(self):
self.jobs = []
def add_job(self, job):
self.jobs.append(job)
def update_job(self, job):
pass
def remove_job(self, job):
self.jobs.remove(job)
def load_jobs(self):
pass
def __repr__(self):
return '<%s>' % (self.__class__.__name__)

View File

@@ -0,0 +1,65 @@
"""
Stores jobs in a file governed by the :mod:`shelve` module.
"""
import shelve
import pickle
import random
import logging
from apscheduler.jobstores.base import JobStore
from apscheduler.job import Job
from apscheduler.util import itervalues
logger = logging.getLogger(__name__)
class ShelveJobStore(JobStore):
MAX_ID = 1000000
def __init__(self, path, pickle_protocol=pickle.HIGHEST_PROTOCOL):
self.jobs = []
self.path = path
self.pickle_protocol = pickle_protocol
self.store = shelve.open(path, 'c', self.pickle_protocol)
def _generate_id(self):
id = None
while not id:
id = str(random.randint(1, self.MAX_ID))
if not id in self.store:
return id
def add_job(self, job):
job.id = self._generate_id()
self.jobs.append(job)
self.store[job.id] = job.__getstate__()
def update_job(self, job):
job_dict = self.store[job.id]
job_dict['next_run_time'] = job.next_run_time
job_dict['runs'] = job.runs
self.store[job.id] = job_dict
def remove_job(self, job):
del self.store[job.id]
self.jobs.remove(job)
def load_jobs(self):
jobs = []
for job_dict in itervalues(self.store):
try:
job = Job.__new__(Job)
job.__setstate__(job_dict)
jobs.append(job)
except Exception:
job_name = job_dict.get('name', '(unknown)')
logger.exception('Unable to restore job "%s"', job_name)
self.jobs = jobs
def close(self):
self.store.close()
def __repr__(self):
return '<%s (path=%s)>' % (self.__class__.__name__, self.path)

View File

@@ -0,0 +1,87 @@
"""
Stores jobs in a database table using SQLAlchemy.
"""
import pickle
import logging
from apscheduler.jobstores.base import JobStore
from apscheduler.job import Job
try:
from sqlalchemy import *
except ImportError: # pragma: nocover
raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')
logger = logging.getLogger(__name__)
class SQLAlchemyJobStore(JobStore):
def __init__(self, url=None, engine=None, tablename='apscheduler_jobs',
metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL):
self.jobs = []
self.pickle_protocol = pickle_protocol
if engine:
self.engine = engine
elif url:
self.engine = create_engine(url)
else:
raise ValueError('Need either "engine" or "url" defined')
self.jobs_t = Table(tablename, metadata or MetaData(),
Column('id', Integer,
Sequence(tablename + '_id_seq', optional=True),
primary_key=True),
Column('trigger', PickleType(pickle_protocol, mutable=False),
nullable=False),
Column('func_ref', String(1024), nullable=False),
Column('args', PickleType(pickle_protocol, mutable=False),
nullable=False),
Column('kwargs', PickleType(pickle_protocol, mutable=False),
nullable=False),
Column('name', Unicode(1024), unique=True),
Column('misfire_grace_time', Integer, nullable=False),
Column('coalesce', Boolean, nullable=False),
Column('max_runs', Integer),
Column('max_instances', Integer),
Column('next_run_time', DateTime, nullable=False),
Column('runs', BigInteger))
self.jobs_t.create(self.engine, True)
def add_job(self, job):
job_dict = job.__getstate__()
result = self.engine.execute(self.jobs_t.insert().values(**job_dict))
job.id = result.inserted_primary_key[0]
self.jobs.append(job)
def remove_job(self, job):
delete = self.jobs_t.delete().where(self.jobs_t.c.id == job.id)
self.engine.execute(delete)
self.jobs.remove(job)
def load_jobs(self):
jobs = []
for row in self.engine.execute(select([self.jobs_t])):
try:
job = Job.__new__(Job)
job_dict = dict(row.items())
job.__setstate__(job_dict)
jobs.append(job)
except Exception:
job_name = job_dict.get('name', '(unknown)')
logger.exception('Unable to restore job "%s"', job_name)
self.jobs = jobs
def update_job(self, job):
job_dict = job.__getstate__()
update = self.jobs_t.update().where(self.jobs_t.c.id == job.id).\
values(next_run_time=job_dict['next_run_time'],
runs=job_dict['runs'])
self.engine.execute(update)
def close(self):
self.engine.dispose()
def __repr__(self):
return '<%s (url=%s)>' % (self.__class__.__name__, self.engine.url)

559
apscheduler/scheduler.py Normal file
View File

@@ -0,0 +1,559 @@
"""
This module is the main part of the library. It houses the Scheduler class
and related exceptions.
"""
from threading import Thread, Event, Lock
from datetime import datetime, timedelta
from logging import getLogger
import os
import sys
from apscheduler.util import *
from apscheduler.triggers import SimpleTrigger, IntervalTrigger, CronTrigger
from apscheduler.jobstores.ram_store import RAMJobStore
from apscheduler.job import Job, MaxInstancesReachedError
from apscheduler.events import *
from apscheduler.threadpool import ThreadPool
logger = getLogger(__name__)
class SchedulerAlreadyRunningError(Exception):
"""
Raised when attempting to start or configure the scheduler when it's
already running.
"""
def __str__(self):
return 'Scheduler is already running'
class Scheduler(object):
"""
This class is responsible for scheduling jobs and triggering
their execution.
"""
_stopped = False
_thread = None
def __init__(self, gconfig={}, **options):
self._wakeup = Event()
self._jobstores = {}
self._jobstores_lock = Lock()
self._listeners = []
self._listeners_lock = Lock()
self._pending_jobs = []
self.configure(gconfig, **options)
def configure(self, gconfig={}, **options):
"""
Reconfigures the scheduler with the given options. Can only be done
when the scheduler isn't running.
"""
if self.running:
raise SchedulerAlreadyRunningError
# Set general options
config = combine_opts(gconfig, 'apscheduler.', options)
self.misfire_grace_time = int(config.pop('misfire_grace_time', 1))
self.coalesce = asbool(config.pop('coalesce', True))
self.daemonic = asbool(config.pop('daemonic', True))
# Configure the thread pool
if 'threadpool' in config:
self._threadpool = maybe_ref(config['threadpool'])
else:
threadpool_opts = combine_opts(config, 'threadpool.')
self._threadpool = ThreadPool(**threadpool_opts)
# Configure job stores
jobstore_opts = combine_opts(config, 'jobstore.')
jobstores = {}
for key, value in jobstore_opts.items():
store_name, option = key.split('.', 1)
opts_dict = jobstores.setdefault(store_name, {})
opts_dict[option] = value
for alias, opts in jobstores.items():
classname = opts.pop('class')
cls = maybe_ref(classname)
jobstore = cls(**opts)
self.add_jobstore(jobstore, alias, True)
def start(self):
"""
Starts the scheduler in a new thread.
"""
if self.running:
raise SchedulerAlreadyRunningError
# Create a RAMJobStore as the default if there is no default job store
if not 'default' in self._jobstores:
self.add_jobstore(RAMJobStore(), 'default', True)
# Schedule all pending jobs
for job, jobstore in self._pending_jobs:
self._real_add_job(job, jobstore, False)
del self._pending_jobs[:]
self._stopped = False
self._thread = Thread(target=self._main_loop, name='APScheduler')
self._thread.setDaemon(self.daemonic)
self._thread.start()
def shutdown(self, wait=True, shutdown_threadpool=True):
"""
Shuts down the scheduler and terminates the thread.
Does not interrupt any currently running jobs.
:param wait: ``True`` to wait until all currently executing jobs have
finished (if ``shutdown_threadpool`` is also ``True``)
:param shutdown_threadpool: ``True`` to shut down the thread pool
"""
if not self.running:
return
self._stopped = True
self._wakeup.set()
# Shut down the thread pool
if shutdown_threadpool:
self._threadpool.shutdown(wait)
# Wait until the scheduler thread terminates
self._thread.join()
@property
def running(self):
return not self._stopped and self._thread and self._thread.isAlive()
def add_jobstore(self, jobstore, alias, quiet=False):
"""
Adds a job store to this scheduler.
:param jobstore: job store to be added
:param alias: alias for the job store
:param quiet: True to suppress scheduler thread wakeup
:type jobstore: instance of
:class:`~apscheduler.jobstores.base.JobStore`
:type alias: str
"""
self._jobstores_lock.acquire()
try:
if alias in self._jobstores:
raise KeyError('Alias "%s" is already in use' % alias)
self._jobstores[alias] = jobstore
jobstore.load_jobs()
finally:
self._jobstores_lock.release()
# Notify listeners that a new job store has been added
self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias))
# Notify the scheduler so it can scan the new job store for jobs
if not quiet:
self._wakeup.set()
def remove_jobstore(self, alias):
"""
Removes the job store by the given alias from this scheduler.
:type alias: str
"""
self._jobstores_lock.acquire()
try:
try:
del self._jobstores[alias]
except KeyError:
raise KeyError('No such job store: %s' % alias)
finally:
self._jobstores_lock.release()
# Notify listeners that a job store has been removed
self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_REMOVED, alias))
def add_listener(self, callback, mask=EVENT_ALL):
"""
Adds a listener for scheduler events. When a matching event occurs,
``callback`` is executed with the event object as its sole argument.
If the ``mask`` parameter is not provided, the callback will receive
events of all types.
:param callback: any callable that takes one argument
:param mask: bitmask that indicates which events should be listened to
"""
self._listeners_lock.acquire()
try:
self._listeners.append((callback, mask))
finally:
self._listeners_lock.release()
def remove_listener(self, callback):
"""
Removes a previously added event listener.
"""
self._listeners_lock.acquire()
try:
for i, (cb, _) in enumerate(self._listeners):
if callback == cb:
del self._listeners[i]
finally:
self._listeners_lock.release()
def _notify_listeners(self, event):
self._listeners_lock.acquire()
try:
listeners = tuple(self._listeners)
finally:
self._listeners_lock.release()
for cb, mask in listeners:
if event.code & mask:
try:
cb(event)
except:
logger.exception('Error notifying listener')
def _real_add_job(self, job, jobstore, wakeup):
job.compute_next_run_time(datetime.now())
if not job.next_run_time:
raise ValueError('Not adding job since it would never be run')
self._jobstores_lock.acquire()
try:
try:
store = self._jobstores[jobstore]
except KeyError:
raise KeyError('No such job store: %s' % jobstore)
store.add_job(job)
finally:
self._jobstores_lock.release()
# Notify listeners that a new job has been added
event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job)
self._notify_listeners(event)
logger.info('Added job "%s" to job store "%s"', job, jobstore)
# Notify the scheduler about the new job
if wakeup:
self._wakeup.set()
def add_job(self, trigger, func, args, kwargs, jobstore='default',
**options):
"""
Adds the given job to the job list and notifies the scheduler thread.
:param trigger: alias of the job store to store the job in
:param func: callable to run at the given time
:param args: list of positional arguments to call func with
:param kwargs: dict of keyword arguments to call func with
:param jobstore: alias of the job store to store the job in
:rtype: :class:`~apscheduler.job.Job`
"""
job = Job(trigger, func, args or [], kwargs or {},
options.pop('misfire_grace_time', self.misfire_grace_time),
options.pop('coalesce', self.coalesce), **options)
if not self.running:
self._pending_jobs.append((job, jobstore))
logger.info('Adding job tentatively -- it will be properly '
'scheduled when the scheduler starts')
else:
self._real_add_job(job, jobstore, True)
return job
def _remove_job(self, job, alias, jobstore):
jobstore.remove_job(job)
# Notify listeners that a job has been removed
event = JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job)
self._notify_listeners(event)
logger.info('Removed job "%s"', job)
def add_date_job(self, func, date, args=None, kwargs=None, **options):
"""
Schedules a job to be completed on a specific date and time.
:param func: callable to run at the given time
:param date: the date/time to run the job at
:param name: name of the job
:param jobstore: stored the job in the named (or given) job store
:param misfire_grace_time: seconds after the designated run time that
the job is still allowed to be run
:type date: :class:`datetime.date`
:rtype: :class:`~apscheduler.job.Job`
"""
trigger = SimpleTrigger(date)
return self.add_job(trigger, func, args, kwargs, **options)
def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0,
seconds=0, start_date=None, args=None, kwargs=None,
**options):
"""
Schedules a job to be completed on specified intervals.
:param func: callable to run
:param weeks: number of weeks to wait
:param days: number of days to wait
:param hours: number of hours to wait
:param minutes: number of minutes to wait
:param seconds: number of seconds to wait
:param start_date: when to first execute the job and start the
counter (default is after the given interval)
:param args: list of positional arguments to call func with
:param kwargs: dict of keyword arguments to call func with
:param name: name of the job
:param jobstore: alias of the job store to add the job to
:param misfire_grace_time: seconds after the designated run time that
the job is still allowed to be run
:rtype: :class:`~apscheduler.job.Job`
"""
interval = timedelta(weeks=weeks, days=days, hours=hours,
minutes=minutes, seconds=seconds)
trigger = IntervalTrigger(interval, start_date)
return self.add_job(trigger, func, args, kwargs, **options)
def add_cron_job(self, func, year='*', month='*', day='*', week='*',
day_of_week='*', hour='*', minute='*', second='*',
start_date=None, args=None, kwargs=None, **options):
"""
Schedules a job to be completed on times that match the given
expressions.
:param func: callable to run
:param year: year to run on
:param month: month to run on (0 = January)
:param day: day of month to run on
:param week: week of the year to run on
:param day_of_week: weekday to run on (0 = Monday)
:param hour: hour to run on
:param second: second to run on
:param args: list of positional arguments to call func with
:param kwargs: dict of keyword arguments to call func with
:param name: name of the job
:param jobstore: alias of the job store to add the job to
:param misfire_grace_time: seconds after the designated run time that
the job is still allowed to be run
:return: the scheduled job
:rtype: :class:`~apscheduler.job.Job`
"""
trigger = CronTrigger(year=year, month=month, day=day, week=week,
day_of_week=day_of_week, hour=hour,
minute=minute, second=second,
start_date=start_date)
return self.add_job(trigger, func, args, kwargs, **options)
def cron_schedule(self, **options):
"""
Decorator version of :meth:`add_cron_job`.
This decorator does not wrap its host function.
Unscheduling decorated functions is possible by passing the ``job``
attribute of the scheduled function to :meth:`unschedule_job`.
"""
def inner(func):
func.job = self.add_cron_job(func, **options)
return func
return inner
def interval_schedule(self, **options):
"""
Decorator version of :meth:`add_interval_job`.
This decorator does not wrap its host function.
Unscheduling decorated functions is possible by passing the ``job``
attribute of the scheduled function to :meth:`unschedule_job`.
"""
def inner(func):
func.job = self.add_interval_job(func, **options)
return func
return inner
def get_jobs(self):
"""
Returns a list of all scheduled jobs.
:return: list of :class:`~apscheduler.job.Job` objects
"""
self._jobstores_lock.acquire()
try:
jobs = []
for jobstore in itervalues(self._jobstores):
jobs.extend(jobstore.jobs)
return jobs
finally:
self._jobstores_lock.release()
def unschedule_job(self, job):
"""
Removes a job, preventing it from being run any more.
"""
self._jobstores_lock.acquire()
try:
for alias, jobstore in iteritems(self._jobstores):
if job in list(jobstore.jobs):
self._remove_job(job, alias, jobstore)
return
finally:
self._jobstores_lock.release()
raise KeyError('Job "%s" is not scheduled in any job store' % job)
def unschedule_func(self, func):
"""
Removes all jobs that would execute the given function.
"""
found = False
self._jobstores_lock.acquire()
try:
for alias, jobstore in iteritems(self._jobstores):
for job in list(jobstore.jobs):
if job.func == func:
self._remove_job(job, alias, jobstore)
found = True
finally:
self._jobstores_lock.release()
if not found:
raise KeyError('The given function is not scheduled in this '
'scheduler')
def print_jobs(self, out=None):
"""
Prints out a textual listing of all jobs currently scheduled on this
scheduler.
:param out: a file-like object to print to (defaults to **sys.stdout**
if nothing is given)
"""
out = out or sys.stdout
job_strs = []
self._jobstores_lock.acquire()
try:
for alias, jobstore in iteritems(self._jobstores):
job_strs.append('Jobstore %s:' % alias)
if jobstore.jobs:
for job in jobstore.jobs:
job_strs.append(' %s' % job)
else:
job_strs.append(' No scheduled jobs')
finally:
self._jobstores_lock.release()
out.write(os.linesep.join(job_strs))
def _run_job(self, job, run_times):
"""
Acts as a harness that runs the actual job code in a thread.
"""
for run_time in run_times:
# See if the job missed its run time window, and handle possible
# misfires accordingly
difference = datetime.now() - run_time
grace_time = timedelta(seconds=job.misfire_grace_time)
if difference > grace_time:
# Notify listeners about a missed run
event = JobEvent(EVENT_JOB_MISSED, job, run_time)
self._notify_listeners(event)
logger.warning('Run time of job "%s" was missed by %s',
job, difference)
else:
try:
job.add_instance()
except MaxInstancesReachedError:
event = JobEvent(EVENT_JOB_MISSED, job, run_time)
self._notify_listeners(event)
logger.warning('Execution of job "%s" skipped: '
'maximum number of running instances '
'reached (%d)', job, job.max_instances)
break
logger.info('Running job "%s" (scheduled at %s)', job,
run_time)
try:
retval = job.func(*job.args, **job.kwargs)
except:
# Notify listeners about the exception
exc, tb = sys.exc_info()[1:]
event = JobEvent(EVENT_JOB_ERROR, job, run_time,
exception=exc, traceback=tb)
self._notify_listeners(event)
logger.exception('Job "%s" raised an exception', job)
else:
# Notify listeners about successful execution
event = JobEvent(EVENT_JOB_EXECUTED, job, run_time,
retval=retval)
self._notify_listeners(event)
logger.info('Job "%s" executed successfully', job)
job.remove_instance()
# If coalescing is enabled, don't attempt any further runs
if job.coalesce:
break
def _process_jobs(self, now):
"""
Iterates through jobs in every jobstore, starts pending jobs
and figures out the next wakeup time.
"""
next_wakeup_time = None
self._jobstores_lock.acquire()
try:
for alias, jobstore in iteritems(self._jobstores):
for job in tuple(jobstore.jobs):
run_times = job.get_run_times(now)
if run_times:
self._threadpool.submit(self._run_job, job, run_times)
# Increase the job's run count
if job.coalesce:
job.runs += 1
else:
job.runs += len(run_times)
# Update the job, but don't keep finished jobs around
if job.compute_next_run_time(now + timedelta(microseconds=1)):
jobstore.update_job(job)
else:
self._remove_job(job, alias, jobstore)
if not next_wakeup_time:
next_wakeup_time = job.next_run_time
elif job.next_run_time:
next_wakeup_time = min(next_wakeup_time,
job.next_run_time)
return next_wakeup_time
finally:
self._jobstores_lock.release()
def _main_loop(self):
"""Executes jobs on schedule."""
logger.info('Scheduler started')
self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START))
self._wakeup.clear()
while not self._stopped:
logger.debug('Looking for jobs to run')
now = datetime.now()
next_wakeup_time = self._process_jobs(now)
# Sleep until the next job is scheduled to be run,
# a new job is added or the scheduler is stopped
if next_wakeup_time is not None:
wait_seconds = time_difference(next_wakeup_time, now)
logger.debug('Next wakeup is due at %s (in %f seconds)',
next_wakeup_time, wait_seconds)
self._wakeup.wait(wait_seconds)
else:
logger.debug('No jobs; waiting until a job is added')
self._wakeup.wait()
self._wakeup.clear()
logger.info('Scheduler has been shut down')
self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))

133
apscheduler/threadpool.py Normal file
View File

@@ -0,0 +1,133 @@
"""
Generic thread pool class. Modeled after Java's ThreadPoolExecutor.
Please note that this ThreadPool does *not* fully implement the PEP 3148
ThreadPool!
"""
from threading import Thread, Lock, currentThread
from weakref import ref
import logging
import atexit
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty
logger = logging.getLogger(__name__)
_threadpools = set()
# Worker threads are daemonic in order to let the interpreter exit without
# an explicit shutdown of the thread pool. The following trick is necessary
# to allow worker threads to finish cleanly.
def _shutdown_all():
for pool_ref in tuple(_threadpools):
pool = pool_ref()
if pool:
pool.shutdown()
atexit.register(_shutdown_all)
class ThreadPool(object):
def __init__(self, core_threads=0, max_threads=20, keepalive=1):
"""
:param core_threads: maximum number of persistent threads in the pool
:param max_threads: maximum number of total threads in the pool
:param thread_class: callable that creates a Thread object
:param keepalive: seconds to keep non-core worker threads waiting
for new tasks
"""
self.core_threads = core_threads
self.max_threads = max(max_threads, core_threads, 1)
self.keepalive = keepalive
self._queue = Queue()
self._threads_lock = Lock()
self._threads = set()
self._shutdown = False
_threadpools.add(ref(self))
logger.info('Started thread pool with %d core threads and %s maximum '
'threads', core_threads, max_threads or 'unlimited')
def _adjust_threadcount(self):
self._threads_lock.acquire()
try:
if self.num_threads < self.max_threads:
self._add_thread(self.num_threads < self.core_threads)
finally:
self._threads_lock.release()
def _add_thread(self, core):
t = Thread(target=self._run_jobs, args=(core,))
t.setDaemon(True)
t.start()
self._threads.add(t)
def _run_jobs(self, core):
logger.debug('Started worker thread')
block = True
timeout = None
if not core:
block = self.keepalive > 0
timeout = self.keepalive
while True:
try:
func, args, kwargs = self._queue.get(block, timeout)
except Empty:
break
if self._shutdown:
break
try:
func(*args, **kwargs)
except:
logger.exception('Error in worker thread')
self._threads_lock.acquire()
self._threads.remove(currentThread())
self._threads_lock.release()
logger.debug('Exiting worker thread')
@property
def num_threads(self):
return len(self._threads)
def submit(self, func, *args, **kwargs):
if self._shutdown:
raise RuntimeError('Cannot schedule new tasks after shutdown')
self._queue.put((func, args, kwargs))
self._adjust_threadcount()
def shutdown(self, wait=True):
if self._shutdown:
return
logging.info('Shutting down thread pool')
self._shutdown = True
_threadpools.remove(ref(self))
self._threads_lock.acquire()
for _ in range(self.num_threads):
self._queue.put((None, None, None))
self._threads_lock.release()
if wait:
self._threads_lock.acquire()
threads = tuple(self._threads)
self._threads_lock.release()
for thread in threads:
thread.join()
def __repr__(self):
if self.max_threads:
threadcount = '%d/%d' % (self.num_threads, self.max_threads)
else:
threadcount = '%d' % self.num_threads
return '<ThreadPool at %x; threads=%s>' % (id(self), threadcount)

View File

@@ -0,0 +1,3 @@
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.simple import SimpleTrigger

View File

@@ -0,0 +1,135 @@
from datetime import date, datetime
from apscheduler.triggers.cron.fields import *
from apscheduler.util import datetime_ceil, convert_to_datetime
class CronTrigger(object):
FIELD_NAMES = ('year', 'month', 'day', 'week', 'day_of_week', 'hour',
'minute', 'second')
FIELDS_MAP = {'year': BaseField,
'month': BaseField,
'week': WeekField,
'day': DayOfMonthField,
'day_of_week': DayOfWeekField,
'hour': BaseField,
'minute': BaseField,
'second': BaseField}
def __init__(self, **values):
self.start_date = values.pop('start_date', None)
if self.start_date:
self.start_date = convert_to_datetime(self.start_date)
self.fields = []
for field_name in self.FIELD_NAMES:
if field_name in values:
exprs = values.pop(field_name)
is_default = False
elif not values:
exprs = DEFAULT_VALUES[field_name]
is_default = True
else:
exprs = '*'
is_default = True
field_class = self.FIELDS_MAP[field_name]
field = field_class(field_name, exprs, is_default)
self.fields.append(field)
def _increment_field_value(self, dateval, fieldnum):
"""
Increments the designated field and resets all less significant fields
to their minimum values.
:type dateval: datetime
:type fieldnum: int
:type amount: int
:rtype: tuple
:return: a tuple containing the new date, and the number of the field
that was actually incremented
"""
i = 0
values = {}
while i < len(self.fields):
field = self.fields[i]
if not field.REAL:
if i == fieldnum:
fieldnum -= 1
i -= 1
else:
i += 1
continue
if i < fieldnum:
values[field.name] = field.get_value(dateval)
i += 1
elif i > fieldnum:
values[field.name] = field.get_min(dateval)
i += 1
else:
value = field.get_value(dateval)
maxval = field.get_max(dateval)
if value == maxval:
fieldnum -= 1
i -= 1
else:
values[field.name] = value + 1
i += 1
return datetime(**values), fieldnum
def _set_field_value(self, dateval, fieldnum, new_value):
values = {}
for i, field in enumerate(self.fields):
if field.REAL:
if i < fieldnum:
values[field.name] = field.get_value(dateval)
elif i > fieldnum:
values[field.name] = field.get_min(dateval)
else:
values[field.name] = new_value
return datetime(**values)
def get_next_fire_time(self, start_date):
if self.start_date:
start_date = max(start_date, self.start_date)
next_date = datetime_ceil(start_date)
fieldnum = 0
while 0 <= fieldnum < len(self.fields):
field = self.fields[fieldnum]
curr_value = field.get_value(next_date)
next_value = field.get_next_value(next_date)
if next_value is None:
# No valid value was found
next_date, fieldnum = self._increment_field_value(next_date,
fieldnum - 1)
elif next_value > curr_value:
# A valid, but higher than the starting value, was found
if field.REAL:
next_date = self._set_field_value(next_date, fieldnum,
next_value)
fieldnum += 1
else:
next_date, fieldnum = self._increment_field_value(next_date,
fieldnum)
else:
# A valid value was found, no changes necessary
fieldnum += 1
if fieldnum >= 0:
return next_date
def __str__(self):
options = ["%s='%s'" % (f.name, str(f)) for f in self.fields
if not f.is_default]
return 'cron[%s]' % (', '.join(options))
def __repr__(self):
options = ["%s='%s'" % (f.name, str(f)) for f in self.fields
if not f.is_default]
if self.start_date:
options.append("start_date='%s'" % self.start_date.isoformat(' '))
return '<%s (%s)>' % (self.__class__.__name__, ', '.join(options))

View File

@@ -0,0 +1,178 @@
"""
This module contains the expressions applicable for CronTrigger's fields.
"""
from calendar import monthrange
import re
from apscheduler.util import asint
__all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression',
'WeekdayPositionExpression')
WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
class AllExpression(object):
value_re = re.compile(r'\*(?:/(?P<step>\d+))?$')
def __init__(self, step=None):
self.step = asint(step)
if self.step == 0:
raise ValueError('Increment must be higher than 0')
def get_next_value(self, date, field):
start = field.get_value(date)
minval = field.get_min(date)
maxval = field.get_max(date)
start = max(start, minval)
if not self.step:
next = start
else:
distance_to_next = (self.step - (start - minval)) % self.step
next = start + distance_to_next
if next <= maxval:
return next
def __str__(self):
if self.step:
return '*/%d' % self.step
return '*'
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.step)
class RangeExpression(AllExpression):
value_re = re.compile(
r'(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$')
def __init__(self, first, last=None, step=None):
AllExpression.__init__(self, step)
first = asint(first)
last = asint(last)
if last is None and step is None:
last = first
if last is not None and first > last:
raise ValueError('The minimum value in a range must not be '
'higher than the maximum')
self.first = first
self.last = last
def get_next_value(self, date, field):
start = field.get_value(date)
minval = field.get_min(date)
maxval = field.get_max(date)
# Apply range limits
minval = max(minval, self.first)
if self.last is not None:
maxval = min(maxval, self.last)
start = max(start, minval)
if not self.step:
next = start
else:
distance_to_next = (self.step - (start - minval)) % self.step
next = start + distance_to_next
if next <= maxval:
return next
def __str__(self):
if self.last != self.first and self.last is not None:
range = '%d-%d' % (self.first, self.last)
else:
range = str(self.first)
if self.step:
return '%s/%d' % (range, self.step)
return range
def __repr__(self):
args = [str(self.first)]
if self.last != self.first and self.last is not None or self.step:
args.append(str(self.last))
if self.step:
args.append(str(self.step))
return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
class WeekdayRangeExpression(RangeExpression):
value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?',
re.IGNORECASE)
def __init__(self, first, last=None):
try:
first_num = WEEKDAYS.index(first.lower())
except ValueError:
raise ValueError('Invalid weekday name "%s"' % first)
if last:
try:
last_num = WEEKDAYS.index(last.lower())
except ValueError:
raise ValueError('Invalid weekday name "%s"' % last)
else:
last_num = None
RangeExpression.__init__(self, first_num, last_num)
def __str__(self):
if self.last != self.first and self.last is not None:
return '%s-%s' % (WEEKDAYS[self.first], WEEKDAYS[self.last])
return WEEKDAYS[self.first]
def __repr__(self):
args = ["'%s'" % WEEKDAYS[self.first]]
if self.last != self.first and self.last is not None:
args.append("'%s'" % WEEKDAYS[self.last])
return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
class WeekdayPositionExpression(AllExpression):
options = ['1st', '2nd', '3rd', '4th', '5th', 'last']
value_re = re.compile(r'(?P<option_name>%s) +(?P<weekday_name>(?:\d+|\w+))'
% '|'.join(options), re.IGNORECASE)
def __init__(self, option_name, weekday_name):
try:
self.option_num = self.options.index(option_name.lower())
except ValueError:
raise ValueError('Invalid weekday position "%s"' % option_name)
try:
self.weekday = WEEKDAYS.index(weekday_name.lower())
except ValueError:
raise ValueError('Invalid weekday name "%s"' % weekday_name)
def get_next_value(self, date, field):
# Figure out the weekday of the month's first day and the number
# of days in that month
first_day_wday, last_day = monthrange(date.year, date.month)
# Calculate which day of the month is the first of the target weekdays
first_hit_day = self.weekday - first_day_wday + 1
if first_hit_day <= 0:
first_hit_day += 7
# Calculate what day of the month the target weekday would be
if self.option_num < 5:
target_day = first_hit_day + self.option_num * 7
else:
target_day = first_hit_day + ((last_day - first_hit_day) / 7) * 7
if target_day <= last_day and target_day >= date.day:
return target_day
def __str__(self):
return '%s %s' % (self.options[self.option_num],
WEEKDAYS[self.weekday])
def __repr__(self):
return "%s('%s', '%s')" % (self.__class__.__name__,
self.options[self.option_num],
WEEKDAYS[self.weekday])

View File

@@ -0,0 +1,99 @@
"""
Fields represent CronTrigger options which map to :class:`~datetime.datetime`
fields.
"""
from calendar import monthrange
from apscheduler.triggers.cron.expressions import *
__all__ = ('MIN_VALUES', 'MAX_VALUES', 'DEFAULT_VALUES', 'BaseField',
'WeekField', 'DayOfMonthField', 'DayOfWeekField')
MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1,
'day_of_week': 0, 'hour': 0, 'minute': 0, 'second': 0}
MAX_VALUES = {'year': 2 ** 63, 'month': 12, 'day:': 31, 'week': 53,
'day_of_week': 6, 'hour': 23, 'minute': 59, 'second': 59}
DEFAULT_VALUES = {'year': '*', 'month': 1, 'day': 1, 'week': '*',
'day_of_week': '*', 'hour': 0, 'minute': 0, 'second': 0}
class BaseField(object):
REAL = True
COMPILERS = [AllExpression, RangeExpression]
def __init__(self, name, exprs, is_default=False):
self.name = name
self.is_default = is_default
self.compile_expressions(exprs)
def get_min(self, dateval):
return MIN_VALUES[self.name]
def get_max(self, dateval):
return MAX_VALUES[self.name]
def get_value(self, dateval):
return getattr(dateval, self.name)
def get_next_value(self, dateval):
smallest = None
for expr in self.expressions:
value = expr.get_next_value(dateval, self)
if smallest is None or (value is not None and value < smallest):
smallest = value
return smallest
def compile_expressions(self, exprs):
self.expressions = []
# Split a comma-separated expression list, if any
exprs = str(exprs).strip()
if ',' in exprs:
for expr in exprs.split(','):
self.compile_expression(expr)
else:
self.compile_expression(exprs)
def compile_expression(self, expr):
for compiler in self.COMPILERS:
match = compiler.value_re.match(expr)
if match:
compiled_expr = compiler(**match.groupdict())
self.expressions.append(compiled_expr)
return
raise ValueError('Unrecognized expression "%s" for field "%s"' %
(expr, self.name))
def __str__(self):
expr_strings = (str(e) for e in self.expressions)
return ','.join(expr_strings)
def __repr__(self):
return "%s('%s', '%s')" % (self.__class__.__name__, self.name,
str(self))
class WeekField(BaseField):
REAL = False
def get_value(self, dateval):
return dateval.isocalendar()[1]
class DayOfMonthField(BaseField):
COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression]
def get_max(self, dateval):
return monthrange(dateval.year, dateval.month)[1]
class DayOfWeekField(BaseField):
REAL = False
COMPILERS = BaseField.COMPILERS + [WeekdayRangeExpression]
def get_value(self, dateval):
return dateval.weekday()

View File

@@ -0,0 +1,39 @@
from datetime import datetime, timedelta
from math import ceil
from apscheduler.util import convert_to_datetime, timedelta_seconds
class IntervalTrigger(object):
def __init__(self, interval, start_date=None):
if not isinstance(interval, timedelta):
raise TypeError('interval must be a timedelta')
if start_date:
start_date = convert_to_datetime(start_date)
self.interval = interval
self.interval_length = timedelta_seconds(self.interval)
if self.interval_length == 0:
self.interval = timedelta(seconds=1)
self.interval_length = 1
if start_date is None:
self.start_date = datetime.now() + self.interval
else:
self.start_date = convert_to_datetime(start_date)
def get_next_fire_time(self, start_date):
if start_date < self.start_date:
return self.start_date
timediff_seconds = timedelta_seconds(start_date - self.start_date)
next_interval_num = int(ceil(timediff_seconds / self.interval_length))
return self.start_date + self.interval * next_interval_num
def __str__(self):
return 'interval[%s]' % str(self.interval)
def __repr__(self):
return "<%s (interval=%s, start_date=%s)>" % (
self.__class__.__name__, repr(self.interval),
repr(self.start_date))

View File

@@ -0,0 +1,17 @@
from apscheduler.util import convert_to_datetime
class SimpleTrigger(object):
def __init__(self, run_date):
self.run_date = convert_to_datetime(run_date)
def get_next_fire_time(self, start_date):
if self.run_date >= start_date:
return self.run_date
def __str__(self):
return 'date[%s]' % str(self.run_date)
def __repr__(self):
return '<%s (run_date=%s)>' % (
self.__class__.__name__, repr(self.run_date))

204
apscheduler/util.py Normal file
View File

@@ -0,0 +1,204 @@
"""
This module contains several handy functions primarily meant for internal use.
"""
from datetime import date, datetime, timedelta
from time import mktime
import re
import sys
__all__ = ('asint', 'asbool', 'convert_to_datetime', 'timedelta_seconds',
'time_difference', 'datetime_ceil', 'combine_opts',
'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref',
'to_unicode', 'iteritems', 'itervalues', 'xrange')
def asint(text):
"""
Safely converts a string to an integer, returning None if the string
is None.
:type text: str
:rtype: int
"""
if text is not None:
return int(text)
def asbool(obj):
"""
Interprets an object as a boolean value.
:rtype: bool
"""
if isinstance(obj, str):
obj = obj.strip().lower()
if obj in ('true', 'yes', 'on', 'y', 't', '1'):
return True
if obj in ('false', 'no', 'off', 'n', 'f', '0'):
return False
raise ValueError('Unable to interpret value "%s" as boolean' % obj)
return bool(obj)
_DATE_REGEX = re.compile(
r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'
r'(?: (?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})'
r'(?:\.(?P<microsecond>\d{1,6}))?)?')
def convert_to_datetime(input):
"""
Converts the given object to a datetime object, if possible.
If an actual datetime object is passed, it is returned unmodified.
If the input is a string, it is parsed as a datetime.
Date strings are accepted in three different forms: date only (Y-m-d),
date with time (Y-m-d H:M:S) or with date+time with microseconds
(Y-m-d H:M:S.micro).
:rtype: datetime
"""
if isinstance(input, datetime):
return input
elif isinstance(input, date):
return datetime.fromordinal(input.toordinal())
elif isinstance(input, str):
m = _DATE_REGEX.match(input)
if not m:
raise ValueError('Invalid date string')
values = [(k, int(v or 0)) for k, v in m.groupdict().items()]
values = dict(values)
return datetime(**values)
raise TypeError('Unsupported input type: %s' % type(input))
def timedelta_seconds(delta):
"""
Converts the given timedelta to seconds.
:type delta: timedelta
:rtype: float
"""
return delta.days * 24 * 60 * 60 + delta.seconds + \
delta.microseconds / 1000000.0
def time_difference(date1, date2):
"""
Returns the time difference in seconds between the given two
datetime objects. The difference is calculated as: date1 - date2.
:param date1: the later datetime
:type date1: datetime
:param date2: the earlier datetime
:type date2: datetime
:rtype: float
"""
later = mktime(date1.timetuple()) + date1.microsecond / 1000000.0
earlier = mktime(date2.timetuple()) + date2.microsecond / 1000000.0
return later - earlier
def datetime_ceil(dateval):
"""
Rounds the given datetime object upwards.
:type dateval: datetime
"""
if dateval.microsecond > 0:
return dateval + timedelta(seconds=1,
microseconds=-dateval.microsecond)
return dateval
def combine_opts(global_config, prefix, local_config={}):
"""
Returns a subdictionary from keys and values of ``global_config`` where
the key starts with the given prefix, combined with options from
local_config. The keys in the subdictionary have the prefix removed.
:type global_config: dict
:type prefix: str
:type local_config: dict
:rtype: dict
"""
prefixlen = len(prefix)
subconf = {}
for key, value in global_config.items():
if key.startswith(prefix):
key = key[prefixlen:]
subconf[key] = value
subconf.update(local_config)
return subconf
def get_callable_name(func):
"""
Returns the best available display name for the given function/callable.
"""
name = func.__module__
if hasattr(func, '__self__') and func.__self__:
name += '.' + func.__self__.__name__
elif hasattr(func, 'im_self') and func.im_self: # py2.4, 2.5
name += '.' + func.im_self.__name__
if hasattr(func, '__name__'):
name += '.' + func.__name__
return name
def obj_to_ref(obj):
"""
Returns the path to the given object.
"""
ref = '%s:%s' % (obj.__module__, obj.__name__)
try:
obj2 = ref_to_obj(ref)
except AttributeError:
pass
else:
if obj2 == obj:
return ref
raise ValueError('Only module level objects are supported')
def ref_to_obj(ref):
"""
Returns the object pointed to by ``ref``.
"""
modulename, rest = ref.split(':', 1)
obj = __import__(modulename)
for name in modulename.split('.')[1:] + rest.split('.'):
obj = getattr(obj, name)
return obj
def maybe_ref(ref):
"""
Returns the object that the given reference points to, if it is indeed
a reference. If it is not a reference, the object is returned as-is.
"""
if not isinstance(ref, str):
return ref
return ref_to_obj(ref)
def to_unicode(string, encoding='ascii'):
"""
Safely converts a string to a unicode representation on any
Python version.
"""
if hasattr(string, 'decode'):
return string.decode(encoding, 'ignore')
return string
if sys.version_info < (3, 0): # pragma: nocover
iteritems = lambda d: d.iteritems()
itervalues = lambda d: d.itervalues()
xrange = xrange
else: # pragma: nocover
iteritems = lambda d: d.items()
itervalues = lambda d: d.values()
xrange = range

25
config.ini Normal file
View File

@@ -0,0 +1,25 @@
[General]
http_host = 0.0.0.0
http_port = 8181
http_username = ohmightyrv
http_password = nelson1
launch_browser = 0
include_lossless = 0
flac_to_mp3 = 0
move_to_itunes = 0
path_to_itunes = ""
rename_mp3s = 0
cleanup = 0
add_album_art = 0
music_download_dir = ""
usenet_retention = 500
[SABnzbd]
sab_host = 0.0.0.0:8080
sab_username = ohmightyrv
sab_password = nelson1
sab_apikey = 1be61ba6032dfd98892969aa799d3541
sab_category = music
[NZBMatrix]
nzbmatrix = 1
nzbmatrix_username = ohmightyrv
nzbmatrix_apikey = 01a9d83368952932bb3b43386e593fe1

32
configcreate.py Normal file
View File

@@ -0,0 +1,32 @@
from configobj import ConfigObj
def configCreate(path):
config = ConfigObj()
config.filename = path
config['General'] = {}
config['General']['http_host'] = '0.0.0.0'
config['General']['http_port'] = 8181
config['General']['http_username'] = ''
config['General']['http_password'] = ''
config['General']['launch_browser'] = 0
config['General']['include_lossless'] = 0
config['General']['flac_to_mp3'] = 0
config['General']['move_to_itunes'] = 0
config['General']['path_to_itunes'] = ''
config['General']['rename_mp3s'] = 0
config['General']['cleanup'] = 0
config['General']['add_album_art'] = 0
config['General']['music_download_dir'] = ''
config['General']['usenet_retention'] = 500
config['SABnzbd'] = {}
config['SABnzbd']['sab_host'] = ''
config['SABnzbd']['sab_username'] = ''
config['SABnzbd']['sab_password'] = ''
config['SABnzbd']['sab_apikey'] = ''
config['SABnzbd']['sab_category'] = ''
config['NZBMatrix'] = {}
config['NZBMatrix']['nzbmatrix'] = 0
config['NZBMatrix']['nzbmatrix_username'] = ''
config['NZBMatrix']['nzbmatrix_apikey'] = ''
config.write()

3909
feedparser.py Normal file

File diff suppressed because it is too large Load Diff

104
searcher.py Normal file
View File

@@ -0,0 +1,104 @@
import urllib
from webServer import database
from configobj import ConfigObj
import string
import feedparser
import sqlite3
import re
config = ConfigObj('config.ini')
General = config['General']
NZBMatrix = config['NZBMatrix']
SABnzbd = config['SABnzbd']
nzbmatrix = NZBMatrix['nzbmatrix']
nzbmatrix_username = NZBMatrix['nzbmatrix_username']
nzbmatrix_apikey = NZBMatrix['nzbmatrix_apikey']
usenet_retention = General['usenet_retention']
sab_host = SABnzbd['sab_host']
sab_username = SABnzbd['sab_username']
sab_password = SABnzbd['sab_password']
sab_apikey = SABnzbd['sab_apikey']
sab_category = SABnzbd['sab_category']
if General['include_lossless'] == '1':
categories = "23, 22"
maxsize = 2000000000
else:
categories = "22"
maxsize = 250000000
def searchNZB(albumid=None):
conn=sqlite3.connect(database)
c=conn.cursor()
if albumid:
c.execute('SELECT ArtistName, AlbumTitle, AlbumID, ReleaseDate from albums WHERE Status="Wanted" AND AlbumID="%s"' % albumid)
else:
c.execute('SELECT ArtistName, AlbumTitle, AlbumID, ReleaseDate from albums WHERE Status="Wanted"')
results = c.fetchall()
for albums in results:
reldate = albums[3]
year = reldate[:4]
clname = string.replace(albums[0], ' & ', ' ')
clalbum = string.replace(albums[1], ' & ', ' ')
term = re.sub('[\.\-]', ' ', '%s %s %s' % (clname, clalbum, year)).encode('utf-8')
params = { "page": "download",
"username": nzbmatrix_username,
"apikey": nzbmatrix_apikey,
"subcat": categories,
"age": usenet_retention,
"english": 1,
"ssl": 1,
"scenename": 1,
"term": term
}
searchURL = "http://rss.nzbmatrix.com/rss.php?" + urllib.urlencode(params)
d = feedparser.parse(searchURL)
resultlist = []
for item in d.entries:
try:
url = item.link
title = item.title
size = int(item.links[1]['length'])
if size < maxsize:
resultlist.append((title, size, url))
except:
print '''No results found'''
bestqual = sorted(resultlist, key=lambda title: title[1], reverse=True)[0]
downloadurl = bestqual[2]
linkparams = { "mode": "addurl",
"apikey": sab_apikey,
"ma_username": sab_username,
"ma_password": sab_password,
"cat": sab_category,
"name": downloadurl
}
saburl = 'http://' + sab_host + '/sabnzbd/api?' + urllib.urlencode(linkparams)
urllib.urlopen(saburl)
c.execute('UPDATE albums SET status = "Snatched" WHERE AlbumID="%s"' % albums[2])
conn.commit()
c.close()

32
threadtools-backup.py Normal file
View File

@@ -0,0 +1,32 @@
from cherrypy.process.plugins import SimplePlugin
import time
import threading
import Queue
class threadtool(SimplePlugin):
thread = None
def __init__(self, bus):
SimplePlugin.__init__(self, bus)
def start(self):
self.running = True
if not self.thread:
self.thread = threading.Thread(target=self.run)
self.thread.start()
def stop(self):
self.running = False
if self.thread:
self.thread.join()
self.thread = None
self.running = False
def run(self):
while self.running:
from webServer import database
import updater
updater.dbUpdate(database)
time.sleep(3600*24)

36
threadtools.py Normal file
View File

@@ -0,0 +1,36 @@
from cherrypy.process.plugins import SimplePlugin
from apscheduler.scheduler import Scheduler
import time
import threading
import Queue
class threadtool(SimplePlugin):
sched = Scheduler()
thread = None
def __init__(self, bus):
SimplePlugin.__init__(self, bus)
def start(self):
self.running = True
if not self.thread:
self.thread = threading.Thread(target=self.run)
self.thread.start()
self.sched.start()
def stop(self):
self.running = False
print '''Shutting down'''
if self.thread:
self.thread.join()
self.thread = None
self.running = False
self.sched.shutdown()
def run(self):
import updater
import searcher
self.sched.add_cron_job(updater.dbUpdate, hour=3)
self.sched.add_interval_job(searcher.searchNZB, hours=18)

56
updater.py Normal file
View File

@@ -0,0 +1,56 @@
from webServer import database
import musicbrainz2.webservice as ws
import musicbrainz2.model as m
import musicbrainz2.utils as u
import sqlite3
import time
def dbUpdate():
conn=sqlite3.connect(database)
c=conn.cursor()
c.execute('SELECT ArtistID from artists WHERE Status="Active"')
activeartists = c.fetchall()
i = 0
while i < len(activeartists):
artistid = activeartists[i][0]
inc = ws.ArtistIncludes(releases=(m.Release.TYPE_OFFICIAL, m.Release.TYPE_ALBUM), ratings=False, releaseGroups=False)
artist = ws.Query().getArtistById(artistid, inc)
print '''now working on %s''' % artist.name
for release in artist.getReleases():
releaseid = u.extractUuid(release.id)
inc = ws.ReleaseIncludes(artist=True, releaseEvents= True, tracks= True, releaseGroup=True)
results = ws.Query().getReleaseById(releaseid, inc)
print '''now working on %s by %s''' % (results.title, artist.name)
time.sleep(2)
for event in results.releaseEvents:
if event.country == 'US':
if (u.extractUuid(results.id) in x for x in activeartists):
print '''%s is already in the database''' % results.title
c.execute('UPDATE albums SET AlbumASIN="%s", ReleaseDate="%s" WHERE AlbumID="%s"' % (results.asin, results.getEarliestReleaseDate(), u.extractUuid(results.id)))
print ''' updated asin and release date for %s ''' % results.title
for track in results.tracks:
c.execute('UPDATE tracks SET TrackDuration="%s" WHERE AlbumID="%s" AND TrackID="%s"' % (track.duration, u.extractUuid(results.id), u.extractUuid(track.id)))
conn.commit()
print '''%s has been updated''' % results.title
else:
print '''%s is new, adding it''' % results.title
c.execute('INSERT INTO albums VALUES( ?, ?, ?, ?, ?, CURRENT_DATE, ?, ?)', (artistid, results.artist.name, results.title, results.asin, results.getEarliestReleaseDate(), u.extractUuid(results.id), 'Skipped'))
conn.commit()
c.execute('SELECT ReleaseDate, DateAdded from albums WHERE AlbumID="%s"' % u.extractUuid(results.id))
latestrelease = c.fetchall()
if latestrelease[0][0] > latestrelease[0][1]:
c.execute('UPDATE albums SET Status = "Wanted" WHERE AlbumID="%s"' % u.extractUuid(results.id))
else:
pass
for track in results.tracks:
c.execute('INSERT INTO tracks VALUES( ?, ?, ?, ?, ?, ?, ?, ?)', (artistid, results.artist.name, results.title, results.asin, u.extractUuid(results.id), track.title, track.duration, u.extractUuid(track.id)))
conn.commit()
print '''%s has been added''' % release.title
else:
print '''%s is not a US release''' % results.title
i += 1
conn.commit()
c.close()
conn.close()

6
updatertest.py Normal file
View File

@@ -0,0 +1,6 @@
import time
def threadtest():
while True:
print '''The time is now'''
time.sleep(10)