Source code for flexget.components.notify.notifiers.prowl

import xml.etree.ElementTree as ET

from loguru import logger
from requests.exceptions import RequestException

from flexget import plugin
from flexget.config_schema import one_or_more
from flexget.event import event
from flexget.plugin import PluginWarning
from flexget.utils.requests import Session as RequestSession
from flexget.utils.requests import TimedLimiter

plugin_name = 'prowl'
logger = logger.bind(name=plugin_name)

PROWL_URL = 'https://api.prowlapp.com/publicapi/add'

requests = RequestSession(max_retries=3)
requests.add_domain_limiter(TimedLimiter('prowlapp.com', '5 seconds'))


[docs] class ProwlNotifier: """Send prowl notifications. Example:: notify: entries: via: - prowl: api_key: xxxxxxx [application: application name, default FlexGet] [event: event title, default New Release] [priority: -2 - 2 (2 = highest), default 0] [description: notification to send] """ schema = { 'type': 'object', 'properties': { 'api_key': one_or_more({'type': 'string'}), 'application': {'type': 'string', 'default': 'FlexGet'}, 'priority': {'type': 'integer', 'minimum': -2, 'maximum': 2}, 'url': {'type': 'string'}, 'provider_key': {'type': 'string'}, }, 'required': ['api_key'], 'additionalProperties': False, }
[docs] def notify(self, title, message, config): """Send a Prowl notification.""" notification = { 'application': config.get('application'), 'event': title, 'description': message, 'url': config.get('url'), 'priority': config.get('priority'), 'providerkey': config.get('provider_key'), } if isinstance(config['api_key'], list): config['api_key'] = [config['api_key']] notification['apikey'] = config['api_key'] try: response = requests.post(PROWL_URL, data=notification) except RequestException as e: raise PluginWarning(repr(e)) request_status = ET.fromstring(response.content) error = request_status.find('error') if error is not None: raise PluginWarning(error.text) success = request_status.find('success').attrib logger.debug( 'prowl notification sent. Notifications remaining until next reset: {}. ' 'Next reset will occur in {} minutes', success['remaining'], success['resetdate'], )
[docs] @event('plugin.register') def register_plugin(): plugin.register(ProwlNotifier, plugin_name, api_ver=2, interfaces=['notifiers'])