Try to fix compatibility with aiohttp 3.8

This commit is contained in:
Alex Root Junior 2021-11-06 03:03:42 +02:00
parent b7a7e20750
commit dea3e384c2
6 changed files with 25 additions and 26 deletions

View file

@ -107,8 +107,7 @@ class BaseBot:
async def get_new_session(self) -> aiohttp.ClientSession:
return aiohttp.ClientSession(
connector=self._connector_class(**self._connector_init, loop=self._main_loop),
loop=self._main_loop,
connector=self._connector_class(**self._connector_init),
json_serialize=json.dumps
)
@ -119,6 +118,14 @@ class BaseBot:
async def get_session(self) -> Optional[aiohttp.ClientSession]:
if self._session is None or self._session.closed:
self._session = await self.get_new_session()
if not self._session._loop.is_running(): # NOQA
# Hate `aiohttp` devs because it juggles event-loops and breaks already opened session
# So... when we detect a broken session need to fix it by re-creating it
# @asvetlov, if you read this, please no more juggle event-loop inside aiohttp, it breaks the brain.
await self._session.close()
self._session = await self.get_new_session()
return self._session
@property
@ -126,7 +133,7 @@ class BaseBot:
reason="Client session should be created inside async function, use `await bot.get_session()` instead",
stacklevel=3,
)
async def session(self) -> Optional[aiohttp.ClientSession]:
def session(self) -> Optional[aiohttp.ClientSession]:
return self._session
@staticmethod

View file

@ -16,7 +16,7 @@ class EnvironmentMiddleware(BaseMiddleware):
data.update(
bot=dp.bot,
dispatcher=dp,
loop=dp.loop or asyncio.get_event_loop()
loop=asyncio.get_event_loop()
)
if self.context:
data.update(self.context)

View file

@ -56,8 +56,6 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
filters_factory = FiltersFactory(self)
self.bot: Bot = bot
if loop is not None:
_ensure_loop(loop)
self._main_loop = loop
self.storage = storage
self.run_tasks_by_default = run_tasks_by_default
@ -104,10 +102,7 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
@property
def _close_waiter(self) -> "asyncio.Future":
if self._dispatcher_close_waiter is None:
if self._main_loop is not None:
self._dispatcher_close_waiter = self._main_loop.create_future()
else:
self._dispatcher_close_waiter = asyncio.get_event_loop().create_future()
self._dispatcher_close_waiter = asyncio.get_event_loop().create_future()
return self._dispatcher_close_waiter
def _setup_filters(self):
@ -335,10 +330,7 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
return await self.bot.delete_webhook()
def _loop_create_task(self, coro):
if self._main_loop is None:
return asyncio.create_task(coro)
_ensure_loop(self._main_loop)
return self._main_loop.create_task(coro)
return asyncio.create_task(coro)
async def start_polling(self,
timeout=20,
@ -403,7 +395,7 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
log.debug(f"Received {len(updates)} updates.")
offset = updates[-1].update_id + 1
self._loop_create_task(self._process_polling_updates(updates, fast))
asyncio.create_task(self._process_polling_updates(updates, fast))
if relax:
await asyncio.sleep(relax)
@ -1401,15 +1393,15 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
try:
response = task.result()
except Exception as e:
self._loop_create_task(
asyncio.create_task(
self.errors_handlers.notify(types.Update.get_current(), e))
else:
if isinstance(response, BaseResponse):
self._loop_create_task(response.execute_response(self.bot))
asyncio.create_task(response.execute_response(self.bot))
@functools.wraps(func)
async def wrapper(*args, **kwargs):
task = self._loop_create_task(func(*args, **kwargs))
task = asyncio.create_task(func(*args, **kwargs))
task.add_done_callback(process_response)
return wrapper

View file

@ -168,14 +168,14 @@ class WebhookRequestHandler(web.View):
:return:
"""
dispatcher = self.get_dispatcher()
loop = dispatcher.loop or asyncio.get_event_loop()
loop = asyncio.get_event_loop()
# Analog of `asyncio.wait_for` but without cancelling task
waiter = loop.create_future()
timeout_handle = loop.call_later(RESPONSE_TIMEOUT, asyncio.tasks._release_waiter, waiter)
cb = functools.partial(asyncio.tasks._release_waiter, waiter)
fut = asyncio.ensure_future(dispatcher.updates_handler.notify(update), loop=loop)
fut = asyncio.ensure_future(dispatcher.updates_handler.notify(update))
fut.add_done_callback(cb)
try:
@ -207,7 +207,7 @@ class WebhookRequestHandler(web.View):
TimeoutWarning)
dispatcher = self.get_dispatcher()
loop = dispatcher.loop or asyncio.get_event_loop()
loop = asyncio.get_running_loop()
try:
results = task.result()
@ -217,7 +217,7 @@ class WebhookRequestHandler(web.View):
else:
response = self.get_response(results)
if response is not None:
asyncio.ensure_future(response.execute_response(dispatcher.bot), loop=loop)
asyncio.ensure_future(response.execute_response(dispatcher.bot))
def get_response(self, results):
"""

View file

@ -314,7 +314,7 @@ class Executor:
:param timeout:
"""
self._prepare_polling()
loop: asyncio.AbstractEventLoop = self.loop
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(self._startup_polling())

View file

@ -1,3 +1,3 @@
aiohttp>=3.7.2,<4.0.0
Babel>=2.8.0
certifi>=2020.6.20
aiohttp>=3.8.2,<3.9.0
Babel>=2.9.1,<2.10.0
certifi>=2021.10.8