from datetime import datetime, timedelta
from math import ceil
from flask import jsonify, request
from flask_restx import inputs
from loguru import logger
from sqlalchemy.orm.exc import NoResultFound
from flexget.api.app import APIResource, NotFoundError, api, etag, pagination_headers
from flexget.api.core.tasks import tasks_api
from . import db
logger = logger.bind(name='status_api')
status_api = api.namespace('status', description='View and manage task execution status')
[docs]
class ObjectsContainer:
task_status_execution_schema = {
'type': 'object',
'properties': {
'abort_reason': {'type': ['string', 'null']},
'accepted': {'type': 'integer'},
'end': {'type': 'string', 'format': 'date-time'},
'failed': {'type': 'integer'},
'id': {'type': 'integer'},
'produced': {'type': 'integer'},
'rejected': {'type': 'integer'},
'start': {'type': 'string', 'format': 'date-time'},
'succeeded': {'type': 'boolean'},
'task_id': {'type': 'integer'},
},
'additionalProperties': False,
}
executions_list = {'type': 'array', 'items': task_status_execution_schema}
task_status_schema = {
'type': 'object',
'properties': {
'id': {'type': 'integer'},
'name': {'type': 'string'},
'last_execution_time': {'type': ['string', 'null'], 'format': 'date-time'},
'last_execution': task_status_execution_schema,
},
'required': ['id', 'name'],
'additionalProperties': False,
}
task_status_list_schema = {'type': 'array', 'items': task_status_schema}
task_status = api.schema_model('tasks.tasks_status', ObjectsContainer.task_status_schema)
task_status_list = api.schema_model(
'tasks.tasks_status_list', ObjectsContainer.task_status_list_schema
)
task_executions = api.schema_model('tasks.tasks_executions_list', ObjectsContainer.executions_list)
sort_choices = ('last_execution_time', 'name', 'id')
tasks_parser = api.pagination_parser(sort_choices=sort_choices)
tasks_parser.add_argument(
'include_execution',
type=inputs.boolean,
default=True,
help='Include the last execution of the task',
)
[docs]
@tasks_api.route('/status/')
@status_api.route('/')
@api.doc(expect=[tasks_parser])
class TasksStatusAPI(APIResource):
[docs]
@etag
@api.response(200, model=task_status_list)
def get(self, session=None):
"""Get status tasks."""
args = tasks_parser.parse_args()
# Pagination and sorting params
page = args['page']
per_page = args['per_page']
sort_by = args['sort_by']
sort_order = args['order']
# Additional data
include_execution = args.get('include_execution')
if per_page > 100:
logger.debug('per_page is higher than max value of 100, setting 100')
per_page = 100
start = per_page * (page - 1)
stop = start + per_page
descending = sort_order == 'desc'
kwargs = {
'start': start,
'stop': stop,
'order_by': sort_by,
'descending': descending,
'session': session,
}
total_items = session.query(db.StatusTask).count()
logger.debug('db has a total of {} status tasks', total_items)
if not total_items:
return jsonify([])
db_status_tasks = db.get_status_tasks(**kwargs)
total_pages = ceil(total_items / float(per_page))
if page > total_pages:
raise NotFoundError(f'page {page} does not exist')
# Actual results in page
actual_size = min(len(db_status_tasks), per_page)
# Get pagination headers
pagination = pagination_headers(total_pages, total_items, actual_size, request)
status_tasks = []
for task in db_status_tasks:
st_task = task.to_dict()
if include_execution:
execution = task.executions.order_by(db.TaskExecution.start.desc()).first()
st_task['last_execution'] = execution.to_dict() if execution else {}
status_tasks.append(st_task)
# Create response
rsp = jsonify(status_tasks)
# Add link header to response
rsp.headers.extend(pagination)
return rsp
[docs]
@tasks_api.route('/status/<int:task_id>/')
@status_api.route('/<int:task_id>/')
@api.doc(params={'task_id': 'ID of the status task'}, expect=[tasks_parser])
class TaskStatusAPI(APIResource):
[docs]
@etag
@api.response(200, model=task_status)
@api.response(NotFoundError)
def get(self, task_id, session=None):
"""Get status task by ID."""
try:
task = session.query(db.StatusTask).filter(db.StatusTask.id == task_id).one()
except NoResultFound:
raise NotFoundError(f'task status with id {task_id} not found')
args = tasks_parser.parse_args()
include_execution = args.get('include_execution')
st_task = task.to_dict()
if include_execution:
execution = task.executions.order_by(db.TaskExecution.start.desc()).first()
st_task['last_execution'] = execution.to_dict() if execution else {}
return jsonify(st_task)
default_start_date = (datetime.now() - timedelta(weeks=1)).strftime('%Y-%m-%d')
executions_parser = api.parser()
executions_parser.add_argument(
'succeeded', type=inputs.boolean, default=True, help='Filter by success status'
)
executions_parser.add_argument(
'produced',
type=inputs.boolean,
default=True,
store_missing=False,
help='Filter by tasks that produced entries',
)
executions_parser.add_argument(
'start_date',
type=inputs.datetime_from_iso8601,
default=default_start_date,
help="Filter by minimal start date. Example: '2012-01-01'. Default is 1 week ago.",
)
executions_parser.add_argument(
'end_date',
type=inputs.datetime_from_iso8601,
help="Filter by maximal end date. Example: '2012-01-01'",
)
sort_choices = (
'start',
'end',
'succeeded',
'produced',
'accepted',
'rejected',
'failed',
'abort_reason',
)
executions_parser = api.pagination_parser(executions_parser, sort_choices=sort_choices)
[docs]
@tasks_api.route('/status/<int:task_id>/executions/')
@status_api.route('/<int:task_id>/executions/')
@api.doc(expect=[executions_parser], params={'task_id': 'ID of the status task'})
class TaskStatusExecutionsAPI(APIResource):
[docs]
@etag
@api.response(200, model=task_executions)
@api.response(NotFoundError)
def get(self, task_id, session=None):
"""Get task executions by ID."""
try:
task = session.query(db.StatusTask).filter(db.StatusTask.id == task_id).one()
except NoResultFound:
raise NotFoundError(f'task status with id {task_id} not found')
args = executions_parser.parse_args()
# Pagination and sorting params
page = args['page']
per_page = args['per_page']
sort_by = args['sort_by']
sort_order = args['order']
# Filter params
succeeded = args.get('succeeded')
produced = args.get('produced')
start_date = args.get('start_date')
end_date = args.get('end_date')
per_page = min(per_page, 100)
start = per_page * (page - 1)
stop = start + per_page
descending = sort_order == 'desc'
kwargs = {
'start': start,
'stop': stop,
'task_id': task_id,
'order_by': sort_by,
'descending': descending,
'succeeded': succeeded,
'produced': produced,
'start_date': start_date,
'end_date': end_date,
'session': session,
}
total_items = task.executions.count()
if not total_items:
return jsonify([])
executions = [e.to_dict() for e in db.get_executions_by_task_id(**kwargs)]
total_pages = ceil(total_items / float(per_page))
if page > total_pages:
raise NotFoundError(f'page {page} does not exist')
# Actual results in page
actual_size = min(len(executions), per_page)
# Get pagination headers
pagination = pagination_headers(total_pages, total_items, actual_size, request)
# Create response
rsp = jsonify(executions)
# Add link header to response
rsp.headers.extend(pagination)
return rsp