aiogram/aiogram/fsm/storage/memory.py
andrew000 91b84e63fa
Add cleanup to SimpleEventIsolation
cleaunp will clear unused locks to prevent memory overflow
2022-10-19 12:17:16 +03:00

82 lines
2.5 KiB
Python

from asyncio import Lock
from collections import defaultdict
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Any, AsyncGenerator, DefaultDict, Dict, Hashable, Optional
from aiogram import Bot
from aiogram.fsm.state import State
from aiogram.fsm.storage.base import BaseEventIsolation, BaseStorage, StateType, StorageKey
@dataclass
class MemoryStorageRecord:
data: Dict[str, Any] = field(default_factory=dict)
state: Optional[str] = None
class MemoryStorage(BaseStorage):
"""
Default FSM storage, stores all data in :class:`dict` and loss everything on shutdown
.. warning::
Is not recommended using in production in due to you will lose all data
when your bot restarts
"""
def __init__(self) -> None:
self.storage: DefaultDict[StorageKey, MemoryStorageRecord] = defaultdict(
MemoryStorageRecord
)
async def close(self) -> None:
pass
async def set_state(self, bot: Bot, key: StorageKey, state: StateType = None) -> None:
self.storage[key].state = state.state if isinstance(state, State) else state
async def get_state(self, bot: Bot, key: StorageKey) -> Optional[str]:
return self.storage[key].state
async def set_data(self, bot: Bot, key: StorageKey, data: Dict[str, Any]) -> None:
self.storage[key].data = data.copy()
async def get_data(self, bot: Bot, key: StorageKey) -> Dict[str, Any]:
return self.storage[key].data.copy()
class DisabledEventIsolation(BaseEventIsolation):
def __init__(self) -> None:
self._locks: DefaultDict[Hashable, Lock]
@asynccontextmanager
async def lock(self, bot: Bot, key: StorageKey) -> AsyncGenerator[None, None]:
yield
async def close(self) -> None:
pass
class SimpleEventIsolation(BaseEventIsolation):
def __init__(self) -> None:
# TODO: Unused locks cleaner is needed
self._locks: DefaultDict[Hashable, Lock] = defaultdict(Lock)
@asynccontextmanager
async def lock(self, bot: Bot, key: StorageKey) -> AsyncGenerator[None, None]:
lock = self._locks[key]
async with lock:
yield
self._cleanup(key)
async def close(self) -> None:
self._locks.clear()
def _cleanup(self, key: Hashable):
if self._locks[key]._waiters is None:
del self._locks[key]
elif len(self._locks[key]._waiters) == 0:
del self._locks[key]