mirror of
https://github.com/aiogram/aiogram.git
synced 2026-04-08 16:37:47 +00:00
Custom encoding support (#1278)
* Custom encoding support in deep-linking
This commit is contained in:
parent
5cf8d7b565
commit
995a0d7e9b
5 changed files with 187 additions and 32 deletions
1
CHANGES/1262.feature
Normal file
1
CHANGES/1262.feature
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
Added support for custom encoders/decoders for payload (and also for deep-linking).
|
||||||
|
|
@ -46,19 +46,33 @@ Decode it back example:
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"create_start_link",
|
||||||
|
"create_startgroup_link",
|
||||||
|
"create_deep_link",
|
||||||
|
"create_telegram_link",
|
||||||
|
"encode_payload",
|
||||||
|
"decode_payload",
|
||||||
|
]
|
||||||
|
|
||||||
import re
|
import re
|
||||||
from base64 import urlsafe_b64decode, urlsafe_b64encode
|
from typing import Callable, Literal, Optional, TYPE_CHECKING, cast
|
||||||
from typing import TYPE_CHECKING, Literal, cast
|
|
||||||
|
|
||||||
from aiogram.utils.link import create_telegram_link
|
from aiogram.utils.link import create_telegram_link
|
||||||
|
from aiogram.utils.payload import encode_payload, decode_payload
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from aiogram import Bot
|
from aiogram import Bot
|
||||||
|
|
||||||
BAD_PATTERN = re.compile(r"[^_A-z0-9-]")
|
BAD_PATTERN = re.compile(r"[^A-z0-9-]")
|
||||||
|
|
||||||
|
|
||||||
async def create_start_link(bot: Bot, payload: str, encode: bool = False) -> str:
|
async def create_start_link(
|
||||||
|
bot: Bot,
|
||||||
|
payload: str,
|
||||||
|
encode: bool = False,
|
||||||
|
encoder: Optional[Callable[[bytes], bytes]] = None,
|
||||||
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Create 'start' deep link with your payload.
|
Create 'start' deep link with your payload.
|
||||||
|
|
||||||
|
|
@ -67,16 +81,26 @@ async def create_start_link(bot: Bot, payload: str, encode: bool = False) -> str
|
||||||
|
|
||||||
:param bot: bot instance
|
:param bot: bot instance
|
||||||
:param payload: args passed with /start
|
:param payload: args passed with /start
|
||||||
:param encode: encode payload with base64url
|
:param encode: encode payload with base64url or custom encoder
|
||||||
|
:param encoder: custom encoder callable
|
||||||
:return: link
|
:return: link
|
||||||
"""
|
"""
|
||||||
username = (await bot.me()).username
|
username = (await bot.me()).username
|
||||||
return create_deep_link(
|
return create_deep_link(
|
||||||
username=cast(str, username), link_type="start", payload=payload, encode=encode
|
username=cast(str, username),
|
||||||
|
link_type="start",
|
||||||
|
payload=payload,
|
||||||
|
encode=encode,
|
||||||
|
encoder=encoder,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def create_startgroup_link(bot: Bot, payload: str, encode: bool = False) -> str:
|
async def create_startgroup_link(
|
||||||
|
bot: Bot,
|
||||||
|
payload: str,
|
||||||
|
encode: bool = False,
|
||||||
|
encoder: Optional[Callable[[bytes], bytes]] = None,
|
||||||
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Create 'startgroup' deep link with your payload.
|
Create 'startgroup' deep link with your payload.
|
||||||
|
|
||||||
|
|
@ -85,17 +109,26 @@ async def create_startgroup_link(bot: Bot, payload: str, encode: bool = False) -
|
||||||
|
|
||||||
:param bot: bot instance
|
:param bot: bot instance
|
||||||
:param payload: args passed with /start
|
:param payload: args passed with /start
|
||||||
:param encode: encode payload with base64url
|
:param encode: encode payload with base64url or custom encoder
|
||||||
|
:param encoder: custom encoder callable
|
||||||
:return: link
|
:return: link
|
||||||
"""
|
"""
|
||||||
username = (await bot.me()).username
|
username = (await bot.me()).username
|
||||||
return create_deep_link(
|
return create_deep_link(
|
||||||
username=cast(str, username), link_type="startgroup", payload=payload, encode=encode
|
username=cast(str, username),
|
||||||
|
link_type="startgroup",
|
||||||
|
payload=payload,
|
||||||
|
encode=encode,
|
||||||
|
encoder=encoder,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def create_deep_link(
|
def create_deep_link(
|
||||||
username: str, link_type: Literal["start", "startgroup"], payload: str, encode: bool = False
|
username: str,
|
||||||
|
link_type: Literal["start", "startgroup"],
|
||||||
|
payload: str,
|
||||||
|
encode: bool = False,
|
||||||
|
encoder: Optional[Callable[[bytes], bytes]] = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Create deep link.
|
Create deep link.
|
||||||
|
|
@ -103,14 +136,15 @@ def create_deep_link(
|
||||||
:param username:
|
:param username:
|
||||||
:param link_type: `start` or `startgroup`
|
:param link_type: `start` or `startgroup`
|
||||||
:param payload: any string-convertible data
|
:param payload: any string-convertible data
|
||||||
:param encode: pass True to encode the payload
|
:param encode: encode payload with base64url or custom encoder
|
||||||
|
:param encoder: custom encoder callable
|
||||||
:return: deeplink
|
:return: deeplink
|
||||||
"""
|
"""
|
||||||
if not isinstance(payload, str):
|
if not isinstance(payload, str):
|
||||||
payload = str(payload)
|
payload = str(payload)
|
||||||
|
|
||||||
if encode:
|
if encode or encoder:
|
||||||
payload = encode_payload(payload)
|
payload = encode_payload(payload, encoder=encoder)
|
||||||
|
|
||||||
if re.search(BAD_PATTERN, payload):
|
if re.search(BAD_PATTERN, payload):
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
|
|
@ -122,18 +156,3 @@ def create_deep_link(
|
||||||
raise ValueError("Payload must be up to 64 characters long.")
|
raise ValueError("Payload must be up to 64 characters long.")
|
||||||
|
|
||||||
return create_telegram_link(username, **{cast(str, link_type): payload})
|
return create_telegram_link(username, **{cast(str, link_type): payload})
|
||||||
|
|
||||||
|
|
||||||
def encode_payload(payload: str) -> str:
|
|
||||||
"""Encode payload with URL-safe base64url."""
|
|
||||||
payload = str(payload)
|
|
||||||
bytes_payload: bytes = urlsafe_b64encode(payload.encode())
|
|
||||||
str_payload = bytes_payload.decode()
|
|
||||||
return str_payload.replace("=", "")
|
|
||||||
|
|
||||||
|
|
||||||
def decode_payload(payload: str) -> str:
|
|
||||||
"""Decode payload with URL-safe base64url."""
|
|
||||||
payload += "=" * (4 - len(payload) % 4)
|
|
||||||
result: bytes = urlsafe_b64decode(payload)
|
|
||||||
return result.decode()
|
|
||||||
|
|
|
||||||
108
aiogram/utils/payload.py
Normal file
108
aiogram/utils/payload.py
Normal file
|
|
@ -0,0 +1,108 @@
|
||||||
|
"""
|
||||||
|
Payload preparing
|
||||||
|
|
||||||
|
We have added some utils to make work with payload easier.
|
||||||
|
|
||||||
|
Basic encode example:
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
from aiogram.utils.payload import encode_payload
|
||||||
|
|
||||||
|
encoded = encode_payload("foo")
|
||||||
|
|
||||||
|
# result: "Zm9v"
|
||||||
|
|
||||||
|
Basic decode it back example:
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
from aiogram.utils.payload import decode_payload
|
||||||
|
|
||||||
|
encoded = "Zm9v"
|
||||||
|
decoded = decode_payload(encoded)
|
||||||
|
# result: "foo"
|
||||||
|
|
||||||
|
Encoding and decoding with your own methods:
|
||||||
|
|
||||||
|
1. Create your own cryptor
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
from Cryptodome.Cipher import AES
|
||||||
|
from Cryptodome.Util.Padding import pad, unpad
|
||||||
|
|
||||||
|
class Cryptor:
|
||||||
|
def __init__(self, key: str):
|
||||||
|
self.key = key.encode("utf-8")
|
||||||
|
self.mode = AES.MODE_ECB # never use ECB in strong systems obviously
|
||||||
|
self.size = 32
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cipher(self):
|
||||||
|
return AES.new(self.key, self.mode)
|
||||||
|
|
||||||
|
def encrypt(self, data: bytes) -> bytes:
|
||||||
|
return self.cipher.encrypt(pad(data, self.size))
|
||||||
|
|
||||||
|
def decrypt(self, data: bytes) -> bytes:
|
||||||
|
decrypted_data = self.cipher.decrypt(data)
|
||||||
|
return unpad(decrypted_data, self.size)
|
||||||
|
|
||||||
|
2. Pass cryptor callable methods to aiogram payload tools
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
cryptor = Cryptor("abcdefghijklmnop")
|
||||||
|
encoded = encode_payload("foo", encoder=cryptor.encrypt)
|
||||||
|
decoded = decode_payload(encoded_payload, decoder=cryptor.decrypt)
|
||||||
|
|
||||||
|
# result: decoded == "foo"
|
||||||
|
|
||||||
|
"""
|
||||||
|
from base64 import urlsafe_b64decode, urlsafe_b64encode
|
||||||
|
from typing import Callable, Optional
|
||||||
|
|
||||||
|
|
||||||
|
def encode_payload(
|
||||||
|
payload: str,
|
||||||
|
encoder: Optional[Callable[[bytes], bytes]] = None,
|
||||||
|
) -> str:
|
||||||
|
"""Encode payload with encoder.
|
||||||
|
|
||||||
|
Result also will be encoded with URL-safe base64url.
|
||||||
|
"""
|
||||||
|
if not isinstance(payload, str):
|
||||||
|
payload = str(payload)
|
||||||
|
|
||||||
|
payload_bytes = payload.encode("utf-8")
|
||||||
|
if encoder is not None:
|
||||||
|
payload_bytes = encoder(payload_bytes)
|
||||||
|
|
||||||
|
return _encode_b64(payload_bytes)
|
||||||
|
|
||||||
|
|
||||||
|
def decode_payload(
|
||||||
|
payload: str,
|
||||||
|
decoder: Optional[Callable[[bytes], bytes]] = None,
|
||||||
|
) -> str:
|
||||||
|
"""Decode URL-safe base64url payload with decoder."""
|
||||||
|
original_payload = _decode_b64(payload)
|
||||||
|
|
||||||
|
if decoder is None:
|
||||||
|
return original_payload.decode()
|
||||||
|
|
||||||
|
return decoder(original_payload).decode()
|
||||||
|
|
||||||
|
|
||||||
|
def _encode_b64(payload: bytes) -> str:
|
||||||
|
"""Encode with URL-safe base64url."""
|
||||||
|
bytes_payload: bytes = urlsafe_b64encode(payload)
|
||||||
|
str_payload = bytes_payload.decode()
|
||||||
|
return str_payload.replace("=", "")
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_b64(payload: str) -> bytes:
|
||||||
|
"""Decode with URL-safe base64url."""
|
||||||
|
payload += "=" * (4 - len(payload) % 4)
|
||||||
|
return urlsafe_b64decode(payload.encode())
|
||||||
|
|
@ -78,7 +78,8 @@ test = [
|
||||||
"pytest-cov~=4.0.0",
|
"pytest-cov~=4.0.0",
|
||||||
"pytest-aiohttp~=1.0.4",
|
"pytest-aiohttp~=1.0.4",
|
||||||
"aresponses~=2.1.6",
|
"aresponses~=2.1.6",
|
||||||
"pytz~=2022.7.1"
|
"pytz~=2022.7.1",
|
||||||
|
"pycryptodomex~=3.18",
|
||||||
]
|
]
|
||||||
docs = [
|
docs = [
|
||||||
"Sphinx~=7.1.1",
|
"Sphinx~=7.1.1",
|
||||||
|
|
|
||||||
|
|
@ -3,9 +3,8 @@ import pytest
|
||||||
from aiogram.utils.deep_linking import (
|
from aiogram.utils.deep_linking import (
|
||||||
create_start_link,
|
create_start_link,
|
||||||
create_startgroup_link,
|
create_startgroup_link,
|
||||||
decode_payload,
|
|
||||||
encode_payload,
|
|
||||||
)
|
)
|
||||||
|
from aiogram.utils.payload import decode_payload, encode_payload
|
||||||
from tests.mocked_bot import MockedBot
|
from tests.mocked_bot import MockedBot
|
||||||
|
|
||||||
PAYLOADS = [
|
PAYLOADS = [
|
||||||
|
|
@ -51,6 +50,33 @@ class TestDeepLinking:
|
||||||
decoded = decode_payload(encoded)
|
decoded = decode_payload(encoded)
|
||||||
assert decoded == str(payload)
|
assert decoded == str(payload)
|
||||||
|
|
||||||
|
async def test_custom_encode_decode(self, payload: str):
|
||||||
|
from Cryptodome.Cipher import AES
|
||||||
|
from Cryptodome.Util.Padding import pad, unpad
|
||||||
|
|
||||||
|
class Cryptor:
|
||||||
|
def __init__(self, key: str):
|
||||||
|
self.key = key.encode("utf-8")
|
||||||
|
self.mode = AES.MODE_ECB # never use ECB in strong systems obviously
|
||||||
|
self.size = 32
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cipher(self):
|
||||||
|
return AES.new(self.key, self.mode)
|
||||||
|
|
||||||
|
def encrypt(self, data: bytes) -> bytes:
|
||||||
|
return self.cipher.encrypt(pad(data, self.size))
|
||||||
|
|
||||||
|
def decrypt(self, data: bytes) -> bytes:
|
||||||
|
decrypted_data = self.cipher.decrypt(data)
|
||||||
|
return unpad(decrypted_data, self.size)
|
||||||
|
|
||||||
|
cryptor = Cryptor("abcdefghijklmnop")
|
||||||
|
encoded_payload = encode_payload(payload, encoder=cryptor.encrypt)
|
||||||
|
decoded_payload = decode_payload(encoded_payload, decoder=cryptor.decrypt)
|
||||||
|
|
||||||
|
assert decoded_payload == str(payload)
|
||||||
|
|
||||||
async def test_get_start_link_with_encoding(self, bot: MockedBot, wrong_payload: str):
|
async def test_get_start_link_with_encoding(self, bot: MockedBot, wrong_payload: str):
|
||||||
# define link
|
# define link
|
||||||
link = await create_start_link(bot, wrong_payload, encode=True)
|
link = await create_start_link(bot, wrong_payload, encode=True)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue