From d4248c5672b203344621eed0042d9fc0f7719e01 Mon Sep 17 00:00:00 2001 From: Alex Root Junior Date: Sat, 7 Oct 2023 18:48:20 +0300 Subject: [PATCH] Small changes --- aiogram/{scenes/_scene.py => fsm/scene.py} | 269 ++++++++++++++------- aiogram/scenes/__init__.py | 14 -- aiogram/scenes/_history.py | 82 ------- docs/api/download_file.rst | 8 +- examples/scene.py | 2 +- 5 files changed, 191 insertions(+), 184 deletions(-) rename aiogram/{scenes/_scene.py => fsm/scene.py} (81%) delete mode 100644 aiogram/scenes/__init__.py delete mode 100644 aiogram/scenes/_history.py diff --git a/aiogram/scenes/_scene.py b/aiogram/fsm/scene.py similarity index 81% rename from aiogram/scenes/_scene.py rename to aiogram/fsm/scene.py index 3f0f455e..8c236db4 100644 --- a/aiogram/scenes/_scene.py +++ b/aiogram/fsm/scene.py @@ -2,7 +2,7 @@ from __future__ import annotations import inspect from collections import defaultdict -from dataclasses import dataclass +from dataclasses import dataclass, replace from enum import Enum, auto from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union @@ -14,10 +14,86 @@ from aiogram.dispatcher.event.handler import CallableObject, CallbackType from aiogram.exceptions import SceneException from aiogram.filters import StateFilter from aiogram.fsm.context import FSMContext -from aiogram.scenes._history import HistoryManager +from aiogram.fsm.storage.memory import MemoryStorageRecord from aiogram.types import TelegramObject, Update +class HistoryManager: + def __init__(self, state: FSMContext, destiny: str = "scenes_history", size: int = 10): + self._size = size + self._state = state + self._history_state = FSMContext( + storage=state.storage, key=replace(state.key, destiny=destiny) + ) + + async def push(self, state: Optional[str], data: Dict[str, Any]) -> None: + history_data = await self._history_state.get_data() + history = history_data.setdefault("history", []) + history.append({"state": state, "data": data}) + if len(history) > self._size: + history = history[-self._size :] + loggers.scene.debug("Push state=%s data=%s to history", state, data) + + if not history: + await self._history_state.set_data({}) + else: + await self._history_state.update_data(history=history) + + async def pop(self) -> Optional[MemoryStorageRecord]: + history_data = await self._history_state.get_data() + history = history_data.setdefault("history", []) + if not history: + return None + record = history.pop() + state = record["state"] + data = record["data"] + if not history: + await self._history_state.set_data({}) + else: + await self._history_state.update_data(history=history) + loggers.scene.debug("Pop state=%s data=%s from history", state, data) + return MemoryStorageRecord(state=state, data=data) + + async def get(self) -> Optional[MemoryStorageRecord]: + history_data = await self._history_state.get_data() + history = history_data.setdefault("history", []) + if not history: + return None + return MemoryStorageRecord(**history[-1]) + + async def all(self) -> List[MemoryStorageRecord]: + history_data = await self._history_state.get_data() + history = history_data.setdefault("history", []) + return [MemoryStorageRecord(**item) for item in history] + + async def clear(self) -> None: + loggers.scene.debug("Clear history") + await self._history_state.set_data({}) + + async def snapshot(self) -> None: + state = await self._state.get_state() + data = await self._state.get_data() + await self.push(state, data) + + async def _set_state(self, state: Optional[str], data: Dict[str, Any]) -> None: + await self._state.set_state(state) + await self._state.set_data(data) + + async def rollback(self) -> Optional[str]: + previous_state = await self.pop() + if not previous_state: + await self._set_state(None, {}) + return None + + loggers.scene.debug( + "Rollback to state=%s data=%s", + previous_state.state, + previous_state.data, + ) + await self._set_state(previous_state.state, previous_state.data) + return previous_state.state + + class ObserverDecorator: def __init__( self, @@ -148,68 +224,6 @@ async def _empty_handler(*args: Any, **kwargs: Any) -> None: pass -class _SceneMeta(type): - def __new__( - mcs, - name: str, - bases: Tuple[type], - namespace: Dict[str, Any], - **kwargs: Any, - ) -> _SceneMeta: - state_name = kwargs.pop("state", None) - filters: defaultdict[str, List[CallbackType]] = defaultdict(list) - handlers: list[HandlerContainer] = [] - actions: defaultdict[SceneAction, Dict[str, CallableObject]] = defaultdict(dict) - - reset_data_on_enter = kwargs.pop("reset_data_on_enter", None) - reset_history_on_enter = kwargs.pop("reset_history_on_enter", None) - callback_query_without_state = kwargs.pop("callback_query_without_state", None) - - for base in bases: - if not issubclass(base, Scene): - continue - parent_scene_config = base.__scene_config__ - - filters.update(parent_scene_config.filters) - handlers.extend(parent_scene_config.handlers) - for action, action_handlers in parent_scene_config.actions.items(): - actions[action].update(action_handlers) - - if reset_data_on_enter is None: - reset_data_on_enter = parent_scene_config.reset_data_on_enter - if reset_history_on_enter is None: - reset_history_on_enter = parent_scene_config.reset_history_on_enter - if callback_query_without_state is None: - callback_query_without_state = parent_scene_config.callback_query_without_state - - for name, value in namespace.items(): - if scene_handlers := getattr(value, "__aiogram_handler__", None): - handlers.extend(scene_handlers) - if isinstance(value, ObserverDecorator): - handlers.append( - HandlerContainer( - value.name, - _empty_handler, - value.filters, - after=value.after, - ) - ) - if hasattr(value, "__aiogram_action__"): - for action, action_handlers in value.__aiogram_action__.items(): - actions[action].update(action_handlers) - - namespace["__scene_config__"] = SceneConfig( - state=state_name, - filters=dict(filters), - handlers=handlers, - actions=dict(actions), - reset_data_on_enter=reset_data_on_enter, - reset_history_on_enter=reset_history_on_enter, - callback_query_without_state=callback_query_without_state, - ) - return super().__new__(mcs, name, bases, namespace, **kwargs) - - class SceneHandlerWrapper: def __init__( self, @@ -224,11 +238,11 @@ class SceneHandlerWrapper: async def __call__( self, event: TelegramObject, - state: FSMContext, - scenes: ScenesManager, - event_update: Update, **kwargs: Any, ) -> Any: + state: FSMContext = kwargs["state"] + scenes: ScenesManager = kwargs["scenes"] + event_update: Update = kwargs["event_update"] scene = self.scene( wizard=SceneWizard( scene_config=self.scene.__scene_config__, @@ -240,13 +254,8 @@ class SceneHandlerWrapper: ) ) - result = await self.handler.call( - scene, - event, - state=state, - event_update=event_update, - **kwargs, - ) + result = await self.handler.call(scene, event, **kwargs) + if self.after: action_container = ActionContainer( "after", @@ -268,8 +277,22 @@ class SceneHandlerWrapper: return result -class Scene(metaclass=_SceneMeta): +class Scene: + """ + Represents a scene in a conversation flow. + + A scene is a specific state in a conversation where certain actions can take place. + + Each scene has a set of filters that determine when it should be triggered, + and a set of handlers that define the actions to be executed when the scene is active. + + .. note:: + This class is not meant to be used directly. Instead, it should be subclassed + to define custom scenes. + """ + __scene_config__: ClassVar[SceneConfig] + """Scene configuration.""" def __init__( self, @@ -278,8 +301,74 @@ class Scene(metaclass=_SceneMeta): self.wizard = wizard self.wizard.scene = self + def __init_subclass__(cls, **kwargs: Any) -> None: + state_name = kwargs.pop("state", None) + reset_data_on_enter = kwargs.pop("reset_data_on_enter", None) + reset_history_on_enter = kwargs.pop("reset_history_on_enter", None) + callback_query_without_state = kwargs.pop("callback_query_without_state", None) + + super().__init_subclass__(**kwargs) + + filters: defaultdict[str, List[CallbackType]] = defaultdict(list) + handlers: list[HandlerContainer] = [] + actions: defaultdict[SceneAction, Dict[str, CallableObject]] = defaultdict(dict) + + for base in cls.__bases__: + if not issubclass(base, Scene): + continue + + parent_scene_config = getattr(base, "__scene_config__", None) + if not parent_scene_config: + continue + + filters.update(parent_scene_config.filters) + handlers.extend(parent_scene_config.handlers) + for action, action_handlers in parent_scene_config.actions.items(): + actions[action].update(action_handlers) + + if reset_data_on_enter is None: + reset_data_on_enter = parent_scene_config.reset_data_on_enter + if reset_history_on_enter is None: + reset_history_on_enter = parent_scene_config.reset_history_on_enter + if callback_query_without_state is None: + callback_query_without_state = parent_scene_config.callback_query_without_state + + for name in vars(cls): + value = getattr(cls, name) + + if scene_handlers := getattr(value, "__aiogram_handler__", None): + handlers.extend(scene_handlers) + if isinstance(value, ObserverDecorator): + handlers.append( + HandlerContainer( + value.name, + _empty_handler, + value.filters, + after=value.after, + ) + ) + if hasattr(value, "__aiogram_action__"): + for action, action_handlers in value.__aiogram_action__.items(): + actions[action].update(action_handlers) + + cls.__scene_config__ = SceneConfig( + state=state_name, + filters=dict(filters), + handlers=handlers, + actions=dict(actions), + reset_data_on_enter=reset_data_on_enter, + reset_history_on_enter=reset_history_on_enter, + callback_query_without_state=callback_query_without_state, + ) + @classmethod def add_to_router(cls, router: Router) -> None: + """ + Adds the scene to the given router. + + :param router: + :return: + """ scene_config = cls.__scene_config__ used_observers = set() @@ -304,16 +393,30 @@ class Scene(metaclass=_SceneMeta): router.observers[observer_name].filter(StateFilter(scene_config.state)) @classmethod - def as_router(cls) -> Router: - name = ( - f"Scene '{cls.__module__}.{cls.__qualname__}' for state {cls.__scene_config__.state!r}" - ) + def as_router(cls, name: Optional[str] = None) -> Router: + """ + Returns the scene as a router. + + :return: new router + """ + if name is None: + name = ( + f"Scene '{cls.__module__}.{cls.__qualname__}' " + f"for state {cls.__scene_config__.state!r}" + ) router = Router(name=name) cls.add_to_router(router) return router @classmethod def as_handler(cls) -> CallbackType: + """ + Create an entry point handler for the scene, can be used to simplify the handler + that starts the scene. + + >>> router.message.register(MyScene.as_handler(), Command("start")) + """ + async def enter_to_scene_handler(event: TelegramObject, scenes: ScenesManager) -> None: await scenes.enter(cls) @@ -366,8 +469,8 @@ class SceneWizard: new_scene = await self.manager.history.rollback() await self.manager.enter(new_scene, _check_active=False, **kwargs) - async def replay(self, event: TelegramObject) -> None: - await self._on_action(SceneAction.enter, event=event) + async def retake(self, **kwargs: Any) -> None: + await self.goto(self.scene_config.state, **kwargs) async def goto(self, scene: Union[Type[Scene], str], **kwargs: Any) -> None: await self.leave(**kwargs) @@ -394,6 +497,7 @@ class SceneWizard: self.scene_config.state, ) return False + await action_config[event_type].call(self.scene, self.event, **{**self.data, **kwargs}) return True @@ -643,3 +747,6 @@ class OnMarker: my_chat_member = ObserverMarker("my_chat_member") chat_member = ObserverMarker("chat_member") chat_join_request = ObserverMarker("chat_join_request") + + +on = OnMarker() diff --git a/aiogram/scenes/__init__.py b/aiogram/scenes/__init__.py deleted file mode 100644 index fc2d63d7..00000000 --- a/aiogram/scenes/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -from __future__ import annotations - -__all__ = [ - "Scene", - "ScenesManager", - "SceneRegistry", - "OnMarker", - "After", - "on", -] - -from ._scene import After, OnMarker, Scene, SceneRegistry, ScenesManager - -on = OnMarker() diff --git a/aiogram/scenes/_history.py b/aiogram/scenes/_history.py deleted file mode 100644 index 09ae2861..00000000 --- a/aiogram/scenes/_history.py +++ /dev/null @@ -1,82 +0,0 @@ -from dataclasses import replace -from typing import Any, Dict, List, Optional - -from aiogram import loggers -from aiogram.fsm.context import FSMContext -from aiogram.fsm.storage.memory import MemoryStorageRecord - - -class HistoryManager: - def __init__(self, state: FSMContext, destiny: str = "scenes_history", size: int = 10): - self._size = size - self._state = state - self._history_state = FSMContext( - storage=state.storage, key=replace(state.key, destiny=destiny) - ) - - async def push(self, state: Optional[str], data: Dict[str, Any]) -> None: - history_data = await self._history_state.get_data() - history = history_data.setdefault("history", []) - history.append({"state": state, "data": data}) - if len(history) > self._size: - history = history[-self._size :] - loggers.scene.debug("Push state=%s data=%s to history", state, data) - - if not history: - await self._history_state.set_data({}) - else: - await self._history_state.update_data(history=history) - - async def pop(self) -> Optional[MemoryStorageRecord]: - history_data = await self._history_state.get_data() - history = history_data.setdefault("history", []) - if not history: - return None - record = history.pop() - state = record["state"] - data = record["data"] - if not history: - await self._history_state.set_data({}) - else: - await self._history_state.update_data(history=history) - loggers.scene.debug("Pop state=%s data=%s from history", state, data) - return MemoryStorageRecord(state=state, data=data) - - async def get(self) -> Optional[MemoryStorageRecord]: - history_data = await self._history_state.get_data() - history = history_data.setdefault("history", []) - if not history: - return None - return MemoryStorageRecord(**history[-1]) - - async def all(self) -> List[MemoryStorageRecord]: - history_data = await self._history_state.get_data() - history = history_data.setdefault("history", []) - return [MemoryStorageRecord(**item) for item in history] - - async def clear(self) -> None: - loggers.scene.debug("Clear history") - await self._history_state.set_data({}) - - async def snapshot(self) -> None: - state = await self._state.get_state() - data = await self._state.get_data() - await self.push(state, data) - - async def _set_state(self, state: Optional[str], data: Dict[str, Any]) -> None: - await self._state.set_state(state) - await self._state.set_data(data) - - async def rollback(self) -> Optional[str]: - previous_state = await self.pop() - if not previous_state: - await self._set_state(None, {}) - return None - - loggers.scene.debug( - "Rollback to state=%s data=%s", - previous_state.state, - previous_state.data, - ) - await self._set_state(previous_state.state, previous_state.data) - return previous_state.state diff --git a/docs/api/download_file.rst b/docs/api/download_file.rst index d60e8051..06450ba3 100644 --- a/docs/api/download_file.rst +++ b/docs/api/download_file.rst @@ -31,9 +31,7 @@ Download file by `file_path` to destination. If you want to automatically create destination (:obj:`io.BytesIO`) use default value of destination and handle result of this method. -.. autoclass:: aiogram.client.bot.Bot - :members: download_file - :exclude-members: __init__ +.. automethod:: aiogram.client.bot.Bot.download_file There are two options where you can download the file: to **disk** or to **binary I/O object**. @@ -81,9 +79,7 @@ Download file by `file_id` or `Downloadable` object to destination. If you want to automatically create destination (:obj:`io.BytesIO`) use default value of destination and handle result of this method. -.. autoclass:: aiogram.client.bot.Bot - :members: download - :exclude-members: __init__ +.. automethod:: aiogram.client.bot.Bot.download It differs from `download_file <#download-file>`__ **only** in that it accepts `file_id` or an `Downloadable` object (object that contains the `file_id` attribute) instead of `file_path`. diff --git a/examples/scene.py b/examples/scene.py index 93583af4..aab1f03c 100644 --- a/examples/scene.py +++ b/examples/scene.py @@ -5,7 +5,7 @@ from typing import TypedDict from aiogram import Bot, Dispatcher, F, html from aiogram.filters import Command -from aiogram.scenes import After, Scene, SceneRegistry, on +from aiogram.fsm.scene import After, Scene, SceneRegistry, on from aiogram.types import ( CallbackQuery, InlineKeyboardButton,