Add webhook feed method to Dispatcher

This commit is contained in:
Alex Root Junior 2020-02-22 00:59:10 +02:00
parent a41bccddf9
commit 3f5c51e805
4 changed files with 239 additions and 132 deletions

View file

@ -24,6 +24,12 @@ class Request(BaseModel):
class Config(BaseConfig):
arbitrary_types_allowed = True
def render_webhook_request(self):
return {
"method": self.method,
**{key: value for key, value in self.data.items() if value is not None},
}
class Response(ResponseParameters, GenericModel, Generic[T]):
ok: bool

View file

@ -1,6 +1,8 @@
import asyncio
from asyncio import Lock
from typing import Any, AsyncGenerator, Dict, Optional
import contextvars
import warnings
from asyncio import CancelledError, Future, Lock
from typing import Any, AsyncGenerator, Dict, Optional, Union
from .. import loggers
from ..api.client.bot import Bot
@ -94,7 +96,7 @@ class Dispatcher(Router):
update_id = update.update_id + 1
@classmethod
async def _silent_call_request(cls, result: TelegramMethod) -> None:
async def _silent_call_request(cls, bot: Bot, result: TelegramMethod) -> None:
"""
Simulate answer into WebHook
@ -102,7 +104,7 @@ class Dispatcher(Router):
:return:
"""
try:
await result
await bot(result)
except TelegramAPIError as e:
# In due to WebHook mechanism doesn't allows to get response for
# requests called in answer to WebHook request.
@ -111,13 +113,13 @@ class Dispatcher(Router):
loggers.dispatcher.error("Failed to make answer: %s: %s", e.__class__.__name__, e)
async def process_update(
self, update: Update, bot: Bot, call_answer: bool = True, **kwargs: Any
self, bot: Bot, update: Update, call_answer: bool = True, **kwargs: Any
) -> bool:
"""
Propagate update to event listeners
:param update: instance of Update
:param bot: instance of Bot
:param update: instance of Update
:param call_answer: need to execute response as Telegram method (like answer into webhook)
:param kwargs: contextual data for middlewares, filters and handlers
:return: status
@ -125,7 +127,7 @@ class Dispatcher(Router):
try:
async for result in self.feed_update(bot, update, **kwargs):
if call_answer and isinstance(result, TelegramMethod):
await self._silent_call_request(result)
await self._silent_call_request(bot=bot, result=result)
return True
except Exception as e:
@ -149,7 +151,85 @@ class Dispatcher(Router):
:return:
"""
async for update in self._listen_updates(bot):
await self.process_update(update=update, bot=bot, **kwargs)
await self.process_update(bot=bot, update=update, **kwargs)
async def _feed_webhook_update(self, bot: Bot, update: Update, **kwargs: Any) -> Any:
"""
The same with `Dispatcher.process_update()` but returns real response instead of bool
"""
try:
async for result in self.feed_update(bot, update, **kwargs):
return result
except Exception as e:
loggers.dispatcher.exception(
"Cause exception while process update id=%d by bot id=%d\n%s: %s",
update.update_id,
bot.id,
e.__class__.__name__,
e,
)
raise
async def feed_webhook_update(
self, bot: Bot, update: Union[Update, Dict[str, Any]], _timeout: int = 55, **kwargs
) -> Optional[Dict[str, Any]]:
if not isinstance(update, Update): # Allow to use raw updates
update = Update(**update)
ctx = contextvars.copy_context()
loop = asyncio.get_running_loop()
waiter = loop.create_future()
def release_waiter(*args: Any):
if not waiter.done():
waiter.set_result(None)
timeout_handle = loop.call_later(_timeout, release_waiter)
process_updates: Future = asyncio.ensure_future(
self._feed_webhook_update(bot=bot, update=update, **kwargs)
)
process_updates.add_done_callback(release_waiter, context=ctx)
def process_response(task: asyncio.Task):
warnings.warn(
f"Detected slow response into webhook.\n"
f"Telegram is waiting for response only first 60 seconds and then re-send update.\n"
f"For preventing this situation response into webhook returned immediately "
f"and handler is moved to background and still processing update.",
RuntimeWarning,
)
try:
result = task.result()
except Exception as e:
raise e
if isinstance(result, TelegramMethod):
asyncio.ensure_future(self._silent_call_request(bot=bot, result=result))
try:
try:
await waiter
except CancelledError: # pragma: nocover
process_updates.remove_done_callback(release_waiter)
process_updates.cancel()
raise
if process_updates.done():
# TODO: handle exceptions
response: Any = process_updates.result()
if isinstance(response, TelegramMethod):
request = response.build_request()
return request.render_webhook_request()
else:
process_updates.remove_done_callback(release_waiter)
process_updates.add_done_callback(process_response, context=ctx)
finally:
timeout_handle.cancel()
return None
async def start_polling(self, *bots: Bot, **kwargs: Any) -> None:
"""