Small refactoring + added possibility to specify post-action on handlers

This commit is contained in:
Alex Root Junior 2023-08-26 00:13:50 +03:00
parent f27f3eea42
commit e4caa419ca
No known key found for this signature in database
GPG key ID: 074C1D455EBEA4AC
5 changed files with 206 additions and 185 deletions

View file

@ -3,11 +3,11 @@ from __future__ import annotations
__all__ = [
"Scene",
"SceneRegistry",
"SceneManager",
"Wizard",
"on",
]
from ._manager import SceneManager
from ._wizard import Wizard
from ._marker import OnMarker
from ._registry import SceneRegistry
from ._scene import Scene

View file

@ -1,17 +1,9 @@
from __future__ import annotations
import inspect
from collections import defaultdict
from enum import Enum, auto
from typing import TYPE_CHECKING, Dict, Tuple, Type, Union, Optional
from typing import Union, Optional
from typing_extensions import Self
from aiogram.dispatcher.event.handler import CallableObject, CallbackType
from aiogram.types import TelegramObject
if TYPE_CHECKING:
from ._scene import Scene
from aiogram.dispatcher.event.handler import CallbackType
from ._scene import Scene, ObserverDecorator, SceneAction
class ObserverMarker:
@ -21,10 +13,32 @@ class ObserverMarker:
def __call__(
self,
*filters: CallbackType,
goto: Optional[Union[Scene, str]] = None,
exit_: bool = False,
leave: bool = False,
back: bool = False,
) -> ObserverDecorator:
# Check that only one action is specified
if sum((goto is not None, exit_, leave, back)) > 1:
raise ValueError("Only one action is allowed")
scene = None
action_after = None
if goto is not None:
action_after = SceneAction.enter
scene = goto
elif exit_:
action_after = SceneAction.exit
elif leave:
action_after = SceneAction.leave
elif back:
action_after = SceneAction.back
return ObserverDecorator(
self.name,
filters,
action_after=action_after,
scene=scene,
)
def enter(self, *filters: CallbackType) -> ObserverDecorator:
@ -40,118 +54,6 @@ class ObserverMarker:
return ObserverDecorator(self.name, (), action=SceneAction.back)
class ObserverDecorator:
def __init__(
self,
name: str,
filters: tuple[CallbackType, ...],
action: SceneAction | None = None,
) -> None:
self.name = name
self.filters = filters
self.action = action
def _wrap_class(self, target: Type[Scene]) -> None:
from ._scene import Scene
if not issubclass(target, Scene):
raise TypeError("Only subclass of Scene is allowed")
if self.action is not None:
raise TypeError("This action is not allowed for class")
filters = getattr(target, "__aiogram_filters__", None)
if filters is None:
filters = defaultdict(list)
setattr(target, "__aiogram_filters__", filters)
filters[self.name].extend(self.filters)
def _wrap_filter(self, target: Type[Scene] | CallbackType) -> None:
setattr(target, "__aiogram_handler__", True)
filters = getattr(target, "__aiogram_filters__", None)
if filters is None:
filters = defaultdict(list)
setattr(target, "__aiogram_filters__", filters)
filters[self.name].extend(self.filters)
def _wrap_action(self, target: Type[Scene] | CallbackType) -> None:
action = getattr(target, "__aiogram_action__", None)
if action is None:
action = defaultdict(dict)
setattr(target, "__aiogram_action__", action)
action[self.action][self.name] = CallableObject(target)
def __call__(self, target: Type[Scene] | CallbackType) -> Type[Scene] | CallbackType:
if inspect.isclass(target):
self._wrap_class(target)
elif inspect.isfunction(target):
if self.action is None:
self._wrap_filter(target)
else:
self._wrap_action(target)
return target
def leave(self) -> ActionContainer:
return ActionContainer(self.name, self.filters, SceneAction.leave)
def enter(self, target: Type[Scene]) -> ActionContainer:
return ActionContainer(self.name, self.filters, SceneAction.enter, target)
def exit(self) -> ActionContainer:
return ActionContainer(self.name, self.filters, SceneAction.exit)
def back(self) -> ActionContainer:
return ActionContainer(self.name, self.filters, SceneAction.back)
class SceneAction(Enum):
enter = auto()
leave = auto()
exit = auto()
back = auto()
class ActionContainer:
def __init__(
self,
name: str,
filters: Tuple[CallbackType, ...],
action: SceneAction,
target: Type[Scene] | None = None,
) -> None:
self.name = name
self.filters = filters
self.action = action
self.target = target
async def __call__(
self,
scene: Scene,
event: TelegramObject,
) -> None:
if self.action == SceneAction.enter and self.target is not None:
await scene.goto(self.target)
elif self.action == SceneAction.leave:
await scene.leave()
elif self.action == SceneAction.exit:
await scene.exit()
elif self.action == SceneAction.back:
await scene.back()
@property
def __aiogram_filters__(self) -> Dict[str, Tuple[CallbackType, ...]]:
return {self.name: self.filters}
def __await__(self) -> Self:
return self
class ControlActionContainer:
def __init__(self, name: str, action: SceneAction) -> None:
self.name = name
self.action = action
class OnMarker:
"""
The `_On` class is used as a marker class to define different types of events in the Scenes.

View file

@ -3,11 +3,11 @@ from __future__ import annotations
import inspect
from typing import Any, Dict, Optional, Type, Union
from aiogram import Dispatcher, Router
from aiogram import Router
from ..dispatcher.event.bases import NextMiddlewareType
from ..types import TelegramObject
from ._manager import SceneManager
from ._wizard import Wizard
from ._scene import Scene
@ -22,28 +22,13 @@ class SceneRegistry:
self._scenes: Dict[str, Type[Scene]] = {}
async def _error_middleware(
self,
handler: NextMiddlewareType[TelegramObject],
event: TelegramObject,
data: Dict[str, Any],
) -> Any:
data["scenes"] = SceneManager(
registry=self,
update=event.update,
event=event,
context=data["state"],
data=data,
)
return await handler(event, data)
async def _middleware(
self,
handler: NextMiddlewareType[TelegramObject],
event: TelegramObject,
data: Dict[str, Any],
) -> Any:
data["scenes"] = SceneManager(
data["wizard"] = Wizard(
registry=self,
update=data["event_update"],
event=event,

View file

@ -1,7 +1,9 @@
from __future__ import annotations
import inspect
from collections import defaultdict
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Tuple, Type, Union
from enum import Enum, auto
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Tuple, Type, Union, Optional
from typing_extensions import Self
@ -10,10 +12,128 @@ from aiogram.dispatcher.event.handler import CallableObject, CallbackType
from aiogram.filters import StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.types import TelegramObject, Update
from ._marker import ActionContainer, SceneAction
if TYPE_CHECKING:
from ._manager import SceneManager
from ._wizard import Wizard
class ObserverDecorator:
def __init__(
self,
name: str,
filters: tuple[CallbackType, ...],
action: SceneAction | None = None,
action_after: SceneAction | None = None,
scene: Union[Scene, str] | None = None,
) -> None:
self.name = name
self.filters = filters
self.action = action
self.action_after = action_after
self.scene = scene
def _wrap_class(self, target: Type[Scene]) -> None:
if not issubclass(target, Scene):
raise TypeError("Only subclass of Scene is allowed")
if self.action is not None:
raise TypeError("This action is not allowed for class")
filters = getattr(target, "__aiogram_filters__", None)
if filters is None:
filters = defaultdict(list)
setattr(target, "__aiogram_filters__", filters)
filters[self.name].extend(self.filters)
def _wrap_filter(self, target: Type[Scene] | CallbackType) -> None:
handlers = getattr(target, "__aiogram_handler__", None)
if not handlers:
handlers = []
setattr(target, "__aiogram_handler__", handlers)
handlers.append(
HandlerContainer(
self.name,
target,
self.filters,
self.action_after,
self.scene,
)
)
def _wrap_action(self, target: Type[Scene] | CallbackType) -> None:
action = getattr(target, "__aiogram_action__", None)
if action is None:
action = defaultdict(dict)
setattr(target, "__aiogram_action__", action)
action[self.action][self.name] = CallableObject(target)
def __call__(self, target: Type[Scene] | CallbackType) -> Type[Scene] | CallbackType:
if inspect.isclass(target):
self._wrap_class(target)
elif inspect.isfunction(target):
if self.action is None:
self._wrap_filter(target)
else:
self._wrap_action(target)
return target
def leave(self) -> ActionContainer:
return ActionContainer(self.name, self.filters, SceneAction.leave)
def enter(self, target: Type[Scene]) -> ActionContainer:
return ActionContainer(self.name, self.filters, SceneAction.enter, target)
def exit(self) -> ActionContainer:
return ActionContainer(self.name, self.filters, SceneAction.exit)
def back(self) -> ActionContainer:
return ActionContainer(self.name, self.filters, SceneAction.back)
class SceneAction(Enum):
enter = auto()
leave = auto()
exit = auto()
back = auto()
class ActionContainer:
def __init__(
self,
name: str,
filters: Tuple[CallbackType, ...],
action: SceneAction,
target: Type[Scene] | None = None,
) -> None:
self.name = name
self.filters = filters
self.action = action
self.target = target
async def execute(self, scene: Scene) -> None:
if self.action == SceneAction.enter and self.target is not None:
await scene.goto(self.target)
elif self.action == SceneAction.leave:
await scene.leave()
elif self.action == SceneAction.exit:
await scene.exit()
elif self.action == SceneAction.back:
await scene.back()
class HandlerContainer:
def __init__(
self,
name: str,
handler: CallbackType,
filters: Tuple[CallbackType, ...],
action: Optional[SceneAction] = None,
scene: Optional[Union[Scene, str]] = None,
) -> None:
self.name = name
self.handler = handler
self.filters = filters
self.action_container = ActionContainer(name, filters, action, scene)
class _SceneMeta(type):
@ -27,7 +147,7 @@ class _SceneMeta(type):
state_name = kwargs.pop("state", f"{namespace['__module__']}:{name}")
aiogram_filters: defaultdict[str, List[CallbackType]] = defaultdict(list)
aiogram_handlers: list[CallbackType] = []
aiogram_handlers: list[HandlerContainer] = []
aiogram_actions: defaultdict[SceneAction, Dict[str, CallableObject]] = defaultdict(dict)
for base in bases:
@ -40,10 +160,12 @@ class _SceneMeta(type):
aiogram_actions[action].update(handlers)
for name, value in namespace.items():
if hasattr(value, "__aiogram_handler__"):
aiogram_handlers.append(value)
if handlers := getattr(value, "__aiogram_handler__", None):
aiogram_handlers.extend(handlers)
elif isinstance(value, ActionContainer):
aiogram_handlers.append(value)
aiogram_handlers.append(
HandlerContainer(name, value.execute, value.filters, value.action)
)
elif hasattr(value, "__aiogram_action__"):
for action, handlers in value.__aiogram_action__.items():
aiogram_actions[action].update(handlers)
@ -60,33 +182,43 @@ class _SceneMeta(type):
class SceneHandlerWrapper:
def __init__(self, scene: Type[Scene], handler: CallbackType) -> None:
self.scene = scene
def __init__(
self,
wizard: Type[Scene],
handler: CallbackType,
action_container: Optional[ActionContainer] = None,
) -> None:
self.wizard = wizard
self.handler = CallableObject(handler)
self.action_container = action_container
async def __call__(
self,
event: TelegramObject,
state: FSMContext,
scenes: SceneManager,
wizard: Wizard,
event_update: Update,
**kwargs: Any,
) -> Any:
scene = self.scene(
manager=scenes,
scene = self.wizard(
wizard=wizard,
update=event_update,
event=event,
context=state,
data=kwargs,
)
return await self.handler.call(
scene,
event,
state=state,
event_update=event_update,
scenes=scenes,
**kwargs,
)
try:
return await self.handler.call(
scene,
event,
state=state,
event_update=event_update,
wizard=wizard,
**kwargs,
)
finally:
if self.action_container is not None:
await self.action_container.execute(scene)
def __await__(self) -> Self:
return self
@ -95,18 +227,18 @@ class SceneHandlerWrapper:
class Scene(metaclass=_SceneMeta):
__aiogram_scene_name__: ClassVar[str]
__aiogram_filters__: ClassVar[Dict[str, List[CallbackType]]]
__aiogram_handlers__: ClassVar[List[CallbackType]]
__aiogram_handlers__: ClassVar[List[HandlerContainer]]
__aiogram_actions__: ClassVar[Dict[SceneAction, Dict[str, CallableObject]]]
def __init__(
self,
manager: SceneManager,
wizard: Wizard,
update: Update,
event: TelegramObject,
context: FSMContext,
data: Dict[str, Any],
) -> None:
self.manager = manager
self.manager = wizard
self.update = update
self.event = event
self.context = context
@ -122,12 +254,15 @@ class Scene(metaclass=_SceneMeta):
used_observers.add(observer)
for handler in cls.__aiogram_handlers__:
handler_filters = getattr(handler, "__aiogram_filters__", None)
if not handler_filters:
continue
for observer, filters in handler_filters.items():
router.observers[observer].register(SceneHandlerWrapper(cls, handler), *filters)
used_observers.add(observer)
router.observers[handler.name].register(
SceneHandlerWrapper(
cls,
handler.handler,
action_container=handler.action_container,
),
*handler.filters,
)
used_observers.add(handler.name)
for observer in used_observers:
router.observers[observer].filter(StateFilter(cls.__aiogram_scene_name__))
@ -136,26 +271,26 @@ class Scene(metaclass=_SceneMeta):
@classmethod
def as_handler(cls) -> CallbackType:
async def enter_to_scene_handler(event: TelegramObject, scenes: SceneManager) -> None:
await scenes.enter(cls)
async def enter_to_scene_handler(event: TelegramObject, wizard: Wizard) -> None:
await wizard.enter(cls)
return enter_to_scene_handler
async def enter(self, **kwargs: Any) -> None:
loggers.scene.debug("Entering scene %s", self.__aiogram_scene_name__)
loggers.scene.debug("Entering scene %r", self.__aiogram_scene_name__)
state = await self.context.get_state()
await self.context.set_state(self.__aiogram_scene_name__)
try:
if not await self._on_action(SceneAction.enter, **kwargs):
loggers.scene.error(
"Enter action not found in scene %s for event %r", self, type(self.event)
"Enter action not found in scene %r for event %r", self, type(self.event)
)
except Exception as e:
await self.context.set_state(state)
raise e
async def leave(self, **kwargs: Any) -> None:
loggers.scene.debug("Leaving scene %s", self.__aiogram_scene_name__)
loggers.scene.debug("Leaving scene %r", self.__aiogram_scene_name__)
state = await self.context.get_state()
await self.context.set_state(None)
try:
@ -165,7 +300,7 @@ class Scene(metaclass=_SceneMeta):
raise e
async def exit(self, **kwargs: Any) -> None:
loggers.scene.debug("Exiting scene %s", self.__aiogram_scene_name__)
loggers.scene.debug("Exiting scene %r", self.__aiogram_scene_name__)
state = await self.context.get_state()
await self.context.set_state(None)
try:
@ -202,6 +337,5 @@ class Scene(metaclass=_SceneMeta):
self.__aiogram_scene_name__,
)
return False
await action_config[event_type].call(self, self.event, **{**self.data, **kwargs})
return True

View file

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Union, Type, Optional
from aiogram.fsm.context import FSMContext
from aiogram.types import TelegramObject, Update
@ -11,7 +11,7 @@ if TYPE_CHECKING:
from ._scene import Scene
class SceneManager:
class Wizard:
def __init__(
self,
registry: SceneRegistry,
@ -28,23 +28,23 @@ class SceneManager:
self._history = HistoryManager(self.context)
async def _get_scene(self, scene_type: type[Scene] | str) -> Scene:
async def _get_scene(self, scene_type: Union[Scene, str]) -> Scene:
scene_type = self.registry.get(scene_type)
return scene_type(
manager=self,
wizard=self,
update=self.update,
event=self.event,
context=self.context,
data=self.data,
)
async def _get_active_scene(self) -> Scene | None:
async def _get_active_scene(self) -> Optional[Scene, None]:
state = await self.context.get_state()
if state is None:
return None
return await self._get_scene(state)
async def enter(self, scene_type: type[Scene] | str, **kwargs: Any) -> None:
async def enter(self, scene_type: Union[Type[Scene], str], **kwargs: Any) -> None:
active_scene = await self._get_active_scene()
if active_scene is not None:
await active_scene.leave(**kwargs)