Source code for flexget.plugins.input.regexp_parse

import codecs
import os
import re

from loguru import logger

from flexget import plugin
from flexget.entry import Entry
from flexget.event import event
from flexget.utils.cached_input import cached

logger = logger.bind(name='regexp_parse')


[docs] class RegexpParse: r"""Designed to take input from a web resource or a file. It then parses the text via regexps supplied in the config file. source: is a file or url to get the data from. You can specify a username:password sections: Takes a list of dicts that contain regexps to split the data up into sections. The regexps listed here are used by find all so every matching string in the data will be a valid section. keys: hold the keys that will be set in the entries key: regexps: a list of dicts that hold regexps. The key is set to the first string that matches any of the regexps listed. The regexps are evaluated in the order they are supplied so if a string matches the first regexp none of the others in the list will be used. required: a boolean that when set to true will only allow entries that contain this key onto the next stage. url and title are always required no matter what you do (part of flexget) #TODO: consider adding a set field that will allow you to set the field if no regexps match #TODO: consider a mode field that allows a growing list for a field instead of just setting to # first match Example config :: regexp_parse: source: http://username:password@ezrss.it/feed/ encoding: "utf-8" sections: - {regexp: "(?<=<item>).*?(?=</item>)", flags: "DOTALL,IGNORECASE"} keys: title: regexps: - {regexp: '(?<=<title><!\[CDATA\[).*?(?=\]\]></title>)'} #comment url: regexps: - {regexp: "magnet:.*?(?=])"} custom_field: regexps: - {regexp: "custom regexps", flags: "comma separated list of flags (see python regex docs)"} required: False custom_field2: regexps: - {regexp: 'first custom regexps'} - {regexp: 'can't find first regexp so try this one'} """ # dict used to convert string values of regexp flags to int FLAG_VALUES = { 'DEBUG': re.DEBUG, 'I': re.IGNORECASE, 'IGNORECASE': re.IGNORECASE, 'L': re.LOCALE, 'LOCALE': re.LOCALE, 'M': re.MULTILINE, 'MULTILINE': re.MULTILINE, 'S': re.DOTALL, 'DOTALL': re.DOTALL, 'U': re.UNICODE, 'UNICODE': re.UNICODE, 'X': re.VERBOSE, 'VERBOSE': re.VERBOSE, } FLAG_REGEX = r'^(\s?({})\s?(,|$))+$'.format('|'.join(FLAG_VALUES)) schema = { '$defs': { 'regex_list': { 'type': 'array', 'items': { 'type': 'object', 'properties': { 'regexp': {'type': 'string', 'format': 'regex'}, 'flags': { 'type': 'string', 'pattern': FLAG_REGEX, 'error_pattern': 'Must be a comma separated list of flags. See python regex docs.', }, }, 'required': ['regexp'], 'additionalProperties': False, }, } }, 'type': 'object', 'properties': { 'source': { 'anyOf': [ {'type': 'string', 'format': 'url'}, {'type': 'string', 'format': 'file'}, ] }, 'encoding': {'type': 'string'}, 'sections': {'$ref': '#/$defs/regex_list'}, 'keys': { 'type': 'object', 'additionalProperties': { 'type': 'object', 'properties': { 'required': {'type': 'boolean'}, 'regexps': {'$ref': '#/$defs/regex_list'}, }, 'required': ['regexps'], 'additionalProperties': False, }, 'required': ['title', 'url'], }, }, 'required': ['source', 'keys'], 'additionalProperties': False, } def __init__(self): self.required = []
[docs] def flagstr_to_flags(self, flag_str): """Turn a comma separated list of flags into the int value.""" combined_flags = 0 split_flags = flag_str.split(',') for flag in split_flags: combined_flags = combined_flags | RegexpParse.FLAG_VALUES[flag.strip()] return combined_flags
[docs] def compile_regexp_dict_list(self, re_list): """Turn a list of dicts containing regexps information into a list of compiled regexps.""" compiled_regexps = [] for dic in re_list: flags = 0 if 'flags' in dic: flags = self.flagstr_to_flags(dic['flags']) compiled_regexps.append(re.compile(dic['regexp'], flags)) return compiled_regexps
[docs] def isvalid(self, entry): """Check to make sure that all required fields are present in the entry.""" for key in self.required: if key not in entry: return False return entry.isvalid()
@cached('regexp_parse') @plugin.internet(logger) def on_task_input(self, task, config): url = config['source'] encoding = config.get('encoding') # if it's a file open it and read into content (assume utf-8 encoding) if os.path.isfile(os.path.expanduser(url)): with codecs.open(url, 'r', encoding=encoding or 'utf-8') as u: content = u.read() # else use requests to get the data else: resp = task.requests.get(url) if encoding: resp.encoding = encoding content = resp.text sections = [] separators = config.get('sections') if separators: for sep in separators: flags = 0 if 'flags' in sep: flags = self.flagstr_to_flags(sep['flags']) sections.extend(re.findall(sep['regexp'], content, flags)) # no separators just do work on the whole content else: sections.append(content) # holds all the regex in a dict for the field they are trying to fill key_to_regexps = {} # put every key in keys into the rey_to_regexps list for key, value in config['keys'].items(): key_to_regexps[key] = self.compile_regexp_dict_list(value['regexps']) if value.get('required'): self.required.append(key) entries = [] for section in sections: entry = Entry() for key, regexps in key_to_regexps.items(): for regexp in regexps: m = regexp.search(section) if m: entry[key] = m.group(0) break if self.isvalid(entry): entries.append(entry) return entries
[docs] @event('plugin.register') def register_plugin(): plugin.register(RegexpParse, 'regexp_parse', api_ver=2)