144 lines
4.3 KiB
Python
144 lines
4.3 KiB
Python
import asyncio
|
||
import json
|
||
import os
|
||
import socket
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from typing import Any
|
||
from urllib.error import HTTPError, URLError
|
||
from urllib.request import Request, urlopen
|
||
|
||
|
||
def _state_path(cfg: dict[str, Any]) -> str:
|
||
return cfg.get("external_checks", {}).get("state_path", "/var/server-bot/external_checks.json")
|
||
|
||
|
||
def _load_state(cfg: dict[str, Any]) -> dict[str, Any]:
|
||
path = _state_path(cfg)
|
||
if not os.path.exists(path):
|
||
return {"services": {}, "total_checks": 0, "ok_checks": 0}
|
||
try:
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
except Exception:
|
||
return {"services": {}, "total_checks": 0, "ok_checks": 0}
|
||
|
||
|
||
def _save_state(cfg: dict[str, Any], state: dict[str, Any]) -> None:
|
||
path = _state_path(cfg)
|
||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||
with open(path, "w", encoding="utf-8") as f:
|
||
json.dump(state, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _check_http(url: str, timeout: int) -> tuple[bool, str]:
|
||
req = Request(url, headers={"User-Agent": "tg-admin-bot"})
|
||
try:
|
||
with urlopen(req, timeout=timeout) as resp:
|
||
status = int(resp.status)
|
||
return status < 400, f"HTTP {status}"
|
||
except HTTPError as e:
|
||
return False, f"HTTP {int(e.code)}"
|
||
except URLError as e:
|
||
return False, str(e.reason)
|
||
except Exception as e:
|
||
return False, str(e)
|
||
|
||
|
||
def _check_tcp(host: str, port: int, timeout: int) -> tuple[bool, str]:
|
||
try:
|
||
with socket.create_connection((host, port), timeout=timeout):
|
||
return True, "TCP ok"
|
||
except Exception as e:
|
||
return False, str(e)
|
||
|
||
|
||
def _check_ping(host: str, timeout: int) -> tuple[bool, str]:
|
||
try:
|
||
socket.gethostbyname(host)
|
||
return True, "DNS ok"
|
||
except Exception:
|
||
pass
|
||
return _check_tcp(host, 80, timeout)
|
||
|
||
|
||
def run_checks(cfg: dict[str, Any]) -> dict[str, Any]:
|
||
checks_cfg = cfg.get("external_checks", {})
|
||
services = checks_cfg.get("services", [])
|
||
timeout = int(checks_cfg.get("timeout_sec", 5))
|
||
|
||
state = _load_state(cfg)
|
||
services_state = state.setdefault("services", {})
|
||
|
||
results = []
|
||
for entry in services:
|
||
name = entry.get("name") or "unknown"
|
||
check_type = entry.get("type", "http")
|
||
ok = False
|
||
detail = "n/a"
|
||
|
||
if check_type == "http":
|
||
url = entry.get("url")
|
||
if url:
|
||
ok, detail = _check_http(url, timeout)
|
||
elif check_type == "tcp":
|
||
host = entry.get("host")
|
||
port = int(entry.get("port", 0))
|
||
if host and port:
|
||
ok, detail = _check_tcp(host, port, timeout)
|
||
elif check_type == "ping":
|
||
host = entry.get("host")
|
||
if host:
|
||
ok, detail = _check_ping(host, timeout)
|
||
|
||
service_state = services_state.setdefault(name, {"ok": 0, "total": 0})
|
||
service_state["total"] += 1
|
||
if ok:
|
||
service_state["ok"] += 1
|
||
|
||
state["total_checks"] = state.get("total_checks", 0) + 1
|
||
if ok:
|
||
state["ok_checks"] = state.get("ok_checks", 0) + 1
|
||
|
||
results.append({"name": name, "ok": ok, "detail": detail})
|
||
|
||
_save_state(cfg, state)
|
||
return {"results": results, "state": state}
|
||
|
||
|
||
def format_report(cfg: dict[str, Any]) -> str:
|
||
checks_cfg = cfg.get("external_checks", {})
|
||
services = checks_cfg.get("services", [])
|
||
if not services:
|
||
return "🌍 External checks\n\nℹ️ No services configured"
|
||
|
||
data = run_checks(cfg)
|
||
results = data["results"]
|
||
state = data["state"]
|
||
|
||
total = state.get("total_checks", 0) or 1
|
||
ok_total = state.get("ok_checks", 0)
|
||
uptime = 100.0 * ok_total / total
|
||
|
||
lines = ["🌍 External checks", ""]
|
||
for item in results:
|
||
icon = "🟢" if item["ok"] else "🔴"
|
||
lines.append(f"{icon} {item['name']}: {item['detail']}")
|
||
|
||
lines.append("")
|
||
lines.append(f"📈 Uptime (global): {uptime:.2f}%")
|
||
|
||
lines.append(f"🕒 {datetime.now(timezone.utc):%Y-%m-%d %H:%M UTC}")
|
||
return "\n".join(lines)
|
||
|
||
|
||
async def monitor_external(cfg: dict[str, Any]):
|
||
checks_cfg = cfg.get("external_checks", {})
|
||
if not checks_cfg.get("enabled", True):
|
||
return
|
||
interval = int(checks_cfg.get("interval_sec", 300))
|
||
|
||
while True:
|
||
run_checks(cfg)
|
||
await asyncio.sleep(interval)
|