import hashlib
import logging
import os
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from loguru import logger
from flexget.config_schema import format_checker, one_or_more, register_config_key, register_schema
from flexget.event import event
from flexget.manager import manager
from flexget.utils import json
# Module logger: loguru logger with name='scheduler' bound into each record's extras.
logger = logger.bind(name='scheduler')
# Add a format checker for more detailed errors on cron type schedules
[docs]
@format_checker.checks('cron_schedule', raises=ValueError)
def is_cron_schedule(instance):
    """Check that ``instance`` can be used to build an APScheduler ``CronTrigger``.

    Registered on flexget's ``format_checker`` as the ``cron_schedule`` format.
    Non-dict values are accepted here; type checking is left to the other schema
    keywords.

    :param instance: The value being validated against the ``cron_schedule`` format.
    :returns: ``True`` when ``instance`` is not a dict or when ``CronTrigger`` accepts it.
    :raises ValueError: When ``CronTrigger`` rejects the keyword arguments (a ``TypeError``,
        re-raised with the original exception chained). Any ``ValueError`` raised by
        ``CronTrigger`` itself (presumably for bad field values) propagates unchanged.
    """
    if not isinstance(instance, dict):
        return True
    try:
        CronTrigger(**instance)
    except TypeError as exc:
        # A more specific error message about which key will also be shown by properties schema keyword
        raise ValueError('Invalid key for schedule.') from exc
    return True
# Schedule used when the config has no `schedules` key (or `schedules: yes`):
# run every task once per hour.
DEFAULT_SCHEDULES = [dict(tasks=['*'], interval=dict(hours=1))]
# Units accepted by the simple interval form, in the order they appear in errors.
UNITS = ['minutes', 'hours', 'days', 'weeks']
# Simple interval form: any combination of UNITS (at least one), plus optional jitter.
interval_schema = {
    'type': 'object',
    'title': 'Simple Interval',
    'properties': {
        **{unit: {'type': 'number'} for unit in UNITS},
        'jitter': {'type': 'integer'},
    },
    # At least one of the units must be present.
    'anyOf': [dict(required=[unit]) for unit in UNITS],
    'error_anyOf': f"Interval must be specified as one or more of {', '.join(UNITS)}",
    'additionalProperties': False,
}
# Advanced form: each field is passed straight to APScheduler's cron trigger.
cron_schema = {
    'type': 'object',
    'title': 'Advanced Cron Interval',
    # Apply the `cron_schedule` format checker (is_cron_schedule) so values the cron
    # trigger rejects are reported during config validation, rather than raising
    # later when the job is added to the running scheduler.
    'format': 'cron_schedule',
    'properties': {
        'year': {'type': ['integer', 'string']},
        'month': {'type': ['integer', 'string']},
        'day': {'type': ['integer', 'string']},
        'week': {'type': ['integer', 'string']},
        'day_of_week': {'type': ['integer', 'string']},
        'hour': {'type': ['integer', 'string']},
        'minute': {'type': ['integer', 'string']},
        'jitter': {'type': 'integer'},
    },
    'additionalProperties': False,
}
# One entry of the `schedules` list: `tasks` plus exactly one of `interval` or `schedule`.
schedule_schema = {
    'type': 'object',
    'title': 'schedule',
    'description': 'A schedule which runs specified tasks periodically',
    'properties': {
        'tasks': one_or_more({'type': 'string'}),
        'interval': interval_schema,
        'schedule': cron_schema,
    },
    'required': ['tasks'],
    # With `tasks` required and no other keys allowed, exactly two properties means
    # exactly one of `interval` / `schedule` is present.
    'minProperties': 2,
    'maxProperties': 2,
    # The cron form is configured under the `schedule` key; there is no `cron` key.
    'error_minProperties': 'Either `interval` or `schedule` must be defined.',
    'error_maxProperties': 'Only one of `interval` or `schedule` may be defined.',
    'additionalProperties': False,
}
# Schema for the top-level `schedules` key: either a list of schedule entries, or a
# boolean (`yes` selects the default hourly schedule, `no` disables scheduling).
main_schema = dict(
    title='scheduler',
    description='Runs tasks periodically (when FlexGet is run as a daemon)',
    oneOf=[
        dict(type='array', items=schedule_schema),
        dict(type='boolean'),
    ],
)
# BackgroundScheduler created by setup_scheduler once the daemon starts; None before that.
scheduler = None
# Maps id() of each schedule dict in the active config to its job id; rebuilt by setup_jobs.
scheduler_job_map = dict()
[docs]
def job_id(conf):
    """Return a deterministic identifier for one schedule entry from the config.

    The entry is serialized to JSON with sorted keys, so dicts that differ only
    in key order map to the same id. The id is the hex SHA-1 digest of that text.
    """
    canonical = json.dumps(conf, sort_keys=True)
    digest = hashlib.sha1(canonical.encode('utf-8'))
    return digest.hexdigest()
[docs]
def run_job(tasks):
    """Queue an execution of ``tasks`` and block until all of them have finished.

    :param tasks: List of task names/patterns from a schedule entry, passed through
        to ``manager.execute`` as the ``tasks`` option.
    """
    logger.debug('executing tasks: {}', tasks)
    finished_events = manager.execute(
        options={'tasks': tasks, 'cron': True, 'allow_manual': False}, priority=5
    )
    for _, task_name, event_ in finished_events:
        # Wait for this task's completion event first, so "finished" is only logged
        # once the task has actually finished.
        event_.wait()
        logger.debug('task finished executing: {}', task_name)
    logger.debug('all tasks in schedule finished executing')
[docs]
@event('manager.daemon.started')
def setup_scheduler(manager):
    """Create the background APScheduler instance and register the configured jobs.

    Triggered when the daemon starts. Jobs are persisted in a dedicated SQLite file
    inside the config directory rather than the main database.
    """
    global scheduler

    # Quiet APScheduler's own logger when our log level is less verbose than DEBUG.
    current_level = logger.level(manager.options.loglevel).no
    if current_level > logger.level('DEBUG').no:
        logging.getLogger('apscheduler').setLevel(logging.WARNING)

    # APScheduler runs in its own thread, and slower devices can hit DB locks, so the
    # job store lives in a separate SQLite file.
    jobs_db = os.path.join(manager.config_base, f'db-{manager.config_name}-jobs.sqlite')
    # Backslashes (Windows paths) are doubled before being embedded in the URL.
    jobs_db = jobs_db.replace('\\', '\\\\')
    store = SQLAlchemyJobStore(url=f'sqlite:///{jobs_db}')

    scheduler = BackgroundScheduler(
        jobstores={'default': store},
        # A run missed within the last day (e.g. while the daemon was down) still
        # happens once when the daemon continues.
        job_defaults={'coalesce': True, 'misfire_grace_time': 60 * 60 * 24},
    )
    setup_jobs(manager)
[docs]
@event('manager.config_updated')
def setup_jobs(manager):
    """Synchronize the scheduler's jobs with the ``schedules`` section of the config.

    Does nothing unless running as a daemon. ``schedules: no`` (any falsy value)
    shuts the scheduler down. A missing key or ``schedules: yes`` uses
    ``DEFAULT_SCHEDULES``.

    Otherwise the steps are:

    - start the scheduler paused if it is not running;
    - add a job for each configured entry whose id is not already in the job store;
    - remove stored jobs that are no longer configured;
    - resume the scheduler.
    """
    if not manager.is_daemon:
        return
    # `scheduler` is created by setup_scheduler, which calls this function itself;
    # a config update arriving before that has nothing to configure yet.
    if scheduler is None:
        return
    global scheduler_job_map
    scheduler_job_map = {}
    if 'schedules' not in manager.config:
        logger.info(
            'No schedules defined in config. Defaulting to run all tasks on a 1 hour interval.'
        )
    config = manager.config.get('schedules', True)
    if config is True:
        config = DEFAULT_SCHEDULES
    elif not config:  # Schedules are disabled with `schedules: no`
        if scheduler.running:
            logger.info('Shutting down scheduler')
            scheduler.shutdown()
        return
    if not scheduler.running:
        logger.info('Starting scheduler')
        # Started paused; resumed below once the job list has been reconciled.
        scheduler.start(paused=True)
    existing_job_ids = {job.id for job in scheduler.get_jobs()}
    configured_job_ids = set()
    for job_config in config:
        jid = job_id(job_config)
        scheduler_job_map[id(job_config)] = jid
        # Skip jobs already in the job store (keeps their stored state). Also skip
        # identical duplicate entries in this config: they hash to the same id, and
        # adding it twice would fail with a conflicting-id error.
        already_known = jid in existing_job_ids or jid in configured_job_ids
        configured_job_ids.add(jid)
        if already_known:
            continue
        if 'interval' in job_config:
            trigger, trigger_args = 'interval', job_config['interval']
        else:
            trigger, trigger_args = 'cron', job_config['schedule']
        tasks = job_config['tasks']
        if not isinstance(tasks, list):
            tasks = [tasks]
        name = ','.join(tasks)
        scheduler.add_job(
            run_job, args=(tasks,), id=jid, name=name, trigger=trigger, **trigger_args
        )
    # Remove jobs no longer in config
    for jid in existing_job_ids - configured_job_ids:
        scheduler.remove_job(jid)
    scheduler.resume()
[docs]
@event('manager.shutdown_requested')
def shutdown_requested(manager):
    """Shut the scheduler down on a shutdown request, waiting for running jobs (``wait=True``)."""
    if not scheduler or not scheduler.running:
        return
    scheduler.shutdown(wait=True)
[docs]
@event('manager.shutdown')
def stop_scheduler(manager):
    """Shut the scheduler down on manager shutdown without waiting (``wait=False``)."""
    if not scheduler or not scheduler.running:
        return
    scheduler.shutdown(wait=False)
[docs]
@event('config.register')
def register_config():
    """Register the ``schedules`` config key and publish the single-schedule schema."""
    config_key = 'schedules'
    schedule_schema_path = '/schema/config/schedule'
    register_config_key(config_key, main_schema)
    register_schema(schedule_schema_path, schedule_schema)