Files
tg-admin-bot/handlers/arcane.py

291 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
from datetime import datetime
from aiogram import F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg
from keyboards import docker_kb, arcane_kb
from services.arcane import list_projects, restart_project, set_project_state, get_project_details
from state import ARCANE_CACHE
def _arcane_cfg():
arc = cfg.get("arcane", {})
return arc.get("base_url"), arc.get("api_key"), int(arc.get("env_id", 0))
def _arcane_kb(page: int, total_pages: int, items: list[dict]) -> InlineKeyboardMarkup:
rows = []
for p in items:
name = p.get("name", "?")
pid = p.get("id", "")
if not pid:
continue
status = p.get("status", "unknown")
action = "down" if status == "running" else "up"
action_text = "" if action == "down" else "▶️"
rows.append([
InlineKeyboardButton(text=f"🔄 {name}", callback_data=f"arcane:restart:{pid}"),
InlineKeyboardButton(text="", callback_data=f"arcane:details:{pid}"),
InlineKeyboardButton(text="📦", callback_data=f"arcane:deploy:{pid}"),
InlineKeyboardButton(text=action_text, callback_data=f"arcane:{action}:{pid}"),
])
nav = []
if page > 0:
nav.append(InlineKeyboardButton(text="⬅️ Prev", callback_data=f"arcane:page:{page-1}"))
nav.append(InlineKeyboardButton(text="🔄 Refresh", callback_data="arcane:refresh"))
if page < total_pages - 1:
nav.append(InlineKeyboardButton(text="Next ➡️", callback_data=f"arcane:page:{page+1}"))
if nav:
rows.append(nav)
return InlineKeyboardMarkup(inline_keyboard=rows)
def _render_arcane_page(items: list[dict], page: int, page_size: int, ts: str) -> tuple[str, InlineKeyboardMarkup]:
total_pages = max(1, (len(items) + page_size - 1) // page_size)
page = max(0, min(page, total_pages - 1))
start = page * page_size
end = start + page_size
page_items = items[start:end]
lines = [f"🧰 Arcane projects на {ts} (page {page+1}/{total_pages})\n"]
for p in page_items:
status = p.get("status", "unknown")
name = p.get("name", "?")
running = p.get("runningCount", 0)
total = p.get("serviceCount", 0)
icon = "🟢" if status == "running" else "🟡"
lines.append(f"{icon} {name}: {status} ({running}/{total})")
kb = _arcane_kb(page, total_pages, page_items)
return "\n".join(lines), kb
async def cmd_arcane_projects(msg: Message, *, edit: bool, page: int = 0):
base_url, api_key, env_id = _arcane_cfg()
if not base_url or not api_key:
await msg.answer("⚠️ Arcane config missing", reply_markup=docker_kb)
return
if edit:
try:
await msg.edit_text("⏳ Arcane projects…")
except Exception:
await msg.answer("⏳ Arcane projects…", reply_markup=arcane_kb)
else:
await msg.answer("⏳ Arcane projects…", reply_markup=arcane_kb)
async def worker():
ok, info, items = await asyncio.to_thread(list_projects, base_url, api_key, env_id)
if not ok:
await msg.answer(f"❌ Arcane error: {info}", reply_markup=arcane_kb)
return
ts = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
ARCANE_CACHE[msg.chat.id] = {
"items": items,
"page_size": 4,
"ts": ts,
}
text, kb = _render_arcane_page(items, page, 4, ts)
if edit:
try:
await msg.edit_text(text, reply_markup=kb)
except Exception:
await msg.answer(text, reply_markup=kb)
else:
await msg.answer(text, reply_markup=kb)
asyncio.create_task(worker())
@dp.message(F.text == "🧰 Arcane")
async def arcane_menu(msg: Message):
if is_admin_msg(msg):
await cmd_arcane_projects(msg, edit=False)
@dp.message(F.text == "🔄 Refresh")
async def arcane_refresh(msg: Message):
if is_admin_msg(msg):
await cmd_arcane_projects(msg, edit=False)
@dp.callback_query(F.data == "arcane:refresh")
async def arcane_refresh_inline(cb: CallbackQuery):
if cb.from_user.id not in ADMIN_IDS:
return
await cb.answer()
await cmd_arcane_projects(cb.message, edit=True)
@dp.callback_query(F.data.startswith("arcane:page:"))
async def arcane_page(cb: CallbackQuery):
if cb.from_user.id not in ADMIN_IDS:
return
try:
page = int(cb.data.split(":", 2)[2])
except ValueError:
await cb.answer("Bad page")
return
data = ARCANE_CACHE.get(cb.message.chat.id)
if not data:
await cb.answer("No cache")
return
text, kb = _render_arcane_page(data["items"], page, data["page_size"], data["ts"])
await cb.answer()
await cb.message.edit_text(text, reply_markup=kb)
@dp.callback_query(F.data.startswith("arcane:restart:"))
async def arcane_restart(cb: CallbackQuery):
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
base_url, api_key, env_id = _arcane_cfg()
if not base_url or not api_key:
await cb.answer("Arcane config missing")
return
await cb.answer("Restarting…")
ok, info = await asyncio.to_thread(restart_project, base_url, api_key, env_id, pid)
if ok:
await cb.message.answer("✅ Arcane restart triggered", reply_markup=arcane_kb)
else:
await cb.message.answer(f"❌ Arcane restart failed: {info}", reply_markup=arcane_kb)
@dp.callback_query(F.data.startswith("arcane:details:"))
async def arcane_details(cb: CallbackQuery):
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
base_url, api_key, env_id = _arcane_cfg()
if not base_url or not api_key:
await cb.answer("Arcane config missing")
return
await cb.answer("Loading…")
ok, info, data = await asyncio.to_thread(get_project_details, base_url, api_key, env_id, pid)
if not ok:
await cb.message.answer(f"❌ Arcane details failed: {info}", reply_markup=arcane_kb)
return
name = data.get("name", "?")
status = data.get("status", "unknown")
running = data.get("runningCount", 0)
total = data.get("serviceCount", 0)
status_reason = data.get("statusReason")
icon = "🟢" if status == "running" else "🟡"
lines = [
f"🧰 **{name}**",
f"{icon} Status: {status} ({running}/{total})",
]
if status_reason:
lines.append(f"⚠️ {status_reason}")
services = data.get("runtimeServices", [])
if services:
lines.append("")
lines.append("🧩 Services:")
for s in services:
s_name = s.get("name", "?")
s_status = s.get("status", "unknown")
s_health = s.get("health")
s_icon = "🟢" if s_status == "running" else "🟡"
line = f"{s_icon} {s_name}: {s_status}"
if s_health:
line += f" ({s_health})"
lines.append(line)
await cb.message.answer("\n".join(lines), parse_mode="Markdown", reply_markup=arcane_kb)
@dp.callback_query(F.data.startswith("arcane:deploy:"))
async def arcane_deploy_status(cb: CallbackQuery):
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
base_url, api_key, env_id = _arcane_cfg()
if not base_url or not api_key:
await cb.answer("Arcane config missing")
return
await cb.answer("Loading…")
ok, info, data = await asyncio.to_thread(get_project_details, base_url, api_key, env_id, pid)
if not ok:
await cb.message.answer(f"❌ Arcane deploy status failed: {info}", reply_markup=arcane_kb)
return
name = data.get("name", "?")
status = data.get("status", "unknown")
status_reason = data.get("statusReason")
updated = data.get("updatedAt")
path = data.get("path")
repo = data.get("gitRepositoryURL")
commit = data.get("lastSyncCommit")
running = data.get("runningCount", 0)
total = data.get("serviceCount", 0)
icon = "🟢" if status == "running" else "🟡"
lines = [
f"📦 **Deploy status: {name}**",
f"{icon} Status: {status} ({running}/{total})",
]
if status_reason:
lines.append(f"⚠️ {status_reason}")
if updated:
lines.append(f"🕒 Updated: {updated}")
if path:
lines.append(f"📁 Path: {path}")
if repo:
lines.append(f"🔗 Repo: {repo}")
if commit:
lines.append(f"🧾 Commit: {commit}")
await cb.message.answer("\n".join(lines), parse_mode="Markdown", reply_markup=arcane_kb)
@dp.callback_query(F.data.startswith("arcane:up:"))
async def arcane_up(cb: CallbackQuery):
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
base_url, api_key, env_id = _arcane_cfg()
if not base_url or not api_key:
await cb.answer("Arcane config missing")
return
await cb.answer("Starting…")
ok, info = await asyncio.to_thread(set_project_state, base_url, api_key, env_id, pid, "up")
if ok:
await cb.message.answer("✅ Arcane up triggered", reply_markup=arcane_kb)
else:
await cb.message.answer(f"❌ Arcane up failed: {info}", reply_markup=arcane_kb)
@dp.callback_query(F.data.startswith("arcane:down:"))
async def arcane_down(cb: CallbackQuery):
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
base_url, api_key, env_id = _arcane_cfg()
if not base_url or not api_key:
await cb.answer("Arcane config missing")
return
await cb.answer("Stopping…")
ok, info = await asyncio.to_thread(set_project_state, base_url, api_key, env_id, pid, "down")
if ok:
await cb.message.answer("✅ Arcane down triggered", reply_markup=arcane_kb)
else:
await cb.message.answer(f"❌ Arcane down failed: {info}", reply_markup=arcane_kb)