Source code for flexget.components.sites.urlrewrite_search
from difflib import SequenceMatcher
from loguru import logger
from flexget import plugin
from flexget.event import event
logger = logger.bind(name='urlrewrite_search')
[docs]
class PluginSearch:
"""Search entry from sites.
Accept list of known search plugins, list is in priority order.
Once hit has been found no more searches are performed. Should be used only when
there is no other way to get working download url, ie. when input plugin does not provide
any downloadable urls.
Example::
urlrewrite_search:
- newtorrents
- piratebay
.. note:: Some url rewriters will use search plugins automatically if entry url
points into a search page.
"""
schema = {
'type': 'array',
'items': {
'allOf': [
{'$ref': '/schema/plugins?interface=search'},
{'maxProperties': 1, 'minProperties': 1},
]
},
}
# Run before main urlrewriting
[docs]
@plugin.priority(130)
def on_task_urlrewrite(self, task, config):
# no searches in unit test mode
if task.manager.unit_test:
return
plugins = {}
for p in plugin.get_plugins(interface='search'):
plugins[p.name] = p.instance
# search accepted
for entry in task.accepted:
# loop through configured searches
for name in config:
search_config = None
if isinstance(name, dict):
# the name is the first/only key in the dict.
name, search_config = next(iter(name.items()))
logger.verbose('Searching `{}` from {}', entry['title'], name)
try:
try:
results = plugins[name].search(
task=task, entry=entry, config=search_config
)
except TypeError:
# Old search api did not take task argument
logger.warning(
'Search plugin {} does not support latest search api.', name
)
results = plugins[name].search(entry, search_config)
matcher = SequenceMatcher(a=entry['title'])
for result in results:
matcher.set_seq2(result['title'])
if matcher.ratio() > 0.9:
logger.debug('Found url: {}', result['url'])
entry['url'] = result['url']
break
logger.debug('Match {} is not close enough', result['title'])
else:
continue
break
except (plugin.PluginError, plugin.PluginWarning) as pw:
logger.verbose('Failed: {}', pw.value)
continue
# Search failed
else:
# If I don't have a URL, doesn't matter if I'm immortal...
entry['immortal'] = False
entry.reject('search failed')
[docs]
@event('plugin.register')
def register_plugin():
plugin.register(PluginSearch, 'urlrewrite_search', api_ver=2)