import asyncio import socket import time import psutil from aiogram import F from aiogram.types import Message from app import dp, cfg from auth import is_admin_msg from keyboards import menu_kb from services.system import format_disks from services.health import health from state import DOCKER_MAP async def cmd_status(msg: Message): now = time.time() uptime_sec = int(now - psutil.boot_time()) days, rem = divmod(uptime_sec, 86400) hours, rem = divmod(rem, 3600) minutes, _ = divmod(rem, 60) load1 = psutil.getloadavg()[0] cpu_icon = "🟒" if load1 > 2.0: cpu_icon = "πŸ”΄" elif load1 > 1.0: cpu_icon = "🟑" mem = psutil.virtual_memory() disks = format_disks() await msg.answer( "πŸ“Š **Server status**\n\n" f"πŸ–₯ **Host:** `{socket.gethostname()}`\n" f"⏱ **Uptime:** {days}d {hours}h {minutes}m\n" f"{cpu_icon} **Load (1m):** {load1:.2f}\n" f"🧠 **RAM:** {mem.used // (1024**3)} / {mem.total // (1024**3)} GiB ({mem.percent}%)\n\n" f"{disks}", reply_markup=menu_kb, parse_mode="Markdown", ) async def cmd_health(msg: Message): await msg.answer("⏳ Health-check…", reply_markup=menu_kb) async def worker(): try: text = await asyncio.to_thread(health, cfg, DOCKER_MAP) except Exception as e: await msg.answer(f"❌ Health failed: {type(e).__name__}: {e}", reply_markup=menu_kb) return await msg.answer(text, reply_markup=menu_kb) asyncio.create_task(worker()) @dp.message(F.text == "🩺 Health") async def h(msg: Message): if is_admin_msg(msg): await cmd_health(msg) @dp.message(F.text == "πŸ“Š Бтатус") async def st(msg: Message): if is_admin_msg(msg): await cmd_status(msg)