import datetime
import itertools
import random
from loguru import logger
from sqlalchemy import Column, DateTime, Index, Integer, Unicode
from flexget import db_schema, options, plugin
from flexget.event import event
from flexget.manager import Session
from flexget.utils.tools import aggregate_inputs, multiply_timedelta, parse_timedelta
# Re-bind the imported loguru logger with name='discover' for every message
# emitted from this module.
logger = logger.bind(name='discover')
# Declarative base class for this plugin's tables.
# NOTE(review): presumably 'discover' is the schema name and 0 its version
# number -- confirm against flexget.db_schema.versioned_base.
Base = db_schema.versioned_base('discover', 0)
[docs]
class DiscoverEntry(Base):
    """Database record of the last time a title was searched for within a task."""

    __tablename__ = 'discover_entry'

    id = Column(Integer, primary_key=True)
    # Title of the entry that was (or will be) searched for.
    title = Column(Unicode, index=True)
    # Name of the task the record belongs to.
    task = Column(Unicode, index=True)
    # Timestamp written by Discover.interval_expired; None until then.
    last_execution = Column(DateTime)

    def __init__(self, title, task):
        """Create a record for ``title`` in ``task`` with no execution recorded yet."""
        self.last_execution = None
        self.task = task
        self.title = title

    def __str__(self):
        # The ``added`` label in the output carries the ``last_execution`` value.
        return f'<DiscoverEntry(title={self.title},task={self.task},added={self.last_execution})>'
# Composite index over the (title, task) column pair; Discover.interval_expired
# filters DiscoverEntry rows on exactly these two columns.
Index('ix_discover_entry_title_task', DiscoverEntry.title, DiscoverEntry.task)
[docs]
@event('manager.db_cleanup')
def db_cleanup(manager, session):
    """Delete discover records whose last execution is 7 days old or older.

    :param manager: Not used by this handler
    :param session: Database session used to query and delete the stale records
    """
    cutoff = datetime.datetime.now() - parse_timedelta('7 days')
    stale_records = session.query(DiscoverEntry).filter(DiscoverEntry.last_execution <= cutoff)
    for stale in stale_records.all():
        logger.debug('deleting {}', stale)
        session.delete(stale)
[docs]
class Discover:
    """Discover content based on other inputs material.

    ``what`` lists input plugins and ``from`` lists search plugins (see
    ``schema``). Besides the plain string modes, ``release_estimations`` also
    accepts ``smart`` or an object with an ``optimistic`` interval, and
    ``limit`` caps the number of results taken from each search.

    Example::

      discover:
        what:
          - next_series_episodes: yes
        from:
          - piratebay
        interval: [1 hours|days|weeks]
        release_estimations: [strict|loose|ignore]
    """

    schema = {
        'type': 'object',
        'properties': {
            'what': {
                'type': 'array',
                'items': {
                    'allOf': [
                        {'$ref': '/schema/plugins?phase=input'},
                        {'maxProperties': 1, 'minProperties': 1},
                    ]
                },
            },
            'from': {
                'type': 'array',
                'items': {
                    'allOf': [
                        {'$ref': '/schema/plugins?interface=search'},
                        {'maxProperties': 1, 'minProperties': 1},
                    ]
                },
            },
            'interval': {'type': 'string', 'format': 'interval', 'default': '5 hours'},
            'release_estimations': {
                'oneOf': [
                    {
                        'type': 'string',
                        'default': 'strict',
                        'enum': ['loose', 'strict', 'ignore', 'smart'],
                    },
                    {
                        'type': 'object',
                        'properties': {'optimistic': {'type': 'string', 'format': 'interval'}},
                        'required': ['optimistic'],
                    },
                ]
            },
            'limit': {'type': 'integer', 'minimum': 1},
        },
        'required': ['what', 'from'],
        'additionalProperties': False,
    }

    def execute_searches(self, config, entries, task):
        """Return list of entries found from search engines listed under `from` configuration.

        Every query entry is handed to each configured search plugin in turn.
        Each result is tagged with ``discovered_from`` / ``discovered_with``
        and gets :meth:`entry_complete` attached as completion callback. A
        query entry that produced no results at all is completed immediately.

        :param config: Discover plugin config
        :param entries: List of pseudo entries to search
        :param task: Task being run
        :return: Flat list with the results of all query entries
        """
        result = []
        total = len(entries)
        for position, entry in enumerate(entries, start=1):
            entry_results = []
            for item in config['from']:
                if isinstance(item, dict):
                    plugin_name, plugin_config = next(iter(item.items()))
                else:
                    plugin_name, plugin_config = item, None
                search = plugin.get(plugin_name, self)
                # Look the method up defensively: a plain attribute access would raise
                # AttributeError before the "does not implement" branch could run.
                search_method = getattr(search, 'search', None)
                if not callable(search_method):
                    logger.critical(
                        'Search plugin {} does not implement search method', plugin_name
                    )
                    continue
                logger.verbose(
                    'Searching for `{}` with plugin `{}` ({} of {})',
                    entry['title'],
                    plugin_name,
                    position,
                    total,
                )
                try:
                    search_results = search_method(task=task, entry=entry, config=plugin_config)
                    if not search_results:
                        logger.debug('No results from {}', plugin_name)
                        continue
                    if config.get('limit'):
                        search_results = itertools.islice(search_results, config['limit'])
                    # 'search_results' can be any iterable, make sure it's a list.
                    search_results = list(search_results)
                    logger.debug('Discovered {} entries from {}', len(search_results), plugin_name)
                    for found in search_results:
                        found['discovered_from'] = entry['title']
                        found['discovered_with'] = plugin_name
                        # The very same list object is shared with the callback, which
                        # removes results from it as they complete.
                        found.on_complete(
                            self.entry_complete, query=entry, search_results=search_results
                        )
                    entry_results.extend(search_results)
                except plugin.PluginWarning as err:
                    logger.verbose('No results from {}: {}', plugin_name, err)
                except plugin.PluginError as err:
                    logger.error('Error searching with {}: {}', plugin_name, err)
            if not entry_results:
                logger.verbose('No search results for `{}`', entry['title'])
                entry.complete()
                continue
            result.extend(entry_results)
        return result

    def entry_complete(self, entry, query=None, search_results=None, **kwargs):
        """Use as callback for Entry.

        :param entry: Search result that has just completed
        :param query: Query entry which generated ``entry``; may be omitted
        :param search_results: List of results of the same search that are still
            pending; ``entry`` is removed from it in place. May be omitted, in
            which case there is nothing to track.
        :param kwargs: Ignored
        """
        if entry.accepted and query is not None:
            # One of the search results was accepted, transfer the acceptance back to the
            # query entry which generated it
            query.accept()
        if search_results is None:
            return
        # Remove this entry from the list of search results yet to complete
        search_results.remove(entry)
        # When all the search results generated by a query entry are complete, complete the
        # query which generated them
        if not search_results and query is not None:
            query.complete()

    def estimated(self, entries, estimation_mode):
        """Return entries that we have estimated to be available.

        Entries without an estimated date are rejected in ``strict`` mode, and
        in ``smart`` mode when the estimator reports that data exists; in every
        other case they are returned. Entries with a date are returned once the
        date has passed, or when it lies within the ``optimistic`` interval from
        now; otherwise they are rejected. Rejected entries are completed.

        :param dict estimation_mode: ``mode`` -> loose, strict, ignore, smart
            (``strict`` when missing, matching the schema default) and
            ``optimistic`` -> interval string (``'0 days'`` when missing)
        :return: List of entries that should be searched for
        """
        estimator = plugin.get('estimate_release', self)
        # Either key may be absent: the object form of the schema only requires
        # `optimistic`, the string form only supplies a mode.
        mode = estimation_mode.get('mode', 'strict')
        optimistic = estimation_mode.get('optimistic', '0 days')
        result = []
        for entry in entries:
            estimation = estimator.estimate(entry)
            est_date = estimation['entity_date']
            data_exists = estimation['data_exists']
            if est_date is None:
                if mode == 'strict':
                    logger.verbose(
                        'Skipping discovery for `{}`, no release date could be determined. '
                        'To discover anyway, add `release_estimations: ignore` to your configuration.',
                        entry['title'],
                    )
                    entry.reject('has no release date')
                    entry.complete()
                elif mode == 'smart' and data_exists:
                    logger.debug(
                        'No release date could be determined for `{}`, but exists data',
                        entry['title'],
                    )
                    entry.reject('exists but has no release date')
                    entry.complete()
                elif mode == 'smart':
                    # Only reached when no data exists for the entry.
                    logger.debug(
                        'Discovering because mode is `{}` and no data is found for entry',
                        mode,
                    )
                    result.append(entry)
                else:
                    result.append(entry)
                continue
            if isinstance(est_date, datetime.date):
                # If we just got a date, add a time so we can compare it to now()
                # NOTE(review): datetime.datetime is a subclass of datetime.date, so full
                # datetimes are reset to (naive) midnight here as well -- confirm intended.
                est_date = datetime.datetime.combine(est_date, datetime.time())
            now = datetime.datetime.now()
            if now >= est_date:
                logger.debug('{} has been released at {}', entry['title'], est_date)
                result.append(entry)
            elif now >= est_date - parse_timedelta(optimistic):
                logger.debug(
                    '{} will be released at {}. Ignoring release estimation because estimated release date is in less than {}',
                    entry['title'],
                    est_date,
                    optimistic,
                )
                result.append(entry)
            else:
                entry.reject('has not been released')
                entry.complete()
                logger.verbose(
                    "{} hasn't been released yet (Expected: {})", entry['title'], est_date
                )
        return result

    def interval_expired(self, config, task, entries):
        """Maintain some limit levels so that we don't hammer search sites with unreasonable amount of queries.

        A ``DiscoverEntry`` row per (title, task) remembers the last execution.
        Entries whose interval has not elapsed are rejected and completed.

        :param config: Discover plugin config; ``interval`` is filled in with
            ``'5 hours'`` when missing
        :param task: Task being run
        :param entries: Entries to check
        :return: Entries that are up for ``config['interval']``
        """
        # Same fallback as the schema default (the value is echoed in log output).
        config.setdefault('interval', '5 hours')
        interval = parse_timedelta(config['interval'])
        discover_now = task.options.discover_now
        if discover_now:
            logger.info('Ignoring interval because of --discover-now')
        result = []
        interval_count = 0
        # NOTE(review): relies on the Session context manager to persist the changes
        # on exit -- confirm against flexget.manager.Session.
        with Session() as session:
            for entry in entries:
                discover_entry = (
                    session
                    .query(DiscoverEntry)
                    .filter(DiscoverEntry.title == entry['title'])
                    .filter(DiscoverEntry.task == task.name)
                    .first()
                )
                if not discover_entry:
                    logger.debug('{} -> No previous run recorded', entry['title'])
                    discover_entry = DiscoverEntry(entry['title'], task.name)
                    session.add(discover_entry)
                if (not task.is_rerun and discover_now) or not discover_entry.last_execution:
                    # First time we execute (and on --discover-now) we randomize time to
                    # avoid clumping
                    delta = multiply_timedelta(interval, random.random())
                    discover_entry.last_execution = datetime.datetime.now() - delta
                else:
                    next_time = discover_entry.last_execution + interval
                    logger.debug(
                        'last_time: {!r}, interval: {}, next_time: {!r}, ',
                        discover_entry.last_execution,
                        config['interval'],
                        next_time,
                    )
                    if datetime.datetime.now() < next_time:
                        logger.debug('interval not met')
                        interval_count += 1
                        entry.reject('discover interval not met')
                        entry.complete()
                        continue
                    discover_entry.last_execution = datetime.datetime.now()
                logger.trace('interval passed for {}', entry['title'])
                result.append(entry)
        if interval_count and not task.is_rerun:
            logger.verbose(
                'Discover interval of {} not met for {} entries. Use --discover-now to override.',
                config['interval'],
                interval_count,
            )
        return result
[docs]
@event('plugin.register')
def register_plugin():
    """Register the Discover class under the name ``discover`` with ``api_ver=2``."""
    plugin.register(
        Discover,
        'discover',
        api_ver=2,
    )
[docs]
@event('options.register')
def register_parser_arguments():
    """Add the ``--discover-now`` flag to the ``execute`` parser.

    The flag is stored as ``discover_now`` and defaults to ``False``.
    """
    execute_parser = options.get_parser('execute')
    execute_parser.add_argument(
        '--discover-now',
        dest='discover_now',
        action='store_true',
        default=False,
        help='Immediately try to discover everything',
    )