diff --git a/examples/custom_filter_example.py b/examples/custom_filter_example.py index 4099ac27..855af0f5 100644 --- a/examples/custom_filter_example.py +++ b/examples/custom_filter_example.py @@ -1,55 +1,41 @@ from typing import Any, Union from aiogram import Bot, Dispatcher, executor, types from aiogram.dispatcher.filters import BoundFilter -from aiogram.dispatcher.handler import ctx_data -from aiogram.dispatcher.middlewares import LifetimeControllerMiddleware API_TOKEN = "BOT_TOKEN_HERE" -fake_db = { - 0000000000: {"ban": True}, - 1111111111: {"ban": False}, - 2222222222: {"ban": True}, - 3333333333: {"ban": False}, -} +LIMIT_DATABASE_RECORDS = 6 + + +ADMIN_IDS = [ + 000000000, + 111111111, + 222222222, + 333333333, + 444444444, +] bot = Bot(token=API_TOKEN) dp = Dispatcher(bot) -class UserMiddleware(LifetimeControllerMiddleware): - skip_patterns = ["error", "update"] +class GlobalAdminFilter(BoundFilter): + """ + Check if user bot admin + """ - async def pre_process( - self, - obj: Union[types.Message, types.CallbackQuery], - data: dict, - ): + key = "global_admin" + + def __init__(self, global_admin: bool): + self.global_admin = global_admin + + async def check(self, obj: Union[types.Message, types.CallbackQuery]): user = obj.from_user - if user.id not in fake_db: - fake_db[user.id] = {"ban": False} - - data["user_in_db"] = fake_db[user.id] - - -class BanUserFilter(BoundFilter): - """ - Check if user banned in bot db - """ - - key = "banned" - - def __init__(self, banned: bool): - self.banned = banned - - async def check(self, obj: Any): - data: dict = ctx_data.get() - user_in_db = data.get("user_in_db", None) - if user_in_db: - return self.banned == user_in_db["ban"] - return False # If the user is not in the database, then he cannot be banned + if user.id in ADMIN_IDS: + return self.global_admin is True + return self.global_admin is False class IsEvenIdFilter(BoundFilter): @@ -59,18 +45,22 @@ class IsEvenIdFilter(BoundFilter): key = "is_even" - def __init__(self, is_even: bool): - if is_even is False: - raise ValueError("filter is_even cannot be False") + def __init__(self, is_even: int): + if isinstance(is_even, int): + self.is_even = is_even + else: + raise ValueError( + f"filter is_even must be a int, not {type(is_even).__name__}" + ) async def check(self, obj: Union[types.Message, types.CallbackQuery]): user_id = obj.from_user.id - return {"is_even": bool(user_id % 2 == 0)} + return bool(user_id % self.is_even == 0) class DatabaseIsNotEmptyFilter(BoundFilter): """ - Check if the database contains the required number of records + Check if the bot admins users contains the required number of records """ key = "data_in_db" @@ -84,15 +74,12 @@ class DatabaseIsNotEmptyFilter(BoundFilter): ) async def check(self, obj: Any): - return len(fake_db) >= self.data_in_db + return {"data_in_db": self.data_in_db - len(ADMIN_IDS)} -# Setup middleware -dp.middleware.setup(UserMiddleware()) - # Binding filters dp.filters_factory.bind( - BanUserFilter, + GlobalAdminFilter, exclude_event_handlers=[dp.channel_post_handlers, dp.edited_channel_post_handlers], ) dp.filters_factory.bind( @@ -102,24 +89,26 @@ dp.filters_factory.bind( dp.filters_factory.bind(DatabaseIsNotEmptyFilter) -@dp.message_handler(banned=True) -async def handler_start1(message: types.Message): - await message.answer("Congratulations, you are banned!") +@dp.message_handler(commands="is_even", is_even=2) +async def handle_even_id(message: types.Message): + await message.answer("Congratulations, your id is even!") -@dp.message_handler(data_in_db=6) -async def handler_start2(message: types.Message): - await message.answer("There are too many records in the database!") +@dp.message_handler(commands="is_even", is_even=1) +async def handle_not_even_id(message: types.Message): + await message.answer("Sorry, but your id... is not even....") -@dp.message_handler(is_even=True) -async def handler_start3(message: types.Message, is_even: bool): - if is_even: - await message.answer("Congratulations, your id is even!") - else: - await message.answer("Congratulations, your id is not even :(") +@dp.message_handler(data_in_db=LIMIT_DATABASE_RECORDS) +async def handle_full_database(message: types.Message, data_in_db: int): + await message.answer(f"Too many records in the database.\nLimit {LIMIT_DATABASE_RECORDS}, current {data_in_db}") + + +@dp.message_handler(global_admin=True) +async def handle_admins(message: types.Message): + await message.answer("Congratulations, you are global admin!") if __name__ == "__main__": allowed_updates = types.AllowedUpdates.MESSAGE | types.AllowedUpdates.CALLBACK_QUERY - executor.start_polling(dp, allowed_updates=allowed_updates) + executor.start_polling(dp, allowed_updates=allowed_updates, skip_updates=True)