Add selftest scheduler, queue history, and OpenWrt signal stats

This commit is contained in:
2026-02-09 01:56:27 +03:00
parent aa7bd85687
commit 75113b6182
11 changed files with 216 additions and 39 deletions

View File

@@ -12,6 +12,7 @@ async def monitor_resources(cfg, notify, bot, chat_id):
cooldown = int(alerts_cfg.get("cooldown_sec", 900))
notify_recovery = bool(alerts_cfg.get("notify_recovery", True))
load_only_critical = bool(alerts_cfg.get("load_only_critical", False))
auto_mute_high_load_sec = int(alerts_cfg.get("auto_mute_on_high_load_sec", 0))
disk_warn = int(cfg.get("thresholds", {}).get("disk_warn", 80))
snapshot_warn = int(cfg.get("disk_report", {}).get("threshold", disk_warn))
@@ -72,6 +73,10 @@ async def monitor_resources(cfg, notify, bot, chat_id):
key = "load_high_crit" if level == 2 else "load_high_warn"
await notify(bot, chat_id, f"{icon} Load high: {load:.2f}", level=level_name, key=key, category="load")
last_sent["load"] = now
if level == 2 and auto_mute_high_load_sec > 0:
from services.alert_mute import set_mute
set_mute("load", auto_mute_high_load_sec)
state["load_level"] = level
await asyncio.sleep(interval)

View File

@@ -240,7 +240,7 @@ def _parse_hostapd_clients(
*,
name_map: dict[str, str] | None = None,
ifname_meta: dict[str, dict[str, str]] | None = None,
) -> list[str]:
) -> list[tuple[str, int | None, str]]:
if not isinstance(payload, dict):
return []
data = payload.get("clients")
@@ -248,7 +248,7 @@ def _parse_hostapd_clients(
items = data.items()
else:
return []
clients: list[str] = []
clients: list[tuple[str, int | None, str]] = []
name_map = name_map or {}
meta = (ifname_meta or {}).get(ifname, {})
ssid = meta.get("ssid") or ""
@@ -274,7 +274,8 @@ def _parse_hostapd_clients(
client_label = host
else:
client_label = str(mac)
clients.append(f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}")
line = f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}"
clients.append((line, signal if isinstance(signal, (int, float)) else None, net_label))
return clients
@@ -384,6 +385,7 @@ async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str:
if leases_fallback:
lease_name_map.update(_extract_lease_name_map_fallback(leases_fallback))
wifi_net_counts: dict[str, int] = {}
wifi_signals: dict[str, list[int]] = {}
if ifnames:
for ifname in ifnames:
cmd_clients = ssh_cmd + ["ubus", "call", f"hostapd.{ifname}", "get_clients"]
@@ -397,14 +399,16 @@ async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str:
if isinstance(clients_payload, dict):
label = _net_label_for_ifname(ifname, ifname_meta)
wifi_net_counts[label] = wifi_net_counts.get(label, 0) + len(clients_payload)
wifi_clients.extend(
_parse_hostapd_clients(
payload,
ifname,
name_map=lease_name_map,
ifname_meta=ifname_meta,
)
parsed = _parse_hostapd_clients(
payload,
ifname,
name_map=lease_name_map,
ifname_meta=ifname_meta,
)
wifi_clients.extend([p[0] for p in parsed])
for _line, sig, net_label in parsed:
if sig is not None and net_label:
wifi_signals.setdefault(net_label, []).append(sig)
if leases:
leases_list = _extract_leases(leases)
@@ -422,7 +426,13 @@ async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str:
if wifi_net_counts:
wifi_section.append("📶 Wi-Fi networks:")
for label, count in sorted(wifi_net_counts.items()):
wifi_section.append(f" - {label}: {count}")
sigs = wifi_signals.get(label) or []
if sigs:
avg_sig = sum(sigs) / len(sigs)
min_sig = min(sigs)
wifi_section.append(f" - {label}: {count} (avg {avg_sig:.0f}dBm, min {min_sig}dBm)")
else:
wifi_section.append(f" - {label}: {count}")
wifi_section.append("")
wifi_section.append(f"📶 Wi-Fi clients: {len(wifi_clients)}")

View File

@@ -16,10 +16,12 @@ _stats: dict[str, Any] = runtime_state.get("queue_stats", {}) or {
"last_label": "",
"last_finished_at": 0.0,
}
_history: deque[dict[str, Any]] = deque(runtime_state.get("queue_history", []) or [], maxlen=50)
def _save_stats():
runtime_state.set_state("queue_stats", _stats)
runtime_state.set_state("queue_history", list(_history))
async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int:
@@ -43,8 +45,11 @@ async def worker():
pass
_current_label = label
_current_meta = {"enqueued_at": enqueued_at, "started_at": time.time()}
status = "ok"
try:
await job()
except Exception:
status = "err"
finally:
finished_at = time.time()
if _current_meta:
@@ -60,6 +65,15 @@ async def worker():
) / _stats["processed"]
_stats["last_label"] = label
_stats["last_finished_at"] = finished_at
_history.appendleft(
{
"label": label,
"wait_sec": int(wait_sec),
"runtime_sec": int(runtime_sec),
"finished_at": int(finished_at),
"status": status,
}
)
_save_stats()
_current_label = None
_current_meta = None
@@ -111,4 +125,13 @@ def format_details(limit: int = 10) -> str:
last_label = _stats.get("last_label")
if last_label:
lines.append(f"Last: {last_label}")
if _history:
lines.append("")
lines.append("🗂 Last jobs:")
for item in list(_history)[:5]:
t = time.strftime("%H:%M:%S", time.localtime(item["finished_at"]))
lines.append(
f"- {t} {item['label']} {item['status']} "
f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)"
)
return "\n".join(lines)

66
services/selftest.py Normal file
View File

@@ -0,0 +1,66 @@
import json
from datetime import datetime, timedelta
import asyncio
from typing import Any
from services.health import health
from services.runner import run_cmd_full
async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str:
lines = ["🧪 Self-test"]
# health
try:
htext = await asyncio.to_thread(health, cfg, docker_map)
h_lines = [ln for ln in htext.splitlines() if ln.strip()]
brief = " | ".join(h_lines[1:5]) if len(h_lines) > 1 else h_lines[0] if h_lines else "n/a"
lines.append(f"🟢 Health: {brief}")
except Exception as e:
lines.append(f"🔴 Health failed: {e}")
# restic snapshots check
rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
if rc == 0:
try:
snaps = json.loads(out)
if isinstance(snaps, list) and snaps:
snaps.sort(key=lambda s: s.get("time", ""), reverse=True)
last = snaps[0]
t = last.get("time", "?").replace("Z", "").replace("T", " ")[:16]
lines.append(f"🟢 Restic snapshots: {len(snaps)}, last {t}")
else:
lines.append("🟡 Restic snapshots: empty")
except Exception:
lines.append("🟡 Restic snapshots: invalid JSON")
else:
lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}")
return "\n".join(lines)
async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]):
"""
Run selftest daily at configured time.
"""
sched_cfg = cfg.get("selftest", {}).get("schedule", {})
if not sched_cfg.get("enabled", False):
return
time_str = sched_cfg.get("time", "03:30")
try:
hh, mm = [int(x) for x in time_str.split(":")]
except Exception:
hh, mm = 3, 30
while True:
now = datetime.now()
run_at = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
if run_at <= now:
run_at += timedelta(days=1)
await asyncio.sleep((run_at - now).total_seconds())
text = await run_selftest(cfg, docker_map)
for chat_id in admin_ids:
try:
await bot.send_message(chat_id, text)
except Exception:
pass