Source code for flexget.components.notify.notifiers.telegram

from __future__ import annotations

import asyncio
from collections import defaultdict
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path
from textwrap import wrap
from typing import TYPE_CHECKING

from loguru import logger
from packaging.version import Version
from sqlalchemy import Column, Integer, String

from flexget import db_schema, plugin
from flexget.event import event
from flexget.manager import Session
from flexget.plugin import PluginError

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator

    import sqlalchemy

try:
    if Version(version('python-telegram-bot')) < Version('21.9'):
        raise plugin.PluginWarning('obsolete python-telegram-bot pkg')
except PackageNotFoundError:
    pass
else:
    import telegram
    from telegram.error import ChatMigrated, NetworkError, TelegramError
    from telegram.ext import ApplicationBuilder

try:
    from PIL import Image
except ImportError:
    Image = None

_PLUGIN_NAME = 'telegram'
_TEXT_LIMIT = 4096
_PARSERS = {'html': 'HTML', 'markdown': 'MarkdownV2', 'markdown_legacy': 'Markdown'}

_DISABLE_PREVIEWS_ATTR = 'disable_previews'
_TOKEN_ATTR = 'bot_token'
_PARSE_ATTR = 'parse_mode'
_RCPTS_ATTR = 'recipients'
_CHATID_ATTR = 'chat_id'
_USERNAME_ATTR = 'username'
_FULLNAME_ATTR = 'fullname'
_FIRSTNAME_ATTR = 'first'
_SURNAME_ATTR = 'sur'
_GROUP_ATTR = 'group'
_SOCKSPROXY_ATTR = 'socks_proxy'
_IMAGES_ATTR = 'images'

ChatIdsBase = db_schema.versioned_base('telegram_chat_ids', 0)

logger = logger.bind(name=_PLUGIN_NAME)


[docs] class ChatIdEntry(ChatIdsBase): __tablename__ = 'telegram_chat_ids' id = Column(Integer, primary_key=True) username = Column(String, index=True, nullable=True) firstname = Column(String, index=True, nullable=True) surname = Column(String, index=True, nullable=True) group = Column(String, index=True, nullable=True) def __str__(self): x = [f'id={self.id}'] if self.username: x.append(f'username={self.username}') if self.firstname: x.append(f'firstname={self.firstname}') if self.surname: x.append(f'surname={self.surname}') if self.group: x.append(f'group={self.group}') return ' '.join(x)
[docs] class TelegramNotifier: """Send a message to one or more Telegram users or groups upon accepting a download. Preparations: * Install 'python-telegram-bot' python pkg (i.e. ``pip install python-telegram-bot``) * Create a bot & obtain a token for it (see https://core.telegram.org/bots#botfather). * For direct messages (not to a group), start a conversation with the bot and click "START" in the Telegram app. * For group messages, add the bot to the desired group and send a start message to the bot: "/start" (heed the leading '/'). Configuration example:: my-task: notify: title: {{title}} message: {{title}} entries: via: telegram: bot_token: token use_markdown: no disable_previews: yes images: - image1.png - image2.jpg recipients: - chat_id: ask @raw_data_bot (most recommended) - username: my-user-name - group: my-group-name - fullname: first: my-first-name sur: my-sur-name socks_proxy: socks5://user:pass@host:port Configuration notes You may use any combination of recipients types (``username``, ``group`` or ``fullname``) - 0 or more of each (but you need at least one total...). ``parse_mode`` Optional. Whether the template uses ``markdown`` or ``html`` formatting. Note: The markdown parser will fall back to basic parsing if there is a parsing error. This can be cause due to unclosed tags (watch out for wandering underscore when using markdown) ``chat_id`` vs. ``username`` vs. ``fullname`` The ``chat_id`` approach is the most recommended, because with this approach, you don't have to send a message to get the chat ID before the program runs. In addition, it is the most stable. Even if you change your name, the program will still work properly. ``chat_id`` can be a user's or a group's (including private groups). If the chat is a group, the chat id is negative. If it is a single person, then positive. """ _token = None _chat_ids_from_config = None _usernames = None _fullnames = None _groups = None _images = None _bot = None schema = { 'type': 'object', 'properties': { _TOKEN_ATTR: {'type': 'string'}, _PARSE_ATTR: {'type': 'string', 'enum': list(_PARSERS.keys())}, _DISABLE_PREVIEWS_ATTR: {'type': 'boolean', 'default': False}, _RCPTS_ATTR: { 'type': 'array', 'minItems': 1, 'items': { 'oneOf': [ { 'type': 'object', 'properties': {_CHATID_ATTR: {'type': 'integer'}}, 'required': [_CHATID_ATTR], 'additionalProperties': False, }, { 'type': 'object', 'properties': {_USERNAME_ATTR: {'type': 'string'}}, 'required': [_USERNAME_ATTR], 'additionalProperties': False, }, { 'type': 'object', 'properties': { _FULLNAME_ATTR: { 'type': 'object', 'properties': { _FIRSTNAME_ATTR: {'type': 'string'}, _SURNAME_ATTR: {'type': 'string'}, }, 'required': [_FIRSTNAME_ATTR, _SURNAME_ATTR], 'additionalProperties': False, } }, 'required': [_FULLNAME_ATTR], 'additionalProperties': False, }, { 'type': 'object', 'properties': {_GROUP_ATTR: {'type': 'string'}}, 'required': [_GROUP_ATTR], 'additionalProperties': False, }, ] }, }, _SOCKSPROXY_ATTR: {'type': 'string'}, _IMAGES_ATTR: {'type': 'array', 'items': {'type': 'string'}}, }, 'required': [_TOKEN_ATTR, _RCPTS_ATTR], 'additionalProperties': False, }
[docs] def notify(self, title: str, message: str, config: dict) -> None: self._load_config(config) asyncio.run(self.main(message))
[docs] async def main(self, message: str) -> None: """Send a Telegram notification.""" async with ( ApplicationBuilder() .token(self._token) .proxy(self.socks_proxy) .get_updates_proxy(self.socks_proxy) .build() .bot as bot ): self._bot = bot await self._check_token() session = Session() chat_ids = await self._get_chat_ids_and_update_db(session) if not chat_ids: return await self._send_msgs(message, chat_ids, session) if self._images and message: await self._send_images(chat_ids, session)
[docs] def _load_config(self, config: dict) -> None: self._token = config[_TOKEN_ATTR] self._parse_mode = config.get(_PARSE_ATTR) self._disable_previews = config[_DISABLE_PREVIEWS_ATTR] result = defaultdict(list) for i in config[_RCPTS_ATTR]: for k, v in i.items(): result[k].append(v) self._chat_ids_from_config = set(result[_CHATID_ATTR]) self._usernames = result[_USERNAME_ATTR] self._fullnames = [(d[_FIRSTNAME_ATTR], d[_SURNAME_ATTR]) for d in result[_FULLNAME_ATTR]] self._groups = result[_GROUP_ATTR] self.socks_proxy = config.get(_SOCKSPROXY_ATTR) self._images = config.get(_IMAGES_ATTR) logger.debug( 'token={}, parse_mode={}, disable_previews={}, usernames={}, fullnames={}, groups={}, images={}', self._token, self._parse_mode, self._disable_previews, self._usernames, self._fullnames, self._groups, self._images, )
[docs] async def _check_token(self) -> None: try: await self._bot.get_me() except UnicodeDecodeError as e: logger.trace(f'bot.get_me() raised: {e!r}') raise plugin.PluginWarning('invalid bot token') except (NetworkError, TelegramError) as e: logger.error( 'Could not connect Telegram servers at this time, please try again later: {}', e.message, )
[docs] async def _replace_chat_id( self, old_id: int, new_id: int, session: sqlalchemy.orm.Session ) -> None: _upd_usernames, _upd_fullnames, upd_groups = await self._get_bot_updates() for group in upd_groups: grp = upd_groups.get(group) if grp.id == new_id: old_data = session.query(ChatIdEntry).filter(ChatIdEntry.id == old_id).first() session.delete(old_data) entry = ChatIdEntry(id=grp.id, group=grp.title) self._update_db(session, [entry]) break
[docs] async def _send_msgs( self, msg: str, chat_ids: set[int], session: sqlalchemy.orm.Session ) -> None: kwargs = {} if self._parse_mode: kwargs['parse_mode'] = _PARSERS[self._parse_mode] kwargs['disable_web_page_preview'] = self._disable_previews for chat_id in chat_ids: for paragraph in wrap(msg, _TEXT_LIMIT, replace_whitespace=False): try: logger.debug('sending paragraph to telegram servers: {}', paragraph) await self._bot.send_message(chat_id=chat_id, text=paragraph, **kwargs) except ChatMigrated as e: logger.debug('Chat migrated to id {}', e.new_chat_id) await self._bot.send_message(chat_id=e.new_chat_id, text=paragraph, **kwargs) await self._replace_chat_id(chat_id, e.new_chat_id, session) except TelegramError as e: if kwargs.get('parse_mode'): logger.warning( 'Failed to render message using parse mode {}. Falling back to basic parsing: {}', kwargs['parse_mode'], e.message, ) del kwargs['parse_mode'] await self._bot.send_message(chat_id=chat_id, text=paragraph, **kwargs) else: raise
[docs] async def _send_images(self, chat_ids: set[int], session: sqlalchemy.orm.Session) -> None: for chat_id in chat_ids: for image in self._images: with Image.open(image) as photo: width = photo.width height = photo.height can_send_photo = width + height <= 10000 and width / height <= 20 try: await ( self._bot.send_photo(chat_id=chat_id, photo=Path(image)) if can_send_photo else self._bot.send_document(chat_id=chat_id, document=Path(image)) ) except ChatMigrated as e: await ( self._bot.send_photo(chat_id=e.new_chat_id, photo=Path(image)) if can_send_photo else self._bot.send_document(chat_id=e.new_chat_id, document=Path(image)) ) await self._replace_chat_id(chat_id, e.new_chat_id, session)
[docs] async def _get_chat_ids_and_update_db(self, session: sqlalchemy.orm.Session) -> set[int]: usernames = list(self._usernames) fullnames = list(self._fullnames) groups = list(self._groups) chat_id_entries, has_new_chat_ids = await self._get_chat_id_entries( session, usernames, fullnames, groups ) logger.debug('chat_id_entries={}', chat_id_entries) # TODO: The situation where the new chat_id from ``ChatMigrated`` exception needs to be written to the # database if the chat_id specified directly in the configuration file is migrated is not considered. # This will cause one more HTTP request to be sent each time the program runs. However, this situation # only occurs when the chat_id directly specified in the configuration file is a group and it is migrated. chat_ids = {x.id for x in chat_id_entries} | self._chat_ids_from_config if not chat_ids: raise PluginError( 'no chat id found, try manually sending the bot any message to initialize the chat' ) if chat_id_entries: if usernames: logger.warning('no chat id found for usernames: {}', usernames) if fullnames: logger.warning('no chat id found for fullnames: {}', fullnames) if groups: logger.warning('no chat id found for groups: {}', groups) if has_new_chat_ids: self._update_db(session, chat_id_entries) return chat_ids
[docs] async def _get_chat_id_entries( self, session: sqlalchemy.orm.Session, usernames: list[str], fullnames: list[tuple[str, str]], groups: list[str], ) -> tuple[list[ChatIdEntry], bool]: """Get chat ids for ``usernames``, ``fullnames`` & ``groups``. Entries with a matching chat ids will be removed from the input lists. """ logger.debug('loading cached chat ids') chat_id_entries = self._get_cached_chat_id_entries(session, usernames, fullnames, groups) logger.debug( 'found {} cached chat_ids: {}', len(chat_id_entries), [f'{x}' for x in chat_id_entries] ) if not (usernames or fullnames or groups): logger.debug('all chat ids found in cache') return chat_id_entries, False logger.debug('loading new chat ids') new_chat_ids = [ e async for e in self._get_new_chat_id_entries(usernames, fullnames, groups) ] logger.debug( 'found {} new chat_ids: {}', len(new_chat_ids), [f'{x}' for x in new_chat_ids] ) chat_id_entries += new_chat_ids return chat_id_entries, bool(new_chat_ids)
[docs] @staticmethod def _get_cached_chat_id_entries( session: sqlalchemy.orm.Session, usernames: list[str], fullnames: list[tuple[str, str]], groups: list[str], ) -> list[ChatIdEntry]: """Get chat ids from the cache (DB). remove found entries from ``usernames``, ``fullnames`` & ``groups``.""" chat_id_entries = [] cached_usernames = { x.username: x for x in session.query(ChatIdEntry).filter(ChatIdEntry.username.is_not(None)).all() } cached_fullnames = { (x.firstname, x.surname): x for x in session.query(ChatIdEntry).filter(ChatIdEntry.firstname.is_not(None)).all() } cached_groups = { x.group: x for x in session.query(ChatIdEntry).filter(ChatIdEntry.group.is_not(None)).all() } len_ = len(usernames) for i, username in enumerate(reversed(usernames)): item = cached_usernames.get(username) if item: chat_id_entries.append(item) usernames.pop(len_ - i - 1) len_ = len(fullnames) for i, fullname in enumerate(reversed(fullnames)): item = cached_fullnames.get(fullname) if item: chat_id_entries.append(item) fullnames.pop(len_ - i - 1) len_ = len(groups) for i, grp in enumerate(reversed(groups)): item = cached_groups.get(grp) if item: chat_id_entries.append(item) groups.pop(len_ - i - 1) return chat_id_entries
[docs] async def _get_new_chat_id_entries( self, usernames: list[str], fullnames: list[tuple[str, str]], groups: list[str] ) -> AsyncGenerator[ChatIdEntry, None]: """Get chat ids by querying the telegram ``bot``.""" upd_usernames, upd_fullnames, upd_groups = await self._get_bot_updates() len_ = len(usernames) for i, username in enumerate(reversed(usernames)): chat = upd_usernames.get(username) if chat is not None: entry = ChatIdEntry( id=chat.id, username=chat.username, firstname=chat.first_name, surname=chat.last_name, ) yield entry usernames.pop(len_ - i - 1) len_ = len(fullnames) for i, fullname in enumerate(reversed(fullnames)): chat = upd_fullnames.get(fullname) if chat is not None: entry = ChatIdEntry( id=chat.id, username=chat.username, firstname=chat.first_name, surname=chat.last_name, ) yield entry fullnames.pop(len_ - i - 1) len_ = len(groups) for i, grp in enumerate(reversed(groups)): chat = upd_groups.get(grp) if chat is not None: entry = ChatIdEntry(id=chat.id, group=chat.title) yield entry groups.pop(len_ - i - 1)
[docs] async def _get_bot_updates( self, ) -> tuple[ dict[str, telegram.Chat], dict[tuple[str, str], telegram.Chat], dict[str, telegram.Chat] ]: """Get updated chats info from telegram.""" # highly unlikely, but if there are more than ``telegram.constants.PollingLimit.MAX_LIMIT`` # msgs waiting for the bot, we should not miss one total_updates = [] last_update = 0 while True: updates = await self._bot.get_updates( last_update, limit=telegram.constants.PollingLimit.MAX_LIMIT ) total_updates.extend(updates) if len(updates) < telegram.constants.PollingLimit.MAX_LIMIT: break last_update = updates[-1].update_id usernames = {} fullnames = {} groups = {} for update in total_updates: if update.message: chat = update.message.chat elif update.edited_message: chat = update.edited_message.chat elif update.channel_post: chat = update.channel_post.chat else: logger.warning('Unknown update type encountered: {}', update) continue if chat.type == 'private': usernames[chat.username] = chat fullnames[(chat.first_name, chat.last_name)] = chat elif chat.type in ('group', 'supergroup', 'channel'): groups[chat.title] = chat else: logger.warning('unknown chat type: {}}}', type(chat)) return usernames, fullnames, groups
[docs] def _update_db( self, session: sqlalchemy.orm.Session, chat_id_entries: list[ChatIdEntry] ) -> None: """Update the DB with found ``chat_ids``.""" logger.info('saving updated chat_ids to db') # avoid duplicate chat_ids. (this is possible if configuration specified both username & fullname chat_id_mappings = {x.id: x for x in chat_id_entries} session.add_all(iter(chat_id_mappings.values())) session.commit()
[docs] @event('plugin.register') def register_plugin(): plugin.register(TelegramNotifier, _PLUGIN_NAME, api_ver=2, interfaces=['notifiers'])