mirror of
https://github.com/aiogram/aiogram.git
synced 2026-04-08 16:37:47 +00:00
* Base implementation * Added tests, fixed arguments priority * Use `Optional[X]` instead of `X | None` * Added changelog * Added tests
103 lines
2.4 KiB
Python
103 lines
2.4 KiB
Python
from abc import ABC, abstractmethod
|
|
from contextlib import asynccontextmanager
|
|
from dataclasses import dataclass
|
|
from typing import Any, AsyncGenerator, Dict, Optional, Union
|
|
|
|
from aiogram.fsm.state import State
|
|
|
|
# Type accepted wherever an FSM state is passed: a plain string, a ``State`` object, or ``None``.
StateType = Optional[Union[str, State]]
|
|
|
|
# Value used for ``StorageKey.destiny`` when the caller does not provide one.
DEFAULT_DESTINY = "default"
|
|
|
|
|
|
@dataclass(frozen=True)
class StorageKey:
    """
    Key passed to storages and event isolations to address one FSM record.

    The dataclass is frozen, so instances cannot be modified after creation
    and are hashable by their field values.
    """

    # Required identifiers; field order is part of the positional constructor.
    bot_id: int
    chat_id: int
    user_id: int
    # Optional thread identifier; ``None`` when not provided.
    thread_id: Optional[int] = None
    # Extra discriminator string; falls back to DEFAULT_DESTINY.
    destiny: str = DEFAULT_DESTINY
|
|
|
|
|
|
class BaseStorage(ABC):
    """
    Base class for all FSM storages

    Subclasses implement the abstract state/data accessors and ``close``;
    ``update_data`` is built on top of ``get_data`` and ``set_data``.
    """

    @abstractmethod
    async def set_state(self, key: StorageKey, state: StateType = None) -> None:
        """
        Set state for specified key

        :param key: storage key
        :param state: new state
        """
        pass

    @abstractmethod
    async def get_state(self, key: StorageKey) -> Optional[str]:
        """
        Get key state

        :param key: storage key
        :return: current state
        """
        pass

    @abstractmethod
    async def set_data(self, key: StorageKey, data: Dict[str, Any]) -> None:
        """
        Write data (replace)

        :param key: storage key
        :param data: new data
        """
        pass

    @abstractmethod
    async def get_data(self, key: StorageKey) -> Dict[str, Any]:
        """
        Get current data for key

        :param key: storage key
        :return: current data
        """
        pass

    async def update_data(self, key: StorageKey, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Update data in the storage for key (like dict.update)

        :param key: storage key
        :param data: partial data
        :return: new data
        """
        stored = await self.get_data(key=key)
        # Merge into a fresh dict instead of mutating what get_data() returned:
        # a storage may hand out its internal dict, and changing it in place
        # would alter stored data before (or even without) set_data() succeeding.
        merged = dict(stored)
        merged.update(data)
        await self.set_data(key=key, data=merged)
        # Return a copy so the caller never shares the object given to set_data().
        return merged.copy()

    @abstractmethod
    async def close(self) -> None:  # pragma: no cover
        """
        Close storage (database connection, file, etc.)
        """
        pass
|
|
|
|
|
|
class BaseEventIsolation(ABC):
    """
    Abstract interface exposing a per-key ``lock`` context manager and ``close``.
    """

    @abstractmethod
    @asynccontextmanager
    async def lock(self, key: StorageKey) -> AsyncGenerator[None, None]:
        """
        Isolate events with lock.
        Will be used as context manager

        :param key: storage key
        :return: An async generator
        """
        # Single bare yield: the managed block runs here and receives None.
        yield

    @abstractmethod
    async def close(self) -> None:
        """
        Close the event isolation; the concrete behaviour is defined by subclasses.
        """
|