[3.0] Bot API 5.1 + FSM + Utils (#525)

* Regenerate corresponding to Bot API 5.1

* Added base of FSM. Markup constructor and small refactoring

* Fix dependencies

* Fix mypy windows error

* Move StatesGroup.get_root() from meta to class

* Fixed chat and user constraints

* Update pipeline

* Remove docs pipeline

* Added GLOBAL_USER FSM strategy

* Reformat code

* Fixed Dispatcher._process_update

* Bump Bot API 5.2. Added integration with MagicFilter

* Coverage
This commit is contained in:
Alex Root Junior 2021-05-11 23:04:32 +03:00 committed by GitHub
parent a6f824a117
commit 0e72d8e65b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
265 changed files with 2921 additions and 1324 deletions

View file

View file

@ -0,0 +1,48 @@
from abc import ABC, abstractmethod
from collections import Generator
from typing import Dict, List
from aiogram.utils.help.record import CommandRecord
class BaseHelpBackend(ABC):
@abstractmethod
def add(self, record: CommandRecord) -> None:
pass
@abstractmethod
def search(self, value: str) -> CommandRecord:
pass
@abstractmethod
def all(self) -> Generator[CommandRecord, None, None]:
pass
def __getitem__(self, item: str) -> CommandRecord:
return self.search(item)
def __iter__(self) -> Generator[CommandRecord, None, None]:
return self.all()
class MappingBackend(BaseHelpBackend):
def __init__(self, search_empty_prefix: bool = True) -> None:
self._records: List[CommandRecord] = []
self._mapping: Dict[str, CommandRecord] = {}
self.search_empty_prefix = search_empty_prefix
def search(self, value: str) -> CommandRecord:
return self._mapping[value]
def add(self, record: CommandRecord) -> None:
new_records = {}
for key in record.as_keys(with_empty_prefix=self.search_empty_prefix):
if key in self._mapping:
raise ValueError(f"Key '{key}' is already indexed")
new_records[key] = record
self._mapping.update(new_records)
self._records.append(record)
self._records.sort(key=lambda rec: (rec.priority, rec.commands[0]))
def all(self) -> Generator[CommandRecord, None, None]:
yield from self._records

View file

@ -0,0 +1,113 @@
from typing import Any, Optional, Tuple
from aiogram import Bot, Router
from aiogram.dispatcher.filters import Command, CommandObject
from aiogram.types import BotCommand, Message
from aiogram.utils.help.engine import BaseHelpBackend, MappingBackend
from aiogram.utils.help.record import DEFAULT_PREFIXES, CommandRecord
from aiogram.utils.help.render import BaseHelpRenderer, SimpleRenderer
class HelpManager:
def __init__(
self,
backend: Optional[BaseHelpBackend] = None,
renderer: Optional[BaseHelpRenderer] = None,
) -> None:
if backend is None:
backend = MappingBackend()
if renderer is None:
renderer = SimpleRenderer()
self._backend = backend
self._renderer = renderer
def add(
self,
*commands: str,
help: str,
description: Optional[str] = None,
prefix: str = DEFAULT_PREFIXES,
ignore_case: bool = False,
ignore_mention: bool = False,
priority: int = 0,
) -> CommandRecord:
record = CommandRecord(
commands=commands,
help=help,
description=description,
prefix=prefix,
ignore_case=ignore_case,
ignore_mention=ignore_mention,
priority=priority,
)
self._backend.add(record)
return record
def command(
self,
*commands: str,
help: str,
description: Optional[str] = None,
prefix: str = DEFAULT_PREFIXES,
ignore_case: bool = False,
ignore_mention: bool = False,
priority: int = 0,
) -> Command:
record = self.add(
*commands,
help=help,
description=description,
prefix=prefix,
ignore_case=ignore_case,
ignore_mention=ignore_mention,
priority=priority,
)
return record.as_filter()
def mount_help(
self,
router: Router,
*commands: str,
prefix: str = "/",
help: str = "Help",
description: str = "Show help for the commands\n"
"Also you can use '/help command' for get help for specific command",
as_reply: bool = True,
filters: Tuple[Any, ...] = (),
**kw_filters: Any,
) -> Any:
if not commands:
commands = ("help",)
help_filter = self.command(*commands, prefix=prefix, help=help, description=description)
async def handle(message: Message, command: CommandObject, **kwargs: Any) -> Any:
return await self._handle_help(
message=message, command=command, as_reply=as_reply, **kwargs
)
return router.message.register(handle, help_filter, *filters, **kw_filters)
async def _handle_help(
self,
message: Message,
bot: Bot,
command: CommandObject,
as_reply: bool = True,
**kwargs: Any,
) -> Any:
lines = self._renderer.render(backend=self._backend, command=command, **kwargs)
text = "\n".join(line or "" for line in lines)
return await bot.send_message(
chat_id=message.chat.id,
text=text,
reply_to_message_id=message.message_id if as_reply else None,
)
async def set_bot_commands(self, bot: Bot) -> bool:
return await bot.set_my_commands(
commands=[
BotCommand(command=record.commands[0], description=record.help)
for record in self._backend
if "/" in record.prefix
]
)

View file

@ -0,0 +1,33 @@
from dataclasses import dataclass
from itertools import product
from typing import Generator, Optional, Sequence
from aiogram.dispatcher.filters import Command
DEFAULT_PREFIXES = "/"
@dataclass
class CommandRecord:
commands: Sequence[str]
help: str
description: Optional[str] = None
prefix: str = DEFAULT_PREFIXES
ignore_case: bool = False
ignore_mention: bool = False
priority: int = 0
def as_filter(self) -> Command:
return Command(commands=self.commands, commands_prefix=self.prefix)
def as_keys(self, with_empty_prefix: bool = False) -> Generator[str, None, None]:
for command in self.commands:
yield command
for prefix in self.prefix:
yield f"{prefix}{command}"
def as_command(self) -> str:
return f"{self.prefix[0]}{self.commands[0]}"
def as_aliases(self) -> str:
return ", ".join(f"{p}{c}" for c, p in product(self.commands, self.prefix))

View file

@ -0,0 +1,64 @@
from abc import ABC, abstractmethod
from typing import Any, Generator, Optional
from aiogram.dispatcher.filters import CommandObject
from aiogram.utils.help.engine import BaseHelpBackend
class BaseHelpRenderer(ABC):
@abstractmethod
def render(
self, backend: BaseHelpBackend, command: CommandObject, **kwargs: Any
) -> Generator[Optional[str], None, None]:
pass
class SimpleRenderer(BaseHelpRenderer):
def __init__(
self,
help_title: str = "Commands list:",
help_footer: str = "",
aliases_line: str = "Aliases",
command_title: str = "Help for command:",
unknown_command: str = "Command not found",
):
self.help_title = help_title
self.help_footer = help_footer
self.aliases_line = aliases_line
self.command_title = command_title
self.unknown_command = unknown_command
def render_help(self, backend: BaseHelpBackend) -> Generator[Optional[str], None, None]:
yield self.help_title
for command in backend:
yield f"{command.prefix[0]}{command.commands[0]} - {command.help}"
if self.help_footer:
yield None
yield self.help_footer
def render_command_help(
self, backend: BaseHelpBackend, target: str
) -> Generator[Optional[str], None, None]:
try:
record = backend[target]
except KeyError:
yield f"{self.command_title} {target}"
yield self.unknown_command
return
yield f"{self.command_title} {record.as_command()}"
if len(record.commands) > 1 or len(record.prefix) > 1:
yield f"{self.aliases_line}: {record.as_aliases()}"
yield record.help
yield None
yield record.description
def render(
self, backend: BaseHelpBackend, command: CommandObject, **kwargs: Any
) -> Generator[Optional[str], None, None]:
if command.args:
yield from self.render_command_help(backend=backend, target=command.args)
else:
yield from self.render_help(backend=backend)

224
aiogram/utils/markup.py Normal file
View file

@ -0,0 +1,224 @@
from itertools import chain
from itertools import cycle as repeat_all
from typing import Any, Generator, Generic, Iterable, List, Optional, Type, TypeVar
from aiogram.types import InlineKeyboardButton, KeyboardButton
ButtonType = TypeVar("ButtonType", InlineKeyboardButton, KeyboardButton)
T = TypeVar("T")
MAX_WIDTH = 8
MIN_WIDTH = 1
MAX_BUTTONS = 100
class MarkupConstructor(Generic[ButtonType]):
def __init__(
self, button_type: Type[ButtonType], markup: Optional[List[List[ButtonType]]] = None
) -> None:
if not issubclass(button_type, (InlineKeyboardButton, KeyboardButton)):
raise ValueError(f"Button type {button_type} are not allowed here")
self._button_type: Type[ButtonType] = button_type
if markup:
self._validate_markup(markup)
else:
markup = []
self._markup: List[List[ButtonType]] = markup
@property
def buttons(self) -> Generator[ButtonType, None, None]:
"""
Get flatten set of all buttons
:return:
"""
yield from chain.from_iterable(self.export())
def _validate_button(self, button: ButtonType) -> bool:
"""
Check that button item has correct type
:param button:
:return:
"""
allowed = self._button_type
if not isinstance(button, allowed):
raise ValueError(
f"{button!r} should be type {allowed.__name__!r} not {type(button).__name__!r}"
)
return True
def _validate_buttons(self, *buttons: ButtonType) -> bool:
"""
Check that all passed button has correct type
:param buttons:
:return:
"""
return all(map(self._validate_button, buttons))
def _validate_row(self, row: List[ButtonType]) -> bool:
"""
Check that row of buttons are correct
Row can be only list of allowed button types and has length 0 <= n <= 8
:param row:
:return:
"""
if not isinstance(row, list):
raise ValueError(
f"Row {row!r} should be type 'List[{self._button_type.__name__}]' not type {type(row).__name__}"
)
if len(row) > MAX_WIDTH:
raise ValueError(f"Row {row!r} is too long (MAX_WIDTH={MAX_WIDTH})")
self._validate_buttons(*row)
return True
def _validate_markup(self, markup: List[List[ButtonType]]) -> bool:
"""
Check that passed markup has correct data structure
Markup is list of lists of buttons
:param markup:
:return:
"""
count = 0
if not isinstance(markup, list):
raise ValueError(
f"Markup should be type 'List[List[{self._button_type.__name__}]]' not type {type(markup).__name__!r}"
)
for row in markup:
self._validate_row(row)
count += len(row)
if count > MAX_BUTTONS:
raise ValueError(f"Too much buttons detected Max allowed count - {MAX_BUTTONS}")
return True
def _validate_size(self, size: Any) -> int:
"""
Validate that passed size is legit
:param size:
:return:
"""
if not isinstance(size, int):
raise ValueError("Only int sizes are allowed")
if size not in range(MIN_WIDTH, MAX_WIDTH + 1):
raise ValueError(f"Row size {size} are not allowed")
return size
def copy(self: "MarkupConstructor[ButtonType]") -> "MarkupConstructor[ButtonType]":
"""
Make full copy of current constructor with markup
:return:
"""
return self.__class__(self._button_type, markup=self.export())
def export(self) -> List[List[ButtonType]]:
"""
Export configured markup as list of lists of buttons
.. code-block:: python
>>> constructor = MarkupConstructor(button_type=InlineKeyboardButton)
>>> ... # Add buttons to constructor
>>> markup = InlineKeyboardMarkup(inline_keyboard=constructor.export())
:return:
"""
return self._markup.copy()
def add(self, *buttons: ButtonType) -> "MarkupConstructor[ButtonType]":
"""
Add one or many buttons to markup.
:param buttons:
:return:
"""
self._validate_buttons(*buttons)
markup = self.export()
# Try to add new buttons to the end of last row if it possible
if markup and len(markup[-1]) < MAX_WIDTH:
last_row = markup[-1]
pos = MAX_WIDTH - len(last_row)
head, buttons = buttons[:pos], buttons[pos:]
last_row.extend(head)
# Separate buttons to exclusive rows with max possible row width
while buttons:
row, buttons = buttons[:MAX_WIDTH], buttons[MAX_WIDTH:]
markup.append(list(row))
self._markup = markup
return self
def row(self, *buttons: ButtonType, width: int = MAX_WIDTH) -> "MarkupConstructor[ButtonType]":
"""
Add row to markup
When too much buttons is passed it will be separated to many rows
:param buttons:
:param width:
:return:
"""
self._validate_size(width)
self._validate_buttons(*buttons)
self._markup.extend(
list(buttons[pos : pos + width]) for pos in range(0, len(buttons), width)
)
return self
def adjust(self, *sizes: int, repeat: bool = False) -> "MarkupConstructor[ButtonType]":
"""
Adjust previously added buttons to specific row sizes.
By default when the sum of passed sizes is lower than buttons count the last
one size will be used for tail of the markup.
If repeat=True is passed - all sizes will be cycled when available more buttons count than all sizes
:param sizes:
:param repeat:
:return:
"""
if not sizes:
sizes = (MAX_WIDTH,)
validated_sizes = map(self._validate_size, sizes)
sizes_iter = repeat_all(validated_sizes) if repeat else repeat_last(validated_sizes)
size = next(sizes_iter)
markup = []
row: List[ButtonType] = []
for button in self.buttons:
if len(row) >= size:
markup.append(row)
size = next(sizes_iter)
row = []
row.append(button)
if row:
markup.append(row)
self._markup = markup
return self
def button(self, **kwargs: Any) -> "MarkupConstructor[ButtonType]":
button = self._button_type(**kwargs)
return self.add(button)
def repeat_last(items: Iterable[T]) -> Generator[T, None, None]:
items_iter = iter(items)
try:
value = next(items_iter)
except StopIteration:
return
yield value
finished = False
while True:
if not finished:
try:
value = next(items_iter)
except StopIteration:
finished = True
yield value

View file

@ -17,6 +17,14 @@ __all__ = (
)
def add_surrogates(text: str) -> bytes:
return text.encode("utf-16-le")
def remove_surrogates(text: bytes) -> str:
return text.decode("utf-16-le")
class TextDecoration(ABC):
def apply_entity(self, entity: MessageEntity, text: str) -> str:
"""
@ -57,7 +65,7 @@ class TextDecoration(ABC):
"""
result = "".join(
self._unparse_entities(
self._add_surrogates(text),
add_surrogates(text),
sorted(entities, key=lambda item: item.offset) if entities else [],
)
)
@ -78,7 +86,7 @@ class TextDecoration(ABC):
if entity.offset * 2 < offset:
continue
if entity.offset * 2 > offset:
yield self.quote(self._remove_surrogates(text[offset : entity.offset * 2]))
yield self.quote(remove_surrogates(text[offset : entity.offset * 2]))
start = entity.offset * 2
offset = entity.offset * 2 + entity.length * 2
@ -91,15 +99,7 @@ class TextDecoration(ABC):
)
if offset < length:
yield self.quote(self._remove_surrogates(text[offset:length]))
@staticmethod
def _add_surrogates(text: str) -> bytes:
return text.encode("utf-16-le")
@staticmethod
def _remove_surrogates(text: bytes) -> str:
return text.decode("utf-16-le")
yield self.quote(remove_surrogates(text[offset:length]))
@abstractmethod
def link(self, value: str, link: str) -> str: # pragma: no cover