import json
import os
import re
import ssl
import xmlrpc.client
import requests
from loguru import logger
from flexget import plugin
from flexget.event import event
from flexget.plugin import PluginError
from flexget.utils.template import RenderError
logger = logger.bind(name='aria2')
[docs]
class RpcClient:
def __init__(self, server, port, scheme, rpc_path, username, password, secret):
if secret:
self.token = 'token:' + secret
else:
self.token = ''
userpass = f'{username}:{password}@' if username and password else ''
self.url = f'{scheme}://{userpass}{server}:{port}/{rpc_path}'
logger.debug('aria2 url: {}', self.url)
[docs]
def add_uri(self, uris, options):
raise plugin.PluginError('Unsupported Operation')
[docs]
def add_torrent(self, torrent, options):
raise plugin.PluginError('Unsupported Operation')
[docs]
class JsonRpcClient(RpcClient):
RPC_ID = 'FLEXGET'
RPC_VERSION = '2.0'
ADDURI_METHOD = 'aria2.addUri'
GETGLOBALSTAT_METHOD = 'aria2.getGlobalStat'
ADDTORRENT_METHOD = 'aria2.addTorrent'
ADDMETALINK_METHOD = 'aria2.addMetalink'
def __init__(self, server, port, scheme, rpc_path, username=None, password=None, secret=None):
super().__init__(server, port, scheme, rpc_path, username, password, secret)
# trigger _default_error_handle on failure
self.get_global_stat()
[docs]
def _get_req_params(self, method, params=None):
if params is None:
params = []
req_params = {
'jsonrpc': JsonRpcClient.RPC_VERSION,
'id': JsonRpcClient.RPC_ID,
'method': method,
'params': params,
}
if self.token:
req_params['params'].insert(0, self.token)
if not req_params['params']:
del req_params['params']
return req_params
[docs]
@staticmethod
def _default_error_handle(code, message):
logger.critical('Fault code {} message {}', code, message)
raise plugin.PluginError(f'Fault code {code} message {message}', logger)
[docs]
@staticmethod
def _default_success_handle(response):
return response.text
[docs]
def _post(
self, method, params, on_success=_default_success_handle, on_fail=_default_error_handle
):
resp = requests.post(self.url, data=json.dumps(self._get_req_params(method, params)))
result = resp.json()
if 'error' in result:
return on_fail(result['error']['code'], result['error']['message'])
return on_success(resp)
[docs]
def add_uri(self, uris, options):
# https://aria2.github.io/manual/en/html/aria2c.html#aria2.addUri
return self._post(JsonRpcClient.ADDURI_METHOD, params=[[uris], options])
[docs]
def add_torrent(self, torrent, options):
# https://aria2.github.io/manual/en/html/aria2c.html#aria2.addTorrent
return self._post(JsonRpcClient.ADDTORRENT_METHOD, params=[torrent, options])
[docs]
def get_global_stat(self):
# https://aria2.github.io/manual/en/html/aria2c.html#aria2.getGlobalStat
return self._post(JsonRpcClient.GETGLOBALSTAT_METHOD, params=[])
[docs]
class XmlRpcClient(RpcClient):
def __init__(self, server, port, scheme, rpc_path, username=None, password=None, secret=None):
schemes = {'http': None, 'https': ssl.SSLContext()}
if scheme not in schemes:
raise plugin.PluginError(f'Unknown scheme: {scheme}', logger)
super().__init__(server, port, scheme, rpc_path, username, password, secret)
try:
self._aria2 = xmlrpc.client.ServerProxy(self.url, context=schemes[scheme]).aria2
except xmlrpc.client.ProtocolError as exc:
raise plugin.PluginError(
f'Could not connect to aria2 at {self.url}. Protocol error {exc.errcode}: {exc.errmsg}',
logger,
) from exc
except xmlrpc.client.Fault as exc:
raise plugin.PluginError(
f'XML-RPC fault: Unable to connect to aria2 daemon at {self.url}: {exc.faultString}',
logger,
) from exc
except OSError as exc:
raise plugin.PluginError(
f'Socket connection issue with aria2 daemon at {self.url}: {exc}', logger
) from exc
except Exception as exc:
logger.opt(exception=True).debug('Unexpected error during aria2 connection')
raise plugin.PluginError(
'Unidentified error during connection to aria2 daemon', logger
) from exc
[docs]
def add_uri(self, uris, options):
# https://aria2.github.io/manual/en/html/aria2c.html#aria2.addUri
params = [[uris]]
if options:
params.append(options)
if self.token:
params.insert(0, self.token)
return self._aria2.addUri(*params)
[docs]
def add_torrent(self, torrent, options):
# https://aria2.github.io/manual/en/html/aria2c.html#aria2.addTorrent
params = [torrent]
if options:
params.append(options)
if self.token:
params.insert(0, self.token)
return self._aria2.addTorrent(*params)
RPC_CLIENTS = {'xml': XmlRpcClient, 'json': JsonRpcClient}
[docs]
class OutputAria2:
"""Simple Aria2 output.
Example::
aria2:
path: ~/downloads/
"""
schema = {
'type': 'object',
'properties': {
'server': {'type': 'string', 'default': 'localhost'},
'port': {'type': 'integer', 'default': 6800},
'secret': {'type': 'string', 'default': ''},
# NOTE: To be deprecated by aria2
'username': {'type': 'string', 'default': ''},
'password': {'type': 'string', 'default': ''},
'scheme': {'type': 'string', 'default': 'http'},
# NOTE: xml/json
'rpc_mode': {'type': 'string', 'default': 'xml', 'enum': list(RPC_CLIENTS)},
'rpc_path': {'type': 'string', 'default': 'rpc'},
'path': {'type': 'string'},
'filename': {'type': 'string'},
'add_extension': {
'oneOf': [
{'type': 'string'},
{'type': 'boolean'},
],
'default': 'no',
},
'options': {
'type': 'object',
'additionalProperties': {
'oneOf': [{'type': 'string'}, {'type': 'number'}, {'type': 'boolean'}]
},
},
},
'required': ['path'],
'additionalProperties': False,
}
[docs]
def prepare_config(self, config):
config.setdefault('server', 'localhost')
config.setdefault('port', 6800)
config.setdefault('username', '')
config.setdefault('password', '')
config.setdefault('scheme', 'http')
config.setdefault('rpc_mode', 'xml')
config.setdefault('rpc_path', 'rpc')
config.setdefault('secret', '')
config.setdefault('options', {})
config.setdefault('add_extension', False)
options = config['options']
for key in options:
if isinstance(options[key], bool):
options[key] = str(options[key]).lower()
elif not isinstance(options[key], str):
options[key] = str(options[key])
return config
[docs]
def on_task_output(self, task, config):
# don't add when learning
if task.options.learn:
return
config = self.prepare_config(config)
if config['rpc_mode'] not in RPC_CLIENTS:
raise PluginError(f'Unknown rpc_mode: {config["rpc_mode"]}')
aria2 = RPC_CLIENTS[config['rpc_mode']](
config['server'],
config['port'],
config['scheme'],
config['rpc_path'],
config['username'],
config['password'],
config['secret'],
)
for entry in task.accepted:
if task.options.test:
logger.verbose('Would add `{}` to aria2.', entry['title'])
continue
try:
self.add_entry(aria2, entry, config, task)
except OSError as se:
entry.fail(f'Unable to reach Aria2: {se}')
except xmlrpc.client.Fault as err:
logger.critical('Fault code {} message {}', err.faultCode, err.faultString)
entry.fail('Aria2 communication Fault')
except Exception as e:
logger.opt(exception=True).debug('Exception type {}', type(e))
raise
[docs]
def add_entry(self, aria2: RpcClient, entry, config, task):
"""Add entry to Aria2."""
options = config['options']
try:
path = entry.get('path', config.get('path', None))
options['dir'] = os.path.expanduser(entry.render(path).rstrip('/'))
except RenderError as e:
entry.fail(f"failed to render 'path': {e}")
return None
filename = entry.get('content_filename', config.get('filename', None))
add_extension = entry.get('content_extension', config.get('add_extension', False))
if filename:
if add_extension:
ext = None
if isinstance(add_extension, bool):
logger.debug('Getting filename from `{}`', entry['url'])
content_disposition = None
try:
with task.requests.get(
entry['url'], headers=None, stream=True
) as response:
content_disposition = response.headers.get('content-disposition', None)
except Exception:
logger.warning(
'Not possible to retrieve file info from `{}`', entry['url']
)
entry.fail(
'Not possible to retrieve file info from `{}`'.format(entry['url'])
)
return None
if content_disposition:
fname_match = re.findall(
r'filename=["\']?([^"\']+)["\']?', content_disposition
)
if fname_match:
fname = fname_match[0]
fname_info = os.path.splitext(fname)
if len(fname_info) == 2:
ext = fname_info[1]
logger.debug(
'Filename from `{}` is {} with ext `{}`',
entry['url'],
fname,
ext,
)
else:
ext = add_extension if add_extension[0] == '.' else '.' + add_extension
if not ext:
logger.warning('Not possible to retrieve extension')
entry.fail('Not possible to retrieve extension')
return None
logger.debug('Adding extension `{}` to file `{}`', ext, filename)
filename += ext
try:
options['out'] = os.path.expanduser(entry.render(filename))
except RenderError as e:
entry.fail(f"failed to render 'filename': {e}")
return None
# handle torrent files
if 'torrent' in entry:
if 'file' in entry:
torrent_file = entry['file']
elif 'location' in entry:
# in case download plugin moved the file elsewhere
torrent_file = entry['location']
else:
entry.fail('Cannot find torrent file')
return None
return aria2.add_torrent(
xmlrpc.client.Binary(open(torrent_file, mode='rb').read()), options
)
# handle everything else (except metalink -- which is unsupported)
# so magnets, https, http, ftp .. etc
return aria2.add_uri(entry['url'], options)
[docs]
@event('plugin.register')
def register_plugin():
plugin.register(OutputAria2, 'aria2', api_ver=2)