Source code for flexget.plugins.filter.regexp

import re
from urllib.parse import unquote

from loguru import logger

from flexget import plugin
from flexget.config_schema import one_or_more
from flexget.entry import Entry
from flexget.event import event

logger = logger.bind(name='regexp')


[docs] class FilterRegexp: """All possible forms. Configuration options:: regexp: [operation]: # operation to perform on matches - [regexp] # simple regexp - [regexp]: <path> # override path - [regexp]: [path]: <path> # override path [not]: <regexp> # not match [from]: <field> # search from given entry field - [regexp]: [path]: <path> # override path [not]: # list of not match regexps - <regexp> [from]: # search only from these fields - <field> [operation]: - <regexp> [rest]: <operation> # non matching entries are [from]: # search only from these fields for all regexps - <field> Possible operations: accept, reject, accept_excluding, reject_excluding """ schema = { 'type': 'object', 'properties': { 'accept': {'$ref': '#/$defs/regex_list'}, 'reject': {'$ref': '#/$defs/regex_list'}, 'accept_excluding': {'$ref': '#/$defs/regex_list'}, 'reject_excluding': {'$ref': '#/$defs/regex_list'}, 'rest': {'type': 'string', 'enum': ['accept', 'reject']}, 'from': one_or_more({'type': 'string'}), }, 'additionalProperties': False, '$defs': { # The validator for a list of regexps, each with or without settings 'regex_list': { 'type': 'array', 'items': { 'oneOf': [ # Plain regex string {'type': 'string', 'format': 'regex'}, # Regex with options (regex is key, options are value) { 'type': 'object', 'additionalProperties': { 'oneOf': [ # Simple options, just path {'type': 'string', 'format': 'path'}, # Dict style options { 'type': 'object', 'properties': { 'path': {'type': 'string', 'format': 'path'}, 'set': {'type': 'object'}, 'not': one_or_more({ 'type': 'string', 'format': 'regex', }), 'from': one_or_more({'type': 'string'}), }, 'additionalProperties': False, }, ] }, }, ] }, } }, }
[docs] def prepare_config(self, config): """Return the config in standard format. All regexps are turned into dictionaries in the form of {compiled regexp: options} :param config: Dict that can optionally contain the following keys path: will be attached to entries that match set: a dict of values to be attached to entries that match via set plugin from: a list of fields in entry for the regexps to match against not: a list of compiled regexps that if matching, will disqualify the main match :return: New config dictionary """ out_config = {} if 'rest' in config: out_config['rest'] = config['rest'] # Turn all our regexps into advanced form dicts and compile them for operation, regexps in config.items(): if operation in ['rest', 'from']: continue for regexp_item in regexps: if not isinstance(regexp_item, dict): regexp = regexp_item regexp_item = {regexp: {}} regexp, opts = next(iter(regexp_item.items())) # Parse custom settings for this regexp, and we don't want to modify original config opts = {'path': opts} if not isinstance(opts, dict) else opts.copy() # advanced configuration if config.get('from'): opts.setdefault('from', config['from']) # Put plain strings into list form for `from` and `not` options if 'from' in opts and isinstance(opts['from'], str): opts['from'] = [opts['from']] if 'not' in opts and isinstance(opts['not'], str): opts['not'] = [opts['not']] # compile `not` option regexps if 'not' in opts: opts['not'] = [re.compile(not_re, re.IGNORECASE) for not_re in opts['not']] # compile regexp and make sure regexp is a string for series like '24' try: regexp = re.compile(str(regexp), re.IGNORECASE) except re.error as e: # Since validator can't validate dict keys (when an option is defined for the pattern) make sure we # raise a proper error here. raise plugin.PluginError(f'Invalid regex `{regexp}`: {e}') out_config.setdefault(operation, []).append({regexp: opts}) return out_config
[docs] @plugin.priority(172) def on_task_filter(self, task, config): # TODO: what if accept and accept_excluding configured? Should raise error ... config = self.prepare_config(config) # Keep track of all entries which have not matched any regexp rest = set(task.entries) for operation, regexps in config.items(): if operation == 'rest': continue matched = self.filter(task.entries, operation, regexps) # Remove any entries from rest which matched this regexp rest -= matched if 'rest' in config: rest_method = Entry.accept if config['rest'] == 'accept' else Entry.reject for entry in rest: logger.debug('Rest method {} for {}', config['rest'], entry['title']) rest_method(entry, 'regexp `rest`')
[docs] def matches(self, entry, regexp, find_from=None, not_regexps=None): """Check if :entry: has any string fields or strings in a list field that match :regexp:. :param entry: Entry instance :param regexp: Compiled regexp :param find_from: None or a list of fields to search from :param not_regexps: None or list of regexps that can NOT match :return: Field matching """ unquote_fields = ['url'] for field in find_from or ['title', 'description']: # Only evaluate lazy fields if find_from has been explicitly specified if not entry.get(field, eval_lazy=find_from): continue # Make all fields into lists for search purposes values = entry[field] if not isinstance(values, list): values = [values] for value in values: if not isinstance(value, str): value = str(value) if field in unquote_fields: value = unquote(value) # If none of the not_regexps match if regexp.search(value): # Make sure the not_regexps do not match for this field for not_regexp in not_regexps or []: if self.matches(entry, not_regexp, find_from=[field]): entry.trace(f'Configured not_regexp {not_regexp} matched, ignored') break else: # None of the not_regexps matched return field return None
[docs] def filter(self, entries, operation, regexps): """Return set of entries that matched regexps. :param entries: entries to filter :param operation: one of ``accept`` ``reject`` ``accept_excluding`` and ``reject_excluding`` accept and reject will be called on the entry if any of the regexps match ``*_excluding`` operations will be called if any of the regexps don't match :param regexps: list of {compiled_regexp: options} dictionaries """ matched = set() method = Entry.accept if 'accept' in operation else Entry.reject match_mode = 'excluding' not in operation for entry in entries: logger.trace('testing {} regexps to {}', len(regexps), entry['title']) for regexp_opts in regexps: regexp, opts = next(iter(regexp_opts.items())) # check if entry matches given regexp configuration field = self.matches(entry, regexp, opts.get('from'), opts.get('not')) # Run if we are in match mode and have a hit, or are in non-match mode and don't have a hit if match_mode == bool(field): # Creates the string with the reason for the hit matchtext = f"regexp '{regexp.pattern}' " + ( f"matched field '{field}'" if match_mode else "didn't match" ) logger.debug('{} for {}', matchtext, entry['title']) # apply settings to entry and run the method on it if opts.get('path'): entry['path'] = opts['path'] if opts.get('set'): # invoke set plugin with given configuration logger.debug( 'adding set: info to entry:"{}" {}', entry['title'], opts['set'] ) plugin.get('set', self).modify(entry, opts['set']) method(entry, matchtext) matched.add(entry) # We had a match so break out of the regexp loop. break else: # We didn't run method for any of the regexps, add this entry to rest entry.trace(f'None of configured {operation} regexps matched') return matched
[docs] @event('plugin.register') def register_plugin(): plugin.register(FilterRegexp, 'regexp', api_ver=2)