96 lines
3.1 KiB
Python
96 lines
3.1 KiB
Python
import json
|
|
from datetime import datetime, timedelta
|
|
import asyncio
|
|
from typing import Any
|
|
|
|
from services.health import health
|
|
from services.runner import run_cmd_full
|
|
from services.incidents import log_incident
|
|
from services import runtime_state
|
|
|
|
|
|
def _save_history(entry: dict[str, Any]) -> None:
    """Record *entry* as the newest item of the stored self-test history.

    The history is read from ``runtime_state`` under ``"selftest_history"``;
    anything that is not a list is treated as an empty history. The new entry
    goes first and the list written back is capped at 20 items.
    """
    stored = runtime_state.get("selftest_history", [])
    # Slicing yields a fresh list, so the object held by runtime_state is
    # never mutated in place.
    previous = stored[:50] if isinstance(stored, list) else []
    newest_first = [entry, *previous]
    runtime_state.set_state("selftest_history", newest_first[:20])
|
|
|
|
|
|
def _describe_snapshots(out: str) -> str:
    """Turn the output of ``restic snapshots --json`` into one report line.

    Returns a 🟢 line with the snapshot count and the newest snapshot time,
    a 🟡 "empty" line when the payload is not a non-empty list, or a 🟡
    "invalid JSON" line when the payload cannot be parsed or interpreted.
    """
    try:
        snaps = json.loads(out)
        if not (isinstance(snaps, list) and snaps):
            return "🟡 Restic snapshots: empty"
        # "time" values are compared as strings; the greatest one is treated
        # as the most recent snapshot.
        last = max(snaps, key=lambda s: s.get("time", ""))
        stamp = last.get("time", "?").replace("Z", "").replace("T", " ")[:16]
        return f"🟢 Restic snapshots: {len(snaps)}, last {stamp}"
    except Exception:
        # Any parsing/shape problem is reported as a warning, not a failure.
        return "🟡 Restic snapshots: invalid JSON"


async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> tuple[str, bool]:
    """Run the self-test and return ``(report_text, ok)``.

    Two checks are performed:

    * ``health(cfg, docker_map)`` is executed in a worker thread; up to four
      non-blank lines after its first line are condensed into the report.
    * ``restic snapshots --json`` is executed through ``run_cmd_full``.

    ``ok`` becomes ``False`` when the health call raises, when the restic
    command exits with a non-zero code, or when launching it raises. Each
    check is isolated, so a failure in one never hides the result of the
    other. A history entry (``ts``, ``ok``, ``summary``) is stored on a
    best-effort basis.
    """
    lines = ["🧪 Self-test"]
    ok = True

    # health
    try:
        htext = await asyncio.to_thread(health, cfg, docker_map)
        h_lines = [ln for ln in htext.splitlines() if ln.strip()]
        if len(h_lines) > 1:
            brief = " | ".join(h_lines[1:5])
        elif h_lines:
            brief = h_lines[0]
        else:
            brief = "n/a"
        lines.append(f"🟢 Health: {brief}")
    except Exception as e:
        lines.append(f"🔴 Health failed: {e}")
        ok = False

    # restic snapshots check
    try:
        rc, out = await run_cmd_full(
            ["restic", "snapshots", "--json"], use_restic_env=True, timeout=40
        )
    except Exception as e:
        # The runner itself failed; report it like the health check does
        # instead of aborting the whole self-test.
        lines.append(f"🔴 Restic snapshots error: {e}")
        ok = False
    else:
        if rc == 0:
            lines.append(_describe_snapshots(out))
        else:
            # Fall back to the exit code when the command produced no output.
            lines.append(f"🔴 Restic snapshots error: {(out or '').strip() or rc}")
            ok = False

    result_text = "\n".join(lines)
    try:
        _save_history(
            {
                "ts": datetime.now().isoformat(),
                "ok": ok,
                # First line after the header, i.e. the health line.
                "summary": result_text.splitlines()[1] if len(lines) > 1 else "",
            }
        )
    except Exception:
        # History is best-effort: failing to persist it must not fail the test run.
        pass

    return result_text, ok
|
|
|
|
|
|
async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]):
    """
    Run selftest daily at configured time.

    Reads ``cfg["selftest"]["schedule"]``; returns immediately unless its
    ``enabled`` flag is truthy. ``time`` is expected as ``"HH:MM"`` (default
    ``"03:30"``); a malformed or out-of-range value falls back to 03:30.

    Once enabled, the coroutine loops forever: it sleeps until the next
    occurrence of the configured time (naive local time from
    ``datetime.now()``), runs ``run_selftest`` and sends the report to every
    id in ``admin_ids`` via ``bot.send_message``. When the run is not ok,
    ``log_incident`` is called. Delivery and incident logging are best-effort.
    """
    # ``or {}`` also covers sections that are present but empty (None).
    sched_cfg = (cfg.get("selftest") or {}).get("schedule") or {}
    if not sched_cfg.get("enabled", False):
        return

    hh, mm = 3, 30
    time_str = sched_cfg.get("time", "03:30")
    try:
        parsed_h, parsed_m = [int(x) for x in time_str.split(":")]
    except Exception:
        pass  # keep the 03:30 default
    else:
        # Out-of-range values would make datetime.replace() raise inside the
        # loop below and kill the scheduler, so reject them here.
        if 0 <= parsed_h <= 23 and 0 <= parsed_m <= 59:
            hh, mm = parsed_h, parsed_m

    while True:
        now = datetime.now()
        run_at = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
        if run_at <= now:
            # Today's slot has already passed; schedule for tomorrow.
            run_at += timedelta(days=1)
        await asyncio.sleep((run_at - now).total_seconds())

        try:
            text, ok = await run_selftest(cfg, docker_map)
        except Exception as exc:
            # A crashing run must not end the daily schedule; report it as a
            # failed self-test instead.
            text, ok = f"🔴 Self-test crashed: {exc}", False

        for chat_id in admin_ids:
            try:
                await bot.send_message(chat_id, text)
            except Exception:
                # Best-effort delivery: one unreachable chat must not block the others.
                continue

        if not ok:
            try:
                log_incident(cfg, "selftest failed", category="selftest")
            except Exception:
                # Incident logging is best-effort as well.
                pass
|