Source code for flexget.components.status.cli

import datetime
import fnmatch
from datetime import timedelta

from sqlalchemy import and_, desc, func
from sqlalchemy.orm import aliased
from sqlalchemy.orm.exc import NoResultFound

from flexget import options
from flexget.event import event
from flexget.manager import Session
from flexget.terminal import TerminalTable, colorize, console, disable_colors, table_parser

from . import db


[docs] def do_cli(manager, options): if options.table_type == 'porcelain': disable_colors() if options.status_action == 'remove': do_cli_remove(manager, options) return if options.task: do_cli_task(manager, options) else: do_cli_summary(manager, options)
[docs] def do_cli_task(manager, options): header = ['Start', 'Duration', 'Entries', 'Accepted', 'Rejected', 'Failed', 'Abort Reason'] table = TerminalTable(*header, table_type=options.table_type) with Session() as session: try: task = session.query(db.StatusTask).filter(db.StatusTask.name == options.task).one() except NoResultFound: console(f'Task name `{options.task}` does not exists or does not have any records') return else: query = task.executions.order_by(desc(db.TaskExecution.start))[: options.limit] for ex in reversed(query): start = ex.start.strftime('%Y-%m-%d %H:%M') start = colorize('green', start) if ex.succeeded else colorize('red', start) if ex.end is not None and ex.start is not None: delta = ex.end - ex.start duration = f'{delta.total_seconds():1.0f}s' else: duration = '?' table.add_row( start, duration, str(ex.produced), str(ex.accepted), str(ex.rejected), str(ex.failed), ex.abort_reason if ex.abort_reason is not None else '', ) console(table)
[docs] def do_cli_summary(manager, options): header = [ 'Task', 'Last execution', 'Last success', 'Entries', 'Accepted', 'Rejected', 'Failed', 'Duration', ] table = TerminalTable(*header, table_type=options.table_type) with Session() as session: # Create aliases for different execution queries LastExecution = aliased(db.TaskExecution) # noqa: N806 LastSuccess = aliased(db.TaskExecution) # noqa: N806 # Subquery to find the last execution time for each task last_execution_subq = ( session .query(LastExecution.task_id, func.max(LastExecution.start).label('last_start')) .group_by(LastExecution.task_id) .subquery() ) # Subquery to find the last successful execution with produced > 0 for each task last_success_subq = ( session .query(LastSuccess.task_id, func.max(LastSuccess.start).label('last_success_start')) .filter(and_(LastSuccess.succeeded, LastSuccess.produced > 0)) .group_by(LastSuccess.task_id) .subquery() ) # Main query with left joins to get all required data in a single query query = ( session .query( db.StatusTask, last_execution_subq.c.last_start, LastSuccess.start, LastSuccess.end, LastSuccess.produced, LastSuccess.accepted, LastSuccess.rejected, LastSuccess.failed, ) .outerjoin(last_execution_subq, db.StatusTask.id == last_execution_subq.c.task_id) .outerjoin(last_success_subq, db.StatusTask.id == last_success_subq.c.task_id) .outerjoin( LastSuccess, and_( LastSuccess.task_id == db.StatusTask.id, LastSuccess.start == last_success_subq.c.last_success_start, LastSuccess.succeeded, LastSuccess.produced > 0, ), ) ) for row in query.all(): ( task, last_exec_time, success_start, success_end, produced, accepted, rejected, failed, ) = row # Process last execution time # Fix weird issue that a task registers StatusTask but without an execution. GH #2022 last_exec = last_exec_time.strftime('%Y-%m-%d %H:%M') if last_exec_time else '-' # Process last success data if success_start is None: duration = None last_success = '-' else: duration = success_end - success_start if success_end else None last_success = success_start.strftime('%Y-%m-%d %H:%M') age = datetime.datetime.utcnow() - success_start if age > timedelta(days=7): last_success = colorize('red', last_success) elif age < timedelta(minutes=10): last_success = colorize('green', last_success) table.add_row( task.name, last_exec, last_success, str(produced) if produced is not None else '-', str(accepted) if accepted is not None else '-', str(rejected) if rejected is not None else '-', str(failed) if failed is not None else '-', f'{duration.total_seconds():1.0f}s' if duration is not None else '-', ) console(table)
[docs] def do_cli_remove(manager, options): with Session() as session: all_tasks = session.query(db.StatusTask).all() pattern = options.task_name.lower() matching_tasks = [ task for task in all_tasks if fnmatch.fnmatchcase(task.name.lower(), pattern) ] if not matching_tasks: console(f'Task pattern `{options.task_name}` does not match any tasks') return task_names = [] for task in matching_tasks: console(f'Removing task `{task.name}` ...') session.delete(task) task_names.append(task.name) session.commit()
[docs] @event('options.register') def register_parser_arguments(): parser = options.register_command( 'status', do_cli, help='View task health status', parents=[table_parser] ) subparsers = parser.add_subparsers( title='actions', metavar='<action>', dest='status_action', required=False ) remove_parser = subparsers.add_parser( 'remove', help='Remove a task and all its execution records. Supports glob pattern matching.', ) remove_parser.add_argument( 'task_name', metavar='TASK', help='Name or glob pattern of the task(s) to remove (e.g., "task-*")', ) parser.add_argument( '--task', action='store', metavar='TASK', help='Limit to results in specified %(metavar)s' ) parser.add_argument( '--limit', action='store', type=int, metavar='NUM', default=50, help='Limit to %(metavar)s results', )