mirror of
https://github.com/aiogram/aiogram.git
synced 2026-04-08 16:37:47 +00:00
added priority-awaire limiter
This commit is contained in:
parent
ddcedadfbd
commit
705917c853
2 changed files with 195 additions and 21 deletions
|
|
@ -244,9 +244,29 @@ from ..types import (
|
|||
from .default import Default, DefaultBotProperties
|
||||
from .session.aiohttp import AiohttpSession
|
||||
from .session.base import BaseSession
|
||||
from .limiter import PrioritySlidingWindowLimiter
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
MESSAGE_MODIFYING_CLASSES = {
|
||||
'SendAnimation',
|
||||
'SendAudio',
|
||||
'SendContact',
|
||||
'SendDice',
|
||||
'SendDocument',
|
||||
'SendGame',
|
||||
'SendInvoice',
|
||||
'SendLocation',
|
||||
'SendMediaGroup',
|
||||
'SendMessage',
|
||||
'SendPaidMedia',
|
||||
'SendPhoto',
|
||||
'SendPoll',
|
||||
'SendSticker',
|
||||
'SendVenue',
|
||||
'SendVideo',
|
||||
'SendVideoNote',
|
||||
'SendVoice'
|
||||
}
|
||||
|
||||
class Bot:
|
||||
def __init__(
|
||||
|
|
@ -275,6 +295,7 @@ class Bot:
|
|||
default = DefaultBotProperties()
|
||||
|
||||
self.session = session
|
||||
self.limiter = PrioritySlidingWindowLimiter(max_calls=29, period=1.0)
|
||||
|
||||
# Few arguments are completely removed in 3.7.0 version
|
||||
# Temporary solution to raise an error if user passed these arguments
|
||||
|
|
@ -471,7 +492,8 @@ class Bot:
|
|||
)
|
||||
|
||||
async def __call__(
|
||||
self, method: TelegramMethod[T], request_timeout: Optional[int] = None
|
||||
self, method: TelegramMethod[T], request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> T:
|
||||
"""
|
||||
Call API method
|
||||
|
|
@ -479,6 +501,8 @@ class Bot:
|
|||
:param method:
|
||||
:return:
|
||||
"""
|
||||
if method.__class__.__name__ in MESSAGE_MODIFYING_CLASSES:
|
||||
await self.limiter.wait(low_priority)
|
||||
return await self.session(self, method, timeout=request_timeout)
|
||||
|
||||
def __hash__(self) -> int:
|
||||
|
|
@ -2156,6 +2180,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send animation files (GIF or H.264/MPEG-4 AVC video without sound). On success, the sent :class:`aiogram.types.message.Message` is returned. Bots can currently send animation files of up to 50 MB in size, this limit may be changed in the future.
|
||||
|
|
@ -2210,7 +2235,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_audio(
|
||||
self,
|
||||
|
|
@ -2234,6 +2259,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send audio files, if you want Telegram clients to display them in the music player. Your audio must be in the .MP3 or .M4A format. On success, the sent :class:`aiogram.types.message.Message` is returned. Bots can currently send audio files of up to 50 MB in size, this limit may be changed in the future.
|
||||
|
|
@ -2285,7 +2311,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_chat_action(
|
||||
self,
|
||||
|
|
@ -2338,6 +2364,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send phone contacts. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -2380,7 +2407,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_dice(
|
||||
self,
|
||||
|
|
@ -2397,6 +2424,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send an animated emoji that will display a random value. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -2433,7 +2461,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_document(
|
||||
self,
|
||||
|
|
@ -2455,6 +2483,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send general files. On success, the sent :class:`aiogram.types.message.Message` is returned. Bots can currently send files of any type of up to 50 MB in size, this limit may be changed in the future.
|
||||
|
|
@ -2501,7 +2530,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_game(
|
||||
self,
|
||||
|
|
@ -2518,6 +2547,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send a game. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -2554,7 +2584,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_invoice(
|
||||
self,
|
||||
|
|
@ -2590,6 +2620,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send invoices. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -2664,7 +2695,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_location(
|
||||
self,
|
||||
|
|
@ -2686,6 +2717,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send point on the map. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -2731,6 +2763,7 @@ class Bot:
|
|||
reply_markup=reply_markup,
|
||||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
low_priority=low_priority
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
|
||||
|
|
@ -2748,6 +2781,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> list[Message]:
|
||||
"""
|
||||
Use this method to send a group of photos, videos, documents or audios as an album. Documents and audio files can be only grouped in an album with messages of the same type. On success, an array of `Messages <https://core.telegram.org/bots/api#message>`_ that were sent is returned.
|
||||
|
|
@ -2782,7 +2816,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_message(
|
||||
self,
|
||||
|
|
@ -2807,6 +2841,7 @@ class Bot:
|
|||
),
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send text messages. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -2851,7 +2886,7 @@ class Bot:
|
|||
disable_web_page_preview=disable_web_page_preview,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_photo(
|
||||
self,
|
||||
|
|
@ -2875,6 +2910,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send photos. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -2921,7 +2957,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_poll(
|
||||
self,
|
||||
|
|
@ -2951,6 +2987,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send a native poll. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -3013,7 +3050,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_sticker(
|
||||
self,
|
||||
|
|
@ -3031,6 +3068,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send static .WEBP, `animated <https://telegram.org/blog/animated-stickers>`_ .TGS, or `video <https://telegram.org/blog/video-stickers-better-reactions>`_ .WEBM stickers. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -3069,7 +3107,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_venue(
|
||||
self,
|
||||
|
|
@ -3093,6 +3131,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send information about a venue. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -3143,7 +3182,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_video(
|
||||
self,
|
||||
|
|
@ -3174,6 +3213,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send video files, Telegram clients support MPEG4 videos (other formats may be sent as :class:`aiogram.types.document.Document`). On success, the sent :class:`aiogram.types.message.Message` is returned. Bots can currently send video files of up to 50 MB in size, this limit may be changed in the future.
|
||||
|
|
@ -3234,7 +3274,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_video_note(
|
||||
self,
|
||||
|
|
@ -3254,6 +3294,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
As of `v.4.0 <https://telegram.org/blog/video-messages-and-telescope>`_, Telegram clients support rounded square MPEG4 videos of up to 1 minute long. Use this method to send video messages. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -3296,7 +3337,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def send_voice(
|
||||
self,
|
||||
|
|
@ -3317,6 +3358,7 @@ class Bot:
|
|||
allow_sending_without_reply: Optional[bool] = None,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send audio files, if you want Telegram clients to display the file as a playable voice message. For this to work, your audio must be in an .OGG file encoded with OPUS, or in .MP3 format, or in .M4A format (other formats may be sent as :class:`aiogram.types.audio.Audio` or :class:`aiogram.types.document.Document`). On success, the sent :class:`aiogram.types.message.Message` is returned. Bots can currently send voice messages of up to 50 MB in size, this limit may be changed in the future.
|
||||
|
|
@ -3361,7 +3403,7 @@ class Bot:
|
|||
allow_sending_without_reply=allow_sending_without_reply,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def set_chat_administrator_custom_title(
|
||||
self,
|
||||
|
|
@ -4593,6 +4635,7 @@ class Bot:
|
|||
reply_parameters: Optional[ReplyParameters] = None,
|
||||
reply_markup: Optional[ReplyMarkupUnion] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send paid media. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -4633,7 +4676,7 @@ class Bot:
|
|||
reply_parameters=reply_parameters,
|
||||
reply_markup=reply_markup,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def create_chat_subscription_invite_link(
|
||||
self,
|
||||
|
|
@ -4777,6 +4820,7 @@ class Bot:
|
|||
text_parse_mode: Optional[str] = None,
|
||||
text_entities: Optional[list[MessageEntity]] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> bool:
|
||||
"""
|
||||
Sends a gift to the given user or channel chat. The gift can't be converted to Telegram Stars by the receiver. Returns :code:`True` on success.
|
||||
|
|
@ -4803,7 +4847,7 @@ class Bot:
|
|||
text_parse_mode=text_parse_mode,
|
||||
text_entities=text_entities,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
||||
async def set_user_emoji_status(
|
||||
self,
|
||||
|
|
@ -5480,6 +5524,7 @@ class Bot:
|
|||
reply_parameters: Optional[ReplyParameters] = None,
|
||||
reply_markup: Optional[InlineKeyboardMarkup] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
low_priority: bool = False
|
||||
) -> Message:
|
||||
"""
|
||||
Use this method to send a checklist on behalf of a connected business account. On success, the sent :class:`aiogram.types.message.Message` is returned.
|
||||
|
|
@ -5508,4 +5553,4 @@ class Bot:
|
|||
reply_parameters=reply_parameters,
|
||||
reply_markup=reply_markup,
|
||||
)
|
||||
return await self(call, request_timeout=request_timeout)
|
||||
return await self(call, request_timeout=request_timeout, low_priority=low_priority)
|
||||
|
|
|
|||
129
aiogram/client/limiter.py
Normal file
129
aiogram/client/limiter.py
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
from typing import Optional
|
||||
import asyncio
|
||||
import time
|
||||
import random
|
||||
from collections import deque
|
||||
|
||||
class PrioritySlidingWindowLimiter:
|
||||
"""
|
||||
Ограничитель частоты запросов с поддержкой приоритетов.
|
||||
Активируется только при наличии задач в очереди.
|
||||
"""
|
||||
|
||||
def __init__(self, max_calls: int, period: float):
|
||||
self.max_calls = max_calls
|
||||
self.period = period
|
||||
|
||||
# Очереди для запросов
|
||||
self.high_priority_queue = asyncio.Queue()
|
||||
self.low_priority_queue = asyncio.Queue()
|
||||
|
||||
# История вызовов для скользящего окна
|
||||
self.calls_history: deque[float] = deque()
|
||||
|
||||
# Блокировка для потокобезопасной работы
|
||||
self.lock = asyncio.Lock()
|
||||
|
||||
# Флаг активности обработчика
|
||||
self._processing = False
|
||||
self._processing_task: Optional[asyncio.Task] = None
|
||||
|
||||
async def _process_queue(self):
|
||||
"""Обработка очереди до полного опустошения"""
|
||||
try:
|
||||
while not self.high_priority_queue.empty() or not self.low_priority_queue.empty():
|
||||
# Сначала обрабатываем высокоприоритетные запросы
|
||||
if not self.high_priority_queue.empty():
|
||||
future = await self.high_priority_queue.get()
|
||||
await self._wait_and_release(future)
|
||||
self.high_priority_queue.task_done()
|
||||
|
||||
# Если высокоприоритетных нет - обрабатываем низкоприоритетные
|
||||
elif not self.low_priority_queue.empty():
|
||||
future = await self.low_priority_queue.get()
|
||||
await self._wait_and_release(future)
|
||||
self.low_priority_queue.task_done()
|
||||
|
||||
finally:
|
||||
self._processing = False
|
||||
self._processing_task = None
|
||||
|
||||
async def _wait_and_release(self, future: asyncio.Future):
|
||||
"""Ждем возможности выполнить запрос и разрешаем его"""
|
||||
try:
|
||||
await self._wait_sliding_window()
|
||||
if not future.done():
|
||||
future.set_result(None)
|
||||
except Exception as e:
|
||||
if not future.done():
|
||||
future.set_exception(e)
|
||||
|
||||
async def _wait_sliding_window(self):
|
||||
"""Ожидание в соответствии со скользящим окном"""
|
||||
async with self.lock:
|
||||
now = time.monotonic()
|
||||
|
||||
# Очищаем устаревшие записи
|
||||
while self.calls_history and now - self.calls_history[0] > self.period:
|
||||
self.calls_history.popleft()
|
||||
|
||||
# Если достигнут лимит - ждем
|
||||
if len(self.calls_history) >= self.max_calls:
|
||||
# Добавляем jitter для избежания коллизий
|
||||
jitter_sleep = random.uniform(0.1, 0.5)
|
||||
await asyncio.sleep(jitter_sleep)
|
||||
|
||||
# Ждем освобождения слота
|
||||
if self.calls_history: # Проверяем, что очередь не пуста
|
||||
sleep_for = self.period - (now - self.calls_history[0]) + 0.01
|
||||
await asyncio.sleep(max(0, sleep_for))
|
||||
|
||||
# Записываем время вызова
|
||||
self.calls_history.append(time.monotonic())
|
||||
|
||||
async def wait(self, low_priority: bool = False) -> None:
|
||||
"""
|
||||
Ожидание разрешения на выполнение запроса.
|
||||
|
||||
Args:
|
||||
priority: Приоритет запроса (HIGH или LOW)
|
||||
"""
|
||||
future = asyncio.Future()
|
||||
|
||||
# Добавляем в соответствующую очередь
|
||||
if not low_priority:
|
||||
await self.high_priority_queue.put(future)
|
||||
else:
|
||||
await self.low_priority_queue.put(future)
|
||||
|
||||
# Запускаем обработчик, если он не активен
|
||||
if not self._processing:
|
||||
self._processing = True
|
||||
self._processing_task = asyncio.create_task(self._process_queue())
|
||||
|
||||
# Ждем разрешения
|
||||
await future
|
||||
|
||||
def can_execute_immediately(self) -> bool:
|
||||
"""Проверяет, можно ли выполнить запрос немедленно"""
|
||||
now = time.monotonic()
|
||||
with self.lock:
|
||||
# Очищаем устаревшие записи
|
||||
while self.calls_history and now - self.calls_history[0] > self.period:
|
||||
self.calls_history.popleft()
|
||||
|
||||
return len(self.calls_history) < self.max_calls
|
||||
|
||||
async def wait_until_idle(self):
|
||||
"""Ожидание завершения всех задач в очереди"""
|
||||
if self._processing_task:
|
||||
await self._processing_task
|
||||
|
||||
async def close(self):
|
||||
"""Завершение работы лимитера"""
|
||||
if self._processing_task:
|
||||
self._processing_task.cancel()
|
||||
try:
|
||||
await self._processing_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
Loading…
Add table
Add a link
Reference in a new issue