Source code for flexget.plugins.input.from_telegram

"""Plugin for telegram input."""

import re

from loguru import logger
from requests.exceptions import HTTPError

from flexget import plugin
from flexget.config_schema import one_or_more
from flexget.entry import Entry
from flexget.event import event

logger = logger.bind(name='from_telegram')

_TELEGRAM_API_URL = 'https://api.telegram.org/'


[docs] class TelegramInput: """Parse any messages from Telegram and fills fields with regex. Example:: token: <token> only_new: yes whitelist: - username: <my_username> - group: <my_group> - fullname: first: <my_first_name> sur: <my_surname> entry: <field>: <regexp to match value> Note: If not declared, title will be the message """ schema = { 'type': 'object', 'properties': { 'token': {'type': 'string'}, 'types': one_or_more({'type': 'string', 'enum': ['private', 'group']}), 'whitelist': { 'type': 'array', 'minItems': 1, 'items': { 'oneOf': [ { 'type': 'object', 'properties': {'username': {'type': 'string'}}, 'required': ['username'], 'additionalProperties': False, }, { 'type': 'object', 'properties': { 'fullname': { 'type': 'object', 'properties': { 'first': {'type': 'string'}, 'sur': {'type': 'string'}, }, 'required': ['first', 'sur'], 'additionalProperties': False, } }, 'required': ['fullname'], 'additionalProperties': False, }, { 'type': 'object', 'properties': {'group': {'type': 'string'}}, 'required': ['group'], 'additionalProperties': False, }, ] }, }, 'only_new': {'type': 'boolean', 'default': True}, 'entry': { 'type': 'object', 'properties': { 'url': {'type': 'string', 'format': 'regex'}, 'title': {'type': 'string', 'format': 'regex'}, }, 'additionalProperties': {'type': 'string', 'format': 'regex'}, }, }, 'required': ['token'], 'additionalProperties': False, } @plugin.internet(logger) def on_task_input(self, task, config): # Load The Configs token = config['token'] only_new = config['only_new'] entry_config = config.get('entry') whitelist = config.get('whitelist', []) types = config.get('types', ['private', 'group']) # Get Last Checked ID persistence_name = f'{token}_update_id' update_id = task.simple_persistence.get(persistence_name) # Get only new messages params = {} if update_id and only_new: update_id += 1 params['offset'] = update_id # The Target URL url = f'{_TELEGRAM_API_URL}bot{token}/getUpdates' # Get Telegram Updates try: response = task.requests.get(url, timeout=60, raise_status=True, params=params).json() except HTTPError as e: raise plugin.PluginError(f'Error getting telegram update: {e}') # We have a error if not response['ok']: raise plugin.PluginError( f'Telegram updater returned error {response["error_code"]}: {response["description"]}' ) # Get All New Messages messages = response['result'] entries = [] for message in messages: # This is the ID update_id = message['update_id'] # Update the last ID for the Bot logger.debug('Last Update set to {}', update_id) task.simple_persistence[persistence_name] = update_id # We Don't care if it's not a message or no text if ( 'message' not in message or 'text' not in message['message'] or 'chat' not in message['message'] or 'type' not in message['message']['chat'] ): logger.debug('Invalid message discarted: {}', message) continue logger.debug('Income message: {}', message) # Check Types if types and message['message']['chat']['type'] not in types: logger.debug('Ignoring message because of invalid type {}', message) continue # Create Base Entry text = message['message']['text'] entry = Entry() entry['title'] = text # We need a url, so we add a dummy entry['url'] = f'http://localhost?update_id={update_id!s}' # Store the message if we need to use it in other plugins entry['telegram_message'] = message['message'] # Check From message_from = message['message']['from'] message_chat = message['message']['chat'] if whitelist: for check in whitelist: if 'username' in check and check['username'] == message_from['username']: logger.debug('WhiteListing: Username {}', message_from['username']) break if ( 'fullname' in check and check['fullname']['first'] == message_from['first_name'] and check['fullname']['sur'] == message_from['last_name'] ): logger.debug( 'WhiteListing: Full Name {} {}', message_from['first_name'], message_from['last_name'], ) break if 'group' in check and ( message_chat['type'] == 'group' and message_chat['title'] == check['group'] ): logger.debug('WhiteListing: Group {}', message_chat['title']) break else: logger.debug('Ignoring message because of no whitelist match {}', message) continue # Process the entry config accept = True if entry_config: for field, regexp in entry_config.items(): match = re.search(regexp, text) if match: try: # Add field to entry entry[field] = match.group(1) except IndexError: logger.error( 'Regex for field `{}` must contain a capture group', field ) raise plugin.PluginError( 'Your from_telegram plugin config contains errors, please correct them.' ) else: logger.debug('Ignored entry, not match on field {}: {}', field, entry) accept = False break # Append the entry if accept: entries.append(entry) logger.debug('Added entry {}', entry) return entries
[docs] @event('plugin.register') def register_plugin(): plugin.register(TelegramInput, 'from_telegram', api_ver=2)