import sys
from argparse import ArgumentParser, ArgumentTypeError
from sqlalchemy.orm.exc import NoResultFound
from flexget import options
from flexget.event import event
from flexget.manager import Session
from flexget.terminal import TerminalTable, colorize, console, disable_colors, table_parser
from . import db
[docs]
def valid_entry(value):
try:
int(value)
except ValueError:
if value != 'all':
raise ArgumentTypeError("Must select 'all' or valid entry ID")
return value
[docs]
def do_cli(manager, options):
if hasattr(options, 'table_type') and options.table_type == 'porcelain':
disable_colors()
if options.action == 'list':
list_entries(options)
elif options.action == 'approve':
manage_entries(options, options.selection, True)
elif options.action == 'reject':
manage_entries(options, options.selection, False)
elif options.action == 'clear':
clear_entries(options)
[docs]
def list_entries(options):
"""List pending entries."""
approved = options.approved
task_name = options.task_name
with Session() as session:
entries = db.list_pending_entries(session=session, task_name=task_name, approved=approved)
header = ['#', 'Task Name', 'Title', 'URL', 'Approved', 'Added']
table = TerminalTable(*header, table_type=options.table_type)
for entry in entries:
table.add_row(
str(entry.id),
entry.task_name,
entry.title,
entry.url,
colorize('green', 'Yes') if entry.approved else 'No',
entry.added.strftime('%c'),
)
console(table)
[docs]
def manage_entries(options, selection, approved):
"""Manage pending entries."""
approved_text = 'approved' if approved else 'pending'
with Session() as session:
if selection == 'all':
entries = db.list_pending_entries(session=session, approved=not approved)
else:
try:
entry = db.get_entry_by_id(session, selection)
if entry.approved is approved:
console(
colorize('red', 'ERROR: ')
+ f'Entry with ID {entry.id} is already {approved_text}'
)
sys.exit(1)
except NoResultFound:
console(f'Pending entry with ID {selection} does not exist')
sys.exit(1)
else:
entries = [entry]
if not entries:
console(f'All entries are already {approved_text}')
return
for entry in entries:
if entry.approved is not approved:
console(f'Setting pending entry with ID {entry.id} status to {approved_text}')
entry.approved = approved
[docs]
def clear_entries(options):
"""Clear pending entries."""
with Session() as session:
query = session.query(db.PendingEntry).filter(~db.PendingEntry.approved)
if options.task_name:
query = query.filter(db.PendingEntry.task_name == options.task_name)
deleted = query.delete()
console(f'Successfully deleted {deleted} pending entries')
[docs]
@event('options.register')
def register_parser_arguments():
selection_parser = ArgumentParser(add_help=False)
selection_parser.add_argument(
'selection', type=valid_entry, help="Entity ID or 'all' to approve all pending entries"
)
filter_parser = ArgumentParser(add_help=False)
filter_parser.add_argument('--task-name', help='Filter by task name')
parser = options.register_command('pending', do_cli, help='View and manage pending entries')
subparsers = parser.add_subparsers(title='actions', metavar='<action>', dest='action')
list_parser = subparsers.add_parser(
'list', help='Shows all existing pending entries', parents=[table_parser, filter_parser]
)
list_group = list_parser.add_mutually_exclusive_group()
list_group.add_argument(
'--pending',
action='store_false',
help='Show only pending entries',
dest='approved',
default=None,
)
list_group.add_argument(
'--approved',
action='store_true',
help='Show only approved entries',
dest='approved',
default=None,
)
subparsers.add_parser('approve', help='Approve pending entries', parents=[selection_parser])
subparsers.add_parser('reject', help='Reject pending entries', parents=[selection_parser])
subparsers.add_parser('clear', help='Clear all pending entries', parents=[filter_parser])