Files
tg-admin-bot/services/external_checks.py
2026-02-08 02:16:42 +03:00

144 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import os
import socket
import time
from datetime import datetime, timezone
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
def _state_path(cfg: dict[str, Any]) -> str:
return cfg.get("external_checks", {}).get("state_path", "/var/server-bot/external_checks.json")
def _load_state(cfg: dict[str, Any]) -> dict[str, Any]:
    """Load the persisted check statistics, or a fresh state if unusable.

    Args:
        cfg: Bot configuration; the file location comes from ``_state_path``.

    Returns:
        The decoded JSON object from the state file. A fresh
        ``{"services": {}, "total_checks": 0, "ok_checks": 0}`` dict is
        returned when the file is missing, unreadable, not valid JSON, or
        does not contain a JSON object at the top level.
    """
    fresh: dict[str, Any] = {"services": {}, "total_checks": 0, "ok_checks": 0}
    path = _state_path(cfg)
    try:
        # Open directly instead of testing os.path.exists() first: a missing
        # file is handled by the except clause without a check/open race.
        with open(path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except Exception:
        # Deliberate best-effort: any read/parse problem resets the stats.
        return fresh
    # Valid JSON is not necessarily an object (e.g. ``[]`` or ``null``);
    # callers use the result as a dict, so reject anything else.
    if not isinstance(loaded, dict):
        return fresh
    # The per-service table must be a dict as well for the same reason.
    if not isinstance(loaded.get("services", {}), dict):
        loaded["services"] = {}
    return loaded
def _save_state(cfg: dict[str, Any], state: dict[str, Any]) -> None:
    """Persist *state* as pretty-printed UTF-8 JSON at the configured path.

    Args:
        cfg: Bot configuration; the file location comes from ``_state_path``.
        state: JSON-serialisable statistics dictionary to store.

    Raises:
        OSError: If the directory or file cannot be created or written.
        TypeError: If *state* contains values ``json.dump`` cannot encode.
    """
    path = _state_path(cfg)
    directory = os.path.dirname(path)
    # A bare file name has an empty dirname, and os.makedirs("") raises.
    if directory:
        os.makedirs(directory, exist_ok=True)
    # Write to a sibling temp file and rename it over the target, so an
    # interrupted write never leaves a truncated state file behind.
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, path)
def _check_http(url: str, timeout: int) -> tuple[bool, str]:
req = Request(url, headers={"User-Agent": "tg-admin-bot"})
try:
with urlopen(req, timeout=timeout) as resp:
status = int(resp.status)
return status < 400, f"HTTP {status}"
except HTTPError as e:
return False, f"HTTP {int(e.code)}"
except URLError as e:
return False, str(e.reason)
except Exception as e:
return False, str(e)
def _check_tcp(host: str, port: int, timeout: int) -> tuple[bool, str]:
try:
with socket.create_connection((host, port), timeout=timeout):
return True, "TCP ok"
except Exception as e:
return False, str(e)
def _check_ping(host: str, timeout: int) -> tuple[bool, str]:
try:
socket.gethostbyname(host)
return True, "DNS ok"
except Exception:
pass
return _check_tcp(host, 80, timeout)
def _probe_service(entry: dict[str, Any], timeout: int) -> tuple[bool, str]:
    """Run the single check described by *entry* and return ``(ok, detail)``."""
    check_type = entry.get("type", "http")
    if check_type == "http":
        url = entry.get("url")
        if url:
            return _check_http(url, timeout)
    elif check_type == "tcp":
        host = entry.get("host")
        try:
            port = int(entry.get("port", 0))
        except (TypeError, ValueError):
            # A malformed port fails this one service, not the whole run.
            return False, "invalid port"
        if host and port:
            return _check_tcp(host, port, timeout)
    elif check_type == "ping":
        host = entry.get("host")
        if host:
            return _check_ping(host, timeout)
    # Unknown check type or a required field (url/host/port) is missing.
    return False, "n/a"


def run_checks(cfg: dict[str, Any]) -> dict[str, Any]:
    """Run every configured external check and update the stored statistics.

    Services are read from ``cfg["external_checks"]["services"]``; each entry
    may carry ``name``, ``type`` (``"http"`` by default, ``"tcp"`` or
    ``"ping"``) and the matching ``url`` / ``host`` / ``port`` fields. The
    per-check timeout is ``timeout_sec`` (default 5 seconds).

    For every entry the per-service and global ``total``/``ok`` counters in
    the persisted state are incremented, then the state is saved.

    Returns:
        ``{"results": [...], "state": {...}}`` where each result is
        ``{"name": str, "ok": bool, "detail": str}`` and ``state`` is the
        updated statistics dictionary that was written to disk.
    """
    checks_cfg = cfg.get("external_checks", {})
    services = checks_cfg.get("services", [])
    timeout = int(checks_cfg.get("timeout_sec", 5))

    state = _load_state(cfg)
    services_state = state.setdefault("services", {})

    results = []
    for entry in services:
        name = entry.get("name") or "unknown"
        ok, detail = _probe_service(entry, timeout)

        # Use .get() for the increments so a persisted entry that lacks one
        # of the counters is repaired instead of raising KeyError.
        service_state = services_state.setdefault(name, {"ok": 0, "total": 0})
        service_state["total"] = service_state.get("total", 0) + 1
        state["total_checks"] = state.get("total_checks", 0) + 1
        if ok:
            service_state["ok"] = service_state.get("ok", 0) + 1
            state["ok_checks"] = state.get("ok_checks", 0) + 1

        results.append({"name": name, "ok": ok, "detail": detail})

    _save_state(cfg, state)
    return {"results": results, "state": state}
def format_report(cfg: dict[str, Any]) -> str:
    """Run the external checks and render them as a text report.

    When no services are configured a short placeholder message is returned
    and no checks are run. Otherwise ``run_checks`` is executed (which also
    updates the stored counters) and the report lists one line per service,
    followed by the global uptime percentage computed from the cumulative
    ``ok_checks`` / ``total_checks`` counters and the current UTC time.
    """
    configured = cfg.get("external_checks", {}).get("services", [])
    if not configured:
        return "🌍 External checks\n\n No services configured"

    data = run_checks(cfg)
    state = data["state"]

    # ``or 1`` keeps the division safe when the counter is missing or zero.
    denominator = state.get("total_checks", 0) or 1
    uptime = 100.0 * state.get("ok_checks", 0) / denominator

    report = ["🌍 External checks", ""]
    for item in data["results"]:
        if item["ok"]:
            icon = "🟢"
        else:
            icon = "🔴"
        report.append(f"{icon} {item['name']}: {item['detail']}")
    report += [
        "",
        f"📈 Uptime (global): {uptime:.2f}%",
        f"🕒 {datetime.now(timezone.utc):%Y-%m-%d %H:%M UTC}",
    ]
    return "\n".join(report)
async def monitor_external(cfg: dict[str, Any]) -> None:
    """Periodically run the external checks until the task is cancelled.

    Returns immediately when ``cfg["external_checks"]["enabled"]`` is false
    (it defaults to true). Otherwise ``run_checks`` is executed every
    ``interval_sec`` seconds (default 300), forever.
    """
    import logging

    checks_cfg = cfg.get("external_checks", {})
    if not checks_cfg.get("enabled", True):
        return
    interval = int(checks_cfg.get("interval_sec", 300))
    while True:
        try:
            # run_checks does blocking network and file I/O; run it in a
            # worker thread so the event loop stays responsive meanwhile.
            # NOTE(review): this can now overlap with other run_checks
            # callers touching the same state file - confirm acceptable.
            await asyncio.to_thread(run_checks, cfg)
        except Exception:
            # One failed iteration must not stop the monitor for good.
            # Cancellation is a BaseException and still propagates.
            logging.getLogger(__name__).exception("external checks run failed")
        await asyncio.sleep(interval)