Source code for flexget.entry

from __future__ import annotations

import functools
import types
import warnings
from datetime import date, datetime
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING

import pendulum
from loguru import logger

from flexget import plugin
from flexget.utils.lazy_dict import LazyDict, LazyLookup
from flexget.utils.serialization import Serializer, deserialize, serialize
from flexget.utils.template import CoercingDateTime, FlexGetTemplate, render_from_entry

if TYPE_CHECKING:
    from collections.abc import Callable, Iterable, Mapping, Sequence

logger = logger.bind(name='entry')


[docs] class EntryState(Enum): ACCEPTED = 'accepted' REJECTED = 'rejected' FAILED = 'failed' UNDECIDED = 'undecided' @property def color(self) -> str: return { EntryState.ACCEPTED: 'green', EntryState.REJECTED: 'red', EntryState.FAILED: 'RED', EntryState.UNDECIDED: 'dim', }[self] @property def log_markup(self) -> str: return f'<{self.color}>{self.value.upper()}</>' # __str__, __eq__, and __hash__ are make these states almost interchangeable with the old plain string states def __str__(self) -> str: return self.value def __eq__(self, other) -> bool: if isinstance(other, str): return self.value == other return super().__eq__(other) def __hash__(self) -> int: return hash(self.value)
[docs] class EntryUnicodeError(Exception): """Thrown when trying to set non-unicode compatible field value to entry.""" def __init__(self, key: str, value) -> None: self.key = key self.value = value def __str__(self): return f'Entry strings must be unicode: {self.key} ({self.value!r})'
[docs] class Entry(LazyDict, Serializer): """Represents one item in task. Must have `url` and *title* fields. Stores automatically *original_url* key, which is necessary because plugins (eg. urlrewriters) may change *url* into something else and otherwise that information would be lost. Entry will also transparently convert all ascii strings into unicode and raises :class:`EntryUnicodeError` if conversion fails on any value being set. Such failures are caught by :class:`~flexget.task.Task` and trigger :meth:`~flexget.task.Task.abort`. """ def __init__(self, *args, **kwargs): super().__init__() self.traces = [] self._state = EntryState.UNDECIDED self._hooks = {'accept': [], 'reject': [], 'fail': [], 'complete': []} self.task = None self.lazy_lookups = [] if len(args) == 2: kwargs['title'] = args[0] kwargs['url'] = args[1] args = [] # Make sure constructor does not escape our __setitem__ enforcement self.update(*args, **kwargs)
[docs] def trace( self, message: str | None, operation: str | None = None, plugin: str | None = None, ) -> None: """Add trace message to the entry which should contain useful information about why plugin did not operate on entry. Accept and Reject messages are added to trace automatically. :param string message: Message to add into entry trace. :param string operation: None, reject, accept or fail :param plugin: Uses task.current_plugin by default, pass value to override """ if operation not in (None, 'accept', 'reject', 'fail'): raise ValueError(f'Unknown operation {operation}') item = (plugin, operation, message) if item not in self.traces: self.traces.append(item)
[docs] def run_hooks(self, action: str, **kwargs) -> None: """Run hooks that have been registered for given ``action``. :param action: Name of action to run hooks for :param kwargs: Keyword arguments that should be passed to the registered functions """ for func in self._hooks[action]: func(self, **kwargs)
[docs] def add_hook(self, action: str, func: Callable, **kwargs) -> None: """Add a hook for ``action`` to this entry. :param string action: One of: 'accept', 'reject', 'fail', 'complete' :param func: Function to execute when event occurs :param kwargs: Keyword arguments that should be passed to ``func`` :raises: ValueError when given an invalid ``action`` """ try: self._hooks[action].append(functools.partial(func, **kwargs)) except KeyError: raise ValueError(f'`{action}` is not a valid entry action')
[docs] def on_accept(self, func: Callable, **kwargs) -> None: """Register a function to be called when this entry is accepted. :param func: The function to call :param kwargs: Keyword arguments that should be passed to the registered function """ self.add_hook('accept', func, **kwargs)
[docs] def on_reject(self, func: Callable, **kwargs) -> None: """Register a function to be called when this entry is rejected. :param func: The function to call :param kwargs: Keyword arguments that should be passed to the registered function """ self.add_hook('reject', func, **kwargs)
[docs] def on_fail(self, func: Callable, **kwargs) -> None: """Register a function to be called when this entry is failed. :param func: The function to call :param kwargs: Keyword arguments that should be passed to the registered function """ self.add_hook('fail', func, **kwargs)
[docs] def on_complete(self, func: Callable, **kwargs) -> None: """Register a function to be called when a :class:`Task` has finished processing this entry. :param func: The function to call :param kwargs: Keyword arguments that should be passed to the registered function """ self.add_hook('complete', func, **kwargs)
[docs] def accept(self, reason: str | None = None, **kwargs) -> None: if self.rejected: logger.debug('tried to accept rejected {!r}', self) elif not self.accepted: self._state = EntryState.ACCEPTED self.trace(reason, operation='accept') # Run entry on_accept hooks self.run_hooks('accept', reason=reason, **kwargs)
[docs] def reject(self, reason: str | None = None, **kwargs) -> None: # ignore rejections on immortal entries if self.get('immortal'): reason_str = f'({reason})' if reason else '' logger.info('Tried to reject immortal {} {}', self['title'], reason_str) self.trace(f'Tried to reject immortal {reason_str}') return if not self.rejected: self._state = EntryState.REJECTED self.trace(reason, operation='reject') # Run entry on_reject hooks self.run_hooks('reject', reason=reason, **kwargs)
[docs] def fail(self, reason: str | None = None, **kwargs): logger.debug("Marking entry '{}' as failed", self['title']) if not self.failed: self._state = EntryState.FAILED self.trace(reason, operation='fail') logger.error('Failed {} ({})', self['title'], reason) # Run entry on_fail hooks self.run_hooks('fail', reason=reason, **kwargs)
[docs] def complete(self, **kwargs): # Run entry on_complete hooks self.run_hooks('complete', **kwargs)
@property def state(self) -> EntryState: return self._state @property def accepted(self) -> bool: return self._state == EntryState.ACCEPTED @property def rejected(self) -> bool: return self._state == EntryState.REJECTED @property def failed(self) -> bool: return self._state == EntryState.FAILED @property def undecided(self) -> bool: return self._state == EntryState.UNDECIDED def __setitem__(self, key, value): # Enforce unicode compatibility. if isinstance(value, bytes): raise EntryUnicodeError(key, value) # Coerce any enriched strings (such as those returned by BeautifulSoup) to plain strings to avoid serialization # troubles. if isinstance(value, str) and type(value) is not str: value = str(value) # Turn datetimes and dates into their pendulum versions elif isinstance(value, datetime): value = CoercingDateTime.instance(value) if not value.tzinfo: logger.warning( '{} was set to a naive datetime. Plugin should be updated to provide a timezone aware datetime', key, ) elif isinstance(value, date): value = pendulum.instance(value) # url and original_url handling if key == 'url': if not isinstance(value, (str, LazyLookup)): raise plugin.PluginError( 'Tried to set {!r} url to {!r}'.format(self.get('title'), value) ) self.setdefault('original_url', value) # title handling if key == 'title': if not isinstance(value, (str, LazyLookup)): raise plugin.PluginError(f'Tried to set title to {value!r}') self.setdefault('original_title', value) # location handling if key == 'location' and isinstance(value, str) and value != '': value = Path(value) try: logger.trace('ENTRY SET: {} = {!r}', key, value) except Exception as e: logger.debug('trying to debug key `{}` value threw exception: {}', key, e) super().__setitem__(key, value)
[docs] def safe_str(self) -> str: return f'{self["title"]} | {self["url"]}'
# TODO: this is too manual, maybe we should somehow check this internally and throw some exception if # application is trying to operate on invalid entry
[docs] def isvalid(self) -> bool: """Return True if entry is valid. Return False if this cannot be used.""" if 'title' not in self: return False if 'url' not in self: return False if not isinstance(self['url'], str): return False return isinstance(self['title'], str)
[docs] def update_using_map( self, field_map: dict, source_item: dict | object, ignore_none: bool = False ): """Populate entry fields from a source object using a dictionary that maps from entry field names to attributes (or keys) in the source object. :param dict field_map: A dictionary mapping entry field names to the attribute in source_item (or keys, if source_item is a dict)(nested attributes/dicts are also supported, separated by a dot,) or a function that takes source_item as an argument :param source_item: Source of information to be used by the map :param ignore_none: Ignore any None values, do not record it to the Entry """ if source_item is None: logger.debug('source_item is None, skipping update_using_map') return func = dict.get if isinstance(source_item, dict) else getattr for field, value in field_map.items(): if isinstance(value, str): v = functools.reduce(func, value.split('.'), source_item) else: v = value(source_item) if ignore_none and v is None: continue self[field] = v
[docs] def render(self, template: str | FlexGetTemplate, native: bool = False) -> str: """Render a template string based on fields in the entry. :param template: A template string or FlexGetTemplate that uses jinja2 or python string replacement format. :param native: If True, and the rendering result can be all native python types, not just strings. :return: The result of the rendering. :raises RenderError: If there is a problem. """ if not isinstance(template, (str, FlexGetTemplate)): raise TypeError( f'Trying to render non string template or unrecognized template format, got {template!r}' ) logger.trace('rendering: {}', template) return render_from_entry(template, self, native=native)
[docs] @classmethod def serialize(cls, entry: Entry) -> dict: fields = {} for key in entry: if key.startswith('_') or entry.is_lazy(key): continue try: fields[key] = serialize(entry[key]) except TypeError as exc: logger.debug('field {} was not serializable. {}', key, exc) lazy_lookups = [] for ll in entry.lazy_lookups: try: lazy_lookups.append(serialize(ll)) except TypeError: logger.exception( 'BUG: Lazy lookup was not compatible with serialization. Please file a bug report' ) return {'fields': fields, 'lazy_lookups': lazy_lookups}
[docs] @classmethod def deserialize(cls, data, version) -> Entry: result = cls() for key, value in data['fields'].items(): result[key] = deserialize(value) for lazy_lookup in deserialize(data['lazy_lookups']): result.add_lazy_fields(*lazy_lookup) return result
[docs] def add_lazy_fields( self, lazy_func: Callable[[Entry], None] | str, fields: Iterable[str], args: Sequence | None = None, kwargs: Mapping | None = None, ): """Add lazy fields to an entry. :param lazy_func: should be a function previously registered with the `register_lazy_func` decorator, or the name it was registered under. :param fields: list of fields this function will fill :param args: Arguments that will be passed to the lazy lookup function when called. :param kwargs: Keyword arguments which will be passed to the lazy lookup function when called. """ if not isinstance(lazy_func, str): lazy_func = getattr(lazy_func, 'lazy_func_id', None) if lazy_func not in lazy_func_registry: raise ValueError( 'Lazy lookup functions/methods must be registered with the `register_lazy_lookup` decorator' ) # Make sure fields is a plain list. If it is a dict, (or some other iterable) we may try to serialize # extra/unserializable stuff. fields = list(fields) func = lazy_func_registry[lazy_func] super().register_lazy_func(func.function, fields, args, kwargs) self.lazy_lookups.append((lazy_func, fields, args, kwargs))
[docs] def register_lazy_func(self, func, keys): """Do not use this anymore as it is DEPRECATED. Use `add_lazy_fields` instead. """ warnings.warn( '`register_lazy_func` is deprecated. `add_lazy_fields` should be used instead. ' 'This plugin should be updated to work with the latest versions of FlexGet', DeprecationWarning, stacklevel=2, ) super().register_lazy_func(func, keys, [], {})
def __eq__(self, other): return self.get('original_title') == other.get('original_title') and self.get( 'original_url' ) == other.get('original_url') def __hash__(self): return hash(self.get('original_title', '') + self.get('original_url', '')) def __repr__(self): return '<Entry(title={},state={})>'.format(self['title'], self._state)
lazy_func_registry = {}
[docs] class LazyFunc: def __init__(self, lazy_func_name: str): self.name = lazy_func_name self._func = None def __call__(self, func: Callable) -> Callable: if self.name in lazy_func_registry: raise RuntimeError( f'The name {self.name} is already registered to another lazy function.' ) func.lazy_func_id = self.name self._func = func lazy_func_registry[self.name] = self return func @property def function(self) -> Callable: if '.' in self._func.__qualname__: # This is a method of a plugin class, bind the function to the plugin instance plugin_class_name = self._func.__qualname__.split('.')[0] for p in plugin.plugins.values(): if p.plugin_class.__name__ == plugin_class_name: return types.MethodType(self._func, p.instance) raise TypeError( f'lazy lookups must be functions, or methods of a registered plugin class. {self._func!r} is not' ) return self._func
register_lazy_lookup = LazyFunc