From 76b64a85bf43b8ebc466482c243a0036bb66f0e9 Mon Sep 17 00:00:00 2001 From: Alex Root Junior Date: Sat, 26 Aug 2023 03:27:51 +0300 Subject: [PATCH] Revise aiogram/scenes with wizard-based design pattern Modified files in aiogram/scenes to incorporate the Wizard design pattern. Files affected are _marker.py, _registry.py, _wizard.py and __init__.py. The changes introduced a SceneWizard Class and ScenesManager, both of which aid in controlling navigation between different scenes or states. This helps clarifying the codebase, streamline scene transitions and offer more control over the app flow. --- aiogram/scenes/__init__.py | 9 +- aiogram/scenes/_history.py | 49 ++-- aiogram/scenes/_marker.py | 97 -------- aiogram/scenes/_registry.py | 70 ------ aiogram/scenes/_scene.py | 463 +++++++++++++++++++++++++++--------- aiogram/scenes/_wizard.py | 79 ------ 6 files changed, 385 insertions(+), 382 deletions(-) delete mode 100644 aiogram/scenes/_marker.py delete mode 100644 aiogram/scenes/_registry.py delete mode 100644 aiogram/scenes/_wizard.py diff --git a/aiogram/scenes/__init__.py b/aiogram/scenes/__init__.py index 98b95e3a..fc2d63d7 100644 --- a/aiogram/scenes/__init__.py +++ b/aiogram/scenes/__init__.py @@ -2,14 +2,13 @@ from __future__ import annotations __all__ = [ "Scene", + "ScenesManager", "SceneRegistry", - "Wizard", + "OnMarker", + "After", "on", ] -from ._wizard import Wizard -from ._marker import OnMarker -from ._registry import SceneRegistry -from ._scene import Scene +from ._scene import After, OnMarker, Scene, SceneRegistry, ScenesManager on = OnMarker() diff --git a/aiogram/scenes/_history.py b/aiogram/scenes/_history.py index c34044af..659c58f8 100644 --- a/aiogram/scenes/_history.py +++ b/aiogram/scenes/_history.py @@ -1,5 +1,5 @@ -from dataclasses import replace, dataclass -from typing import Any, Dict, Optional +from dataclasses import dataclass, replace +from typing import Any, Dict, List, Optional from aiogram import loggers from aiogram.fsm.context import FSMContext @@ -7,51 +7,58 @@ from aiogram.fsm.context import FSMContext @dataclass class StateContainer: - state: str + state: Optional[str] data: Dict[str, Any] class HistoryManager: def __init__(self, context: FSMContext, destiny: str = "history", size: int = 10): self._size = size - self._context = context - self._history_context = FSMContext( + self._state = context + self._history_state = FSMContext( storage=context.storage, key=replace(context.key, destiny=destiny) ) - async def push(self, state: str, data: Dict[str, Any]) -> None: - history_data = await self._history_context.get_data() + async def push(self, state: Optional[str], data: Dict[str, Any]) -> None: + history_data = await self._history_state.get_data() history = history_data.setdefault("history", []) history.append({"state": state, "data": data}) if len(history) > self._size: history = history[-self._size :] loggers.scene.debug("Push state=%s data=%s", state, data) - await self._history_context.update_data(history=history) + await self._history_state.update_data(history=history) async def pop(self) -> Optional[StateContainer]: - history_data = await self._history_context.get_data() + history_data = await self._history_state.get_data() history = history_data.setdefault("history", []) if not history: return None record = history.pop() state = record["state"] data = record["data"] - await self._history_context.update_data(history=history) + await self._history_state.update_data(history=history) loggers.scene.debug("Pop state=%s data=%s", state, data) return StateContainer(state=state, data=data) - async def clear(self): - loggers.scene.debug("Clear history") - await self._history_context.clear() + async def get(self) -> Optional[StateContainer]: + history_data = await self._history_state.get_data() + history = history_data.setdefault("history", []) + if not history: + return None + return StateContainer(**history[-1]) - async def get(self) -> list[StateContainer]: - history_data = await self._history_context.get_data() + async def all(self) -> List[StateContainer]: + history_data = await self._history_state.get_data() history = history_data.setdefault("history", []) return [StateContainer(**item) for item in history] + async def clear(self) -> None: + loggers.scene.debug("Clear history") + await self._history_state.clear() + async def snapshot(self) -> None: - state = await self._context.get_state() - data = await self._context.get_data() + state = await self._state.get_state() + data = await self._state.get_data() await self.push(state, data) async def rollback(self) -> Optional[str]: @@ -59,15 +66,11 @@ class HistoryManager: if not state_container: return None - state_container = await self.pop() - if not state_container: - return None - loggers.scene.debug( "Rollback to state=%s data=%s", state_container.state, state_container.data, ) - await self._context.set_state(state_container.state) - await self._context.set_data(state_container.data) + await self._state.set_state(state_container.state) + await self._state.set_data(state_container.data) return state_container.state diff --git a/aiogram/scenes/_marker.py b/aiogram/scenes/_marker.py deleted file mode 100644 index 3e60f12d..00000000 --- a/aiogram/scenes/_marker.py +++ /dev/null @@ -1,97 +0,0 @@ -from __future__ import annotations - -from typing import Union, Optional - -from aiogram.dispatcher.event.handler import CallbackType -from ._scene import Scene, ObserverDecorator, SceneAction - - -class ObserverMarker: - def __init__(self, name: str) -> None: - self.name = name - - def __call__( - self, - *filters: CallbackType, - goto: Optional[Union[Scene, str]] = None, - exit_: bool = False, - leave: bool = False, - back: bool = False, - ) -> ObserverDecorator: - # Check that only one action is specified - if sum((goto is not None, exit_, leave, back)) > 1: - raise ValueError("Only one action is allowed") - - scene = None - action_after = None - if goto is not None: - action_after = SceneAction.enter - scene = goto - elif exit_: - action_after = SceneAction.exit - elif leave: - action_after = SceneAction.leave - elif back: - action_after = SceneAction.back - - return ObserverDecorator( - self.name, - filters, - action_after=action_after, - scene=scene, - ) - - def enter(self, *filters: CallbackType) -> ObserverDecorator: - return ObserverDecorator(self.name, filters, action=SceneAction.enter) - - def leave(self) -> ObserverDecorator: - return ObserverDecorator(self.name, (), action=SceneAction.leave) - - def exit(self) -> ObserverDecorator: - return ObserverDecorator(self.name, (), action=SceneAction.exit) - - def back(self) -> ObserverDecorator: - return ObserverDecorator(self.name, (), action=SceneAction.back) - - -class OnMarker: - """ - The `_On` class is used as a marker class to define different types of events in the Scenes. - - Attributes: - - - :code:`message`: Event marker for handling `Message` events. - - :code:`edited_message`: Event marker for handling edited `Message` events. - - :code:`channel_post`: Event marker for handling channel `Post` events. - - :code:`edited_channel_post`: Event marker for handling edited channel `Post` events. - - :code:`inline_query`: Event marker for handling `InlineQuery` events. - - :code:`chosen_inline_result`: Event marker for handling chosen `InlineResult` events. - - :code:`callback_query`: Event marker for handling `CallbackQuery` events. - - :code:`shipping_query`: Event marker for handling `ShippingQuery` events. - - :code:`pre_checkout_query`: Event marker for handling `PreCheckoutQuery` events. - - :code:`poll`: Event marker for handling `Poll` events. - - :code:`poll_answer`: Event marker for handling `PollAnswer` events. - - :code:`my_chat_member`: Event marker for handling my chat `Member` events. - - :code:`chat_member`: Event marker for handling chat `Member` events. - - :code:`chat_join_request`: Event marker for handling chat `JoinRequest` events. - - :code:`error`: Event marker for handling `Error` events. - - .. note:: - - This is a marker class and does not contain any methods or implementation logic. - """ - - message = ObserverMarker("message") - edited_message = ObserverMarker("edited_message") - channel_post = ObserverMarker("channel_post") - edited_channel_post = ObserverMarker("edited_channel_post") - inline_query = ObserverMarker("inline_query") - chosen_inline_result = ObserverMarker("chosen_inline_result") - callback_query = ObserverMarker("callback_query") - shipping_query = ObserverMarker("shipping_query") - pre_checkout_query = ObserverMarker("pre_checkout_query") - poll = ObserverMarker("poll") - poll_answer = ObserverMarker("poll_answer") - my_chat_member = ObserverMarker("my_chat_member") - chat_member = ObserverMarker("chat_member") - chat_join_request = ObserverMarker("chat_join_request") diff --git a/aiogram/scenes/_registry.py b/aiogram/scenes/_registry.py deleted file mode 100644 index 0fa3e714..00000000 --- a/aiogram/scenes/_registry.py +++ /dev/null @@ -1,70 +0,0 @@ -from __future__ import annotations - -import inspect -from typing import Any, Dict, Optional, Type, Union - -from aiogram import Router - -from ..dispatcher.event.bases import NextMiddlewareType -from ..types import TelegramObject -from ._wizard import Wizard -from ._scene import Scene - - -class SceneRegistry: - def __init__(self, router: Router) -> None: - self.router = router - - for observer in router.observers.values(): - if observer.event_name in {"update", "error"}: - continue - observer.outer_middleware(self._middleware) - - self._scenes: Dict[str, Type[Scene]] = {} - - async def _middleware( - self, - handler: NextMiddlewareType[TelegramObject], - event: TelegramObject, - data: Dict[str, Any], - ) -> Any: - data["wizard"] = Wizard( - registry=self, - update=data["event_update"], - event=event, - context=data["state"], - data=data, - ) - return await handler(event, data) - - def add(self, *scenes: Type[Scene], router: Optional[Router] = None) -> None: - if router is None: - router = self.router - - for scene in scenes: - if scene.__scene_config__.name in self._scenes: - raise ValueError(f"Scene {scene.__scene_config__.name} already exists") - - self._scenes[scene.__scene_config__.name] = scene - - router.include_router(scene.as_router()) - - def get(self, scene: Union[Type[Scene], str]) -> Type[Scene]: - if inspect.isclass(scene) and issubclass(scene, Scene): - target = scene.__scene_config__.name - check_class = True - else: - target = scene - check_class = False - - if not isinstance(target, str): - raise TypeError("Scene must be a string or subclass of Scene") - try: - result = self._scenes[target] - except KeyError: - raise ValueError(f"Scene {scene!r} is not registered") - - if check_class and not issubclass(result, scene): - raise ValueError(f"Scene {scene!r} is not registered") - - return result diff --git a/aiogram/scenes/_scene.py b/aiogram/scenes/_scene.py index 7d15a3bc..aa128a4d 100644 --- a/aiogram/scenes/_scene.py +++ b/aiogram/scenes/_scene.py @@ -4,19 +4,29 @@ import inspect from collections import defaultdict from dataclasses import dataclass from enum import Enum, auto -from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Tuple, Type, Union, Optional +from typing import ( + TYPE_CHECKING, + Any, + ClassVar, + Dict, + List, + Optional, + Tuple, + Type, + Union, + cast, +) from typing_extensions import Self from aiogram import Router, loggers +from aiogram.dispatcher.event.bases import NextMiddlewareType from aiogram.dispatcher.event.handler import CallableObject, CallbackType from aiogram.filters import StateFilter from aiogram.fsm.context import FSMContext +from aiogram.scenes._history import HistoryManager from aiogram.types import TelegramObject, Update -if TYPE_CHECKING: - from ._wizard import Wizard - class ObserverDecorator: def __init__( @@ -24,14 +34,12 @@ class ObserverDecorator: name: str, filters: tuple[CallbackType, ...], action: SceneAction | None = None, - action_after: SceneAction | None = None, - scene: Union[Scene, str] | None = None, + after: Optional[After] = None, ) -> None: self.name = name self.filters = filters self.action = action - self.action_after = action_after - self.scene = scene + self.after = after def _wrap_class(self, target: Type[Scene]) -> None: if not issubclass(target, Scene): @@ -53,11 +61,10 @@ class ObserverDecorator: handlers.append( HandlerContainer( - self.name, - target, - self.filters, - self.action_after, - self.scene, + name=self.name, + handler=target, + filters=self.filters, + after=self.after, ) ) @@ -104,22 +111,22 @@ class ActionContainer: name: str, filters: Tuple[CallbackType, ...], action: SceneAction, - target: Type[Scene] | None = None, + target: Optional[Union[Type[Scene], str]] = None, ) -> None: self.name = name self.filters = filters self.action = action self.target = target - async def execute(self, scene: Scene) -> None: + async def execute(self, wizard: SceneWizard) -> None: if self.action == SceneAction.enter and self.target is not None: - await scene.goto(self.target) + await wizard.goto(self.target) elif self.action == SceneAction.leave: - await scene.leave() + await wizard.leave() elif self.action == SceneAction.exit: - await scene.exit() + await wizard.exit() elif self.action == SceneAction.back: - await scene.back() + await wizard.back() class HandlerContainer: @@ -128,23 +135,26 @@ class HandlerContainer: name: str, handler: CallbackType, filters: Tuple[CallbackType, ...], - action: Optional[SceneAction] = None, - scene: Optional[Union[Scene, str]] = None, + after: Optional[After] = None, ) -> None: self.name = name self.handler = handler self.filters = filters - self.action_container = ActionContainer(name, filters, action, scene) + self.after = after -@dataclass(slots=True) +@dataclass() class SceneConfig: - name: str + state: Optional[str] filters: Dict[str, List[CallbackType]] handlers: List[HandlerContainer] actions: Dict[SceneAction, Dict[str, CallableObject]] +async def _empty_handler(*args: Any, **kwargs: Any) -> None: + pass + + class _SceneMeta(type): def __new__( mcs, @@ -153,14 +163,13 @@ class _SceneMeta(type): namespace: Dict[str, Any], **kwargs: Any, ) -> _SceneMeta: - state_name = kwargs.pop("state", f"{namespace['__module__']}:{name}") - + state_name = kwargs.pop("state", None) filters: defaultdict[str, List[CallbackType]] = defaultdict(list) handlers: list[HandlerContainer] = [] actions: defaultdict[SceneAction, Dict[str, CallableObject]] = defaultdict(dict) for base in bases: - if not isinstance(base, Scene): + if not issubclass(base, Scene): continue parent_scene_config = base.__scene_config__ @@ -172,14 +181,21 @@ class _SceneMeta(type): for name, value in namespace.items(): if scene_handlers := getattr(value, "__aiogram_handler__", None): handlers.extend(scene_handlers) - elif isinstance(value, ActionContainer): - handlers.append(HandlerContainer(name, value.execute, value.filters, value.action)) - elif hasattr(value, "__aiogram_action__"): + if isinstance(value, ObserverDecorator): + handlers.append( + HandlerContainer( + value.name, + _empty_handler, + value.filters, + after=value.after, + ) + ) + if hasattr(value, "__aiogram_action__"): for action, action_handlers in value.__aiogram_action__.items(): actions[action].update(action_handlers) namespace["__scene_config__"] = SceneConfig( - name=state_name, + state=state_name, filters=dict(filters), handlers=handlers, actions=dict(actions), @@ -190,41 +206,49 @@ class _SceneMeta(type): class SceneHandlerWrapper: def __init__( self, - wizard: Type[Scene], + scene: Type[Scene], handler: CallbackType, - action_container: Optional[ActionContainer] = None, + after: Optional[After] = None, ) -> None: - self.wizard = wizard + self.scene = scene self.handler = CallableObject(handler) - self.action_container = action_container + self.after = after async def __call__( self, event: TelegramObject, state: FSMContext, - wizard: Wizard, + scenes: ScenesManager, event_update: Update, **kwargs: Any, ) -> Any: - scene = self.wizard( - wizard=wizard, - update=event_update, - event=event, - context=state, - data=kwargs, - ) - try: - return await self.handler.call( - scene, - event, + scene = self.scene( + wizard=SceneWizard( + scene_config=self.scene.__scene_config__, + manager=scenes, state=state, - event_update=event_update, - wizard=wizard, - **kwargs, + update_type=event_update.event_type, + event=event, + data=kwargs, ) - finally: - if self.action_container is not None: - await self.action_container.execute(scene) + ) + + result = await self.handler.call( + scene, + event, + state=state, + event_update=event_update, + **kwargs, + ) + if self.after: + action_container = ActionContainer( + "after", + (), + self.after.action, + self.after.scene, + ) + await action_container.execute(scene.wizard) + return result def __await__(self) -> Self: return self @@ -235,23 +259,14 @@ class Scene(metaclass=_SceneMeta): def __init__( self, - wizard: Wizard, - update: Update, - event: TelegramObject, - context: FSMContext, - data: Dict[str, Any], + wizard: SceneWizard, ) -> None: - self.manager = wizard - self.update = update - self.event = event - self.context = context - self.data = data + self.wizard = wizard + self.wizard.scene = self @classmethod - def as_router(cls) -> Router: + def add_to_router(cls, router: Router) -> None: scene_config = cls.__scene_config__ - - router = Router(name=scene_config.name) used_observers = set() for observer, filters in scene_config.filters.items(): @@ -263,84 +278,316 @@ class Scene(metaclass=_SceneMeta): SceneHandlerWrapper( cls, handler.handler, - action_container=handler.action_container, + after=handler.after, ), *handler.filters, ) used_observers.add(handler.name) for observer in used_observers: - router.observers[observer].filter(StateFilter(scene_config.name)) + router.observers[observer].filter(StateFilter(scene_config.state)) + @classmethod + def as_router(cls) -> Router: + router = Router(name=cls.__scene_config__.state) + cls.add_to_router(router) return router @classmethod def as_handler(cls) -> CallbackType: - async def enter_to_scene_handler(event: TelegramObject, wizard: Wizard) -> None: - await wizard.enter(cls) + async def enter_to_scene_handler(event: TelegramObject, scenes: ScenesManager) -> None: + await scenes.enter(cls) return enter_to_scene_handler - async def enter(self, **kwargs: Any) -> None: - loggers.scene.debug("Entering scene %r", self.__scene_config__.name) - state = await self.context.get_state() - await self.context.set_state(self.__scene_config__.name) - try: - if not await self._on_action(SceneAction.enter, **kwargs): - loggers.scene.error( - "Enter action not found in scene %r for event %r", self, type(self.event) - ) - except Exception as e: - await self.context.set_state(state) - raise e + +class SceneWizard: + def __init__( + self, + scene_config: SceneConfig, + manager: ScenesManager, + state: FSMContext, + update_type: str, + event: TelegramObject, + data: Dict[str, Any], + ): + self.scene_config = scene_config + self.manager = manager + self.state = state + self.update_type = update_type + self.event = event + self.data = data + + self.scene: Optional[Scene] = None + + async def enter(self, _with_history: bool = False, **kwargs: Any) -> None: + loggers.scene.debug("Entering scene %r", self.scene_config.state) + await self.state.set_state(self.scene_config.state) + await self._on_action(SceneAction.enter, **kwargs) async def leave(self, **kwargs: Any) -> None: - loggers.scene.debug("Leaving scene %r", self.__scene_config__.name) - state = await self.context.get_state() - await self.context.set_state(None) - try: - await self._on_action(SceneAction.leave, **kwargs) - except Exception as e: - await self.context.set_state(state) - raise e + loggers.scene.debug("Leaving scene %r", self.scene_config.state) + await self.manager.history.snapshot() + await self._on_action(SceneAction.leave, **kwargs) async def exit(self, **kwargs: Any) -> None: - loggers.scene.debug("Exiting scene %r", self.__scene_config__.name) - state = await self.context.get_state() - await self.context.set_state(None) - try: - await self._on_action(SceneAction.exit, **kwargs) - except Exception as e: - await self.context.set_state(state) - raise e + loggers.scene.debug("Exiting scene %r", self.scene_config.state) + await self.state.set_state(None) + await self.manager.history.clear() + await self._on_action(SceneAction.exit, **kwargs) + await self.manager.enter(None, _check_active=False, _with_history=False, **kwargs) async def back(self, **kwargs: Any) -> None: - loggers.scene.debug("Back to previous scene from scene %s", self.__scene_config__.name) - await self.manager.back(**kwargs) + loggers.scene.debug("Back to previous scene from scene %s", self.scene_config.state) + await self.leave() + await self.manager.history.rollback() + new_scene = await self.manager.history.rollback() + await self.manager.enter(new_scene, _check_active=False, _with_history=False, **kwargs) async def replay(self, event: TelegramObject) -> None: await self._on_action(SceneAction.enter, event=event) - async def goto(self, scene: Union[Type[Scene], str]) -> None: - await self.manager.enter(scene) + async def goto(self, scene: Union[Type[Scene], str], **kwargs: Any) -> None: + await self.leave(**kwargs) + await self.manager.enter(scene, _check_active=False, **kwargs) async def _on_action(self, action: SceneAction, **kwargs: Any) -> bool: - loggers.scene.debug("Call action %r in scene %r", action.name, self.__scene_config__.name) - action_config = self.__scene_config__.actions.get(action, {}) + if not self.scene: + raise ValueError("Scene is not initialized") + + loggers.scene.debug("Call action %r in scene %r", action.name, self.scene_config.state) + action_config = self.scene_config.actions.get(action, {}) if not action_config: loggers.scene.debug( - "Action %r not found in scene %r", action.name, self.__scene_config__.name + "Action %r not found in scene %r", action.name, self.scene_config.state ) return False - event_type = self.update.event_type + event_type = self.update_type if event_type not in action_config: loggers.scene.debug( "Action %r for event %r not found in scene %r", action.name, event_type, - self.__scene_config__.name, + self.scene_config.state, ) return False - await action_config[event_type].call(self, self.event, **{**self.data, **kwargs}) + await action_config[event_type].call(self.scene, self.event, **{**self.data, **kwargs}) return True + + async def set_data(self, data: Dict[str, Any]) -> None: + await self.state.set_data(data=data) + + async def get_data(self) -> Dict[str, Any]: + return await self.state.get_data() + + async def update_data( + self, data: Optional[Dict[str, Any]] = None, **kwargs: Any + ) -> Dict[str, Any]: + if data: + kwargs.update(data) + return await self.state.update_data(data=kwargs) + + async def clear_data(self) -> None: + await self.set_data({}) + + +class ScenesManager: + def __init__( + self, + registry: SceneRegistry, + update_type: str, + event: TelegramObject, + state: FSMContext, + data: dict[str, Any], + ) -> None: + self.registry = registry + self.update_type = update_type + self.event = event + self.state = state + self.data = data + + self.history = HistoryManager(self.state) + + async def _get_scene(self, scene_type: Optional[Union[Type[Scene], str]]) -> Scene: + scene_type = self.registry.get(scene_type) + return scene_type( + wizard=SceneWizard( + scene_config=scene_type.__scene_config__, + manager=self, + state=self.state, + update_type=self.update_type, + event=self.event, + data=self.data, + ), + ) + + async def _get_active_scene(self) -> Optional[Scene]: + state = await self.state.get_state() + return await self._get_scene(state) + + async def enter( + self, + scene_type: Optional[Union[Type[Scene], str]], + _check_active: bool = True, + _with_history: bool = True, + **kwargs: Any, + ) -> None: + scene = await self._get_scene(scene_type) + if _check_active: + active_scene = await self._get_active_scene() + if active_scene is not None: + await active_scene.wizard.exit(**kwargs) + + await scene.wizard.enter(_with_history=_with_history, **kwargs) + + async def close(self, **kwargs: Any) -> None: + try: + scene = await self._get_active_scene() + except ValueError: + return + if not scene: + return + await scene.wizard.exit(**kwargs) + + +class SceneRegistry: + def __init__(self, router: Router) -> None: + self.router = router + + for observer in router.observers.values(): + if observer.event_name in {"update", "error"}: + continue + observer.outer_middleware(self._middleware) + + self._scenes: Dict[Optional[str], Type[Scene]] = {} + + async def _middleware( + self, + handler: NextMiddlewareType[TelegramObject], + event: TelegramObject, + data: Dict[str, Any], + ) -> Any: + update: Update = data["event_update"] + data["scenes"] = ScenesManager( + registry=self, + update_type=update.event_type, + event=event, + state=data["state"], + data=data, + ) + return await handler(event, data) + + def add(self, *scenes: Type[Scene], router: Optional[Router] = None) -> None: + if router is None: + router = self.router + + for scene in scenes: + if scene.__scene_config__.state in self._scenes: + raise ValueError(f"Scene {scene.__scene_config__.state} already exists") + + self._scenes[scene.__scene_config__.state] = scene + + router.include_router(scene.as_router()) + + def get(self, scene: Optional[Union[Type[Scene], str]]) -> Type[Scene]: + if inspect.isclass(scene) and issubclass(scene, Scene): + scene = scene.__scene_config__.state + if scene is not None and not isinstance(scene, str): + raise TypeError("Scene must be a subclass of Scene or a string") + + try: + result = self._scenes[scene] + except KeyError: + raise ValueError(f"Scene {scene!r} is not registered") + + return result + + +@dataclass +class After: + action: SceneAction + scene: Optional[Union[Type[Scene], str]] = None + + @classmethod + def exit(cls) -> After: + return cls(action=SceneAction.exit) + + @classmethod + def back(cls) -> After: + return cls(action=SceneAction.back) + + @classmethod + def goto(cls, scene: Optional[Union[Type[Scene], str]]) -> After: + return cls(action=SceneAction.enter, scene=scene) + + +class ObserverMarker: + def __init__(self, name: str) -> None: + self.name = name + + def __call__( + self, + *filters: CallbackType, + after: Optional[After] = None, + ) -> ObserverDecorator: + return ObserverDecorator( + self.name, + filters, + after=after, + ) + + def enter(self, *filters: CallbackType) -> ObserverDecorator: + return ObserverDecorator(self.name, filters, action=SceneAction.enter) + + def leave(self) -> ObserverDecorator: + return ObserverDecorator(self.name, (), action=SceneAction.leave) + + def exit(self) -> ObserverDecorator: + return ObserverDecorator(self.name, (), action=SceneAction.exit) + + def back(self) -> ObserverDecorator: + return ObserverDecorator(self.name, (), action=SceneAction.back) + + +class OnMarker: + """ + The `_On` class is used as a marker class to define different types of events in the Scenes. + + Attributes: + + - :code:`message`: Event marker for handling `Message` events. + - :code:`edited_message`: Event marker for handling edited `Message` events. + - :code:`channel_post`: Event marker for handling channel `Post` events. + - :code:`edited_channel_post`: Event marker for handling edited channel `Post` events. + - :code:`inline_query`: Event marker for handling `InlineQuery` events. + - :code:`chosen_inline_result`: Event marker for handling chosen `InlineResult` events. + - :code:`callback_query`: Event marker for handling `CallbackQuery` events. + - :code:`shipping_query`: Event marker for handling `ShippingQuery` events. + - :code:`pre_checkout_query`: Event marker for handling `PreCheckoutQuery` events. + - :code:`poll`: Event marker for handling `Poll` events. + - :code:`poll_answer`: Event marker for handling `PollAnswer` events. + - :code:`my_chat_member`: Event marker for handling my chat `Member` events. + - :code:`chat_member`: Event marker for handling chat `Member` events. + - :code:`chat_join_request`: Event marker for handling chat `JoinRequest` events. + - :code:`error`: Event marker for handling `Error` events. + + .. note:: + + This is a marker class and does not contain any methods or implementation logic. + """ + + message = ObserverMarker("message") + edited_message = ObserverMarker("edited_message") + channel_post = ObserverMarker("channel_post") + edited_channel_post = ObserverMarker("edited_channel_post") + inline_query = ObserverMarker("inline_query") + chosen_inline_result = ObserverMarker("chosen_inline_result") + callback_query = ObserverMarker("callback_query") + shipping_query = ObserverMarker("shipping_query") + pre_checkout_query = ObserverMarker("pre_checkout_query") + poll = ObserverMarker("poll") + poll_answer = ObserverMarker("poll_answer") + my_chat_member = ObserverMarker("my_chat_member") + chat_member = ObserverMarker("chat_member") + chat_join_request = ObserverMarker("chat_join_request") diff --git a/aiogram/scenes/_wizard.py b/aiogram/scenes/_wizard.py deleted file mode 100644 index 857317b1..00000000 --- a/aiogram/scenes/_wizard.py +++ /dev/null @@ -1,79 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, Any, Union, Type, Optional - -from aiogram.fsm.context import FSMContext -from aiogram.types import TelegramObject, Update -from ._history import HistoryManager - -if TYPE_CHECKING: - from ._registry import SceneRegistry - from ._scene import Scene - - -class Wizard: - def __init__( - self, - registry: SceneRegistry, - update: Update, - event: TelegramObject, - context: FSMContext, - data: dict[str, Any], - ) -> None: - self.registry = registry - self.update = update - self.event = event - self.context = context - self.data = data - - self._history = HistoryManager(self.context) - - async def _get_scene(self, scene_type: Union[Scene, str]) -> Scene: - scene_type = self.registry.get(scene_type) - return scene_type( - wizard=self, - update=self.update, - event=self.event, - context=self.context, - data=self.data, - ) - - async def _get_active_scene(self) -> Optional[Scene, None]: - state = await self.context.get_state() - if state is None: - return None - return await self._get_scene(state) - - async def enter(self, scene_type: Union[Type[Scene], str], **kwargs: Any) -> None: - active_scene = await self._get_active_scene() - if active_scene is not None: - await active_scene.leave(**kwargs) - scene = await self._get_scene(scene_type) - await scene.enter(**kwargs) - await self._history.snapshot() - - async def leave(self, **kwargs: Any) -> None: - try: - scene = await self._get_active_scene() - except ValueError: - return - if not scene: - return - await scene.leave(**kwargs) - - async def exit(self, **kwargs: Any) -> None: - try: - scene = await self._get_active_scene() - except ValueError: - return - if not scene: - return - await scene.exit(**kwargs) - await self._history.clear() - - async def back(self, **kwargs: Any) -> None: - previous_state = await self._history.rollback() - if previous_state is not None: - await self.enter(previous_state, **kwargs) - else: - await self.exit(**kwargs)