"""Message handlers that report server status and run the health check."""

import asyncio
import socket
import time

import psutil
from aiogram import F
from aiogram.types import Message

from app import dp, cfg
from auth import is_admin_msg
from keyboards import menu_kb
from services.system import format_disks
from services.health import health
from state import DOCKER_MAP

# NOTE(review): the non-ASCII string literals in this module look like UTF-8
# text (emoji / Cyrillic) that was decoded with the wrong codec. They are kept
# exactly as found because the ``F.text == ...`` filters presumably have to
# match labels defined elsewhere (e.g. in ``keyboards``) -- confirm the
# intended encoding before changing any of them.

_GIB = 1024**3

# Strong references to in-flight background tasks. The event loop only keeps
# weak references, so a task whose result is dropped may be garbage-collected
# before it finishes.
_background_tasks: set = set()

# The first ``cpu_percent(interval=None)`` call in a process returns a
# meaningless 0.0; prime the counter at import so the first status reply
# reports a real value measured since module load.
psutil.cpu_percent(interval=None)


async def cmd_status(msg: Message) -> None:
    """Reply to *msg* with a Markdown summary of the host's current state.

    The reply contains the hostname, uptime, 1-minute load average, CPU
    utilisation, RAM usage and the text returned by ``format_disks()``.
    """
    uptime_sec = int(time.time() - psutil.boot_time())
    days, rem = divmod(uptime_sec, 86400)
    hours, rem = divmod(rem, 3600)
    minutes = rem // 60

    # Pick the load indicator from fixed thresholds on the 1-minute average.
    load1 = psutil.getloadavg()[0]
    if load1 > 2.0:
        cpu_icon = "๐Ÿ”ด"
    elif load1 > 1.0:
        cpu_icon = "๐ŸŸก"
    else:
        cpu_icon = "๐ŸŸข"

    mem = psutil.virtual_memory()
    # Non-blocking: utilisation since the previous cpu_percent() call.
    cpu_percent = psutil.cpu_percent(interval=None)
    disks = format_disks()

    await msg.answer(
        "๐Ÿ“Š **Server status**\n\n"
        f"๐Ÿ–ฅ **Host:** `{socket.gethostname()}`\n"
        f"โฑ **Uptime:** {days}d {hours}h {minutes}m\n"
        f"{cpu_icon} **Load (1m):** {load1:.2f}\n"
        f"๐Ÿงฎ **CPU:** {cpu_percent:.0f}%\n"
        f"๐Ÿง  **RAM:** {mem.used // _GIB} / {mem.total // _GIB} GiB ({mem.percent}%)\n\n"
        f"{disks}",
        reply_markup=menu_kb,
        parse_mode="Markdown",
    )


async def cmd_health(msg: Message) -> None:
    """Acknowledge the request, then run the health check in the background.

    ``health(cfg, DOCKER_MAP)`` is executed in a worker thread via
    ``asyncio.to_thread`` so it does not block the event loop. Its result, or
    a failure notice if it raises, is sent as a follow-up message. This
    coroutine returns as soon as the background task has been scheduled.
    """
    await msg.answer("โณ Health-checkโ€ฆ", reply_markup=menu_kb)

    async def worker() -> None:
        try:
            text = await asyncio.to_thread(health, cfg, DOCKER_MAP)
        except Exception as e:
            # Top-level boundary of the background job: report any failure
            # to the user instead of losing it inside the task.
            await msg.answer(
                f"โŒ Health failed: {type(e).__name__}: {e}",
                reply_markup=menu_kb,
            )
            return
        await msg.answer(text, reply_markup=menu_kb)

    # Keep a strong reference until the task completes so it cannot be
    # garbage-collected mid-execution; drop it once the task is done.
    task = asyncio.create_task(worker())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


@dp.message(F.text == "๐Ÿฉบ Health")
async def h(msg: Message) -> None:
    """Run the health check when ``is_admin_msg`` accepts the message."""
    if is_admin_msg(msg):
        await cmd_health(msg)


@dp.message(F.text == "๐Ÿ“Š ะกั‚ะฐั‚ัƒั")
async def st(msg: Message) -> None:
    """Send the status summary when ``is_admin_msg`` accepts the message."""
    if is_admin_msg(msg):
        await cmd_status(msg)