diff --git a/tests/test_fsm/test_scene.py b/tests/test_fsm/test_scene.py index 0dd42b21..1313301f 100644 --- a/tests/test_fsm/test_scene.py +++ b/tests/test_fsm/test_scene.py @@ -29,7 +29,7 @@ from aiogram.fsm.scene import ( ) from aiogram.fsm.state import State, StatesGroup from aiogram.fsm.storage.base import StorageKey -from aiogram.fsm.storage.memory import MemoryStorage +from aiogram.fsm.storage.memory import MemoryStorage, MemoryStorageRecord from aiogram.types import Chat, Message, TelegramObject, Update from tests.mocked_bot import MockedBot @@ -338,6 +338,211 @@ class TestSceneHandlerWrapper: assert scene_handler_wrapper.__await__() is scene_handler_wrapper +class TestHistoryManager: + async def test_history_manager_push(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + history_manager = HistoryManager(state=state) + + data = {"test_data": "test_data"} + await history_manager.push("test_state", data) + + history_data = await history_manager._history_state.get_data() + assert history_data.get("history") == [{"state": "test_state", "data": data}] + + async def test_history_manager_push_if_history_overflow(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + history_manager = HistoryManager(state=state, size=2) + + states = ["test_state", "test_state2", "test_state3", "test_state4"] + data = {"test_data": "test_data"} + for state in states: + await history_manager.push(state, data) + + history_data = await history_manager._history_state.get_data() + assert history_data.get("history") == [ + {"state": "test_state3", "data": data}, + {"state": "test_state4", "data": data}, + ] + + async def test_history_manager_pop(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + history_manager = HistoryManager(state=state) + + data = {"test_data": "test_data"} + await history_manager.push("test_state", data) + await history_manager.push("test_state2", data) + + record = await history_manager.pop() + history_data = await history_manager._history_state.get_data() + + assert isinstance(record, MemoryStorageRecord) + assert record == MemoryStorageRecord(state="test_state2", data=data) + assert history_data.get("history") == [{"state": "test_state", "data": data}] + + async def test_history_manager_pop_if_history_empty(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + history_manager = HistoryManager(state=state) + + record = await history_manager.pop() + assert record is None + + async def test_history_manager_pop_if_history_become_empty_after_pop(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + history_manager = HistoryManager(state=state) + + data = {"test_data": "test_data"} + await history_manager.push("test_state", data) + + await history_manager.pop() + + assert await history_manager._history_state.get_data() == {} + + async def test_history_manager_get(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + history_manager = HistoryManager(state=state) + + data = {"test_data": "test_data"} + await history_manager.push("test_state", data) + + record = await history_manager.get() + + assert isinstance(record, MemoryStorageRecord) + assert record == MemoryStorageRecord(state="test_state", data=data) + + async def test_history_manager_get_if_history_empty(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + history_manager = HistoryManager(state=state) + + record = await history_manager.get() + assert record is None + + async def test_history_manager_all(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + history_manager_size = 10 + history_manager = HistoryManager(state=state, size=history_manager_size) + + data = {"test_data": "test_data"} + for i in range(history_manager_size): + await history_manager.push(f"test_state{i}", data) + + records = await history_manager.all() + + assert isinstance(records, list) + assert len(records) == history_manager_size + assert all(isinstance(record, MemoryStorageRecord) for record in records) + + async def test_history_manager_all_if_history_empty(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + history_manager = HistoryManager(state=state) + + records = await history_manager.all() + assert records == [] + + async def test_history_manager_clear(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + + history_manager = HistoryManager(state=state) + data = {"test_data": "test_data"} + await history_manager.push("test_state", data) + + await history_manager.clear() + + assert await history_manager._history_state.get_data() == {} + + async def test_history_manager_snapshot(self): + state = FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + + history_manager = HistoryManager(state=state) + data = {"test_data": "test_data"} + await history_manager.push("test_state", data) + + await history_manager.snapshot() + + assert await history_manager._history_state.get_data() == { + "history": [ + {"state": "test_state", "data": data}, + { + "state": await history_manager._state.get_state(), + "data": await history_manager._state.get_data(), + }, + ] + } + + async def test_history_manager_set_state(self): + state_mock = AsyncMock(spec=FSMContext) + state_mock.storage = MemoryStorage() + state_mock.key = StorageKey(bot_id=42, chat_id=-42, user_id=42) + state_mock.set_state = AsyncMock() + state_mock.set_data = AsyncMock() + + history_manager = HistoryManager(state=state_mock) + history_manager._state = state_mock + + state = "test_state" + data = {"test_data": "test_data"} + await history_manager._set_state(state, data) + + state_mock.set_state.assert_called_once_with(state) + state_mock.set_data.assert_called_once_with(data) + + async def test_history_manager_rollback(self): + history_manager = HistoryManager( + state=FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + ) + + data = {"test_data": "test_data"} + await history_manager.push("test_state", data) + await history_manager.push("test_state2", data) + + record = await history_manager.get() + assert record == MemoryStorageRecord(state="test_state2", data=data) + + await history_manager.rollback() + + record = await history_manager.get() + assert record == MemoryStorageRecord(state="test_state", data=data) + + async def test_history_manager_rollback_if_not_previous_state(self): + history_manager = HistoryManager( + state=FSMContext( + storage=MemoryStorage(), key=StorageKey(bot_id=42, chat_id=-42, user_id=42) + ) + ) + + data = {"test_data": "test_data"} + await history_manager.push("test_state", data) + + state = await history_manager.rollback() + assert state == "test_state" + + state = await history_manager.rollback() + assert state is None + + class TestScene: def test_scene_subclass_initialisation(self): class ParentScene(Scene):