Webhook integration in 3.0 (#737)

* Added base webhook implementation and example
* Added example
* Enable on_startup callback
* Correctly handle response into webhook (silent call)
* Fixed State filter
This commit is contained in:
Alex Root Junior 2021-11-08 23:20:12 +02:00 committed by GitHub
parent e0ab7d8bd3
commit 1c2c7fd88c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 865 additions and 246 deletions

View file

@ -202,10 +202,11 @@ class Dispatcher(Router):
return await self.propagate_event(update_type=update_type, event=event, **kwargs)
@classmethod
async def _silent_call_request(cls, bot: Bot, result: TelegramMethod[Any]) -> None:
async def silent_call_request(cls, bot: Bot, result: TelegramMethod[Any]) -> None:
"""
Simulate answer into WebHook
:param bot:
:param result:
:return:
"""
@ -233,7 +234,7 @@ class Dispatcher(Router):
try:
response = await self.feed_update(bot, update, **kwargs)
if call_answer and isinstance(response, TelegramMethod):
await self._silent_call_request(bot=bot, result=response)
await self.silent_call_request(bot=bot, result=response)
return response is not UNHANDLED
except Exception as e:
@ -324,7 +325,7 @@ class Dispatcher(Router):
except Exception as e:
raise e
if isinstance(result, TelegramMethod):
asyncio.ensure_future(self._silent_call_request(bot=bot, result=result))
asyncio.ensure_future(self.silent_call_request(bot=bot, result=result))
try:
try:
@ -367,6 +368,7 @@ class Dispatcher(Router):
:param handle_as_tasks:
:param kwargs:
:param backoff_config:
:param allowed_updates:
:return:
"""
async with self._running_lock: # Prevent to run this method twice at a once

View file

@ -1,7 +1,7 @@
from inspect import isclass
from typing import Any, Dict, Optional, Sequence, Type, Union, cast, no_type_check
from pydantic import validator
from pydantic import Field, validator
from aiogram.dispatcher.filters import BaseFilter
from aiogram.dispatcher.fsm.state import State, StatesGroup
@ -15,12 +15,12 @@ class StateFilter(BaseFilter):
State filter
"""
state: Union[StateType, Sequence[StateType]]
state: Union[StateType, Sequence[StateType]] = Field(...)
class Config:
arbitrary_types_allowed = True
@validator("state", always=True)
@validator("state")
@no_type_check # issubclass breaks things
def _validate_state(cls, v: Union[StateType, Sequence[StateType]]) -> Sequence[StateType]:
if (

View file

@ -0,0 +1,231 @@
import asyncio
from abc import ABC, abstractmethod
from asyncio import Transport
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple, cast
from aiohttp import web
from aiohttp.abc import Application
from aiohttp.typedefs import Handler
from aiohttp.web_middlewares import middleware
from aiogram import Bot, Dispatcher, loggers
from aiogram.dispatcher.webhook.security import IPFilter
from aiogram.methods import TelegramMethod
def setup_application(app: Application, dispatcher: Dispatcher, /, **kwargs: Any) -> None:
"""
This function helps to configure startup-shutdown process
:param app:
:param dispatcher:
:param kwargs:
:return:
"""
workflow_data = {
"app": app,
"dispatcher": dispatcher,
**kwargs,
}
async def on_startup(*a: Any, **kw: Any) -> None: # pragma: no cover
await dispatcher.emit_startup(**workflow_data)
async def on_shutdown(*a: Any, **kw: Any) -> None: # pragma: no cover
await dispatcher.emit_shutdown(**workflow_data)
app.on_startup.append(on_startup)
app.on_shutdown.append(on_shutdown)
def check_ip(ip_filter: IPFilter, request: web.Request) -> Tuple[str, bool]:
# Try to resolve client IP over reverse proxy
if forwarded_for := request.headers.get("X-Forwarded-For", ""):
# Get the left-most ip when there is multiple ips
# (request got through multiple proxy/load balancers)
# https://github.com/aiogram/aiogram/issues/672
forwarded_for, *_ = forwarded_for.split(",", maxsplit=1)
return forwarded_for, forwarded_for in ip_filter
# When reverse proxy is not configured IP address can be resolved from incoming connection
if peer_name := cast(Transport, request.transport).get_extra_info("peername"):
host, _ = peer_name
return host, host in ip_filter
# Potentially impossible case
return "", False # pragma: no cover
def ip_filter_middleware(
ip_filter: IPFilter,
) -> Callable[[web.Request, Handler], Awaitable[Any]]:
"""
:param ip_filter:
:return:
"""
@middleware
async def _ip_filter_middleware(request: web.Request, handler: Handler) -> Any:
ip_address, accept = check_ip(ip_filter=ip_filter, request=request)
if not accept:
loggers.webhook.warning(f"Blocking request from an unauthorized IP: {ip_address}")
raise web.HTTPUnauthorized()
return await handler(request)
return _ip_filter_middleware
class BaseRequestHandler(ABC):
"""
Base handler that helps to handle incoming request from aiohttp
and propagate it to the Dispatcher
"""
def __init__(self, dispatcher: Dispatcher, handle_in_background: bool = True) -> None:
"""
:param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher`
:param handle_in_background: immediately respond to the Telegram instead of waiting end of handler process
"""
self.dispatcher = dispatcher
self.handle_in_background = handle_in_background
def register(self, app: Application, /, path: str, **kwargs: Any) -> None:
"""
Register route and shutdown callback
:param app: instance of aiohttp Application
:param path: route path
:param kwargs:
"""
app.on_shutdown.append(self._handle_close)
app.router.add_route("POST", path, self.handle, **kwargs)
async def _handle_close(self, app: Application) -> None:
await self.close()
@abstractmethod
async def close(self) -> None:
pass
@abstractmethod
async def resolve_bot(self, request: web.Request) -> Bot:
"""
This method should be implemented in subclasses of this class.
Resolve Bot instance from request.
:param request:
:return: Bot instance
"""
pass
async def _background_feed_update(self, bot: Bot, update: Dict[str, Any]) -> None:
result = await self.dispatcher.feed_raw_update(
bot=bot,
update=update,
)
if isinstance(result, TelegramMethod):
await self.dispatcher.silent_call_request(bot=bot, result=result)
async def _handle_request_background(self, bot: Bot, request: web.Request) -> web.Response:
asyncio.create_task(
self._background_feed_update(
bot=bot, update=await request.json(loads=bot.session.json_loads)
)
)
return web.json_response({})
async def _handle_request(self, bot: Bot, request: web.Request) -> web.Response:
result = await self.dispatcher.feed_webhook_update(
bot,
await request.json(loads=bot.session.json_loads),
)
if result:
return web.json_response(result)
return web.json_response({})
async def handle(self, request: web.Request) -> web.Response:
bot = await self.resolve_bot(request)
if self.handle_in_background:
return await self._handle_request_background(bot=bot, request=request)
return await self._handle_request(bot=bot, request=request)
__call__ = handle
class SimpleRequestHandler(BaseRequestHandler):
"""
Handler for single Bot instance
"""
def __init__(
self, dispatcher: Dispatcher, bot: Bot, handle_in_background: bool = True
) -> None:
"""
:param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher`
:param handle_in_background: immediately respond to the Telegram instead of waiting end of handler process
:param bot: instance of :class:`aiogram.client.bot.Bot`
"""
super().__init__(dispatcher=dispatcher, handle_in_background=handle_in_background)
self.bot = bot
async def close(self) -> None:
"""
Close bot session
"""
await self.bot.session.close()
async def resolve_bot(self, request: web.Request) -> Bot:
return self.bot
class TokenBasedRequestHandler(BaseRequestHandler):
"""
Handler that supports multiple bots, the context will be resolved from path variable 'bot_token'
"""
def __init__(
self,
dispatcher: Dispatcher,
handle_in_background: bool = True,
bot_settings: Optional[Dict[str, Any]] = None,
) -> None:
"""
:param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher`
:param handle_in_background: immediately respond to the Telegram instead of waiting end of handler process
:param bot_settings: kwargs that will be passed to new Bot instance
"""
super().__init__(dispatcher=dispatcher, handle_in_background=handle_in_background)
if bot_settings is None:
bot_settings = {}
self.bot_settings = bot_settings
self.bots: Dict[str, Bot] = {}
async def close(self) -> None:
for bot in self.bots.values():
await bot.session.close()
def register(self, app: Application, /, path: str, **kwargs: Any) -> None:
"""
Validate path, register route and shutdown callback
:param app: instance of aiohttp Application
:param path: route path
:param kwargs:
"""
if "{bot_token}" not in path:
raise ValueError("Path should contains '{bot_token}' substring")
super().register(app, path=path, **kwargs)
async def resolve_bot(self, request: web.Request) -> Bot:
"""
Get bot token from path and create or get from cache Bot instance
:param request:
:return:
"""
token = request.match_info["bot_token"]
if token not in self.bots:
self.bots[token] = Bot(token=token, **self.bot_settings)
return self.bots[token]

View file

@ -0,0 +1,41 @@
from ipaddress import IPv4Address, IPv4Network
from typing import Optional, Sequence, Set, Union
DEFAULT_TELEGRAM_NETWORKS = [
IPv4Network("149.154.160.0/20"),
IPv4Network("91.108.4.0/22"),
]
class IPFilter:
def __init__(self, ips: Optional[Sequence[Union[str, IPv4Network, IPv4Address]]] = None):
self._allowed_ips: Set[IPv4Address] = set()
if ips:
self.allow(*ips)
def allow(self, *ips: Union[str, IPv4Network, IPv4Address]) -> None:
for ip in ips:
self.allow_ip(ip)
def allow_ip(self, ip: Union[str, IPv4Network, IPv4Address]) -> None:
if isinstance(ip, str):
ip = IPv4Network(ip) if "/" in ip else IPv4Address(ip)
if isinstance(ip, IPv4Address):
self._allowed_ips.add(ip)
elif isinstance(ip, IPv4Network):
self._allowed_ips.update(ip.hosts())
else:
raise ValueError(f"Invalid type of ipaddress: {type(ip)} ('{ip}')")
@classmethod
def default(cls) -> "IPFilter":
return cls(DEFAULT_TELEGRAM_NETWORKS)
def check(self, ip: Union[str, IPv4Address]) -> bool:
if not isinstance(ip, IPv4Address):
ip = IPv4Address(ip)
return ip in self._allowed_ips
def __contains__(self, item: Union[str, IPv4Address]) -> bool:
return self.check(item)

View file

@ -2,3 +2,4 @@ import logging
dispatcher = logging.getLogger("aiogram.dispatcher")
middlewares = logging.getLogger("aiogram.middlewares")
webhook = logging.getLogger("aiogram.webhook")