Source code for flexget.plugins.filter.delay
import pickle
from datetime import datetime
from loguru import logger
from sqlalchemy import Column, DateTime, Index, Integer, String, Unicode, select
from flexget import db_schema, plugin
from flexget.entry import Entry
from flexget.event import event
from flexget.utils import json, serialization
from flexget.utils.database import entry_synonym
from flexget.utils.sqlalchemy_utils import table_add_column, table_schema
from flexget.utils.tools import parse_timedelta
logger = logger.bind(name='delay')
Base = db_schema.versioned_base('delay', 3)
[docs]
class DelayedEntry(Base):
__tablename__ = 'delay'
id = Column(Integer, primary_key=True)
task = Column('feed', String)
title = Column(Unicode)
expire = Column(DateTime)
_json = Column('json', Unicode)
entry = entry_synonym('_json')
def __repr__(self):
return f'<DelayedEntry(title={self.title})>'
Index('delay_feed_title', DelayedEntry.task, DelayedEntry.title)
# TODO: index "expire, task"
@db_schema.upgrade('delay')
def upgrade(ver, session):
if ver is None:
# Upgrade to version 0 was a failed attempt at cleaning bad entries from our table, better attempt in ver 1
ver = 1
if ver == 1:
table = table_schema('delay', session)
table_add_column(table, 'json', Unicode, session)
# Make sure we get the new schema with the added column
table = table_schema('delay', session)
failures = 0
for row in session.execute(select(table.c.id, table.c.entry)):
try:
p = pickle.loads(row['entry'])
session.execute(
table
.update()
.where(table.c.id == row['id'])
.values(json=json.dumps(p, encode_datetime=True))
)
except (KeyError, ImportError):
failures += 1
if failures > 0:
logger.error(
'Error upgrading {} pickle objects. Some delay information has been lost.',
failures,
)
ver = 2
if ver == 2:
table = table_schema('delay', session)
for row in session.execute(select(table.c.id, table.c.json)):
if not row['json']:
# Seems there could be invalid data somehow. See #2590
continue
data = json.loads(row['json'], decode_datetime=True)
# If title looked like a date, make sure it's a string
title = str(data.pop('title'))
e = Entry(title=title, **data)
session.execute(
table.update().where(table.c.id == row['id']).values(json=serialization.dumps(e))
)
ver = 3
return ver
[docs]
class FilterDelay:
"""Add delay to a task. This is useful for de-prioritizing expensive / bad-quality tasks.
Format: n [minutes|hours|days|weeks]
Example::
delay: 2 hours
"""
schema = {'type': 'string', 'format': 'interval'}
[docs]
def get_delay(self, config):
logger.debug('delay: {}', config)
try:
return parse_timedelta(config)
except ValueError:
raise plugin.PluginError('Invalid time format', logger)
[docs]
@plugin.priority(-1)
def on_task_input(self, task, config):
"""Capture the current input then replaces it with entries that have passed the delay."""
if task.entries:
logger.verbose('Delaying {} new entries for {}', len(task.entries), config)
# Let details plugin know that it is ok if this task doesn't produce any entries
task.no_entries_ok = True
# First learn the current entries in the task to the database
expire_time = datetime.now() + self.get_delay(config)
for entry in task.entries:
logger.debug('Delaying {}', entry['title'])
# check if already in queue
if (
not task.session
.query(DelayedEntry)
.filter(DelayedEntry.title == entry['title'])
.filter(DelayedEntry.task == task.name)
.first()
):
delay_entry = DelayedEntry()
delay_entry.title = entry['title']
delay_entry.entry = entry
delay_entry.task = task.name
delay_entry.expire = expire_time
task.session.add(delay_entry)
# Clear the current entries from the task now that they are stored
task.all_entries[:] = []
# Generate the list of entries whose delay has passed
passed_delay = (
task.session
.query(DelayedEntry)
.filter(datetime.now() > DelayedEntry.expire)
.filter(DelayedEntry.task == task.name)
)
delayed_entries = [item.entry for item in passed_delay.all()]
for entry in delayed_entries:
entry['passed_delay'] = True
logger.debug('Releasing {}', entry['title'])
# Delete the entries from the db we are about to inject
passed_delay.delete()
if delayed_entries:
logger.verbose('Restoring {} entries that have passed delay.', len(delayed_entries))
# Return our delayed entries
return delayed_entries
[docs]
@event('plugin.register')
def register_plugin():
plugin.register(FilterDelay, 'delay', api_ver=2)