From cf510623780af8e855d26c78844c861f4ca01673 Mon Sep 17 00:00:00 2001 From: Alex Root Junior Date: Sat, 5 Aug 2017 21:33:59 +0300 Subject: [PATCH] Implement more things for graceful shutdown of storage and more thread-safe Redis storage. --- aiogram/contrib/fsm_storage/memory.py | 6 +++++ aiogram/contrib/fsm_storage/redis.py | 34 ++++++++++++++++++++++----- aiogram/dispatcher/storage.py | 23 ++++++++++++++++++ 3 files changed, 57 insertions(+), 6 deletions(-) diff --git a/aiogram/contrib/fsm_storage/memory.py b/aiogram/contrib/fsm_storage/memory.py index cbd8f243..c0c81c57 100644 --- a/aiogram/contrib/fsm_storage/memory.py +++ b/aiogram/contrib/fsm_storage/memory.py @@ -10,6 +10,12 @@ class MemoryStorage(BaseStorage): This type of storage is not recommended for usage in bots, because you will lost all states after restarting. """ + async def wait_closed(self): + pass + + def close(self): + self.data.clear() + def __init__(self): self.data = {} diff --git a/aiogram/contrib/fsm_storage/redis.py b/aiogram/contrib/fsm_storage/redis.py index d0bab8f3..30c61128 100644 --- a/aiogram/contrib/fsm_storage/redis.py +++ b/aiogram/contrib/fsm_storage/redis.py @@ -2,6 +2,7 @@ This module has redis storage for finite-state machine based on `aioredis `_ driver """ +import asyncio import typing import aioredis @@ -21,6 +22,13 @@ class RedisStorage(BaseStorage): storage = RedisStorage('localhost', 6379, db=5) dp = Dispatcher(bot, storage=storage) + And need to close Redis connection when shutdown + + .. code-block:: python3 + + dp.storage.close() + await dp.storage.wait_closed() + """ def __init__(self, host, port, db=None, password=None, ssl=None, loop=None, **kwargs): @@ -29,10 +37,22 @@ class RedisStorage(BaseStorage): self._db = db self._password = password self._ssl = ssl - self._loop = loop + self._loop = loop or asyncio.get_event_loop() self._kwargs = kwargs self._redis: aioredis.RedisConnection = None + self._connection_lock = asyncio.Lock(loop=self._loop) + + def close(self): + if self._redis and not self._redis.closed: + self._redis.close() + del self._redis + self._redis = None + + async def wait_closed(self): + if self._redis: + return await self._redis.wait_closed() + return True @property async def redis(self) -> aioredis.RedisConnection: @@ -41,11 +61,13 @@ class RedisStorage(BaseStorage): This property is awaitable. """ - if self._redis is None: - self._redis = await aioredis.create_connection((self._host, self._port), - db=self._db, password=self._password, ssl=self._ssl, - loop=self._loop, - **self._kwargs) + # Use thread-safe asyncio Lock because this method without that is not safe + async with self._connection_lock: + if self._redis is None: + self._redis = await aioredis.create_connection((self._host, self._port), + db=self._db, password=self._password, ssl=self._ssl, + loop=self._loop, + **self._kwargs) return self._redis async def get_record(self, *, diff --git a/aiogram/dispatcher/storage.py b/aiogram/dispatcher/storage.py index 4deb3b54..d3c450ae 100644 --- a/aiogram/dispatcher/storage.py +++ b/aiogram/dispatcher/storage.py @@ -6,6 +6,23 @@ class BaseStorage: In states-storage you can save current user state and data for all steps """ + def close(self): + """ + Need override this method and use when application is shutdowns. + You can save data or etc. + + :return: + """ + raise NotImplementedError + + async def wait_closed(self): + """ + You need override this method for all asynchronously storage's like Redis. + + :return: + """ + raise NotImplementedError + @classmethod def check_address(cls, *, chat: typing.Union[str, int, None] = None, @@ -209,6 +226,12 @@ class DisabledStorage(BaseStorage): Empty storage. Use it if you don't want to use Finite-State Machine """ + def close(self): + pass + + async def wait_closed(self): + pass + async def get_state(self, *, chat: typing.Union[str, int, None] = None, user: typing.Union[str, int, None] = None,