"""External service availability checks with persisted uptime counters.

Each configured service is probed over HTTP, TCP or DNS; per-service and
global success counters are stored in a JSON state file between runs.
"""

import asyncio
import json
import os
import socket
import threading
import time
from datetime import datetime, timezone
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

# Serializes the load -> update -> save cycle of the state file.  Needed
# because monitor_external() runs checks in a worker thread while
# format_report() may run checks from another thread at the same time.
_STATE_LOCK = threading.Lock()


def _empty_state() -> dict[str, Any]:
    """Return a fresh state document with all counters at zero."""
    return {"services": {}, "total_checks": 0, "ok_checks": 0}


def _counter(value: Any) -> int:
    """Return *value* if it is a usable non-negative counter, otherwise 0."""
    if isinstance(value, int) and not isinstance(value, bool) and value >= 0:
        return value
    return 0


def _state_path(cfg: dict[str, Any]) -> str:
    """Return the state file path from ``cfg["external_checks"]["state_path"]``.

    Falls back to ``/var/server-bot/external_checks.json`` when unset.
    """
    return cfg.get("external_checks", {}).get(
        "state_path", "/var/server-bot/external_checks.json"
    )


def _load_state(cfg: dict[str, Any]) -> dict[str, Any]:
    """Load the persisted state, or a fresh one if it is missing or unusable.

    A missing, unreadable, undecodable or non-object JSON file all yield an
    empty state instead of raising, so a damaged file never blocks checks.
    """
    path = _state_path(cfg)
    try:
        with open(path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file; ValueError: bad JSON or encoding.
        return _empty_state()
    if not isinstance(state, dict):
        # Valid JSON but not an object (e.g. a list): callers need a dict.
        return _empty_state()
    return state


def _save_state(cfg: dict[str, Any], state: dict[str, Any]) -> None:
    """Write *state* to the state file, creating its directory if needed.

    The data is written to a sibling temporary file and then moved over the
    target with ``os.replace`` so a crash mid-write cannot leave a truncated
    state file behind.

    Raises:
        OSError: if the directory or file cannot be written.
    """
    path = _state_path(cfg)
    # dirname is "" for a bare filename; os.makedirs("") would raise.
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    tmp_path = f"{path}.{os.getpid()}.{threading.get_ident()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a stray temp file behind on failure.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def _check_http(url: str, timeout: int) -> tuple[bool, str]:
    """Request *url* and report ``(ok, detail)``.

    ``ok`` is true when the response status is below 400.  HTTP error
    statuses, URL errors and any other failure are reported as ``False``
    with the status code or error text as detail.
    """
    req = Request(url, headers={"User-Agent": "tg-admin-bot"})
    try:
        with urlopen(req, timeout=timeout) as resp:
            status = int(resp.status)
            return status < 400, f"HTTP {status}"
    except HTTPError as e:
        return False, f"HTTP {int(e.code)}"
    except URLError as e:
        return False, str(e.reason)
    except Exception as e:
        # Best-effort probe: any other failure is just a failed check.
        return False, str(e)


def _check_tcp(host: str, port: int, timeout: int) -> tuple[bool, str]:
    """Try to open a TCP connection to ``host:port`` and report ``(ok, detail)``."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True, "TCP ok"
    except Exception as e:
        # Best-effort probe: any failure is just a failed check.
        return False, str(e)


def _check_ping(host: str, timeout: int) -> tuple[bool, str]:
    """Check that *host* resolves; fall back to a TCP connect on port 80.

    No ICMP packet is sent: the check succeeds as soon as the name
    resolves.  *timeout* applies only to the TCP fallback.
    """
    try:
        socket.gethostbyname(host)
    except (OSError, UnicodeError):
        # Resolution failed (or the name is not encodable): try TCP instead.
        return _check_tcp(host, 80, timeout)
    return True, "DNS ok"


def _run_single_check(entry: Any, timeout: int) -> tuple[bool, str]:
    """Run the probe described by one service entry and return ``(ok, detail)``.

    Entries that lack the data their check type needs yield ``(False, "n/a")``;
    malformed entries, invalid ports and unknown check types yield ``False``
    with an explanatory detail instead of raising.
    """
    if not isinstance(entry, dict):
        return False, "invalid service entry"

    check_type = entry.get("type", "http")

    if check_type == "http":
        url = entry.get("url")
        return _check_http(url, timeout) if url else (False, "n/a")

    if check_type == "tcp":
        host = entry.get("host")
        raw_port = entry.get("port", 0)
        try:
            port = int(raw_port)
        except (TypeError, ValueError):
            # A bad port in one entry must not abort the whole run.
            return False, f"invalid port: {raw_port!r}"
        if host and port:
            return _check_tcp(host, port, timeout)
        return False, "n/a"

    if check_type == "ping":
        host = entry.get("host")
        return _check_ping(host, timeout) if host else (False, "n/a")

    return False, f"unknown check type: {check_type}"


def run_checks(cfg: dict[str, Any]) -> dict[str, Any]:
    """Probe every configured service and update the persisted counters.

    Reads ``services`` and ``timeout_sec`` (default 5) from
    ``cfg["external_checks"]``.  The state file is written on every call,
    even when no services are configured.

    Returns:
        ``{"results": [...], "state": {...}}`` where each result is
        ``{"name", "ok", "detail"}`` and ``state`` is the updated state
        document that was saved.
    """
    checks_cfg = cfg.get("external_checks", {})
    services = checks_cfg.get("services", [])
    timeout = int(checks_cfg.get("timeout_sec", 5))

    # Run the (slow) network probes first so the state lock is held only
    # for the short read-modify-write of the state file.
    results = []
    for entry in services:
        raw_name = entry.get("name") if isinstance(entry, dict) else None
        # Names become JSON object keys, so keep them as strings.
        name = str(raw_name or "unknown")
        ok, detail = _run_single_check(entry, timeout)
        results.append({"name": name, "ok": ok, "detail": detail})

    with _STATE_LOCK:
        state = _load_state(cfg)

        services_state = state.get("services")
        if not isinstance(services_state, dict):
            services_state = state["services"] = {}

        for result in results:
            hit = 1 if result["ok"] else 0

            service_state = services_state.get(result["name"])
            if not isinstance(service_state, dict):
                service_state = services_state[result["name"]] = {"ok": 0, "total": 0}

            # _counter() tolerates missing or malformed persisted values.
            service_state["total"] = _counter(service_state.get("total")) + 1
            service_state["ok"] = _counter(service_state.get("ok")) + hit
            state["total_checks"] = _counter(state.get("total_checks")) + 1
            state["ok_checks"] = _counter(state.get("ok_checks")) + hit

        _save_state(cfg, state)

    return {"results": results, "state": state}


def format_report(cfg: dict[str, Any]) -> str:
    """Run the checks and return a human-readable multi-line report.

    Returns a short "no services" message without running anything when no
    services are configured.  The uptime figure is computed from the
    cumulative global counters in the state, not only from this run.
    """
    # NOTE(review): the emoji in the string literals below look mis-encoded
    # (mojibake); confirm the intended glyphs against the original file.
    checks_cfg = cfg.get("external_checks", {})
    services = checks_cfg.get("services", [])
    if not services:
        return "šŸŒ External checks\n\nā„¹ļø No services configured"

    data = run_checks(cfg)
    results = data["results"]
    state = data["state"]

    # "or 1" avoids division by zero when no checks have been recorded.
    total = state.get("total_checks", 0) or 1
    ok_total = state.get("ok_checks", 0)
    uptime = 100.0 * ok_total / total

    lines = ["šŸŒ External checks", ""]
    for item in results:
        icon = "🟢" if item["ok"] else "šŸ”“"
        lines.append(f"{icon} {item['name']}: {item['detail']}")
    lines.append("")
    lines.append(f"šŸ“ˆ Uptime (global): {uptime:.2f}%")
    lines.append(f"šŸ•’ {datetime.now(timezone.utc):%Y-%m-%d %H:%M UTC}")
    return "\n".join(lines)


async def monitor_external(cfg: dict[str, Any]) -> None:
    """Run the checks forever, every ``interval_sec`` seconds (default 300).

    Returns immediately when ``cfg["external_checks"]["enabled"]`` is false.
    Otherwise it never returns normally; exceptions raised by a check run
    propagate to the caller.
    """
    checks_cfg = cfg.get("external_checks", {})
    if not checks_cfg.get("enabled", True):
        return

    # Clamp so a zero or negative interval cannot become a tight loop.
    interval = max(1, int(checks_cfg.get("interval_sec", 300)))
    while True:
        # run_checks() does blocking network and file I/O; run it in a
        # worker thread so the event loop stays responsive meanwhile.
        await asyncio.to_thread(run_checks, cfg)
        await asyncio.sleep(interval)