Source code for flexget.plugins.cli.inject
import argparse
import random
import string
import yaml
from flexget import options
from flexget.entry import Entry
from flexget.event import event
from flexget.terminal import console
from flexget.utils import requests
from flexget.utils.requests import parse_header
[docs]
@event('manager.subcommand.inject')
def do_cli(manager, options):
# Determine if first positional argument is a URL or a title
if not options.url and '://' in options.title:
options.url = options.title
options.title = None
if options.url and not options.title:
# Attempt to get a title from the URL response's headers
try:
value, params = parse_header(requests.head(options.url).headers['Content-Disposition'])
options.title = params['filename']
except KeyError:
console("No title given, and couldn't get one from the URL's HTTP response. Aborting.")
return
entry = Entry(title=options.title)
if options.url:
entry['url'] = options.url
else:
entry['url'] = 'http://localhost/inject/{}'.format(
''.join(random.sample(string.ascii_letters + string.digits, 30))
)
if options.force:
entry['immortal'] = True
if options.accept:
entry.accept(reason='accepted by CLI inject')
if options.fields:
for key, value in options.fields:
entry[key] = value
options.inject = [entry]
manager.execute_command(options)
[docs]
def key_equals_value(text):
if '=' not in text:
raise argparse.ArgumentTypeError('must be in the form: <field name>=<value>')
key, value = text.split('=')
return key, yaml.safe_load(value)
# Run after other plugins, so we can get all exec subcommand options
[docs]
@event('options.register', priority=0)
def register_parser_arguments():
exec_parser = options.get_parser('execute')
inject_parser = options.register_command(
'inject',
do_cli,
add_help=False,
parents=[exec_parser],
help='inject an entry from command line into tasks',
usage='%(prog)s title/url [url] [--accept] [--force] '
'[--fields NAME=VALUE [NAME=VALUE...]] [<execute arguments>]',
epilog=(
'If only a URL and no title is given, Flexget will attempt to '
"find a title in the URL's response headers."
),
)
inject_group = inject_parser.add_argument_group('inject arguments')
inject_group.add_argument(
'title', metavar='title/url', help='title or url of the entry to inject'
)
inject_group.add_argument('url', nargs='?', help='url of the entry to inject')
inject_group.add_argument(
'--force', action='store_true', help='prevent any plugins from rejecting this entry'
)
inject_group.add_argument(
'--accept', action='store_true', help='accept this entry immediately upon injection'
)
inject_group.add_argument('--fields', metavar='NAME=VALUE', nargs='+', type=key_equals_value)
# Hack the title of the exec options a bit (would be 'optional arguments' otherwise)
inject_parser._action_groups[1].title = 'execute arguments'
# The exec arguments show first... unless we switch them
inject_parser._action_groups.remove(inject_group)
inject_parser._action_groups.insert(0, inject_group)