This commit is contained in:
Alex Root Junior 2017-08-05 04:13:12 +03:00
parent 2ac011521c
commit 19ce5c18b6
3 changed files with 231 additions and 15 deletions

View file

@ -5,6 +5,7 @@ import typing
from .filters import CommandsFilter, RegexpFilter, ContentTypeFilter, generate_default_filters
from .handler import Handler
from .storage import DisabledStorage, BaseStorage, FSMContext
from .webhook import BaseResponse
from ..bot import Bot
from ..types.message import ContentType
@ -74,8 +75,10 @@ class Dispatcher:
:param updates:
:return:
"""
tasks = []
for update in updates:
self.loop.create_task(self.updates_handler.notify(update))
tasks.append(self.loop.create_task(self.updates_handler.notify(update)))
return await asyncio.gather(*tasks)
async def process_update(self, update):
"""
@ -86,30 +89,31 @@ class Dispatcher:
"""
self.last_update_id = update.update_id
if update.message:
await self.message_handlers.notify(update.message)
return await self.message_handlers.notify(update.message)
if update.edited_message:
await self.edited_message_handlers.notify(update.edited_message)
return await self.edited_message_handlers.notify(update.edited_message)
if update.channel_post:
await self.channel_post_handlers.notify(update.channel_post)
return await self.channel_post_handlers.notify(update.channel_post)
if update.edited_channel_post:
await self.edited_channel_post_handlers.notify(update.edited_channel_post)
return await self.edited_channel_post_handlers.notify(update.edited_channel_post)
if update.inline_query:
await self.inline_query_handlers.notify(update.inline_query)
return await self.inline_query_handlers.notify(update.inline_query)
if update.chosen_inline_result:
await self.chosen_inline_result_handlers.notify(update.chosen_inline_result)
return await self.chosen_inline_result_handlers.notify(update.chosen_inline_result)
if update.callback_query:
await self.callback_query_handlers.notify(update.callback_query)
return await self.callback_query_handlers.notify(update.callback_query)
if update.shipping_query:
await self.shipping_query_handlers.notify(update.shipping_query)
return await self.shipping_query_handlers.notify(update.shipping_query)
if update.pre_checkout_query:
await self.pre_checkout_query_handlers.notify(update.pre_checkout_query)
return await self.pre_checkout_query_handlers.notify(update.pre_checkout_query)
async def start_pooling(self, timeout=20, relax=0.1):
async def start_pooling(self, timeout=20, relax=0.1, limit=None):
"""
Start long-pooling
:param timeout:
:param relax:
:param limit:
:return:
"""
if self._pooling:
@ -120,21 +124,42 @@ class Dispatcher:
offset = None
while self._pooling:
try:
updates = await self.bot.get_updates(offset=offset, timeout=timeout)
updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout)
except Exception as e:
log.exception('Cause exception while getting updates')
await asyncio.sleep(relax)
if relax:
await asyncio.sleep(relax)
continue
if updates:
log.info("Received {0} updates.".format(len(updates)))
offset = updates[-1].update_id + 1
await self.process_updates(updates)
self.loop.create_task(self._process_pooling_updates(updates))
await asyncio.sleep(relax)
log.warning('Pooling is stopped.')
async def _process_pooling_updates(self, updates):
"""
Process updates received from long-pooling.
:param updates: list of updates.
"""
need_to_call = []
for update in await self.process_updates(updates):
for responses in update:
for response in responses:
if not isinstance(response, BaseResponse):
continue
need_to_call.append(response.execute_response(self.bot))
if need_to_call:
try:
asyncio.gather(*need_to_call)
except Exception as e:
log.exception('Cause exception while processing updates.')
def stop_pooling(self):
"""
Break long-pooling process.

View file

@ -34,13 +34,19 @@ class Handler:
raise ValueError('This handler is not registered!')
async def notify(self, *args, **kwargs):
results = []
for filters, handler in self.handlers:
if await check_filters(filters, args, kwargs):
try:
await handler(*args, **kwargs)
response = await handler(*args, **kwargs)
if results is not None:
results.append(response)
if self.once:
break
except SkipHandler:
continue
except CancelHandler:
break
return results

View file

@ -0,0 +1,185 @@
import typing
from typing import Union, Dict, Optional
from aiohttp import web
from aiogram import types
from aiogram.bot import api
from aiogram.bot.base import Integer, String, Boolean
from aiogram.utils.payload import prepare_arg
from ..utils import json
DEFAULT_WEB_PATH = '/webhook'
BOT_DISPATCHER_KEY = 'BOT_DISPATCHER'
class WebhookRequestHandler(web.View):
"""
Simple Wehhook request handler for aiohttp web server.
You need to register that in app:
.. code-block:: python3
app.router.add_route('*', '/your/webhook/path', WebhookRequestHadler, name='webhook_handler')
But first you need to configure application for getting Dispatcher instance from request handler!
It must always be with key 'BOT_DISPATCHER'
.. code-block:: python3
bot = Bot(TOKEN, loop)
dp = Dispatcher(bot)
app['BOT_DISPATCHER'] = dp
"""
def get_dispatcher(self):
"""
Get Dispatcher instance from environment
:return: :class:`aiogram.Dispatcher`
"""
return self.request.app[BOT_DISPATCHER_KEY]
async def parse_update(self, bot):
"""
Read update from stream and deserialize it.
:param bot: bot instance. You an get it from Dispatcher
:return: :class:`aiogram.types.Update`
"""
data = await self.request.json()
update = types.Update.deserialize(data)
bot.prepare_object(update, parent=bot)
return update
async def post(self):
"""
Process POST request
if one of handler returns instance of :class:`aiogram.dispatcher.webhook.BaseResponse` return it to webhook.
Otherwise do nothing (return 'ok')
:return: :class:`aiohttp.web.Response`
"""
dispatcher = self.get_dispatcher()
update = await self.parse_update(dispatcher.bot)
results = await dispatcher.process_update(update)
for result in results:
if isinstance(result, BaseResponse):
return result.get_web_response()
return web.Response(text='ok')
def configure_app(dispatcher, app: web.Application, path=DEFAULT_WEB_PATH):
"""
You can prepare web.Application for working with webhook handler.
:param dispatcher: Dispatcher instance
:param app: :class:`aiohttp.web.Application`
:param path: Path to your webhook.
:return:
"""
app.router.add_route('*', path, WebhookRequestHandler, name='webhook_handler')
app[BOT_DISPATCHER_KEY] = dispatcher
def get_new_configured_app(dispatcher, path=DEFAULT_WEB_PATH):
"""
Create new :class:`aiohttp.web.Application` and configure it.
:param dispatcher: Dispatcher instance
:param path: Path to your webhook.
:return:
"""
app = web.Application()
configure_app(dispatcher, app, path)
return app
class BaseResponse:
"""
Base class for webhook responses.
"""
method = None
def prepare(self) -> typing.Dict:
"""
You need to owerwrite this method.
:return: response parameters dict
"""
raise NotImplementedError
def cleanup(self) -> typing.Dict:
"""
Cleanup response after preparing. Remove empty fields.
:return: response parameters dict
"""
return {k: v for k, v in self.prepare().items() if v is not None}
def get_response(self):
"""
Get response object
:return:
"""
return {'method': self.method, **self.cleanup()}
def get_web_response(self):
"""
Get prepared web response with JSON data.
:return: :class:`aiohttp.web.Response`
"""
return web.json_response(self.get_response(), dumps=json.dumps)
async def execute_response(self, bot):
"""
Use this method if you want to execute response as simple HTTP request.
:param bot: Bot instance.
:return:
"""
return await bot.request(self.method, self.cleanup())
class SendMessage(BaseResponse):
"""
You can send message with webhook by using this instance of this object.
All arguments is equal with :method:`Bot.send_message` method.
"""
__slots__ = ('chat_id', 'text', 'parse_mode',
'disable_web_page_preview', 'disable_notification',
'reply_to_message_id', 'reply_markup')
method = api.Methods.SEND_MESSAGE
def __init__(self, chat_id: Union[Integer, String],
text: String,
parse_mode: Optional[String] = None,
disable_web_page_preview: Optional[Boolean] = None,
disable_notification: Optional[Boolean] = None,
reply_to_message_id: Optional[Integer] = None,
reply_markup: Optional[Union[
types.InlineKeyboardMarkup, types.ReplyKeyboardMarkup, Dict, String]] = None):
self.chat_id = chat_id
self.text = text
self.parse_mode = parse_mode
self.disable_web_page_preview = disable_web_page_preview
self.disable_notification = disable_notification
self.reply_to_message_id = reply_to_message_id
self.reply_markup = reply_markup
def prepare(self) -> dict:
return {
'chat_id': self.chat_id,
'text': self.text,
'parse_mode': self.parse_mode,
'disable_web_page_preview': self.disable_web_page_preview,
'disable_notification': self.disable_notification,
'reply_to_message_id': self.reply_to_message_id,
'reply_markup': prepare_arg(self.reply_markup)
}