feat(storage): port and modify state storages

This commit is contained in:
mpa 2020-08-16 16:10:43 +04:00
parent 6f53f15577
commit 80f1862cb8
No known key found for this signature in database
GPG key ID: BCCFBFCCC9B754A8
8 changed files with 257 additions and 5 deletions

View file

@ -4,29 +4,63 @@ import asyncio
import contextvars
import warnings
from asyncio import CancelledError, Future, Lock
from typing import Any, AsyncGenerator, Dict, Optional, Union
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Dict,
Generic,
Mapping,
Optional,
TypeVar,
Union,
)
from .. import loggers
from ..api.client.bot import Bot
from ..api.methods import TelegramMethod
from ..api.types import Update, User
from ..api.types import Chat, Update, User
from ..utils.exceptions import TelegramAPIError
from .event.bases import NOT_HANDLED
from .middlewares.user_context import UserContextMiddleware
from .router import Router
from .state.context import CurrentUserContext
from .storage.base import BaseStorage
from .storage.dummy import DummyStorage
if TYPE_CHECKING:
_StorageDataT = TypeVar("_StorageDataT", bound=Mapping[str, Any])
else:
_StorageDataT = TypeVar("_StorageDataT", bound=Mapping)
class Dispatcher(Router):
class Dispatcher(Router, Generic[_StorageDataT]):
"""
Root router
"""
def __init__(self, **kwargs: Any) -> None:
super(Dispatcher, self).__init__(**kwargs)
def __init__(
self,
use_builtin_filters: bool = True,
storage: Optional[BaseStorage[_StorageDataT]] = None,
) -> None:
super(Dispatcher, self).__init__(use_builtin_filters=use_builtin_filters,)
self._running_lock = Lock()
# Default middleware is needed for contextual features
self.update.outer_middleware(UserContextMiddleware())
self.storage = storage
@property
def current_state(self) -> CurrentUserContext[_StorageDataT]:
if self.storage is None:
self.storage: DummyStorage = DummyStorage() # type: ignore
return CurrentUserContext(
storage=self.storage,
chat_id=Chat.get_current().id, # type: ignore
user_id=User.get_current().id, # type: ignore
)
@property
def parent_router(self) -> None:

View file

View file

@ -0,0 +1,71 @@
from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Mapping, Optional, TypeVar
from aiogram.dispatcher.storage.base import BaseStorage
if TYPE_CHECKING:
DataT = TypeVar("DataT", bound=Mapping[str, Any])
else:
DataT = TypeVar("DataT", bound=Mapping)
def _default_key_maker(chat_id: Optional[int] = None, user_id: Optional[int] = None) -> str:
if chat_id is None and user_id is None:
raise ValueError("`user` or `chat` parameter is required but no one is provided!")
if user_id is None and chat_id is not None:
user_id = chat_id
elif user_id is not None and chat_id is None:
chat_id = user_id
return f"{chat_id}:{user_id}"
class CurrentUserContext(Generic[DataT]):
__slots__ = "key", "storage"
def __init__(
self,
storage: BaseStorage[DataT],
chat_id: Optional[int],
user_id: Optional[int],
key_maker: Callable[[Optional[int], Optional[int]], str] = _default_key_maker,
):
assert (
chat_id or user_id
) is not None, "Either chat_id or user_id should be non-None value"
self.storage = storage
self.key = key_maker(chat_id, user_id)
async def get_state(self, default: Optional[str] = None) -> Optional[str]:
return await self.storage.get_state(self.key, default=default)
async def get_data(self, default: Optional[DataT] = None) -> DataT:
return await self.storage.get_data(self.key, default=default)
async def update_data(self, data: Optional[DataT] = None, **kwargs: Any) -> None:
if data is not None and not isinstance(data, Mapping):
raise ValueError("Data is expected to be a map") # todo
temp_data: Dict[str, Any] = {}
if isinstance(data, Mapping):
temp_data.update(**data)
temp_data.update(**kwargs)
await self.storage.update_data(self.key, data=temp_data) # type: ignore
async def set_state(self, state: Optional[str] = None) -> None:
await self.storage.set_state(self.key, state=state)
async def set_data(self, data: Optional[DataT] = None) -> None:
await self.storage.set_data(self.key, data=data)
async def reset_state(self, with_data: bool = True) -> None:
await self.storage.reset_state(self.key, with_data=with_data)
async def reset_data(self) -> None:
await self.storage.reset_data(self.key)
async def finish(self) -> None:
await self.storage.finish(self.key)

View file

View file

@ -0,0 +1,39 @@
from typing import Generic, Optional, TypeVar
_DataT = TypeVar("_DataT")
class BaseStorage(Generic[_DataT]):
async def get_state(self, key: str, default: Optional[str] = None) -> Optional[str]:
raise NotImplementedError
async def set_state(self, key: str, state: Optional[str]) -> None:
raise NotImplementedError
async def get_data(self, key: str, default: Optional[_DataT] = None) -> _DataT:
raise NotImplementedError
async def set_data(self, key: str, data: Optional[_DataT]) -> None:
raise NotImplementedError
async def update_data(self, key: str, data: _DataT) -> None:
raise NotImplementedError
async def close(self) -> None:
raise NotImplementedError
async def wait_closed(self) -> None:
raise NotImplementedError
# naively implemented basic, base member methods
async def reset_state(self, key: str, with_data: bool = True) -> None:
await self.set_state(key=key, state=None)
if with_data:
await self.set_data(key=key, data=None)
async def reset_data(self, key: str) -> None:
await self.set_data(key=key, data=None)
async def finish(self, key: str) -> None:
await self.reset_state(key=key, with_data=True)

View file

@ -0,0 +1,59 @@
import copy
from typing import Any, Dict, Optional
from typing_extensions import TypedDict
from .base import BaseStorage
class _UserStorageMetaData(TypedDict):
state: Optional[str]
data: Dict[str, Any]
class DictStorage(BaseStorage[Dict[str, Any]]):
"""
In-memory based states storage.
This type of storage is not recommended for usage in bots, because you will lost all states after restarting.
"""
def __init__(self) -> None:
# note: we can use TypedDict for Dict flat value
self.data: Dict[str, _UserStorageMetaData] = {}
def resolve_address(self, key: str) -> None:
if key not in self.data:
self.data[key] = {"state": None, "data": {}}
async def get_state(self, key: str, default: Optional[str] = None) -> Optional[str]:
self.resolve_address(key)
return self.data[key]["state"]
async def get_data(self, key: str, default: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
self.resolve_address(key=key)
return copy.deepcopy(self.data[key]["data"])
async def update_data(self, key: str, data: Optional[Dict[str, Any]] = None) -> None:
if data is None:
data = {}
self.resolve_address(key=key)
self.data[key]["data"].update(data)
async def set_state(self, key: str, state: Optional[str] = None) -> None:
self.resolve_address(key=key)
self.data[key]["state"] = state
async def set_data(self, key: str, data: Optional[Dict[str, Any]] = None) -> None:
self.resolve_address(key=key)
self.data[key]["data"] = copy.deepcopy(data) # type: ignore
async def reset_state(self, key: str, with_data: bool = True) -> None:
await self.set_state(key=key, state=None)
if with_data:
await self.set_data(key=key, data={})
async def wait_closed(self) -> None:
pass
async def close(self) -> None:
self.data.clear()

View file

@ -0,0 +1,46 @@
from typing import Any, Optional
from warnings import warn
from .base import BaseStorage
def warn_storage_is_dummy() -> None:
warn(
"You havent set any storage yet so no states and no data will be saved. \n"
"You can connect MemoryStorage for debug purposes or non-essential data.",
UserWarning,
5,
)
return None
class DummyStorage(BaseStorage[Any]):
async def get_state(self, key: str, default: Optional[str] = None) -> None:
return warn_storage_is_dummy()
async def set_state(self, key: str, state: Optional[str]) -> None:
return warn_storage_is_dummy()
async def get_data(self, key: str, default: Any = None) -> None:
return warn_storage_is_dummy()
async def set_data(self, key: str, data: Any) -> None:
return warn_storage_is_dummy()
async def update_data(self, key: str, data: Any) -> None:
return warn_storage_is_dummy()
async def close(self) -> None:
return warn_storage_is_dummy()
async def wait_closed(self) -> None:
return warn_storage_is_dummy()
async def reset_data(self, key: str) -> None:
return warn_storage_is_dummy()
async def reset_state(self, key: str, with_data: Optional[bool] = True) -> None:
return warn_storage_is_dummy()
async def finish(self, key: str) -> None:
return warn_storage_is_dummy()

View file

@ -0,0 +1,3 @@
# todo
class FSMStorageBaseError(Exception):
pass