feat: updated

This commit is contained in:
2026-05-10 09:50:39 +03:00
parent 1ae330258b
commit 65c464ab67
19 changed files with 184 additions and 265 deletions

View File

@@ -1,9 +1,5 @@
import json
import mimetypes
import secrets
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
from shutil import rmtree
from uuid import uuid4
@@ -72,180 +68,6 @@ def room_upload_dir(room_name: str) -> Path:
return settings.uploads_path / room_name
def human_dt(value: datetime) -> str:
return value.astimezone().strftime("%d.%m.%Y %H:%M:%S %Z")
def format_duration(seconds: int) -> str:
minutes, secs = divmod(max(0, seconds), 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"
def split_text_chunks(text: str, limit: int = 3500) -> list[str]:
if len(text) <= limit:
return [text]
chunks: list[str] = []
current = ""
for line in text.splitlines():
candidate = f"{current}\n{line}".strip() if current else line
if len(candidate) > limit and current:
chunks.append(current)
current = line
elif len(line) > limit:
if current:
chunks.append(current)
current = ""
for start in range(0, len(line), limit):
chunks.append(line[start:start + limit])
else:
current = candidate
if current:
chunks.append(current)
return chunks
def telegram_targets() -> list[dict[str, str | int]]:
chat_id = settings.telegram_chat_id.strip()
if settings.telegram_topic_id is not None:
return [
{
"chat_id": chat_id,
"message_thread_id": settings.telegram_topic_id,
}
]
return [{"chat_id": chat_id}]
def telegram_request(
method: str,
fields: dict[str, str | int],
file_path: Path | None = None,
file_name: str | None = None,
content_type: str | None = None,
) -> None:
url = f"https://api.telegram.org/bot{settings.telegram_bot_token.strip()}/{method}"
if file_path is None:
data = urllib.parse.urlencode({k: v for k, v in fields.items() if v is not None}).encode("utf-8")
request = urllib.request.Request(url, data=data, method="POST")
else:
boundary = f"----ChatGPTBoundary{uuid4().hex}"
body = bytearray()
for key, value in fields.items():
if value is None:
continue
body.extend(f"--{boundary}\r\n".encode())
body.extend(f'Content-Disposition: form-data; name="{key}"\r\n\r\n{value}\r\n'.encode("utf-8"))
guessed_type = content_type or mimetypes.guess_type(file_name or file_path.name)[0] or "application/octet-stream"
body.extend(f"--{boundary}\r\n".encode())
body.extend(
f'Content-Disposition: form-data; name="document"; filename="{file_name or file_path.name}"\r\n'.encode(
"utf-8"
)
)
body.extend(f"Content-Type: {guessed_type}\r\n\r\n".encode("utf-8"))
body.extend(file_path.read_bytes())
body.extend(b"\r\n")
body.extend(f"--{boundary}--\r\n".encode())
request = urllib.request.Request(
url,
data=bytes(body),
method="POST",
headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
)
with urllib.request.urlopen(request, timeout=20) as response:
payload = json.loads(response.read().decode("utf-8"))
if not payload.get("ok"):
raise RuntimeError(payload.get("description") or "Telegram API error")
def send_telegram_text(text: str) -> None:
for target in telegram_targets():
for chunk in split_text_chunks(text):
telegram_request(
"sendMessage",
{
**target,
"text": chunk,
"disable_web_page_preview": "true",
},
)
def send_telegram_document(file_path: Path, attachment: AttachmentRecord, caption: str | None = None) -> None:
for target in telegram_targets():
telegram_request(
"sendDocument",
{
**target,
"caption": caption or attachment.original_name,
},
file_path=file_path,
file_name=attachment.original_name,
content_type=attachment.content_type,
)
def send_finish_alert(call: CallRecord) -> None:
if not (settings.telegram_alerting_available and call.telegram_alert_enabled):
return
started_at = call.started_at or call.created_at
finished_at = datetime.now(started_at.tzinfo)
duration_seconds = int((finished_at - started_at).total_seconds()) if started_at else 0
if duration_seconds < 60:
return
if len(call.participant_names) < 2:
return
if call.started_at is None:
return
participants = ", ".join(call.participant_names) if call.participant_names else ""
summary = "\n".join(
[
f"Итоги созвона: {call.room_title}",
f"Комната: {call.room_name}",
f"Создана: {human_dt(call.created_at)}",
f"Таймер созвона: {format_duration(duration_seconds)}",
f"Участники: {participants}",
]
)
send_telegram_text(summary)
transcript_lines: list[str] = []
for event in call.chat_events:
timestamp = event.created_at.astimezone().strftime("%H:%M:%S")
if event.event_type == "text" and event.text:
transcript_lines.append(f"[{timestamp}] {event.author}: {event.text}")
elif event.event_type == "file":
transcript_lines.append(f"[{timestamp}] {event.author}: файл {event.file_name or 'без названия'}")
if transcript_lines:
send_telegram_text("Сообщения и файлы из чата:\n" + "\n".join(transcript_lines))
else:
send_telegram_text("Сообщения и файлы из чата: нет пользовательских сообщений.")
attachment_authors = {
event.attachment_id: event.author
for event in call.chat_events
if event.event_type == "file" and event.attachment_id
}
upload_dir = room_upload_dir(call.room_name)
for attachment in call.attachments:
file_path = upload_dir / attachment.stored_name
if not file_path.exists():
continue
author = attachment_authors.get(attachment.attachment_id)
caption = f"Файл из чата: {attachment.original_name}"
if author:
caption = f"{caption}{author}"
send_telegram_document(file_path, attachment, caption=caption[:1000])
@app.get("/health")
def health() -> dict[str, str]:
return {"status": "ok"}
@@ -254,8 +76,8 @@ def health() -> dict[str, str]:
@app.get("/api/config", response_model=AppConfigResponse)
def get_config() -> AppConfigResponse:
return AppConfigResponse(
telegram_alerting_available=settings.telegram_alerting_available,
max_attachment_size_mb=settings.max_attachment_size_mb,
turn_ice_servers=settings.turn_ice_servers,
)
@@ -278,7 +100,6 @@ async def create_call(payload: CreateCallRequest | None = None) -> CreateCallRes
invite_token = uuid4().hex
room_title = (payload.room_title if payload and payload.room_title else room_name)
password = payload.password.strip() if payload and payload.password else None
telegram_alert_enabled = bool(payload.telegram_alert_enabled) if payload else False
quick_join_default = bool(payload.quick_join) if payload else False
short_code = secrets.token_hex(4)
@@ -292,7 +113,6 @@ async def create_call(payload: CreateCallRequest | None = None) -> CreateCallRes
invite_short_code=short_code,
quick_join_default=quick_join_default,
password=password or None,
telegram_alert_enabled=settings.telegram_alerting_available and telegram_alert_enabled,
)
store.create(call)
await create_room_if_needed(room_name)
@@ -479,17 +299,9 @@ def finish_call(room_name: str, payload: FinishCallRequest) -> dict[str, str]:
if payload.invite_token != call.invite_token:
raise HTTPException(status_code=403, detail="Invalid invite token")
alert_status = "disabled"
if settings.telegram_alerting_available and call.telegram_alert_enabled:
try:
send_finish_alert(call)
alert_status = "sent"
except Exception:
alert_status = "failed"
store.clear_attachments(room_name)
upload_dir = room_upload_dir(room_name)
if upload_dir.exists():
rmtree(upload_dir, ignore_errors=True)
store.deactivate(room_name)
return {"status": "finished", "telegram_alert_status": alert_status}
return {"status": "finished"}