Source code for flexget.components.irc.api

from flask import jsonify
from flask_restx import inputs

from flexget.api import APIResource, api
from flexget.api.app import (
    BadRequest,
    NotFoundError,
    base_message_schema,
    empty_response,
    success_response,
)

irc_api = api.namespace('irc', description='View and manage IRC connections')

irc_parser = api.parser()
irc_parser.add_argument(
    'name', help='Name of connection. Leave empty to apply to all connections.'
)


[docs] class ObjectsContainer: connection_object = { 'type': 'object', 'properties': { 'alive': {'type': 'boolean'}, 'channels': { 'type': 'array', 'items': {'type': 'object', 'patternProperties': {r'\w': {'type': 'integer'}}}, }, 'connected_channels': {'type': 'array', 'items': {'type': 'string'}}, 'port': {'type': 'integer'}, 'server': {'type': 'string'}, }, } connection = {'type': 'object', 'patternProperties': {r'\w': connection_object}} return_response = {'type': 'array', 'items': connection}
return_schema = api.schema_model('irc.connections', ObjectsContainer.return_response)
[docs] @irc_api.route('/connections/') @api.doc(expect=[irc_parser]) class IRCStatus(APIResource):
[docs] @api.response(200, model=return_schema) @api.response(NotFoundError) @api.response(BadRequest) def get(self, session=None): """Return status of IRC connections.""" from .irc import irc_manager if irc_manager is None: raise BadRequest('IRC daemon does not appear to be running') args = irc_parser.parse_args() name = args.get('name') try: status = irc_manager.status(name) except ValueError as e: raise NotFoundError(e.args[0]) return jsonify(status)
[docs] @irc_api.route('/connections/enums/') class IRCEnums(APIResource):
[docs] @api.response(200, model=empty_response) def get(self, session=None): """Get channel status enumeration meaning.""" try: from irc_bot import simple_irc_bot except ImportError: raise BadRequest('irc_bot dep is not installed') return jsonify(simple_irc_bot.IRCChannelStatus().enum_dict)
[docs] @irc_api.route('/restart/') @api.doc(expect=[irc_parser]) class IRCRestart(APIResource):
[docs] @api.response(200, model=base_message_schema) @api.response(NotFoundError) @api.response(BadRequest) def get(self, session=None): """Restarts IRC connections.""" from .irc import irc_manager if irc_manager is None: raise BadRequest('IRC daemon does not appear to be running') args = irc_parser.parse_args() connection = args.get('name') try: irc_manager.restart_connections(connection) except KeyError: raise NotFoundError(f'Connection {connection} is not a valid IRC connection') return success_response('Successfully restarted connection(s)')
irc_stop_parser = irc_parser.copy() irc_stop_parser.add_argument( 'wait', type=inputs.boolean, default=False, help='Wait for connection to exit gracefully' )
[docs] @irc_api.route('/stop/') @api.doc(expect=[irc_stop_parser]) class IRCStop(APIResource):
[docs] @api.response(200, model=base_message_schema) @api.response(NotFoundError) @api.response(BadRequest) def get(self, session=None): """Stop IRC connections.""" from .irc import irc_manager if irc_manager is None: raise BadRequest('IRC daemon does not appear to be running') args = irc_stop_parser.parse_args() name = args.get('name') wait = args.get('wait') try: irc_manager.stop_connections(wait=wait, name=name) except KeyError: raise NotFoundError(f'Connection {name} is not a valid IRC connection') return success_response('Successfully stopped connection(s)')