Source code for flexget.plugins.filter.crossmatch
from loguru import logger
from flexget import plugin
from flexget.event import event
from flexget.utils.tools import aggregate_inputs
# Rebind the imported loguru logger so this module's records carry name='crossmatch'.
logger = logger.bind(name='crossmatch')
[docs]
class CrossMatch:
    """Perform action based on item on current task and other inputs.

    Every entry of the current task is compared against every entry produced
    by the inputs listed under ``from``. When the two entries share values in
    the configured ``fields``, the task entry is accepted or rejected
    according to ``action``.

    Options:

    * ``fields``: names of the entry fields that are compared.
    * ``action``: ``accept`` or ``reject``.
    * ``from``: list of input plugin configurations to compare against.
    * ``exact``: when false, a value contained in the other value also counts
      as a match (default: yes).
    * ``all_fields``: when true, every listed field must match, not just one
      (default: no).
    * ``case_sensitive``: when false, string values are lowercased before
      being compared (default: yes).

    Example::

      crossmatch:
        from:
          - rss: http://example.com/
        fields:
          - title
        action: reject
        exact: yes
        case_sensitive: yes
    """

    schema = {
        'type': 'object',
        'properties': {
            'fields': {'type': 'array', 'items': {'type': 'string'}},
            'action': {'enum': ['accept', 'reject']},
            'from': {'type': 'array', 'items': {'$ref': '/schema/plugins?phase=input'}},
            'exact': {'type': 'boolean', 'default': True},
            'all_fields': {'type': 'boolean', 'default': False},
            'case_sensitive': {'type': 'boolean', 'default': True},
        },
        'required': ['fields', 'action', 'from'],
        'additionalProperties': False,
    }

    def on_task_filter(self, task, config):
        """Accept or reject task entries that intersect with the ``from`` inputs.

        :param task: Task whose ``entries`` are checked.
        :param config: Plugin configuration matching :attr:`schema`.
        """
        fields = config['fields']
        action = config['action']
        all_fields = config['all_fields']
        # These options do not change while iterating, so read them once.
        exact = config.get('exact')
        case_sensitive = config.get('case_sensitive')

        if not task.entries:
            logger.trace('Stopping crossmatch filter because of no entries to check')
            return

        match_entries = aggregate_inputs(task, config['from'])

        # perform action on intersecting entries
        for entry in task.entries:
            for generated_entry in match_entries:
                logger.trace('checking if {} matches {}', entry['title'], generated_entry['title'])
                common = self.entry_intersects(
                    entry,
                    generated_entry,
                    fields,
                    exact,
                    case_sensitive,
                )
                # With all_fields set, a partial overlap is not enough.
                if not common or (all_fields and len(common) != len(fields)):
                    continue

                msg = 'intersects with {} on field(s) {}'.format(
                    generated_entry['title'],
                    ', '.join(common),
                )
                # Copy over fields the task entry lacks; existing values are kept.
                for key in generated_entry:
                    if key not in entry:
                        entry[key] = generated_entry[key]
                if action == 'reject':
                    entry.reject(msg)
                elif action == 'accept':
                    entry.accept(msg)

    def entry_intersects(self, e1, e2, fields=None, exact=True, case_sensitive=True):
        """Return list of field names in common.

        :param e1: First :class:`flexget.entry.Entry`
        :param e2: Second :class:`flexget.entry.Entry`
        :param fields: List of fields which are checked
        :param exact: When false, a value contained in the other one also
            counts as a match.
        :param case_sensitive: When false, string values are lowercased
            before being compared.
        :return: Names from ``fields`` whose values match in both entries, in
            the order they appear in ``fields``.
        """
        if fields is None:
            fields = []

        common_fields = []
        for field in fields:
            # Doesn't really make sense to match if field is not in both entries
            if field not in e1 or field not in e2:
                logger.trace('field {} is not in both entries', field)
                continue

            v1 = e1[field]
            v2 = e2[field]
            if not case_sensitive:
                # Each value is normalised according to its *own* type, so a
                # str/non-str pair neither crashes on .lower() nor skips the
                # lowercasing of the string side.
                if isinstance(v1, str):
                    v1 = v1.lower()
                if isinstance(v2, str):
                    v2 = v2.lower()

            try:
                if v1 == v2 or (not exact and (v2 in v1 or v1 in v2)):
                    common_fields.append(field)
                else:
                    logger.trace('not matching')
            except TypeError as e:
                # argument of type <type> is not iterable
                logger.trace('error matching fields: {}', str(e))

        return common_fields
[docs]
@event('plugin.register')
def register_plugin():
    """Register :class:`CrossMatch` as the ``crossmatch`` plugin (API version 2)."""
    plugin.register(
        CrossMatch,
        'crossmatch',
        api_ver=2,
    )