From 19ce5c18b6cef30a0fc55c854e25db98712503a3 Mon Sep 17 00:00:00 2001 From: Alex Root Junior Date: Sat, 5 Aug 2017 04:13:12 +0300 Subject: [PATCH] What about response to Webhook? https://core.telegram.org/bots/faq#how-can-i-make-requests-in-response-to-updates --- aiogram/dispatcher/__init__.py | 53 +++++++--- aiogram/dispatcher/handler.py | 8 +- aiogram/dispatcher/webhook.py | 185 +++++++++++++++++++++++++++++++++ 3 files changed, 231 insertions(+), 15 deletions(-) create mode 100644 aiogram/dispatcher/webhook.py diff --git a/aiogram/dispatcher/__init__.py b/aiogram/dispatcher/__init__.py index 08f23d2c..28e53014 100644 --- a/aiogram/dispatcher/__init__.py +++ b/aiogram/dispatcher/__init__.py @@ -5,6 +5,7 @@ import typing from .filters import CommandsFilter, RegexpFilter, ContentTypeFilter, generate_default_filters from .handler import Handler from .storage import DisabledStorage, BaseStorage, FSMContext +from .webhook import BaseResponse from ..bot import Bot from ..types.message import ContentType @@ -74,8 +75,10 @@ class Dispatcher: :param updates: :return: """ + tasks = [] for update in updates: - self.loop.create_task(self.updates_handler.notify(update)) + tasks.append(self.loop.create_task(self.updates_handler.notify(update))) + return await asyncio.gather(*tasks) async def process_update(self, update): """ @@ -86,30 +89,31 @@ class Dispatcher: """ self.last_update_id = update.update_id if update.message: - await self.message_handlers.notify(update.message) + return await self.message_handlers.notify(update.message) if update.edited_message: - await self.edited_message_handlers.notify(update.edited_message) + return await self.edited_message_handlers.notify(update.edited_message) if update.channel_post: - await self.channel_post_handlers.notify(update.channel_post) + return await self.channel_post_handlers.notify(update.channel_post) if update.edited_channel_post: - await self.edited_channel_post_handlers.notify(update.edited_channel_post) + return await self.edited_channel_post_handlers.notify(update.edited_channel_post) if update.inline_query: - await self.inline_query_handlers.notify(update.inline_query) + return await self.inline_query_handlers.notify(update.inline_query) if update.chosen_inline_result: - await self.chosen_inline_result_handlers.notify(update.chosen_inline_result) + return await self.chosen_inline_result_handlers.notify(update.chosen_inline_result) if update.callback_query: - await self.callback_query_handlers.notify(update.callback_query) + return await self.callback_query_handlers.notify(update.callback_query) if update.shipping_query: - await self.shipping_query_handlers.notify(update.shipping_query) + return await self.shipping_query_handlers.notify(update.shipping_query) if update.pre_checkout_query: - await self.pre_checkout_query_handlers.notify(update.pre_checkout_query) + return await self.pre_checkout_query_handlers.notify(update.pre_checkout_query) - async def start_pooling(self, timeout=20, relax=0.1): + async def start_pooling(self, timeout=20, relax=0.1, limit=None): """ Start long-pooling :param timeout: :param relax: + :param limit: :return: """ if self._pooling: @@ -120,21 +124,42 @@ class Dispatcher: offset = None while self._pooling: try: - updates = await self.bot.get_updates(offset=offset, timeout=timeout) + updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout) except Exception as e: log.exception('Cause exception while getting updates') - await asyncio.sleep(relax) + if relax: + await asyncio.sleep(relax) continue if updates: log.info("Received {0} updates.".format(len(updates))) offset = updates[-1].update_id + 1 - await self.process_updates(updates) + + self.loop.create_task(self._process_pooling_updates(updates)) await asyncio.sleep(relax) log.warning('Pooling is stopped.') + async def _process_pooling_updates(self, updates): + """ + Process updates received from long-pooling. + + :param updates: list of updates. + """ + need_to_call = [] + for update in await self.process_updates(updates): + for responses in update: + for response in responses: + if not isinstance(response, BaseResponse): + continue + need_to_call.append(response.execute_response(self.bot)) + if need_to_call: + try: + asyncio.gather(*need_to_call) + except Exception as e: + log.exception('Cause exception while processing updates.') + def stop_pooling(self): """ Break long-pooling process. diff --git a/aiogram/dispatcher/handler.py b/aiogram/dispatcher/handler.py index 1cd5bb98..92ef9e7c 100644 --- a/aiogram/dispatcher/handler.py +++ b/aiogram/dispatcher/handler.py @@ -34,13 +34,19 @@ class Handler: raise ValueError('This handler is not registered!') async def notify(self, *args, **kwargs): + results = [] + for filters, handler in self.handlers: if await check_filters(filters, args, kwargs): try: - await handler(*args, **kwargs) + response = await handler(*args, **kwargs) + if results is not None: + results.append(response) if self.once: break except SkipHandler: continue except CancelHandler: break + + return results diff --git a/aiogram/dispatcher/webhook.py b/aiogram/dispatcher/webhook.py new file mode 100644 index 00000000..4a7c8505 --- /dev/null +++ b/aiogram/dispatcher/webhook.py @@ -0,0 +1,185 @@ +import typing +from typing import Union, Dict, Optional + +from aiohttp import web + +from aiogram import types +from aiogram.bot import api +from aiogram.bot.base import Integer, String, Boolean +from aiogram.utils.payload import prepare_arg +from ..utils import json + +DEFAULT_WEB_PATH = '/webhook' +BOT_DISPATCHER_KEY = 'BOT_DISPATCHER' + + +class WebhookRequestHandler(web.View): + """ + Simple Wehhook request handler for aiohttp web server. + + You need to register that in app: + + .. code-block:: python3 + + app.router.add_route('*', '/your/webhook/path', WebhookRequestHadler, name='webhook_handler') + + But first you need to configure application for getting Dispatcher instance from request handler! + It must always be with key 'BOT_DISPATCHER' + + .. code-block:: python3 + + bot = Bot(TOKEN, loop) + dp = Dispatcher(bot) + app['BOT_DISPATCHER'] = dp + + """ + def get_dispatcher(self): + """ + Get Dispatcher instance from environment + + :return: :class:`aiogram.Dispatcher` + """ + return self.request.app[BOT_DISPATCHER_KEY] + + async def parse_update(self, bot): + """ + Read update from stream and deserialize it. + + :param bot: bot instance. You an get it from Dispatcher + :return: :class:`aiogram.types.Update` + """ + data = await self.request.json() + update = types.Update.deserialize(data) + bot.prepare_object(update, parent=bot) + return update + + async def post(self): + """ + Process POST request + + if one of handler returns instance of :class:`aiogram.dispatcher.webhook.BaseResponse` return it to webhook. + Otherwise do nothing (return 'ok') + + :return: :class:`aiohttp.web.Response` + """ + dispatcher = self.get_dispatcher() + update = await self.parse_update(dispatcher.bot) + results = await dispatcher.process_update(update) + + for result in results: + if isinstance(result, BaseResponse): + return result.get_web_response() + return web.Response(text='ok') + + +def configure_app(dispatcher, app: web.Application, path=DEFAULT_WEB_PATH): + """ + You can prepare web.Application for working with webhook handler. + + :param dispatcher: Dispatcher instance + :param app: :class:`aiohttp.web.Application` + :param path: Path to your webhook. + :return: + """ + app.router.add_route('*', path, WebhookRequestHandler, name='webhook_handler') + app[BOT_DISPATCHER_KEY] = dispatcher + + +def get_new_configured_app(dispatcher, path=DEFAULT_WEB_PATH): + """ + Create new :class:`aiohttp.web.Application` and configure it. + + :param dispatcher: Dispatcher instance + :param path: Path to your webhook. + :return: + """ + app = web.Application() + configure_app(dispatcher, app, path) + return app + + +class BaseResponse: + """ + Base class for webhook responses. + """ + method = None + + def prepare(self) -> typing.Dict: + """ + You need to owerwrite this method. + + :return: response parameters dict + """ + raise NotImplementedError + + def cleanup(self) -> typing.Dict: + """ + Cleanup response after preparing. Remove empty fields. + + :return: response parameters dict + """ + return {k: v for k, v in self.prepare().items() if v is not None} + + def get_response(self): + """ + Get response object + + :return: + """ + return {'method': self.method, **self.cleanup()} + + def get_web_response(self): + """ + Get prepared web response with JSON data. + + :return: :class:`aiohttp.web.Response` + """ + return web.json_response(self.get_response(), dumps=json.dumps) + + async def execute_response(self, bot): + """ + Use this method if you want to execute response as simple HTTP request. + + :param bot: Bot instance. + :return: + """ + return await bot.request(self.method, self.cleanup()) + + +class SendMessage(BaseResponse): + """ + You can send message with webhook by using this instance of this object. + All arguments is equal with :method:`Bot.send_message` method. + """ + __slots__ = ('chat_id', 'text', 'parse_mode', + 'disable_web_page_preview', 'disable_notification', + 'reply_to_message_id', 'reply_markup') + + method = api.Methods.SEND_MESSAGE + + def __init__(self, chat_id: Union[Integer, String], + text: String, + parse_mode: Optional[String] = None, + disable_web_page_preview: Optional[Boolean] = None, + disable_notification: Optional[Boolean] = None, + reply_to_message_id: Optional[Integer] = None, + reply_markup: Optional[Union[ + types.InlineKeyboardMarkup, types.ReplyKeyboardMarkup, Dict, String]] = None): + self.chat_id = chat_id + self.text = text + self.parse_mode = parse_mode + self.disable_web_page_preview = disable_web_page_preview + self.disable_notification = disable_notification + self.reply_to_message_id = reply_to_message_id + self.reply_markup = reply_markup + + def prepare(self) -> dict: + return { + 'chat_id': self.chat_id, + 'text': self.text, + 'parse_mode': self.parse_mode, + 'disable_web_page_preview': self.disable_web_page_preview, + 'disable_notification': self.disable_notification, + 'reply_to_message_id': self.reply_to_message_id, + 'reply_markup': prepare_arg(self.reply_markup) + }