From f9b9b1d932bd682859d15ce834952e1f17f64917 Mon Sep 17 00:00:00 2001 From: Alex Root Junior Date: Sat, 22 Sep 2018 21:48:48 +0300 Subject: [PATCH] Implement pipe for uploading files from URL --- aiogram/types/input_file.py | 122 +++++++++++++++++++++++------------- 1 file changed, 78 insertions(+), 44 deletions(-) diff --git a/aiogram/types/input_file.py b/aiogram/types/input_file.py index 5c271701..c9e37c30 100644 --- a/aiogram/types/input_file.py +++ b/aiogram/types/input_file.py @@ -1,8 +1,9 @@ +import asyncio +import inspect import io import logging import os import secrets -import time import aiohttp @@ -41,6 +42,9 @@ class InputFile(base.TelegramObject): elif isinstance(path_or_bytesio, io.IOBase): self._path = None self._file = path_or_bytesio + elif isinstance(path_or_bytesio, _WebPipe): + self._path = None + self._file = path_or_bytesio else: raise TypeError('Not supported file type.') @@ -52,6 +56,11 @@ class InputFile(base.TelegramObject): """ Close file descriptor """ + if not hasattr(self, '_file'): + return + + if inspect.iscoroutinefunction(self._file.close): + return asyncio.ensure_future(self._file.close()) self._file.close() @property @@ -89,7 +98,7 @@ class InputFile(base.TelegramObject): return self.file @classmethod - async def from_url(cls, url, filename=None, chunk_size=CHUNK_SIZE): + def from_url(cls, url, filename=None, chunk_size=CHUNK_SIZE): """ Download file from URL @@ -101,28 +110,15 @@ class InputFile(base.TelegramObject): :return: InputFile """ - conf = { - 'downloaded': True, - 'url': url - } + pipe = _WebPipe(url, chunk_size=chunk_size) + if filename is None: + *_, part = url.rpartition('/') + if part: + filename = part + else: + filename = secrets.token_urlsafe(32) - # Let's do magic with the filename - if filename: - filename_prefix, _, ext = filename.rpartition('.') - file_suffix = '.' + ext if ext else '' - else: - filename_prefix, _, ext = url.rpartition('/')[-1].rpartition('.') - file_suffix = '.' + ext if ext else '' - filename = filename_prefix + file_suffix - - async with aiohttp.ClientSession() as session: - start = time.time() - async with session.get(url) as response: - # Save file in memory - file = await cls._process_stream(response, io.BytesIO(), chunk_size=chunk_size) - - log.debug(f"File successful downloaded at {round(time.time() - start, 2)} seconds from '{url}'") - return cls(file, filename, conf=conf) + return cls(pipe, filename, chunk_size) def save(self, filename, chunk_size=CHUNK_SIZE): """ @@ -145,27 +141,6 @@ class InputFile(base.TelegramObject): if self.file.seekable(): self.file.seek(0) - @classmethod - async def _process_stream(cls, response, writer, chunk_size=CHUNK_SIZE): - """ - Transfer data - - :param response: - :param writer: - :param chunk_size: - :return: - """ - while True: - chunk = await response.content.read(chunk_size) - if not chunk: - break - writer.write(chunk) - - if writer.seekable(): - writer.seek(0) - - return writer - def __str__(self): return f"" @@ -175,3 +150,62 @@ class InputFile(base.TelegramObject): @classmethod def to_object(cls, data): raise TypeError('Object of this type is not importable!') + + +class _WebPipe: + def __init__(self, url, chunk_size=-1): + self.url = url + self.chunk_size = chunk_size + + self._session: aiohttp.ClientSession = None + self._response: aiohttp.ClientResponse = None + self._reader = None + self._name = None + + self._lock = asyncio.Lock() + + @property + def name(self): + if not self._name: + self._name = secrets.token_urlsafe(32) + return self._name + + async def open(self): + session = self._session = aiohttp.ClientSession() + self._response = await session.get(self.url) # type: aiohttp.ClientResponse + await self._lock.acquire() + + return self + + async def close(self): + if self._response and not self._response.closed: + await self._response.close() + if self._session and not self._session.closed: + await self._session.close() + if self._lock.locked(): + self._lock.release() + + @property + def closed(self): + return not self._session or self._session.closed + + def __aiter__(self): + return self + + async def __anext__(self): + if self.closed: + await self.open() + + chunk = await self.read(self.chunk_size) + if not chunk: + await self.close() + raise StopAsyncIteration + return chunk + + async def read(self, chunk_size=-1): + if not self._response: + raise LookupError('I/O operation on closed stream') + response: aiohttp.ClientResponse = self._response + reader: aiohttp.StreamReader = response.content + + return await reader.read(chunk_size)