Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions dumpyarabot/arq_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from rich.console import Console

from dumpyarabot.config import settings
from dumpyarabot.config import settings, RICH_MARKDOWN_PARSE_MODE
from dumpyarabot.firmware_downloader import FirmwareDownloader
from dumpyarabot.firmware_extractor import FirmwareExtractor
from dumpyarabot.gitlab_manager import GitLabManager
Expand Down Expand Up @@ -158,6 +158,19 @@ def _status_update_sequence(progress: Optional[Dict[str, Any]]) -> float:
return 0.0


def _status_parse_mode(job_data: Dict[str, Any]) -> str:
    """Select the parse mode to use when editing a job's status message.

    A job whose data carries a truthy ``_rich_status`` entry had its status
    message created through the rich-message endpoint, so follow-up edits must
    be routed the same way. Every other job keeps the configured default
    (legacy) parse mode.

    Args:
        job_data: The job payload; only the ``_rich_status`` key is inspected.

    Returns:
        ``RICH_MARKDOWN_PARSE_MODE`` for rich-status jobs, otherwise
        ``settings.DEFAULT_PARSE_MODE``.
    """
    uses_rich_endpoints = bool(job_data.get("_rich_status"))
    return RICH_MARKDOWN_PARSE_MODE if uses_rich_endpoints else settings.DEFAULT_PARSE_MODE


async def _send_status_update(
job_data: Dict[str, Any],
message: str,
Expand Down Expand Up @@ -219,7 +232,7 @@ async def _send_status_update(
chat_id=chat_id,
text=formatted_message,
edit_message_id=initial_message_id,
parse_mode=settings.DEFAULT_PARSE_MODE,
parse_mode=_status_parse_mode(job_data),
context={
"job_id": job_data["job_id"],
"worker_id": "arq_worker",
Expand Down Expand Up @@ -337,7 +350,7 @@ async def _send_failure_notification(job_data: Dict[str, Any], error_details: st
chat_id=chat_id,
text=formatted_message,
edit_message_id=initial_message_id,
parse_mode=settings.DEFAULT_PARSE_MODE,
parse_mode=_status_parse_mode(job_data),
context={"job_id": job_data.get("job_id", "unknown"), "type": "failure"}
)

Expand Down
6 changes: 6 additions & 0 deletions dumpyarabot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ class Settings(BaseSettings):
settings = Settings()


# Sentinel parse mode that routes a queued/edited message through the Bot API
# 10.1 rich-text endpoints (sendRichMessage / editMessageText with rich_message)
# instead of the legacy send_message/edit_message_text + parse_mode path.
# This is an internal routing marker: the message queue compares a message's
# ``parse_mode`` against it, and the rich payload it then builds carries no
# ``parse_mode`` field at all.
RICH_MARKDOWN_PARSE_MODE = "RichMarkdown"


# Callback data prefixes
CALLBACK_ACCEPT = "accept_"
CALLBACK_REJECT = "reject_"
Expand Down
8 changes: 6 additions & 2 deletions dumpyarabot/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ async def dump(
initial_text += "*Elapsed:* 0s\n"
initial_text += " *Worker:* Waiting for assignment...\n"

# Send initial message directly to get real Telegram message ID
initial_message = await message_queue.send_immediate_message(
# Send initial message directly to get real Telegram message ID.
# Uses the Bot API 10.1 rich-message endpoint so the whole /dump status
# message (this send plus every worker edit) renders as rich text.
initial_message = await message_queue.send_immediate_rich_message(
chat_id=chat.id,
text=initial_text,
reply_to_message_id=None if use_privdump else message.message_id
Expand All @@ -137,6 +139,8 @@ async def dump(
enhanced_job_data = job.model_dump()
# Store initial text so the worker can re-edit it during Telegram context verification
enhanced_job_data["_queued_text"] = initial_text
# Flag the job so its status edits keep using the rich-message endpoints.
enhanced_job_data["_rich_status"] = True
enhanced_job_data["metadata"] = {
"telegram_context": {
"chat_id": chat.id,
Expand Down
158 changes: 149 additions & 9 deletions dumpyarabot/message_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
from enum import Enum
from typing import Any, Dict, Optional, List

import httpx
import redis.asyncio as redis
from pydantic import BaseModel, Field, model_validator
from rich.console import Console
from telegram import Bot
from telegram.error import RetryAfter, TelegramError, NetworkError, BadRequest
from telegram.error import Forbidden, RetryAfter, TelegramError, NetworkError, BadRequest
import telegram

from dumpyarabot.config import settings
from dumpyarabot.config import settings, RICH_MARKDOWN_PARSE_MODE
from dumpyarabot.schemas import DumpArguments, DumpJob, JobCancelResult, JobProgress, JobStatus
from dumpyarabot.utils import legacy_markdown_to_rich_markdown

console = Console()

Expand Down Expand Up @@ -388,6 +390,125 @@ async def send_immediate_message(
console.print(f"[green]Sent immediate message {message.message_id} to chat {chat_id}[/green]")
return message

# ========== BOT API 10.1 RICH MESSAGE SUPPORT ==========
# Rich Messages landed in Bot API 10.1, but no released python-telegram-bot
# exposes them yet: 22.8 (latest, the version pinned here) only reaches Bot
# API 10.0, and upstream support is still open
# (https://github.com/python-telegram-bot/python-telegram-bot/issues/5261).
# Until a PTB release ships sendRichMessage / editMessageText(rich_message) /
# InputRichMessage, they are called here via the raw Bot API.
# TODO: replace _call_rich_api with PTB's typed methods once PTB supports 10.1.
# Telegram error responses are translated into the same telegram.error
# exceptions PTB raises, so the existing retry / dead-letter /
# "message is not modified" handling in _process_message works unchanged.

def _telegram_api_base(self) -> str:
    """Build the ``<base>/bot<token>`` URL prefix for raw Bot API calls.

    Uses ``settings.TELEGRAM_API_BASE_URL`` (with trailing slashes removed)
    when it is set, and the public ``https://api.telegram.org`` host otherwise.
    """
    configured = settings.TELEGRAM_API_BASE_URL
    host = configured.rstrip("/") if configured else "https://api.telegram.org"
    return f"{host}/bot{settings.TELEGRAM_BOT_TOKEN}"

async def _call_rich_api(self, method: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Call a raw Bot API method, returning ``result`` or raising a telegram error.

    Args:
        method: Bot API method name appended to the ``.../bot<token>`` prefix.
        payload: JSON-serialisable request body.

    Returns:
        The ``result`` member of a successful (``"ok": true``) response, or an
        empty dict when the response has no ``result``.

    Raises:
        NetworkError: The HTTP request itself failed (``httpx.HTTPError``).
        RetryAfter: Telegram answered with error code 429.
        Forbidden: Telegram answered with error code 403.
        BadRequest: Telegram answered with error code 400.
        TelegramError: Any other failure, including a body that is not JSON
            or is JSON but not an object.
    """
    url = f"{self._telegram_api_base()}/{method}"
    # The positional value is the default for every timeout phase; only the
    # read phase is overridden.
    timeout = httpx.Timeout(
        settings.TELEGRAM_TEXT_WRITE_TIMEOUT,
        read=settings.TELEGRAM_TEXT_READ_TIMEOUT,
    )
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(url, json=payload)
            data = response.json()
    except httpx.HTTPError as e:
        raise NetworkError(f"{method} transport error: {e}") from e
    except ValueError as e:  # non-JSON body
        raise TelegramError(f"{method} returned a non-JSON response: {e}") from e

    # A gateway/proxy can answer with valid JSON that is not an object (list,
    # string, null). Without this guard ``data.get`` would raise AttributeError,
    # which bypasses the telegram.error handling callers rely on.
    if not isinstance(data, dict):
        raise TelegramError(
            f"{method} returned an unexpected JSON payload of type {type(data).__name__}"
        )

    if data.get("ok"):
        return data.get("result", {})

    error_code = data.get("error_code")
    description = data.get("description", "unknown error")
    if error_code == 429:
        # ``parameters`` may be absent or null; fall back to a 1 second delay.
        retry_after = int((data.get("parameters") or {}).get("retry_after", 1))
        raise RetryAfter(retry_after)
    if error_code == 403:
        raise Forbidden(description)
    if error_code == 400:
        raise BadRequest(description)
    raise TelegramError(f"{method} failed ({error_code}): {description}")

@staticmethod
def _build_input_rich_message(text: str) -> Dict[str, Any]:
    """Return the InputRichMessage payload for legacy-Markdown ``text``.

    The text is converted with ``legacy_markdown_to_rich_markdown`` and stored
    under the ``markdown`` key.
    """
    rich_markdown = legacy_markdown_to_rich_markdown(text)
    return {"markdown": rich_markdown}

async def send_immediate_rich_message(
    self,
    chat_id: int,
    text: str,
    reply_to_message_id: Optional[int] = None,
) -> "telegram.Message":
    """Post ``text`` right away through sendRichMessage and return the Message.

    Rich-text counterpart of send_immediate_message: the message is created via
    the Bot API 10.1 rich endpoint so that its message_id can later be edited
    with editMessageText(rich_message).

    Args:
        chat_id: Destination chat.
        text: Legacy-Markdown text; converted to a rich payload before sending.
        reply_to_message_id: When truthy, the message is sent as a reply to it.

    Returns:
        The sent message, deserialised from the raw API result.
    """
    bot = await self._ensure_bot()

    request_body: Dict[str, Any] = {
        "chat_id": chat_id,
        "rich_message": self._build_input_rich_message(text),
    }
    if reply_to_message_id:
        request_body["reply_parameters"] = {"message_id": reply_to_message_id}

    console.print(f"[blue]Sending immediate rich message to chat {chat_id}[/blue]")
    api_result = await self._call_rich_api("sendRichMessage", request_body)

    sent = telegram.Message.de_json(api_result, bot)
    console.print(f"[green]Sent immediate rich message {sent.message_id} to chat {chat_id}[/green]")
    return sent

async def _process_rich_message(self, message: "QueuedMessage", text: str) -> None:
    """Deliver a queued message via the rich-text Bot API endpoints.

    An edit (``message.edit_message_id`` set) goes to editMessageText; anything
    else is sent with sendRichMessage, optionally as a reply and optionally
    scheduled for auto-deletion. Errors surface as the telegram.error
    exceptions raised by ``_call_rich_api``, so the caller's existing
    retry / dead-letter / no-op-edit handling applies unchanged.

    Args:
        message: The queued message describing target chat and delivery options.
        text: Legacy-Markdown text to deliver.
    """
    rich_message = self._build_input_rich_message(text)

    # Edit path: replace the content of an existing message and stop.
    if message.edit_message_id:
        edit_body = {
            "chat_id": message.chat_id,
            "message_id": message.edit_message_id,
            "rich_message": rich_message,
        }
        await self._call_rich_api("editMessageText", edit_body)
        return

    # Send path: explicit reply_parameters take precedence over the plain
    # reply_to_message_id shortcut.
    send_body: Dict[str, Any] = {
        "chat_id": message.chat_id,
        "rich_message": rich_message,
    }
    reply = message.reply_parameters
    if reply:
        send_body["reply_parameters"] = {
            "message_id": reply["message_id"],
            "chat_id": reply["chat_id"],
        }
    elif message.reply_to_message_id:
        send_body["reply_parameters"] = {"message_id": message.reply_to_message_id}

    result = await self._call_rich_api("sendRichMessage", send_body)

    if not message.delete_after:
        return
    # Fire-and-forget: deletion is scheduled in the background.
    asyncio.create_task(
        self._auto_delete_message(message.chat_id, result["message_id"], message.delete_after)
    )

async def send_immediate_status_update(
self,
chat_id: int,
Expand Down Expand Up @@ -607,6 +728,13 @@ async def _process_message(self, message: QueuedMessage) -> bool:
if latest:
text = latest

# Bot API 10.1 rich messages take a separate code path: the text is
# delivered as an InputRichMessage rather than text + parse_mode.
if message.parse_mode == RICH_MARKDOWN_PARSE_MODE:
await self._process_rich_message(message, text)
console.print(f"[green]Successfully processed {message.type.value} message[/green]")
return True

# Prepare common parameters
kwargs = {
"chat_id": message.chat_id,
Expand Down Expand Up @@ -882,13 +1010,25 @@ async def verify_telegram_context(self, job_data: Dict[str, Any]) -> None:
probe_text = job_data.get("_queued_text", f"\u23f3 Job `{job_id}` starting...")

try:
await bot.edit_message_text(
chat_id=initial_chat_id,
message_id=initial_message_id,
text=probe_text,
parse_mode=settings.DEFAULT_PARSE_MODE,
disable_web_page_preview=True,
)
if job_data.get("_rich_status"):
# The status message was sent via sendRichMessage, so it must be
# probed/edited via editMessageText(rich_message) to stay rich.
await self._call_rich_api(
"editMessageText",
{
"chat_id": initial_chat_id,
"message_id": initial_message_id,
"rich_message": self._build_input_rich_message(probe_text),
},
)
else:
await bot.edit_message_text(
chat_id=initial_chat_id,
message_id=initial_message_id,
text=probe_text,
parse_mode=settings.DEFAULT_PARSE_MODE,
disable_web_page_preview=True,
)
except Forbidden as e:
raise RuntimeError(f"Telegram context invalid (bot blocked/forbidden): {e}") from e
except BadRequest as e:
Expand Down
51 changes: 51 additions & 0 deletions dumpyarabot/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
import secrets
import asyncio
from datetime import datetime
Expand Down Expand Up @@ -80,3 +81,53 @@ def escape_markdown(text: str) -> str:
def generate_request_id() -> str:
    """Return a fresh random request ID (8 hexadecimal characters)."""
    # 4 random bytes render as 8 hex digits.
    request_id = secrets.token_hex(4)
    return request_id


# Sentinels used to shield code spans, fenced blocks, backslash escapes and link
# targets while rewriting emphasis markers. Control characters won't occur in
# our status text.
_RICH_STASH_OPEN = "\x00"
_RICH_STASH_CLOSE = "\x01"
_RICH_STASH_RE = re.compile(r"\x00(\d+)\x01")
_RICH_PROTECT_RE = re.compile(
    r"```.*?```"         # fenced code block (pre)
    r"|`[^`]*`"          # inline code span
    r"|\\."              # backslash escape, e.g. \* \_ \[ \`
    r"|\]\([^\s)]*\)",   # link target "](url)": the URL must stay verbatim
    re.DOTALL,
)


def legacy_markdown_to_rich_markdown(text: str) -> str:
    """Convert Telegram legacy ``Markdown`` text into Bot API 10.1 Rich Markdown.

    The bot builds every status string in legacy ``Markdown`` (``*bold*``,
    ``_italic_``, backtick code spans and ``[links](url)``). Rich Markdown
    reuses the same syntax with one breaking difference: a single ``*x*`` now
    means *italic*, while **bold** requires ``**x**``. Italic (``_x_``), code
    spans, fenced blocks, links and backslash escapes are identical in both
    dialects, so the only rewrite needed is doubling the bold asterisks.

    Regions whose asterisks are literal are stashed before the rewrite and
    restored afterwards: fenced blocks, inline code spans, backslash escapes
    and link targets. A URL may legitimately contain an unescaped ``*``;
    doubling it would change the link destination. Every asterisk left outside
    those regions is treated as a legacy bold delimiter and doubled.

    Args:
        text: A string formatted for the legacy ``Markdown`` parse mode.

    Returns:
        The equivalent string formatted for the Rich Markdown parse mode.
        Falsy input (``""`` or ``None``) is returned unchanged.
    """
    if not text:
        return text

    stash: list[str] = []

    def _protect(match: "re.Match[str]") -> str:
        # Swap the protected region for an index placeholder that contains no
        # asterisks, so the global replace below cannot touch it.
        stash.append(match.group(0))
        return f"{_RICH_STASH_OPEN}{len(stash) - 1}{_RICH_STASH_CLOSE}"

    shielded = _RICH_PROTECT_RE.sub(_protect, text)

    # Every surviving asterisk delimits legacy bold; ``**`` is Rich Markdown bold.
    shielded = shielded.replace("*", "**")

    # Put the protected regions back exactly as they were.
    return _RICH_STASH_RE.sub(lambda m: stash[int(m.group(1))], shielded)
Loading