Source code for flexget.plugin

from __future__ import annotations

import os
import re
import time
from functools import total_ordering
from http.client import BadStatusLine
from importlib import import_module
from importlib.metadata import entry_points
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError

import loguru
from requests import RequestException

from flexget import components as components_pkg
from flexget import config_schema
from flexget import plugins as plugins_pkg
from flexget.event import add_event_handler as add_phase_handler
from flexget.event import event, fire_event, remove_event_handlers

if TYPE_CHECKING:
    from collections.abc import Callable, Iterable

    from flexget.event import Event

logger = loguru.logger.bind(name='plugin')

PRIORITY_DEFAULT = 128
PRIORITY_LAST = -255
PRIORITY_FIRST = 255


[docs] class DependencyError(Exception): """Plugin depends on other plugin, but it cannot be loaded. Args: issued_by: name of the plugin trying to do the import missing: name of the plugin or library that is missing message: customized user readable error message All args are optional. """ def __init__( self, issued_by: str | None = None, missing: str | None = None, message: str | None = None, silent: bool = False, ): super().__init__() self.issued_by = issued_by self.missing = missing self._message = message self.silent = silent
[docs] def _get_message(self) -> str: if self._message: return self._message return f'Plugin `{self.issued_by}` requires dependency `{self.missing}`'
[docs] def _set_message(self, message: str) -> None: self._message = message
[docs] def has_message(self) -> bool: return self._message is not None
message = property(_get_message, _set_message) def __str__(self) -> str: return f'<DependencyError(issued_by={self.issued_by!r},missing={self.missing!r},message={self.message!r},silent={self.silent!r})>'
[docs] class RegisterException(Exception): def __init__(self, value): super().__init__() self.value = value def __str__(self): return repr(self.value)
[docs] class PluginWarning(Warning): def __init__(self, value, logger: loguru.Logger = logger, **kwargs): super().__init__() self.value = value self.logger = logger self.kwargs = kwargs def __str__(self): return self.value
[docs] class PluginError(Exception): def __init__(self, value, logger: loguru.Logger = logger, **kwargs): super().__init__() # Value is expected to be a string if not isinstance(value, str): value = str(value) self.value = value self.logger = logger self.kwargs = kwargs def __str__(self): return self.value
# TODO: move to utils or somewhere more appropriate
[docs] class internet: # noqa: N801 It acts like a function in usage """@internet decorator for plugin phase methods. Catches all internet related exceptions and raises PluginError with relevant message. Task handles PluginErrors by aborting the task. """ def __init__(self, logger_: loguru.Logger = logger): if logger_: self.logger = logger_ else: self.logger = logger.bind(name='@internet') def __call__(self, func: Callable) -> Callable: def wrapped_func(*args, **kwargs): try: return func(*args, **kwargs) except RequestException as e: logger.opt(exception=True).debug( 'decorator caught RequestException. handled traceback:' ) raise PluginError(f'RequestException: {e}') except HTTPError as e: raise PluginError(f'HTTPError {e.code}', self.logger) except URLError as e: logger.opt(exception=True).debug('decorator caught urlerror. handled traceback:') raise PluginError(f'URLError {e.reason}', self.logger) except BadStatusLine: logger.opt(exception=True).debug( 'decorator caught badstatusline. handled traceback:' ) raise PluginError('Got BadStatusLine', self.logger) except ValueError as e: logger.opt(exception=True).debug('decorator caught ValueError. handled traceback:') raise PluginError(e) except OSError as e: logger.opt(exception=True).debug('decorator caught OSError. handled traceback:') if hasattr(e, 'reason'): raise PluginError(f'Failed to reach server. Reason: {e.reason}', self.logger) if hasattr(e, 'code'): raise PluginError( f"The server couldn't fulfill the request. Error code: {e.code}", self.logger, ) raise PluginError(f'OSError when connecting to server: {e}', self.logger) return wrapped_func
[docs] def priority(value: int) -> Callable[[Callable], Callable]: """Priority decorator for phase methods.""" def decorator(target: Callable) -> Callable: target.priority = value return target return decorator
# task phases, in order of their execution; note that this can be extended by # registering new phases at runtime task_phases = [ 'prepare', 'start', 'input', 'metainfo', 'filter', 'download', 'modify', 'output', 'learn', 'exit', ] # map phase names to method names phase_methods = { # task 'abort': 'on_task_abort' # special; not a task phase that gets called normally } phase_methods.update((_phase, 'on_task_' + _phase) for _phase in task_phases) # DRY # Mapping of plugin name to PluginInfo instance (logical singletons) plugins: dict[str, PluginInfo] = {} # Loading done? plugins_loaded = False _loaded_plugins = {} _plugin_options = [] _new_phase_queue: dict[str, list[str | None]] = {}
[docs] def register_task_phase(name: str, before: str | None = None, after: str | None = None): """Add a new task phase to the available phases.""" if before and after: raise RegisterException('You can only give either before or after for a phase.') if not before and not after: raise RegisterException('You must specify either a before or after phase.') if name in task_phases or name in _new_phase_queue: raise RegisterException(f'Phase {name} already exists.') def add_phase(phase_name: str, before: str | None, after: str | None): if before is not None and before not in task_phases: return False if after is not None and after not in task_phases: return False # add method name to phase -> method lookup table phase_methods[phase_name] = 'on_task_' + phase_name # place phase in phase list if before is None and after is not None: task_phases.insert(task_phases.index(after) + 1, phase_name) if after is None and before is not None: task_phases.insert(task_phases.index(before), phase_name) return True # if can't add yet (dependencies) queue addition if not add_phase(name, before, after): _new_phase_queue[name] = [before, after] for phase_name, args in list(_new_phase_queue.items()): if add_phase(phase_name, *args): del _new_phase_queue[phase_name]
[docs] @total_ordering class PluginInfo(dict): """Allow accessing key/value pairs of this dictionary subclass via attributes. Also instantiate a plugin and initialize properties. """ # Counts duplicate registrations dupe_counter = 0 def __init__( self, plugin_class: type, name: str | None = None, interfaces: list[str] | None = None, builtin: bool = False, debug: bool = False, api_ver: int = 1, category: str | None = None, ) -> None: """Register a plugin. :param plugin_class: The plugin factory. :param string name: Name of the plugin (if not given, default to factory class name in underscore form). :param list interfaces: Interfaces this plugin implements. :param bool builtin: Auto-activated? :param bool debug: True if plugin is for debugging purposes. :param int api_ver: Signature of callback hooks (1=task; 2=task,config). :param string category: The type of plugin. Can be one of the task phases. Defaults to the package name containing the plugin. """ dict.__init__(self) if interfaces is None: interfaces = ['task'] if name is None: # Convention is to take camel-case class name and rewrite it to an underscore form, # e.g. 'PluginName' to 'plugin_name' name = re.sub( '[A-Z]+', lambda i: '_' + i.group(0).lower(), plugin_class.__name__ ).lstrip('_') if category is None and plugin_class.__module__.startswith('flexget.plugins'): # By default look at the containing package of the plugin. category = plugin_class.__module__.split('.')[-2] # Check for unsupported api versions if api_ver < 2: raise PluginError(f'Api versions <2 are no longer supported. Plugin {name}') # Set basic info attributes self.api_ver = api_ver self.name = name self.interfaces = interfaces self.builtin = builtin self.debug = debug self.category = category self.phase_handlers: dict[str, Event] = {} self.schema: config_schema.JsonSchema = {} self.schema_id: str | None = None self.plugin_class: type = plugin_class self.instance: object = None if self.name in plugins: PluginInfo.dupe_counter += 1 logger.critical( 'Error while registering plugin {}. A plugin with the same name is already registered', self.name, ) else: plugins[self.name] = self
[docs] def initialize(self) -> None: if self.instance is not None: # We already initialized return # Create plugin instance self.instance = self.plugin_class() self.instance.plugin_info = self # give plugin easy access to its own info self.instance.logger = logger.bind( name=getattr(self.instance, 'LOGGER_NAME', None) or self.name ) if hasattr(self.instance, 'schema'): self.schema = self.instance.schema elif hasattr(self.instance, 'validator'): self.schema = self.instance.validator().schema() else: # TODO: I think plugins without schemas should not be allowed in config, maybe rethink this self.schema = {} if self.schema is not None: self.schema_id = f'/schema/plugin/{self.name}' config_schema.register_schema(self.schema_id, self.schema) self.build_phase_handlers()
[docs] def build_phase_handlers(self) -> None: """(Re)build phase_handlers in this plugin.""" for phase, method_name in phase_methods.items(): if phase in self.phase_handlers: continue if hasattr(self.instance, method_name): method = getattr(self.instance, method_name) if not callable(method): continue # check for priority decorator handler_prio = method.priority if hasattr(method, 'priority') else PRIORITY_DEFAULT event = add_phase_handler(f'plugin.{self.name}.{phase}', method, handler_prio) # provides backwards compatibility event.plugin = self self.phase_handlers[phase] = event
def __getattr__(self, attr: str): if attr in self: return self[attr] return dict.__getattribute__(self, attr) def __setattr__(self, attr: str, value): self[attr] = value def __str__(self): return f'<PluginInfo(name={self.name})>'
[docs] def _is_valid_operand(self, other): return hasattr(other, 'name')
def __eq__(self, other): return self.name == other.name def __lt__(self, other): return self.name < other.name def __hash__(self): return hash(self.name) __repr__ = __str__
register = PluginInfo
[docs] def _get_standard_plugins_path() -> list[Path]: """Return list of directories where traditional plugins should be tried to load from.""" # Get basic path from environment paths = [] env_path = os.environ.get('FLEXGET_PLUGIN_PATH') if env_path: paths = [Path(path) for path in env_path.split(os.pathsep)] # Add flexget.plugins directory (core plugins) paths.append(Path(plugins_pkg.__file__).parent.resolve()) return paths
[docs] def _get_standard_components_path() -> list[Path]: """Return list of directories where component plugins should be tried to load from.""" # Get basic path from environment paths = [] env_path = os.environ.get('FLEXGET_COMPONENT_PATH') if env_path: paths = [Path(path) for path in env_path.split(os.pathsep)] # Add flexget.plugins directory (core plugins) paths.append(Path(components_pkg.__file__).parent.resolve()) return paths
[docs] def _check_phase_queue() -> None: if _new_phase_queue: for phase, args in _new_phase_queue.items(): logger.error( 'Plugin {} requested new phase {}, but it could not be created at requested point (before, after). ' 'Plugin is not working properly.', args[0], phase, )
[docs] def _import_plugin(module_name: str, plugin_path: str | Path) -> None: try: import_module(module_name) except DependencyError as e: if e.has_message(): msg = e.message else: msg = 'Plugin `{}` requires plugin `{}` to load.'.format( e.issued_by or module_name, e.missing or 'N/A', ) if not e.silent: logger.warning(msg) else: logger.debug(msg) except ImportError: logger.opt(exception=True).critical( 'Plugin `{}` failed to import dependencies', module_name ) except ValueError as e: # Debugging #2755 logger.error( 'ValueError attempting to import `{}` (from {}): {}', module_name, plugin_path, e ) except Exception: logger.opt(exception=True).critical('Exception while loading plugin {}', module_name) raise else: logger.trace('Loaded module {} from {}', module_name, plugin_path)
[docs] def _load_plugins_from_dirs(dirs: list[Path]) -> None: """:param list dirs: Directories from where plugins are loaded from""" logger.debug('Trying to load plugins from: {}', dirs) dir_paths = [d for d in dirs if d.is_dir()] # add all dirs to plugins_pkg load path so that imports work properly from any of the plugin dirs plugins_pkg.__path__ = [str(d) for d in dir_paths] for plugins_dir in dir_paths: for plugin_path in plugins_dir.glob('**/*.py'): if plugin_path.name == '__init__.py': continue # Split the relative path from the plugins dir to current file's parent dir to find subpackage names plugin_subpackages = [ _f for _f in plugin_path.relative_to(plugins_dir).parent.parts if _f ] module_name = '.'.join([plugins_pkg.__name__, *plugin_subpackages, plugin_path.stem]) _import_plugin(module_name, plugin_path) _check_phase_queue()
# TODO: this is now identical to _load_plugins_from_dirs, REMOVE
[docs] def _load_components_from_dirs(dirs: list[Path]) -> None: """:param list dirs: Directories where plugin components are loaded from""" logger.debug('Trying to load components from: {}', dirs) dir_paths = [d for d in dirs if d.is_dir()] for component_dir in dir_paths: for component_path in component_dir.glob('**/*.py'): if component_path.name == '__init__.py': continue # Split the relative path from the plugins dir to current file's parent dir to find subpackage names plugin_subpackages = [ _f for _f in component_path.relative_to(component_dir).parent.parts if _f ] package_name = '.'.join([ components_pkg.__name__, *plugin_subpackages, component_path.stem, ]) _import_plugin(package_name, component_path) _check_phase_queue()
[docs] def _load_plugins_from_packages() -> None: """Load plugins installed via PIP.""" for entrypoint in entry_points(group='FlexGet.plugins'): try: plugin_module = entrypoint.load() except DependencyError as e: if e.has_message(): msg = e.message else: msg = ( 'Plugin `%s` requires `%s` to load.', e.issued_by or entrypoint.module_name, e.missing or 'N/A', ) if not e.silent: logger.warning(msg) else: logger.debug(msg) except ImportError: logger.opt(exception=True).critical( 'Plugin `{}` failed to import dependencies', entrypoint.module_name ) except Exception: logger.opt(exception=True).critical( 'Exception while loading plugin {}', entrypoint.module_name ) raise else: logger.trace( 'Loaded packaged module {} from {}', entrypoint.module, plugin_module.__file__ ) _check_phase_queue()
[docs] def load_plugins( extra_plugins: list[Path] | None = None, extra_components: list[Path] | None = None ) -> None: """Load plugins from the standard plugin and component paths. :param list extra_plugins: Extra directories from where plugins are loaded. :param list extra_components: Extra directories from where components are loaded. """ global plugins_loaded if extra_plugins is None: extra_plugins = [] if extra_components is None: extra_components = [] # Add flexget.plugins and flexget.components directories (core dist) extra_plugins.extend(_get_standard_plugins_path()) extra_components.extend(_get_standard_components_path()) start_time = time.time() # Import all the plugins _load_plugins_from_dirs(extra_plugins) _load_components_from_dirs(extra_components) _load_plugins_from_packages() # Register them fire_event('plugin.register') # Plugins should only be registered once, remove their handlers after remove_event_handlers('plugin.register') # After they have all been registered, instantiate them for plugin in list(plugins.values()): plugin.initialize() took = time.time() - start_time plugins_loaded = True logger.debug( 'Plugins took {:.2f} seconds to load. {} plugins in registry.', took, len(plugins.keys()) )
[docs] def get_plugins( phase: str | None = None, interface: str | None = None, category: str | None = None, name: str | None = None, min_api: int | None = None, ) -> Iterable[PluginInfo]: """Query other plugins characteristics. :param string phase: Require phase :param string interface: Plugin must implement this interface. :param string category: Type of plugin, phase names. :param string name: Name of the plugin. :param int min_api: Minimum api version. :return: List of PluginInfo instances. """ def matches(plugin): if phase is not None and phase not in phase_methods: raise ValueError(f'Unknown phase {phase}') if phase and phase not in plugin.phase_handlers: return False if interface and interface not in plugin.interfaces: return False if category and category != plugin.category: return False if name is not None and name != plugin.name: return False return not (min_api is not None and plugin.api_ver < min_api) return filter(matches, iter(plugins.values()))
[docs] def plugin_schemas(**kwargs) -> config_schema.JsonSchema: """Create a dict schema that matches plugins specified by `kwargs`.""" return { 'type': 'object', 'properties': {p.name: {'$ref': p.schema_id} for p in get_plugins(**kwargs)}, 'additionalProperties': False, 'error_additionalProperties': '{{message}} Only known plugin names are valid keys.', 'patternProperties': {'^_': {'title': 'Disabled Plugin'}}, }
[docs] @event('config.register') def register_schema(): config_schema.register_schema('/schema/plugins', plugin_schemas)
[docs] def get_phases_by_plugin(name: str) -> list[str]: """Return all phases plugin :name: hooks.""" return list(get_plugin_by_name(name).phase_handlers)
[docs] def get_plugin_by_name(name: str, issued_by: str = '???') -> PluginInfo: """Get plugin by name, preferred way since this structure may be changed at some point. Getting plugin via `.get` function is recommended for normal use. This results much shorter and cleaner code:: plugin.get_plugin_by_name('parsing').instance.parse_movie(data=entry['title']) Shortens into:: plugin.get('parsing', self).parse_movie(data=entry['title']) This function is still useful if you need to access plugin information (PluginInfo). :returns PluginInfo instance """ if name not in plugins: raise DependencyError(issued_by=issued_by, missing=name) return plugins[name]
[docs] def get(name: str, requested_by: str | object) -> object: """Return instance of Plugin class. :param str name: Name of the requested plugin :param requested_by: Plugin class instance OR string value who is making the request. """ if name not in plugins: if hasattr(requested_by, 'plugin_info'): who = requested_by.plugin_info.name else: who = requested_by raise DependencyError(issued_by=who, missing=name) instance = plugins[name].instance if instance is None: raise RuntimeError('Plugin referred before system initialized?') return instance