diff --git a/tests/test_fsm/test_scene.py b/tests/test_fsm/test_scene.py index 9cb936d1..3336ddf1 100644 --- a/tests/test_fsm/test_scene.py +++ b/tests/test_fsm/test_scene.py @@ -12,6 +12,7 @@ from aiogram.fsm.context import FSMContext from aiogram.fsm.scene import ( ActionContainer, After, + HistoryManager, ObserverDecorator, ObserverMarker, Scene, @@ -334,6 +335,376 @@ class TestSceneHandlerWrapper: assert scene_handler_wrapper.__await__() is scene_handler_wrapper +class TestSceneWizard: + async def test_scene_wizard_enter_with_reset_data_on_enter(self): + class MyScene(Scene, reset_data_on_enter=True): + pass + + scene_config_mock = AsyncMock() + scene_config_mock.state = "test_state" + + state_mock = AsyncMock(spec=FSMContext) + state_mock.set_state = AsyncMock() + + wizard = SceneWizard( + scene_config=MyScene.__scene_config__, + manager=AsyncMock(spec=ScenesManager), + state=state_mock, + update_type="message", + event=AsyncMock(), + data={}, + ) + kwargs = {"test_kwargs": "test_kwargs"} + wizard._on_action = AsyncMock() + + await wizard.enter(**kwargs) + + state_mock.set_data.assert_called_once_with({}) + state_mock.set_state.assert_called_once_with(MyScene.__scene_config__.state) + wizard._on_action.assert_called_once_with(SceneAction.enter, **kwargs) + + async def test_scene_wizard_enter_with_reset_history_on_enter(self): + class MyScene(Scene, reset_history_on_enter=True): + pass + + state_mock = AsyncMock(spec=FSMContext) + state_mock.set_state = AsyncMock() + + manager = AsyncMock(spec=ScenesManager) + manager.history = AsyncMock(spec=HistoryManager) + manager.history.clear = AsyncMock() + + wizard = SceneWizard( + scene_config=MyScene.__scene_config__, + manager=manager, + state=state_mock, + update_type="message", + event=AsyncMock(), + data={}, + ) + kwargs = {"test_kwargs": "test_kwargs"} + wizard._on_action = AsyncMock() + + await wizard.enter(**kwargs) + + manager.history.clear.assert_called_once() + state_mock.set_state.assert_called_once_with(MyScene.__scene_config__.state) + wizard._on_action.assert_called_once_with(SceneAction.enter, **kwargs) + + async def test_scene_wizard_leave_with_history(self): + scene_config_mock = AsyncMock() + scene_config_mock.state = "test_state" + + manager = AsyncMock(spec=ScenesManager) + manager.history = AsyncMock(spec=HistoryManager) + manager.history.snapshot = AsyncMock() + + wizard = SceneWizard( + scene_config=scene_config_mock, + manager=manager, + state=AsyncMock(spec=FSMContext), + update_type="message", + event=AsyncMock(), + data={}, + ) + wizard._on_action = AsyncMock() + + kwargs = {"test_kwargs": "test_kwargs"} + await wizard.leave(_with_history=False, **kwargs) + + manager.history.snapshot.assert_not_called() + wizard._on_action.assert_called_once_with(SceneAction.leave, **kwargs) + + async def test_scene_wizard_leave_without_history(self): + scene_config_mock = AsyncMock() + scene_config_mock.state = "test_state" + + manager = AsyncMock(spec=ScenesManager) + manager.history = AsyncMock(spec=HistoryManager) + manager.history.snapshot = AsyncMock() + + wizard = SceneWizard( + scene_config=scene_config_mock, + manager=manager, + state=AsyncMock(spec=FSMContext), + update_type="message", + event=AsyncMock(), + data={}, + ) + wizard._on_action = AsyncMock() + + kwargs = {"test_kwargs": "test_kwargs"} + await wizard.leave(**kwargs) + + manager.history.snapshot.assert_called_once() + wizard._on_action.assert_called_once_with(SceneAction.leave, **kwargs) + + async def test_scene_wizard_back(self): + current_scene_config_mock = AsyncMock() + current_scene_config_mock.state = "test_state" + + previous_scene_config_mock = AsyncMock() + previous_scene_config_mock.state = "previous_test_state" + + previous_scene_mock = AsyncMock() + previous_scene_mock.__scene_config__ = previous_scene_config_mock + + manager = AsyncMock(spec=ScenesManager) + manager.history = AsyncMock(spec=HistoryManager) + manager.history.rollback = AsyncMock(return_value=previous_scene_mock) + manager.enter = AsyncMock() + + wizard = SceneWizard( + scene_config=current_scene_config_mock, + manager=manager, + state=AsyncMock(spec=FSMContext), + update_type="message", + event=AsyncMock(), + data={}, + ) + wizard.leave = AsyncMock() + + kwargs = {"test_kwargs": "test_kwargs"} + await wizard.back(**kwargs) + + wizard.leave.assert_called_once_with(_with_history=False, **kwargs) + manager.history.rollback.assert_called_once() + manager.enter.assert_called_once_with(previous_scene_mock, _check_active=False, **kwargs) + + async def test_scene_wizard_retake(self): + scene_config_mock = AsyncMock() + scene_config_mock.state = "test_state" + + wizard = SceneWizard( + scene_config=scene_config_mock, + manager=AsyncMock(spec=ScenesManager), + state=AsyncMock(spec=FSMContext), + update_type="message", + event=AsyncMock(), + data={}, + ) + wizard.goto = AsyncMock() + + kwargs = {"test_kwargs": "test_kwargs"} + await wizard.retake(**kwargs) + + wizard.goto.assert_called_once_with(scene_config_mock.state, **kwargs) + + async def test_scene_wizard_retake_exception(self): + scene_config_mock = AsyncMock() + scene_config_mock.state = None + + wizard = SceneWizard( + scene_config=scene_config_mock, + manager=AsyncMock(spec=ScenesManager), + state=AsyncMock(spec=FSMContext), + update_type="message", + event=AsyncMock(), + data={}, + ) + + kwargs = {"test_kwargs": "test_kwargs"} + + with pytest.raises(AssertionError, match="Scene state is not specified"): + await wizard.retake(**kwargs) + + async def test_scene_wizard_goto(self): + scene_config_mock = AsyncMock() + scene_config_mock.state = "test_state" + + scene_mock = AsyncMock() + scene_mock.__scene_config__ = scene_config_mock + + wizard = SceneWizard( + scene_config=scene_config_mock, + manager=AsyncMock(spec=ScenesManager), + state=AsyncMock(spec=FSMContext), + update_type="message", + event=AsyncMock(), + data={}, + ) + wizard.leave = AsyncMock() + wizard.manager.enter = AsyncMock() + + kwargs = {"test_kwargs": "test_kwargs"} + await wizard.goto(scene_mock, **kwargs) + + wizard.leave.assert_called_once_with(**kwargs) + wizard.manager.enter.assert_called_once_with(scene_mock, _check_active=False, **kwargs) + + async def test_scene_wizard_on_action(self): + scene_config_mock = AsyncMock() + scene_config_mock.actions = {SceneAction.enter: {"message": AsyncMock()}} + scene_config_mock.state = "test_state" + + scene_mock = AsyncMock() + + event_mock = AsyncMock() + event_mock.type = "message" + + data = {"test_data": "test_data"} + wizard = SceneWizard( + scene_config=scene_config_mock, + manager=AsyncMock(), + state=AsyncMock(), + update_type="message", + event=event_mock, + data=data, + ) + wizard.scene = scene_mock + + kwargs = {"test_kwargs": "test_kwargs"} + result = await wizard._on_action(SceneAction.enter, **kwargs) + + scene_config_mock.actions[SceneAction.enter]["message"].call.assert_called_once_with( + scene_mock, event_mock, **{**data, **kwargs} + ) + assert result is True + + async def test_scene_wizard_on_action_no_scene(self): + wizard = SceneWizard( + scene_config=AsyncMock(), + manager=AsyncMock(), + state=AsyncMock(), + update_type="message", + event=AsyncMock(), + data={}, + ) + + with pytest.raises(SceneException, match="Scene is not initialized"): + await wizard._on_action(SceneAction.enter) + + async def test_scene_wizard_on_action_no_action_config(self): + scene_config_mock = AsyncMock() + scene_config_mock.actions = {} + scene_config_mock.state = "test_state" + + scene_mock = AsyncMock() + + event_mock = AsyncMock() + event_mock.type = "message" + + wizard = SceneWizard( + scene_config=scene_config_mock, + manager=AsyncMock(), + state=AsyncMock(), + update_type="message", + event=event_mock, + data={}, + ) + wizard.scene = scene_mock + + kwargs = {"test_kwargs": "test_kwargs"} + result = await wizard._on_action(SceneAction.enter, **kwargs) + + assert result is False + + async def test_scene_wizard_on_action_event_type_not_in_action_config(self): + scene_config_mock = AsyncMock() + scene_config_mock.actions = {SceneAction.enter: {"test_update_type": AsyncMock()}} + scene_config_mock.state = "test_state" + + event_mock = AsyncMock() + event_mock.type = "message" + + wizard = SceneWizard( + scene_config=scene_config_mock, + manager=AsyncMock(), + state=AsyncMock(), + update_type="message", + event=event_mock, + data={}, + ) + wizard.scene = AsyncMock() + + kwargs = {"test_kwargs": "test_kwargs"} + result = await wizard._on_action(SceneAction.enter, **kwargs) + + assert result is False + + async def test_scene_wizard_set_data(self): + wizard = SceneWizard( + scene_config=AsyncMock(), + manager=AsyncMock(), + state=AsyncMock(), + update_type="message", + event=AsyncMock(), + data={}, + ) + wizard.state.set_data = AsyncMock() + + data = {"test_key": "test_value"} + await wizard.set_data(data) + + wizard.state.set_data.assert_called_once_with(data=data) + + async def test_scene_wizard_get_data(self): + wizard = SceneWizard( + scene_config=AsyncMock(), + manager=AsyncMock(), + state=AsyncMock(), + update_type="message", + event=AsyncMock(), + data={}, + ) + wizard.state.get_data = AsyncMock() + + await wizard.get_data() + + wizard.state.get_data.assert_called_once_with() + + async def test_scene_wizard_update_data_if_data(self): + wizard = SceneWizard( + scene_config=AsyncMock(), + manager=AsyncMock(), + state=AsyncMock(), + update_type="message", + event=AsyncMock(), + data={}, + ) + data = {"test_key": "test_value"} + kwargs = {"test_kwargs": "test_kwargs"} + + wizard.state.update_data = AsyncMock(return_value={**data, **kwargs}) + result = await wizard.update_data(data=data, **kwargs) + + wizard.state.update_data.assert_called_once_with(data={**data, **kwargs}) + assert result == {**data, **kwargs} + + async def test_scene_wizard_update_data_if_no_data(self): + wizard = SceneWizard( + scene_config=AsyncMock(), + manager=AsyncMock(), + state=AsyncMock(), + update_type="message", + event=AsyncMock(), + data={}, + ) + data = None + kwargs = {"test_kwargs": "test_kwargs"} + + wizard.state.update_data = AsyncMock(return_value={**kwargs}) + result = await wizard.update_data(data=data, **kwargs) + + wizard.state.update_data.assert_called_once_with(data=kwargs) + assert result == {**kwargs} + + async def test_scene_wizard_clear_data(self): + wizard = SceneWizard( + scene_config=AsyncMock(), + manager=AsyncMock(), + state=AsyncMock(), + update_type="message", + event=AsyncMock(), + data={}, + ) + wizard.set_data = AsyncMock() + + await wizard.clear_data() + + wizard.set_data.assert_called_once_with({}) + + class TestScenesManager: async def test_scenes_manager_get_scene(self, bot: MockedBot): class MyScene(Scene):