mirror of
https://github.com/aiogram/aiogram.git
synced 2026-04-08 16:37:47 +00:00
refactor(types): unify InputFile with str in media types (dev-3.x)
Refactored various InputMedia types to support Union[InputFile, str] for media properties, enhancing flexibility and unifying input data types. - Modified media_group.py for AUDIO, PHOTO, VIDEO, and DOCUMENT. - Updated input_media_document.py and input_media_audio.py to use Union[InputFile, str]. - Adjusted bot.py to utilize DateTime for close_date. - Refactored aiohttp session for better file handle and serialization. - Enhanced set_chat_photo.py to accept Union[InputFile, str]. - Re-organized input_file.py with modern BaseModel features. - Added new client/form module for splitting file extraction logic. - Adapted test cases to validate the new structure and behavior.
This commit is contained in:
parent
6f4452f4e0
commit
1ea41076cd
27 changed files with 346 additions and 305 deletions
|
|
@ -165,6 +165,7 @@ from ..types import (
|
|||
ChatMemberOwner,
|
||||
ChatMemberRestricted,
|
||||
ChatPermissions,
|
||||
DateTime,
|
||||
Downloadable,
|
||||
File,
|
||||
ForceReply,
|
||||
|
|
@ -2894,7 +2895,7 @@ class Bot:
|
|||
explanation_parse_mode: Optional[Union[str, Default]] = Default("parse_mode"),
|
||||
explanation_entities: Optional[List[MessageEntity]] = None,
|
||||
open_period: Optional[int] = None,
|
||||
close_date: Optional[Union[datetime.datetime, datetime.timedelta, int]] = None,
|
||||
close_date: Optional[DateTime] = None,
|
||||
is_closed: Optional[bool] = None,
|
||||
disable_notification: Optional[bool] = None,
|
||||
protect_content: Optional[Union[bool, Default]] = Default("protect_content"),
|
||||
|
|
|
|||
62
aiogram/client/form.py
Normal file
62
aiogram/client/form.py
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import secrets
|
||||
import typing
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic_core import to_json
|
||||
|
||||
from aiogram.types import InputFile
|
||||
|
||||
M = typing.TypeVar("M", bound=BaseModel)
|
||||
|
||||
|
||||
def _extract_files(value: Any) -> Tuple[Any, Dict[str, InputFile]]:
|
||||
files = {}
|
||||
|
||||
# Handle input file
|
||||
if isinstance(value, InputFile):
|
||||
token = secrets.token_urlsafe(10)
|
||||
files[token] = value
|
||||
return f"attach://{token}", files
|
||||
|
||||
# Handle nested models
|
||||
if isinstance(value, BaseModel):
|
||||
modified_nested_model, nested_files = extract_files(value, files)
|
||||
files.update(nested_files)
|
||||
return modified_nested_model, files
|
||||
|
||||
return value, files
|
||||
|
||||
|
||||
def extract_files(model: M, files: Dict[str, InputFile] = None) -> Tuple[M, Dict[str, InputFile]]:
|
||||
if files is None:
|
||||
files = {}
|
||||
update = {}
|
||||
|
||||
for field_name, field_value in model:
|
||||
modified_field_value, nested_files = _extract_files(field_value)
|
||||
update[field_name] = modified_field_value
|
||||
files.update(nested_files)
|
||||
|
||||
# Handle nested models inside of lists (media groups)
|
||||
if isinstance(field_value, list):
|
||||
update[field_name] = []
|
||||
for item in field_value:
|
||||
modified_item, nested_item_files = _extract_files(item)
|
||||
update[field_name].append(modified_item)
|
||||
files.update(nested_item_files)
|
||||
continue
|
||||
|
||||
modified_model = model.model_copy(update=update)
|
||||
return modified_model, files
|
||||
|
||||
|
||||
def form_serialize(value: Any) -> Optional[str]:
|
||||
"""
|
||||
Prepare jsonable value to send
|
||||
"""
|
||||
if isinstance(value, str):
|
||||
return value
|
||||
return to_json(value).decode()
|
||||
|
|
@ -26,7 +26,7 @@ from aiogram.methods import TelegramMethod
|
|||
|
||||
from ...exceptions import TelegramNetworkError
|
||||
from ...methods.base import TelegramType
|
||||
from ...types import InputFile
|
||||
from ..form import extract_files, form_serialize
|
||||
from .base import BaseSession
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -144,17 +144,15 @@ class AiohttpSession(BaseSession):
|
|||
|
||||
def build_form_data(self, bot: Bot, method: TelegramMethod[TelegramType]) -> FormData:
|
||||
form = FormData(quote_fields=False)
|
||||
files: Dict[str, InputFile] = {}
|
||||
for key, value in method.model_dump(warnings=False).items():
|
||||
value = self.prepare_value(value, bot=bot, files=files)
|
||||
if not value:
|
||||
continue
|
||||
form.add_field(key, value)
|
||||
for key, value in files.items():
|
||||
modified_method, files = extract_files(method)
|
||||
|
||||
for key, value in modified_method.model_dump(mode="json", exclude_none=True).items():
|
||||
form.add_field(key, form_serialize(value))
|
||||
for key, file in files.items():
|
||||
form.add_field(
|
||||
key,
|
||||
value.read(bot),
|
||||
filename=value.filename or key,
|
||||
file.read(bot),
|
||||
filename=file.filename or key,
|
||||
)
|
||||
return form
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,8 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import datetime
|
||||
import json
|
||||
import secrets
|
||||
from enum import Enum
|
||||
import warnings
|
||||
from http import HTTPStatus
|
||||
from types import TracebackType
|
||||
from typing import (
|
||||
|
|
@ -19,8 +17,6 @@ from typing import (
|
|||
cast,
|
||||
)
|
||||
|
||||
from pydantic import ValidationError
|
||||
|
||||
from aiogram.exceptions import (
|
||||
ClientDecodeError,
|
||||
RestartingTelegram,
|
||||
|
|
@ -38,8 +34,6 @@ from aiogram.exceptions import (
|
|||
|
||||
from ...methods import Response, TelegramMethod
|
||||
from ...methods.base import TelegramType
|
||||
from ...types import InputFile, TelegramObject
|
||||
from ..default import Default
|
||||
from ..telegram import PRODUCTION, TelegramAPIServer
|
||||
from .middlewares.manager import RequestMiddlewareManager
|
||||
|
||||
|
|
@ -79,6 +73,12 @@ class BaseSession(abc.ABC):
|
|||
self.timeout = timeout
|
||||
|
||||
self.middleware = RequestMiddlewareManager()
|
||||
if self.json_loads != json.loads or json_dumps != json.dumps:
|
||||
warnings.warn(
|
||||
"Custom json de/serializers are no longer supported.\n"
|
||||
"Using pydantic_core.to_json and pydantic_core.from_json instead.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
|
||||
def check_response(
|
||||
self, bot: Bot, method: TelegramMethod[TelegramType], status_code: int, content: str
|
||||
|
|
@ -86,19 +86,11 @@ class BaseSession(abc.ABC):
|
|||
"""
|
||||
Check response status
|
||||
"""
|
||||
try:
|
||||
json_data = self.json_loads(content)
|
||||
except Exception as e:
|
||||
# Handled error type can't be classified as specific error
|
||||
# in due to decoder can be customized and raise any exception
|
||||
|
||||
raise ClientDecodeError("Failed to decode object", e, content)
|
||||
|
||||
try:
|
||||
response_type = Response[method.__returning__] # type: ignore
|
||||
response = response_type.model_validate(json_data, context={"bot": bot})
|
||||
except ValidationError as e:
|
||||
raise ClientDecodeError("Failed to deserialize object", e, json_data)
|
||||
response = response_type.model_validate_json(content, context={"bot": bot})
|
||||
except ValueError as e:
|
||||
raise ClientDecodeError("Failed to deserialize object", e, content)
|
||||
|
||||
if HTTPStatus.OK <= status_code <= HTTPStatus.IM_USED and response.ok:
|
||||
return response
|
||||
|
|
@ -177,73 +169,6 @@ class BaseSession(abc.ABC):
|
|||
"""
|
||||
yield b""
|
||||
|
||||
def prepare_value(
|
||||
self,
|
||||
value: Any,
|
||||
bot: Bot,
|
||||
files: Dict[str, Any],
|
||||
_dumps_json: bool = True,
|
||||
) -> Any:
|
||||
"""
|
||||
Prepare value before send
|
||||
"""
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, str):
|
||||
return value
|
||||
if isinstance(value, Default):
|
||||
default_value = bot.default[value.name]
|
||||
return self.prepare_value(default_value, bot=bot, files=files, _dumps_json=_dumps_json)
|
||||
if isinstance(value, InputFile):
|
||||
key = secrets.token_urlsafe(10)
|
||||
files[key] = value
|
||||
return f"attach://{key}"
|
||||
if isinstance(value, dict):
|
||||
value = {
|
||||
key: prepared_item
|
||||
for key, item in value.items()
|
||||
if (
|
||||
prepared_item := self.prepare_value(
|
||||
item, bot=bot, files=files, _dumps_json=False
|
||||
)
|
||||
)
|
||||
is not None
|
||||
}
|
||||
if _dumps_json:
|
||||
return self.json_dumps(value)
|
||||
return value
|
||||
if isinstance(value, list):
|
||||
value = [
|
||||
prepared_item
|
||||
for item in value
|
||||
if (
|
||||
prepared_item := self.prepare_value(
|
||||
item, bot=bot, files=files, _dumps_json=False
|
||||
)
|
||||
)
|
||||
is not None
|
||||
]
|
||||
if _dumps_json:
|
||||
return self.json_dumps(value)
|
||||
return value
|
||||
if isinstance(value, datetime.timedelta):
|
||||
now = datetime.datetime.now()
|
||||
return str(round((now + value).timestamp()))
|
||||
if isinstance(value, datetime.datetime):
|
||||
return str(round(value.timestamp()))
|
||||
if isinstance(value, Enum):
|
||||
return self.prepare_value(value.value, bot=bot, files=files)
|
||||
if isinstance(value, TelegramObject):
|
||||
return self.prepare_value(
|
||||
value.model_dump(warnings=False),
|
||||
bot=bot,
|
||||
files=files,
|
||||
_dumps_json=_dumps_json,
|
||||
)
|
||||
if _dumps_json:
|
||||
return self.json_dumps(value)
|
||||
return value
|
||||
|
||||
async def __call__(
|
||||
self,
|
||||
bot: Bot,
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ from __future__ import annotations
|
|||
import datetime
|
||||
from typing import TYPE_CHECKING, Any, Optional, Union
|
||||
|
||||
from ..types import DateTime
|
||||
from .base import TelegramMethod
|
||||
|
||||
|
||||
|
|
@ -20,7 +21,7 @@ class BanChatMember(TelegramMethod[bool]):
|
|||
"""Unique identifier for the target group or username of the target supergroup or channel (in the format :code:`@channelusername`)"""
|
||||
user_id: int
|
||||
"""Unique identifier of the target user"""
|
||||
until_date: Optional[Union[datetime.datetime, datetime.timedelta, int]] = None
|
||||
until_date: Optional[DateTime] = None
|
||||
"""Date when the user will be unbanned; Unix time. If user is banned for more than 366 days or less than 30 seconds from the current time they are considered to be banned forever. Applied for supergroups and channels only."""
|
||||
revoke_messages: Optional[bool] = None
|
||||
"""Pass :code:`True` to delete all messages from the chat for the user that is being removed. If :code:`False`, the user will be able to see messages in the group that were sent before the user was removed. Always :code:`True` for supergroups and channels."""
|
||||
|
|
|
|||
|
|
@ -12,18 +12,10 @@ from typing import (
|
|||
TypeVar,
|
||||
)
|
||||
|
||||
from pydantic import (
|
||||
BaseModel,
|
||||
ConfigDict,
|
||||
SerializerFunctionWrapHandler,
|
||||
model_serializer,
|
||||
model_validator,
|
||||
)
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from aiogram.client.context_controller import BotContextController
|
||||
from aiogram.client.default import Default, DefaultBotProperties
|
||||
from ..types import InputFile, ResponseParameters
|
||||
from ..types.base import UNSET_TYPE
|
||||
from ..types.base import MutableTelegramObject
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..client.bot import Bot
|
||||
|
|
@ -48,44 +40,7 @@ class Response(BaseModel, Generic[TelegramType]):
|
|||
parameters: Optional[ResponseParameters] = None
|
||||
|
||||
|
||||
class TelegramMethod(BotContextController, BaseModel, Generic[TelegramType], ABC):
|
||||
model_config = ConfigDict(
|
||||
extra="allow",
|
||||
populate_by_name=True,
|
||||
arbitrary_types_allowed=True,
|
||||
)
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def remove_unset(cls, values: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Remove UNSET before fields validation.
|
||||
|
||||
We use UNSET as a sentinel value for `parse_mode` and replace it to real value later.
|
||||
It isn't a problem when it's just default value for a model field,
|
||||
but UNSET might be passing to a model initialization from `Bot.method_name`,
|
||||
so we must take care of it and remove it before fields validation.
|
||||
"""
|
||||
if not isinstance(values, dict):
|
||||
return values
|
||||
return {k: v for k, v in values.items() if not isinstance(v, UNSET_TYPE)}
|
||||
|
||||
@model_serializer(mode="wrap", when_used="json")
|
||||
def json_serialize(self, handler: SerializerFunctionWrapHandler) -> Dict[str, Any]:
|
||||
"""
|
||||
Replacing `Default` placeholders with actual values from bot defaults.
|
||||
Ensures JSON serialization backward compatibility by handling non-standard objects.
|
||||
"""
|
||||
if not isinstance(self, TelegramMethod):
|
||||
return handler(self)
|
||||
properties = self.bot.default if self.bot else DefaultBotProperties()
|
||||
default_fields = {
|
||||
field: properties[value.name]
|
||||
for field in self.model_fields.keys()
|
||||
if isinstance(value := getattr(self, field), Default)
|
||||
}
|
||||
return handler(self.model_copy(update=default_fields))
|
||||
|
||||
class TelegramMethod(MutableTelegramObject, Generic[TelegramType], ABC):
|
||||
if TYPE_CHECKING:
|
||||
__returning__: ClassVar[type]
|
||||
__api_method__: ClassVar[str]
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ from __future__ import annotations
|
|||
import datetime
|
||||
from typing import TYPE_CHECKING, Any, Optional, Union
|
||||
|
||||
from ..types import ChatInviteLink
|
||||
from ..types import ChatInviteLink, DateTime
|
||||
from .base import TelegramMethod
|
||||
|
||||
|
||||
|
|
@ -21,7 +21,7 @@ class CreateChatInviteLink(TelegramMethod[ChatInviteLink]):
|
|||
"""Unique identifier for the target chat or username of the target channel (in the format :code:`@channelusername`)"""
|
||||
name: Optional[str] = None
|
||||
"""Invite link name; 0-32 characters"""
|
||||
expire_date: Optional[Union[datetime.datetime, datetime.timedelta, int]] = None
|
||||
expire_date: Optional[DateTime] = None
|
||||
"""Point in time (Unix timestamp) when the link will expire"""
|
||||
member_limit: Optional[int] = None
|
||||
"""The maximum number of users that can be members of the chat simultaneously after joining the chat via this invite link; 1-99999"""
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ from __future__ import annotations
|
|||
import datetime
|
||||
from typing import TYPE_CHECKING, Any, Optional, Union
|
||||
|
||||
from ..types import ChatInviteLink
|
||||
from ..types import ChatInviteLink, DateTime
|
||||
from .base import TelegramMethod
|
||||
|
||||
|
||||
|
|
@ -23,7 +23,7 @@ class EditChatInviteLink(TelegramMethod[ChatInviteLink]):
|
|||
"""The invite link to edit"""
|
||||
name: Optional[str] = None
|
||||
"""Invite link name; 0-32 characters"""
|
||||
expire_date: Optional[Union[datetime.datetime, datetime.timedelta, int]] = None
|
||||
expire_date: Optional[DateTime] = None
|
||||
"""Point in time (Unix timestamp) when the link will expire"""
|
||||
member_limit: Optional[int] = None
|
||||
"""The maximum number of users that can be members of the chat simultaneously after joining the chat via this invite link; 1-99999"""
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ from __future__ import annotations
|
|||
import datetime
|
||||
from typing import TYPE_CHECKING, Any, Optional, Union
|
||||
|
||||
from ..types import ChatPermissions
|
||||
from ..types import ChatPermissions, DateTime
|
||||
from .base import TelegramMethod
|
||||
|
||||
|
||||
|
|
@ -25,7 +25,7 @@ class RestrictChatMember(TelegramMethod[bool]):
|
|||
"""A JSON-serialized object for new user permissions"""
|
||||
use_independent_chat_permissions: Optional[bool] = None
|
||||
"""Pass :code:`True` if chat permissions are set independently. Otherwise, the *can_send_other_messages* and *can_add_web_page_previews* permissions will imply the *can_send_messages*, *can_send_audios*, *can_send_documents*, *can_send_photos*, *can_send_videos*, *can_send_video_notes*, and *can_send_voice_notes* permissions; the *can_send_polls* permission will imply the *can_send_messages* permission."""
|
||||
until_date: Optional[Union[datetime.datetime, datetime.timedelta, int]] = None
|
||||
until_date: Optional[DateTime] = None
|
||||
"""Date when restrictions will be lifted for the user; Unix time. If user is restricted for more than 366 days or less than 30 seconds from the current time, they are considered to be restricted forever"""
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ from pydantic import Field
|
|||
|
||||
from ..client.default import Default
|
||||
from ..types import (
|
||||
UNSET_PARSE_MODE,
|
||||
DateTime,
|
||||
ForceReply,
|
||||
InlineKeyboardMarkup,
|
||||
Message,
|
||||
|
|
@ -53,7 +53,7 @@ class SendPoll(TelegramMethod[Message]):
|
|||
"""A JSON-serialized list of special entities that appear in the poll explanation, which can be specified instead of *parse_mode*"""
|
||||
open_period: Optional[int] = None
|
||||
"""Amount of time in seconds the poll will be active after creation, 5-600. Can't be used together with *close_date*."""
|
||||
close_date: Optional[Union[datetime.datetime, datetime.timedelta, int]] = None
|
||||
close_date: Optional[DateTime] = None
|
||||
"""Point in time (Unix timestamp) when the poll will be automatically closed. Must be at least 5 and no more than 600 seconds in the future. Can't be used together with *open_period*."""
|
||||
is_closed: Optional[bool] = None
|
||||
"""Pass :code:`True` if the poll needs to be immediately closed. This can be useful for poll preview."""
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ class SetChatPhoto(TelegramMethod[bool]):
|
|||
|
||||
chat_id: Union[int, str]
|
||||
"""Unique identifier for the target chat or username of the target channel (in the format :code:`@channelusername`)"""
|
||||
photo: InputFile
|
||||
photo: Union[InputFile, str]
|
||||
"""New chat photo, uploaded using multipart/form-data"""
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from typing import TYPE_CHECKING, Any, Union
|
||||
|
||||
from ..types import File, InputFile
|
||||
from .base import TelegramMethod
|
||||
|
|
@ -18,7 +18,7 @@ class UploadStickerFile(TelegramMethod[File]):
|
|||
|
||||
user_id: int
|
||||
"""User identifier of sticker file owner"""
|
||||
sticker: InputFile
|
||||
sticker: Union[InputFile, str]
|
||||
"""A file with the sticker in .WEBP, .PNG, .TGS, or .WEBM format. See `https://core.telegram.org/stickers <https://core.telegram.org/stickers>`_`https://core.telegram.org/stickers <https://core.telegram.org/stickers>`_ for technical requirements. :ref:`More information on Sending Files » <sending-files>`"""
|
||||
sticker_format: str
|
||||
"""Format of the sticker, must be one of 'static', 'animated', 'video'"""
|
||||
|
|
|
|||
|
|
@ -22,9 +22,6 @@ class TelegramObject(BotContextController, BaseModel):
|
|||
populate_by_name=True,
|
||||
arbitrary_types_allowed=True,
|
||||
defer_build=True,
|
||||
json_encoders={
|
||||
Default: lambda obj: obj.name,
|
||||
}
|
||||
)
|
||||
|
||||
@model_validator(mode="before")
|
||||
|
|
@ -43,20 +40,18 @@ class TelegramObject(BotContextController, BaseModel):
|
|||
return {k: v for k, v in values.items() if not isinstance(v, UNSET_TYPE)}
|
||||
|
||||
@model_serializer(mode="wrap", when_used="json")
|
||||
def json_serialize(self, handler: SerializerFunctionWrapHandler) -> Dict[str, Any]:
|
||||
def json_serialize(self, serializer: SerializerFunctionWrapHandler):
|
||||
"""
|
||||
Replacing `Default` placeholders with actual values from bot defaults.
|
||||
Ensures JSON serialization backward compatibility by handling non-standard objects.
|
||||
"""
|
||||
if not isinstance(self, TelegramObject):
|
||||
return handler(self)
|
||||
properties = self.bot.default if self.bot else DefaultBotProperties()
|
||||
default_fields = {
|
||||
field: properties[value.name]
|
||||
for field in self.model_fields.keys()
|
||||
if isinstance(value := getattr(self, field), Default)
|
||||
}
|
||||
return handler(self.model_copy(update=default_fields))
|
||||
if isinstance(self, BotContextController) and isinstance(self, BaseModel):
|
||||
properties = self.bot.default if self.bot else DefaultBotProperties()
|
||||
default_fields = {
|
||||
key: properties[value.name] for key, value in self if isinstance(value, Default)
|
||||
}
|
||||
return serializer(self.model_copy(update=default_fields))
|
||||
return serializer(self)
|
||||
|
||||
|
||||
class MutableTelegramObject(TelegramObject):
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class CallbackQuery(TelegramObject):
|
|||
"""Sender"""
|
||||
chat_instance: str
|
||||
"""Global identifier, uniquely corresponding to the chat to which the message with the callback button was sent. Useful for high scores in :class:`aiogram.methods.games.Games`."""
|
||||
message: Optional[Union[Message, InaccessibleMessage]] = None
|
||||
message: Optional[Union[InaccessibleMessage, Message]] = None
|
||||
"""*Optional*. Message sent by the bot with the callback button that originated the query"""
|
||||
inline_message_id: Optional[str] = None
|
||||
"""*Optional*. Identifier of the message sent via the bot in inline mode, that originated the query."""
|
||||
|
|
|
|||
|
|
@ -1,13 +1,23 @@
|
|||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Annotated, Union
|
||||
|
||||
from pydantic import PlainSerializer
|
||||
from typing_extensions import Annotated
|
||||
|
||||
# Make datetime compatible with Telegram Bot API (unixtime)
|
||||
|
||||
|
||||
def _serialize_datetime(dt: Union[datetime, timedelta, int]) -> int:
|
||||
if isinstance(dt, int):
|
||||
return dt
|
||||
if isinstance(dt, timedelta):
|
||||
dt = datetime.now() + dt
|
||||
return int(dt.timestamp())
|
||||
|
||||
|
||||
DateTime = Annotated[
|
||||
datetime,
|
||||
Union[datetime, timedelta, int],
|
||||
PlainSerializer(
|
||||
func=lambda dt: int(dt.timestamp()),
|
||||
func=_serialize_datetime,
|
||||
return_type=int,
|
||||
when_used="json-unless-none",
|
||||
),
|
||||
|
|
|
|||
|
|
@ -1,12 +1,14 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from abc import abstractmethod
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, Optional, Union
|
||||
|
||||
import aiofiles
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
|
||||
from aiogram.client.context_controller import BotContextController
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from aiogram.client.bot import Bot
|
||||
|
|
@ -14,23 +16,28 @@ if TYPE_CHECKING:
|
|||
DEFAULT_CHUNK_SIZE = 64 * 1024 # 64 kb
|
||||
|
||||
|
||||
class InputFile(ABC):
|
||||
class InputFile(BaseModel):
|
||||
"""
|
||||
Base class for input files.
|
||||
This object represents the contents of a file to be uploaded. Must be posted using multipart/form-data in the usual way that files are uploaded via the browser.
|
||||
Should not be used directly. Look at :class:`BufferedInputFile`, :class:`FSInputFile` :class:`URLInputFile`
|
||||
|
||||
Source: https://core.telegram.org/bots/api#inputfile
|
||||
"""
|
||||
|
||||
filename: Optional[str] = None
|
||||
"""Name of the given file"""
|
||||
chunk_size: int = DEFAULT_CHUNK_SIZE
|
||||
"""Reader chunks size"""
|
||||
|
||||
def __init__(self, filename: Optional[str] = None, chunk_size: int = DEFAULT_CHUNK_SIZE):
|
||||
"""
|
||||
Base class for input files. Should not be used directly.
|
||||
Look at :class:`BufferedInputFile`, :class:`FSInputFile` :class:`URLInputFile`
|
||||
|
||||
:param filename: name of the given file
|
||||
:param chunk_size: reader chunks size
|
||||
Backward compatibility (positional arguments)
|
||||
"""
|
||||
self.filename = filename
|
||||
self.chunk_size = chunk_size
|
||||
super().__init__(
|
||||
filename=filename,
|
||||
chunk_size=chunk_size,
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
async def read(self, bot: "Bot") -> AsyncGenerator[bytes, None]: # pragma: no cover
|
||||
|
|
@ -38,17 +45,24 @@ class InputFile(ABC):
|
|||
|
||||
|
||||
class BufferedInputFile(InputFile):
|
||||
"""
|
||||
Represents object for uploading files from filesystem
|
||||
"""
|
||||
|
||||
data: bytes
|
||||
"""File bytes"""
|
||||
filename: str
|
||||
"""Filename to be propagated to telegram"""
|
||||
|
||||
def __init__(self, file: bytes, filename: str, chunk_size: int = DEFAULT_CHUNK_SIZE):
|
||||
"""
|
||||
Represents object for uploading files from filesystem
|
||||
|
||||
:param file: Bytes
|
||||
:param filename: Filename to be propagated to telegram.
|
||||
:param chunk_size: Uploading chunk size
|
||||
Backward compatibility (positional arguments and old naming)
|
||||
"""
|
||||
super().__init__(filename=filename, chunk_size=chunk_size)
|
||||
|
||||
self.data = file
|
||||
super(InputFile, self).__init__(
|
||||
data=file,
|
||||
filename=filename,
|
||||
chunk_size=chunk_size,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_file(
|
||||
|
|
@ -66,11 +80,10 @@ class BufferedInputFile(InputFile):
|
|||
:param chunk_size: Uploading chunk size
|
||||
:return: instance of :obj:`BufferedInputFile`
|
||||
"""
|
||||
if filename is None:
|
||||
filename = os.path.basename(path)
|
||||
with open(path, "rb") as f:
|
||||
data = f.read()
|
||||
return cls(data, filename=filename, chunk_size=chunk_size)
|
||||
path = Path(path)
|
||||
filename = filename or path.name
|
||||
file = path.read_bytes()
|
||||
return cls(file=file, filename=filename, chunk_size=chunk_size)
|
||||
|
||||
async def read(self, bot: "Bot") -> AsyncGenerator[bytes, None]:
|
||||
buffer = io.BytesIO(self.data)
|
||||
|
|
@ -79,6 +92,15 @@ class BufferedInputFile(InputFile):
|
|||
|
||||
|
||||
class FSInputFile(InputFile):
|
||||
"""
|
||||
Represents object for uploading files from filesystem
|
||||
"""
|
||||
|
||||
path: Path
|
||||
"""Path to file"""
|
||||
filename: str = "" # set it from path after validation
|
||||
"""Filename to be propagated to telegram"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
path: Union[str, Path],
|
||||
|
|
@ -86,18 +108,17 @@ class FSInputFile(InputFile):
|
|||
chunk_size: int = DEFAULT_CHUNK_SIZE,
|
||||
):
|
||||
"""
|
||||
Represents object for uploading files from filesystem
|
||||
|
||||
:param path: Path to file
|
||||
:param filename: Filename to be propagated to telegram.
|
||||
By default, will be parsed from path
|
||||
:param chunk_size: Uploading chunk size
|
||||
Backward compatibility (positional arguments)
|
||||
"""
|
||||
if filename is None:
|
||||
filename = os.path.basename(path)
|
||||
super().__init__(filename=filename, chunk_size=chunk_size)
|
||||
super(InputFile, self).__init__(
|
||||
path=Path(path),
|
||||
filename=filename or "",
|
||||
chunk_size=chunk_size,
|
||||
)
|
||||
|
||||
self.path = path
|
||||
@model_validator(mode="after")
|
||||
def filename_from_path(self):
|
||||
self.filename = self.filename or Path(self.path).name
|
||||
|
||||
async def read(self, bot: "Bot") -> AsyncGenerator[bytes, None]:
|
||||
async with aiofiles.open(self.path, "rb") as f:
|
||||
|
|
@ -105,7 +126,18 @@ class FSInputFile(InputFile):
|
|||
yield chunk
|
||||
|
||||
|
||||
class URLInputFile(InputFile):
|
||||
class URLInputFile(BotContextController, InputFile):
|
||||
"""
|
||||
Represents object for streaming files from internet
|
||||
"""
|
||||
|
||||
url: str
|
||||
"""URL in internet"""
|
||||
headers: Dict[str, Any] = Field(default_factory=dict)
|
||||
"""HTTP Headers"""
|
||||
timeout: int = 30
|
||||
"""Timeout for downloading"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
url: str,
|
||||
|
|
@ -116,27 +148,20 @@ class URLInputFile(InputFile):
|
|||
bot: Optional["Bot"] = None,
|
||||
):
|
||||
"""
|
||||
Represents object for streaming files from internet
|
||||
|
||||
:param url: URL in internet
|
||||
:param headers: HTTP Headers
|
||||
:param filename: Filename to be propagated to telegram.
|
||||
:param chunk_size: Uploading chunk size
|
||||
:param timeout: Timeout for downloading
|
||||
:param bot: Bot instance to use HTTP session from.
|
||||
If not specified, will be used current bot
|
||||
Backward compatibility (positional arguments and bot context)
|
||||
"""
|
||||
super().__init__(filename=filename, chunk_size=chunk_size)
|
||||
if headers is None:
|
||||
headers = {}
|
||||
super(InputFile, self).__init__(
|
||||
url=url,
|
||||
headers=headers or {},
|
||||
timeout=timeout,
|
||||
filename=filename,
|
||||
chunk_size=chunk_size,
|
||||
)
|
||||
|
||||
self.url = url
|
||||
self.headers = headers
|
||||
self.timeout = timeout
|
||||
self.bot = bot
|
||||
self._bot = bot
|
||||
|
||||
async def read(self, bot: "Bot") -> AsyncGenerator[bytes, None]:
|
||||
bot = self.bot or bot
|
||||
async def read(self, bot: Optional["Bot"] = None) -> AsyncGenerator[bytes, None]:
|
||||
bot = self.bot or bot # FIXME: invalid order suspected
|
||||
stream = bot.session.stream_content(
|
||||
url=self.url,
|
||||
headers=self.headers,
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class InputMediaAnimation(InputMedia):
|
|||
|
||||
type: Literal[InputMediaType.ANIMATION] = InputMediaType.ANIMATION
|
||||
"""Type of the result, must be *animation*"""
|
||||
media: Union[str, InputFile]
|
||||
media: Union[InputFile, str]
|
||||
"""File to send. Pass a file_id to send a file that exists on the Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from the Internet, or pass 'attach://<file_attach_name>' to upload a new one using multipart/form-data under <file_attach_name> name. :ref:`More information on Sending Files » <sending-files>`"""
|
||||
thumbnail: Optional[InputFile] = None
|
||||
"""*Optional*. Thumbnail of the file sent; can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. Thumbnails can't be reused and can be only uploaded as a new file, so you can pass 'attach://<file_attach_name>' if the thumbnail was uploaded using multipart/form-data under <file_attach_name>. :ref:`More information on Sending Files » <sending-files>`"""
|
||||
|
|
@ -47,7 +47,7 @@ class InputMediaAnimation(InputMedia):
|
|||
__pydantic__self__,
|
||||
*,
|
||||
type: Literal[InputMediaType.ANIMATION] = InputMediaType.ANIMATION,
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
thumbnail: Optional[InputFile] = None,
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[Union[str, Default]] = Default("parse_mode"),
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class InputMediaAudio(InputMedia):
|
|||
|
||||
type: Literal[InputMediaType.AUDIO] = InputMediaType.AUDIO
|
||||
"""Type of the result, must be *audio*"""
|
||||
media: Union[str, InputFile]
|
||||
media: Union[InputFile, str]
|
||||
"""File to send. Pass a file_id to send a file that exists on the Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from the Internet, or pass 'attach://<file_attach_name>' to upload a new one using multipart/form-data under <file_attach_name> name. :ref:`More information on Sending Files » <sending-files>`"""
|
||||
thumbnail: Optional[InputFile] = None
|
||||
"""*Optional*. Thumbnail of the file sent; can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. Thumbnails can't be reused and can be only uploaded as a new file, so you can pass 'attach://<file_attach_name>' if the thumbnail was uploaded using multipart/form-data under <file_attach_name>. :ref:`More information on Sending Files » <sending-files>`"""
|
||||
|
|
@ -45,7 +45,7 @@ class InputMediaAudio(InputMedia):
|
|||
__pydantic__self__,
|
||||
*,
|
||||
type: Literal[InputMediaType.AUDIO] = InputMediaType.AUDIO,
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
thumbnail: Optional[InputFile] = None,
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[Union[str, Default]] = Default("parse_mode"),
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class InputMediaDocument(InputMedia):
|
|||
|
||||
type: Literal[InputMediaType.DOCUMENT] = InputMediaType.DOCUMENT
|
||||
"""Type of the result, must be *document*"""
|
||||
media: Union[str, InputFile]
|
||||
media: Union[InputFile, str]
|
||||
"""File to send. Pass a file_id to send a file that exists on the Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from the Internet, or pass 'attach://<file_attach_name>' to upload a new one using multipart/form-data under <file_attach_name> name. :ref:`More information on Sending Files » <sending-files>`"""
|
||||
thumbnail: Optional[InputFile] = None
|
||||
"""*Optional*. Thumbnail of the file sent; can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. Thumbnails can't be reused and can be only uploaded as a new file, so you can pass 'attach://<file_attach_name>' if the thumbnail was uploaded using multipart/form-data under <file_attach_name>. :ref:`More information on Sending Files » <sending-files>`"""
|
||||
|
|
@ -41,7 +41,7 @@ class InputMediaDocument(InputMedia):
|
|||
__pydantic__self__,
|
||||
*,
|
||||
type: Literal[InputMediaType.DOCUMENT] = InputMediaType.DOCUMENT,
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
thumbnail: Optional[InputFile] = None,
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[Union[str, Default]] = Default("parse_mode"),
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class InputMediaPhoto(InputMedia):
|
|||
|
||||
type: Literal[InputMediaType.PHOTO] = InputMediaType.PHOTO
|
||||
"""Type of the result, must be *photo*"""
|
||||
media: Union[str, InputFile]
|
||||
media: Union[InputFile, str]
|
||||
"""File to send. Pass a file_id to send a file that exists on the Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from the Internet, or pass 'attach://<file_attach_name>' to upload a new one using multipart/form-data under <file_attach_name> name. :ref:`More information on Sending Files » <sending-files>`"""
|
||||
caption: Optional[str] = None
|
||||
"""*Optional*. Caption of the photo to be sent, 0-1024 characters after entities parsing"""
|
||||
|
|
@ -39,7 +39,7 @@ class InputMediaPhoto(InputMedia):
|
|||
__pydantic__self__,
|
||||
*,
|
||||
type: Literal[InputMediaType.PHOTO] = InputMediaType.PHOTO,
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[Union[str, Default]] = Default("parse_mode"),
|
||||
caption_entities: Optional[List[MessageEntity]] = None,
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class InputMediaVideo(InputMedia):
|
|||
|
||||
type: Literal[InputMediaType.VIDEO] = InputMediaType.VIDEO
|
||||
"""Type of the result, must be *video*"""
|
||||
media: Union[str, InputFile]
|
||||
media: Union[InputFile, str]
|
||||
"""File to send. Pass a file_id to send a file that exists on the Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from the Internet, or pass 'attach://<file_attach_name>' to upload a new one using multipart/form-data under <file_attach_name> name. :ref:`More information on Sending Files » <sending-files>`"""
|
||||
thumbnail: Optional[InputFile] = None
|
||||
"""*Optional*. Thumbnail of the file sent; can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. Thumbnails can't be reused and can be only uploaded as a new file, so you can pass 'attach://<file_attach_name>' if the thumbnail was uploaded using multipart/form-data under <file_attach_name>. :ref:`More information on Sending Files » <sending-files>`"""
|
||||
|
|
@ -49,7 +49,7 @@ class InputMediaVideo(InputMedia):
|
|||
__pydantic__self__,
|
||||
*,
|
||||
type: Literal[InputMediaType.VIDEO] = InputMediaType.VIDEO,
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
thumbnail: Optional[InputFile] = None,
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[Union[str, Default]] = Default("parse_mode"),
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ class MediaGroupBuilder:
|
|||
self,
|
||||
*,
|
||||
type: Literal[InputMediaType.AUDIO],
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[str] = UNSET_PARSE_MODE,
|
||||
caption_entities: Optional[List[MessageEntity]] = None,
|
||||
|
|
@ -79,7 +79,7 @@ class MediaGroupBuilder:
|
|||
self,
|
||||
*,
|
||||
type: Literal[InputMediaType.PHOTO],
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[str] = UNSET_PARSE_MODE,
|
||||
caption_entities: Optional[List[MessageEntity]] = None,
|
||||
|
|
@ -93,7 +93,7 @@ class MediaGroupBuilder:
|
|||
self,
|
||||
*,
|
||||
type: Literal[InputMediaType.VIDEO],
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
thumbnail: Optional[Union[InputFile, str]] = None,
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[str] = UNSET_PARSE_MODE,
|
||||
|
|
@ -112,7 +112,7 @@ class MediaGroupBuilder:
|
|||
self,
|
||||
*,
|
||||
type: Literal[InputMediaType.DOCUMENT],
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
thumbnail: Optional[Union[InputFile, str]] = None,
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[str] = UNSET_PARSE_MODE,
|
||||
|
|
@ -144,7 +144,7 @@ class MediaGroupBuilder:
|
|||
|
||||
def add_audio(
|
||||
self,
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
thumbnail: Optional[InputFile] = None,
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[str] = UNSET_PARSE_MODE,
|
||||
|
|
@ -194,7 +194,7 @@ class MediaGroupBuilder:
|
|||
|
||||
def add_photo(
|
||||
self,
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[str] = UNSET_PARSE_MODE,
|
||||
caption_entities: Optional[List[MessageEntity]] = None,
|
||||
|
|
@ -233,7 +233,7 @@ class MediaGroupBuilder:
|
|||
|
||||
def add_video(
|
||||
self,
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
thumbnail: Optional[InputFile] = None,
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[str] = UNSET_PARSE_MODE,
|
||||
|
|
@ -295,7 +295,7 @@ class MediaGroupBuilder:
|
|||
|
||||
def add_document(
|
||||
self,
|
||||
media: Union[str, InputFile],
|
||||
media: Union[InputFile, str],
|
||||
thumbnail: Optional[InputFile] = None,
|
||||
caption: Optional[str] = None,
|
||||
parse_mode: Optional[str] = UNSET_PARSE_MODE,
|
||||
|
|
|
|||
|
|
@ -10,9 +10,9 @@ from aiohttp.typedefs import Handler
|
|||
from aiohttp.web_middlewares import middleware
|
||||
|
||||
from aiogram import Bot, Dispatcher, loggers
|
||||
from aiogram.client.form import extract_files, form_serialize
|
||||
from aiogram.methods import TelegramMethod
|
||||
from aiogram.methods.base import TelegramType
|
||||
from aiogram.types import InputFile
|
||||
from aiogram.webhook.security import IPFilter
|
||||
|
||||
|
||||
|
|
@ -141,13 +141,11 @@ class BaseRequestHandler(ABC):
|
|||
|
||||
async def _handle_request_background(self, bot: Bot, request: web.Request) -> web.Response:
|
||||
feed_update_task = asyncio.create_task(
|
||||
self._background_feed_update(
|
||||
bot=bot, update=await request.json(loads=bot.session.json_loads)
|
||||
)
|
||||
self._background_feed_update(bot=bot, update=await request.json())
|
||||
)
|
||||
self._background_feed_update_tasks.add(feed_update_task)
|
||||
feed_update_task.add_done_callback(self._background_feed_update_tasks.discard)
|
||||
return web.json_response({}, dumps=bot.session.json_dumps)
|
||||
return web.json_response({})
|
||||
|
||||
def _build_response_writer(
|
||||
self, bot: Bot, result: Optional[TelegramMethod[TelegramType]]
|
||||
|
|
@ -162,12 +160,10 @@ class BaseRequestHandler(ABC):
|
|||
payload = writer.append(result.__api_method__)
|
||||
payload.set_content_disposition("form-data", name="method")
|
||||
|
||||
files: Dict[str, InputFile] = {}
|
||||
for key, value in result.model_dump(warnings=False).items():
|
||||
value = bot.session.prepare_value(value, bot=bot, files=files)
|
||||
if not value:
|
||||
continue
|
||||
payload = writer.append(value)
|
||||
modified_result, files = extract_files(result)
|
||||
|
||||
for key, value in modified_result.model_dump(mode="json", exclude_none=True).items():
|
||||
payload = writer.append(form_serialize(value))
|
||||
payload.set_content_disposition("form-data", name=key)
|
||||
|
||||
for key, value in files.items():
|
||||
|
|
@ -183,7 +179,7 @@ class BaseRequestHandler(ABC):
|
|||
async def _handle_request(self, bot: Bot, request: web.Request) -> web.Response:
|
||||
result: Optional[TelegramMethod[Any]] = await self.dispatcher.feed_webhook_update(
|
||||
bot,
|
||||
await request.json(loads=bot.session.json_loads),
|
||||
await request.json(),
|
||||
**self.data,
|
||||
)
|
||||
return web.Response(body=self._build_response_writer(bot=bot, result=result))
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ from aiogram import Bot
|
|||
from aiogram.client.session.base import BaseSession
|
||||
from aiogram.methods import TelegramMethod
|
||||
from aiogram.methods.base import Response, TelegramType
|
||||
from aiogram.types import UNSET_PARSE_MODE, ResponseParameters, User
|
||||
from aiogram.types import ResponseParameters, User
|
||||
|
||||
|
||||
class MockedSession(BaseSession):
|
||||
|
|
@ -29,7 +29,7 @@ class MockedSession(BaseSession):
|
|||
self,
|
||||
bot: Bot,
|
||||
method: TelegramMethod[TelegramType],
|
||||
timeout: Optional[int] = UNSET_PARSE_MODE,
|
||||
timeout: Optional[int] = None,
|
||||
) -> TelegramType:
|
||||
self.closed = False
|
||||
self.requests.append(method)
|
||||
|
|
|
|||
|
|
@ -21,7 +21,13 @@ from aiogram.client.session import aiohttp
|
|||
from aiogram.client.session.aiohttp import AiohttpSession
|
||||
from aiogram.exceptions import TelegramNetworkError
|
||||
from aiogram.methods import TelegramMethod
|
||||
from aiogram.types import UNSET_PARSE_MODE, InputFile
|
||||
from aiogram.types import (
|
||||
InputFile,
|
||||
InputMediaAudio,
|
||||
InputMediaDocument,
|
||||
InputMediaPhoto,
|
||||
InputMediaVideo,
|
||||
)
|
||||
from tests.mocked_bot import MockedBot
|
||||
|
||||
|
||||
|
|
@ -139,27 +145,53 @@ class TestAiohttpSession:
|
|||
assert all(isinstance(field[2], str) for field in fields)
|
||||
assert "null_" not in [item[0]["name"] for item in fields]
|
||||
|
||||
def test_build_form_data_with_file(self, bot: Bot):
|
||||
class TestMethod(TelegramMethod[bool]):
|
||||
__api_method__ = "test"
|
||||
__returning__ = bool
|
||||
|
||||
document: Union[InputMediaAudio, InputMediaDocument, InputMediaPhoto, InputMediaVideo]
|
||||
|
||||
session = AiohttpSession()
|
||||
form = session.build_form_data(
|
||||
bot,
|
||||
TestMethod(document=InputMediaDocument(media=BareInputFile(filename="file.txt"))),
|
||||
)
|
||||
|
||||
fields = form._fields
|
||||
assert len(fields) == 2
|
||||
assert fields[0][0]["name"] == "document"
|
||||
assert fields[0][2].count("attach://") == 1
|
||||
assert fields[1][0]["filename"] == "file.txt"
|
||||
assert isinstance(fields[1][2], AsyncIterable)
|
||||
|
||||
def test_build_form_data_with_files(self, bot: Bot):
|
||||
class TestMethod(TelegramMethod[bool]):
|
||||
__api_method__ = "test"
|
||||
__returning__ = bool
|
||||
|
||||
key: str
|
||||
document: InputFile
|
||||
group: List[
|
||||
Union[InputMediaAudio, InputMediaDocument, InputMediaPhoto, InputMediaVideo]
|
||||
]
|
||||
|
||||
session = AiohttpSession()
|
||||
form = session.build_form_data(
|
||||
bot,
|
||||
TestMethod(key="value", document=BareInputFile(filename="file.txt")),
|
||||
TestMethod(
|
||||
group=[
|
||||
InputMediaDocument(media=BareInputFile(filename="file.txt")),
|
||||
InputMediaDocument(media=BareInputFile(filename="file2.txt")),
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
fields = form._fields
|
||||
|
||||
assert len(fields) == 3
|
||||
assert fields[1][0]["name"] == "document"
|
||||
assert fields[1][2].startswith("attach://")
|
||||
assert fields[2][0]["name"] == fields[1][2][9:]
|
||||
assert fields[2][0]["filename"] == "file.txt"
|
||||
assert fields[0][0]["name"] == "group"
|
||||
assert fields[0][2].count("attach://") == 2
|
||||
assert fields[1][0]["filename"] == "file.txt"
|
||||
assert fields[2][0]["filename"] == "file2.txt"
|
||||
assert isinstance(fields[1][2], AsyncIterable)
|
||||
assert isinstance(fields[2][2], AsyncIterable)
|
||||
|
||||
async def test_make_request(self, bot: MockedBot, aresponses: ResponsesMockServer):
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import datetime
|
||||
import json
|
||||
from typing import Any, AsyncContextManager, AsyncGenerator, Dict, Optional
|
||||
from typing import Any, AsyncContextManager, AsyncGenerator, Dict, Optional, Union
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
|
@ -8,6 +8,7 @@ from pytz import utc
|
|||
|
||||
from aiogram import Bot
|
||||
from aiogram.client.default import Default, DefaultBotProperties
|
||||
from aiogram.client.form import form_serialize
|
||||
from aiogram.client.session.base import BaseSession, TelegramType
|
||||
from aiogram.client.telegram import PRODUCTION, TelegramAPIServer
|
||||
from aiogram.enums import ChatType, ParseMode, TopicIconColor
|
||||
|
|
@ -26,8 +27,18 @@ from aiogram.exceptions import (
|
|||
TelegramUnauthorizedError,
|
||||
)
|
||||
from aiogram.methods import DeleteMessage, GetMe, TelegramMethod
|
||||
from aiogram.types import UNSET_PARSE_MODE, User, LinkPreviewOptions
|
||||
from aiogram.types.base import UNSET_DISABLE_WEB_PAGE_PREVIEW, UNSET_PROTECT_CONTENT
|
||||
from aiogram.types import (
|
||||
UNSET_PARSE_MODE,
|
||||
DateTime,
|
||||
InputFile,
|
||||
LinkPreviewOptions,
|
||||
User,
|
||||
)
|
||||
from aiogram.types.base import (
|
||||
UNSET_DISABLE_WEB_PAGE_PREVIEW,
|
||||
UNSET_PROTECT_CONTENT,
|
||||
TelegramObject,
|
||||
)
|
||||
from tests.mocked_bot import MockedBot
|
||||
|
||||
|
||||
|
|
@ -39,7 +50,7 @@ class CustomSession(BaseSession):
|
|||
self,
|
||||
token: str,
|
||||
method: TelegramMethod[TelegramType],
|
||||
timeout: Optional[int] = UNSET_PARSE_MODE,
|
||||
timeout: Optional[int] = None,
|
||||
) -> None: # type: ignore
|
||||
assert isinstance(token, str)
|
||||
assert isinstance(method, TelegramMethod)
|
||||
|
|
@ -94,41 +105,30 @@ class TestBaseSession:
|
|||
@pytest.mark.parametrize(
|
||||
"value,result",
|
||||
[
|
||||
[None, None],
|
||||
[None, ...],
|
||||
["text", "text"],
|
||||
[ChatType.PRIVATE, "private"],
|
||||
[TopicIconColor.RED, "16478047"],
|
||||
[42, "42"],
|
||||
[True, "true"],
|
||||
[["test"], '["test"]'],
|
||||
[["test", ["test"]], '["test", ["test"]]'],
|
||||
[[{"test": "pass", "spam": None}], '[{"test": "pass"}]'],
|
||||
[{"test": "pass", "number": 42, "spam": None}, '{"test": "pass", "number": 42}'],
|
||||
[{"foo": {"test": "pass", "spam": None}}, '{"foo": {"test": "pass"}}'],
|
||||
[["test", ["test"]], '["test",["test"]]'],
|
||||
[[{"test": "pass"}], '[{"test":"pass"}]'],
|
||||
[{"test": "pass", "number": 42}, '{"test":"pass","number":42}'],
|
||||
[{"foo": {"test": "pass"}}, '{"foo":{"test":"pass"}}'],
|
||||
[
|
||||
datetime.datetime(
|
||||
year=2017, month=5, day=17, hour=4, minute=11, second=42, tzinfo=utc
|
||||
),
|
||||
"1494994302",
|
||||
],
|
||||
[
|
||||
{"link_preview": LinkPreviewOptions(is_disabled=True)},
|
||||
'{"link_preview": {"is_disabled": true}}',
|
||||
],
|
||||
[LinkPreviewOptions(is_disabled=True, url=None), '{"is_disabled":true}'],
|
||||
[Default("parse_mode"), "HTML"],
|
||||
[Default("protect_content"), "true"],
|
||||
[Default("link_preview_is_disabled"), "true"],
|
||||
],
|
||||
)
|
||||
def test_prepare_value(self, value: Any, result: str, bot: MockedBot):
|
||||
session = CustomSession()
|
||||
|
||||
assert session.prepare_value(value, bot=bot, files={}) == result
|
||||
|
||||
def test_prepare_value_timedelta(self, bot: MockedBot):
|
||||
session = CustomSession()
|
||||
|
||||
value = session.prepare_value(datetime.timedelta(minutes=2), bot=bot, files={})
|
||||
assert isinstance(value, str)
|
||||
|
||||
def test_prepare_value_defaults_replace(self):
|
||||
def test_form_serialize(self, value: Any, result: str):
|
||||
bot = MockedBot(
|
||||
default=DefaultBotProperties(
|
||||
parse_mode=ParseMode.HTML,
|
||||
|
|
@ -136,18 +136,43 @@ class TestBaseSession:
|
|||
link_preview_is_disabled=True,
|
||||
)
|
||||
)
|
||||
assert bot.session.prepare_value(Default("parse_mode"), bot=bot, files={}) == "HTML"
|
||||
assert (
|
||||
bot.session.prepare_value(Default("link_preview_is_disabled"), bot=bot, files={})
|
||||
== "true"
|
||||
)
|
||||
assert bot.session.prepare_value(Default("protect_content"), bot=bot, files={}) == "true"
|
||||
|
||||
def test_prepare_value_defaults_unset(self):
|
||||
field_type = type(value)
|
||||
if issubclass(field_type, (datetime.datetime, datetime.timedelta)):
|
||||
field_type = DateTime
|
||||
elif issubclass(field_type, InputFile):
|
||||
field_type = Union[InputFile, str]
|
||||
elif issubclass(field_type, Default):
|
||||
field_type = Optional[Union[Any, Default]]
|
||||
|
||||
class TestObject(TelegramObject):
|
||||
field: field_type
|
||||
|
||||
obj = TestObject.model_validate({"field": value}, context={"bot": bot})
|
||||
serialized_obj = obj.model_dump(mode="json", exclude_none=True)
|
||||
if value is None:
|
||||
assert "field" not in serialized_obj
|
||||
else:
|
||||
value = serialized_obj["field"]
|
||||
assert form_serialize(value) == result
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"default",
|
||||
[
|
||||
UNSET_PARSE_MODE,
|
||||
UNSET_DISABLE_WEB_PAGE_PREVIEW,
|
||||
UNSET_PROTECT_CONTENT,
|
||||
],
|
||||
)
|
||||
def test_default_unset(self, default: Default):
|
||||
bot = MockedBot()
|
||||
assert bot.session.prepare_value(UNSET_PARSE_MODE, bot=bot, files={}) is None
|
||||
assert bot.session.prepare_value(UNSET_DISABLE_WEB_PAGE_PREVIEW, bot=bot, files={}) is None
|
||||
assert bot.session.prepare_value(UNSET_PROTECT_CONTENT, bot=bot, files={}) is None
|
||||
|
||||
class TestObject(TelegramObject):
|
||||
field: Optional[Union[Any, Default]]
|
||||
|
||||
obj = TestObject.model_validate({"field": default}, context={"bot": bot})
|
||||
serialized_obj = obj.model_dump(mode="json")
|
||||
assert serialized_obj["field"] is None
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"status_code,content,error",
|
||||
|
|
@ -205,7 +230,7 @@ class TestBaseSession:
|
|||
bot = MockedBot()
|
||||
method = DeleteMessage(chat_id=42, message_id=42)
|
||||
|
||||
with pytest.raises(ClientDecodeError, match="JSONDecodeError"):
|
||||
with pytest.raises(ClientDecodeError, match="Invalid JSON"):
|
||||
session.check_response(
|
||||
bot=bot,
|
||||
method=method,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,3 @@
|
|||
from typing import AsyncIterable
|
||||
|
||||
from aresponses import ResponsesMockServer
|
||||
|
||||
from aiogram import Bot
|
||||
|
|
@ -83,3 +81,21 @@ class TestInputFile:
|
|||
assert chunk_size == 1
|
||||
size += chunk_size
|
||||
assert size == 10
|
||||
|
||||
async def test_url_input_file_with_default_bot(self, aresponses: ResponsesMockServer):
|
||||
aresponses.add(
|
||||
aresponses.ANY,
|
||||
aresponses.ANY,
|
||||
"get",
|
||||
aresponses.Response(status=200, body=b"\f" * 10),
|
||||
)
|
||||
async with Bot(token="42:TEST").context() as bot:
|
||||
file = URLInputFile("https://test.org/", chunk_size=1, bot=bot)
|
||||
|
||||
size = 0
|
||||
async for chunk in file.read():
|
||||
assert chunk == b"\f"
|
||||
chunk_size = len(chunk)
|
||||
assert chunk_size == 1
|
||||
size += chunk_size
|
||||
assert size == 10
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue