Source code for flexget.api.core.tasks

from __future__ import annotations

import argparse
import copy
from datetime import datetime, timedelta
from json import JSONEncoder
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any

from flask import Response, jsonify, request
from flask_restx import inputs

from flexget.api import APIResource, api
from flexget.api.app import (
    APIError,
    BadRequest,
    Conflict,
    NotFoundError,
    base_message_schema,
    etag,
    success_response,
)
from flexget.config_schema import process_config
from flexget.entry import Entry
from flexget.event import event
from flexget.log import capture_logs
from flexget.options import get_parser
from flexget.task import task_phases
from flexget.terminal import capture_console
from flexget.utils import json, requests
from flexget.utils.lazy_dict import LazyLookup
from flexget.utils.requests import parse_header

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

# Tasks API
# Namespace object used by the `@tasks_api.route(...)` decorators in this module.
tasks_api = api.namespace('tasks', description='Manage Tasks')


class ObjectsContainer:
    """Container for the JSON schema fragments used by the tasks API.

    The attributes are plain dicts; they are turned into API models by the
    ``api.schema_model(...)`` calls that follow this class.
    """

    # A task listing is either a list of task objects or a list of task names.
    tasks_list_object = {
        'oneOf': [
            {'type': 'array', 'items': {'$ref': '#/definitions/tasks.task'}},
            {'type': 'array', 'items': {'type': 'string'}},
        ]
    }

    # Payload accepted when creating/updating a task: the config must match the plugin schema.
    task_input_object = {
        'type': 'object',
        'properties': {'name': {'type': 'string'}, 'config': {'$ref': '/schema/plugins'}},
        'required': ['name', 'config'],
        'additionalProperties': False,
    }

    # Same shape as the input object, but the config is only required to be an object.
    task_return_object = copy.deepcopy(task_input_object)
    task_return_object['properties']['config'] = {'type': 'object'}

    task_queue_schema = {
        'type': 'array',
        'items': {
            'type': 'object',
            'properties': {
                'id': {'type': 'integer'},
                'name': {'type': 'string'},
                'current_phase': {'type': ['string', 'null']},
                'current_plugin': {'type': ['string', 'null']},
            },
        },
    }

    # NOTE(review): the entries under `stream.items` are keyed directly by payload
    # name instead of being wrapped in `type: object` / `properties`; left as-is
    # because tightening it would change what validators enforce — confirm intent.
    task_execution_results_schema = {
        'type': 'object',
        'properties': {
            'task': {
                'type': 'object',
                'properties': {
                    'id': {'type': 'integer'},
                    'name': {'type': 'string'},
                    'stream': {
                        'type': 'array',
                        'items': {
                            'progress': {
                                'type': 'object',
                                'properties': {
                                    'status': {
                                        'type': 'string',
                                        'enum': ['pending', 'running', 'complete'],
                                    },
                                    'phase': {'type': 'string', 'enum': task_phases},
                                    'plugin': {'type': 'string'},
                                    # 'float' is not a JSON Schema type; 'number' is the valid spelling.
                                    'percent': {'type': 'number'},
                                },
                            },
                            'summary': {
                                'type': 'object',
                                'properties': {
                                    'accepted': {'type': 'integer'},
                                    'rejected': {'type': 'integer'},
                                    'failed': {'type': 'integer'},
                                    'undecided': {'type': 'integer'},
                                    'aborted': {'type': 'boolean'},
                                    'abort_reason': {'type': 'string'},
                                },
                            },
                            'entry_dump': {'type': 'array', 'items': {'type': 'object'}},
                            'log': {'type': 'string'},
                        },
                    },
                },
            }
        },
    }

    # A single entry to inject into an execution; only the URL is mandatory.
    inject_input = {
        'type': 'object',
        'properties': {
            'title': {
                'type': 'string',
                'description': 'Title of the entry. If not supplied it will be attempted to retrieve it from '
                'URL headers',
            },
            'url': {'type': 'string', 'format': 'url', 'description': 'URL of the entry'},
            'force': {
                'type': 'boolean',
                'description': 'Prevent any plugins from rejecting this entry',
            },
            'accept': {
                'type': 'boolean',
                'description': 'Accept this entry immediately upon injection (disregard task filters)',
            },
            'fields': {
                'type': 'object',
                'description': 'A array of objects that can contain any other value for the entry',
            },
        },
        'required': ['url'],
    }

    # Payload of the execute endpoint. `setup_params` in this module adds further
    # properties to this dict when the daemon starts.
    task_execution_input = {
        'type': 'object',
        'properties': {
            'tasks': {
                'type': 'array',
                'items': {'type': 'string'},
                'minItems': 1,
                'uniqueItems': True,
            },
            'progress': {
                'type': 'boolean',
                'default': True,
                'description': 'Include task progress updates',
            },
            'summary': {'type': 'boolean', 'default': True, 'description': 'Include task summary'},
            'entry_dump': {
                'type': 'boolean',
                'default': True,
                'description': 'Include dump of entries including fields',
            },
            'inject': {
                'type': 'array',
                'items': inject_input,
                'description': 'A List of entry objects',
            },
            'loglevel': {
                'type': 'string',
                'description': 'Specify log level',
                'enum': ['critical', 'error', 'warning', 'info', 'verbose', 'debug', 'trace'],
            },
        },
        'required': ['tasks'],
    }

    params_return_schema = {'type': 'array', 'items': {'type': 'object'}}
# Register the schema fragments from ObjectsContainer as named API models; the
# resulting objects are passed to `api.validate` / `api.response` below.
tasks_list_schema = api.schema_model('tasks.list', ObjectsContainer.tasks_list_object)
task_input_schema = api.schema_model('tasks.task', ObjectsContainer.task_input_object)
# NOTE(review): registered under the same model name ('tasks.task') as
# task_input_schema above — confirm the name collision is intended.
task_return_schema = api.schema_model('tasks.task', ObjectsContainer.task_return_object)
task_api_queue_schema = api.schema_model('task.queue', ObjectsContainer.task_queue_schema)
task_api_execute_schema = api.schema_model(
    'task.execution', ObjectsContainer.task_execution_results_schema
)
task_execution_schema = api.schema_model(
    'task_execution_input', ObjectsContainer.task_execution_input
)
task_execution_params = api.schema_model(
    'tasks.execution_params', ObjectsContainer.params_return_schema
)

# Description attached to the task resources via `api.doc(description=...)`.
task_api_desc = (
    'Task config schema too large to display, you can view the schema using the schema API'
)

# Query-string parser for the task listing; `include_config` defaults to True.
tasks_parser = api.parser()
tasks_parser.add_argument(
    'include_config', type=inputs.boolean, default=True, help='Include task config'
)
@tasks_api.route('/')
@api.doc(description=task_api_desc)
class TasksAPI(APIResource):
    """Collection resource: list the configured tasks and create new ones."""

    @etag
    @api.response(200, model=tasks_list_schema)
    @api.doc(expect=[tasks_parser])
    def get(self, session: Session = None) -> Response:
        """List all tasks."""
        # Tasks whose name starts with an underscore are left out of the listing.
        visible = {}
        for task_name, task_config in self.manager.user_config.get('tasks', {}).items():
            if task_name.startswith('_'):
                continue
            visible[task_name] = task_config

        if tasks_parser.parse_args().get('include_config'):
            return jsonify([{'name': name, 'config': config} for name, config in visible.items()])
        # Without configs only the task names are returned.
        return jsonify(list(visible))

    @api.validate(task_input_schema, description='New task object')
    @api.response(201, description='Newly created task', model=task_return_schema)
    @api.response(Conflict)
    @api.response(APIError)
    def post(self, session: Session = None) -> Response:
        """Add new task."""
        payload = request.json
        new_name = payload['name']
        manager = self.manager

        if new_name in manager.user_config.get('tasks', {}):
            raise Conflict('task already exists')

        # Make sure both the raw and the processed config have a `tasks` section.
        manager.user_config.setdefault('tasks', {})
        manager.config.setdefault('tasks', {})

        # Validate a copy so the raw payload stays untouched while defaults are
        # filled into the processed version.
        processed = copy.deepcopy(payload)
        validation_errors = process_config(
            processed, schema=task_input_schema.__schema__, set_defaults=True
        )
        if validation_errors:
            raise APIError('problem loading config, raise a BUG as this should not happen!')

        manager.user_config['tasks'][new_name] = payload['config']
        manager.config['tasks'][new_name] = processed['config']

        manager.save_config()
        manager.config_changed()

        created = {'name': new_name, 'config': manager.user_config['tasks'][new_name]}
        response = jsonify(created)
        response.status_code = 201
        return response
@tasks_api.route('/<task>/')
@api.doc(params={'task': 'task name'}, description=task_api_desc)
@api.response(APIError, description='unable to read config')
class TaskAPI(APIResource):
    """Item resource: read, update (optionally rename) or delete a single task."""

    @etag
    @api.response(200, model=task_return_schema)
    @api.response(NotFoundError, description='task not found')
    def get(self, task, session: Session = None) -> Response:
        """Get task config."""
        if task not in self.manager.user_config.get('tasks', {}):
            raise NotFoundError(f'task `{task}` not found')

        return jsonify({'name': task, 'config': self.manager.user_config['tasks'][task]})

    @api.validate(task_input_schema)
    @api.response(200, model=task_return_schema)
    @api.response(NotFoundError)
    @api.response(BadRequest)
    def put(self, task, session: Session = None) -> Response:
        """Update tasks config."""
        data = request.json
        new_task_name = data['name']

        if task not in self.manager.user_config.get('tasks', {}):
            raise NotFoundError(f'task `{task}` not found')

        renaming = task != new_task_name
        if renaming and new_task_name in self.manager.user_config['tasks']:
            raise BadRequest('cannot rename task as it already exist')

        # Validate BEFORE touching the stored configuration, so a failed
        # validation cannot leave the old task already removed.
        task_schema_processed = copy.deepcopy(data)
        errors = process_config(
            task_schema_processed, schema=task_return_schema.__schema__, set_defaults=True
        )
        if errors:
            raise APIError('problem loading config, raise a BUG as this should not happen!')

        # `user_config['tasks']` is known to exist (checked above); the
        # processed config may not have the section yet.
        user_tasks = self.manager.user_config['tasks']
        processed_tasks = self.manager.config.setdefault('tasks', {})

        if renaming:
            del user_tasks[task]
            # The processed config may lack the old entry; do not fail on that.
            processed_tasks.pop(task, None)

        user_tasks[new_task_name] = data['config']
        processed_tasks[new_task_name] = task_schema_processed['config']

        self.manager.save_config()
        self.manager.config_changed()

        rsp = jsonify({
            'name': new_task_name,
            'config': self.manager.user_config['tasks'][new_task_name],
        })
        rsp.status_code = 200
        return rsp

    @api.response(200, model=base_message_schema, description='deleted task')
    @api.response(NotFoundError)
    def delete(self, task, session: Session = None) -> Response:
        """Delete a task."""
        processed_tasks = self.manager.config.get('tasks', {})
        user_tasks = self.manager.user_config.get('tasks', {})

        # Check both configs first so a 404 never leaves one of them modified.
        if task not in processed_tasks or task not in user_tasks:
            raise NotFoundError('task does not exist')

        del processed_tasks[task]
        del user_tasks[task]

        self.manager.save_config()
        self.manager.config_changed()
        return success_response('successfully deleted task')
def _default_start_date() -> str:
    """Return the date one week before now, formatted as ``YYYY-MM-DD``."""
    return (datetime.now() - timedelta(weeks=1)).strftime('%Y-%m-%d')


# Import-time snapshot, kept so existing references to this name keep working.
default_start_date = _default_start_date()

status_parser = api.parser()
status_parser.add_argument(
    'succeeded', type=inputs.boolean, default=True, help='Filter by success status'
)
status_parser.add_argument(
    'produced',
    type=inputs.boolean,
    default=True,
    store_missing=False,
    help='Filter by tasks that produced entries',
)
status_parser.add_argument(
    'start_date',
    type=inputs.datetime_from_iso8601,
    # Pass the function itself: the request parser calls a callable default when
    # the argument is missing, so the "one week ago" window is computed per
    # request instead of being frozen at the moment this module was imported.
    default=_default_start_date,
    help="Filter by minimal start date. Example: '2012-01-01'",
)
status_parser.add_argument(
    'end_date',
    type=inputs.datetime_from_iso8601,
    help="Filter by maximal end date. Example: '2012-01-01'",
)
status_parser.add_argument(
    'limit',
    default=100,
    type=int,
    help='Limit return of executions per task, as that number can be huge',
)
[docs] def _task_info_dict(task): return { 'id': int(task.id), 'name': task.name, 'current_phase': task.current_phase, 'current_plugin': task.current_plugin, }
@tasks_api.route('/queue/')
class TaskQueueAPI(APIResource):
    """Expose the tasks that are running or waiting in the manager's task queue."""

    @api.response(200, model=task_api_queue_schema)
    def get(self, session: Session = None) -> Response:
        """List task(s) in queue for execution."""
        task_queue = self.manager.task_queue
        tasks = [_task_info_dict(task) for task in task_queue.run_queue.queue]

        # Read `current_task` exactly once: reading it a second time after the
        # truthiness check could observe a different value (e.g. None) if it is
        # updated in between.
        current_task = task_queue.current_task
        if current_task:
            # The task being executed is listed first.
            tasks.insert(0, _task_info_dict(current_task))
        return jsonify(tasks)
class ExecuteLog(Queue):
    """Support task log streaming by acting like a file object."""

    def write(self, s):
        """Wrap *s* in a ``{"log": ...}`` JSON document and enqueue it."""
        message = json.dumps({'log': s})
        self.put(message)
if TYPE_CHECKING:
    from typing import TypedDict

    # Shape of one `_streams` entry as created by the execute endpoint.
    class StreamTaskDict(TypedDict):
        queue: ExecuteLog  # queue the streamed JSON messages are put on
        last_update: datetime  # set to `datetime.now()` when the entry is created
        args: dict[str, Any]  # request payload; read for 'progress', 'summary', 'entry_dump'

    # NOTE(review): keys are the task ids returned by `manager.execute`; annotated
    # as `str` here — confirm against the manager.
    _streams: dict[str, StreamTaskDict]

# Registry of active execution streams, looked up by the task event handlers below.
_streams = {}

# Another namespace for the same endpoint
inject_api = api.namespace('inject', description='Entry injection API')
@inject_api.route('/params/')
@tasks_api.route('/execute/params/')
@api.doc(description='Available payload parameters for task execute')
class TaskExecutionParams(APIResource):
    """Serve the schema of the execute payload under both namespaces."""

    @etag(cache_age=3600)
    @api.response(200, model=task_execution_params)
    def get(self, session: Session = None) -> Response:
        """Execute payload parameters."""
        # The schema dict itself is returned as the response body.
        execution_schema = ObjectsContainer.task_execution_input
        return jsonify(execution_schema)
@inject_api.route('/')
@tasks_api.route('/execute/')
@api.doc(description='For details on available parameters query /params/ endpoint')
class TaskExecutionAPI(APIResource):
    """Queue tasks for execution and optionally stream their output back."""

    @staticmethod
    def _entries_from_inject(items) -> list:
        """Build one ``Entry`` per item of the ``inject`` payload.

        :raises BadRequest: when an item has no title and none can be read from
            the ``Content-Disposition`` header of a HEAD request to its URL.
        """
        entries = []
        for item in items:
            entry = Entry()
            entry['url'] = item['url']
            title = item.get('title')
            if not title:
                try:
                    _, params = parse_header(
                        requests.head(item['url']).headers['Content-Disposition']
                    )
                    title = params['filename']
                except KeyError:
                    # Either the header or its `filename` parameter is missing.
                    raise BadRequest(
                        "No title given, and couldn't get one from the URL's HTTP response"
                    ) from None
            entry['title'] = title
            if item.get('force'):
                entry['immortal'] = True
            if item.get('accept'):
                entry.accept(reason='accepted by API inject')
            if item.get('fields'):
                for key, value in item.get('fields').items():
                    entry[key] = value
            entries.append(entry)
        return entries

    @api.response(NotFoundError)
    @api.response(BadRequest)
    @api.response(200, model=task_api_execute_schema)
    @api.validate(task_execution_schema)
    def post(self, session: Session = None) -> Response:
        """Execute task and stream results."""
        data = request.json

        # Task names are matched case-insensitively; build the lookup set once.
        known_tasks = {name.lower() for name in self.manager.user_config.get('tasks', {})}
        for task in data.get('tasks'):
            if task.lower() not in known_tasks:
                raise NotFoundError(f'task {task} does not exist')

        queue = ExecuteLog()
        # Console/log output is only captured when a log level was requested.
        output = queue if data.get('loglevel') else None
        # Stream the response if any of the streaming options is enabled.
        stream = any(
            data.get(option) for option in ('progress', 'summary', 'loglevel', 'entry_dump')
        )
        loglevel = data.pop('loglevel', None)
        if loglevel:
            loglevel = loglevel.upper()

        # This emulates the CLI command of using `--now` and `no-cache`
        options = {
            'interval_ignore': data.pop('now', None),
            'nocache': data.pop('no_cache', None),
            'allow_manual': True,
        }
        options.update(data)

        if data.get('inject'):
            options['inject'] = self._entries_from_inject(data['inject'])

        if output:
            with capture_console(output), capture_logs(output, level=loglevel):
                executed_tasks = self.manager.execute(options=options)
        else:
            executed_tasks = self.manager.execute(options=options)

        tasks_queued = []
        for task_id, task_name, task_event in executed_tasks:
            tasks_queued.append({'id': task_id, 'name': task_name, 'event': task_event})
            # Only register a stream when somebody will consume (and clean up)
            # it; registering for non-streaming requests leaked the entry
            # forever. The event handlers treat a missing stream as "no output".
            if stream:
                _streams[task_id] = {'queue': queue, 'last_update': datetime.now(), 'args': data}

        task_list = [{'id': task['id'], 'name': task['name']} for task in tasks_queued]
        if not stream:
            return jsonify({'tasks': task_list})

        def stream_response():
            try:
                # First return the tasks to execute
                yield '{"stream": ['
                yield json.dumps({'tasks': task_list}) + ',\n'

                while True:
                    try:
                        yield queue.get(timeout=1) + ',\n'
                        continue
                    except Empty:
                        pass

                    # Done once nothing is pending and every task signalled completion.
                    if queue.empty() and all(task['event'].is_set() for task in tasks_queued):
                        break
                yield '{}]}'
            finally:
                # Runs on normal completion and when the generator is closed
                # early (e.g. client disconnect), so entries are never left behind.
                for task in tasks_queued:
                    _streams.pop(task['id'], None)

        return Response(stream_response(), mimetype='text/event-stream')
@event('manager.daemon.started')
def setup_params(mgr):
    """Extend the execute payload schema with the options of the ``execute`` parser.

    Every option of the parser that is not ignored or suppressed is added to
    ``ObjectsContainer.task_execution_input['properties']`` (without overriding
    properties that are already there), after which additional properties are
    forbidden on the payload.

    :param mgr: Argument supplied with the event; not used here.
    """
    # Ignore list for irrelevant actions
    ignore = {'help', 'verbose', 'silent', 'try-regexp', 'dump-config', 'dump'}
    properties = ObjectsContainer.task_execution_input['properties']

    parser = get_parser('execute')
    for action in parser._optionals._actions:
        # Guard against actions that carry no option strings (indexing [-1]
        # would raise IndexError for them).
        if not action.option_strings:
            continue
        name = action.option_strings[-1].strip('-')
        if name in ignore or action.help == argparse.SUPPRESS:
            continue
        name = name.replace('-', '_')

        property_data = {}
        # `help` is optional on argparse actions; only describe when present.
        if action.help:
            property_data['description'] = action.help.capitalize()

        if isinstance(action, argparse._StoreConstAction):
            property_data['type'] = 'boolean'
        elif isinstance(action, argparse._StoreAction):
            if action.nargs in ['+', '*']:
                property_data['type'] = 'array'
                property_data['items'] = {'type': 'string'}
                property_data['minItems'] = 1
            else:
                property_data['type'] = 'string'
        else:
            # Unknown actions should not be added to schema
            continue

        # Some options maybe pre-added to schema with additional options, don't override them
        if name not in properties:
            properties[name] = property_data

    ObjectsContainer.task_execution_input['additionalProperties'] = False
class EntryDecoder(JSONEncoder):
    """JSON encoder used for entry dumps; never fails on unknown objects."""

    def default(self, o):
        """Return a serializable stand-in for *o*.

        ``LazyLookup`` values become the placeholder ``'<LazyField>'``; anything
        the base encoder rejects with ``TypeError`` is rendered with ``str()``.
        """
        if isinstance(o, LazyLookup):
            return '<LazyField>'
        try:
            encoded = super().default(o)
        except TypeError:
            encoded = str(o)
        return encoded
_phase_percents = { 'input': 5, 'metainfo': 10, 'filter': 30, 'download': 40, 'modify': 65, 'output': 75, 'exit': 100, }
[docs] def update_stream(task, status: str = 'pending') -> None: if task.current_phase in _phase_percents: task.stream['percent'] = _phase_percents[task.current_phase] progress = { 'status': status, 'phase': task.current_phase, 'plugin': task.current_plugin, 'percent': task.stream.get('percent', 0), } task.stream['queue'].put(json.dumps({'progress': progress, 'task_id': task.id}))
@event('task.execute.started')
def start_task(task):
    """Attach the registered stream (if any) to *task* and report it as running."""
    stream = _streams.get(task.id)
    # `task.stream` is always set, to None when no stream was registered for this id.
    task.stream = stream
    if not stream:
        return
    if stream['args'].get('progress'):
        update_stream(task, status='running')
@event('task.execute.completed')
def finish_task(task):
    """Emit the final messages for *task*: progress, entry dump and summary.

    Each message is only sent when the matching option is set in the stream's
    request arguments; nothing happens for tasks without a stream.
    """
    stream = task.stream
    if not stream:
        return

    requested = stream['args']
    out_queue = stream['queue']

    if requested.get('progress'):
        update_stream(task, status='complete')

    if requested.get('entry_dump'):
        dumped = [entry.store for entry in task.entries]
        # EntryDecoder copes with values the plain JSON encoder cannot serialize.
        out_queue.put(EntryDecoder().encode({'entry_dump': dumped, 'task_id': task.id}))

    if requested.get('summary'):
        summary = {
            'accepted': len(task.accepted),
            'rejected': len(task.rejected),
            'failed': len(task.failed),
            'undecided': len(task.undecided),
            'aborted': task.aborted,
            'abort_reason': task.abort_reason,
        }
        out_queue.put(json.dumps({'summary': summary, 'task_id': task.id}))
@event('task.execute.before_plugin')
def track_progress(task, plugin_name: str):
    """Send a 'running' progress update for *task* when progress was requested.

    ``plugin_name`` is supplied with the event but not used; the plugin reported
    in the update is read from the task itself.
    """
    stream = task.stream
    if not stream:
        return
    if stream['args'].get('progress'):
        update_stream(task, status='running')