# aiogram/aiogram/bot/base.py
#
# 294 lines
# 10 KiB
# Python
# (web viewer residue: Raw / Normal View / History)

import asyncio
import contextlib
import io
import ssl
import typing
import warnings
from contextvars import ContextVar
from typing import Dict, List, Optional, Union, Type
import aiohttp
import certifi
from aiohttp.helpers import sentinel
from . import api
# (blame-view timestamp residue: 2018-01-27 03:34:53 +02:00)
from ..types import ParseMode, base
from ..utils import json
from ..utils.auth_widget import check_integrity
from ..utils.deprecated import deprecated
class BaseBot:
    """
    Base class for bot. It's raw bot.

    Stores the bot token, proxy settings, default parse mode and request timeout,
    lazily creates the :obj:`aiohttp.ClientSession` and forwards raw requests to ``api``.
    """
    # Context-local overrides, set temporarily by `request_timeout()` and `with_token()`.
    _ctx_timeout = ContextVar('TelegramRequestTimeout')
    _ctx_token = ContextVar('BotDifferentToken')

    def __init__(
            self,
            token: base.String,
            loop: Optional[Union[asyncio.BaseEventLoop, asyncio.AbstractEventLoop]] = None,
            connections_limit: Optional[base.Integer] = None,
            proxy: Optional[base.String] = None,
            proxy_auth: Optional[aiohttp.BasicAuth] = None,
            validate_token: Optional[base.Boolean] = True,
            parse_mode: typing.Optional[base.String] = None,
            timeout: typing.Optional[typing.Union[base.Integer, base.Float, aiohttp.ClientTimeout]] = None
    ):
        """
        Instructions how to get Bot token is found here: https://core.telegram.org/bots#3-how-do-i-create-a-bot

        :param token: token from @BotFather
        :type token: :obj:`str`
        :param loop: event loop
        :type loop: Optional Union :obj:`asyncio.BaseEventLoop`, :obj:`asyncio.AbstractEventLoop`
        :param connections_limit: connections limit for aiohttp.ClientSession
        :type connections_limit: :obj:`int`
        :param proxy: HTTP proxy URL
        :type proxy: :obj:`str`
        :param proxy_auth: Authentication information
        :type proxy_auth: Optional :obj:`aiohttp.BasicAuth`
        :param validate_token: Validate token.
        :type validate_token: :obj:`bool`
        :param parse_mode: You can set default parse mode
        :type parse_mode: :obj:`str`
        :param timeout: Request timeout
        :type timeout: :obj:`typing.Optional[typing.Union[base.Integer, base.Float, aiohttp.ClientTimeout]]`
        :raise: when token is invalid throw an :obj:`aiogram.utils.exceptions.ValidationError`
        """
        self._main_loop = loop

        # Authentication
        if validate_token:
            api.check_token(token)
        self._token = None
        self.__token = token
        # The part of the token before the first ':' is stored as the integer `id`.
        self.id = int(token.split(sep=':')[0])

        self.proxy = proxy
        self.proxy_auth = proxy_auth

        # aiohttp main session (created lazily by the `session` property)
        ssl_context = ssl.create_default_context(cafile=certifi.where())

        self._session: Optional[aiohttp.ClientSession] = None
        self._connector_class: Type[aiohttp.TCPConnector] = aiohttp.TCPConnector
        self._connector_init = dict(limit=connections_limit, ssl=ssl_context)

        if isinstance(proxy, str) and proxy.startswith(('socks5://', 'socks4://')):
            # Imported here so that aiohttp_socks is only required when a SOCKS proxy is used.
            from aiohttp_socks import SocksConnector
            from aiohttp_socks.utils import parse_proxy_url

            socks_ver, host, port, username, password = parse_proxy_url(proxy)
            if proxy_auth:
                # Credentials embedded in the URL take precedence over `proxy_auth`.
                if not username:
                    username = proxy_auth.login
                if not password:
                    password = proxy_auth.password

            self._connector_class = SocksConnector
            self._connector_init.update(
                socks_ver=socks_ver, host=host, port=port,
                username=username, password=password, rdns=True,
            )
            # The proxy is now handled by the connector, so it is not passed per request.
            self.proxy = None
            self.proxy_auth = None

        self._timeout = None
        self.timeout = timeout

        self.parse_mode = parse_mode

    def get_new_session(self) -> aiohttp.ClientSession:
        """
        Build a fresh client session using the configured connector class and arguments.

        :return: new session
        :rtype: :obj:`aiohttp.ClientSession`
        """
        return aiohttp.ClientSession(
            connector=self._connector_class(**self._connector_init, loop=self._main_loop),
            loop=self._main_loop,
            json_serialize=json.dumps
        )

    @property
    def loop(self) -> Optional[asyncio.AbstractEventLoop]:
        """
        Event loop passed to the constructor (may be ``None``).
        """
        return self._main_loop

    @property
    def session(self) -> Optional[aiohttp.ClientSession]:
        """
        Current client session; a new one is created when there is none or it is closed.
        """
        if self._session is None or self._session.closed:
            self._session = self.get_new_session()
        return self._session

    @staticmethod
    def _prepare_timeout(
            value: typing.Optional[typing.Union[base.Integer, base.Float, aiohttp.ClientTimeout]]
    ) -> typing.Optional[aiohttp.ClientTimeout]:
        """
        Normalize a timeout value.

        ``None`` and :obj:`aiohttp.ClientTimeout` instances are returned unchanged,
        any other value is used as the ``total`` of a new :obj:`aiohttp.ClientTimeout`.
        """
        if value is None or isinstance(value, aiohttp.ClientTimeout):
            return value
        return aiohttp.ClientTimeout(total=value)

    @property
    def timeout(self):
        """
        Effective request timeout.

        The context-local value set by :meth:`request_timeout` wins over the instance value;
        when the result is ``None`` the aiohttp ``sentinel`` is returned instead.
        """
        timeout = self._ctx_timeout.get(self._timeout)
        if timeout is None:
            return sentinel
        return timeout

    @timeout.setter
    def timeout(self, value):
        self._timeout = self._prepare_timeout(value)

    @timeout.deleter
    def timeout(self):
        self.timeout = None

    @contextlib.contextmanager
    def request_timeout(self, timeout: typing.Union[base.Integer, base.Float, aiohttp.ClientTimeout]):
        """
        Context manager implements opportunity to change request timeout in current context

        :param timeout: Request timeout
        :type timeout: :obj:`typing.Optional[typing.Union[base.Integer, base.Float, aiohttp.ClientTimeout]]`
        :return:
        """
        timeout = self._prepare_timeout(timeout)
        token = self._ctx_timeout.set(timeout)
        try:
            yield
        finally:
            # Always restore the previous context value, even if the body raised.
            self._ctx_timeout.reset(token)

    @property
    def __token(self):
        # The context-local token set by `with_token()` wins over the instance token.
        return self._ctx_token.get(self._token)

    @__token.setter
    def __token(self, value):
        self._token = value

    @contextlib.contextmanager
    def with_token(self, bot_token: base.String, validate_token: Optional[base.Boolean] = True):
        """
        Context manager that makes requests in the current context use another token.

        :param bot_token: token to use inside the ``with`` block
        :type bot_token: :obj:`str`
        :param validate_token: Validate token.
        :type validate_token: :obj:`bool`
        :return:
        """
        if validate_token:
            api.check_token(bot_token)
        token = self._ctx_token.set(bot_token)
        try:
            yield
        finally:
            self._ctx_token.reset(token)

    @deprecated("This method's behavior will be changed in aiogram v3.0. "
                "More info: https://core.telegram.org/bots/api#close")
    async def close(self):
        """
        Close all client sessions
        """
        # Use the private attribute: the `session` property would create a brand-new
        # session (and connector) just to close it when none is open.
        if self._session is not None and not self._session.closed:
            await self._session.close()

    async def request(self, method: base.String,
                      data: Optional[Dict] = None,
                      files: Optional[Dict] = None, **kwargs) -> Union[List, Dict, base.Boolean]:
        """
        Make an request to Telegram Bot API

        https://core.telegram.org/bots/api#making-requests

        :param method: API method
        :type method: :obj:`str`
        :param data: request parameters
        :type data: :obj:`dict`
        :param files: files
        :type files: :obj:`dict`
        :return: result
        :rtype: Union[List, Dict]
        :raise: :obj:`aiogram.exceptions.TelegramApiError`
        """
        return await api.make_request(self.session, self.__token, method, data, files,
                                      proxy=self.proxy, proxy_auth=self.proxy_auth, timeout=self.timeout, **kwargs)

    async def download_file(self, file_path: base.String,
                            destination: Optional[base.InputFile] = None,
                            timeout: Optional[base.Integer] = sentinel,
                            chunk_size: Optional[base.Integer] = 65536,
                            seek: Optional[base.Boolean] = True) -> Union[io.BytesIO, io.FileIO]:
        """
        Download file by file_path to destination

        if You want to automatically create destination (:class:`io.BytesIO`) use default
        value of destination and handle result of this method.

        :param file_path: file path on telegram server (You can get it from :obj:`aiogram.types.File`)
        :type file_path: :obj:`str`
        :param destination: filename or instance of :class:`io.IOBase`. For e. g. :class:`io.BytesIO`
        :param timeout: Integer
        :param chunk_size: Integer
        :param seek: Boolean - go to start of file when downloading is finished.
        :return: destination
        """
        if destination is None:
            destination = io.BytesIO()

        url = self.get_file_url(file_path)

        # Remember whether the file object is ours, so it can be released on failure
        # without touching streams owned by the caller.
        opened_here = not isinstance(destination, io.IOBase)
        dest = open(destination, 'wb') if opened_here else destination
        try:
            async with self.session.get(url, timeout=timeout, proxy=self.proxy,
                                        proxy_auth=self.proxy_auth) as response:
                while True:
                    chunk = await response.content.read(chunk_size)
                    if not chunk:
                        break
                    dest.write(chunk)
                    dest.flush()
            if seek:
                dest.seek(0)
        except BaseException:
            # BaseException also covers task cancellation; the error is always re-raised.
            if opened_here:
                dest.close()
            raise
        return dest

    def get_file_url(self, file_path):
        """
        Build the download URL for ``file_path`` using the current token.

        :param file_path: file path on telegram server
        :return: value returned by ``api.Methods.file_url``
        """
        return api.Methods.file_url(token=self.__token, path=file_path)

    async def send_file(self, file_type, method, file, payload) -> Union[Dict, base.Boolean]:
        """
        Send file

        https://core.telegram.org/bots/api#inputfile

        :param file_type: field name
        :param method: API method
        :param file: String or io.IOBase
        :param payload: request payload
        :return: response
        """
        if file is None:
            files = {}
        elif isinstance(file, str):
            # You can use file ID or URL in the most of requests
            payload[file_type] = file
            files = None
        else:
            files = {file_type: file}

        return await self.request(method, payload, files)

    @property
    def parse_mode(self):
        """
        Default parse mode (lower-cased string) or ``None`` when not set.
        """
        return getattr(self, '_parse_mode', None)

    @parse_mode.setter
    def parse_mode(self, value):
        if value is None:
            self._parse_mode = None
            return

        if not isinstance(value, str):
            raise TypeError(f"Parse mode must be str, not {type(value)}")
        value = value.lower()
        if value not in ParseMode.all():
            raise ValueError(f"Parse mode must be one of {ParseMode.all()}")
        self._parse_mode = value
        if value == 'markdown':
            warnings.warn("Parse mode `Markdown` is legacy since Telegram Bot API 4.5, "
                          "retained for backward compatibility. Use `MarkdownV2` instead.\n"
                          "https://core.telegram.org/bots/api#markdown-style", stacklevel=3)

    @parse_mode.deleter
    def parse_mode(self):
        self.parse_mode = None

    def check_auth_widget(self, data):
        """
        Check auth-widget ``data`` against the current token via ``check_integrity``.

        :param data: data to verify
        :return: value returned by ``check_integrity``
        """
        return check_integrity(self.__token, data)