Source code for aiotg.bot

import os
import re
import logging
import asyncio
from urllib.parse import urlparse

import aiohttp
from aiohttp import web
import json

from . chat import Chat, Sender
from . reloader import run_with_reloader

__author__ = "Stepan Zastupov"
__copyright__ = "Copyright 2015-2017 Stepan Zastupov"
__license__ = "MIT"

API_URL = "https://api.telegram.org"
API_TIMEOUT = 60
RETRY_TIMEOUT = 30
RETRY_CODES = [429, 500, 502, 503, 504]
BOTAN_URL = "https://api.botan.io/track"

# Message types to be handled by bot.handle(...)
MESSAGE_TYPES = [
    "location", "photo", "document", "audio", "voice", "sticker", "contact",
    "venue", "video", "game", "delete_chat_photo", "new_chat_photo",
    "delete_chat_photo", "new_chat_member", "left_chat_member",
    "new_chat_title"
]

# Update types for
MESSAGE_UPDATES = [
    "message", "edited_message", "channel_post", "edited_channel_post"
]

logger = logging.getLogger("aiotg")


[docs]class Bot: """Telegram bot framework designed for asyncio :param str api_token: Telegram bot token, ask @BotFather for this :param int api_timeout: Timeout for long polling :param str botan_token: Token for http://botan.io :param str name: Bot name :param callable json_serialize: Json serializer function. (json.dumps() by default) :param callable json_deserialize: Json deserializer function. (json.loads() by default) :param bool default_in_groups: Enables default callback in groups """ _running = False _offset = 0 def __init__(self, api_token, api_timeout=API_TIMEOUT, botan_token=None, name=None, json_serialize=json.dumps, json_deserialize=json.loads, default_in_groups=False): self.api_token = api_token self.api_timeout = api_timeout self.botan_token = botan_token self.name = name self.json_serialize = json_serialize self.json_deserialize = json_deserialize self.default_in_groups = default_in_groups self.webhook_url = None self._session = None def no_handle(mt): return lambda chat, msg: logger.debug("no handle for %s", mt) # Init default handlers and callbacks self._handlers = {mt: no_handle(mt) for mt in MESSAGE_TYPES} self._commands = [] self._callbacks = [] self._inlines = [] self._default = lambda chat, message: None self._default_callback = lambda chat, cq: None self._default_inline = lambda iq: None
[docs] async def loop(self): """ Return bot's main loop as coroutine. Use with asyncio. :Example: >>> loop = asyncio.get_event_loop() >>> loop.run_until_complete(bot.loop()) or >>> loop = asyncio.get_event_loop() >>> loop.create_task(bot.loop()) """ self._running = True while self._running: updates = await self.api_call( 'getUpdates', offset=self._offset + 1, timeout=self.api_timeout ) self._process_updates(updates)
[docs] def run(self, debug=False, reload=None): """ Convenience method for running bots in getUpdates mode :param bool debug: Enable debug logging and automatic reloading :param bool reload: Automatically reload bot on code change :Example: >>> if __name__ == '__main__': >>> bot.run() """ loop = asyncio.get_event_loop() logging.basicConfig(level=logging.DEBUG if debug else logging.INFO) if reload is None: reload = debug try: if reload: loop.run_until_complete( run_with_reloader(loop, self.loop(), self.stop)) else: loop.run_until_complete(self.loop()) # User cancels except KeyboardInterrupt: logger.debug("User cancelled") self.stop() # Stop loop finally: logger.debug("Closing loop") loop.stop() loop.close()
[docs] def run_webhook(self, webhook_url, **options): """ Convenience method for running bots in webhook mode :Example: >>> if __name__ == '__main__': >>> bot.run_webhook(webhook_url="https://yourserver.com/webhooktoken") Additional documentation on https://core.telegram.org/bots/api#setwebhook """ loop = asyncio.get_event_loop() loop.run_until_complete(self.set_webhook(webhook_url, **options)) if webhook_url: url = urlparse(webhook_url) app = self.create_webhook_app(url.path, loop) host = os.environ.get('HOST', '0.0.0.0') port = int(os.environ.get('PORT', 0)) or url.port web.run_app(app, host=host, port=port)
[docs] def stop_webhook(self): """ Use to switch from Webhook to getUpdates mode """ self.run_webhook(webhook_url="")
[docs] def add_command(self, regexp, fn): """ Manually register regexp based command """ self._commands.append((regexp, fn))
[docs] def command(self, regexp): """ Register a new command :param str regexp: Regular expression matching the command to register :Example: >>> @bot.command(r"/echo (.+)") >>> def echo(chat, match): >>> return chat.reply(match.group(1)) """ def decorator(fn): self.add_command(regexp, fn) return fn return decorator
[docs] def default(self, callback): """ Set callback for default command that is called on unrecognized commands for 1-to-1 chats If default_in_groups option is True, callback is called in groups too :Example: >>> @bot.default >>> def echo(chat, message): >>> return chat.reply(message["text"]) """ self._default = callback return callback
[docs] def add_inline(self, regexp, fn): """ Manually register regexp based callback """ self._inlines.append((regexp, fn))
[docs] def inline(self, callback): """ Set callback for inline queries :Example: >>> @bot.inline >>> def echo(iq): >>> return iq.answer([ >>> {"type": "text", "title": "test", "id": "0"} >>> ]) >>> @bot.inline(r"myinline-(.+)") >>> def echo(chat, iq, match): >>> return iq.answer([ >>> {"type": "text", "title": "test", "id": "0"} >>> ]) """ if callable(callback): self._default_inline = callback return callback elif isinstance(callback, str): def decorator(fn): self.add_inline(callback, fn) return fn return decorator else: raise TypeError('str expected {} given'.format(type(callback)))
[docs] def add_callback(self, regexp, fn): """ Manually register regexp based callback """ self._callbacks.append((regexp, fn))
[docs] def callback(self, callback): """ Set callback for callback queries :Example: >>> @bot.callback >>> def echo(chat, cq): >>> return cq.answer() >>> @bot.callback(r"buttonclick-(.+)") >>> def echo(chat, cq, match): >>> return chat.reply(match.group(1)) """ if callable(callback): self._default_callback = callback return callback elif isinstance(callback, str): def decorator(fn): self.add_callback(callback, fn) return fn return decorator else: raise TypeError('str expected {} given'.format(type(callback)))
[docs] def handle(self, msg_type): """ Set handler for specific message type :Example: >>> @bot.handle("audio") >>> def handle(chat, audio): >>> pass """ def wrap(callback): self._handlers[msg_type] = callback return callback return wrap
[docs] def channel(self, channel_name): """ Construct a Chat object used to post to channel :param str channel_name: Channel name """ return Chat(self, channel_name, "channel")
[docs] def private(self, user_id): """ Construct a Chat object used to post direct messages :param str user_id: User id """ return Chat(self, user_id, "private")
[docs] def group(self, group_id): """ Construct a Chat object used to post group messages :param str group_id: Group chat id """ return Chat(self, group_id, "group")
[docs] def api_call(self, method, **params): """ Call Telegram API. See https://core.telegram.org/bots/api for reference. :param str method: Telegram API method :param params: Arguments for the method call """ coro = self._api_call(method, **params) # Explicitly ensure that API call is executed return asyncio.ensure_future(coro)
async def _api_call(self, method, **params): url = "{0}/bot{1}/{2}".format(API_URL, self.api_token, method) logger.debug("api_call %s, %s", method, params) response = await self.session.post(url, data=params) if response.status == 200: return await response.json(loads=self.json_deserialize) elif response.status in RETRY_CODES: logger.info("Server returned %d, retrying in %d sec.", response.status, RETRY_TIMEOUT) await response.release() await asyncio.sleep(RETRY_TIMEOUT) return await self.api_call(method, **params) else: if response.headers['content-type'] == 'application/json': err_msg = (await response.json(loads=self.json_deserialize))["description"] else: err_msg = await response.read() logger.error(err_msg) raise RuntimeError(err_msg)
[docs] async def get_me(self): """ Returns basic information about the bot (see https://core.telegram.org/bots/api#getme) """ json_result = await self.api_call("getMe") return json_result["result"]
[docs] async def leave_chat(self, chat_id): """ Use this method for your bot to leave a group, supergroup or channel. Returns True on success. :param int chat_id: Unique identifier for the target chat \ or username of the target supergroup or channel \ (in the format @channelusername) """ json_result = await self.api_call("leaveChat", chat_id=chat_id) return json_result["result"]
[docs] def send_message(self, chat_id, text, **options): """ Send a text message to chat :param int chat_id: ID of the chat to send the message to :param str text: Text to send :param options: Additional sendMessage options (see https://core.telegram.org/bots/api#sendmessage) """ return self.api_call("sendMessage", chat_id=chat_id, text=text, **options)
[docs] def edit_message_text(self, chat_id, message_id, text, **options): """ Edit a text message in a chat :param int chat_id: ID of the chat the message to edit is in :param int message_id: ID of the message to edit :param str text: Text to edit the message to :param options: Additional API options """ return self.api_call( "editMessageText", chat_id=chat_id, message_id=message_id, text=text, **options )
[docs] def edit_message_reply_markup(self, chat_id, message_id, reply_markup, **options): """ Edit a reply markup of message in a chat :param int chat_id: ID of the chat the message to edit is in :param int message_id: ID of the message to edit :param str reply_markup: New inline keyboard markup for the message :param options: Additional API options """ return self.api_call( "editMessageReplyMarkup", chat_id=chat_id, message_id=message_id, reply_markup=reply_markup, **options )
[docs] async def get_file(self, file_id): """ Get basic information about a file and prepare it for downloading. :param int file_id: File identifier to get information about :return: File object (see https://core.telegram.org/bots/api#file) """ json = await self.api_call("getFile", file_id=file_id) return json["result"]
[docs] def download_file(self, file_path, range=None): """ Download a file from Telegram servers """ headers = {"range": range} if range else None url = "{0}/file/bot{1}/{2}".format(API_URL, self.api_token, file_path) return self.session.get(url, headers=headers)
[docs] def get_user_profile_photos(self, user_id, **options): """ Get a list of profile pictures for a user :param int user_id: Unique identifier of the target user :param options: Additional getUserProfilePhotos options (see https://core.telegram.org/bots/api#getuserprofilephotos) """ return self.api_call( "getUserProfilePhotos", user_id=str(user_id), **options )
[docs] def track(self, message, name="Message"): """ Track message using http://botan.io Set botan_token to make it work """ if self.botan_token: asyncio.ensure_future(self._track(message, name))
[docs] def stop(self): self._running = False
[docs] async def webhook_handle(self, request): """ aiohttp.web handle for processing web hooks :Example: >>> from aiohttp import web >>> app = web.Application() >>> app.router.add_route('/webhook') """ update = await request.json(loads=self.json_deserialize) self._process_update(update) return web.Response()
[docs] def create_webhook_app(self, path, loop=None): """ Shorthand for creating aiohttp.web.Application with registered webhook hanlde """ app = web.Application(loop=loop) app.router.add_route('POST', path, self.webhook_handle) return app
[docs] def set_webhook(self, webhook_url, **options): """ Register you webhook url for Telegram service. """ return self.api_call( 'setWebhook', url=webhook_url, **options )
[docs] def delete_webhook(self): ''' Tell Telegram to switch back to getUpdates mode ''' return self.api_call('deleteWebhook')
@property def session(self): if not self._session: self._session = aiohttp.ClientSession(json_serialize=self.json_serialize) return self._session def __del__(self): try: if self._session: self._session.close() except Exception as e: logger.debug(e) async def _track(self, message, name): response = await self.session.post( BOTAN_URL, params={ "token": self.botan_token, "uid": message["from"]["id"], "name": name }, data=self.json_serialize(message), headers={'content-type': 'application/json'} ) if response.status != 200: logger.info("error submiting stats %d", response.status) await response.release() def _process_message(self, message): chat = Chat.from_message(self, message) for mt in MESSAGE_TYPES: if mt in message: self.track(message, mt) return self._handlers[mt](chat, message[mt]) if "text" not in message: return for patterns, handler in self._commands: m = re.search(patterns, message["text"], re.I) if m: self.track(message, handler.__name__) return handler(chat, m) # No match, run default if it's a 1to1 chat # However, if default_in_groups option is active, run default in any chat (not only 1to1) if not chat.is_group() or self.default_in_groups: self.track(message, "default") return self._default(chat, message) def _process_inline_query(self, query): iq = InlineQuery(self, query) for patterns, handler in self._inlines: match = re.search(patterns, query['query'], re.I) if match: return handler(iq, match) return self._default_inline(iq) def _process_callback_query(self, query): chat = Chat.from_message(self, query["message"]) cq = CallbackQuery(self, query) for patterns, handler in self._callbacks: match = re.search(patterns, cq.data, re.I) if match: return handler(chat, cq, match) if not chat.is_group() or self.default_in_groups: return self._default_callback(chat, cq) def _process_updates(self, updates): if not updates["ok"]: logger.error("getUpdates error: %s", updates.get("description")) return for update in updates["result"]: self._process_update(update) def _process_update(self, update): logger.debug("update %s", update) # Update offset self._offset = max(self._offset, update["update_id"]) coro = None # Determine update type starting with message updates for ut in MESSAGE_UPDATES: if ut in update: coro = self._process_message(update[ut]) break else: if "inline_query" in update: coro = self._process_inline_query(update["inline_query"]) elif "callback_query" in update: coro = self._process_callback_query(update["callback_query"]) if coro: asyncio.ensure_future(coro)
[docs]class TgBot(Bot): def __init__(self, *args, **kwargs): logger.warning("TgBot is depricated, use Bot instead") super().__init__(*args, **kwargs)
[docs]class InlineQuery: """ Incoming inline query See https://core.telegram.org/bots/api#inline-mode for details """ def __init__(self, bot, src): self.bot = bot self.sender = Sender(src['from']) self.query_id = src['id'] self.query = src['query']
[docs] def answer(self, results, **options): return self.bot.api_call( "answerInlineQuery", inline_query_id=self.query_id, results=self.bot.json_serialize(results), **options )
[docs]class TgInlineQuery(InlineQuery): def __init__(self, *args, **kwargs): logger.warning("TgInlineQuery is depricated, use InlineQuery instead") super().__init__(*args, **kwargs)
[docs]class CallbackQuery: def __init__(self, bot, src): self.bot = bot self.query_id = src['id'] self.data = src['data'] self.src = src
[docs] def answer(self, **options): return self.bot.api_call( "answerCallbackQuery", callback_query_id=self.query_id, **options )