From f0aa17767edc549bfb4939c250cd7f0bf780e067 Mon Sep 17 00:00:00 2001 From: Alex Root Junior Date: Sun, 24 Oct 2021 23:25:15 +0300 Subject: [PATCH] Added base webhook implementation and example --- aiogram/dispatcher/dispatcher.py | 1 + aiogram/dispatcher/filters/state.py | 12 +- aiogram/dispatcher/webhook/aiohttp_server.py | 223 +++++++++++++++++++ aiogram/dispatcher/webhook/security.py | 41 ++++ aiogram/loggers.py | 1 + 5 files changed, 270 insertions(+), 8 deletions(-) create mode 100644 aiogram/dispatcher/webhook/aiohttp_server.py create mode 100644 aiogram/dispatcher/webhook/security.py diff --git a/aiogram/dispatcher/dispatcher.py b/aiogram/dispatcher/dispatcher.py index 2c78bace..cacfbd6d 100644 --- a/aiogram/dispatcher/dispatcher.py +++ b/aiogram/dispatcher/dispatcher.py @@ -367,6 +367,7 @@ class Dispatcher(Router): :param handle_as_tasks: :param kwargs: :param backoff_config: + :param allowed_updates: :return: """ async with self._running_lock: # Prevent to run this method twice at a once diff --git a/aiogram/dispatcher/filters/state.py b/aiogram/dispatcher/filters/state.py index 294c1ada..429b57d5 100644 --- a/aiogram/dispatcher/filters/state.py +++ b/aiogram/dispatcher/filters/state.py @@ -1,7 +1,7 @@ from inspect import isclass from typing import Any, Dict, Optional, Sequence, Type, Union, cast, no_type_check -from pydantic import validator +from pydantic import Field, validator from aiogram.dispatcher.filters import BaseFilter from aiogram.dispatcher.fsm.state import State, StatesGroup @@ -15,19 +15,15 @@ class StateFilter(BaseFilter): State filter """ - state: Union[StateType, Sequence[StateType]] + state: Union[StateType, Sequence[StateType]] = Field(...) class Config: arbitrary_types_allowed = True - @validator("state", always=True) + @validator("state") @no_type_check # issubclass breaks things def _validate_state(cls, v: Union[StateType, Sequence[StateType]]) -> Sequence[StateType]: - if ( - isinstance(v, (str, State, StatesGroup)) - or (isclass(v) and issubclass(v, StatesGroup)) - or v is None - ): + if isinstance(v, (str, State, StatesGroup)) or (isclass(v) and issubclass(v, StatesGroup)): return [v] return v diff --git a/aiogram/dispatcher/webhook/aiohttp_server.py b/aiogram/dispatcher/webhook/aiohttp_server.py new file mode 100644 index 00000000..a5ec3986 --- /dev/null +++ b/aiogram/dispatcher/webhook/aiohttp_server.py @@ -0,0 +1,223 @@ +import asyncio +from abc import ABC, abstractmethod +from asyncio import Transport +from typing import Any, Awaitable, Callable, Dict, Optional, Tuple, cast + +from aiohttp import web +from aiohttp.abc import Application + +from aiogram import Bot, Dispatcher, loggers +from aiogram.dispatcher.webhook.security import IPFilter +from aiogram.methods import TelegramMethod + + +def setup_application(app: Application, dispatcher: Dispatcher, /, **kwargs: Any) -> None: + """ + This function helps to configure startup-shutdown process + + :param app: + :param dispatcher: + :param kwargs: + :return: + """ + workflow_data = { + "app": app, + "dispatcher": dispatcher, + **kwargs, + } + + async def on_startup(*a: Any, **kw: Any) -> None: + await dispatcher.emit_startup(**workflow_data) + + async def on_shutdown(*a: Any, **kw: Any) -> None: + await dispatcher.emit_shutdown(**workflow_data) + + app.on_startup.append(on_startup) + app.on_shutdown.append(on_shutdown) + + +def check_ip(ip_filter: IPFilter, request: web.Request) -> Tuple[str, bool]: + # Try to resolve client IP over reverse proxy + if forwarded_for := request.headers.get("X-Forwarded-For", ""): + # Get the left-most ip when there is multiple ips + # (request got through multiple proxy/load balancers) + # https://github.com/aiogram/aiogram/issues/672 + forwarded_for, *_ = forwarded_for.split(",", maxsplit=1) + return forwarded_for, forwarded_for in ip_filter + + # When reverse proxy is not configured IP address can be resolved from incoming connection + if peer_name := cast(Transport, request.transport).get_extra_info("peername"): + host, _ = peer_name + return host, host in ip_filter + + return "", False + + +def ip_filter_middleware( + ip_filter: IPFilter, +) -> Callable[[web.Request, Callable[[web.Request], Any]], Awaitable[Any]]: + """ + + :param ip_filter: + :return: + """ + + async def _ip_filter_middleware( + request: web.Request, handler: Callable[[web.Request], Any] + ) -> Any: + ip_address, accept = check_ip(ip_filter=ip_filter, request=request) + if not accept: + loggers.webhook.warning(f"Blocking request from an unauthorized IP: {ip_address}") + raise web.HTTPUnauthorized() + return await handler(request) + + return _ip_filter_middleware + + +class BaseRequestHandler(ABC): + """ + Base handler that helps to handle incoming request from aiohttp + and propagate it to the Dispatcher + """ + + def __init__(self, dispatcher: Dispatcher, handle_in_background: bool = True) -> None: + """ + :param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher` + :param handle_in_background: immediately respond to the Telegram instead of waiting end of handler process + """ + self.dispatcher = dispatcher + self.handle_in_background = handle_in_background + + def register(self, app: Application, /, path: str, **kwargs: Any) -> None: + """ + Register route and shutdown callback + + :param app: instance of aiohttp Application + :param path: route path + :param kwargs: + """ + app.on_shutdown.append(self._handle_close) + app.router.add_route("POST", path, self.handle, **kwargs) + + async def _handle_close(self, app: Application) -> None: + await self.close() + + @abstractmethod + async def close(self) -> None: + pass + + @abstractmethod + async def resolve_bot(self, request: web.Request) -> Bot: + """ + This method should be implemented in subclasses of this class. + + Resolve Bot instance from request. + + :param request: + :return: Bot instance + """ + pass + + async def _handle_request_background(self, bot: Bot, request: web.Request) -> web.Response: + asyncio.create_task( + self.dispatcher.feed_raw_update( + bot=bot, + update=await request.json(loads=bot.session.json_loads), + ) + ) + return web.Response(status=200) + + async def _handle_request(self, bot: Bot, request: web.Request) -> web.Response: + result = await self.dispatcher.feed_webhook_update( + bot, + await request.json(loads=bot.session.json_loads), + ) + if isinstance(result, TelegramMethod): + request = result.build_request(bot) + return web.json_response(request.render_webhook_request()) + return web.Response(status=200) + + async def handle(self, request: web.Request) -> web.Response: + bot = await self.resolve_bot(request) + if self.handle_in_background: + return await self._handle_request_background(bot=bot, request=request) + return await self._handle_request(bot=bot, request=request) + + __call__ = handle + + +class SimpleRequestHandler(BaseRequestHandler): + """ + Handler for single Bot instance + """ + + def __init__( + self, dispatcher: Dispatcher, bot: Bot, handle_in_background: bool = True + ) -> None: + """ + :param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher` + :param handle_in_background: immediately respond to the Telegram instead of waiting end of handler process + :param bot: instance of :class:`aiogram.client.bot.Bot` + """ + super().__init__(dispatcher=dispatcher, handle_in_background=handle_in_background) + self.bot = bot + + async def close(self) -> None: + """ + Close bot session + """ + await self.bot.session.close() + + async def resolve_bot(self, request: web.Request) -> Bot: + return self.bot + + +class TokenBasedRequestHandler(BaseRequestHandler): + """ + Handler that supports multiple bots, the context will be resolved from path variable 'bot_token' + """ + + def __init__( + self, + dispatcher: Dispatcher, + handle_in_background: bool = True, + bot_settings: Optional[Dict[str, Any]] = None, + ) -> None: + """ + :param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher` + :param handle_in_background: immediately respond to the Telegram instead of waiting end of handler process + :param bot_settings: kwargs that will be passed to new Bot instance + """ + super().__init__(dispatcher=dispatcher, handle_in_background=handle_in_background) + if bot_settings is None: + bot_settings = {} + self.bot_settings = bot_settings + self.bots: Dict[str, Bot] = {} + + async def close(self) -> None: + for bot in self.bots.values(): + await bot.session.close() + + def register(self, app: Application, /, path: str, **kwargs: Any) -> None: + """ + Validate path, register route and shutdown callback + + :param app: instance of aiohttp Application + :param path: route path + :param kwargs: + """ + if "{bot_token}" not in path: + raise ValueError("Path should contains '{bot_token}' substring") + super().register(app, path=path, **kwargs) + + async def resolve_bot(self, request: web.Request) -> Bot: + """ + Get bot token from path and create or get from cache Bot instance + + :param request: + :return: + """ + token = request.match_info["bot_token"] + if token not in self.bots: + self.bots[token] = Bot(token=token, **self.bot_settings) + return self.bots[token] diff --git a/aiogram/dispatcher/webhook/security.py b/aiogram/dispatcher/webhook/security.py new file mode 100644 index 00000000..91d38204 --- /dev/null +++ b/aiogram/dispatcher/webhook/security.py @@ -0,0 +1,41 @@ +from ipaddress import IPv4Address, IPv4Network +from typing import Optional, Sequence, Set, Union + +DEFAULT_TELEGRAM_NETWORKS = [ + IPv4Network("149.154.160.0/20"), + IPv4Network("91.108.4.0/22"), +] + + +class IPFilter: + def __init__(self, ips: Optional[Sequence[Union[str, IPv4Network, IPv4Address]]] = None): + self._allowed_ips: Set[IPv4Address] = set() + + if ips: + self.allow(*ips) + + def allow(self, *ips: Union[str, IPv4Network, IPv4Address]) -> None: + for ip in ips: + self.allow_ip(ip) + + def allow_ip(self, ip: Union[str, IPv4Network, IPv4Address]) -> None: + if isinstance(ip, str): + ip = IPv4Network(ip) if "/" in ip else IPv4Address(ip) + if isinstance(ip, IPv4Address): + self._allowed_ips.add(ip) + elif isinstance(ip, IPv4Network): + self._allowed_ips.update(ip.hosts()) + else: + raise ValueError(f"Invalid type of ipaddress: {type(ip)} ('{ip}')") + + @classmethod + def default(cls) -> "IPFilter": + return cls(DEFAULT_TELEGRAM_NETWORKS) + + def check(self, ip: Union[str, IPv4Address]) -> bool: + if not isinstance(ip, IPv4Address): + ip = IPv4Address(ip) + return ip in self._allowed_ips + + def __contains__(self, item: Union[str, IPv4Address]) -> bool: + return self.check(item) diff --git a/aiogram/loggers.py b/aiogram/loggers.py index 5b5a8eba..eec44216 100644 --- a/aiogram/loggers.py +++ b/aiogram/loggers.py @@ -2,3 +2,4 @@ import logging dispatcher = logging.getLogger("aiogram.dispatcher") middlewares = logging.getLogger("aiogram.middlewares") +webhook = logging.getLogger("aiogram.webhook")