fix: correct lock and workers

This commit is contained in:
Oleg A 2022-09-21 22:10:59 +03:00
parent 515e5769ef
commit 96ad8d8ed9
No known key found for this signature in database
GPG key ID: 5FE046817A9657C5
2 changed files with 49 additions and 30 deletions

View file

@ -57,6 +57,7 @@ class ChatActionSender:
self._close_event = Event()
self._closed_event = Event()
self._task: Optional[asyncio.Task[Any]] = None
self._counter = 0
@property
def running(self) -> bool:
@ -66,39 +67,44 @@ class ChatActionSender:
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(self._close_event.wait(), interval)
async def _worker(self, event_name: str, event: Event) -> None:
async def _init_worker(self) -> None:
"""Start init worker."""
await self._worker(self._initial_event)
async def _main_worker(self) -> None:
"""Start main worker."""
logger.debug(
"Started chat action %r sender %s in chat_id=%s via bot id=%d",
"Started chat action %r sender in chat_id=%s via bot id=%d",
self.action,
event_name,
self.chat_id,
self.bot.id,
)
try:
counter = 0
while not event.is_set():
start = time.monotonic()
logger.debug(
"Sent chat action %r to chat_id=%s via bot %d (already sent actions %d)",
self.action,
self.chat_id,
self.bot.id,
counter,
)
await self.bot.send_chat_action(chat_id=self.chat_id, action=self.action)
counter += 1
interval = self.interval - (time.monotonic() - start)
await self._wait(interval)
await self._worker(self._close_event)
finally:
logger.debug(
"Finished chat action %r sender %s in chat_id=%s via bot id=%d",
"Finished chat action %r sender in chat_id=%s via bot id=%d",
self.action,
event_name,
self.chat_id,
self.bot.id,
)
event.set()
self._closed_event.set()
async def _worker(self, event: Event) -> None:
while not event.is_set():
start = time.monotonic()
logger.debug(
"Sent chat action %r to chat_id=%s via bot %d (already sent actions %d)",
self.action,
self.chat_id,
self.bot.id,
self._counter,
)
await self.bot.send_chat_action(chat_id=self.chat_id, action=self.action)
self._counter += 1
interval = self.interval - (time.monotonic() - start)
await self._wait(interval)
async def _initial_action(self) -> None:
"""Process initial action.
@ -121,24 +127,26 @@ class ChatActionSender:
if self.initial_sleep <= self.interval:
start = time.monotonic()
await self.bot.send_chat_action(chat_id=self.chat_id, action=self.action)
self._counter += 1
initial_sleep = self.initial_sleep - (time.monotonic() - start)
await self._wait(initial_sleep)
return
loop = asyncio.get_event_loop()
loop.call_later(self.initial_sleep, self._initial_event.set)
await self._worker("init_event", self._initial_event)
await self._init_worker()
async def _run(self) -> None:
async with self._lock:
self._initial_event.clear()
self._close_event.clear()
self._closed_event.clear()
self._counter = 0
if self.running:
raise RuntimeError("Already running")
await self._initial_action()
self._task = asyncio.create_task(self._worker("close_event", self._close_event))
self._task = asyncio.create_task(self._main_worker())
async def _stop(self) -> None:
async with self._lock:

View file

@ -1,4 +1,5 @@
import asyncio
import logging
import time
from datetime import datetime
@ -17,10 +18,11 @@ except ImportError:
from unittest.mock import patch
pytestmarm = pytest.mark.asyncio
logger = logging.getLogger(__name__)
class TestChatActionSender:
initial_sleep = 1.0
initial_sleep = 0.5
async def test_wait_with_event(self, bot: Bot, loop: asyncio.BaseEventLoop):
sender = ChatActionSender.typing(bot=bot, chat_id=42)
@ -35,11 +37,6 @@ class TestChatActionSender:
await sender._wait(self.initial_sleep)
assert time.monotonic() - start >= self.initial_sleep
async def test_initial_sleep(self, bot: Bot):
start = time.monotonic()
async with ChatActionSender.typing(bot=bot, chat_id=42, initial_sleep=self.initial_sleep):
assert time.monotonic() - start >= self.initial_sleep
@pytest.mark.parametrize(
"action",
[
@ -71,12 +68,26 @@ class TestChatActionSender:
new_callable=CoroutineMock,
) as mocked_send_chat_action:
async with ChatActionSender.typing(
bot=bot, chat_id=42, interval=0.01, initial_sleep=0
bot=bot, chat_id=42, interval=0.01, initial_sleep=0.0
):
await asyncio.sleep(0.1)
assert mocked_send_chat_action.await_count > 1
mocked_send_chat_action.assert_awaited_with(action="typing", chat_id=42)
async def test_worker_with_initial_sleep(self, bot: Bot):
start = time.monotonic()
with patch(
"aiogram.client.bot.Bot.send_chat_action",
new_callable=CoroutineMock,
) as mocked_send_chat_action:
async with ChatActionSender.typing(
bot=bot, chat_id=42, interval=0.01, initial_sleep=self.initial_sleep
):
assert time.monotonic() - start >= self.initial_sleep
await asyncio.sleep(0.1)
assert mocked_send_chat_action.await_count > 1
mocked_send_chat_action.assert_awaited_with(action="typing", chat_id=42)
async def test_contextmanager(self, bot: MockedBot):
sender: ChatActionSender = ChatActionSender.typing(bot=bot, chat_id=42)
assert not sender.running