simplifying Filters and more handlers

This commit is contained in:
Fenicu 2021-09-13 01:17:06 +03:00
parent 5073442337
commit cc8669afe9
No known key found for this signature in database
GPG key ID: 078DB80AFDF256CA

View file

@ -1,55 +1,41 @@
from typing import Any, Union
from aiogram import Bot, Dispatcher, executor, types
from aiogram.dispatcher.filters import BoundFilter
from aiogram.dispatcher.handler import ctx_data
from aiogram.dispatcher.middlewares import LifetimeControllerMiddleware
API_TOKEN = "BOT_TOKEN_HERE"
fake_db = {
0000000000: {"ban": True},
1111111111: {"ban": False},
2222222222: {"ban": True},
3333333333: {"ban": False},
}
LIMIT_DATABASE_RECORDS = 6
ADMIN_IDS = [
000000000,
111111111,
222222222,
333333333,
444444444,
]
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
class UserMiddleware(LifetimeControllerMiddleware):
skip_patterns = ["error", "update"]
class GlobalAdminFilter(BoundFilter):
"""
Check if user bot admin
"""
async def pre_process(
self,
obj: Union[types.Message, types.CallbackQuery],
data: dict,
):
key = "global_admin"
def __init__(self, global_admin: bool):
self.global_admin = global_admin
async def check(self, obj: Union[types.Message, types.CallbackQuery]):
user = obj.from_user
if user.id not in fake_db:
fake_db[user.id] = {"ban": False}
data["user_in_db"] = fake_db[user.id]
class BanUserFilter(BoundFilter):
"""
Check if user banned in bot db
"""
key = "banned"
def __init__(self, banned: bool):
self.banned = banned
async def check(self, obj: Any):
data: dict = ctx_data.get()
user_in_db = data.get("user_in_db", None)
if user_in_db:
return self.banned == user_in_db["ban"]
return False # If the user is not in the database, then he cannot be banned
if user.id in ADMIN_IDS:
return self.global_admin is True
return self.global_admin is False
class IsEvenIdFilter(BoundFilter):
@ -59,18 +45,22 @@ class IsEvenIdFilter(BoundFilter):
key = "is_even"
def __init__(self, is_even: bool):
if is_even is False:
raise ValueError("filter is_even cannot be False")
def __init__(self, is_even: int):
if isinstance(is_even, int):
self.is_even = is_even
else:
raise ValueError(
f"filter is_even must be a int, not {type(is_even).__name__}"
)
async def check(self, obj: Union[types.Message, types.CallbackQuery]):
user_id = obj.from_user.id
return {"is_even": bool(user_id % 2 == 0)}
return bool(user_id % self.is_even == 0)
class DatabaseIsNotEmptyFilter(BoundFilter):
"""
Check if the database contains the required number of records
Check if the bot admins users contains the required number of records
"""
key = "data_in_db"
@ -84,15 +74,12 @@ class DatabaseIsNotEmptyFilter(BoundFilter):
)
async def check(self, obj: Any):
return len(fake_db) >= self.data_in_db
return {"data_in_db": self.data_in_db - len(ADMIN_IDS)}
# Setup middleware
dp.middleware.setup(UserMiddleware())
# Binding filters
dp.filters_factory.bind(
BanUserFilter,
GlobalAdminFilter,
exclude_event_handlers=[dp.channel_post_handlers, dp.edited_channel_post_handlers],
)
dp.filters_factory.bind(
@ -102,24 +89,26 @@ dp.filters_factory.bind(
dp.filters_factory.bind(DatabaseIsNotEmptyFilter)
@dp.message_handler(banned=True)
async def handler_start1(message: types.Message):
await message.answer("Congratulations, you are banned!")
@dp.message_handler(commands="is_even", is_even=2)
async def handle_even_id(message: types.Message):
await message.answer("Congratulations, your id is even!")
@dp.message_handler(data_in_db=6)
async def handler_start2(message: types.Message):
await message.answer("There are too many records in the database!")
@dp.message_handler(commands="is_even", is_even=1)
async def handle_not_even_id(message: types.Message):
await message.answer("Sorry, but your id... is not even....")
@dp.message_handler(is_even=True)
async def handler_start3(message: types.Message, is_even: bool):
if is_even:
await message.answer("Congratulations, your id is even!")
else:
await message.answer("Congratulations, your id is not even :(")
@dp.message_handler(data_in_db=LIMIT_DATABASE_RECORDS)
async def handle_full_database(message: types.Message, data_in_db: int):
await message.answer(f"Too many records in the database.\nLimit {LIMIT_DATABASE_RECORDS}, current {data_in_db}")
@dp.message_handler(global_admin=True)
async def handle_admins(message: types.Message):
await message.answer("Congratulations, you are global admin!")
if __name__ == "__main__":
allowed_updates = types.AllowedUpdates.MESSAGE | types.AllowedUpdates.CALLBACK_QUERY
executor.start_polling(dp, allowed_updates=allowed_updates)
executor.start_polling(dp, allowed_updates=allowed_updates, skip_updates=True)