Add InlineButtonsPaginator

This commit is contained in:
Roman Inflianskas 2018-11-23 15:23:56 +03:00 committed by Roman Inflianskas
parent 691b66a513
commit de083df663
2 changed files with 293 additions and 4 deletions

View file

@ -1,6 +1,14 @@
from math import ceil
import typing
import uuid
from aiogram import Bot, Dispatcher
from aiogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup, CallbackQuery
from aiogram.types.page_selection_location import PageSelectionLocation
MAX_MESSAGE_LENGTH = 4096
PAGE_DEFAULT = 0
LIMIT_DEFAULT = 10
def split_text(text: str, length: int = MAX_MESSAGE_LENGTH) -> typing.List[str]:
@ -43,7 +51,7 @@ def safe_split_text(text: str, length: int = MAX_MESSAGE_LENGTH) -> typing.List[
return parts
def index_of_first_element_on_page(page: int = 0, limit: int = 10) -> int:
def index_of_first_element_on_page(page: int = PAGE_DEFAULT, limit: int = LIMIT_DEFAULT) -> int:
"""
Index of first element on page
@ -54,7 +62,7 @@ def index_of_first_element_on_page(page: int = 0, limit: int = 10) -> int:
return page * limit
def index_of_last_element_on_page(page: int = 0, limit: int = 10) -> int:
def index_of_last_element_on_page(page: int = PAGE_DEFAULT, limit: int = LIMIT_DEFAULT) -> int:
"""
Index of last element on page
@ -62,10 +70,10 @@ def index_of_last_element_on_page(page: int = 0, limit: int = 10) -> int:
:param limit: int items per page (default: 10)
:return: int index of last element on page
"""
return index_of_last_element_on_page(page=page, limit=limit) + limit
return index_of_first_element_on_page(page=page, limit=limit) + limit
def paginate(data: typing.Iterable, page: int = 0, limit: int = 10) -> typing.Iterable:
def paginate(data: typing.Iterable, page: int = PAGE_DEFAULT, limit: int = LIMIT_DEFAULT) -> typing.Iterable:
"""
Slice data over pages
@ -80,3 +88,149 @@ def paginate(data: typing.Iterable, page: int = 0, limit: int = 10) -> typing.It
"""
return data[index_of_first_element_on_page(page=page, limit=limit):
index_of_last_element_on_page(page=page, limit=limit)]
class InlineButtonsPaginator:
BUTTONS_TEXT_CORO = typing.Optional[typing.Callable[[int, int], typing.Awaitable[typing.List[str]]]]
INLINE_KEYBOARD_BUTTON_GENERATOR = typing.AsyncGenerator[InlineKeyboardButton, None]
def __init__(self, bot: Bot, dispatcher: Dispatcher, chat_id: int,
text: typing.Union[str, typing.Callable[[int], str]],
buttons: typing.Callable[[int], typing.AsyncGenerator[InlineKeyboardButton, None]],
first_page_button_text: str = '', last_page_button_text: str = '',
previous_page_button_text: str = '◀️', next_page_button_text: str = '▶️',
page_selection_location: PageSelectionLocation = PageSelectionLocation.BOTTOM,
send_message_args: typing.Tuple = (),
send_message_kwargs: typing.Optional[typing.Mapping[str, typing.Any]] = None,
edit_message_args: typing.Tuple = (),
edit_message_kwargs: typing.Optional[typing.Mapping[str, typing.Any]] = None):
"""
Construct InlineButtonsPaginator
:param bot: Bot bot object
:param dispatcher: Dispatcher dispatcher object
:param chat_id: int chat id
:param text: Union[str, Callable[[int], str]] text of messages or function, that accepts page index and \
returns text of messages
:param buttons: Callable[[int], AsyncGenerator[InlineKeyboardButton, None]] coroutine, that accepts index \
of page and asynchronously yields inline buttons
:param first_page_button_text: str first page button text (default: '')
:param last_page_button_text: str last page button text (default: '')
:param previous_page_button_text: str previous page button text (default: '')
:param next_page_button_text: str next page button text (default: '▶️')
:param page_selection_location: PageSelectionLocation (default: PageSelectionLocation.BOTTOM)
:param send_message_args: Tuple positional arguments to send_message (default: ())
:param send_message_kwargs: Mapping keyword arguments to send_message (default: None)
:param edit_message_args: Tuple positional arguments to edit_message (default: ())
:param edit_message_kwargs: Mapping keyword arguments to edit_message (default: None)
"""
self.bot = bot
self.dispatcher = dispatcher
self.chat_id = chat_id
self.text = text
self.buttons = buttons
self.first_page_button_text = first_page_button_text
self.last_page_button_text = last_page_button_text
self.previous_page_button_text = previous_page_button_text
self.next_page_button_text = next_page_button_text
self.page_selection_location = page_selection_location
self.send_message_args = send_message_args
self.send_message_kwargs = send_message_kwargs or {}
self.edit_message_args = edit_message_args
self.edit_message_kwargs = edit_message_kwargs or {}
self._page_count_uuid = str(uuid.uuid4())
self._page_selection_callback_data_uuid = str(uuid.uuid4())
self.dispatcher.register_callback_query_handler(callback=self._page_selection_button_callback,
func=self._callback_query_func,
state='*')
def _callback_query_func(self, callback_query: CallbackQuery) -> bool:
return callback_query.data.startswith(self._page_selection_callback_data_uuid)
async def _page_selection_button_callback(self, callback_query: CallbackQuery) -> None:
page = int(callback_query.data.replace(self._page_selection_callback_data_uuid, ''))
await self.send_buttons_message(page=page)
@staticmethod
async def buttons_helper(page: int, limit: int,
callback_data_func: typing.Callable[[typing.Optional[int], typing.Optional[str]], str],
callback_data_prefix: str, buttons_texts: typing.Optional[typing.Collection[str]] = None,
buttons_texts_coro: BUTTONS_TEXT_CORO = None) -> INLINE_KEYBOARD_BUTTON_GENERATOR:
"""
Buttons generator helper
:param page: int index of page
:param limit: int button count on page
:param callback_data_func: Callable[[typing.Optional[int], typing.Optional[str]], str] function, that accepts \
button index or button text and returns button callback data
:param callback_data_prefix: str prefix of callback data
:param buttons_texts: Optional[Collection[str]] buttons texts (default: None)
:param buttons_texts_coro: Optional[Callable[[int, int], Awaitable[List[str]]]] buttons texts coroutine, that \
accepts page and limit and returns list of buttons texts (default: None)
"""
if buttons_texts is None and buttons_texts_coro is None:
raise ValueError("You should specify either buttons_texts or buttons_texts_coro")
elif buttons_texts is not None and buttons_texts_coro is not None:
raise ValueError("You should specify only buttons_texts or buttons_texts_coro")
if buttons_texts is not None:
buttons_texts = paginate(data=buttons_texts, page=page, limit=limit)
else:
buttons_texts = await buttons_texts_coro(page, limit)
if not buttons_texts:
return
for i, button_text in zip(range(index_of_first_element_on_page(page=page, limit=limit),
index_of_last_element_on_page(page=page, limit=limit)),
buttons_texts):
callback_data = f'{callback_data_prefix}{callback_data_func(i, button_text)}'
yield InlineKeyboardButton(text=button_text, callback_data=callback_data)
async def send_buttons_message(self, page: int = 0,
page_count: typing.Optional[int] = None) -> typing.Optional[Message]:
"""
:param page: int page index (default: 0)
:param page_count: Optional[int] page count (default: None)
"""
data = await self.dispatcher.storage.get_data(chat=self.chat_id)
if page_count is None:
page_count = data.get(self._page_count_uuid)
if page_count is None:
raise ValueError("page_count was never passed as argument")
else:
data[self._page_count_uuid] = page_count
text = self.text if isinstance(self.text, str) else self.text(page)
page_selection_buttons = []
if page > 0:
first_page_previous_page_buttons = [
InlineKeyboardButton(text=self.first_page_button_text,
callback_data=f'{self._page_selection_callback_data_uuid}0'),
InlineKeyboardButton(text=self.previous_page_button_text,
callback_data=f'{self._page_selection_callback_data_uuid}{page - 1}')
]
page_selection_buttons += first_page_previous_page_buttons
if page < page_count - 1:
first_page_previous_page_buttons = [
InlineKeyboardButton(text=self.next_page_button_text,
callback_data=f'{self._page_selection_callback_data_uuid}{page + 1}'),
InlineKeyboardButton(text=self.last_page_button_text,
callback_data=f'{self._page_selection_callback_data_uuid}{page_count - 1}'),
]
page_selection_buttons += first_page_previous_page_buttons
keyboard = InlineKeyboardMarkup()
if self.page_selection_location in [PageSelectionLocation.TOP, PageSelectionLocation.TOP_AND_BOTTOM]:
keyboard.row(*page_selection_buttons)
async for button in self.buttons(page):
keyboard.add(button)
if self.page_selection_location in [PageSelectionLocation.BOTTOM, PageSelectionLocation.TOP_AND_BOTTOM]:
keyboard.row(*page_selection_buttons)
message_id = data.get(self._page_selection_callback_data_uuid)
if message_id is not None:
self.edit_message_kwargs.update(chat_id=self.chat_id, text=text, reply_markup=keyboard)
message = await self.bot.edit_message_text(message_id=message_id,
*self.edit_message_args,
**self.edit_message_kwargs)
else:
self.send_message_kwargs.update(chat_id=self.chat_id, text=text, reply_markup=keyboard)
message = await self.bot.send_message(*self.send_message_args, **self.send_message_kwargs)
data[self._page_selection_callback_data_uuid] = message.message_id
await self.dispatcher.storage.set_data(chat=self.chat_id, data=data)
return message

View file

@ -0,0 +1,135 @@
#!/usr/bin/env python
"""Bot to demonstrate inline buttons pagination functionality
Usage:
./inline_buttons_pagination.py [<token>]
Available commands are:
/google QUERY
/numbers START FINISH
"""
from math import ceil
from typing import Optional, Collection, AsyncGenerator
import asyncio
import logging
import sys
import uuid
from aiogram import Bot, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.dispatcher import Dispatcher
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.executor import start_polling
from aiogram.utils.parts import InlineButtonsPaginator
logging.basicConfig(level=logging.INFO)
failed_to_import_google_search_api_library_message = (
"Failed to import Google-Search-API library, /google command will not work.\n"
"You can install needed libraries using following commands:\n"
"pip install fake_useragent future selenium unidecode\n"
"pip install git+https://github.com/rominf/Google-Search-API")
try:
from google.google import search as google_search
except ImportError:
google = None
logging.warning(failed_to_import_google_search_api_library_message)
API_TOKEN = sys.argv[1] if len(sys.argv) > 1 else 'BOT TOKEN HERE'
loop = asyncio.get_event_loop()
bot = Bot(token=API_TOKEN, loop=loop)
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
async def google(chat_id: int, query: str, reply_to_message_id: int, page_count: int = 100) -> Message:
def text(page: int) -> str:
return f"Search results (page {page + 1}/{page_count}):"
async def buttons(page: int) -> AsyncGenerator[InlineKeyboardButton, None]:
search_results = google_search(query=query, first_page=page)
for search_result in search_results:
url = search_result.link
if url is not None:
# A workaround for https://github.com/abenassi/Google-Search-API/issues/71
name = search_result.name.replace(url, '')
yield InlineKeyboardButton(text=name, url=url)
send_message_kwargs = dict(reply_to_message_id=reply_to_message_id)
buttons_paginator = InlineButtonsPaginator(bot=bot,
dispatcher=dp,
chat_id=chat_id,
text=text,
buttons=buttons,
send_message_kwargs=send_message_kwargs)
return await buttons_paginator.send_buttons_message(page_count=page_count)
@dp.message_handler(commands=['google'])
async def process_google_command(message: types.Message):
if google is None:
await message.reply(text=failed_to_import_google_search_api_library_message)
else:
query = message.get_args()
await google(chat_id=message.chat.id, query=query, reply_to_message_id=message.message_id)
async def numbers(chat_id: int, buttons_texts: Collection[str], reply_to_message_id: int, limit: int = 10) -> Message:
def callback_data_func(button_index: Optional[int], button_text: Optional[str]) -> str:
_ = button_index
return button_text
def text(page: int) -> str:
return f"Buttons (page {page + 1}/{page_count}):"
async def button_callback_handler(callback_query: CallbackQuery) -> None:
await callback_query.answer()
button_text = callback_query.data.replace(button_callback_data_prefix, '')
await callback_query.message.reply(text=f"You have pressed {button_text}")
async def buttons(page: int) -> AsyncGenerator[InlineKeyboardButton, None]:
async for button in buttons_paginator.buttons_helper(page=page,
limit=limit,
callback_data_func=callback_data_func,
callback_data_prefix=button_callback_data_prefix,
buttons_texts=buttons_texts):
yield button
page_count = ceil(len(buttons_texts) / limit)
button_callback_data_prefix = str(uuid.uuid4())
send_message_kwargs = dict(reply_to_message_id=reply_to_message_id)
dp.register_callback_query_handler(
callback=button_callback_handler,
func=lambda callback_query: callback_query.data.startswith(button_callback_data_prefix),
state='*')
buttons_paginator = InlineButtonsPaginator(bot=bot,
dispatcher=dp,
chat_id=chat_id,
text=text,
buttons=buttons,
send_message_kwargs=send_message_kwargs)
return await buttons_paginator.send_buttons_message(page_count=page_count)
@dp.message_handler(commands=['numbers'])
async def process_numbers_command(message: types.Message):
start, finish = (int(x) for x in message.get_args().split())
await numbers(chat_id=message.chat.id,
buttons_texts=[str(i) for i in range(start, finish)],
reply_to_message_id=message.message_id)
@dp.message_handler(commands=['start'])
async def process_start_command(message: types.Message):
await message.reply(text="Hi.\nThis bot demonstrates inline buttons pagination functionality.\n"
"Available commands are:\n"
"/google QUERY\n"
"/numbers START FINISH")
if __name__ == '__main__':
start_polling(dp, loop=loop, skip_updates=True)