import asyncio from aiogram import F from aiogram.types import Message from app import dp, cfg from auth import is_admin_msg from keyboards import docker_kb, arcane_kb from services.arcane import list_projects, restart_project, set_project_state from datetime import datetime from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery from state import ARCANE_CACHE def _arcane_cfg(): arc = cfg.get("arcane", {}) return arc.get("base_url"), arc.get("api_key"), int(arc.get("env_id", 0)) def _arcane_kb(page: int, total_pages: int, items: list[dict]) -> InlineKeyboardMarkup: rows = [] for p in items: name = p.get("name", "?") pid = p.get("id", "") if not pid: continue rows.append([ InlineKeyboardButton(text=f"šŸ”„ {name}", callback_data=f"arcane:restart:{pid}"), InlineKeyboardButton(text="ā–¶ļø", callback_data=f"arcane:up:{pid}"), InlineKeyboardButton(text="ā¹", callback_data=f"arcane:down:{pid}"), ]) nav = [] if page > 0: nav.append(InlineKeyboardButton(text="ā¬…ļø Prev", callback_data=f"arcane:page:{page-1}")) nav.append(InlineKeyboardButton(text="šŸ”„ Refresh", callback_data="arcane:refresh")) if page < total_pages - 1: nav.append(InlineKeyboardButton(text="Next āž”ļø", callback_data=f"arcane:page:{page+1}")) if nav: rows.append(nav) return InlineKeyboardMarkup(inline_keyboard=rows) def _render_arcane_page(items: list[dict], page: int, page_size: int, ts: str) -> tuple[str, InlineKeyboardMarkup]: total_pages = max(1, (len(items) + page_size - 1) // page_size) page = max(0, min(page, total_pages - 1)) start = page * page_size end = start + page_size page_items = items[start:end] lines = [f"🧰 Arcane projects на {ts} (page {page+1}/{total_pages})\n"] for p in page_items: status = p.get("status", "unknown") name = p.get("name", "?") running = p.get("runningCount", 0) total = p.get("serviceCount", 0) icon = "🟢" if status == "running" else "🟔" lines.append(f"{icon} {name}: {status} ({running}/{total})") kb = _arcane_kb(page, total_pages, page_items) return "\n".join(lines), kb async def cmd_arcane_projects(msg: Message, *, edit: bool, page: int = 0): base_url, api_key, env_id = _arcane_cfg() if not base_url or not api_key: await msg.answer("āš ļø Arcane config missing", reply_markup=docker_kb) return if edit: try: await msg.edit_text("ā³ Arcane projects…") except Exception: if edit: try: await msg.edit_text("ā³ Arcane projects…") except Exception: await msg.answer("ā³ Arcane projects…", reply_markup=arcane_kb) else: await msg.answer("ā³ Arcane projects…", reply_markup=arcane_kb) else: await msg.answer("ā³ Arcane projects…", reply_markup=arcane_kb) async def worker(): ok, info, items = await asyncio.to_thread(list_projects, base_url, api_key, env_id) if not ok: await msg.answer(f"āŒ Arcane error: {info}", reply_markup=arcane_kb) return ts = datetime.now().strftime("%d.%m.%Y %H:%M:%S") ARCANE_CACHE[msg.chat.id] = { "items": items, "page_size": 4, "ts": ts, } text, kb = _render_arcane_page(items, page, 4, ts) if edit: try: await msg.edit_text(text, reply_markup=kb) except Exception: await msg.answer(text, reply_markup=kb) else: await msg.answer(text, reply_markup=kb) asyncio.create_task(worker()) @dp.message(F.text == "🧰 Arcane") async def arcane_menu(msg: Message): if is_admin_msg(msg): await cmd_arcane_projects(msg, edit=False) @dp.message(F.text == "šŸ”„ Refresh") async def arcane_refresh(msg: Message): if is_admin_msg(msg): await cmd_arcane_projects(msg, edit=False) @dp.callback_query(F.data == "arcane:refresh") async def arcane_refresh_inline(cb: CallbackQuery): if cb.from_user.id != cfg["telegram"]["admin_id"]: return await cb.answer() await cmd_arcane_projects(cb.message, edit=True) @dp.callback_query(F.data.startswith("arcane:page:")) async def arcane_page(cb: CallbackQuery): if cb.from_user.id != cfg["telegram"]["admin_id"]: return try: page = int(cb.data.split(":", 2)[2]) except ValueError: await cb.answer("Bad page") return data = ARCANE_CACHE.get(cb.message.chat.id) if not data: await cb.answer("No cache") return text, kb = _render_arcane_page(data["items"], page, data["page_size"], data["ts"]) await cb.answer() await cb.message.edit_text(text, reply_markup=kb) @dp.callback_query(F.data.startswith("arcane:restart:")) async def arcane_restart(cb: CallbackQuery): if cb.from_user.id != cfg["telegram"]["admin_id"]: return _, _, pid = cb.data.split(":", 2) base_url, api_key, env_id = _arcane_cfg() if not base_url or not api_key: await cb.answer("Arcane config missing") return await cb.answer("Restarting…") ok, info = await asyncio.to_thread(restart_project, base_url, api_key, env_id, pid) if ok: await cb.message.answer("āœ… Arcane restart triggered", reply_markup=arcane_kb) else: await cb.message.answer(f"āŒ Arcane restart failed: {info}", reply_markup=arcane_kb) @dp.callback_query(F.data.startswith("arcane:up:")) async def arcane_up(cb: CallbackQuery): if cb.from_user.id != cfg["telegram"]["admin_id"]: return _, _, pid = cb.data.split(":", 2) base_url, api_key, env_id = _arcane_cfg() if not base_url or not api_key: await cb.answer("Arcane config missing") return await cb.answer("Starting…") ok, info = await asyncio.to_thread(set_project_state, base_url, api_key, env_id, pid, "up") if ok: await cb.message.answer("āœ… Arcane up triggered", reply_markup=arcane_kb) else: await cb.message.answer(f"āŒ Arcane up failed: {info}", reply_markup=arcane_kb) @dp.callback_query(F.data.startswith("arcane:down:")) async def arcane_down(cb: CallbackQuery): if cb.from_user.id != cfg["telegram"]["admin_id"]: return _, _, pid = cb.data.split(":", 2) base_url, api_key, env_id = _arcane_cfg() if not base_url or not api_key: await cb.answer("Arcane config missing") return await cb.answer("Stopping…") ok, info = await asyncio.to_thread(set_project_state, base_url, api_key, env_id, pid, "down") if ok: await cb.message.answer("āœ… Arcane down triggered", reply_markup=arcane_kb) else: await cb.message.answer(f"āŒ Arcane down failed: {info}", reply_markup=arcane_kb)