Implement more things for graceful shutdown of storage and more thread-safe Redis storage.

This commit is contained in:
Alex Root Junior 2017-08-05 21:33:59 +03:00
parent f3088f2f0a
commit cf51062378
3 changed files with 57 additions and 6 deletions

View file

@ -10,6 +10,12 @@ class MemoryStorage(BaseStorage):
This type of storage is not recommended for usage in bots, because you will lost all states after restarting.
"""
async def wait_closed(self):
pass
def close(self):
self.data.clear()
def __init__(self):
self.data = {}

View file

@ -2,6 +2,7 @@
This module has redis storage for finite-state machine based on `aioredis <https://github.com/aio-libs/aioredis>`_ driver
"""
import asyncio
import typing
import aioredis
@ -21,6 +22,13 @@ class RedisStorage(BaseStorage):
storage = RedisStorage('localhost', 6379, db=5)
dp = Dispatcher(bot, storage=storage)
And need to close Redis connection when shutdown
.. code-block:: python3
dp.storage.close()
await dp.storage.wait_closed()
"""
def __init__(self, host, port, db=None, password=None, ssl=None, loop=None, **kwargs):
@ -29,10 +37,22 @@ class RedisStorage(BaseStorage):
self._db = db
self._password = password
self._ssl = ssl
self._loop = loop
self._loop = loop or asyncio.get_event_loop()
self._kwargs = kwargs
self._redis: aioredis.RedisConnection = None
self._connection_lock = asyncio.Lock(loop=self._loop)
def close(self):
if self._redis and not self._redis.closed:
self._redis.close()
del self._redis
self._redis = None
async def wait_closed(self):
if self._redis:
return await self._redis.wait_closed()
return True
@property
async def redis(self) -> aioredis.RedisConnection:
@ -41,11 +61,13 @@ class RedisStorage(BaseStorage):
This property is awaitable.
"""
if self._redis is None:
self._redis = await aioredis.create_connection((self._host, self._port),
db=self._db, password=self._password, ssl=self._ssl,
loop=self._loop,
**self._kwargs)
# Use thread-safe asyncio Lock because this method without that is not safe
async with self._connection_lock:
if self._redis is None:
self._redis = await aioredis.create_connection((self._host, self._port),
db=self._db, password=self._password, ssl=self._ssl,
loop=self._loop,
**self._kwargs)
return self._redis
async def get_record(self, *,

View file

@ -6,6 +6,23 @@ class BaseStorage:
In states-storage you can save current user state and data for all steps
"""
def close(self):
"""
Need override this method and use when application is shutdowns.
You can save data or etc.
:return:
"""
raise NotImplementedError
async def wait_closed(self):
"""
You need override this method for all asynchronously storage's like Redis.
:return:
"""
raise NotImplementedError
@classmethod
def check_address(cls, *,
chat: typing.Union[str, int, None] = None,
@ -209,6 +226,12 @@ class DisabledStorage(BaseStorage):
Empty storage. Use it if you don't want to use Finite-State Machine
"""
def close(self):
pass
async def wait_closed(self):
pass
async def get_state(self, *,
chat: typing.Union[str, int, None] = None,
user: typing.Union[str, int, None] = None,