Implement pipe for uploading files from URL

This commit is contained in:
Alex Root Junior 2018-09-22 21:48:48 +03:00
parent d0caabc55b
commit f9b9b1d932

View file

@ -1,8 +1,9 @@
import asyncio
import inspect
import io
import logging
import os
import secrets
import time
import aiohttp
@ -41,6 +42,9 @@ class InputFile(base.TelegramObject):
elif isinstance(path_or_bytesio, io.IOBase):
self._path = None
self._file = path_or_bytesio
elif isinstance(path_or_bytesio, _WebPipe):
self._path = None
self._file = path_or_bytesio
else:
raise TypeError('Not supported file type.')
@ -52,6 +56,11 @@ class InputFile(base.TelegramObject):
"""
Close file descriptor
"""
if not hasattr(self, '_file'):
return
if inspect.iscoroutinefunction(self._file.close):
return asyncio.ensure_future(self._file.close())
self._file.close()
@property
@ -89,7 +98,7 @@ class InputFile(base.TelegramObject):
return self.file
@classmethod
async def from_url(cls, url, filename=None, chunk_size=CHUNK_SIZE):
def from_url(cls, url, filename=None, chunk_size=CHUNK_SIZE):
"""
Download file from URL
@ -101,28 +110,15 @@ class InputFile(base.TelegramObject):
:return: InputFile
"""
conf = {
'downloaded': True,
'url': url
}
pipe = _WebPipe(url, chunk_size=chunk_size)
if filename is None:
*_, part = url.rpartition('/')
if part:
filename = part
else:
filename = secrets.token_urlsafe(32)
# Let's do magic with the filename
if filename:
filename_prefix, _, ext = filename.rpartition('.')
file_suffix = '.' + ext if ext else ''
else:
filename_prefix, _, ext = url.rpartition('/')[-1].rpartition('.')
file_suffix = '.' + ext if ext else ''
filename = filename_prefix + file_suffix
async with aiohttp.ClientSession() as session:
start = time.time()
async with session.get(url) as response:
# Save file in memory
file = await cls._process_stream(response, io.BytesIO(), chunk_size=chunk_size)
log.debug(f"File successful downloaded at {round(time.time() - start, 2)} seconds from '{url}'")
return cls(file, filename, conf=conf)
return cls(pipe, filename, chunk_size)
def save(self, filename, chunk_size=CHUNK_SIZE):
"""
@ -145,27 +141,6 @@ class InputFile(base.TelegramObject):
if self.file.seekable():
self.file.seek(0)
@classmethod
async def _process_stream(cls, response, writer, chunk_size=CHUNK_SIZE):
"""
Transfer data
:param response:
:param writer:
:param chunk_size:
:return:
"""
while True:
chunk = await response.content.read(chunk_size)
if not chunk:
break
writer.write(chunk)
if writer.seekable():
writer.seek(0)
return writer
def __str__(self):
return f"<InputFile 'attach://{self.attachment_key}' with file='{self.file}'>"
@ -175,3 +150,62 @@ class InputFile(base.TelegramObject):
@classmethod
def to_object(cls, data):
raise TypeError('Object of this type is not importable!')
class _WebPipe:
def __init__(self, url, chunk_size=-1):
self.url = url
self.chunk_size = chunk_size
self._session: aiohttp.ClientSession = None
self._response: aiohttp.ClientResponse = None
self._reader = None
self._name = None
self._lock = asyncio.Lock()
@property
def name(self):
if not self._name:
self._name = secrets.token_urlsafe(32)
return self._name
async def open(self):
session = self._session = aiohttp.ClientSession()
self._response = await session.get(self.url) # type: aiohttp.ClientResponse
await self._lock.acquire()
return self
async def close(self):
if self._response and not self._response.closed:
await self._response.close()
if self._session and not self._session.closed:
await self._session.close()
if self._lock.locked():
self._lock.release()
@property
def closed(self):
return not self._session or self._session.closed
def __aiter__(self):
return self
async def __anext__(self):
if self.closed:
await self.open()
chunk = await self.read(self.chunk_size)
if not chunk:
await self.close()
raise StopAsyncIteration
return chunk
async def read(self, chunk_size=-1):
if not self._response:
raise LookupError('I/O operation on closed stream')
response: aiohttp.ClientResponse = self._response
reader: aiohttp.StreamReader = response.content
return await reader.read(chunk_size)