Add incident exports, queue SLA, and OpenWrt diff utilities

This commit is contained in:
2026-02-09 02:57:16 +03:00
parent 0fbd374823
commit f7081b78e1
7 changed files with 397 additions and 9 deletions

View File

@@ -56,11 +56,14 @@ HELP_PAGES = [
"🛠 **Admin & Deploy**\n\n"
"Config: `/config_check`, файл `config.yaml` (см. config.example.yaml).\n"
"Deploy: `deploy.sh` (ssh 10.10.10.10:1090 → git pull → systemctl restart tg-bot).\n"
"Incidents summary: `/incidents_summary`.\n"
"Incidents export: `/incidents_export [hours] [csv|json]`.\n"
"Alerts log: `/alerts_log [hours]`.\n"
"Incidents: `/incidents_summary`, `/incidents_diff [hours]`.\n"
"Export: `/incidents_export [hours] [csv|json]`, `/export_all [hours]` (zip).\n"
"Alerts log/heatmap: `/alerts_log [hours]`, `/alerts_heatmap [hours] [cat]`.\n"
"Backup SLA: `/backup_sla`; Docker restarts: `/docker_restarts [hours]`.\n"
"Disk snapshot: `/disk_snapshot`.\n"
"Queue history: `/queue_history`.\n"
"Queue: `/queue_history`, `/queue_sla`.\n"
"Self-test history: `/selftest_history`.\n"
"OpenWrt leases diff: `/openwrt_leases_diff`.\n"
"BotFather list: `/botfather_list`.\n"
"Безопасность: `safety.dry_run: true` блокирует опасные действия.\n"
"OpenWrt: кнопка в System → Info.",
@@ -128,9 +131,11 @@ alerts_list - List active mutes
alerts_recent - Show recent incidents (24h)
alerts_mute_load - Mute load alerts for 60m
alerts_log - Show suppressed alerts
alerts_heatmap - Hourly incidents heatmap
backup_run - Run backup (queued)
backup_history - Show backup log tail
queue_history - Show queue recent jobs
queue_sla - Queue SLA stats
docker_status - Docker summary
docker_health - Docker inspect/health by alias
docker_health_summary - Docker health summary (problems only)
@@ -139,8 +144,14 @@ openwrt_wan - OpenWrt WAN only
openwrt_clients - OpenWrt wifi clients
openwrt_leases - OpenWrt DHCP leases
openwrt_fast - OpenWrt quick WAN view
openwrt_leases_diff - OpenWrt DHCP diff
incidents_summary - Incidents counters (24h/7d)
incidents_export - Export incidents (hours fmt)
incidents_diff - Show incidents since last check
export_all - Zip with incidents/queue/selftest
backup_sla - Backup SLA check
docker_restarts - Docker restart history
selftest_history - Self-test history
disk_snapshot - Disk usage snapshot
config_check - Validate config
"""

View File

@@ -126,7 +126,7 @@ async def selftest(msg: Message):
await msg.answer("⏳ Self-test…", reply_markup=menu_kb)
async def worker():
text = await run_selftest(cfg, DOCKER_MAP)
text, _ok = await run_selftest(cfg, DOCKER_MAP)
await msg.answer(text, reply_markup=menu_kb)
asyncio.create_task(worker())

View File

@@ -1,5 +1,6 @@
import asyncio
import os
from datetime import datetime, timezone, timedelta
from aiogram import F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile
from app import dp, cfg
@@ -13,12 +14,12 @@ from keyboards import (
)
from system_checks import security, disks, hardware, list_disks, smart_last_test
from services.http_checks import get_url_checks, check_url
from services.queue import enqueue
from services.queue import enqueue, get_history_raw, get_stats
from services.updates import list_updates, apply_updates
from services.runner import run_cmd
from services.runner import run_cmd, run_cmd_full
from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host
from services.gitea import get_gitea_health
from services.openwrt import get_openwrt_status
from services.openwrt import get_openwrt_status, fetch_openwrt_leases
from services.system import worst_disk_usage
import state
from state import UPDATES_CACHE, REBOOT_PENDING
@@ -27,9 +28,11 @@ from services.audit import read_audit_tail
from services.incidents import read_recent, incidents_path, read_raw, infer_category
from services.external_checks import format_report
from services.disk_report import build_disk_report
from services import runtime_state
import io
import json
import csv
import zipfile
@dp.message(F.text == "💽 Disks")
@@ -367,6 +370,86 @@ async def incidents_summary(msg: Message):
await msg.answer(text, reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/incidents_diff"))
async def incidents_diff(msg: Message):
    """Report incidents that appeared since the previous /incidents_diff call.

    Accepts an optional window argument in hours (default 24, floor 1).
    The newest reported timestamp is persisted in runtime_state under
    "incidents_diff_last_ts" so repeated calls only show fresh entries.
    """
    if not is_admin_msg(msg):
        return
    tokens = msg.text.split()
    hours = 24
    if len(tokens) >= 2:
        try:
            hours = max(1, int(tokens[1]))
        except ValueError:
            hours = 24
    # Recover the high-water mark from the previous invocation, if any.
    mark_raw = runtime_state.get("incidents_diff_last_ts")
    mark: datetime | None = None
    if isinstance(mark_raw, str):
        try:
            mark = datetime.fromisoformat(mark_raw)
        except Exception:
            mark = None
    entries = read_raw(cfg, hours=hours, limit=5000, include_old=True)
    fresh = [(dt, line) for dt, line in entries if not (mark and dt <= mark)]
    if not fresh:
        note = f"since {mark.astimezone().strftime('%Y-%m-%d %H:%M')}" if mark else "for the period"
        await msg.answer(f"📣 No new incidents {note} (window {hours}h)", reply_markup=system_logs_audit_kb)
        return
    fresh.sort(key=lambda pair: pair[0])
    body = "\n".join(f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}" for dt, line in fresh[-200:])
    await msg.answer(
        f"📣 New incidents (since last mark, window {hours}h): {len(fresh)}\n```\n{body}\n```",
        reply_markup=system_logs_audit_kb,
        parse_mode="Markdown",
    )
    # Best-effort persistence of the new high-water mark.
    try:
        runtime_state.set_state("incidents_diff_last_ts", fresh[-1][0].isoformat())
    except Exception:
        pass
@dp.message(F.text.startswith("/alerts_heatmap"))
async def alerts_heatmap(msg: Message):
    """Render an hourly incident heatmap as a text bar chart.

    Arguments (any order): an integer window in hours (default 48) and/or
    a category name to filter on. Incidents are bucketed per local-time
    hour; up to the last 72 hourly rows are printed with bars proportional
    to the busiest hour.
    """
    if not is_admin_msg(msg):
        return
    parts = msg.text.split()
    hours = 48
    category = None
    for part in parts[1:]:
        if part.isdigit():
            hours = max(1, int(part))
        else:
            category = part.lower()
    rows = read_raw(cfg, hours=hours, limit=8000, include_old=True)
    buckets: dict[datetime, int] = {}
    for dt, line in rows:
        cat = infer_category(line) or "n/a"
        if category and cat != category:
            continue
        local = dt.astimezone()
        hour = local.replace(minute=0, second=0, microsecond=0)
        buckets[hour] = buckets.get(hour, 0) + 1
    if not buckets:
        await msg.answer(f"⚠️ No incidents for heatmap (hours={hours}, category={category or 'any'})")
        return
    start = datetime.now().astimezone() - timedelta(hours=hours - 1)
    timeline = []
    current = start.replace(minute=0, second=0, microsecond=0)
    while current <= datetime.now().astimezone():
        timeline.append((current, buckets.get(current, 0)))
        current += timedelta(hours=1)
    # `or 1` guards division when every bucket falls outside the timeline.
    max_count = max(c for _, c in timeline) or 1
    lines = [f"📊 Alerts heatmap (hours={hours}, category={category or 'any'})"]
    for ts, cnt in timeline[-72:]:
        bar_len = max(1, int(cnt / max_count * 12)) if cnt else 0
        # BUG FIX: the bar glyph was lost ("" * bar_len is always empty),
        # so rows rendered with no bar at all. Use a solid block character.
        bar = "█" * bar_len if cnt else ""
        lines.append(f"{ts:%m-%d %H}: {cnt:>3} {bar}")
    await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb)
@dp.message(F.text == "/disk_snapshot")
async def disk_snapshot(msg: Message):
if not is_admin_msg(msg):
@@ -443,6 +526,229 @@ async def incidents_export(msg: Message):
await msg.answer_document(document=BufferedInputFile(file_bytes, filename=fname))
@dp.message(F.text == "/backup_sla")
async def backup_sla(msg: Message):
    """Check backup freshness against the configured SLA via restic.

    Runs `restic snapshots --json`, computes the age of the newest snapshot
    and classifies it: 🟢 within SLA, 🟡 up to 1.5x SLA (or unknown age),
    🔴 beyond that. SLA hours come from cfg["backup"]["sla_hours"] (default 26).
    """
    if not is_admin_msg(msg):
        return
    rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
    if rc != 0:
        await msg.answer(f"⚠️ Restic error: {out.strip() or rc}", reply_markup=system_logs_audit_kb)
        return
    try:
        snaps = json.loads(out)
    except Exception as e:
        await msg.answer(f"⚠️ Invalid restic JSON: {e}", reply_markup=system_logs_audit_kb)
        return
    if not isinstance(snaps, list) or not snaps:
        await msg.answer("⚠️ No snapshots found", reply_markup=system_logs_audit_kb)
        return
    # ISO-8601 timestamps sort lexicographically, so newest comes first.
    snaps.sort(key=lambda s: s.get("time", ""), reverse=True)
    newest = snaps[0].get("time")
    last_dt = None
    age_h = None
    if newest:
        last_dt = datetime.fromisoformat(newest.replace("Z", "+00:00"))
        age_h = (datetime.now(tz=last_dt.tzinfo) - last_dt).total_seconds() / 3600
    sla = float(cfg.get("backup", {}).get("sla_hours", 26))
    if age_h is None:
        status = "🟡"
    elif age_h <= sla:
        status = "🟢"
    elif age_h <= sla * 1.5:
        status = "🟡"
    else:
        status = "🔴"
    last_str = last_dt.astimezone().strftime("%Y-%m-%d %H:%M") if last_dt else "n/a"
    age_str = f"{age_h:.1f}h" if age_h is not None else "n/a"
    await msg.answer(
        f"{status} Backup SLA\n"
        f"Snapshots: {len(snaps)}\n"
        f"Last: {last_str} (age {age_str})\n"
        f"SLA: {sla}h",
        reply_markup=system_logs_audit_kb,
    )
@dp.message(F.text.startswith("/docker_restarts"))
async def docker_restarts(msg: Message):
    """Summarize docker-restart incidents over an optional window in hours.

    Default window is 168h (one week); shows a per-alias count plus the
    last 50 raw incident lines.
    """
    if not is_admin_msg(msg):
        return
    args = msg.text.split()
    hours = 168
    if len(args) >= 2:
        try:
            hours = max(1, int(args[1]))
        except ValueError:
            hours = 168
    rows = read_raw(cfg, hours=hours, limit=5000, include_old=True)
    restarts = [entry for entry in rows if "docker_restart" in entry[1]]
    if not restarts:
        await msg.answer(f"🐳 No docker restarts in last {hours}h", reply_markup=system_logs_audit_kb)
        return
    # Count restarts per container alias (last whitespace token of the line).
    counts: dict[str, int] = {}
    for _dt, line in restarts:
        words = line.split()
        alias = words[-1] if len(words) >= 2 else "unknown"
        counts[alias] = counts.get(alias, 0) + 1
    top = ", ".join(f"{k}:{v}" for k, v in sorted(counts.items(), key=lambda x: x[1], reverse=True))
    body = "\n".join(f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}" for dt, line in restarts[-50:])
    await msg.answer(
        f"🐳 Docker restarts ({hours}h): {len(restarts)} ({top})\n```\n{body}\n```",
        reply_markup=system_logs_audit_kb,
        parse_mode="Markdown",
    )
@dp.message(F.text.startswith("/openwrt_leases_diff"))
async def openwrt_leases_diff(msg: Message):
    """Diff current OpenWrt DHCP leases against the stored baseline.

    The first call only stores a baseline; subsequent calls report added
    and removed lease lines (capped at 50 each) and refresh the baseline.
    The fetch runs in a background task so the handler returns immediately.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ OpenWrt leases diff…", reply_markup=system_info_kb)

    async def worker():
        try:
            leases_now = await fetch_openwrt_leases(cfg)
        except Exception as e:
            await msg.answer(f"⚠️ OpenWrt error: {e}", reply_markup=system_info_kb)
            return
        prev = runtime_state.get("openwrt_leases_prev", [])
        if not prev:
            # No baseline yet: persist one and report its size.
            runtime_state.set_state("openwrt_leases_prev", leases_now)
            await msg.answer(f"Baseline saved ({len(leases_now)} leases)", reply_markup=system_info_kb)
            return
        before, after = set(prev), set(leases_now)
        added = sorted(after - before)
        removed = sorted(before - after)
        lines = [f"🧾 OpenWrt leases diff ({len(added)} added, {len(removed)} removed)"]
        if added:
            lines.append(" Added:")
            lines.extend(f" - {x}" for x in added[:50])
        if removed:
            lines.append(" Removed:")
            lines.extend(f" - {x}" for x in removed[:50])
        if max(len(added), len(removed)) > 50:
            lines.append("… trimmed")
        runtime_state.set_state("openwrt_leases_prev", leases_now)
        await msg.answer("\n".join(lines), reply_markup=system_info_kb)

    asyncio.create_task(worker())
@dp.message(F.text == "/queue_sla")
async def queue_sla(msg: Message):
    """Show wait/runtime statistics (max and p95) for the job queue."""
    if not is_admin_msg(msg):
        return
    hist = get_history_raw()
    if not hist:
        await msg.answer("🧾 Queue SLA: history is empty", reply_markup=system_logs_audit_kb)
        return

    def p95(values: list[int]) -> float:
        # Nearest-rank (floor index) percentile over the sorted sample.
        if not values:
            return 0.0
        ordered = sorted(values)
        return float(ordered[int(0.95 * (len(ordered) - 1))])

    waits = [entry.get("wait_sec", 0) for entry in hist]
    runs = [entry.get("runtime_sec", 0) for entry in hist]
    report = [
        "🧾 Queue SLA",
        f"Samples: {len(hist)}",
        f"Max wait: {max(waits)}s, p95 {p95(waits):.1f}s",
        f"Max run: {max(runs)}s, p95 {p95(runs):.1f}s",
    ]
    stats = get_stats()
    if stats:
        report.append(
            f"Avg wait: {stats.get('avg_wait_sec', 0):.1f}s, "
            f"avg run: {stats.get('avg_runtime_sec', 0):.1f}s"
        )
    await msg.answer("\n".join(report), reply_markup=system_logs_audit_kb)
@dp.message(F.text == "/selftest_history")
async def selftest_history(msg: Message):
    """List the most recent self-test runs (up to 15) with pass/fail marks."""
    if not is_admin_msg(msg):
        return
    hist = runtime_state.get("selftest_history", [])
    if not hist:
        await msg.answer("🧪 Self-test history is empty", reply_markup=system_logs_audit_kb)
        return
    out = ["🧪 Self-test history"]
    for entry in hist[:15]:
        mark = "🟢" if entry.get("ok", False) else "🔴"
        out.append(f"{mark} {entry.get('ts', '')[:16]} {entry.get('summary', '')}")
    await msg.answer("\n".join(out), reply_markup=system_logs_audit_kb)
def _csv_text(header: list[str], rows) -> str:
    """Serialize *rows* (iterable of row lists) to semicolon-delimited CSV text."""
    sio = io.StringIO()
    writer = csv.writer(sio, delimiter=";")
    writer.writerow(header)
    writer.writerows(rows)
    return sio.getvalue()


@dp.message(F.text.startswith("/export_all"))
async def export_all(msg: Message):
    """Export incidents, queue history and self-test history as a zip of CSVs.

    Optional argument: incidents window in hours (default 168). Sends a
    summary message followed by `export_all.zip`.

    Refactor: the three hand-rolled StringIO/csv.writer sections were
    collapsed into the _csv_text helper; behavior is unchanged.
    """
    if not is_admin_msg(msg):
        return
    hours = 168
    parts = msg.text.split()
    if len(parts) >= 2:
        try:
            hours = max(1, int(parts[1]))
        except ValueError:
            hours = 168
    incidents_rows = read_raw(cfg, hours=hours, limit=20000, include_old=True)
    incidents_csv = _csv_text(
        ["timestamp", "category", "message"],
        (
            [dt.astimezone().isoformat(), infer_category(line) or "n/a", line]
            for dt, line in incidents_rows
        ),
    )
    queue_rows = get_history_raw()
    queue_csv = _csv_text(
        ["finished_at", "label", "status", "wait_sec", "runtime_sec"],
        (
            [
                # NOTE(review): naive local-time timestamp — preserved from
                # the original; consider timezone-aware timestamps later.
                datetime.fromtimestamp(item.get("finished_at", 0)).isoformat()
                if item.get("finished_at")
                else "",
                item.get("label", ""),
                item.get("status", ""),
                item.get("wait_sec", ""),
                item.get("runtime_sec", ""),
            ]
            for item in queue_rows
        ),
    )
    st_hist = runtime_state.get("selftest_history", [])
    selftest_csv = _csv_text(
        ["timestamp", "ok", "summary"],
        ([entry.get("ts", ""), entry.get("ok", False), entry.get("summary", "")] for entry in st_hist),
    )
    mem_zip = io.BytesIO()
    with zipfile.ZipFile(mem_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(f"incidents_{hours}h.csv", incidents_csv)
        zf.writestr("queue_history.csv", queue_csv)
        zf.writestr("selftest_history.csv", selftest_csv)
    summary = (
        f"📦 Export all\n"
        f"Incidents: {len(incidents_rows)} rows ({hours}h)\n"
        f"Queue history: {len(queue_rows)} rows\n"
        f"Selftest: {len(st_hist)} rows"
    )
    await msg.answer(summary)
    await msg.answer_document(document=BufferedInputFile(mem_zip.getvalue(), filename="export_all.zip"))
@dp.message(F.text == "🔒 SSL")
async def ssl_certs(msg: Message):
if not is_admin_msg(msg):

View File

@@ -4,6 +4,7 @@ from collections import deque
from datetime import datetime, timedelta, timezone
from logging.handlers import TimedRotatingFileHandler
from typing import Any
from services import runtime_state
def _get_path(cfg: dict[str, Any]) -> str:

View File

@@ -460,3 +460,45 @@ async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str:
if mode == "leases":
return "\n".join(header + lease_section)
return "\n".join(header + wifi_section + lease_section)
async def fetch_openwrt_leases(cfg: dict[str, Any]) -> list[str]:
    """Fetch DHCP leases from the OpenWrt router over SSH.

    Tries the luci-rpc ubus endpoint first and falls back to reading
    /tmp/dhcp.leases. Returns lease lines as strings "IP hostname (MAC)".

    Raises:
        RuntimeError: when the host is unconfigured, the SSH command
            times out (rc 124), or the remote command fails.
    """
    ow_cfg = cfg.get("openwrt", {})
    host = ow_cfg.get("host")
    user = ow_cfg.get("user", "root")
    port = ow_cfg.get("port", 22)
    identity_file = ow_cfg.get("identity_file")
    timeout_sec = ow_cfg.get("timeout_sec", 8)
    strict = ow_cfg.get("strict_host_key_checking", True)
    if not host:
        raise RuntimeError("OpenWrt host not configured")
    ssh_cmd = [
        "ssh",
        "-o",
        "BatchMode=yes",
        "-o",
        f"ConnectTimeout={timeout_sec}",
        "-o",
        "LogLevel=ERROR",
    ]
    if not strict:
        ssh_cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
    if identity_file:
        ssh_cmd += ["-i", str(identity_file)]
    ssh_cmd += ["-p", str(port), f"{user}@{host}"]
    remote = "ubus call luci-rpc getDHCPLeases '{\"family\":4}' 2>/dev/null || cat /tmp/dhcp.leases"
    # BUG FIX: the command was previously sent as ssh_cmd + ["sh", "-c", remote].
    # ssh concatenates its command arguments with spaces before handing them to
    # the remote shell, so the router executed `sh -c ubus call ...` — and
    # `sh -c` takes only the first word ("ubus") as the command string, running
    # `ubus` with no arguments. Passing the pipeline as a single argument lets
    # the remote login shell execute it in full.
    rc, out = await run_cmd_full(ssh_cmd + [remote], timeout=timeout_sec + 10)
    if rc == 124:
        raise RuntimeError("timeout")
    if rc != 0:
        raise RuntimeError(out.strip() or f"ssh rc={rc}")
    leases = _safe_json_load(out)
    if leases:
        return _extract_leases(leases)
    return _parse_leases_fallback(out)

View File

@@ -187,3 +187,11 @@ def format_history(limit: int = 20) -> str:
f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)"
)
return "\n".join(lines)
def get_history_raw() -> list[dict[str, Any]]:
    """Return a snapshot copy of the in-memory job-history entries."""
    return [*_history]
def get_stats() -> dict[str, Any]:
    """Return a shallow copy of the aggregate queue statistics."""
    return {**_stats}

View File

@@ -6,6 +6,14 @@ from typing import Any
from services.health import health
from services.runner import run_cmd_full
from services.incidents import log_incident
from services import runtime_state
def _save_history(entry: dict[str, Any]) -> None:
    """Prepend *entry* to the persisted self-test history (newest first).

    Keeps at most 20 entries; resets the history when the stored value is
    not a list. Fix: the old code trimmed the fetched history to 50 items
    before inserting but then persisted only hist[:20], so the 50-item
    slice was dead work — trim once to the real cap. Persisted result is
    identical to before.
    """
    hist = runtime_state.get("selftest_history", [])
    if not isinstance(hist, list):
        hist = []
    runtime_state.set_state("selftest_history", [entry, *hist][:20])
async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> tuple[str, bool]:
@@ -40,7 +48,19 @@ async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> tuple
lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}")
ok = False
return "\n".join(lines), ok
result_text = "\n".join(lines)
try:
_save_history(
{
"ts": datetime.now().isoformat(),
"ok": ok,
"summary": result_text.splitlines()[1] if len(lines) > 1 else "",
}
)
except Exception:
pass
return result_text, ok
async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]):