Merge branch 'dev-3.x' into dev-3.x

This commit is contained in:
Evgen Fil 2022-07-03 22:29:18 +05:00 committed by GitHub
commit 2da987daef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
318 changed files with 6312 additions and 2419 deletions

View file

@ -2,11 +2,15 @@ from pathlib import Path
import pytest
from _pytest.config import UsageError
from aioredis.connection import parse_url as parse_redis_url
from redis.asyncio.connection import parse_url as parse_redis_url
from aiogram import Bot
from aiogram.dispatcher.fsm.storage.memory import MemoryStorage
from aiogram.dispatcher.fsm.storage.redis import RedisStorage
from aiogram import Bot, Dispatcher
from aiogram.dispatcher.fsm.storage.memory import (
DisabledEventIsolation,
MemoryStorage,
SimpleEventIsolation,
)
from aiogram.dispatcher.fsm.storage.redis import RedisEventIsolation, RedisStorage
from tests.mocked_bot import MockedBot
DATA_DIR = Path(__file__).parent / "data"
@ -67,6 +71,42 @@ async def memory_storage():
await storage.close()
@pytest.fixture()
@pytest.mark.redis
async def redis_isolation(redis_server):
if not redis_server:
pytest.skip("Redis is not available here")
isolation = RedisEventIsolation.from_url(redis_server)
try:
await isolation.redis.info()
except ConnectionError as e:
pytest.skip(str(e))
try:
yield isolation
finally:
conn = await isolation.redis
await conn.flushdb()
await isolation.close()
@pytest.fixture()
async def lock_isolation():
isolation = SimpleEventIsolation()
try:
yield isolation
finally:
await isolation.close()
@pytest.fixture()
async def disabled_isolation():
isolation = DisabledEventIsolation()
try:
yield isolation
finally:
await isolation.close()
@pytest.fixture()
def bot():
bot = MockedBot()
@ -75,3 +115,13 @@ def bot():
yield bot
finally:
Bot.reset_current(token)
@pytest.fixture()
async def dispatcher():
dp = Dispatcher()
await dp.emit_startup()
try:
yield dp
finally:
await dp.emit_shutdown()

View file

@ -215,11 +215,11 @@ class TestBaseSession:
return await make_request(bot, method)
session = CustomSession()
assert not session.middlewares
assert not session.middleware._middlewares
session.middleware(my_middleware)
assert my_middleware in session.middlewares
assert len(session.middlewares) == 1
assert my_middleware in session.middleware
assert len(session.middleware) == 1
async def test_use_middleware(self, bot: MockedBot):
flag_before = False

View file

@ -0,0 +1,45 @@
from aiogram import Bot
from aiogram.client.session.middlewares.base import (
BaseRequestMiddleware,
NextRequestMiddlewareType,
)
from aiogram.client.session.middlewares.manager import RequestMiddlewareManager
from aiogram.methods import Response, TelegramMethod
from aiogram.types import TelegramObject
class TestMiddlewareManager:
async def test_register(self):
manager = RequestMiddlewareManager()
@manager
async def middleware(handler, event, data):
await handler(event, data)
assert middleware in manager._middlewares
manager.unregister(middleware)
assert middleware not in manager._middlewares
async def test_wrap_middlewares(self):
manager = RequestMiddlewareManager()
class MyMiddleware(BaseRequestMiddleware):
async def __call__(
self,
make_request: NextRequestMiddlewareType,
bot: Bot,
method: TelegramMethod[TelegramObject],
) -> Response[TelegramObject]:
return await make_request(bot, method)
manager.register(MyMiddleware())
@manager()
@manager
async def middleware(make_request, bot, method):
return await make_request(bot, method)
async def target_call(bot, method, timeout: int = None):
return timeout
assert await manager.wrap_middlewares(target_call, timeout=42)(None, None) == 42

View file

@ -0,0 +1,33 @@
import pytest
from aiogram.methods import AnswerWebAppQuery, Request
from aiogram.types import InlineQueryResult, SentWebAppMessage
from tests.mocked_bot import MockedBot
class TestAnswerWebAppQuery:
@pytest.mark.asyncio
async def test_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(AnswerWebAppQuery, ok=True, result=SentWebAppMessage())
response: SentWebAppMessage = await AnswerWebAppQuery(
web_app_query_id="test",
result=InlineQueryResult(),
)
request: Request = bot.get_request()
assert request.method == "answerWebAppQuery"
# assert request.data == {}
assert response == prepare_result.result
@pytest.mark.asyncio
async def test_bot_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(AnswerWebAppQuery, ok=True, result=SentWebAppMessage())
response: SentWebAppMessage = await bot.answer_web_app_query(
web_app_query_id="test",
result=InlineQueryResult(),
)
request: Request = bot.get_request()
assert request.method == "answerWebAppQuery"
# assert request.data == {}
assert response == prepare_result.result

View file

View file

View file

@ -0,0 +1,43 @@
import pytest
from aiogram.methods import CreateInvoiceLink, Request
from aiogram.types import LabeledPrice
from tests.mocked_bot import MockedBot
class TestCreateInvoiceLink:
@pytest.mark.asyncio
async def test_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(
CreateInvoiceLink, ok=True, result="https://t.me/invoice/example"
)
response: str = await CreateInvoiceLink(
title="test",
description="test",
payload="test",
provider_token="test",
currency="BTC",
prices=[LabeledPrice(label="Test", amount=1)],
)
request: Request = bot.get_request()
assert request.method == "createInvoiceLink"
assert response == prepare_result.result
@pytest.mark.asyncio
async def test_bot_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(
CreateInvoiceLink, ok=True, result="https://t.me/invoice/example"
)
response: str = await bot.create_invoice_link(
title="test",
description="test",
payload="test",
provider_token="test",
currency="BTC",
prices=[LabeledPrice(label="Test", amount=1)],
)
request: Request = bot.get_request()
assert request.method == "createInvoiceLink"
assert response == prepare_result.result

View file

View file

@ -0,0 +1,27 @@
import pytest
from aiogram.methods import GetChatMenuButton, Request
from aiogram.types import MenuButton, MenuButtonDefault
from tests.mocked_bot import MockedBot
class TestGetChatMenuButton:
@pytest.mark.asyncio
async def test_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(GetChatMenuButton, ok=True, result=MenuButtonDefault())
response: MenuButton = await GetChatMenuButton()
request: Request = bot.get_request()
assert request.method == "getChatMenuButton"
# assert request.data == {}
assert response == prepare_result.result
@pytest.mark.asyncio
async def test_bot_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(GetChatMenuButton, ok=True, result=MenuButtonDefault())
response: MenuButton = await bot.get_chat_menu_button()
request: Request = bot.get_request()
assert request.method == "getChatMenuButton"
# assert request.data == {}
assert response == prepare_result.result

View file

@ -0,0 +1,53 @@
import pytest
from aiogram.methods import GetMyDefaultAdministratorRights, Request
from aiogram.types import ChatAdministratorRights
from tests.mocked_bot import MockedBot
class TestGetMyDefaultAdministratorRights:
@pytest.mark.asyncio
async def test_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(
GetMyDefaultAdministratorRights,
ok=True,
result=ChatAdministratorRights(
is_anonymous=False,
can_manage_chat=False,
can_delete_messages=False,
can_manage_video_chats=False,
can_restrict_members=False,
can_promote_members=False,
can_change_info=False,
can_invite_users=False,
),
)
response: ChatAdministratorRights = await GetMyDefaultAdministratorRights()
request: Request = bot.get_request()
assert request.method == "getMyDefaultAdministratorRights"
# assert request.data == {}
assert response == prepare_result.result
@pytest.mark.asyncio
async def test_bot_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(
GetMyDefaultAdministratorRights,
ok=True,
result=ChatAdministratorRights(
is_anonymous=False,
can_manage_chat=False,
can_delete_messages=False,
can_manage_video_chats=False,
can_restrict_members=False,
can_promote_members=False,
can_change_info=False,
can_invite_users=False,
),
)
response: ChatAdministratorRights = await bot.get_my_default_administrator_rights()
request: Request = bot.get_request()
assert request.method == "getMyDefaultAdministratorRights"
# assert request.data == {}
assert response == prepare_result.result

View file

@ -16,6 +16,7 @@ class TestGetStickerSet:
name="test",
title="test",
is_animated=False,
is_video=False,
contains_masks=False,
stickers=[
Sticker(
@ -23,6 +24,7 @@ class TestGetStickerSet:
width=42,
height=42,
is_animated=False,
is_video=False,
file_unique_id="file id",
)
],
@ -42,6 +44,7 @@ class TestGetStickerSet:
name="test",
title="test",
is_animated=False,
is_video=False,
contains_masks=False,
stickers=[
Sticker(
@ -49,6 +52,7 @@ class TestGetStickerSet:
width=42,
height=42,
is_animated=False,
is_video=False,
file_unique_id="file id",
)
],

View file

@ -3,7 +3,7 @@ import datetime
import pytest
from aiogram.methods import Request, SendMessage
from aiogram.types import Chat, Message
from aiogram.types import Chat, ForceReply, Message
from tests.mocked_bot import MockedBot
pytestmark = pytest.mark.asyncio
@ -43,3 +43,8 @@ class TestSendMessage:
request: Request = bot.get_request()
assert request.method == "sendMessage"
assert response == prepare_result.result
async def test_force_reply(self):
# https://github.com/aiogram/aiogram/issues/901
method = SendMessage(text="test", chat_id=42, reply_markup=ForceReply())
assert isinstance(method.reply_markup, ForceReply)

View file

@ -22,6 +22,7 @@ class TestSendSticker:
width=42,
height=42,
is_animated=False,
is_video=False,
file_unique_id="file id",
),
chat=Chat(id=42, type="private"),
@ -45,6 +46,7 @@ class TestSendSticker:
width=42,
height=42,
is_animated=False,
is_video=False,
file_unique_id="file id",
),
chat=Chat(id=42, type="private"),

View file

@ -0,0 +1,26 @@
import pytest
from aiogram.methods import Request, SetChatMenuButton
from tests.mocked_bot import MockedBot
class TestSetChatMenuButton:
@pytest.mark.asyncio
async def test_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(SetChatMenuButton, ok=True, result=True)
response: bool = await SetChatMenuButton()
request: Request = bot.get_request()
assert request.method == "setChatMenuButton"
# assert request.data == {}
assert response == prepare_result.result
@pytest.mark.asyncio
async def test_bot_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(SetChatMenuButton, ok=True, result=True)
response: bool = await bot.set_chat_menu_button()
request: Request = bot.get_request()
assert request.method == "setChatMenuButton"
# assert request.data == {}
assert response == prepare_result.result

View file

@ -0,0 +1,26 @@
import pytest
from aiogram.methods import Request, SetMyDefaultAdministratorRights
from tests.mocked_bot import MockedBot
class TestSetMyDefaultAdministratorRights:
@pytest.mark.asyncio
async def test_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(SetMyDefaultAdministratorRights, ok=True, result=True)
response: bool = await SetMyDefaultAdministratorRights()
request: Request = bot.get_request()
assert request.method == "setMyDefaultAdministratorRights"
# assert request.data == {}
assert response == prepare_result.result
@pytest.mark.asyncio
async def test_bot_method(self, bot: MockedBot):
prepare_result = bot.add_result_for(SetMyDefaultAdministratorRights, ok=True, result=True)
response: bool = await bot.set_my_default_administrator_rights()
request: Request = bot.get_request()
assert request.method == "setMyDefaultAdministratorRights"
# assert request.data == {}
assert response == prepare_result.result

View file

View file

@ -1,3 +1,7 @@
from typing import Optional
from pytest import mark, param
from aiogram.types import Chat
@ -15,3 +19,24 @@ class TestChat:
method = chat.unban_sender_chat(sender_chat_id=-1337)
assert method.chat_id == chat.id
assert method.sender_chat_id == -1337
@mark.parametrize(
"first,last,title,chat_type,result",
[
param("First", None, None, "private", "First", id="private_first_only"),
param("First", "Last", None, "private", "First Last", id="private_with_last"),
param(None, None, "Title", "group", "Title", id="group_with_title"),
param(None, None, "Title", "supergroup", "Title", id="supergroup_with_title"),
param(None, None, "Title", "channel", "Title", id="channel_with_title"),
],
)
def test_full_name(
self,
first: Optional[str],
last: Optional[str],
title: Optional[str],
chat_type: str,
result: str,
):
chat = Chat(id=42, first_name=first, last_name=last, title=title, type=chat_type)
assert chat.full_name == result

View file

@ -47,16 +47,19 @@ from aiogram.types import (
PhotoSize,
Poll,
PollOption,
ProximityAlertTriggered,
Sticker,
SuccessfulPayment,
User,
Venue,
Video,
VideoChatEnded,
VideoChatParticipantsInvited,
VideoChatScheduled,
VideoChatStarted,
VideoNote,
Voice,
VoiceChatEnded,
VoiceChatParticipantsInvited,
VoiceChatStarted,
WebAppData,
)
from aiogram.types.message import ContentType, Message
@ -122,6 +125,7 @@ TEST_MESSAGE_STICKER = Message(
width=42,
height=42,
is_animated=False,
is_video=False,
),
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
@ -281,6 +285,20 @@ TEST_MESSAGE_GROUP_CHAT_CREATED = Message(
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
)
TEST_MESSAGE_SUPERGROUP_CHAT_CREATED = Message(
message_id=42,
date=datetime.datetime.now(),
supergroup_chat_created=True,
chat=Chat(id=-10042, type="supergroup"),
from_user=User(id=42, is_bot=False, first_name="Test"),
)
TEST_MESSAGE_CHANNEL_CHAT_CREATED = Message(
message_id=42,
date=datetime.datetime.now(),
channel_chat_created=True,
chat=Chat(id=-10042, type="channel"),
from_user=User(id=42, is_bot=False, first_name="Test"),
)
TEST_MESSAGE_PASSPORT_DATA = Message(
message_id=42,
date=datetime.datetime.now(),
@ -291,6 +309,17 @@ TEST_MESSAGE_PASSPORT_DATA = Message(
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
)
TEST_MESSAGE_PROXIMITY_ALERT_TRIGGERED = Message(
message_id=42,
date=datetime.datetime.now(),
chat=Chat(id=42, type="supergroup"),
from_user=User(id=42, is_bot=False, first_name="Test"),
proximity_alert_triggered=ProximityAlertTriggered(
traveler=User(id=1, is_bot=False, first_name="Traveler"),
watcher=User(id=2, is_bot=False, first_name="Watcher"),
distance=42,
),
)
TEST_MESSAGE_POLL = Message(
message_id=42,
date=datetime.datetime.now(),
@ -318,29 +347,38 @@ TEST_MESSAGE_MESSAGE_AUTO_DELETE_TIMER_CHANGED = Message(
message_auto_delete_timer_changed=MessageAutoDeleteTimerChanged(message_auto_delete_time=42),
from_user=User(id=42, is_bot=False, first_name="Test"),
)
TEST_MESSAGE_VOICE_CHAT_STARTED = Message(
TEST_MESSAGE_VIDEO_CHAT_STARTED = Message(
message_id=42,
date=datetime.datetime.now(),
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
voice_chat_started=VoiceChatStarted(),
video_chat_started=VideoChatStarted(),
)
TEST_MESSAGE_VOICE_CHAT_ENDED = Message(
TEST_MESSAGE_VIDEO_CHAT_ENDED = Message(
message_id=42,
date=datetime.datetime.now(),
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
voice_chat_ended=VoiceChatEnded(duration=42),
video_chat_ended=VideoChatEnded(duration=42),
)
TEST_MESSAGE_VOICE_CHAT_PARTICIPANTS_INVITED = Message(
TEST_MESSAGE_VIDEO_CHAT_PARTICIPANTS_INVITED = Message(
message_id=42,
date=datetime.datetime.now(),
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
voice_chat_participants_invited=VoiceChatParticipantsInvited(
video_chat_participants_invited=VideoChatParticipantsInvited(
users=[User(id=69, is_bot=False, first_name="Test")]
),
)
TEST_MESSAGE_VIDEO_CHAT_SCHEDULED = Message(
message_id=42,
date=datetime.datetime.now(),
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
video_chat_scheduled=VideoChatScheduled(
start_date=datetime.datetime.now(),
),
)
TEST_MESSAGE_DICE = Message(
message_id=42,
date=datetime.datetime.now(),
@ -348,6 +386,13 @@ TEST_MESSAGE_DICE = Message(
dice=Dice(value=6, emoji="X"),
from_user=User(id=42, is_bot=False, first_name="Test"),
)
TEST_MESSAGE_WEB_APP_DATA = Message(
message_id=42,
date=datetime.datetime.now(),
chat=Chat(id=42, type="private"),
web_app_data=WebAppData(data="test", button_text="Test"),
from_user=User(id=42, is_bot=False, first_name="Test"),
)
TEST_MESSAGE_UNKNOWN = Message(
message_id=42,
date=datetime.datetime.now(),
@ -385,19 +430,24 @@ class TestMessage:
[TEST_MESSAGE_NEW_CHAT_PHOTO, ContentType.NEW_CHAT_PHOTO],
[TEST_MESSAGE_DELETE_CHAT_PHOTO, ContentType.DELETE_CHAT_PHOTO],
[TEST_MESSAGE_GROUP_CHAT_CREATED, ContentType.GROUP_CHAT_CREATED],
[TEST_MESSAGE_SUPERGROUP_CHAT_CREATED, ContentType.SUPERGROUP_CHAT_CREATED],
[TEST_MESSAGE_CHANNEL_CHAT_CREATED, ContentType.CHANNEL_CHAT_CREATED],
[TEST_MESSAGE_PASSPORT_DATA, ContentType.PASSPORT_DATA],
[TEST_MESSAGE_PROXIMITY_ALERT_TRIGGERED, ContentType.PROXIMITY_ALERT_TRIGGERED],
[TEST_MESSAGE_POLL, ContentType.POLL],
[
TEST_MESSAGE_MESSAGE_AUTO_DELETE_TIMER_CHANGED,
ContentType.MESSAGE_AUTO_DELETE_TIMER_CHANGED,
],
[TEST_MESSAGE_VOICE_CHAT_STARTED, ContentType.VOICE_CHAT_STARTED],
[TEST_MESSAGE_VOICE_CHAT_ENDED, ContentType.VOICE_CHAT_ENDED],
[TEST_MESSAGE_VIDEO_CHAT_SCHEDULED, ContentType.VIDEO_CHAT_SCHEDULED],
[TEST_MESSAGE_VIDEO_CHAT_STARTED, ContentType.VIDEO_CHAT_STARTED],
[TEST_MESSAGE_VIDEO_CHAT_ENDED, ContentType.VIDEO_CHAT_ENDED],
[
TEST_MESSAGE_VOICE_CHAT_PARTICIPANTS_INVITED,
ContentType.VOICE_CHAT_PARTICIPANTS_INVITED,
TEST_MESSAGE_VIDEO_CHAT_PARTICIPANTS_INVITED,
ContentType.VIDEO_CHAT_PARTICIPANTS_INVITED,
],
[TEST_MESSAGE_DICE, ContentType.DICE],
[TEST_MESSAGE_WEB_APP_DATA, ContentType.WEB_APP_DATA],
[TEST_MESSAGE_UNKNOWN, ContentType.UNKNOWN],
],
)
@ -532,12 +582,15 @@ class TestMessage:
[TEST_MESSAGE_NEW_CHAT_PHOTO, None],
[TEST_MESSAGE_DELETE_CHAT_PHOTO, None],
[TEST_MESSAGE_GROUP_CHAT_CREATED, None],
[TEST_MESSAGE_SUPERGROUP_CHAT_CREATED, None],
[TEST_MESSAGE_CHANNEL_CHAT_CREATED, None],
[TEST_MESSAGE_PASSPORT_DATA, None],
[TEST_MESSAGE_PROXIMITY_ALERT_TRIGGERED, None],
[TEST_MESSAGE_POLL, SendPoll],
[TEST_MESSAGE_MESSAGE_AUTO_DELETE_TIMER_CHANGED, None],
[TEST_MESSAGE_VOICE_CHAT_STARTED, None],
[TEST_MESSAGE_VOICE_CHAT_ENDED, None],
[TEST_MESSAGE_VOICE_CHAT_PARTICIPANTS_INVITED, None],
[TEST_MESSAGE_VIDEO_CHAT_STARTED, None],
[TEST_MESSAGE_VIDEO_CHAT_ENDED, None],
[TEST_MESSAGE_VIDEO_CHAT_PARTICIPANTS_INVITED, None],
[TEST_MESSAGE_DICE, SendDice],
[TEST_MESSAGE_UNKNOWN, None],
],
@ -641,13 +694,15 @@ class TestMessage:
assert method.message_id == message.message_id
@pytest.mark.parametrize(
"text,entities,correct",
"text,entities,mode,expected_value",
[
["test", [MessageEntity(type="bold", offset=0, length=4)], True],
["", [], False],
["test", [MessageEntity(type="bold", offset=0, length=4)], "html", "<b>test</b>"],
["test", [MessageEntity(type="bold", offset=0, length=4)], "md", "*test*"],
["", [], "html", ""],
["", [], "md", ""],
],
)
def test_html_text(self, text, entities, correct):
def test_html_text(self, text, entities, mode, expected_value):
message = Message(
message_id=42,
chat=Chat(id=42, type="private"),
@ -655,11 +710,4 @@ class TestMessage:
text=text,
entities=entities,
)
if correct:
assert message.html_text
assert message.md_text
else:
with pytest.raises(TypeError):
assert message.html_text
with pytest.raises(TypeError):
assert message.md_text
assert getattr(message, f"{mode}_text") == expected_value

View file

@ -3,11 +3,11 @@ from tests.deprecated import check_deprecated
class TestMessageEntity:
def test_extract_from(self):
entity = MessageEntity(type="hashtag", length=4, offset=5)
assert entity.extract_from("#foo #bar #baz") == "#bar"
def test_extract(self):
entity = MessageEntity(type="hashtag", length=4, offset=5)
assert entity.extract("#foo #bar #baz") == "#bar"
def test_get_text(self):
entity = MessageEntity(type="hashtag", length=4, offset=5)
with check_deprecated("3.2", exception=AttributeError):
assert entity.get_text("#foo #bar #baz") == "#bar"
with check_deprecated("3.0b5", exception=AttributeError):
assert entity.extract("#foo #bar #baz") == "#bar"

View file

@ -1,31 +0,0 @@
import pytest
from aiogram.dispatcher.event.telegram import TelegramEventObserver
from aiogram.dispatcher.router import Router
from tests.deprecated import check_deprecated
OBSERVERS = {
"callback_query",
"channel_post",
"chosen_inline_result",
"edited_channel_post",
"edited_message",
"errors",
"inline_query",
"message",
"poll",
"poll_answer",
"pre_checkout_query",
"shipping_query",
}
DEPRECATED_OBSERVERS = {observer + "_handler" for observer in OBSERVERS}
@pytest.mark.parametrize("observer_name", DEPRECATED_OBSERVERS)
def test_deprecated_handlers_name(observer_name: str):
router = Router()
with check_deprecated("3.2", exception=AttributeError):
observer = getattr(router, observer_name)
assert isinstance(observer, TelegramEventObserver)

View file

@ -74,22 +74,32 @@ class TestDispatcher:
assert dp.update.handlers
assert dp.update.handlers[0].callback == dp._listen_update
assert dp.update.outer_middlewares
assert dp.update.outer_middleware
def test_parent_router(self):
def test_data_bind(self):
dp = Dispatcher()
assert dp.get("foo") is None
assert dp.get("foo", 42) == 42
dp["foo"] = 1
assert dp.workflow_data["foo"] == 1
assert dp["foo"] == 1
del dp["foo"]
assert "foo" not in dp.workflow_data
def test_storage_property(self, dispatcher: Dispatcher):
assert dispatcher.storage is dispatcher.fsm.storage
def test_parent_router(self, dispatcher: Dispatcher):
with pytest.raises(RuntimeError):
dp.parent_router = Router()
assert dp.parent_router is None
dp._parent_router = Router()
assert dp.parent_router is None
dispatcher.parent_router = Router()
assert dispatcher.parent_router is None
dispatcher._parent_router = Router()
assert dispatcher.parent_router is None
@pytest.mark.parametrize("isolate_events", (True, False))
async def test_feed_update(self, isolate_events):
dp = Dispatcher(isolate_events=isolate_events)
bot = Bot("42:TEST")
@dp.message()
async def test_feed_update(self, dispatcher: Dispatcher, bot: MockedBot):
@dispatcher.message()
async def my_handler(message: Message, **kwargs):
assert "bot" in kwargs
assert isinstance(kwargs["bot"], Bot)
@ -97,7 +107,7 @@ class TestDispatcher:
return message.text
results_count = 0
result = await dp.feed_update(
result = await dispatcher.feed_update(
bot=bot,
update=Update(
update_id=42,

View file

@ -0,0 +1,42 @@
from functools import partial
from aiogram.dispatcher.middlewares.manager import MiddlewareManager
class TestMiddlewareManager:
async def test_register(self):
manager = MiddlewareManager()
@manager
async def middleware(handler, event, data):
await handler(event, data)
assert middleware in manager._middlewares
manager.unregister(middleware)
assert middleware not in manager._middlewares
async def test_wrap_middlewares(self):
manager = MiddlewareManager()
async def target(*args, **kwargs):
kwargs["target"] = True
kwargs["stack"].append(-1)
return kwargs
async def middleware1(handler, event, data):
data["mw1"] = True
data["stack"].append(1)
return await handler(event, data)
async def middleware2(handler, event, data):
data["mw2"] = True
data["stack"].append(2)
return await handler(event, data)
wrapped = manager.wrap_middlewares([middleware1, middleware2], target)
assert isinstance(wrapped, partial)
assert wrapped.func is middleware1
result = await wrapped(None, {"stack": []})
assert result == {"mw1": True, "mw2": True, "target": True, "stack": [1, 2, -1]}

View file

@ -297,10 +297,9 @@ class TestTelegramEventObserver:
def test_register_middleware(self, middleware_type):
event_observer = TelegramEventObserver(Router(), "test")
middlewares = getattr(event_observer, f"{middleware_type}s")
decorator = getattr(event_observer, middleware_type)
middlewares = getattr(event_observer, middleware_type)
@decorator
@middlewares
async def my_middleware1(handler, event, data):
pass
@ -308,7 +307,7 @@ class TestTelegramEventObserver:
assert my_middleware1.__name__ == "my_middleware1"
assert my_middleware1 in middlewares
@decorator()
@middlewares()
async def my_middleware2(handler, event, data):
pass
@ -319,13 +318,13 @@ class TestTelegramEventObserver:
async def my_middleware3(handler, event, data):
pass
decorator(my_middleware3)
middlewares(my_middleware3)
assert my_middleware3 is not None
assert my_middleware3.__name__ == "my_middleware3"
assert my_middleware3 in middlewares
assert middlewares == [my_middleware1, my_middleware2, my_middleware3]
assert list(middlewares) == [my_middleware1, my_middleware2, my_middleware3]
def test_register_global_filters(self):
router = Router(use_builtin_filters=False)

View file

@ -30,7 +30,7 @@ class MyCallback(CallbackData, prefix="test"):
class TestCallbackData:
def test_init_subclass_prefix_required(self):
assert MyCallback.prefix == "test"
assert MyCallback.__prefix__ == "test"
with pytest.raises(ValueError, match="prefix required.+"):
@ -38,12 +38,12 @@ class TestCallbackData:
pass
def test_init_subclass_sep_validation(self):
assert MyCallback.sep == ":"
assert MyCallback.__separator__ == ":"
class MyCallback2(CallbackData, prefix="test2", sep="@"):
pass
assert MyCallback2.sep == "@"
assert MyCallback2.__separator__ == "@"
with pytest.raises(ValueError, match="Separator symbol '@' .+ 'sp@m'"):

View file

@ -0,0 +1,345 @@
from datetime import datetime
import pytest
from aiogram.dispatcher.filters.chat_member_updated import (
ADMINISTRATOR,
IS_MEMBER,
JOIN_TRANSITION,
LEAVE_TRANSITION,
ChatMemberUpdatedFilter,
_MemberStatusGroupMarker,
_MemberStatusMarker,
_MemberStatusTransition,
)
from aiogram.types import Chat, ChatMember, ChatMemberUpdated, User
class TestMemberStatusMarker:
def test_str(self):
marker = _MemberStatusMarker("test")
assert str(marker) == "TEST"
assert str(+marker) == "+TEST"
assert str(-marker) == "-TEST"
def test_pos(self):
marker = _MemberStatusMarker("test")
assert marker.is_member is None
positive_marker = +marker
assert positive_marker is not marker
assert marker.is_member is None
assert positive_marker.is_member is True
def test_neg(self):
marker = _MemberStatusMarker("test")
assert marker.is_member is None
negative_marker = -marker
assert negative_marker is not marker
assert marker.is_member is None
assert negative_marker.is_member is False
def test_or(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
combination = marker1 | marker2
assert isinstance(combination, _MemberStatusGroupMarker)
assert marker1 in combination.statuses
assert marker2 in combination.statuses
combination2 = marker1 | marker1
assert isinstance(combination2, _MemberStatusGroupMarker)
assert len(combination2.statuses) == 1
marker3 = _MemberStatusMarker("test3")
combination3 = marker3 | combination
assert isinstance(combination3, _MemberStatusGroupMarker)
assert marker3 in combination3.statuses
assert len(combination3.statuses) == 3
assert combination3 is not combination
with pytest.raises(TypeError):
marker1 | 42
def test_rshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
transition = marker1 >> marker2
assert isinstance(transition, _MemberStatusTransition)
assert marker1 in transition.old.statuses
assert marker2 in transition.new.statuses
transition2 = marker1 >> (marker2 | marker3)
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
marker1 >> 42
def test_lshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
transition = marker1 << marker2
assert isinstance(transition, _MemberStatusTransition)
assert marker2 in transition.old.statuses
assert marker1 in transition.new.statuses
transition2 = marker1 << (marker2 | marker3)
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
marker1 << 42
def test_hash(self):
marker1 = _MemberStatusMarker("test1")
marker1_1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
assert hash(marker1) != hash(marker2)
assert hash(marker1) == hash(marker1_1)
assert hash(marker1) != hash(-marker1)
@pytest.mark.parametrize(
"name,is_member,member,result",
[
["test", None, ChatMember(status="member"), False],
["test", None, ChatMember(status="test"), True],
["test", True, ChatMember(status="test"), False],
["test", True, ChatMember(status="test", is_member=True), True],
["test", True, ChatMember(status="test", is_member=False), False],
],
)
def test_check(self, name, is_member, member, result):
marker = _MemberStatusMarker(name, is_member=is_member)
assert marker.check(member=member) == result
class TestMemberStatusGroupMarker:
def test_init_unique(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
group = _MemberStatusGroupMarker(marker1, marker1, marker2, marker3)
assert len(group.statuses) == 3
def test_init_empty(self):
with pytest.raises(ValueError):
_MemberStatusGroupMarker()
def test_or(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
marker4 = _MemberStatusMarker("test4")
group1 = _MemberStatusGroupMarker(marker1, marker2)
group2 = _MemberStatusGroupMarker(marker3, marker4)
group3 = group1 | marker3
assert isinstance(group3, _MemberStatusGroupMarker)
assert len(group3.statuses) == 3
group4 = group1 | group2
assert isinstance(group4, _MemberStatusGroupMarker)
assert len(group4.statuses) == 4
with pytest.raises(TypeError):
group4 | 42
def test_rshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
group1 = _MemberStatusGroupMarker(marker1, marker2)
group2 = _MemberStatusGroupMarker(marker1, marker3)
transition1 = group1 >> marker1
assert isinstance(transition1, _MemberStatusTransition)
assert transition1.old is group1
assert marker1 in transition1.new.statuses
transition2 = group1 >> group2
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
group1 >> 42
def test_lshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
group1 = _MemberStatusGroupMarker(marker1, marker2)
group2 = _MemberStatusGroupMarker(marker1, marker3)
transition1 = group1 << marker1
assert isinstance(transition1, _MemberStatusTransition)
assert transition1.new is group1
assert marker1 in transition1.old.statuses
transition2 = group1 << group2
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
group1 << 42
def test_str(self):
marker1 = _MemberStatusMarker("test1")
marker1_1 = +marker1
marker2 = _MemberStatusMarker("test2")
group1 = marker1 | marker1
assert str(group1) == "TEST1"
group2 = marker1 | marker2
assert str(group2) == "(TEST1 | TEST2)"
group3 = marker1 | marker1_1
assert str(group3) == "(+TEST1 | TEST1)"
@pytest.mark.parametrize(
"status,result",
[
["test", False],
["test1", True],
["test2", True],
],
)
def test_check(self, status, result):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
group = marker1 | marker2
assert group.check(member=ChatMember(status=status)) is result
class TestMemberStatusTransition:
def test_invert(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
transition1 = marker1 >> marker2
transition2 = ~transition1
assert transition1 is not transition2
assert transition1.old == transition2.new
assert transition1.new == transition2.old
assert str(transition1) == "TEST1 >> TEST2"
assert str(transition2) == "TEST2 >> TEST1"
@pytest.mark.parametrize(
"transition,old,new,result",
[
[JOIN_TRANSITION, ChatMember(status="left"), ChatMember(status="member"), True],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=True),
ChatMember(status="member"),
False,
],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=False),
ChatMember(status="member"),
True,
],
[
JOIN_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
False,
],
[
LEAVE_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
True,
],
],
)
def test_check(self, transition, old, new, result):
assert transition.check(old=old, new=new) == result
class TestChatMemberUpdatedStatusFilter:
@pytest.mark.asyncio
@pytest.mark.parametrize(
"transition,old,new,result",
[
[JOIN_TRANSITION, ChatMember(status="left"), ChatMember(status="member"), True],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=True),
ChatMember(status="member"),
False,
],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=False),
ChatMember(status="member"),
True,
],
[
JOIN_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
False,
],
[
LEAVE_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
True,
],
[
ADMINISTRATOR,
ChatMember(status="member"),
ChatMember(status="administrator"),
True,
],
[
IS_MEMBER,
ChatMember(status="restricted", is_member=False),
ChatMember(status="member"),
True,
],
],
)
async def test_call(self, transition, old, new, result):
updated_filter = ChatMemberUpdatedFilter(member_status_changed=transition)
user = User(id=42, first_name="Test", is_bot=False)
update = {
"user": user,
"until_date": datetime.now(),
"is_anonymous": False,
"can_be_edited": True,
"can_manage_chat": True,
"can_delete_messages": True,
"can_manage_video_chats": True,
"can_restrict_members": True,
"can_promote_members": True,
"can_change_info": True,
"can_invite_users": True,
"can_post_messages": True,
"can_edit_messages": True,
"can_pin_messages": True,
"can_send_messages": True,
"can_send_media_messages": True,
"can_send_polls": True,
"can_send_other_messages": True,
"can_add_web_page_previews": True,
}
event = ChatMemberUpdated(
chat=Chat(id=42, type="test"),
from_user=user,
old_chat_member=old.copy(update=update),
new_chat_member=new.copy(update=update),
date=datetime.now(),
)
assert await updated_filter(event) is result

View file

@ -92,6 +92,18 @@ class TestCommandFilter:
command = Command(commands=["test"])
assert bool(await command(message=message, bot=bot)) is result
async def test_command_magic_result(self, bot: MockedBot):
message = Message(
message_id=0,
text="/test 42",
chat=Chat(id=42, type="private"),
date=datetime.datetime.now(),
)
command = Command(commands=["test"], command_magic=(F.args.as_("args")))
result = await command(message=message, bot=bot)
assert "args" in result
assert result["args"] == "42"
class TestCommandObject:
@pytest.mark.parametrize(

View file

@ -2,7 +2,9 @@ import re
import pytest
from aiogram import Dispatcher
from aiogram.dispatcher.filters import ExceptionMessageFilter, ExceptionTypeFilter
from aiogram.types import Update
pytestmark = pytest.mark.asyncio
@ -16,10 +18,10 @@ class TestExceptionMessageFilter:
async def test_match(self):
obj = ExceptionMessageFilter(pattern="KABOOM")
result = await obj(Exception())
result = await obj(Update(update_id=0), exception=Exception())
assert not result
result = await obj(Exception("KABOOM"))
result = await obj(Update(update_id=0), exception=Exception("KABOOM"))
assert isinstance(result, dict)
assert "match_exception" in result
@ -46,6 +48,21 @@ class TestExceptionTypeFilter:
async def test_check(self, exception: Exception, value: bool):
obj = ExceptionTypeFilter(exception=MyException)
result = await obj(exception)
result = await obj(Update(update_id=0), exception=exception)
assert result == value
class TestDispatchException:
async def test_handle_exception(self, bot):
dp = Dispatcher()
@dp.update()
async def update_handler(update):
raise ValueError("KABOOM")
@dp.errors(ExceptionMessageFilter(pattern="KABOOM"))
async def handler0(update, exception):
return "Handled"
assert await dp.feed_update(bot, Update(update_id=0)) == "Handled"

View file

@ -0,0 +1,37 @@
import pytest
from aiogram.dispatcher.filters import Text, and_f, invert_f, or_f
from aiogram.dispatcher.filters.logic import _AndFilter, _InvertFilter, _OrFilter
class TestLogic:
@pytest.mark.parametrize(
"obj,case,result",
[
[True, and_f(lambda t: t is True, lambda t: t is True), True],
[True, and_f(lambda t: t is True, lambda t: t is False), False],
[True, and_f(lambda t: t is False, lambda t: t is False), False],
[True, and_f(lambda t: {"t": t}, lambda t: t is False), False],
[True, and_f(lambda t: {"t": t}, lambda t: t is True), {"t": True}],
[True, or_f(lambda t: t is True, lambda t: t is True), True],
[True, or_f(lambda t: t is True, lambda t: t is False), True],
[True, or_f(lambda t: t is False, lambda t: t is False), False],
[True, or_f(lambda t: t is False, lambda t: t is True), True],
[True, or_f(lambda t: t is False, lambda t: {"t": t}), {"t": True}],
[True, or_f(lambda t: {"t": t}, lambda t: {"a": 42}), {"t": True}],
[True, invert_f(lambda t: t is False), True],
],
)
async def test_logic(self, obj, case, result):
assert await case(obj) == result
@pytest.mark.parametrize(
"case,type_",
[
[Text(text="test") | Text(text="test"), _OrFilter],
[Text(text="test") & Text(text="test"), _AndFilter],
[~Text(text="test"), _InvertFilter],
],
)
def test_dunder_methods(self, case, type_):
assert isinstance(case, type_)

View file

@ -22,9 +22,9 @@ class TestMagicDataFilter:
assert value.spam is True
return value
f = MagicData(magic_data=F.func(check))
f = MagicData(magic_data=F.func(check).as_("test"))
result = await f(Update(update_id=123), "foo", "bar", spam=True)
assert called
assert isinstance(result, bool)
assert result
assert isinstance(result, dict)
assert result["test"]

View file

@ -0,0 +1,66 @@
import pytest
from aiogram.dispatcher.flags.flag import Flag, FlagDecorator, FlagGenerator
@pytest.fixture(name="flag")
def flag_fixture() -> Flag:
return Flag("test", True)
@pytest.fixture(name="flag_decorator")
def flag_decorator_fixture(flag: Flag) -> FlagDecorator:
return FlagDecorator(flag)
@pytest.fixture(name="flag_generator")
def flag_flag_generator() -> FlagGenerator:
return FlagGenerator()
class TestFlagDecorator:
def test_with_value(self, flag_decorator: FlagDecorator):
new_decorator = flag_decorator._with_value(True)
assert new_decorator is not flag_decorator
assert new_decorator.flag is not flag_decorator.flag
assert new_decorator.flag
def test_call_invalid(self, flag_decorator: FlagDecorator):
with pytest.raises(ValueError):
flag_decorator(True, test=True)
def test_call_with_function(self, flag_decorator: FlagDecorator):
def func():
pass
decorated = flag_decorator(func)
assert decorated is func
assert hasattr(decorated, "aiogram_flag")
def test_call_with_arg(self, flag_decorator: FlagDecorator):
new_decorator = flag_decorator("hello")
assert new_decorator is not flag_decorator
assert new_decorator.flag.value == "hello"
def test_call_with_kwargs(self, flag_decorator: FlagDecorator):
new_decorator = flag_decorator(test=True)
assert new_decorator is not flag_decorator
assert isinstance(new_decorator.flag.value, dict)
assert "test" in new_decorator.flag.value
class TestFlagGenerator:
def test_getattr(self):
generator = FlagGenerator()
assert isinstance(generator.foo, FlagDecorator)
assert isinstance(generator.bar, FlagDecorator)
assert generator.foo is not generator.foo
assert generator.foo is not generator.bar
def test_failed_getattr(self):
generator = FlagGenerator()
with pytest.raises(AttributeError):
generator._something

View file

@ -0,0 +1,64 @@
from unittest.mock import patch
import pytest
from aiogram import F
from aiogram.dispatcher.event.handler import HandlerObject
from aiogram.dispatcher.flags.getter import (
check_flags,
extract_flags,
extract_flags_from_object,
get_flag,
)
class TestGetters:
def test_extract_flags_from_object(self):
def func():
pass
assert extract_flags_from_object(func) == {}
func.aiogram_flag = {"test": True}
assert extract_flags_from_object(func) == func.aiogram_flag
@pytest.mark.parametrize(
"obj,result",
[
[None, {}],
[{}, {}],
[{"handler": None}, {}],
[{"handler": HandlerObject(lambda: True, flags={"test": True})}, {"test": True}],
],
)
def test_extract_flags(self, obj, result):
assert extract_flags(obj) == result
@pytest.mark.parametrize(
"obj,name,default,result",
[
[None, "test", None, None],
[None, "test", 42, 42],
[{}, "test", None, None],
[{}, "test", 42, 42],
[{"handler": None}, "test", None, None],
[{"handler": None}, "test", 42, 42],
[{"handler": HandlerObject(lambda: True, flags={"test": True})}, "test", None, True],
[{"handler": HandlerObject(lambda: True, flags={"test": True})}, "test2", None, None],
[{"handler": HandlerObject(lambda: True, flags={"test": True})}, "test2", 42, 42],
],
)
def test_get_flag(self, obj, name, default, result):
assert get_flag(obj, name, default=default) == result
@pytest.mark.parametrize(
"flags,magic,result",
[
[{}, F.test, None],
[{"test": True}, F.test, True],
[{"test": True}, F.spam, None],
],
)
def test_check_flag(self, flags, magic, result):
with patch("aiogram.dispatcher.flags.getter.extract_flags", return_value=flags):
assert check_flags(object(), magic) == result

View file

@ -0,0 +1,30 @@
import pytest
from aiogram.dispatcher.fsm.storage.base import BaseEventIsolation, StorageKey
from tests.mocked_bot import MockedBot
pytestmark = pytest.mark.asyncio
@pytest.fixture(name="storage_key")
def create_storate_key(bot: MockedBot):
return StorageKey(chat_id=-42, user_id=42, bot_id=bot.id)
@pytest.mark.parametrize(
"isolation",
[
pytest.lazy_fixture("redis_isolation"),
pytest.lazy_fixture("lock_isolation"),
pytest.lazy_fixture("disabled_isolation"),
],
)
class TestIsolations:
async def test_lock(
self,
bot: MockedBot,
isolation: BaseEventIsolation,
storage_key: StorageKey,
):
async with isolation.lock(bot=bot, key=storage_key):
assert True, "You are kidding me?"

View file

@ -1,9 +1,11 @@
from typing import Literal
import pytest
from aiogram.dispatcher.fsm.storage.base import DEFAULT_DESTINY, StorageKey
from aiogram.dispatcher.fsm.storage.redis import DefaultKeyBuilder
from aiogram.dispatcher.fsm.storage.redis import (
DefaultKeyBuilder,
RedisEventIsolation,
RedisStorage,
)
pytestmark = pytest.mark.asyncio
@ -45,3 +47,11 @@ class TestRedisDefaultKeyBuilder:
)
with pytest.raises(ValueError):
key_builder.build(key, FIELD)
def test_create_isolation(self):
fake_redis = object()
storage = RedisStorage(redis=fake_redis)
isolation = storage.create_isolation()
assert isinstance(isolation, RedisEventIsolation)
assert isolation.redis is fake_redis
assert isolation.key_builder is storage.key_builder

View file

@ -16,11 +16,6 @@ def create_storate_key(bot: MockedBot):
[pytest.lazy_fixture("redis_storage"), pytest.lazy_fixture("memory_storage")],
)
class TestStorages:
async def test_lock(self, bot: MockedBot, storage: BaseStorage, storage_key: StorageKey):
# TODO: ?!?
async with storage.lock(bot=bot, key=storage_key):
assert True, "You are kidding me?"
async def test_set_state(self, bot: MockedBot, storage: BaseStorage, storage_key: StorageKey):
assert await storage.get_state(bot=bot, key=storage_key) is None

View file

@ -5,7 +5,6 @@ from aiogram.dispatcher.router import Router
from aiogram.utils.warnings import CodeHasNoEffect
pytestmark = pytest.mark.asyncio
importable_router = Router()
class TestRouter:
@ -46,14 +45,10 @@ class TestRouter:
with pytest.warns(CodeHasNoEffect):
assert router1.include_router(router2)
def test_include_router_by_string(self):
router = Router()
router.include_router("tests.test_dispatcher.test_router:importable_router")
def test_include_router_by_string_bad_type(self):
router = Router()
with pytest.raises(ValueError, match=r"router should be instance of Router"):
router.include_router("tests.test_dispatcher.test_router:TestRouter")
router.include_router(self)
def test_set_parent_router_bad_type(self):
router = Router()

View file

@ -1,4 +1,6 @@
import asyncio
import time
from asyncio import Event
from dataclasses import dataclass
from typing import Any, Dict
@ -19,6 +21,12 @@ from aiogram.methods import GetMe, Request
from aiogram.types import Message, User
from tests.mocked_bot import MockedBot
try:
from asynctest import CoroutineMock, patch
except ImportError:
from unittest.mock import AsyncMock as CoroutineMock # type: ignore
from unittest.mock import patch
class TestAiohttpServer:
def test_setup_application(self):
@ -74,8 +82,11 @@ class TestSimpleRequestHandler:
app = Application()
dp = Dispatcher()
handler_event = Event()
@dp.message(F.text == "test")
def handle_message(msg: Message):
handler_event.set()
return msg.answer("PASS")
handler = SimpleRequestHandler(
@ -97,8 +108,15 @@ class TestSimpleRequestHandler:
assert not result
handler.handle_in_background = True
resp = await self.make_reqest(client=client)
assert resp.status == 200
with patch(
"aiogram.dispatcher.dispatcher.Dispatcher.silent_call_request",
new_callable=CoroutineMock,
) as mocked_silent_call_request:
handler_event.clear()
resp = await self.make_reqest(client=client)
assert resp.status == 200
await asyncio.wait_for(handler_event.wait(), timeout=1)
mocked_silent_call_request.assert_awaited()
result = await resp.json()
assert not result

View file

@ -0,0 +1,129 @@
import asyncio
import time
from datetime import datetime
import pytest
from aiogram import Bot, flags
from aiogram.dispatcher.event.handler import HandlerObject
from aiogram.types import Chat, Message, User
from aiogram.utils.chat_action import ChatActionMiddleware, ChatActionSender
from tests.mocked_bot import MockedBot
try:
from asynctest import CoroutineMock, patch
except ImportError:
from unittest.mock import AsyncMock as CoroutineMock # type: ignore
from unittest.mock import patch
pytestmarm = pytest.mark.asyncio
class TestChatActionSender:
async def test_wait(self, bot: Bot, loop: asyncio.BaseEventLoop):
sender = ChatActionSender.typing(bot=bot, chat_id=42)
loop.call_soon(sender._close_event.set)
start = time.monotonic()
await sender._wait(1)
assert time.monotonic() - start < 1
@pytest.mark.parametrize(
"action",
[
"typing",
"upload_photo",
"record_video",
"upload_video",
"record_voice",
"upload_voice",
"upload_document",
"choose_sticker",
"find_location",
"record_video_note",
"upload_video_note",
],
)
@pytest.mark.parametrize("pass_bot", [True, False])
async def test_factory(self, action: str, bot: MockedBot, pass_bot: bool):
sender_factory = getattr(ChatActionSender, action)
sender = sender_factory(chat_id=42, bot=bot if pass_bot else None)
assert isinstance(sender, ChatActionSender)
assert sender.action == action
assert sender.chat_id == 42
assert sender.bot is bot
async def test_worker(self, bot: Bot):
with patch(
"aiogram.client.bot.Bot.send_chat_action",
new_callable=CoroutineMock,
) as mocked_send_chat_action:
async with ChatActionSender.typing(
bot=bot, chat_id=42, interval=0.01, initial_sleep=0
):
await asyncio.sleep(0.1)
assert mocked_send_chat_action.await_count > 1
mocked_send_chat_action.assert_awaited_with(action="typing", chat_id=42)
async def test_contextmanager(self, bot: MockedBot):
sender: ChatActionSender = ChatActionSender.typing(bot=bot, chat_id=42)
assert not sender.running
await sender._stop() # nothing
async with sender:
assert sender.running
assert not sender._close_event.is_set()
with pytest.raises(RuntimeError):
await sender._run()
assert not sender.running
class TestChatActionMiddleware:
@pytest.mark.parametrize(
"value",
[
None,
"sticker",
{"action": "upload_photo"},
{"interval": 1, "initial_sleep": 0.5},
],
)
async def test_call_default(self, value, bot: Bot):
async def handler(event, data):
return "OK"
if value is None:
handler1 = flags.chat_action(handler)
else:
handler1 = flags.chat_action(value)(handler)
middleware = ChatActionMiddleware()
with patch(
"aiogram.utils.chat_action.ChatActionSender._run",
new_callable=CoroutineMock,
) as mocked_run, patch(
"aiogram.utils.chat_action.ChatActionSender._stop",
new_callable=CoroutineMock,
) as mocked_stop:
data = {"handler": HandlerObject(callback=handler1), "bot": bot}
message = Message(
chat=Chat(id=42, type="private", title="Test"),
from_user=User(id=42, is_bot=False, first_name="Test"),
date=datetime.now(),
message_id=42,
)
result = await middleware(handler=handler1, event=None, data=data)
assert result == "OK"
mocked_run.assert_not_awaited()
mocked_stop.assert_not_awaited()
result = await middleware(
handler=handler1,
event=message,
data=data,
)
assert result == "OK"
mocked_run.assert_awaited()
mocked_stop.assert_awaited()

View file

@ -111,8 +111,26 @@ class TestSimpleI18nMiddleware:
middleware = SimpleI18nMiddleware(i18n=i18n)
middleware.setup(router=dp)
assert middleware not in dp.update.outer_middlewares
assert middleware in dp.message.outer_middlewares
assert middleware not in dp.update.outer_middleware
assert middleware in dp.message.outer_middleware
async def test_get_unknown_locale(self, i18n: I18n):
dp = Dispatcher()
middleware = SimpleI18nMiddleware(i18n=i18n)
middleware.setup(router=dp)
locale = await middleware.get_locale(
None,
{
"event_from_user": User(
id=42,
is_bot=False,
first_name="Test",
language_code="unknown",
)
},
)
assert locale == i18n.default_locale
@pytest.mark.asyncio

View file

@ -1,29 +0,0 @@
import pytest
import aiogram
from aiogram.utils.imports import import_module
class TestImports:
def test_bad_type(self):
with pytest.raises(ValueError, match=r"Target should be string not"):
import_module(42)
@pytest.mark.parametrize("value", ["module", "module:", ":attribute"])
def test_bad_format(self, value):
with pytest.raises(ValueError, match='must be in format "<module>:<attribute>"'):
import_module(value)
@pytest.mark.parametrize("value", ["module", "aiogram.KABOOM", "aiogram.KABOOM.TEST"])
def test_bad_value(self, value):
with pytest.raises(ValueError, match="Could not import module"):
import_module(f"{value}:attribute")
def test_has_no_attribute(self):
with pytest.raises(ValueError, match="has no attribute"):
import_module("aiogram:KABOOM")
def test_imported(self):
value = import_module("aiogram:__version__")
isinstance(value, str)
assert value == aiogram.__version__

View file

@ -2,7 +2,7 @@ from typing import Any, Dict
import pytest
from aiogram.utils.link import create_telegram_link, create_tg_link
from aiogram.utils.link import BRANCH, create_telegram_link, create_tg_link, docs_url
class TestLink:
@ -22,3 +22,12 @@ class TestLink:
)
def test_create_telegram_link(self, base: str, params: Dict[str, Any], result: str):
assert create_telegram_link(base, **params) == result
def test_fragment(self):
assert (
docs_url("test.html", fragment_="test")
== f"https://docs.aiogram.dev/en/{BRANCH}/test.html#test"
)
def test_docs(self):
assert docs_url("test.html") == f"https://docs.aiogram.dev/en/{BRANCH}/test.html"

View file

@ -47,6 +47,11 @@ class TestTextDecoration:
'<a href="tg://user?id=42">test</a>',
],
[html_decoration, MessageEntity(type="url", offset=0, length=5), "test"],
[
html_decoration,
MessageEntity(type="spoiler", offset=0, length=5),
'<span class="tg-spoiler">test</span>',
],
[
html_decoration,
MessageEntity(type="text_link", offset=0, length=5, url="https://aiogram.dev"),
@ -76,6 +81,7 @@ class TestTextDecoration:
[markdown_decoration, MessageEntity(type="bot_command", offset=0, length=5), "test"],
[markdown_decoration, MessageEntity(type="email", offset=0, length=5), "test"],
[markdown_decoration, MessageEntity(type="phone_number", offset=0, length=5), "test"],
[markdown_decoration, MessageEntity(type="spoiler", offset=0, length=5), "|test|"],
[
markdown_decoration,
MessageEntity(

View file

@ -0,0 +1,80 @@
import pytest
from aiogram.utils.web_app import (
WebAppInitData,
check_webapp_signature,
parse_webapp_init_data,
safe_parse_webapp_init_data,
)
class TestWebApp:
@pytest.mark.parametrize(
"token,case,result",
[
[
"42:TEST",
"auth_date=1650385342"
"&user=%7B%22id%22%3A42%2C%22first_name%22%3A%22Test%22%7D"
"&query_id=test"
"&hash=46d2ea5e32911ec8d30999b56247654460c0d20949b6277af519e76271182803",
True,
],
[
"42:INVALID",
"auth_date=1650385342"
"&user=%7B%22id%22%3A42%2C%22first_name%22%3A%22Test%22%7D"
"&query_id=test"
"&hash=46d2ea5e32911ec8d30999b56247654460c0d20949b6277af519e76271182803",
False,
],
[
"42:TEST",
"user=%7B%22id%22%3A42%2C%22first_name%22%3A%22Test%22%7D&query_id=test&hash=test",
False,
],
[
"42:TEST",
"user=%7B%22id%22%3A42%2C%22first_name%22%3A%22Test%22%7D&query_id=test",
False,
],
["42:TEST", "", False],
["42:TEST", "test&foo=bar=baz", False],
],
)
def test_check_webapp_signature(self, token, case, result):
assert check_webapp_signature(token, case) is result
def test_parse_web_app_init_data(self):
parsed = parse_webapp_init_data(
"auth_date=1650385342"
"&user=%7B%22id%22%3A42%2C%22first_name%22%3A%22Test%22%7D"
"&query_id=test"
"&hash=46d2ea5e32911ec8d30999b56247654460c0d20949b6277af519e76271182803",
)
assert isinstance(parsed, WebAppInitData)
assert parsed.user
assert parsed.user.first_name == "Test"
assert parsed.user.id == 42
assert parsed.query_id == "test"
assert parsed.hash == "46d2ea5e32911ec8d30999b56247654460c0d20949b6277af519e76271182803"
assert parsed.auth_date.year == 2022
def test_valid_safe_parse_webapp_init_data(self):
assert safe_parse_webapp_init_data(
"42:TEST",
"auth_date=1650385342"
"&user=%7B%22id%22%3A42%2C%22first_name%22%3A%22Test%22%7D"
"&query_id=test"
"&hash=46d2ea5e32911ec8d30999b56247654460c0d20949b6277af519e76271182803",
)
def test_invalid_safe_parse_webapp_init_data(self):
with pytest.raises(ValueError):
safe_parse_webapp_init_data(
"42:TOKEN",
"auth_date=1650385342"
"&user=%7B%22id%22%3A42%2C%22first_name%22%3A%22Test%22%7D"
"&query_id=test"
"&hash=test",
)