Add incidents export, queue alerts, and health summaries
This commit is contained in:
@@ -65,6 +65,10 @@ def _parse_line(line: str) -> tuple[datetime | None, str]:
|
||||
|
||||
|
||||
def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]:
|
||||
return [f"{dt:%Y-%m-%d %H:%M} {msg}" for dt, msg in read_raw(cfg, hours, limit=limit)]
|
||||
|
||||
|
||||
def read_raw(cfg: dict[str, Any], hours: int, limit: int = 200, *, include_old: bool = False) -> list[tuple[datetime, str]]:
|
||||
path = _get_path(cfg)
|
||||
if not os.path.exists(path):
|
||||
return []
|
||||
@@ -74,7 +78,40 @@ def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]:
|
||||
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
||||
for line in f:
|
||||
dt, msg = _parse_line(line.rstrip())
|
||||
if dt is None or dt < since:
|
||||
if dt is None:
|
||||
continue
|
||||
lines.append(f"{dt:%Y-%m-%d %H:%M} {msg}")
|
||||
if not include_old and dt < since:
|
||||
continue
|
||||
lines.append((dt, msg))
|
||||
return list(lines)
|
||||
|
||||
|
||||
def infer_category(text: str) -> str | None:
|
||||
lower = text.lower()
|
||||
if "category=" in lower:
|
||||
import re
|
||||
|
||||
m = re.search(r"category=([a-z0-9_-]+)", lower)
|
||||
if m:
|
||||
return m.group(1)
|
||||
if "load" in lower:
|
||||
return "load"
|
||||
if "docker" in lower:
|
||||
return "docker"
|
||||
if "restic" in lower or "backup" in lower:
|
||||
return "backup"
|
||||
if "smart" in lower:
|
||||
return "smart"
|
||||
if "ssl" in lower or "cert" in lower:
|
||||
return "ssl"
|
||||
if "npmplus" in lower:
|
||||
return "npmplus"
|
||||
if "gitea" in lower:
|
||||
return "gitea"
|
||||
if "openwrt" in lower:
|
||||
return "openwrt"
|
||||
if "queue" in lower:
|
||||
return "queue"
|
||||
if "selftest" in lower:
|
||||
return "selftest"
|
||||
return None
|
||||
|
||||
@@ -3,6 +3,7 @@ import time
|
||||
from collections import deque
|
||||
from typing import Awaitable, Callable, Any
|
||||
from services import runtime_state
|
||||
from services.incidents import log_incident
|
||||
|
||||
|
||||
_queue: asyncio.Queue = asyncio.Queue()
|
||||
@@ -17,6 +18,13 @@ _stats: dict[str, Any] = runtime_state.get("queue_stats", {}) or {
|
||||
"last_finished_at": 0.0,
|
||||
}
|
||||
_history: deque[dict[str, Any]] = deque(runtime_state.get("queue_history", []) or [], maxlen=50)
|
||||
_alert_cfg: dict[str, Any] = {
|
||||
"max_pending": None,
|
||||
"avg_wait": None,
|
||||
"cooldown": 300,
|
||||
"last_sent": 0.0,
|
||||
}
|
||||
_cfg: dict[str, Any] | None = None
|
||||
|
||||
|
||||
def _save_stats():
|
||||
@@ -24,10 +32,39 @@ def _save_stats():
|
||||
runtime_state.set_state("queue_history", list(_history))
|
||||
|
||||
|
||||
def configure(queue_cfg: dict[str, Any], cfg: dict[str, Any]):
|
||||
global _cfg
|
||||
_cfg = cfg
|
||||
_alert_cfg["max_pending"] = queue_cfg.get("max_pending_alert")
|
||||
_alert_cfg["avg_wait"] = queue_cfg.get("avg_wait_alert")
|
||||
_alert_cfg["cooldown"] = queue_cfg.get("cooldown_sec", 300)
|
||||
|
||||
|
||||
def _check_congestion(pending_count: int, avg_wait: float | None):
|
||||
max_pending = _alert_cfg.get("max_pending")
|
||||
avg_wait_thr = _alert_cfg.get("avg_wait")
|
||||
cooldown = _alert_cfg.get("cooldown", 300)
|
||||
now = time.time()
|
||||
if now - _alert_cfg.get("last_sent", 0) < cooldown:
|
||||
return
|
||||
reason = None
|
||||
if max_pending and pending_count >= max_pending:
|
||||
reason = f"pending={pending_count} >= {max_pending}"
|
||||
if avg_wait_thr and avg_wait is not None and avg_wait >= avg_wait_thr:
|
||||
reason = reason or f"avg_wait={avg_wait:.1f}s >= {avg_wait_thr}s"
|
||||
if reason and _cfg:
|
||||
try:
|
||||
log_incident(_cfg, f"queue_congested {reason}", category="queue")
|
||||
except Exception:
|
||||
pass
|
||||
_alert_cfg["last_sent"] = now
|
||||
|
||||
|
||||
async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int:
|
||||
enqueued_at = time.time()
|
||||
await _queue.put((label, job, enqueued_at))
|
||||
_pending.append((label, enqueued_at))
|
||||
_check_congestion(len(_pending), None)
|
||||
return len(_pending)
|
||||
|
||||
|
||||
@@ -75,6 +112,7 @@ async def worker():
|
||||
}
|
||||
)
|
||||
_save_stats()
|
||||
_check_congestion(len(_pending), _stats.get("avg_wait_sec"))
|
||||
_current_label = None
|
||||
_current_meta = None
|
||||
_queue.task_done()
|
||||
@@ -135,3 +173,17 @@ def format_details(limit: int = 10) -> str:
|
||||
f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def format_history(limit: int = 20) -> str:
|
||||
lines = ["🗂 Queue history"]
|
||||
if not _history:
|
||||
lines.append("(empty)")
|
||||
return "\n".join(lines)
|
||||
for item in list(_history)[:limit]:
|
||||
t = time.strftime("%m-%d %H:%M:%S", time.localtime(item["finished_at"]))
|
||||
lines.append(
|
||||
f"{t} {item['label']} {item['status']} "
|
||||
f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
@@ -5,10 +5,12 @@ from typing import Any
|
||||
|
||||
from services.health import health
|
||||
from services.runner import run_cmd_full
|
||||
from services.incidents import log_incident
|
||||
|
||||
|
||||
async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str:
|
||||
async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> tuple[str, bool]:
|
||||
lines = ["🧪 Self-test"]
|
||||
ok = True
|
||||
|
||||
# health
|
||||
try:
|
||||
@@ -18,6 +20,7 @@ async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str:
|
||||
lines.append(f"🟢 Health: {brief}")
|
||||
except Exception as e:
|
||||
lines.append(f"🔴 Health failed: {e}")
|
||||
ok = False
|
||||
|
||||
# restic snapshots check
|
||||
rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
|
||||
@@ -35,8 +38,9 @@ async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str:
|
||||
lines.append("🟡 Restic snapshots: invalid JSON")
|
||||
else:
|
||||
lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}")
|
||||
ok = False
|
||||
|
||||
return "\n".join(lines)
|
||||
return "\n".join(lines), ok
|
||||
|
||||
|
||||
async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]):
|
||||
@@ -58,9 +62,14 @@ async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], dock
|
||||
if run_at <= now:
|
||||
run_at += timedelta(days=1)
|
||||
await asyncio.sleep((run_at - now).total_seconds())
|
||||
text = await run_selftest(cfg, docker_map)
|
||||
text, ok = await run_selftest(cfg, docker_map)
|
||||
for chat_id in admin_ids:
|
||||
try:
|
||||
await bot.send_message(chat_id, text)
|
||||
except Exception:
|
||||
pass
|
||||
if not ok:
|
||||
try:
|
||||
log_incident(cfg, "selftest failed", category="selftest")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user