Source code for flexget.components.notify.notifiers.cronitor

import socket

from loguru import logger
from requests.exceptions import RequestException

from flexget import plugin
from flexget.event import event
from flexget.plugin import PluginWarning
from flexget.utils.requests import Session as RequestSession

requests = RequestSession(max_retries=3)

plugin_name = 'cronitor'

logger = logger.bind(name=plugin_name)


[docs] class Cronitor: """Send messages via cronitor. Example:: cronitor: ABC123 Or: cronitor: monitor_code: ABC123 on_start: yes on_abort: no message: Ping host: foo.bar auth_key: secret """ base_url = 'https://cronitor.link/{monitor_code}/{status}' schema = { 'oneOf': [ { 'type': 'object', 'properties': { 'monitor_code': {'type': 'string'}, 'on_start': {'type': 'boolean'}, 'on_abort': {'type': 'boolean'}, 'message': {'type': 'string'}, 'host': {'type': 'string'}, 'auth_key': {'type': 'string'}, }, 'required': ['monitor_code'], 'additionalProperties': False, }, {'type': 'string'}, ] }
[docs] @staticmethod def prepare_config(config): if isinstance(config, str): config = {'monitor_code': config} config.setdefault('on_start', True) config.setdefault('on_abort', True) config.setdefault('host', socket.gethostname()) return config
[docs] def _send_request(self, status, config, task_name): url = self.base_url.format(monitor_code=config['monitor_code'], status=status) message = config.get('message', f'{task_name} task {status}') data = {'msg': message, 'host': config['host']} if config.get('auth_key'): data['auth_key'] = config['auth_key'] try: rsp = requests.get(url, params=data) rsp.raise_for_status() except RequestException as e: raise PluginWarning(f'Could not report to cronitor: {e}')
[docs] def on_task_start(self, task, config): config = self.prepare_config(config) if not config['on_start']: return self._send_request('run', config, task.name)
[docs] def on_task_abort(self, task, config): config = self.prepare_config(config) if not config['on_abort']: return self._send_request('fail', config, task.name)
[docs] def on_task_exit(self, task, config): config = self.prepare_config(config) self._send_request('complete', config, task.name)
[docs] @event('plugin.register') def register_plugin(): plugin.register(Cronitor, plugin_name, api_ver=2)