added limits on all types

This commit is contained in:
uran-content 2025-07-28 16:19:46 +03:00
parent 132fc5fd22
commit 66ef51d239
2 changed files with 244 additions and 90 deletions

View file

@ -244,7 +244,7 @@ from ..types import (
from .default import Default, DefaultBotProperties
from .session.aiohttp import AiohttpSession
from .session.base import BaseSession
from .limiter import PrioritySlidingWindowLimiter
from .limiter import TelegramRateLimiter, ChatType, DefaultLimiter
T = TypeVar("T")
MESSAGE_MODIFYING_CLASSES = {
@ -295,7 +295,7 @@ class Bot:
default = DefaultBotProperties(max_sends_per_secods=29)
self.session = session
self.limiter = PrioritySlidingWindowLimiter(max_calls=default.max_sends_per_secods, period=1.0)
self.limiter = TelegramRateLimiter()
# Few arguments are completely removed in 3.7.0 version
# Temporary solution to raise an error if user passed these arguments
@ -493,16 +493,46 @@ class Bot:
async def __call__(
self, method: TelegramMethod[T], request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False, chat_id: ChatIdUnion = None
) -> T:
"""
Call API method
:param method:
:param request_timeout:
:param is_broadcast:
:param chat_id:
:return:
"""
if method.__class__.__name__ in MESSAGE_MODIFYING_CLASSES:
await self.limiter.wait(low_priority)
# Определяем chat_id из метода, если не передан
if chat_id is None:
# Пытаемся получить chat_id из метода
if hasattr(method, 'chat_id'):
chat_id = method.chat_id
elif hasattr(method, 'chat') and hasattr(method.chat, 'id'):
chat_id = method.chat.id
else:
# Если не можем определить chat_id, используем заглушку
chat_id = "unknown"
# Определяем тип чата
if isinstance(chat_id, int) or (isinstance(chat_id, str) and chat_id.lstrip('-').isdigit()):
if str(chat_id).startswith("-"):
# Проверяем, является ли это супергруппой или каналом
chat_id_str = str(chat_id)
if chat_id_str.startswith("-100"):
chat_type = ChatType.CHANNEL # Каналы обычно начинаются с -100
else:
chat_type = ChatType.GROUP
else:
chat_type = ChatType.PRIVATE
else:
# Для строковых ID (например, @username) считаем каналами
chat_type = ChatType.CHANNEL
await self.limiter.wait(chat_id=str(chat_id), chat_type=chat_type, is_broadcast=is_broadcast)
return await self.session(self, method, timeout=request_timeout)
def __hash__(self) -> int:
@ -2180,7 +2210,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send animation files (GIF or H.264/MPEG-4 AVC video without sound). On success, the sent :class:`aiogram.types.message.Message` is returned. Bots can currently send animation files of up to 50 MB in size, this limit may be changed in the future.
@ -2235,7 +2265,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_audio(
self,
@ -2259,7 +2289,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send audio files, if you want Telegram clients to display them in the music player. Your audio must be in the .MP3 or .M4A format. On success, the sent :class:`aiogram.types.message.Message` is returned. Bots can currently send audio files of up to 50 MB in size, this limit may be changed in the future.
@ -2311,7 +2341,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_chat_action(
self,
@ -2364,7 +2394,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send phone contacts. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -2407,7 +2437,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_dice(
self,
@ -2424,7 +2454,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send an animated emoji that will display a random value. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -2461,7 +2491,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_document(
self,
@ -2483,7 +2513,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send general files. On success, the sent :class:`aiogram.types.message.Message` is returned. Bots can currently send files of any type of up to 50 MB in size, this limit may be changed in the future.
@ -2530,7 +2560,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_game(
self,
@ -2547,7 +2577,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send a game. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -2584,7 +2614,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_invoice(
self,
@ -2620,7 +2650,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send invoices. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -2695,7 +2725,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_location(
self,
@ -2717,7 +2747,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send point on the map. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -2763,9 +2793,8 @@ class Bot:
reply_markup=reply_markup,
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
low_priority=low_priority
)
return await self(call, request_timeout=request_timeout)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_media_group(
self,
@ -2781,7 +2810,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> list[Message]:
"""
Use this method to send a group of photos, videos, documents or audios as an album. Documents and audio files can be only grouped in an album with messages of the same type. On success, an array of `Messages <https://core.telegram.org/bots/api#message>`_ that were sent is returned.
@ -2816,7 +2845,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_message(
self,
@ -2841,7 +2870,7 @@ class Bot:
),
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send text messages. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -2886,7 +2915,7 @@ class Bot:
disable_web_page_preview=disable_web_page_preview,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_photo(
self,
@ -2910,7 +2939,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send photos. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -2957,7 +2986,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_poll(
self,
@ -2987,7 +3016,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send a native poll. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -3050,7 +3079,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_sticker(
self,
@ -3068,7 +3097,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send static .WEBP, `animated <https://telegram.org/blog/animated-stickers>`_ .TGS, or `video <https://telegram.org/blog/video-stickers-better-reactions>`_ .WEBM stickers. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -3107,7 +3136,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_venue(
self,
@ -3131,7 +3160,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send information about a venue. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -3182,7 +3211,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_video(
self,
@ -3213,7 +3242,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send video files, Telegram clients support MPEG4 videos (other formats may be sent as :class:`aiogram.types.document.Document`). On success, the sent :class:`aiogram.types.message.Message` is returned. Bots can currently send video files of up to 50 MB in size, this limit may be changed in the future.
@ -3274,7 +3303,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_video_note(
self,
@ -3294,7 +3323,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
As of `v.4.0 <https://telegram.org/blog/video-messages-and-telescope>`_, Telegram clients support rounded square MPEG4 videos of up to 1 minute long. Use this method to send video messages. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -3337,7 +3366,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def send_voice(
self,
@ -3358,7 +3387,7 @@ class Bot:
allow_sending_without_reply: Optional[bool] = None,
reply_to_message_id: Optional[int] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send audio files, if you want Telegram clients to display the file as a playable voice message. For this to work, your audio must be in an .OGG file encoded with OPUS, or in .MP3 format, or in .M4A format (other formats may be sent as :class:`aiogram.types.audio.Audio` or :class:`aiogram.types.document.Document`). On success, the sent :class:`aiogram.types.message.Message` is returned. Bots can currently send voice messages of up to 50 MB in size, this limit may be changed in the future.
@ -3403,7 +3432,7 @@ class Bot:
allow_sending_without_reply=allow_sending_without_reply,
reply_to_message_id=reply_to_message_id,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def set_chat_administrator_custom_title(
self,
@ -4635,7 +4664,7 @@ class Bot:
reply_parameters: Optional[ReplyParameters] = None,
reply_markup: Optional[ReplyMarkupUnion] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send paid media. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -4676,7 +4705,7 @@ class Bot:
reply_parameters=reply_parameters,
reply_markup=reply_markup,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def create_chat_subscription_invite_link(
self,
@ -4820,7 +4849,7 @@ class Bot:
text_parse_mode: Optional[str] = None,
text_entities: Optional[list[MessageEntity]] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> bool:
"""
Sends a gift to the given user or channel chat. The gift can't be converted to Telegram Stars by the receiver. Returns :code:`True` on success.
@ -4847,7 +4876,7 @@ class Bot:
text_parse_mode=text_parse_mode,
text_entities=text_entities,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)
async def set_user_emoji_status(
self,
@ -5524,7 +5553,7 @@ class Bot:
reply_parameters: Optional[ReplyParameters] = None,
reply_markup: Optional[InlineKeyboardMarkup] = None,
request_timeout: Optional[int] = None,
low_priority: bool = False
is_broadcast: bool = False
) -> Message:
"""
Use this method to send a checklist on behalf of a connected business account. On success, the sent :class:`aiogram.types.message.Message` is returned.
@ -5553,4 +5582,4 @@ class Bot:
reply_parameters=reply_parameters,
reply_markup=reply_markup,
)
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
return await self(call, request_timeout=request_timeout, is_broadcast=is_broadcast, chat_id=chat_id)

View file

@ -1,25 +1,57 @@
from typing import Optional
from typing import Optional, Dict
from pydantic import BaseModel
from enum import Enum
import asyncio
import time
import random
from collections import deque
class PrioritySlidingWindowLimiter:
class ChatType(Enum):
PRIVATE = "private"
GROUP = "group"
CHANNEL = "channel"
class DefaultLimiter(BaseModel):
PER_CHAT_LIMIT: int = 1
PER_CHAT_PERIOD: float = 1.0
GROUP_LIMIT: int = 20
GROUP_PERIOD: float = 60.0
BROADCAST_LIMIT: int = 30
BROADCAST_PERIOD: float = 1.0
class TelegramRateLimiter:
"""
Ограничитель частоты запросов с поддержкой приоритетов.
Активируется только при наличии задач в очереди.
Ограничитель частоты запросов для Telegram ботов с поддержкой разных типов чатов.
Поддерживает три типа лимитов:
1. В одном чате - не более 1 сообщения в секунду
2. В группе - не более 20 сообщений в минуту
3. Для массовых уведомлений - не более 30 сообщений в секунду
"""
def __init__(self, max_calls: int, period: float):
self.max_calls = max_calls
self.period = period
def __init__(self, settings: DefaultLimiter = None):
if settings is None:
settings = DefaultLimiter()
# Лимиты для Telegram
self.PER_CHAT_LIMIT = settings.PER_CHAT_LIMIT
self.PER_CHAT_PERIOD = settings.PER_CHAT_PERIOD
# Очереди для запросов
self.GROUP_LIMIT = settings.GROUP_LIMIT
self.GROUP_PERIOD = settings.GROUP_PERIOD
self.BROADCAST_LIMIT = settings.BROADCAST_LIMIT
self.BROADCAST_PERIOD = settings.BROADCAST_PERIOD
# Очереди для запросов по приоритетам
self.high_priority_queue = asyncio.Queue()
self.low_priority_queue = asyncio.Queue()
# История вызовов для скользящего окна
self.calls_history: deque[float] = deque()
# История вызовов для разных лимитов
self.per_chat_history: Dict[str, deque] = {} # chat_id -> deque[timestamp]
self.group_history: deque = deque() # для групповых чатов
self.broadcast_history: deque = deque() # для всех сообщений
# Блокировка для потокобезопасной работы
self.lock = asyncio.Lock()
@ -34,67 +66,143 @@ class PrioritySlidingWindowLimiter:
while not self.high_priority_queue.empty() or not self.low_priority_queue.empty():
# Сначала обрабатываем высокоприоритетные запросы
if not self.high_priority_queue.empty():
future = await self.high_priority_queue.get()
await self._wait_and_release(future)
item = await self.high_priority_queue.get()
await self._wait_and_release(item)
self.high_priority_queue.task_done()
# Если высокоприоритетных нет - обрабатываем низкоприоритетные
elif not self.low_priority_queue.empty():
future = await self.low_priority_queue.get()
await self._wait_and_release(future)
item = await self.low_priority_queue.get()
await self._wait_and_release(item)
self.low_priority_queue.task_done()
finally:
self._processing = False
self._processing_task = None
async def _wait_and_release(self, future: asyncio.Future):
async def _wait_and_release(self, item: dict):
"""Ждем возможности выполнить запрос и разрешаем его"""
future = item['future']
chat_id = item['chat_id']
chat_type = item['chat_type']
is_broadcast = item['is_broadcast']
try:
await self._wait_sliding_window()
await self._wait_telegram_limits(chat_id, chat_type, is_broadcast)
if not future.done():
future.set_result(None)
except Exception as e:
if not future.done():
future.set_exception(e)
async def _wait_sliding_window(self):
"""Ожидание в соответствии со скользящим окном"""
async def _wait_telegram_limits(self, chat_id: str, chat_type: ChatType, is_broadcast: bool):
"""Ожидание в соответствии с лимитами Telegram"""
async with self.lock:
now = time.monotonic()
# Очищаем устаревшие записи
while self.calls_history and now - self.calls_history[0] > self.period:
self.calls_history.popleft()
# Если достигнут лимит - ждем
if len(self.calls_history) >= self.max_calls:
# Добавляем jitter для избежания коллизий
jitter_sleep = random.uniform(0.1, 0.5)
await asyncio.sleep(jitter_sleep)
# Ждем освобождения слота
if self.calls_history: # Проверяем, что очередь не пуста
sleep_for = self.period - (now - self.calls_history[0]) + 0.01
await asyncio.sleep(max(0, sleep_for))
# Очищаем устаревшие записи для всех лимитов
self._cleanup_history(now)
# Записываем время вызова
self.calls_history.append(time.monotonic())
# Проверяем все лимиты и ждем, если нужно
while True:
can_proceed = True
sleep_time = 0
# 1. Лимит для конкретного чата (1 сообщение/сек)
if len(self.per_chat_history.get(chat_id, deque())) >= self.PER_CHAT_LIMIT:
oldest = self.per_chat_history[chat_id][0]
sleep_time = max(sleep_time, self.PER_CHAT_PERIOD - (now - oldest) + 0.01)
can_proceed = False
# 2. Лимит для групп (20 сообщений/минуту)
if chat_type in [ChatType.GROUP, ChatType.CHANNEL]:
if len(self.group_history) >= self.GROUP_LIMIT:
oldest = self.group_history[0]
sleep_time = max(sleep_time, self.GROUP_PERIOD - (now - oldest) + 0.01)
can_proceed = False
# 3. Лимит для массовых рассылок (30 сообщений/сек)
if is_broadcast:
if len(self.broadcast_history) >= self.BROADCAST_LIMIT:
oldest = self.broadcast_history[0]
sleep_time = max(sleep_time, self.BROADCAST_PERIOD - (now - oldest) + 0.01)
can_proceed = False
if can_proceed:
break
# Добавляем небольшой jitter для избежания коллизий
jitter = random.uniform(0.05, 0.2)
await asyncio.sleep(max(0, sleep_time + jitter))
# Обновляем время после ожидания
now = time.monotonic()
self._cleanup_history(now)
# Записываем время вызова во все соответствующие истории
self._record_call(now, chat_id, chat_type, is_broadcast)
async def wait(self, low_priority: bool = False) -> None:
def _cleanup_history(self, now: float):
"""Очищает устаревшие записи из всех очередей"""
# Очищаем историю для конкретных чатов
chats_to_remove = []
for chat_id, history in self.per_chat_history.items():
while history and now - history[0] > self.PER_CHAT_PERIOD:
history.popleft()
if len(history) == 0:
chats_to_remove.append(chat_id)
# Удаляем пустые очереди чатов
for chat_id in chats_to_remove:
del self.per_chat_history[chat_id]
# Очищаем историю групп
while self.group_history and now - self.group_history[0] > self.GROUP_PERIOD:
self.group_history.popleft()
# Очищаем историю массовых рассылок
while self.broadcast_history and now - self.broadcast_history[0] > self.BROADCAST_PERIOD:
self.broadcast_history.popleft()
def _record_call(self, timestamp: float, chat_id: str, chat_type: ChatType, is_broadcast: bool):
"""Записывает вызов во все соответствующие очереди"""
# Запись для лимита по чату
if chat_id not in self.per_chat_history:
self.per_chat_history[chat_id] = deque()
self.per_chat_history[chat_id].append(timestamp)
# Запись для лимита групп, если это группа или канал
if chat_type in [ChatType.GROUP, ChatType.CHANNEL]:
self.group_history.append(timestamp)
# Запись для лимита массовых рассылок
if is_broadcast:
self.broadcast_history.append(timestamp)
async def wait(self, chat_id: str, chat_type: ChatType = ChatType.PRIVATE,
is_broadcast: bool = False) -> None:
"""
Ожидание разрешения на выполнение запроса.
Ожидание разрешения на выполнение запроса в соответствии с лимитами Telegram.
Рассылки автоматически считаются низкоприоритетными.
Args:
priority: Приоритет запроса (HIGH или LOW)
chat_id: ID чата (строка, чтобы поддерживать и числовые и строковые ID)
chat_type: Тип чата (PRIVATE, GROUP, CHANNEL)
is_broadcast: Является ли сообщение частью массовой рассылки
"""
future = asyncio.Future()
item = {
'future': future,
'chat_id': str(chat_id),
'chat_type': chat_type,
'is_broadcast': is_broadcast
}
# Добавляем в соответствующую очередь
if not low_priority:
await self.high_priority_queue.put(future)
# Рассылки идут в низкоприоритетную очередь, остальное - в высокоприоритетную
if is_broadcast:
await self.low_priority_queue.put(item)
else:
await self.low_priority_queue.put(future)
await self.high_priority_queue.put(item)
# Запускаем обработчик, если он не активен
if not self._processing:
@ -104,15 +212,32 @@ class PrioritySlidingWindowLimiter:
# Ждем разрешения
await future
def can_execute_immediately(self) -> bool:
def can_execute_immediately(self, chat_id: str, chat_type: ChatType = ChatType.PRIVATE,
is_broadcast: bool = False) -> bool:
"""Проверяет, можно ли выполнить запрос немедленно"""
now = time.monotonic()
chat_id = str(chat_id)
with self.lock:
# Очищаем устаревшие записи
while self.calls_history and now - self.calls_history[0] > self.period:
self.calls_history.popleft()
self._cleanup_history(now)
return len(self.calls_history) < self.max_calls
# Проверяем все лимиты
# 1. Лимит для конкретного чата
if len(self.per_chat_history.get(chat_id, deque())) >= self.PER_CHAT_LIMIT:
return False
# 2. Лимит для групп
if chat_type in [ChatType.GROUP, ChatType.CHANNEL]:
if len(self.group_history) >= self.GROUP_LIMIT:
return False
# 3. Лимит для массовых рассылок
if is_broadcast:
if len(self.broadcast_history) >= self.BROADCAST_LIMIT:
return False
return True
async def wait_until_idle(self):
"""Ожидание завершения всех задач в очереди"""