Small changes

This commit is contained in:
Alex Root Junior 2023-10-07 18:48:20 +03:00
parent 6c281188b3
commit d4248c5672
No known key found for this signature in database
GPG key ID: 074C1D455EBEA4AC
5 changed files with 191 additions and 184 deletions

View file

@ -2,7 +2,7 @@ from __future__ import annotations
import inspect
from collections import defaultdict
from dataclasses import dataclass
from dataclasses import dataclass, replace
from enum import Enum, auto
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union
@ -14,10 +14,86 @@ from aiogram.dispatcher.event.handler import CallableObject, CallbackType
from aiogram.exceptions import SceneException
from aiogram.filters import StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.scenes._history import HistoryManager
from aiogram.fsm.storage.memory import MemoryStorageRecord
from aiogram.types import TelegramObject, Update
class HistoryManager:
def __init__(self, state: FSMContext, destiny: str = "scenes_history", size: int = 10):
self._size = size
self._state = state
self._history_state = FSMContext(
storage=state.storage, key=replace(state.key, destiny=destiny)
)
async def push(self, state: Optional[str], data: Dict[str, Any]) -> None:
history_data = await self._history_state.get_data()
history = history_data.setdefault("history", [])
history.append({"state": state, "data": data})
if len(history) > self._size:
history = history[-self._size :]
loggers.scene.debug("Push state=%s data=%s to history", state, data)
if not history:
await self._history_state.set_data({})
else:
await self._history_state.update_data(history=history)
async def pop(self) -> Optional[MemoryStorageRecord]:
history_data = await self._history_state.get_data()
history = history_data.setdefault("history", [])
if not history:
return None
record = history.pop()
state = record["state"]
data = record["data"]
if not history:
await self._history_state.set_data({})
else:
await self._history_state.update_data(history=history)
loggers.scene.debug("Pop state=%s data=%s from history", state, data)
return MemoryStorageRecord(state=state, data=data)
async def get(self) -> Optional[MemoryStorageRecord]:
history_data = await self._history_state.get_data()
history = history_data.setdefault("history", [])
if not history:
return None
return MemoryStorageRecord(**history[-1])
async def all(self) -> List[MemoryStorageRecord]:
history_data = await self._history_state.get_data()
history = history_data.setdefault("history", [])
return [MemoryStorageRecord(**item) for item in history]
async def clear(self) -> None:
loggers.scene.debug("Clear history")
await self._history_state.set_data({})
async def snapshot(self) -> None:
state = await self._state.get_state()
data = await self._state.get_data()
await self.push(state, data)
async def _set_state(self, state: Optional[str], data: Dict[str, Any]) -> None:
await self._state.set_state(state)
await self._state.set_data(data)
async def rollback(self) -> Optional[str]:
previous_state = await self.pop()
if not previous_state:
await self._set_state(None, {})
return None
loggers.scene.debug(
"Rollback to state=%s data=%s",
previous_state.state,
previous_state.data,
)
await self._set_state(previous_state.state, previous_state.data)
return previous_state.state
class ObserverDecorator:
def __init__(
self,
@ -148,68 +224,6 @@ async def _empty_handler(*args: Any, **kwargs: Any) -> None:
pass
class _SceneMeta(type):
def __new__(
mcs,
name: str,
bases: Tuple[type],
namespace: Dict[str, Any],
**kwargs: Any,
) -> _SceneMeta:
state_name = kwargs.pop("state", None)
filters: defaultdict[str, List[CallbackType]] = defaultdict(list)
handlers: list[HandlerContainer] = []
actions: defaultdict[SceneAction, Dict[str, CallableObject]] = defaultdict(dict)
reset_data_on_enter = kwargs.pop("reset_data_on_enter", None)
reset_history_on_enter = kwargs.pop("reset_history_on_enter", None)
callback_query_without_state = kwargs.pop("callback_query_without_state", None)
for base in bases:
if not issubclass(base, Scene):
continue
parent_scene_config = base.__scene_config__
filters.update(parent_scene_config.filters)
handlers.extend(parent_scene_config.handlers)
for action, action_handlers in parent_scene_config.actions.items():
actions[action].update(action_handlers)
if reset_data_on_enter is None:
reset_data_on_enter = parent_scene_config.reset_data_on_enter
if reset_history_on_enter is None:
reset_history_on_enter = parent_scene_config.reset_history_on_enter
if callback_query_without_state is None:
callback_query_without_state = parent_scene_config.callback_query_without_state
for name, value in namespace.items():
if scene_handlers := getattr(value, "__aiogram_handler__", None):
handlers.extend(scene_handlers)
if isinstance(value, ObserverDecorator):
handlers.append(
HandlerContainer(
value.name,
_empty_handler,
value.filters,
after=value.after,
)
)
if hasattr(value, "__aiogram_action__"):
for action, action_handlers in value.__aiogram_action__.items():
actions[action].update(action_handlers)
namespace["__scene_config__"] = SceneConfig(
state=state_name,
filters=dict(filters),
handlers=handlers,
actions=dict(actions),
reset_data_on_enter=reset_data_on_enter,
reset_history_on_enter=reset_history_on_enter,
callback_query_without_state=callback_query_without_state,
)
return super().__new__(mcs, name, bases, namespace, **kwargs)
class SceneHandlerWrapper:
def __init__(
self,
@ -224,11 +238,11 @@ class SceneHandlerWrapper:
async def __call__(
self,
event: TelegramObject,
state: FSMContext,
scenes: ScenesManager,
event_update: Update,
**kwargs: Any,
) -> Any:
state: FSMContext = kwargs["state"]
scenes: ScenesManager = kwargs["scenes"]
event_update: Update = kwargs["event_update"]
scene = self.scene(
wizard=SceneWizard(
scene_config=self.scene.__scene_config__,
@ -240,13 +254,8 @@ class SceneHandlerWrapper:
)
)
result = await self.handler.call(
scene,
event,
state=state,
event_update=event_update,
**kwargs,
)
result = await self.handler.call(scene, event, **kwargs)
if self.after:
action_container = ActionContainer(
"after",
@ -268,8 +277,22 @@ class SceneHandlerWrapper:
return result
class Scene(metaclass=_SceneMeta):
class Scene:
"""
Represents a scene in a conversation flow.
A scene is a specific state in a conversation where certain actions can take place.
Each scene has a set of filters that determine when it should be triggered,
and a set of handlers that define the actions to be executed when the scene is active.
.. note::
This class is not meant to be used directly. Instead, it should be subclassed
to define custom scenes.
"""
__scene_config__: ClassVar[SceneConfig]
"""Scene configuration."""
def __init__(
self,
@ -278,8 +301,74 @@ class Scene(metaclass=_SceneMeta):
self.wizard = wizard
self.wizard.scene = self
def __init_subclass__(cls, **kwargs: Any) -> None:
state_name = kwargs.pop("state", None)
reset_data_on_enter = kwargs.pop("reset_data_on_enter", None)
reset_history_on_enter = kwargs.pop("reset_history_on_enter", None)
callback_query_without_state = kwargs.pop("callback_query_without_state", None)
super().__init_subclass__(**kwargs)
filters: defaultdict[str, List[CallbackType]] = defaultdict(list)
handlers: list[HandlerContainer] = []
actions: defaultdict[SceneAction, Dict[str, CallableObject]] = defaultdict(dict)
for base in cls.__bases__:
if not issubclass(base, Scene):
continue
parent_scene_config = getattr(base, "__scene_config__", None)
if not parent_scene_config:
continue
filters.update(parent_scene_config.filters)
handlers.extend(parent_scene_config.handlers)
for action, action_handlers in parent_scene_config.actions.items():
actions[action].update(action_handlers)
if reset_data_on_enter is None:
reset_data_on_enter = parent_scene_config.reset_data_on_enter
if reset_history_on_enter is None:
reset_history_on_enter = parent_scene_config.reset_history_on_enter
if callback_query_without_state is None:
callback_query_without_state = parent_scene_config.callback_query_without_state
for name in vars(cls):
value = getattr(cls, name)
if scene_handlers := getattr(value, "__aiogram_handler__", None):
handlers.extend(scene_handlers)
if isinstance(value, ObserverDecorator):
handlers.append(
HandlerContainer(
value.name,
_empty_handler,
value.filters,
after=value.after,
)
)
if hasattr(value, "__aiogram_action__"):
for action, action_handlers in value.__aiogram_action__.items():
actions[action].update(action_handlers)
cls.__scene_config__ = SceneConfig(
state=state_name,
filters=dict(filters),
handlers=handlers,
actions=dict(actions),
reset_data_on_enter=reset_data_on_enter,
reset_history_on_enter=reset_history_on_enter,
callback_query_without_state=callback_query_without_state,
)
@classmethod
def add_to_router(cls, router: Router) -> None:
"""
Adds the scene to the given router.
:param router:
:return:
"""
scene_config = cls.__scene_config__
used_observers = set()
@ -304,16 +393,30 @@ class Scene(metaclass=_SceneMeta):
router.observers[observer_name].filter(StateFilter(scene_config.state))
@classmethod
def as_router(cls) -> Router:
name = (
f"Scene '{cls.__module__}.{cls.__qualname__}' for state {cls.__scene_config__.state!r}"
)
def as_router(cls, name: Optional[str] = None) -> Router:
"""
Returns the scene as a router.
:return: new router
"""
if name is None:
name = (
f"Scene '{cls.__module__}.{cls.__qualname__}' "
f"for state {cls.__scene_config__.state!r}"
)
router = Router(name=name)
cls.add_to_router(router)
return router
@classmethod
def as_handler(cls) -> CallbackType:
"""
Create an entry point handler for the scene, can be used to simplify the handler
that starts the scene.
>>> router.message.register(MyScene.as_handler(), Command("start"))
"""
async def enter_to_scene_handler(event: TelegramObject, scenes: ScenesManager) -> None:
await scenes.enter(cls)
@ -366,8 +469,8 @@ class SceneWizard:
new_scene = await self.manager.history.rollback()
await self.manager.enter(new_scene, _check_active=False, **kwargs)
async def replay(self, event: TelegramObject) -> None:
await self._on_action(SceneAction.enter, event=event)
async def retake(self, **kwargs: Any) -> None:
await self.goto(self.scene_config.state, **kwargs)
async def goto(self, scene: Union[Type[Scene], str], **kwargs: Any) -> None:
await self.leave(**kwargs)
@ -394,6 +497,7 @@ class SceneWizard:
self.scene_config.state,
)
return False
await action_config[event_type].call(self.scene, self.event, **{**self.data, **kwargs})
return True
@ -643,3 +747,6 @@ class OnMarker:
my_chat_member = ObserverMarker("my_chat_member")
chat_member = ObserverMarker("chat_member")
chat_join_request = ObserverMarker("chat_join_request")
on = OnMarker()

View file

@ -1,14 +0,0 @@
from __future__ import annotations
__all__ = [
"Scene",
"ScenesManager",
"SceneRegistry",
"OnMarker",
"After",
"on",
]
from ._scene import After, OnMarker, Scene, SceneRegistry, ScenesManager
on = OnMarker()

View file

@ -1,82 +0,0 @@
from dataclasses import replace
from typing import Any, Dict, List, Optional
from aiogram import loggers
from aiogram.fsm.context import FSMContext
from aiogram.fsm.storage.memory import MemoryStorageRecord
class HistoryManager:
def __init__(self, state: FSMContext, destiny: str = "scenes_history", size: int = 10):
self._size = size
self._state = state
self._history_state = FSMContext(
storage=state.storage, key=replace(state.key, destiny=destiny)
)
async def push(self, state: Optional[str], data: Dict[str, Any]) -> None:
history_data = await self._history_state.get_data()
history = history_data.setdefault("history", [])
history.append({"state": state, "data": data})
if len(history) > self._size:
history = history[-self._size :]
loggers.scene.debug("Push state=%s data=%s to history", state, data)
if not history:
await self._history_state.set_data({})
else:
await self._history_state.update_data(history=history)
async def pop(self) -> Optional[MemoryStorageRecord]:
history_data = await self._history_state.get_data()
history = history_data.setdefault("history", [])
if not history:
return None
record = history.pop()
state = record["state"]
data = record["data"]
if not history:
await self._history_state.set_data({})
else:
await self._history_state.update_data(history=history)
loggers.scene.debug("Pop state=%s data=%s from history", state, data)
return MemoryStorageRecord(state=state, data=data)
async def get(self) -> Optional[MemoryStorageRecord]:
history_data = await self._history_state.get_data()
history = history_data.setdefault("history", [])
if not history:
return None
return MemoryStorageRecord(**history[-1])
async def all(self) -> List[MemoryStorageRecord]:
history_data = await self._history_state.get_data()
history = history_data.setdefault("history", [])
return [MemoryStorageRecord(**item) for item in history]
async def clear(self) -> None:
loggers.scene.debug("Clear history")
await self._history_state.set_data({})
async def snapshot(self) -> None:
state = await self._state.get_state()
data = await self._state.get_data()
await self.push(state, data)
async def _set_state(self, state: Optional[str], data: Dict[str, Any]) -> None:
await self._state.set_state(state)
await self._state.set_data(data)
async def rollback(self) -> Optional[str]:
previous_state = await self.pop()
if not previous_state:
await self._set_state(None, {})
return None
loggers.scene.debug(
"Rollback to state=%s data=%s",
previous_state.state,
previous_state.data,
)
await self._set_state(previous_state.state, previous_state.data)
return previous_state.state

View file

@ -31,9 +31,7 @@ Download file by `file_path` to destination.
If you want to automatically create destination (:obj:`io.BytesIO`) use default
value of destination and handle result of this method.
.. autoclass:: aiogram.client.bot.Bot
:members: download_file
:exclude-members: __init__
.. automethod:: aiogram.client.bot.Bot.download_file
There are two options where you can download the file: to **disk** or to **binary I/O object**.
@ -81,9 +79,7 @@ Download file by `file_id` or `Downloadable` object to destination.
If you want to automatically create destination (:obj:`io.BytesIO`) use default
value of destination and handle result of this method.
.. autoclass:: aiogram.client.bot.Bot
:members: download
:exclude-members: __init__
.. automethod:: aiogram.client.bot.Bot.download
It differs from `download_file <#download-file>`__ **only** in that it accepts `file_id`
or an `Downloadable` object (object that contains the `file_id` attribute) instead of `file_path`.

View file

@ -5,7 +5,7 @@ from typing import TypedDict
from aiogram import Bot, Dispatcher, F, html
from aiogram.filters import Command
from aiogram.scenes import After, Scene, SceneRegistry, on
from aiogram.fsm.scene import After, Scene, SceneRegistry, on
from aiogram.types import (
CallbackQuery,
InlineKeyboardButton,