import json
from loguru import logger
from flexget import plugin
from flexget.event import event
logger = logger.bind(name='output.sns')
DEFAULT_TEMPLATE_VALUE = json.dumps({
'entry': {
'title': '{{title}}',
'url': '{{url}}',
'original_url': '{{original_url}}',
'series': '{{series_name}}',
'series_id': '{{series_id}}',
},
'task': '{{task}}',
})
[docs]
class SNSNotification:
"""Emits SNS notifications of entries.
Optionally writes the torrent itself to S3
Example configuration::
sns:
[aws_access_key_id: <AWS ACCESS KEY ID>] (will be taken from AWS_ACCESS_KEY_ID environment if not provided)
[aws_secret_access_key: <AWS SECRET ACCESS KEY>] (will be taken from AWS_SECRET_ACCESS_KEY environment if
not provided)
[profile_name: <AWS PROFILE NAME>] (If provided, use this profile name instead of the default.)
aws_region: <REGION>
sns_topic_arn: <SNS ARN>
[sns_notification_template: <TEMPLATE] (defaults to DEFAULT_TEMPLATE_VALUE)
"""
schema = {
'type': 'object',
'properties': {
'sns_topic_arn': {'type': 'string'},
'sns_notification_template': {'type': 'string', 'default': DEFAULT_TEMPLATE_VALUE},
'aws_access_key_id': {'type': 'string'},
'aws_secret_access_key': {'type': 'string'},
'aws_region': {'type': 'string'},
'profile_name': {'type': 'string'},
},
'required': ['sns_topic_arn', 'aws_region'],
'additionalProperties': False,
}
[docs]
def on_task_start(self, task, config):
# verify that we actually support Boto 3
try:
import boto3 # noqa: F401
except ImportError as e:
logger.debug('Error importing boto3: {}', e)
raise plugin.DependencyError(
'sns', 'boto3', f'Boto3 module required. ImportError: {e}'
)
# this has to run near the end of the plugin chain, because we
# should notify after all other outputs.
[docs]
@plugin.priority(0)
def on_task_output(self, task, config):
sender = SNSNotificationEmitter(config)
sender.send_notifications(task)
[docs]
class SNSNotificationEmitter:
def __init__(self, config):
self.config = config
import boto3
self.boto3 = boto3
self.sns_notification_template = self.config.get(
'sns_notification_template', DEFAULT_TEMPLATE_VALUE
)
[docs]
def build_session(self):
self.session = self.boto3.Session(
aws_access_key_id=self.config.get('aws_access_key_id', None),
aws_secret_access_key=self.config.get('aws_secret_access_key', None),
profile_name=self.config.get('profile_name', None),
region_name=self.config['aws_region'],
)
[docs]
def get_topic(self):
self.build_session()
sns = self.session.resource('sns')
return sns.Topic(self.config['sns_topic_arn'])
[docs]
def send_notifications(self, task):
topic = self.get_topic()
for entry in task.accepted:
message = entry.render(self.sns_notification_template)
if task.options.test:
logger.info(
'SNS publication: region={}, arn={}', self.config['aws_region'], topic.arn
)
logger.info('Message: {}', message)
continue
try:
response = topic.publish(Message=message)
except Exception as e:
logger.error('Error publishing {}: {}', entry['title'], e)
continue
else:
logger.debug('Published {}: {}', entry, response)
[docs]
@event('plugin.register')
def register_sns_plugin():
plugin.register(SNSNotification, 'sns', api_ver=2)