Files
tg-admin-bot/handlers/arcane.py
2026-02-07 23:24:58 +03:00

53 lines
1.6 KiB
Python

import asyncio
from aiogram import F
from aiogram.types import Message
from app import dp, cfg
from auth import is_admin_msg
from keyboards import docker_kb, arcane_kb
from services.arcane import list_projects
def _arcane_cfg():
arc = cfg.get("arcane", {})
return arc.get("base_url"), arc.get("api_key"), int(arc.get("env_id", 0))
async def cmd_arcane_projects(msg: Message):
base_url, api_key, env_id = _arcane_cfg()
if not base_url or not api_key:
await msg.answer("⚠️ Arcane config missing", reply_markup=docker_kb)
return
await msg.answer("⏳ Arcane projects…", reply_markup=arcane_kb)
async def worker():
ok, info, items = await asyncio.to_thread(list_projects, base_url, api_key, env_id)
if not ok:
await msg.answer(f"❌ Arcane error: {info}", reply_markup=arcane_kb)
return
lines = ["🧰 Arcane projects\n"]
for p in items:
status = p.get("status", "unknown")
name = p.get("name", "?")
running = p.get("runningCount", 0)
total = p.get("serviceCount", 0)
icon = "🟢" if status == "running" else "🟡"
lines.append(f"{icon} {name}: {status} ({running}/{total})")
await msg.answer("\n".join(lines), reply_markup=arcane_kb)
asyncio.create_task(worker())
@dp.message(F.text == "🧰 Arcane")
async def arcane_menu(msg: Message):
if is_admin_msg(msg):
await cmd_arcane_projects(msg)
@dp.message(F.text == "🔄 Refresh")
async def arcane_refresh(msg: Message):
if is_admin_msg(msg):
await cmd_arcane_projects(msg)