76 lines
2.6 KiB
Python
76 lines
2.6 KiB
Python
import json
|
|
from datetime import datetime, timedelta
|
|
import asyncio
|
|
from typing import Any
|
|
|
|
from services.health import health
|
|
from services.runner import run_cmd_full
|
|
from services.incidents import log_incident
|
|
|
|
|
|
async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> tuple[str, bool]:
    """Run the self-test checks and return ``(report_text, ok)``.

    Two checks run in order, each contributing one line to the report:

    1. ``health(cfg, docker_map)`` is executed in a worker thread; up to four
       non-blank lines following its first line are joined into a summary.
    2. ``restic snapshots --json`` is executed through ``run_cmd_full`` and
       the snapshot count plus the newest snapshot time are reported.

    Args:
        cfg: Configuration mapping, passed through to ``health``.
        docker_map: Mapping passed through to ``health``.

    Returns:
        A tuple of the newline-joined report and a flag that is ``False`` when
        the health check raised, or when the restic command raised or exited
        with a non-zero code. An empty or unparsable snapshot list only
        produces a warning line and does not clear the flag.
    """
    lines = ["🧪 Self-test"]
    ok = True

    # health
    try:
        htext = await asyncio.to_thread(health, cfg, docker_map)
        h_lines = [ln for ln in htext.splitlines() if ln.strip()]
        if len(h_lines) > 1:
            # Skip the first line and keep at most the next four.
            brief = " | ".join(h_lines[1:5])
        elif h_lines:
            brief = h_lines[0]
        else:
            brief = "n/a"
        lines.append(f"🟢 Health: {brief}")
    except Exception as e:
        lines.append(f"🔴 Health failed: {e}")
        ok = False

    # restic snapshots check
    try:
        rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
    except Exception as e:
        # A failure to run the command at all is reported like any other
        # failed check instead of aborting the whole self-test.
        lines.append(f"🔴 Restic snapshots error: {e}")
        return "\n".join(lines), False

    if rc != 0:
        lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}")
        return "\n".join(lines), False

    try:
        snaps = json.loads(out)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-text output.
        lines.append("🟡 Restic snapshots: invalid JSON")
        return "\n".join(lines), ok

    if not (isinstance(snaps, list) and snaps):
        lines.append("🟡 Restic snapshots: empty")
        return "\n".join(lines), ok

    try:
        # ISO-style timestamps compare correctly as strings; a missing or
        # null "time" sorts first so it never wins over a real timestamp.
        last = max(snaps, key=lambda s: str(s.get("time") or ""))
        t = str(last.get("time") or "?").replace("Z", "").replace("T", " ")[:16]
    except AttributeError:
        # Valid JSON, but the entries are not objects with a .get().
        lines.append("🟡 Restic snapshots: unexpected format")
    else:
        lines.append(f"🟢 Restic snapshots: {len(snaps)}, last {t}")

    return "\n".join(lines), ok
|
|
|
|
|
|
async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]):
    """
    Run selftest daily at configured time.

    Reads ``cfg["selftest"]["schedule"]``; returns immediately unless its
    ``enabled`` key is truthy. The ``time`` key is an ``"HH:MM"`` string
    (default ``"03:30"``); a malformed or out-of-range value falls back to
    03:30. The time is interpreted with naive ``datetime.now()``.

    Once enabled, this coroutine loops forever: it sleeps until the next
    occurrence of the configured time, runs ``run_selftest``, sends the report
    to every id in ``admin_ids`` via ``bot.send_message`` and, when the
    self-test did not pass, records an incident with ``log_incident``.

    Args:
        cfg: Configuration mapping; also passed to ``run_selftest`` and
            ``log_incident``.
        bot: Object exposing an awaitable ``send_message(chat_id, text)``.
        admin_ids: Chat ids that receive the report.
        docker_map: Mapping passed through to ``run_selftest``.
    """
    sched_cfg = cfg.get("selftest", {}).get("schedule", {})
    if not sched_cfg.get("enabled", False):
        return

    try:
        hh, mm = (int(part) for part in str(sched_cfg.get("time", "03:30")).split(":"))
        # int() accepts values like 25 or 99; reject them here, otherwise
        # datetime.replace() below raises ValueError and kills the loop.
        if not (0 <= hh <= 23 and 0 <= mm <= 59):
            raise ValueError(f"time out of range: {hh}:{mm}")
    except (TypeError, ValueError):
        hh, mm = 3, 30

    while True:
        now = datetime.now()
        run_at = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
        if run_at <= now:
            # Today's slot has already passed; wait for tomorrow's.
            run_at += timedelta(days=1)
        await asyncio.sleep((run_at - now).total_seconds())

        try:
            text, ok = await run_selftest(cfg, docker_map)
        except Exception as e:
            # Keep the daily schedule alive and surface the crash as a
            # failed run (task cancellation is not an Exception and still
            # propagates).
            text, ok = f"🔴 Self-test crashed: {e}", False

        for chat_id in admin_ids:
            try:
                await bot.send_message(chat_id, text)
            except Exception:
                # Best-effort delivery: one unreachable admin must not block
                # the remaining recipients or stop the scheduler.
                pass

        if not ok:
            try:
                log_incident(cfg, "selftest failed", category="selftest")
            except Exception:
                # Incident logging is best-effort as well.
                pass
|