from aiogram import F from aiogram.types import Message from app import dp from auth import is_admin_msg from keyboards import system_kb from system_checks import security, disks from app import cfg from services.http_checks import get_url_checks, check_url import asyncio @dp.message(F.text == "💽 Disks") async def sd(msg: Message): if is_admin_msg(msg): await msg.answer(disks(), reply_markup=system_kb) @dp.message(F.text == "🔐 Security") async def sec(msg: Message): if is_admin_msg(msg): await msg.answer(security(), reply_markup=system_kb) @dp.message(F.text == "🌐 URLs") async def urls(msg: Message): if not is_admin_msg(msg): return checks = list(get_url_checks(cfg)) if not checks: await msg.answer("⚠️ Нет URL для проверки", reply_markup=system_kb) return await msg.answer("⏳ Проверяю URL…", reply_markup=system_kb) async def worker(): tasks = [asyncio.to_thread(check_url, url) for _, url in checks] results = await asyncio.gather(*tasks) lines = ["🌐 URLs\n"] for (alias, url), (ok, status, ms, err) in zip(checks, results): if ok: lines.append(f"🟢 {alias}: {status} ({ms}ms)") elif status is not None: lines.append(f"🔴 {alias}: {status} ({ms}ms)") else: reason = err or "error" lines.append(f"🔴 {alias}: {reason} ({ms}ms)") await msg.answer("\n".join(lines), reply_markup=system_kb) asyncio.create_task(worker())