flexget.utils.cached_input module#

class flexget.utils.cached_input.InputCache(**kwargs)[source]#

Bases: VersionedBase

_sa_class_manager = {'added': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'entries': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'hash': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'name': <sqlalchemy.orm.attributes.InstrumentedAttribute object>}#
added#
entries#
hash#
id#
name#
class flexget.utils.cached_input.InputCacheEntry(**kwargs)[source]#

Bases: VersionedBase

_json#
_sa_class_manager = {'_json': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'cache_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'entry': <sqlalchemy.orm.attributes.create_proxied_attribute.<locals>.Proxy object>, 'id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>}#
cache_id#
entry#
id#
class flexget.utils.cached_input.IterableCache(iterable, finished_hook=None)[source]#

Bases: object

Can cache any iterable (including generators) without immediately evaluating all entries.

If finished_hook is supplied, it will be called the first time the iterable is run to the end.

Parameters:
  • iterable (Iterable)

  • finished_hook (Callable[[list], None] | None)

cache: list#
class flexget.utils.cached_input.cached(name, persist=None)[source]#

Bases: object

Implements transparent caching decorator @cached for inputs.

Decorator has two parameters:

  • name in which the configuration is present in tasks configuration.

  • key in which the configuration has the cached resource identifier (ie. url). If the key is not given or present in the configuration :name: is expected to be a cache name (ie. url)

Note

Configuration assumptions may make this unusable in some (future) inputs

Parameters:
  • name (str)

  • persist (str | None)

load_from_db(load_expired=False)[source]#
Parameters:

load_expired (bool)

Return type:

list[InputCacheEntry] | None

store_to_db(entries)[source]#
Parameters:

entries (list[str])

cache = TimedDict({})#
persist: timedelta | None#
flexget.utils.cached_input.db_cleanup(manager, session)[source]#

Remove old input caches from plugins that are no longer configured.

Parameters:

session (DBSession)

Return type:

None