import ssl
from hashlib import sha256
from loguru import logger
from flexget import plugin
from flexget.event import event
from flexget.plugin import DependencyError, PluginWarning
plugin_name = 'mqtt'
logger = logger.bind(name=plugin_name)
[docs]
class MQTTNotifier:
"""Send messages via MQTT.
Example::
notify:
entries:
via:
- mqtt:
broker_address: "iot.eclipse.org"
topic: "flexget/notifications"
[broker_port: 1883]
[broker_timeout: 30]
[broker_transport: ['tcp','websockets'] ]
[broker_protocol: ['MQTTv31', 'MQTTv311'] ]
[username: yourUsernameHere]
[password: yourPasswordHere]
[encrypted_communication: True/False]
[certificates:
broker_ca_cert: /path/to/pem/encoded/broker_ca_certificate.crt
client_cert: /path/to/pem/encoded/client_certificate.crt
client_key: /path/to/pem/encoded/client_certificate.key
validate_broker_cert: True/False
tls_version: ['tlsv1.2', 'tlsv1.1', 'tlsv1']
]
[qos: [0,1,2] ]
[retain: True/False]
"""
schema = {
'type': 'object',
'properties': {
'topic': {'type': 'string'},
'broker_address': {'type': 'string'},
'broker_port': {'type': 'integer', 'default': 1883},
'broker_timeout': {'type': 'integer', 'default': 30},
'broker_transport': {
'type': 'string',
'default': 'tcp',
'enum': ['tcp', 'websockets'],
},
'broker_protocol': {
'type': 'string',
'default': 'MQTTv311',
'enum': ['MQTTv31', 'MQTTv311'],
},
'username': {'type': 'string'},
'password': {'type': 'string'},
'encrypted_communication': {'type': 'boolean', 'default': False},
'certificates': {
'type': 'object',
'properties': {
'broker_ca_cert': {'type': 'string'},
'client_cert': {'type': 'string'},
'client_key': {'type': 'string'},
'validate_broker_cert': {'type': 'boolean', 'default': True},
'tls_version': {'type': 'string', 'enum': ['tlsv1.2', 'tlsv1.1', 'tlsv1', '']},
},
'additionalProperties': False,
},
'qos': {'type': 'integer', 'minimum': 0, 'maximum': 2, 'default': 0},
'retain': {'type': 'boolean', 'default': False},
},
'additionalProperties': False,
'required': ['broker_address', 'topic'],
'dependentRequired': {'password': ['username']},
}
[docs]
def notify(self, title, message, config):
"""Publish to an MQTT topic."""
try:
import paho.mqtt.client as mqtt
except ImportError as e:
logger.verbose('Error importing paho.mqtt.client: {}', e)
raise DependencyError(
plugin_name,
'paho.mqtt.client',
f'paho-mqtt python module is required for MQTT notify plugin. ImportError: {e}',
)
def on_log_cb(client, userdata, level, buff):
logger.verbose(str(buff))
def on_publish_cb(client, userdata, mid):
logger.verbose(
'MQTT on_publish callback - message was successfully published to broker as messageID={}',
mid,
)
client.disconnect()
def on_disconnect_cb(client, userdata, rc):
logger.verbose(
'MQTT on_disconnect callback - disconnected with result code {} [{}]',
rc,
conn_rc_description_map.get(rc),
'Unknown',
)
client.loop_stop()
config['title'] = title
config['message'] = message
config['payload'] = f'{config["title"]} - {config["message"]}'
conn_rc_description_map = {
0: 'Connection Accepted',
1: 'Connection Refused, unacceptable protocol version - The Server does not support the level of the MQTT protocol requested by the Client',
2: 'Connection Refused, identifier rejected - The Client identifier is correct UTF-8 but not allowed by the Server',
3: 'Connection Refused, Server unavailable - The Network Connection has been made but the MQTT service is unavailable',
4: 'Connection Refused, bad user name or password - The data in the user name or password is malformed',
5: 'Connection Refused, not authorized - The Client is not authorized to connect',
}
# Handle the MQTT broker protocol to be used
if config.get('broker_protocol') == 'MQTTv311':
config['broker_protocol_class'] = mqtt.MQTTv311
else:
config['broker_protocol_class'] = mqtt.MQTTv31
logger.trace('MQTT notify config={}', config)
# create the mqtt client
client = mqtt.Client(
protocol=config['broker_protocol_class'], transport=config['broker_transport']
)
client.enable_logger(logger=logger)
client.on_log = on_log_cb
client.on_publish = on_publish_cb
client.on_disconnect = on_disconnect_cb
# Handle SSL/TLS communication w/out certificate authentication
if not config.get('certificates', {}).get('client_cert') and config.get(
'encrypted_communication'
):
client.tls_set(
ca_certs=config.get('certificates', {}).get('broker_ca_cert'),
certfile=None,
keyfile=None,
cert_reqs=ssl.CERT_NONE,
)
client.tls_insecure_set(True)
logger.verbose('Basic SSL/TLS encrypted communications enabled')
logger.verbose('TLS insecure cert mode enabled. Broker cert will not be validated')
# Handle SSL/TLS communication with certificate authentication
if config.get('certificates'):
certs = config['certificates']
logger.debug('TLS certificate config: {}', certs)
tls_version_map = {
'tlsv1.2': ssl.PROTOCOL_TLSv1_2,
'tlsv1.1': ssl.PROTOCOL_TLSv1_1,
'tlsv1': ssl.PROTOCOL_TLSv1,
'': None,
}
tls_version = tls_version_map.get(certs.get('tls_version'), ssl.PROTOCOL_TLSv1_2)
logger.verbose('TLS version is {}', tls_version)
cert_required = (
ssl.CERT_REQUIRED if certs.get('validate_broker_cert', True) else ssl.CERT_NONE
)
client.tls_set(
ca_certs=certs.get('broker_ca_cert'),
certfile=certs.get('client_cert'),
keyfile=certs.get('client_key'),
cert_reqs=cert_required,
tls_version=tls_version,
)
if not certs.get('validate_broker_cert'):
client.tls_insecure_set(True)
logger.debug('TLS insecure cert mode enabled. Broker cert will not be validated')
else:
logger.debug('TLS secure cert mode enabled. Broker cert will be validated')
# Handle user/pass authentication
if config.get('username'):
logger.debug('Credential passwords s are redacted to protect the innocent...')
logger.debug(
'Auth credentials: username=[{}] password sha256 hash is "{}"',
config.get('username'),
sha256(str(config.get('password')).encode('utf-8')).hexdigest(),
)
logger.debug(
'You can validate them yourself by calculating the sha256 hex digest of your password string (google is your friend if you do not know how to do this)'
)
logger.debug(
'Note: a password that is not provided (i.e. None) will hash to "{}"',
sha256(str(None).encode('utf-8')).hexdigest(),
)
client.username_pw_set = (config.get('username'), config.get('password'))
try:
logger.verbose(
'Connecting to {}:{}', config.get('broker_address'), config.get('broker_port')
)
client.connect(
config.get('broker_address'),
config.get('broker_port'),
config.get('broker_timeout'),
)
logger.verbose('Connected to MQTT broker')
except Exception as e:
raise PluginWarning(f'Error connecting to MQTT broker: {e}')
try:
logger.verbose(
'Publishing message [{}] to topic [{}] ',
config.get('payload'),
config.get('topic'),
)
publish_info = client.publish(
config.get('topic'),
config.get('payload'),
qos=config.get('qos'),
retain=config.get('retain'),
)
logger.verbose(
'Notification sent to broker, waiting for callback response to confirm publishing success - rc={}',
publish_info,
)
except Exception as e:
raise PluginWarning(f'Error publishing to MQTT broker: {e}')
client.loop(timeout=config.get('broker_timeout'))
client.loop_start()
[docs]
@event('plugin.register')
def register_plugin():
plugin.register(MQTTNotifier, plugin_name, api_ver=2, interfaces=['notifiers'])