"""Telegram handlers for listing and controlling Arcane projects.

The handlers render a paginated project list with inline buttons and
dispatch restart / details / deploy-status / up / down actions to the
helpers imported from ``services.arcane``.
"""

import asyncio
import html
from datetime import datetime

from aiogram import F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery

from app import dp, cfg
from auth import is_admin_msg
from keyboards import docker_kb, arcane_kb
from services.arcane import list_projects, restart_project, set_project_state, get_project_details
from state import ARCANE_CACHE

# Number of projects rendered on one page of the inline list.
_PAGE_SIZE = 4

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an otherwise unreferenced task may be garbage-collected
# before it finishes.
_BACKGROUND_TASKS: set = set()


def _spawn_background(coro) -> asyncio.Task:
    """Schedule *coro* as a task and keep it referenced until it completes."""
    task = asyncio.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return task


def _esc(value) -> str:
    """Return *value* as text that is safe inside a Telegram HTML message."""
    return html.escape(str(value), quote=False)


def _status_icon(status) -> str:
    """Return the green icon for ``"running"`` and the yellow one otherwise."""
    return "🟢" if status == "running" else "🟡"


def _is_admin_cb(cb: CallbackQuery) -> bool:
    """Tell whether the callback was sent by the configured admin user."""
    return cb.from_user.id == cfg["telegram"]["admin_id"]


def _arcane_cfg():
    """Read the Arcane connection settings from the ``arcane`` config section.

    Returns:
        A ``(base_url, api_key, env_id)`` tuple. ``base_url`` and ``api_key``
        are ``None`` when not configured; ``env_id`` falls back to ``0`` when
        it is missing or empty.
    """
    # ``or`` also covers a section / key that is present but set to None.
    arc = cfg.get("arcane") or {}
    return arc.get("base_url"), arc.get("api_key"), int(arc.get("env_id") or 0)


def _arcane_kb(page: int, total_pages: int, items: list[dict]) -> InlineKeyboardMarkup:
    """Build the inline keyboard for one page of projects.

    Each project with a non-empty ``id`` gets a row of four buttons (restart,
    details, deploy status, and stop/start depending on its status). A final
    navigation row holds Prev / Refresh / Next as applicable.

    Args:
        page: Zero-based index of the page being shown.
        total_pages: Total number of pages.
        items: Project dicts shown on this page.
    """
    rows = []
    for p in items:
        name = p.get("name", "?")
        pid = p.get("id", "")
        if not pid:
            # Without an id there is nothing to put into callback data.
            continue
        status = p.get("status", "unknown")
        action = "down" if status == "running" else "up"
        action_text = "⏹" if action == "down" else "▶️"
        rows.append([
            InlineKeyboardButton(text=f"🔄 {name}", callback_data=f"arcane:restart:{pid}"),
            InlineKeyboardButton(text="ℹ️", callback_data=f"arcane:details:{pid}"),
            InlineKeyboardButton(text="📦", callback_data=f"arcane:deploy:{pid}"),
            InlineKeyboardButton(text=action_text, callback_data=f"arcane:{action}:{pid}"),
        ])

    nav = []
    if page > 0:
        nav.append(InlineKeyboardButton(text="⬅️ Prev", callback_data=f"arcane:page:{page-1}"))
    nav.append(InlineKeyboardButton(text="🔄 Refresh", callback_data="arcane:refresh"))
    if page < total_pages - 1:
        nav.append(InlineKeyboardButton(text="Next ➡️", callback_data=f"arcane:page:{page+1}"))
    if nav:
        rows.append(nav)
    return InlineKeyboardMarkup(inline_keyboard=rows)


def _render_arcane_page(items: list[dict], page: int, page_size: int, ts: str) -> tuple[str, InlineKeyboardMarkup]:
    """Render one page of the project list as message text plus keyboard.

    Args:
        items: All project dicts.
        page: Requested zero-based page; clamped into the valid range.
        page_size: Number of projects per page.
        ts: Timestamp string shown in the header.

    Returns:
        ``(text, keyboard)`` for the clamped page.
    """
    # Ceiling division; an empty list still yields one (empty) page.
    total_pages = max(1, (len(items) + page_size - 1) // page_size)
    page = max(0, min(page, total_pages - 1))
    start = page * page_size
    page_items = items[start:start + page_size]

    lines = [f"🧰 Arcane projects на {ts} (page {page+1}/{total_pages})\n"]
    for p in page_items:
        status = p.get("status", "unknown")
        name = p.get("name", "?")
        running = p.get("runningCount", 0)
        total = p.get("serviceCount", 0)
        lines.append(f"{_status_icon(status)} {name}: {status} ({running}/{total})")

    kb = _arcane_kb(page, total_pages, page_items)
    return "\n".join(lines), kb


async def cmd_arcane_projects(msg: Message, *, edit: bool, page: int = 0):
    """Show a placeholder, then load and display the project list.

    The project list is fetched in a background task (the blocking
    ``list_projects`` call runs in a worker thread), so this coroutine
    returns right after the placeholder is shown. On success the result is
    stored in ``ARCANE_CACHE`` under the chat id for later page navigation.

    Args:
        msg: Message to reply to, or to edit when ``edit`` is true.
        edit: Edit ``msg`` in place instead of sending a new message; falls
            back to sending a new message if editing fails.
        page: Zero-based page to display once the list is loaded.
    """
    base_url, api_key, env_id = _arcane_cfg()
    if not base_url or not api_key:
        await msg.answer("⚠️ Arcane config missing", reply_markup=docker_kb)
        return

    if edit:
        try:
            await msg.edit_text("⏳ Arcane projects…")
        except Exception:
            # Best effort: if the message cannot be edited, send a new one.
            await msg.answer("⏳ Arcane projects…", reply_markup=arcane_kb)
    else:
        await msg.answer("⏳ Arcane projects…", reply_markup=arcane_kb)

    async def worker():
        ok, info, items = await asyncio.to_thread(list_projects, base_url, api_key, env_id)
        if not ok:
            await msg.answer(f"❌ Arcane error: {info}", reply_markup=arcane_kb)
            return

        ts = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
        ARCANE_CACHE[msg.chat.id] = {
            "items": items,
            "page_size": _PAGE_SIZE,
            "ts": ts,
        }
        text, kb = _render_arcane_page(items, page, _PAGE_SIZE, ts)
        if edit:
            try:
                await msg.edit_text(text, reply_markup=kb)
            except Exception:
                # Best effort: if the message cannot be edited, send a new one.
                await msg.answer(text, reply_markup=kb)
        else:
            await msg.answer(text, reply_markup=kb)

    # Keep the task referenced so it cannot be collected mid-execution.
    _spawn_background(worker())


@dp.message(F.text == "🧰 Arcane")
async def arcane_menu(msg: Message):
    """Open the project list from the "🧰 Arcane" menu button (admin only)."""
    if is_admin_msg(msg):
        await cmd_arcane_projects(msg, edit=False)


@dp.message(F.text == "🔄 Refresh")
async def arcane_refresh(msg: Message):
    """Reload the project list from the "🔄 Refresh" text button (admin only)."""
    if is_admin_msg(msg):
        await cmd_arcane_projects(msg, edit=False)


@dp.callback_query(F.data == "arcane:refresh")
async def arcane_refresh_inline(cb: CallbackQuery):
    """Reload the project list in place from the inline Refresh button."""
    if not _is_admin_cb(cb):
        return
    await cb.answer()
    await cmd_arcane_projects(cb.message, edit=True)


@dp.callback_query(F.data.startswith("arcane:page:"))
async def arcane_page(cb: CallbackQuery):
    """Switch the displayed page using the list cached for this chat."""
    if not _is_admin_cb(cb):
        return
    try:
        page = int(cb.data.split(":", 2)[2])
    except ValueError:
        await cb.answer("Bad page")
        return

    data = ARCANE_CACHE.get(cb.message.chat.id)
    if not data:
        await cb.answer("No cache")
        return

    text, kb = _render_arcane_page(data["items"], page, data["page_size"], data["ts"])
    await cb.answer()
    await cb.message.edit_text(text, reply_markup=kb)


async def _callback_context(cb: CallbackQuery):
    """Validate a per-project callback and collect its call arguments.

    Returns:
        ``(base_url, api_key, env_id, pid)`` for an admin callback with a
        complete Arcane config, otherwise ``None``. Non-admin callbacks are
        ignored silently; a missing config is reported via ``cb.answer``.
    """
    if not _is_admin_cb(cb):
        return None
    # Callback data has the form "arcane:<action>:<pid>"; maxsplit keeps any
    # further colons as part of the pid.
    pid = cb.data.split(":", 2)[2]
    base_url, api_key, env_id = _arcane_cfg()
    if not base_url or not api_key:
        await cb.answer("Arcane config missing")
        return None
    return base_url, api_key, env_id, pid


@dp.callback_query(F.data.startswith("arcane:restart:"))
async def arcane_restart(cb: CallbackQuery):
    """Trigger a restart of the selected project and report the outcome."""
    ctx = await _callback_context(cb)
    if ctx is None:
        return
    await cb.answer("Restarting…")
    ok, info = await asyncio.to_thread(restart_project, *ctx)
    if ok:
        await cb.message.answer("✅ Arcane restart triggered", reply_markup=arcane_kb)
    else:
        await cb.message.answer(f"❌ Arcane restart failed: {info}", reply_markup=arcane_kb)


@dp.callback_query(F.data.startswith("arcane:details:"))
async def arcane_details(cb: CallbackQuery):
    """Send a summary of the selected project and its runtime services.

    The message uses HTML parse mode with every dynamic value escaped, so
    names containing markup characters cannot break message parsing.
    """
    ctx = await _callback_context(cb)
    if ctx is None:
        return
    await cb.answer("Loading…")
    ok, info, data = await asyncio.to_thread(get_project_details, *ctx)
    if not ok:
        await cb.message.answer(f"❌ Arcane details failed: {info}", reply_markup=arcane_kb)
        return

    name = data.get("name", "?")
    status = data.get("status", "unknown")
    running = data.get("runningCount", 0)
    total = data.get("serviceCount", 0)
    status_reason = data.get("statusReason")

    lines = [
        f"🧰 <b>{_esc(name)}</b>",
        f"{_status_icon(status)} Status: {_esc(status)} ({_esc(running)}/{_esc(total)})",
    ]
    if status_reason:
        lines.append(f"⚠️ {_esc(status_reason)}")

    services = data.get("runtimeServices", [])
    if services:
        lines.append("")
        lines.append("🧩 Services:")
        for s in services:
            s_status = s.get("status", "unknown")
            s_health = s.get("health")
            line = f"{_status_icon(s_status)} {_esc(s.get('name', '?'))}: {_esc(s_status)}"
            if s_health:
                line += f" ({_esc(s_health)})"
            lines.append(line)

    await cb.message.answer("\n".join(lines), parse_mode="HTML", reply_markup=arcane_kb)


@dp.callback_query(F.data.startswith("arcane:deploy:"))
async def arcane_deploy_status(cb: CallbackQuery):
    """Send deploy information (update time, path, repo, commit) for a project.

    The message uses HTML parse mode with every dynamic value escaped, so
    paths or repository URLs containing markup characters cannot break
    message parsing.
    """
    ctx = await _callback_context(cb)
    if ctx is None:
        return
    await cb.answer("Loading…")
    ok, info, data = await asyncio.to_thread(get_project_details, *ctx)
    if not ok:
        await cb.message.answer(f"❌ Arcane deploy status failed: {info}", reply_markup=arcane_kb)
        return

    name = data.get("name", "?")
    status = data.get("status", "unknown")
    running = data.get("runningCount", 0)
    total = data.get("serviceCount", 0)

    lines = [
        f"📦 <b>Deploy status: {_esc(name)}</b>",
        f"{_status_icon(status)} Status: {_esc(status)} ({_esc(running)}/{_esc(total)})",
    ]
    # Optional fields, appended only when the API returned a truthy value.
    optional = (
        ("⚠️ ", data.get("statusReason")),
        ("🕒 Updated: ", data.get("updatedAt")),
        ("📁 Path: ", data.get("path")),
        ("🔗 Repo: ", data.get("gitRepositoryURL")),
        ("🧾 Commit: ", data.get("lastSyncCommit")),
    )
    lines.extend(f"{prefix}{_esc(value)}" for prefix, value in optional if value)

    await cb.message.answer("\n".join(lines), parse_mode="HTML", reply_markup=arcane_kb)


async def _change_project_state(cb: CallbackQuery, action: str, progress: str) -> None:
    """Shared body of the up/down handlers.

    Args:
        cb: The callback being handled.
        action: State passed to ``set_project_state`` (``"up"`` or ``"down"``);
            also used in the result message.
        progress: Text shown to the user while the request is in flight.
    """
    ctx = await _callback_context(cb)
    if ctx is None:
        return
    await cb.answer(progress)
    ok, info = await asyncio.to_thread(set_project_state, *ctx, action)
    if ok:
        await cb.message.answer(f"✅ Arcane {action} triggered", reply_markup=arcane_kb)
    else:
        await cb.message.answer(f"❌ Arcane {action} failed: {info}", reply_markup=arcane_kb)


@dp.callback_query(F.data.startswith("arcane:up:"))
async def arcane_up(cb: CallbackQuery):
    """Start the selected project."""
    await _change_project_state(cb, "up", "Starting…")


@dp.callback_query(F.data.startswith("arcane:down:"))
async def arcane_down(cb: CallbackQuery):
    """Stop the selected project."""
    await _change_project_state(cb, "down", "Stopping…")