Cover chat member updated filter

This commit is contained in:
Alex Root Junior 2022-02-16 16:42:43 +02:00
parent d6f1c9552c
commit 1c9425bcf7
No known key found for this signature in database
GPG key ID: 074C1D455EBEA4AC
3 changed files with 360 additions and 9 deletions

View file

@ -1,7 +1,7 @@
from typing import Dict, Tuple, Type
from .base import BaseFilter
from .chat_member import (
from .chat_member_updated import (
ADMINISTRATOR,
CREATOR,
IS_ADMIN,
@ -14,7 +14,7 @@ from .chat_member import (
MEMBER,
PROMOTED_TRANSITION,
RESTRICTED,
ChatMemberUpdatedStatus,
ChatMemberUpdatedFilter,
)
from .command import Command, CommandObject
from .content_types import ContentTypesFilter
@ -34,7 +34,7 @@ __all__ = (
"ExceptionTypeFilter",
"StateFilter",
"MagicData",
"ChatMemberUpdatedStatus",
"ChatMemberUpdatedFilter",
"CREATOR",
"ADMINISTRATOR",
"MEMBER",
@ -112,12 +112,12 @@ BUILTIN_FILTERS: Dict[str, Tuple[Type[BaseFilter], ...]] = {
"my_chat_member": (
*_ALL_EVENTS_FILTERS,
*_TELEGRAM_EVENTS_FILTERS,
ChatMemberUpdatedStatus,
ChatMemberUpdatedFilter,
),
"chat_member": (
*_ALL_EVENTS_FILTERS,
*_TELEGRAM_EVENTS_FILTERS,
ChatMemberUpdatedStatus,
ChatMemberUpdatedFilter,
),
"chat_join_request": (
*_ALL_EVENTS_FILTERS,

View file

@ -36,6 +36,8 @@ class _MemberStatusMarker:
f"unsupported operand type(s) for |: {type(self).__name__!r} and {type(other).__name__!r}"
)
__ror__ = __or__
def __rshift__(
self, other: Union["_MemberStatusMarker", "_MemberStatusGroupMarker"]
) -> "_MemberStatusTransition":
@ -71,7 +73,9 @@ class _MemberStatusMarker:
class _MemberStatusGroupMarker:
def __init__(self, *statuses: _MemberStatusMarker) -> None:
self.statuses = set(statuses)
if not statuses:
raise ValueError("Member status group should have at least one status included")
self.statuses = frozenset(statuses)
def __or__(
self: MarkerGroupT, other: Union["_MemberStatusMarker", "_MemberStatusGroupMarker"]
@ -107,7 +111,7 @@ class _MemberStatusGroupMarker:
)
def __str__(self) -> str:
result = " | ".join(map(str, sorted(self.statuses, key=lambda s: (s.name, s.is_member))))
result = " | ".join(map(str, sorted(self.statuses, key=str)))
if len(self.statuses) != 1:
return f"({result})"
return result
@ -150,7 +154,7 @@ LEAVE_TRANSITION = ~JOIN_TRANSITION
PROMOTED_TRANSITION = (MEMBER | RESTRICTED | LEFT | KICKED) >> ADMINISTRATOR
class ChatMemberUpdatedStatus(BaseFilter):
class ChatMemberUpdatedFilter(BaseFilter):
member_status_changed: Union[
_MemberStatusMarker,
_MemberStatusGroupMarker,
@ -169,4 +173,6 @@ class ChatMemberUpdatedStatus(BaseFilter):
return rule.check(member=new)
if isinstance(rule, _MemberStatusTransition):
return rule.check(old=old, new=new)
return False
# Impossible variant in due to pydantic validation
return False # pragma: no cover

View file

@ -0,0 +1,345 @@
from datetime import datetime
import pytest
from aiogram.dispatcher.filters.chat_member_updated import (
ADMINISTRATOR,
IS_MEMBER,
JOIN_TRANSITION,
LEAVE_TRANSITION,
ChatMemberUpdatedFilter,
_MemberStatusGroupMarker,
_MemberStatusMarker,
_MemberStatusTransition,
)
from aiogram.types import Chat, ChatMember, ChatMemberUpdated, User
class TestMemberStatusMarker:
def test_str(self):
marker = _MemberStatusMarker("test")
assert str(marker) == "TEST"
assert str(+marker) == "+TEST"
assert str(-marker) == "-TEST"
def test_pos(self):
marker = _MemberStatusMarker("test")
assert marker.is_member is None
positive_marker = +marker
assert positive_marker is not marker
assert marker.is_member is None
assert positive_marker.is_member is True
def test_neg(self):
marker = _MemberStatusMarker("test")
assert marker.is_member is None
negative_marker = -marker
assert negative_marker is not marker
assert marker.is_member is None
assert negative_marker.is_member is False
def test_or(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
combination = marker1 | marker2
assert isinstance(combination, _MemberStatusGroupMarker)
assert marker1 in combination.statuses
assert marker2 in combination.statuses
combination2 = marker1 | marker1
assert isinstance(combination2, _MemberStatusGroupMarker)
assert len(combination2.statuses) == 1
marker3 = _MemberStatusMarker("test3")
combination3 = marker3 | combination
assert isinstance(combination3, _MemberStatusGroupMarker)
assert marker3 in combination3.statuses
assert len(combination3.statuses) == 3
assert combination3 is not combination
with pytest.raises(TypeError):
marker1 | 42
def test_rshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
transition = marker1 >> marker2
assert isinstance(transition, _MemberStatusTransition)
assert marker1 in transition.old.statuses
assert marker2 in transition.new.statuses
transition2 = marker1 >> (marker2 | marker3)
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
marker1 >> 42
def test_lshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
transition = marker1 << marker2
assert isinstance(transition, _MemberStatusTransition)
assert marker2 in transition.old.statuses
assert marker1 in transition.new.statuses
transition2 = marker1 << (marker2 | marker3)
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
marker1 << 42
def test_hash(self):
marker1 = _MemberStatusMarker("test1")
marker1_1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
assert hash(marker1) != hash(marker2)
assert hash(marker1) == hash(marker1_1)
assert hash(marker1) != hash(-marker1)
@pytest.mark.parametrize(
"name,is_member,member,result",
[
["test", None, ChatMember(status="member"), False],
["test", None, ChatMember(status="test"), True],
["test", True, ChatMember(status="test"), False],
["test", True, ChatMember(status="test", is_member=True), True],
["test", True, ChatMember(status="test", is_member=False), False],
],
)
def test_check(self, name, is_member, member, result):
marker = _MemberStatusMarker(name, is_member=is_member)
assert marker.check(member=member) == result
class TestMemberStatusGroupMarker:
def test_init_unique(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
group = _MemberStatusGroupMarker(marker1, marker1, marker2, marker3)
assert len(group.statuses) == 3
def test_init_empty(self):
with pytest.raises(ValueError):
_MemberStatusGroupMarker()
def test_or(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
marker4 = _MemberStatusMarker("test4")
group1 = _MemberStatusGroupMarker(marker1, marker2)
group2 = _MemberStatusGroupMarker(marker3, marker4)
group3 = group1 | marker3
assert isinstance(group3, _MemberStatusGroupMarker)
assert len(group3.statuses) == 3
group4 = group1 | group2
assert isinstance(group4, _MemberStatusGroupMarker)
assert len(group4.statuses) == 4
with pytest.raises(TypeError):
group4 | 42
def test_rshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
group1 = _MemberStatusGroupMarker(marker1, marker2)
group2 = _MemberStatusGroupMarker(marker1, marker3)
transition1 = group1 >> marker1
assert isinstance(transition1, _MemberStatusTransition)
assert transition1.old is group1
assert marker1 in transition1.new.statuses
transition2 = group1 >> group2
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
group1 >> 42
def test_lshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
group1 = _MemberStatusGroupMarker(marker1, marker2)
group2 = _MemberStatusGroupMarker(marker1, marker3)
transition1 = group1 << marker1
assert isinstance(transition1, _MemberStatusTransition)
assert transition1.new is group1
assert marker1 in transition1.old.statuses
transition2 = group1 << group2
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
group1 << 42
def test_str(self):
marker1 = _MemberStatusMarker("test1")
marker1_1 = +marker1
marker2 = _MemberStatusMarker("test2")
group1 = marker1 | marker1
assert str(group1) == "TEST1"
group2 = marker1 | marker2
assert str(group2) == "(TEST1 | TEST2)"
group3 = marker1 | marker1_1
assert str(group3) == "(+TEST1 | TEST1)"
@pytest.mark.parametrize(
"status,result",
[
["test", False],
["test1", True],
["test2", True],
],
)
def test_check(self, status, result):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
group = marker1 | marker2
assert group.check(member=ChatMember(status=status)) is result
class TestMemberStatusTransition:
def test_invert(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
transition1 = marker1 >> marker2
transition2 = ~transition1
assert transition1 is not transition2
assert transition1.old == transition2.new
assert transition1.new == transition2.old
assert str(transition1) == "TEST1 >> TEST2"
assert str(transition2) == "TEST2 >> TEST1"
@pytest.mark.parametrize(
"transition,old,new,result",
[
[JOIN_TRANSITION, ChatMember(status="left"), ChatMember(status="member"), True],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=True),
ChatMember(status="member"),
False,
],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=False),
ChatMember(status="member"),
True,
],
[
JOIN_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
False,
],
[
LEAVE_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
True,
],
],
)
def test_check(self, transition, old, new, result):
assert transition.check(old=old, new=new) == result
class TestChatMemberUpdatedStatusFilter:
@pytest.mark.asyncio
@pytest.mark.parametrize(
"transition,old,new,result",
[
[JOIN_TRANSITION, ChatMember(status="left"), ChatMember(status="member"), True],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=True),
ChatMember(status="member"),
False,
],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=False),
ChatMember(status="member"),
True,
],
[
JOIN_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
False,
],
[
LEAVE_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
True,
],
[
ADMINISTRATOR,
ChatMember(status="member"),
ChatMember(status="administrator"),
True,
],
[
IS_MEMBER,
ChatMember(status="restricted", is_member=False),
ChatMember(status="member"),
True,
],
],
)
async def test_call(self, transition, old, new, result):
updated_filter = ChatMemberUpdatedFilter(member_status_changed=transition)
user = User(id=42, first_name="Test", is_bot=False)
update = {
"user": user,
"until_date": datetime.now(),
"is_anonymous": False,
"can_be_edited": True,
"can_manage_chat": True,
"can_delete_messages": True,
"can_manage_voice_chats": True,
"can_restrict_members": True,
"can_promote_members": True,
"can_change_info": True,
"can_invite_users": True,
"can_post_messages": True,
"can_edit_messages": True,
"can_pin_messages": True,
"can_send_messages": True,
"can_send_media_messages": True,
"can_send_polls": True,
"can_send_other_messages": True,
"can_add_web_page_previews": True,
}
event = ChatMemberUpdated(
chat=Chat(id=42, type="test"),
from_user=user,
old_chat_member=old.copy(update=update),
new_chat_member=new.copy(update=update),
date=datetime.now(),
)
assert await updated_filter(event) is result