Files
tg-admin-bot/handlers/arcane.py
2026-02-07 23:27:09 +03:00

78 lines
2.6 KiB
Python

import asyncio
from aiogram import F
from aiogram.types import Message
from app import dp, cfg
from auth import is_admin_msg
from keyboards import docker_kb, arcane_kb
from services.arcane import list_projects, restart_project
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
def _arcane_cfg():
arc = cfg.get("arcane", {})
return arc.get("base_url"), arc.get("api_key"), int(arc.get("env_id", 0))
async def cmd_arcane_projects(msg: Message):
base_url, api_key, env_id = _arcane_cfg()
if not base_url or not api_key:
await msg.answer("⚠️ Arcane config missing", reply_markup=docker_kb)
return
await msg.answer("⏳ Arcane projects…", reply_markup=arcane_kb)
async def worker():
ok, info, items = await asyncio.to_thread(list_projects, base_url, api_key, env_id)
if not ok:
await msg.answer(f"❌ Arcane error: {info}", reply_markup=arcane_kb)
return
lines = ["🧰 Arcane projects\n"]
rows = []
for p in items:
status = p.get("status", "unknown")
name = p.get("name", "?")
pid = p.get("id", "")
running = p.get("runningCount", 0)
total = p.get("serviceCount", 0)
icon = "🟢" if status == "running" else "🟡"
lines.append(f"{icon} {name}: {status} ({running}/{total})")
if pid:
rows.append([InlineKeyboardButton(text=f"🔄 {name}", callback_data=f"arcane:restart:{pid}")])
kb = InlineKeyboardMarkup(inline_keyboard=rows) if rows else arcane_kb
await msg.answer("\n".join(lines), reply_markup=kb)
asyncio.create_task(worker())
@dp.message(F.text == "🧰 Arcane")
async def arcane_menu(msg: Message):
if is_admin_msg(msg):
await cmd_arcane_projects(msg)
@dp.message(F.text == "🔄 Refresh")
async def arcane_refresh(msg: Message):
if is_admin_msg(msg):
await cmd_arcane_projects(msg)
@dp.callback_query(F.data.startswith("arcane:restart:"))
async def arcane_restart(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
return
_, _, pid = cb.data.split(":", 2)
base_url, api_key, env_id = _arcane_cfg()
if not base_url or not api_key:
await cb.answer("Arcane config missing")
return
await cb.answer("Restarting…")
ok, info = await asyncio.to_thread(restart_project, base_url, api_key, env_id, pid)
if ok:
await cb.message.answer("✅ Arcane restart triggered", reply_markup=arcane_kb)
else:
await cb.message.answer(f"❌ Arcane restart failed: {info}", reply_markup=arcane_kb)