Compare commits

..

37 Commits

Author SHA1 Message Date
b84107463c Add dedicated RAID alert category and monitor 2026-02-25 01:43:10 +03:00
ee361abb99 Detect md arrays via /proc/mdstat for RAID status 2026-02-25 01:39:11 +03:00
2ad423fb6a Fix md RAID detection for lsblk raid* types 2026-02-25 01:36:59 +03:00
efa5dd9644 Fix mojibake text and add md RAID checks 2026-02-25 01:32:55 +03:00
678332e6d0 Add lightweight unittest coverage for stability fixes 2026-02-15 01:25:11 +03:00
7c56430f32 Unify admin callback checks and log queue job failures 2026-02-15 01:20:55 +03:00
b54a094185 Add safe config fallbacks for app init and health checks 2026-02-15 01:16:58 +03:00
6d5fb9c258 Harden docker callback parsing and remove duplicate /openwrt handler 2026-02-15 01:12:45 +03:00
5099ae4fe2 Fix critical race conditions and unsafe disk report command 2026-02-15 01:12:41 +03:00
568cd86844 Fix heatmap button args 2026-02-15 00:51:09 +03:00
b138ee316d Import backup keyboard for SLA handlers 2026-02-15 00:46:53 +03:00
fa98a96b34 Route any SLA text to corresponding handler 2026-02-15 00:45:56 +03:00
1dba6d4a0f Match SLA buttons via regex 2026-02-15 00:44:14 +03:00
b784deb02b Ack SLA requests immediately 2026-02-15 00:35:32 +03:00
5ae54618e8 Broaden SLA button matching 2026-02-15 00:32:09 +03:00
3fc99bdcfc Handle SLA buttons without emojis 2026-02-15 00:30:39 +03:00
c1d69adbc5 Make incidents diff resilient and send sample if empty 2026-02-09 04:21:27 +03:00
a14fb8fccd Show recent sample when incidents diff is empty 2026-02-09 04:19:59 +03:00
4ba8f48228 Auto-reset incidents diff marker if ahead of log 2026-02-09 04:18:05 +03:00
10bf265c29 Add reset option to /incidents_diff 2026-02-09 04:16:28 +03:00
fd179d24e8 Remove Incidents entry from main keyboard 2026-02-09 04:13:47 +03:00
2905528677 Keep incidents summary inside logs keyboard 2026-02-09 04:12:44 +03:00
2b87ce04a3 Keep backup/queue SLA and OpenWrt leases diff in their menus 2026-02-09 04:10:04 +03:00
02b8e2bb55 Keep docker restarts inside docker keyboard 2026-02-09 04:08:27 +03:00
f0fb2aad0e Split OpenWrt menu vs full status actions 2026-02-09 04:06:49 +03:00
219776c642 Disambiguate OpenWrt menu vs full status button 2026-02-09 04:05:25 +03:00
28caa551bd Narrow /docker_health match to avoid summary collisions 2026-02-09 04:03:17 +03:00
783f4abd98 Use icon buttons for incidents, queue and OpenWrt actions 2026-02-09 04:00:04 +03:00
f71c02835a Adjust keyboards with incidents and OpenWrt submenus 2026-02-09 03:45:13 +03:00
f7081b78e1 Add incident exports, queue SLA, and OpenWrt diff utilities 2026-02-09 02:57:16 +03:00
0fbd374823 Log docker restarts as incidents 2026-02-09 02:45:06 +03:00
c3db70160c Use semicolon delimiter in incidents_export CSV 2026-02-09 02:32:50 +03:00
1b9d260530 Use BufferedInputFile for incidents_export 2026-02-09 02:31:24 +03:00
040a6c96e4 Seek to start before sending incidents export files 2026-02-09 02:30:17 +03:00
4f6d6dd549 Fix incidents_export file delivery 2026-02-09 02:28:49 +03:00
2e0bf0c6ea Add incidents export, queue alerts, and health summaries 2026-02-09 02:24:08 +03:00
5a4234f59d Log incidents even when alerts are muted 2026-02-09 02:09:32 +03:00
30 changed files with 1232 additions and 110 deletions

View File

@@ -33,7 +33,7 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `end` (string): End time `HH:MM` (e.g. `08:00`). - `end` (string): End time `HH:MM` (e.g. `08:00`).
- `allow_critical` (bool): Allow critical alerts during quiet hours. - `allow_critical` (bool): Allow critical alerts during quiet hours.
- `auto_mute` (list): Per-category auto mutes by time window. - `auto_mute` (list): Per-category auto mutes by time window.
- `category` (string): load/disk/smart/ssl/docker/test. - `category` (string): load/disk/smart/raid/ssl/docker/test.
- `start` (string): Start `HH:MM`. - `start` (string): Start `HH:MM`.
- `end` (string): End `HH:MM` (can wrap over midnight). - `end` (string): End `HH:MM` (can wrap over midnight).
- `auto_mute_on_high_load_sec` (int): auto-mute `load` category for N seconds on critical load (0 disables). - `auto_mute_on_high_load_sec` (int): auto-mute `load` category for N seconds on critical load (0 disables).
@@ -42,6 +42,9 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `smart_interval_sec` (int): SMART poll interval. - `smart_interval_sec` (int): SMART poll interval.
- `smart_cooldown_sec` (int): SMART alert cooldown. - `smart_cooldown_sec` (int): SMART alert cooldown.
- `smart_temp_warn` (int): SMART temperature warning (C). - `smart_temp_warn` (int): SMART temperature warning (C).
- `raid_enabled` (bool): Enable md RAID polling (`/proc/mdstat`).
- `raid_interval_sec` (int): RAID poll interval.
- `raid_cooldown_sec` (int): RAID alert cooldown.
## disk_report ## disk_report
@@ -88,6 +91,11 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `schedule.enabled` (bool): Enable auto self-test. - `schedule.enabled` (bool): Enable auto self-test.
- `schedule.time` (string): Local time `HH:MM` (default `03:30`). - `schedule.time` (string): Local time `HH:MM` (default `03:30`).
## queue
- `max_pending_alert` (int): Alert if pending tasks >= this value.
- `avg_wait_alert` (int): Alert if average wait exceeds N seconds.
- `cooldown_sec` (int): Cooldown between queue alerts (default 300s).
## external_checks ## external_checks
- `enabled` (bool): Enable background checks. - `enabled` (bool): Enable background checks.

View File

@@ -33,7 +33,7 @@
- `end` (string): конец, формат `HH:MM` (например `08:00`). - `end` (string): конец, формат `HH:MM` (например `08:00`).
- `allow_critical` (bool): слать критичные алерты в тишину. - `allow_critical` (bool): слать критичные алерты в тишину.
- `auto_mute` (list): авто‑мьюты по категориям и времени. - `auto_mute` (list): авто‑мьюты по категориям и времени.
- `category` (string): load/disk/smart/ssl/docker/test. - `category` (string): load/disk/smart/raid/ssl/docker/test.
- `start` (string): начало `HH:MM`. - `start` (string): начало `HH:MM`.
- `end` (string): конец `HH:MM` (интервал может пересекать ночь). - `end` (string): конец `HH:MM` (интервал может пересекать ночь).
- `auto_mute_on_high_load_sec` (int): при critical load автоматически мьютить категорию `load` на N секунд (0 — выкл). - `auto_mute_on_high_load_sec` (int): при critical load автоматически мьютить категорию `load` на N секунд (0 — выкл).
@@ -42,6 +42,9 @@
- `smart_interval_sec` (int): интервал SMART. - `smart_interval_sec` (int): интервал SMART.
- `smart_cooldown_sec` (int): кулдаун SMART. - `smart_cooldown_sec` (int): кулдаун SMART.
- `smart_temp_warn` (int): порог температуры (C). - `smart_temp_warn` (int): порог температуры (C).
- `raid_enabled` (bool): RAID проверки (`/proc/mdstat`).
- `raid_interval_sec` (int): интервал RAID.
- `raid_cooldown_sec` (int): кулдаун RAID алертов.
## disk_report ## disk_report
@@ -88,6 +91,12 @@
- `schedule.enabled` (bool): включить авто self-test. - `schedule.enabled` (bool): включить авто self-test.
- `schedule.time` (string): локальное время `HH:MM`, по умолчанию `03:30`. - `schedule.time` (string): локальное время `HH:MM`, по умолчанию `03:30`.
## queue
- `max_pending_alert` (int): алерт, если задач в очереди >= этому значению.
- `avg_wait_alert` (int): алерт, если среднее ожидание превышает N секунд.
- `cooldown_sec` (int): кулдаун между алертами очереди, по умолчанию 300с.
## external_checks ## external_checks
- `enabled` (bool): включить фоновые проверки. - `enabled` (bool): включить фоновые проверки.

2
app.py
View File

@@ -15,7 +15,7 @@ else:
paths_cfg = cfg.get("paths", {}) paths_cfg = cfg.get("paths", {})
runtime_state.configure(paths_cfg.get("runtime_state", "/var/server-bot/runtime.json")) runtime_state.configure(paths_cfg.get("runtime_state", "/var/server-bot/runtime.json"))
ARTIFACT_STATE = paths_cfg["artifact_state"] ARTIFACT_STATE = paths_cfg.get("artifact_state", "/opt/tg-bot/state.json")
RESTIC_ENV = load_env(paths_cfg.get("restic_env", "/etc/restic/restic.env")) RESTIC_ENV = load_env(paths_cfg.get("restic_env", "/etc/restic/restic.env"))
DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80)) DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80))

View File

@@ -43,6 +43,9 @@ alerts:
smart_interval_sec: 3600 smart_interval_sec: 3600
smart_cooldown_sec: 21600 smart_cooldown_sec: 21600
smart_temp_warn: 50 smart_temp_warn: 50
raid_enabled: true
raid_interval_sec: 300
raid_cooldown_sec: 1800
disk_report: disk_report:
threshold: 90 threshold: 90
@@ -85,6 +88,11 @@ selftest:
enabled: false enabled: false
time: "03:30" time: "03:30"
queue:
max_pending_alert: 5
avg_wait_alert: 120
cooldown_sec: 300
external_checks: external_checks:
enabled: true enabled: true
state_path: "/var/server-bot/external_checks.json" state_path: "/var/server-bot/external_checks.json"

View File

@@ -16,7 +16,7 @@ HELP_TEXT = (
"/alerts unmute <category> - unmute category\n" "/alerts unmute <category> - unmute category\n"
"/alerts list - show active mutes\n" "/alerts list - show active mutes\n"
"/alerts recent [hours] - show incidents log (default 24h)\n" "/alerts recent [hours] - show incidents log (default 24h)\n"
"Categories: load, disk, smart, ssl, docker, test\n" "Categories: load, disk, smart, raid, ssl, docker, test\n"
) )

View File

@@ -2,7 +2,7 @@ import asyncio
from datetime import datetime from datetime import datetime
from aiogram import F from aiogram import F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from app import dp, cfg from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg from auth import is_admin_msg
from keyboards import docker_kb, arcane_kb from keyboards import docker_kb, arcane_kb
from services.arcane import list_projects, restart_project, set_project_state, get_project_details from services.arcane import list_projects, restart_project, set_project_state, get_project_details
@@ -115,7 +115,7 @@ async def arcane_refresh(msg: Message):
@dp.callback_query(F.data == "arcane:refresh") @dp.callback_query(F.data == "arcane:refresh")
async def arcane_refresh_inline(cb: CallbackQuery): async def arcane_refresh_inline(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
await cb.answer() await cb.answer()
await cmd_arcane_projects(cb.message, edit=True) await cmd_arcane_projects(cb.message, edit=True)
@@ -123,7 +123,7 @@ async def arcane_refresh_inline(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:page:")) @dp.callback_query(F.data.startswith("arcane:page:"))
async def arcane_page(cb: CallbackQuery): async def arcane_page(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
try: try:
page = int(cb.data.split(":", 2)[2]) page = int(cb.data.split(":", 2)[2])
@@ -141,7 +141,7 @@ async def arcane_page(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:restart:")) @dp.callback_query(F.data.startswith("arcane:restart:"))
async def arcane_restart(cb: CallbackQuery): async def arcane_restart(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -160,7 +160,7 @@ async def arcane_restart(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:details:")) @dp.callback_query(F.data.startswith("arcane:details:"))
async def arcane_details(cb: CallbackQuery): async def arcane_details(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -208,7 +208,7 @@ async def arcane_details(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:deploy:")) @dp.callback_query(F.data.startswith("arcane:deploy:"))
async def arcane_deploy_status(cb: CallbackQuery): async def arcane_deploy_status(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -254,7 +254,7 @@ async def arcane_deploy_status(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:up:")) @dp.callback_query(F.data.startswith("arcane:up:"))
async def arcane_up(cb: CallbackQuery): async def arcane_up(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -273,7 +273,7 @@ async def arcane_up(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:down:")) @dp.callback_query(F.data.startswith("arcane:down:"))
async def arcane_down(cb: CallbackQuery): async def arcane_down(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)

View File

@@ -8,9 +8,10 @@ from app import dp, cfg
from auth import is_admin_msg, is_admin_cb from auth import is_admin_msg, is_admin_cb
from keyboards import backup_kb from keyboards import backup_kb
from lock_utils import acquire_lock, release_lock from lock_utils import acquire_lock, release_lock
from services.queue import enqueue, format_status, format_details from services.queue import enqueue, format_status, format_details, format_history
from services.backup import backup_badge, restore_help from services.backup import backup_badge, restore_help
from services.runner import run_cmd, run_cmd_full from services.runner import run_cmd, run_cmd_full
from services.incidents import log_incident
def _parse_systemctl_kv(raw: str) -> dict[str, str]: def _parse_systemctl_kv(raw: str) -> dict[str, str]:
@@ -551,6 +552,13 @@ async def backup_history(msg: Message):
) )
@dp.message(F.text == "/queue_history")
async def queue_history(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer(format_history(), reply_markup=backup_kb)
@dp.callback_query(F.data == "backup:retry") @dp.callback_query(F.data == "backup:retry")
async def backup_retry(cb: CallbackQuery): async def backup_retry(cb: CallbackQuery):
if not is_admin_cb(cb): if not is_admin_cb(cb):

View File

@@ -2,8 +2,9 @@ import json
import time import time
from aiogram import F from aiogram import F
from aiogram.types import CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton from aiogram.types import CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from app import dp, ADMIN_ID from app import dp, ADMIN_ID, cfg
from services.docker import docker_cmd from services.docker import docker_cmd
from services.incidents import log_incident
from services.runner import run_cmd from services.runner import run_cmd
from state import DOCKER_MAP, LOG_FILTER_PENDING from state import DOCKER_MAP, LOG_FILTER_PENDING
from handlers.backup import cmd_backup_status from handlers.backup import cmd_backup_status
@@ -14,8 +15,15 @@ async def docker_callback(cb: CallbackQuery):
if cb.from_user.id != ADMIN_ID: if cb.from_user.id != ADMIN_ID:
return return
try:
_, action, alias = cb.data.split(":", 2) _, action, alias = cb.data.split(":", 2)
real = DOCKER_MAP[alias] except ValueError:
await cb.answer("Bad request")
return
real = DOCKER_MAP.get(alias)
if not real:
await cb.answer("Container not found")
return
if action == "restart": if action == "restart":
await cb.answer("Restarting…") await cb.answer("Restarting…")
@@ -25,6 +33,10 @@ async def docker_callback(cb: CallbackQuery):
f"🔄 **{alias} restarted**\n```{out}```", f"🔄 **{alias} restarted**\n```{out}```",
parse_mode="Markdown" parse_mode="Markdown"
) )
try:
log_incident(cfg, f"docker_restart {alias}", category="docker")
except Exception:
pass
elif action == "logs": elif action == "logs":
await cb.answer() await cb.answer()
@@ -55,7 +67,7 @@ async def snapshot_details(cb: CallbackQuery):
snap_id = cb.data.split(":", 1)[1] snap_id = cb.data.split(":", 1)[1]
await cb.answer("Loading snapshot…") await cb.answer("Loading snapshot…")
# получаем статистику snapshot # получаем статистику snapshot
rc, raw = await run_cmd( rc, raw = await run_cmd(
["restic", "stats", snap_id, "--json"], ["restic", "stats", snap_id, "--json"],
use_restic_env=True, use_restic_env=True,

View File

@@ -85,7 +85,7 @@ async def ds_cmd(msg: Message):
await cmd_docker_status(msg) await cmd_docker_status(msg)
@dp.message(F.text.startswith("/docker_health")) @dp.message(F.text, F.func(lambda m: (m.text or "").split()[0] == "/docker_health"))
async def docker_health(msg: Message): async def docker_health(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
return return
@@ -124,6 +124,37 @@ async def docker_health(msg: Message):
log_incident(cfg, f"docker_health alias={alias} by {msg.from_user.id}", category="docker") log_incident(cfg, f"docker_health alias={alias} by {msg.from_user.id}", category="docker")
@dp.message(F.text == "/docker_health_summary")
async def docker_health_summary(msg: Message):
if not is_admin_msg(msg):
return
if not DOCKER_MAP:
await msg.answer("⚠️ DOCKER_MAP пуст", reply_markup=docker_kb)
return
problems = []
total = len(DOCKER_MAP)
for alias, real in DOCKER_MAP.items():
rc, out = await docker_cmd(["inspect", "-f", "{{json .State}}", real], timeout=10)
if rc != 0:
problems.append(f"{alias}: inspect error")
continue
try:
state = json.loads(out)
except Exception:
problems.append(f"{alias}: bad JSON")
continue
status = state.get("Status", "n/a")
health = (state.get("Health") or {}).get("Status", "n/a")
if status != "running" or health not in ("healthy", "none"):
problems.append(f"{alias}: {status}/{health}")
ok = total - len(problems)
lines = [f"🐳 Docker health: 🟢 {ok}/{total} healthy, 🔴 {len(problems)} issues"]
if problems:
lines.append("Problems:")
lines.extend([f"- {p}" for p in problems])
await msg.answer("\n".join(lines), reply_markup=docker_kb)
@dp.message(F.text == "📈 Stats") @dp.message(F.text == "📈 Stats")
async def dstats(msg: Message): async def dstats(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):

View File

@@ -24,7 +24,7 @@ HELP_PAGES = [
"• `/alerts mute <cat> <minutes>` / `/alerts unmute <cat>` / `/alerts list`\n" "• `/alerts mute <cat> <minutes>` / `/alerts unmute <cat>` / `/alerts list`\n"
"• `/alerts recent [hours]`\n" "• `/alerts recent [hours]`\n"
"Шорткаты: `/alerts_list`, `/alerts_recent`, `/alerts_mute_load` (60м).\n" "Шорткаты: `/alerts_list`, `/alerts_recent`, `/alerts_mute_load` (60м).\n"
"Категории: load, disk, smart, ssl, docker, test.\n" "Категории: load, disk, smart, raid, ssl, docker, test.\n"
"Quiet hours: `alerts.quiet_hours` для не‑критичных.\n" "Quiet hours: `alerts.quiet_hours` для не‑критичных.\n"
"Авто-мьют: `alerts.auto_mute` со слотами времени.\n" "Авто-мьют: `alerts.auto_mute` со слотами времени.\n"
"Только красные load: `alerts.load_only_critical: true`.\n" "Только красные load: `alerts.load_only_critical: true`.\n"
@@ -56,8 +56,14 @@ HELP_PAGES = [
"🛠 **Admin & Deploy**\n\n" "🛠 **Admin & Deploy**\n\n"
"Config: `/config_check`, файл `config.yaml` (см. config.example.yaml).\n" "Config: `/config_check`, файл `config.yaml` (см. config.example.yaml).\n"
"Deploy: `deploy.sh` (ssh 10.10.10.10:1090 → git pull → systemctl restart tg-bot).\n" "Deploy: `deploy.sh` (ssh 10.10.10.10:1090 → git pull → systemctl restart tg-bot).\n"
"Incidents summary: `/incidents_summary`.\n" "Incidents: `/incidents_summary`, `/incidents_diff [hours]`.\n"
"Export: `/incidents_export [hours] [csv|json]`, `/export_all [hours]` (zip).\n"
"Alerts log/heatmap: `/alerts_log [hours]`, `/alerts_heatmap [hours] [cat]`.\n"
"Backup SLA: `/backup_sla`; Docker restarts: `/docker_restarts [hours]`.\n"
"Disk snapshot: `/disk_snapshot`.\n" "Disk snapshot: `/disk_snapshot`.\n"
"Queue: `/queue_history`, `/queue_sla`.\n"
"Self-test history: `/selftest_history`.\n"
"OpenWrt leases diff: `/openwrt_leases_diff`.\n"
"BotFather list: `/botfather_list`.\n" "BotFather list: `/botfather_list`.\n"
"Безопасность: `safety.dry_run: true` блокирует опасные действия.\n" "Безопасность: `safety.dry_run: true` блокирует опасные действия.\n"
"OpenWrt: кнопка в System → Info.", "OpenWrt: кнопка в System → Info.",
@@ -124,15 +130,28 @@ alerts - Manage alerts
alerts_list - List active mutes alerts_list - List active mutes
alerts_recent - Show recent incidents (24h) alerts_recent - Show recent incidents (24h)
alerts_mute_load - Mute load alerts for 60m alerts_mute_load - Mute load alerts for 60m
alerts_log - Show suppressed alerts
alerts_heatmap - Hourly incidents heatmap
backup_run - Run backup (queued) backup_run - Run backup (queued)
backup_history - Show backup log tail backup_history - Show backup log tail
queue_history - Show queue recent jobs
queue_sla - Queue SLA stats
docker_status - Docker summary docker_status - Docker summary
docker_health - Docker inspect/health by alias docker_health - Docker inspect/health by alias
docker_health_summary - Docker health summary (problems only)
openwrt - Full OpenWrt status openwrt - Full OpenWrt status
openwrt_wan - OpenWrt WAN only openwrt_wan - OpenWrt WAN only
openwrt_clients - OpenWrt wifi clients openwrt_clients - OpenWrt wifi clients
openwrt_leases - OpenWrt DHCP leases openwrt_leases - OpenWrt DHCP leases
openwrt_fast - OpenWrt quick WAN view
openwrt_leases_diff - OpenWrt DHCP diff
incidents_summary - Incidents counters (24h/7d) incidents_summary - Incidents counters (24h/7d)
incidents_export - Export incidents (hours fmt)
incidents_diff - Show incidents since last check
export_all - Zip with incidents/queue/selftest
backup_sla - Backup SLA check
docker_restarts - Docker restart history
selftest_history - Self-test history
disk_snapshot - Disk usage snapshot disk_snapshot - Disk usage snapshot
config_check - Validate config config_check - Validate config
""" """

View File

@@ -126,7 +126,7 @@ async def selftest(msg: Message):
await msg.answer("⏳ Self-test…", reply_markup=menu_kb) await msg.answer("⏳ Self-test…", reply_markup=menu_kb)
async def worker(): async def worker():
text = await run_selftest(cfg, DOCKER_MAP) text, _ok = await run_selftest(cfg, DOCKER_MAP)
await msg.answer(text, reply_markup=menu_kb) await msg.answer(text, reply_markup=menu_kb)
asyncio.create_task(worker()) asyncio.create_task(worker())

View File

@@ -1,8 +1,9 @@
import asyncio import asyncio
import os import os
from datetime import datetime, timezone, timedelta
from aiogram import F from aiogram import F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile
from app import dp, cfg from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg from auth import is_admin_msg
from keyboards import ( from keyboards import (
system_info_kb, system_info_kb,
@@ -10,23 +11,31 @@ from keyboards import (
system_logs_audit_kb, system_logs_audit_kb,
system_logs_security_kb, system_logs_security_kb,
system_logs_integrations_kb, system_logs_integrations_kb,
system_logs_kb,
openwrt_kb,
docker_kb, backup_kb,
) )
from system_checks import security, disks, hardware, list_disks, smart_last_test from system_checks import security, disks, hardware, list_disks, smart_last_test
from services.http_checks import get_url_checks, check_url from services.http_checks import get_url_checks, check_url
from services.queue import enqueue from services.queue import enqueue, get_history_raw, get_stats
from services.updates import list_updates, apply_updates from services.updates import list_updates, apply_updates
from services.runner import run_cmd from services.runner import run_cmd, run_cmd_full
from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host
from services.gitea import get_gitea_health from services.gitea import get_gitea_health
from services.openwrt import get_openwrt_status from services.openwrt import get_openwrt_status, fetch_openwrt_leases
from services.system import worst_disk_usage from services.system import worst_disk_usage
import state import state
from state import UPDATES_CACHE, REBOOT_PENDING from state import UPDATES_CACHE, REBOOT_PENDING
from services.metrics import summarize from services.metrics import summarize
from services.audit import read_audit_tail from services.audit import read_audit_tail
from services.incidents import read_recent, incidents_path from services.incidents import read_recent, incidents_path, read_raw, infer_category
from services.external_checks import format_report from services.external_checks import format_report
from services.disk_report import build_disk_report from services.disk_report import build_disk_report
from services import runtime_state
import io
import json
import csv
import zipfile
@dp.message(F.text == "💽 Disks") @dp.message(F.text == "💽 Disks")
@@ -197,74 +206,91 @@ async def smart_status(msg: Message):
await msg.answer("\n".join(lines), reply_markup=system_info_kb) await msg.answer("\n".join(lines), reply_markup=system_info_kb)
@dp.message(F.text == "📡 OpenWrt") @dp.message(F.text.in_({"/openwrt", "📡 Full status"}))
async def openwrt_status(msg: Message): async def openwrt_status(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
return return
await msg.answer("⏳ Checking OpenWrt…", reply_markup=system_info_kb) await msg.answer("⏳ Checking OpenWrt…", reply_markup=openwrt_kb)
async def worker(): async def worker():
try: try:
text = await get_openwrt_status(cfg) text = await get_openwrt_status(cfg)
except Exception as e: except Exception as e:
text = f"⚠️ OpenWrt error: {e}" text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=system_info_kb) await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker()) asyncio.create_task(worker())
@dp.message(F.text == "/openwrt")
async def openwrt_cmd(msg: Message):
if not is_admin_msg(msg):
return
await openwrt_status(msg)
@dp.message(F.text == "/openwrt_wan") @dp.message(F.text == "/openwrt_wan")
async def openwrt_wan(msg: Message): async def openwrt_wan(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
return return
await msg.answer("⏳ Checking OpenWrt WAN…", reply_markup=system_info_kb) await msg.answer("⏳ Checking OpenWrt WAN…", reply_markup=openwrt_kb)
async def worker(): async def worker():
try: try:
text = await get_openwrt_status(cfg, mode="wan") text = await get_openwrt_status(cfg, mode="wan")
except Exception as e: except Exception as e:
text = f"⚠️ OpenWrt error: {e}" text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=system_info_kb) await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker()) asyncio.create_task(worker())
@dp.message(F.text == "/openwrt_clients") @dp.message(F.text == "📡 OpenWrt")
async def openwrt_menu(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer("📡 OpenWrt menu", reply_markup=openwrt_kb)
@dp.message(F.text.in_({"/openwrt_clients", "📶 Wi-Fi clients"}))
async def openwrt_clients(msg: Message): async def openwrt_clients(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
return return
await msg.answer("⏳ Checking OpenWrt clients…", reply_markup=system_info_kb) await msg.answer("⏳ Checking OpenWrt clients…", reply_markup=openwrt_kb)
async def worker(): async def worker():
try: try:
text = await get_openwrt_status(cfg, mode="clients") text = await get_openwrt_status(cfg, mode="clients")
except Exception as e: except Exception as e:
text = f"⚠️ OpenWrt error: {e}" text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=system_info_kb) await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker()) asyncio.create_task(worker())
@dp.message(F.text == "/openwrt_leases") @dp.message(F.text.in_({"/openwrt_leases", "🧾 Leases"}))
async def openwrt_leases(msg: Message): async def openwrt_leases(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
return return
await msg.answer("⏳ Checking OpenWrt leases…", reply_markup=system_info_kb) await msg.answer("⏳ Checking OpenWrt leases…", reply_markup=openwrt_kb)
async def worker(): async def worker():
try: try:
text = await get_openwrt_status(cfg, mode="leases") text = await get_openwrt_status(cfg, mode="leases")
except Exception as e: except Exception as e:
text = f"⚠️ OpenWrt error: {e}" text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=system_info_kb) await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker())
@dp.message(F.text == "/openwrt_fast")
@dp.message(F.text == "🌐 WAN fast")
async def openwrt_fast(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer("⏳ OpenWrt fast…", reply_markup=openwrt_kb)
async def worker():
try:
text = await get_openwrt_status(cfg, mode="wan")
except Exception as e:
text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker()) asyncio.create_task(worker())
@@ -310,32 +336,186 @@ async def incidents(msg: Message):
await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown") await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown")
@dp.message(F.text == "/incidents_summary") @dp.message(F.text == "🧾 Incidents")
async def incidents_entry(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer(
"📣 Incidents menu:\n"
"- Summary\n- Diff\n- Heatmap\n- Export/All\n- Alerts log",
reply_markup=system_logs_audit_kb,
)
@dp.message(F.text.in_({"/incidents_summary", "📣 Summary"}))
async def incidents_summary(msg: Message): async def incidents_summary(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
return return
last_24h = read_recent(cfg, hours=24, limit=2000) last_24h = read_raw(cfg, hours=24, limit=2000)
last_7d = read_recent(cfg, hours=24 * 7, limit=4000) last_7d = read_raw(cfg, hours=24 * 7, limit=4000)
def count(lines): def summarize(items):
import re total = len(items)
total = len(lines)
cats = {} cats = {}
for line in lines: suppressed = {}
m = re.search(r"category=([A-Za-z0-9_-]+)", line) last_seen = {}
if m: for dt, msg in items:
cats[m.group(1)] = cats.get(m.group(1), 0) + 1 cat = infer_category(msg) or "n/a"
top = ", ".join(f"{k}:{v}" for k, v in sorted(cats.items(), key=lambda x: x[1], reverse=True)[:5]) or "n/a" cats[cat] = cats.get(cat, 0) + 1
return total, top last_seen[cat] = dt
if "[suppressed" in msg.lower():
suppressed[cat] = suppressed.get(cat, 0) + 1
def fmt_top(d):
return ", ".join(f"{k}:{v}" for k, v in sorted(d.items(), key=lambda x: x[1], reverse=True)[:5]) or "n/a"
top = fmt_top(cats)
top_supp = fmt_top(suppressed)
last_parts = []
for k, dt in sorted(last_seen.items(), key=lambda x: x[1], reverse=True)[:5]:
last_parts.append(f"{k}:{dt.astimezone().strftime('%Y-%m-%d %H:%M')}")
last_str = ", ".join(last_parts) or "n/a"
return total, top, top_supp, last_str
t24, top24 = count(last_24h) t24, top24, supp24, last24 = summarize(last_24h)
t7, top7 = count(last_7d) t7, top7, supp7, last7 = summarize(last_7d)
text = ( text = (
"📣 Incidents summary\n\n" "📣 Incidents summary\n\n"
f"24h: {t24} (top: {top24})\n" f"24h: {t24} (top: {top24}; suppressed: {supp24}; last: {last24})\n"
f"7d: {t7} (top: {top7})" f"7d: {t7} (top: {top7}; suppressed: {supp7}; last: {last7})"
) )
await msg.answer(text, reply_markup=system_logs_audit_kb) await msg.answer(text, reply_markup=system_logs_kb)
@dp.message(F.text.startswith("/incidents_diff"))
@dp.message(F.text == "🆕 Diff")
async def incidents_diff(msg: Message):
    """Show incident-log lines added since the last run's marker.

    Usage: /incidents_diff [hours|reset] — "reset"/"clear" drops the stored
    marker; a number overrides the lookup window (default 24h).
    """
    if not is_admin_msg(msg):
        return
    try:
        parts = msg.text.split()
        hours = 24
        reset = False
        if len(parts) >= 2:
            if parts[1].lower() in {"reset", "clear"}:
                reset = True
            else:
                try:
                    hours = max(1, int(parts[1]))
                except ValueError:
                    hours = 24
        if reset:
            # Clear the marker so the next invocation reports the whole window.
            runtime_state.set_state("incidents_diff_last_ts", None)
            await msg.answer(
                "📣 Diff marker reset. Next run will show all within window.",
                reply_markup=system_logs_audit_kb,
            )
            return
        # Marker is stored as an ISO string; tolerate a missing/corrupt value.
        last_iso = runtime_state.get("incidents_diff_last_ts")
        last_dt = None
        if isinstance(last_iso, str):
            try:
                last_dt = datetime.fromisoformat(last_iso)
            except Exception:
                last_dt = None
        rows = read_raw(cfg, hours=hours, limit=5000, include_old=True)
        def collect(from_dt):
            # Entries strictly newer than the marker (everything when None).
            fresh: list[tuple[datetime, str]] = []
            for dt, line in rows:
                if from_dt and dt <= from_dt:
                    continue
                fresh.append((dt, line))
            return fresh
        fresh = collect(last_dt)
        # auto-reset if marker is ahead of all rows
        if not fresh and last_dt and rows and last_dt >= rows[-1][0]:
            last_dt = None
            runtime_state.set_state("incidents_diff_last_ts", None)
            fresh = collect(None)
        def fmt_lines(items: list[tuple[datetime, str]], limit_chars: int = 3500) -> str:
            # Render entries, stopping before the Telegram message-size limit.
            buf = []
            total_len = 0
            for dt, line in items:
                entry = f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}"
                if total_len + len(entry) + 1 > limit_chars:
                    break
                buf.append(entry)
                total_len += len(entry) + 1
            return "\n".join(buf)
        if not fresh:
            note = f"since {last_dt.astimezone().strftime('%Y-%m-%d %H:%M')}" if last_dt else "for the period"
            if rows:
                # Nothing new: still show a recent sample so the reply is useful.
                sample = rows[-50:][::-1]  # newest first
                body = fmt_lines(sample)
                await msg.answer(
                    f"📣 No new incidents {note} (window {hours}h).\nRecent sample:\n```\n{body}\n```",
                    reply_markup=system_logs_audit_kb,
                    parse_mode="Markdown",
                )
            else:
                await msg.answer(
                    f"📣 No new incidents {note} (window {hours}h)",
                    reply_markup=system_logs_audit_kb,
                )
            return
        fresh.sort(key=lambda x: x[0], reverse=True)
        body = fmt_lines(fresh)
        await msg.answer(
            f"📣 New incidents (since last mark, window {hours}h): {len(fresh)}\n```\n{body}\n```",
            reply_markup=system_logs_audit_kb,
            parse_mode="Markdown",
        )
        try:
            # Advance the marker to the newest reported entry; best-effort.
            runtime_state.set_state("incidents_diff_last_ts", fresh[0][0].isoformat())
        except Exception:
            pass
    except Exception as e:
        await msg.answer(f"⚠️ Diff error: {type(e).__name__}: {e}", reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/alerts_heatmap"))
@dp.message(F.text == "🔥 Heatmap")
async def alerts_heatmap(msg: Message):
    """Render a text heatmap of incidents per hour.

    Usage: /alerts_heatmap [hours] [category] — defaults to 48h, any category.
    """
    if not is_admin_msg(msg):
        return
    hours = 48
    category = None
    if msg.text.startswith("/alerts_heatmap"):
        parts = msg.text.split()
        for part in parts[1:]:
            if part.isdigit():
                hours = max(1, int(part))
            else:
                category = part.lower()
    rows = read_raw(cfg, hours=hours, limit=8000, include_old=True)
    # Count incidents per local wall-clock hour, optionally filtered by category.
    buckets: dict[datetime, int] = {}
    for dt, line in rows:
        cat = infer_category(line) or "n/a"
        if category and cat != category:
            continue
        local = dt.astimezone()
        hour = local.replace(minute=0, second=0, microsecond=0)
        buckets[hour] = buckets.get(hour, 0) + 1
    if not buckets:
        await msg.answer(f"⚠️ No incidents for heatmap (hours={hours}, category={category or 'any'})")
        return
    start = datetime.now().astimezone() - timedelta(hours=hours - 1)
    timeline = []
    current = start.replace(minute=0, second=0, microsecond=0)
    while current <= datetime.now().astimezone():
        timeline.append((current, buckets.get(current, 0)))
        current += timedelta(hours=1)
    # `or 1` guards division: buckets may fall outside the timeline window
    # because read_raw(include_old=True) can return older rows.
    max_count = max(c for _, c in timeline) or 1
    lines = [f"📊 Alerts heatmap (hours={hours}, category={category or 'any'})"]
    for ts, cnt in timeline[-72:]:
        bar_len = max(1, int(cnt / max_count * 12)) if cnt else 0
        # Fix: the bar glyph was lost to mojibake ('"" * bar_len' is always
        # empty), so every row rendered without a bar. Restore the block char.
        bar = "█" * bar_len if cnt else ""
        lines.append(f"{ts:%m-%d %H}: {cnt:>3} {bar}")
    await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb)
@dp.message(F.text == "/disk_snapshot") @dp.message(F.text == "/disk_snapshot")
@@ -352,6 +532,316 @@ async def disk_snapshot(msg: Message):
await msg.answer(f"💽 Disk snapshot ({mount})\n\n{report}", reply_markup=system_info_kb) await msg.answer(f"💽 Disk snapshot ({mount})\n\n{report}", reply_markup=system_info_kb)
@dp.message(F.text.startswith("/alerts_log"))
async def alerts_log(msg: Message):
    """Show sent vs suppressed alert counts for the last N hours (default 24)."""
    if not is_admin_msg(msg):
        return
    args = msg.text.split()
    window_h = 24
    if len(args) >= 2:
        try:
            window_h = max(1, int(args[1]))
        except ValueError:
            window_h = 24
    entries = read_raw(cfg, hours=window_h, limit=2000)
    muted, delivered = [], []
    for dt, text_line in entries:
        bucket = muted if "[suppressed" in text_line.lower() else delivered
        bucket.append((dt, text_line))
    report = [f"📣 Alerts log ({window_h}h)"]
    report.append(f"Sent: {len(delivered)}, Suppressed: {len(muted)}")
    if muted:
        report.append("\nSuppressed:")
        report.extend(f"{dt:%m-%d %H:%M} {text_line}" for dt, text_line in muted[-20:])
    await msg.answer("\n".join(report), reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/incidents_export"))
@dp.message(F.text == "📤 Export")
async def incidents_export(msg: Message):
    """Export incidents for the last N hours as a CSV (default) or JSON document."""
    if not is_admin_msg(msg):
        return
    args = msg.text.split()
    hours = 24
    fmt = "csv"
    if len(args) >= 2:
        try:
            hours = max(1, int(args[1]))
        except ValueError:
            hours = 24
    if len(args) >= 3:
        fmt = args[2].lower()
    rows = read_raw(cfg, hours=hours, limit=20000, include_old=False)
    data = [
        {
            "timestamp": dt.astimezone().isoformat(),
            "category": infer_category(line) or "n/a",
            "message": line,
        }
        for dt, line in rows
    ]
    if fmt == "json":
        file_bytes = json.dumps(data, ensure_ascii=False, indent=2).encode("utf-8")
        fname = f"incidents_{hours}h.json"
    else:
        # Semicolon-separated CSV to stay friendly to spreadsheet locales.
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=["timestamp", "category", "message"], delimiter=";")
        writer.writeheader()
        writer.writerows(data)
        file_bytes = buffer.getvalue().encode("utf-8")
        fname = f"incidents_{hours}h.csv"
    summary = f"📤 Incidents export ({hours}h): {len(data)} rows, format {fmt}"
    await msg.answer(summary)
    await msg.answer_document(document=BufferedInputFile(file_bytes, filename=fname))
@dp.message(F.text.in_({"/backup_sla", "📉 Backup SLA", "Backup SLA"}))
@dp.message(F.text.contains("Backup SLA"))
@dp.message(F.text.regexp(r"(?i)backup.*sla|sla.*backup"))
@dp.message(F.text.func(lambda t: isinstance(t, str) and "backup" in t.lower() and "sla" in t.lower()))
async def backup_sla(msg: Message):
    """Report backup SLA: snapshot count, age of the newest restic snapshot,
    and a traffic-light status against backup.sla_hours (default 26h)."""
    if not is_admin_msg(msg):
        return
    # Ack first: listing restic snapshots may take tens of seconds.
    await msg.answer("⏳ Checking Backup SLA…", reply_markup=backup_kb)
    rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
    if rc != 0:
        await msg.answer(f"⚠️ Restic error: {out.strip() or rc}", reply_markup=backup_kb)
        return
    try:
        snaps = json.loads(out)
    except Exception as e:
        await msg.answer(f"⚠️ Invalid restic JSON: {e}", reply_markup=backup_kb)
        return
    if not isinstance(snaps, list) or not snaps:
        await msg.answer("⚠️ No snapshots found", reply_markup=backup_kb)
        return
    # Newest snapshot first (restic "time" strings sort lexicographically).
    snaps.sort(key=lambda s: s.get("time", ""), reverse=True)
    last_time_raw = snaps[0].get("time")
    if last_time_raw:
        # restic emits RFC 3339 with a trailing "Z"; normalize for fromisoformat.
        last_dt = datetime.fromisoformat(last_time_raw.replace("Z", "+00:00"))
        age_h = (datetime.now(tz=last_dt.tzinfo) - last_dt).total_seconds() / 3600
    else:
        last_dt = None
        age_h = None
    sla = float(cfg.get("backup", {}).get("sla_hours", 26))
    # 🟢 within SLA, 🟡 up to 1.5x SLA (or unknown age), 🔴 beyond that.
    if age_h is None:
        status = "🟡"
    elif age_h <= sla:
        status = "🟢"
    elif age_h <= sla * 1.5:
        status = "🟡"
    else:
        status = "🔴"
    last_str = last_dt.astimezone().strftime("%Y-%m-%d %H:%M") if last_dt else "n/a"
    age_str = f"{age_h:.1f}h" if age_h is not None else "n/a"
    await msg.answer(
        f"{status} Backup SLA\n"
        f"Snapshots: {len(snaps)}\n"
        f"Last: {last_str} (age {age_str})\n"
        f"SLA: {sla}h",
        reply_markup=backup_kb,
    )
@dp.message(F.text.startswith("/docker_restarts"))
@dp.message(F.text == "♻️ Restarts")
async def docker_restarts(msg: Message):
    """List docker-restart incidents for the last N hours (default 168)."""
    if not is_admin_msg(msg):
        return
    args = msg.text.split()
    hours = 168
    if len(args) >= 2:
        try:
            hours = max(1, int(args[1]))
        except ValueError:
            hours = 168
    entries = read_raw(cfg, hours=hours, limit=5000, include_old=True)
    restarts = [(dt, line) for dt, line in entries if "docker_restart" in line]
    if not restarts:
        await msg.answer(f"🐳 No docker restarts in last {hours}h", reply_markup=docker_kb)
        return
    # Tally restarts per container alias (last whitespace token of the line).
    per_alias: dict[str, int] = {}
    for _dt, line in restarts:
        tokens = line.split()
        alias = tokens[-1] if len(tokens) >= 2 else "unknown"
        per_alias[alias] = per_alias.get(alias, 0) + 1
    top = ", ".join(f"{k}:{v}" for k, v in sorted(per_alias.items(), key=lambda kv: kv[1], reverse=True))
    body = "\n".join(f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}" for dt, line in restarts[-50:])
    await msg.answer(
        f"🐳 Docker restarts ({hours}h): {len(restarts)} ({top})\n```\n{body}\n```",
        reply_markup=docker_kb,
        parse_mode="Markdown",
    )
@dp.message(F.text.startswith("/openwrt_leases_diff"))
@dp.message(F.text == "🔀 Leases diff")
async def openwrt_leases_diff(msg: Message):
    """Diff current OpenWrt DHCP leases against the previously saved set.

    First invocation stores a baseline; later ones report added/removed
    leases and update the stored set.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ OpenWrt leases diff…", reply_markup=openwrt_kb)
    # Run in a background task so the handler returns immediately
    # (the lease fetch can be slow).
    async def worker():
        try:
            leases_now = await fetch_openwrt_leases(cfg)
        except Exception as e:
            await msg.answer(f"⚠️ OpenWrt error: {e}", reply_markup=openwrt_kb)
            return
        prev = runtime_state.get("openwrt_leases_prev", [])
        if not prev:
            # First run: save a baseline, nothing to diff yet.
            runtime_state.set_state("openwrt_leases_prev", leases_now)
            await msg.answer(f"Baseline saved ({len(leases_now)} leases)", reply_markup=openwrt_kb)
            return
        prev_set = set(prev)
        now_set = set(leases_now)
        added = sorted(list(now_set - prev_set))
        removed = sorted(list(prev_set - now_set))
        lines = [f"🧾 OpenWrt leases diff ({len(added)} added, {len(removed)} removed)"]
        if added:
            lines.append(" Added:")
            lines.extend([f" - {x}" for x in added[:50]])
        if removed:
            lines.append(" Removed:")
            lines.extend([f" - {x}" for x in removed[:50]])
        # Each list is capped at 50 entries above.
        if len(added) > 50 or len(removed) > 50:
            lines.append("… trimmed")
        runtime_state.set_state("openwrt_leases_prev", leases_now)
        await msg.answer("\n".join(lines), reply_markup=openwrt_kb)
    asyncio.create_task(worker())
@dp.message(F.text.in_({"/queue_sla", "📊 Queue SLA", "Queue SLA"}))
@dp.message(F.text.contains("Queue SLA"))
@dp.message(F.text.regexp(r"(?i)queue.*sla|sla.*queue"))
@dp.message(F.text.func(lambda t: isinstance(t, str) and "queue" in t.lower() and "sla" in t.lower()))
async def queue_sla(msg: Message):
    """Summarize queue wait/runtime SLA from the recorded job history."""
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Calculating Queue SLA…", reply_markup=backup_kb)
    history = get_history_raw()
    if not history:
        await msg.answer("🧾 Queue SLA: history is empty", reply_markup=backup_kb)
        return
    wait_times = [entry.get("wait_sec", 0) for entry in history]
    run_times = [entry.get("runtime_sec", 0) for entry in history]

    def p95(values: list[int]) -> float:
        # Nearest-rank 95th percentile; 0.0 for empty input.
        if not values:
            return 0.0
        ordered = sorted(values)
        idx = int(0.95 * (len(ordered) - 1))
        return float(ordered[idx])

    report = [
        "🧾 Queue SLA",
        f"Samples: {len(history)}",
        f"Max wait: {max(wait_times)}s, p95 {p95(wait_times):.1f}s",
        f"Max run: {max(run_times)}s, p95 {p95(run_times):.1f}s",
    ]
    stats = get_stats()
    if stats:
        report.append(
            f"Avg wait: {stats.get('avg_wait_sec', 0):.1f}s, "
            f"avg run: {stats.get('avg_runtime_sec', 0):.1f}s"
        )
    await msg.answer("\n".join(report), reply_markup=backup_kb)
# Fallback router: any message with "sla" is dispatched to backup or queue SLA.
@dp.message(F.text.regexp(r"(?i)sla"))
async def sla_fallback(msg: Message):
    """Route free-form messages mentioning "sla" to the matching SLA handler."""
    if not is_admin_msg(msg):
        return
    lowered = (msg.text or "").lower()
    if "queue" in lowered:
        await queue_sla(msg)
    elif "backup" in lowered:
        await backup_sla(msg)
@dp.message(F.text == "/selftest_history")
async def selftest_history(msg: Message):
    """Show up to the first 15 stored self-test runs with status icons."""
    if not is_admin_msg(msg):
        return
    history = runtime_state.get("selftest_history", [])
    if not history:
        await msg.answer("🧪 Self-test history is empty", reply_markup=system_logs_audit_kb)
        return
    report = ["🧪 Self-test history"]
    for entry in history[:15]:
        status_icon = "🟢" if entry.get("ok", False) else "🔴"
        when = entry.get("ts", "")[:16]
        report.append(f"{status_icon} {when} {entry.get('summary', '')}")
    await msg.answer("\n".join(report), reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/export_all"))
@dp.message(F.text == "📦 Export all")
async def export_all(msg: Message):
    """Bundle incidents, queue history and self-test history into one ZIP.

    Usage: /export_all [hours] — incidents window, default 168h (7 days).
    """
    if not is_admin_msg(msg):
        return
    hours = 168
    parts = msg.text.split()
    if len(parts) >= 2:
        try:
            hours = max(1, int(parts[1]))
        except ValueError:
            hours = 168
    # Incidents CSV (semicolon-separated, local-timezone timestamps).
    incidents_rows = read_raw(cfg, hours=hours, limit=20000, include_old=True)
    inc_sio = io.StringIO()
    inc_writer = csv.writer(inc_sio, delimiter=";")
    inc_writer.writerow(["timestamp", "category", "message"])
    for dt, line in incidents_rows:
        inc_writer.writerow([dt.astimezone().isoformat(), infer_category(line) or "n/a", line])
    # Queue job history CSV.
    queue_rows = get_history_raw()
    q_sio = io.StringIO()
    q_writer = csv.writer(q_sio, delimiter=";")
    q_writer.writerow(["finished_at", "label", "status", "wait_sec", "runtime_sec"])
    for item in queue_rows:
        q_writer.writerow(
            [
                datetime.fromtimestamp(item.get("finished_at", 0)).isoformat()
                if item.get("finished_at")
                else "",
                item.get("label", ""),
                item.get("status", ""),
                item.get("wait_sec", ""),
                item.get("runtime_sec", ""),
            ]
        )
    # Self-test history CSV.
    st_hist = runtime_state.get("selftest_history", [])
    st_sio = io.StringIO()
    st_writer = csv.writer(st_sio, delimiter=";")
    st_writer.writerow(["timestamp", "ok", "summary"])
    for entry in st_hist:
        st_writer.writerow([entry.get("ts", ""), entry.get("ok", False), entry.get("summary", "")])
    # Assemble the ZIP in memory; no temp files on disk.
    mem_zip = io.BytesIO()
    with zipfile.ZipFile(mem_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(f"incidents_{hours}h.csv", inc_sio.getvalue())
        zf.writestr("queue_history.csv", q_sio.getvalue())
        zf.writestr("selftest_history.csv", st_sio.getvalue())
    mem_zip.seek(0)
    summary = (
        f"📦 Export all\n"
        f"Incidents: {len(incidents_rows)} rows ({hours}h)\n"
        f"Queue history: {len(queue_rows)} rows\n"
        f"Selftest: {len(st_hist)} rows"
    )
    await msg.answer(summary)
    await msg.answer_document(document=BufferedInputFile(mem_zip.read(), filename="export_all.zip"))
@dp.message(F.text == "🔒 SSL") @dp.message(F.text == "🔒 SSL")
async def ssl_certs(msg: Message): async def ssl_certs(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
@@ -532,7 +1022,7 @@ async def updates_page(cb: CallbackQuery):
@dp.callback_query(F.data == "upgrade:confirm") @dp.callback_query(F.data == "upgrade:confirm")
async def upgrade_confirm(cb: CallbackQuery): async def upgrade_confirm(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
await cb.answer() await cb.answer()
@@ -555,7 +1045,7 @@ async def upgrade_cancel(cb: CallbackQuery):
@dp.callback_query(F.data == "reboot:confirm") @dp.callback_query(F.data == "reboot:confirm")
async def reboot_confirm(cb: CallbackQuery): async def reboot_confirm(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
await cb.answer() await cb.answer()
REBOOT_PENDING[cb.from_user.id] = {} REBOOT_PENDING[cb.from_user.id] = {}
@@ -570,7 +1060,7 @@ async def reboot_cancel(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("npmplus:")) @dp.callback_query(F.data.startswith("npmplus:"))
async def npmplus_toggle(cb: CallbackQuery): async def npmplus_toggle(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
parts = cb.data.split(":") parts = cb.data.split(":")
if len(parts) != 3: if len(parts) != 3:

View File

@@ -10,7 +10,7 @@ menu_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="🩺 Health"), KeyboardButton(text="📊 Статус")], [KeyboardButton(text="🩺 Health"), KeyboardButton(text="📊 Статус")],
[KeyboardButton(text="🐳 Docker"), KeyboardButton(text="📦 Backup")], [KeyboardButton(text="🐳 Docker"), KeyboardButton(text="📦 Backup")],
[KeyboardButton(text="🧉 Artifacts"), KeyboardButton(text="⚙️ System")], [KeyboardButton(text="⚙️ System")],
[KeyboardButton(text=" Help")], [KeyboardButton(text=" Help")],
], ],
resize_keyboard=True, resize_keyboard=True,
@@ -20,7 +20,8 @@ docker_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="🐳 Status"), KeyboardButton(text="🧰 Arcane")], [KeyboardButton(text="🐳 Status"), KeyboardButton(text="🧰 Arcane")],
[KeyboardButton(text="🔄 Restart"), KeyboardButton(text="📜 Logs")], [KeyboardButton(text="🔄 Restart"), KeyboardButton(text="📜 Logs")],
[KeyboardButton(text="📈 Stats"), KeyboardButton(text=" Назад")], [KeyboardButton(text="📈 Stats"), KeyboardButton(text=" Restarts")],
[KeyboardButton(text="⬅️ Назад")],
], ],
resize_keyboard=True, resize_keyboard=True,
) )
@@ -37,9 +38,8 @@ backup_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="📦 Status"), KeyboardButton(text="📦 Last snapshot")], [KeyboardButton(text="📦 Status"), KeyboardButton(text="📦 Last snapshot")],
[KeyboardButton(text="📊 Repo stats"), KeyboardButton(text="🧯 Restore help")], [KeyboardButton(text="📊 Repo stats"), KeyboardButton(text="🧯 Restore help")],
[KeyboardButton(text="▶️ Run backup"), KeyboardButton(text="🧾 Queue")], [KeyboardButton(text="▶️ Run backup"), KeyboardButton(text="🧾 Queue"), KeyboardButton(text="📊 Queue SLA")],
[KeyboardButton(text="🧪 Restic check"), KeyboardButton(text="📬 Weekly report"), KeyboardButton(text="📜 History")], [KeyboardButton(text="📉 Backup SLA"), KeyboardButton(text="📜 History"), KeyboardButton(text="⬅️ Назад")],
[KeyboardButton(text="⬅️ Назад")],
], ],
resize_keyboard=True, resize_keyboard=True,
) )
@@ -84,6 +84,7 @@ system_logs_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="🧾 Audit/Incidents"), KeyboardButton(text="🔒 Security")], [KeyboardButton(text="🧾 Audit/Incidents"), KeyboardButton(text="🔒 Security")],
[KeyboardButton(text="🧩 Integrations"), KeyboardButton(text="🧰 Processes")], [KeyboardButton(text="🧩 Integrations"), KeyboardButton(text="🧰 Processes")],
[KeyboardButton(text="📣 Summary"), KeyboardButton(text="🔥 Heatmap")],
[KeyboardButton(text="⬅️ System")], [KeyboardButton(text="⬅️ System")],
], ],
resize_keyboard=True, resize_keyboard=True,
@@ -92,6 +93,8 @@ system_logs_kb = ReplyKeyboardMarkup(
system_logs_audit_kb = ReplyKeyboardMarkup( system_logs_audit_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="🧾 Audit"), KeyboardButton(text="📣 Incidents")], [KeyboardButton(text="🧾 Audit"), KeyboardButton(text="📣 Incidents")],
[KeyboardButton(text="🆕 Diff"), KeyboardButton(text="📤 Export")],
[KeyboardButton(text="📦 Export all"), KeyboardButton(text="🧰 Alerts log")],
[KeyboardButton(text="⬅️ Logs")], [KeyboardButton(text="⬅️ Logs")],
], ],
resize_keyboard=True, resize_keyboard=True,
@@ -122,6 +125,17 @@ system_logs_tools_kb = ReplyKeyboardMarkup(
resize_keyboard=True, resize_keyboard=True,
) )
# OpenWrt submenu (4 rows)
openwrt_kb = ReplyKeyboardMarkup(
    keyboard=[
        [KeyboardButton(text="🌐 WAN fast"), KeyboardButton(text="📡 Full status")],
        [KeyboardButton(text="📶 Wi-Fi clients"), KeyboardButton(text="🧾 Leases")],
        [KeyboardButton(text="🔀 Leases diff")],
        [KeyboardButton(text="⬅️ System")],
    ],
    resize_keyboard=True,
)
def docker_inline_kb(action: str) -> InlineKeyboardMarkup: def docker_inline_kb(action: str) -> InlineKeyboardMarkup:
rows = [] rows = []

View File

@@ -1,4 +1,5 @@
from pathlib import Path from pathlib import Path
import os
import time import time
LOCK_DIR = Path("/var/run/tg-bot") LOCK_DIR = Path("/var/run/tg-bot")
@@ -11,9 +12,14 @@ def lock_path(name: str) -> Path:
def acquire_lock(name: str) -> bool: def acquire_lock(name: str) -> bool:
p = lock_path(name) p = lock_path(name)
if p.exists(): try:
fd = os.open(str(p), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
except FileExistsError:
return False return False
p.write_text(str(time.time())) try:
os.write(fd, str(time.time()).encode("ascii", errors="ignore"))
finally:
os.close(fd)
return True return True

23
main.py
View File

@@ -5,9 +5,9 @@ from datetime import datetime
from app import bot, dp, cfg, ADMIN_ID, ADMIN_IDS from app import bot, dp, cfg, ADMIN_ID, ADMIN_IDS
from keyboards import menu_kb from keyboards import menu_kb
from services.docker import discover_containers, docker_watchdog from services.docker import discover_containers, docker_watchdog
from services.alerts import monitor_resources, monitor_smart from services.alerts import monitor_resources, monitor_smart, monitor_raid
from services.metrics import MetricsStore, start_sampler from services.metrics import MetricsStore, start_sampler
from services.queue import worker as queue_worker from services.queue import worker as queue_worker, configure as queue_configure
from services.notify import notify from services.notify import notify
from services.audit import AuditMiddleware, audit_start from services.audit import AuditMiddleware, audit_start
from services.ssl_alerts import monitor_ssl from services.ssl_alerts import monitor_ssl
@@ -38,6 +38,22 @@ def _handle_async_exception(_loop, context):
text = f"{msg}: {type(exc).__name__}: {exc}" text = f"{msg}: {type(exc).__name__}: {exc}"
else: else:
text = f"{msg}" text = f"{msg}"
now = datetime.now()
if not hasattr(_handle_async_exception, "_recent"):
_handle_async_exception._recent = []
_handle_async_exception._last_alert = None
recent = _handle_async_exception._recent
recent.append(now)
# keep last hour
_handle_async_exception._recent = [t for t in recent if (now - t).total_seconds() < 3600]
if len(_handle_async_exception._recent) >= 3:
last_alert = getattr(_handle_async_exception, "_last_alert", None)
if not last_alert or (now - last_alert).total_seconds() > 3600:
try:
log_incident(cfg, "exception_flood", category="system")
except Exception:
pass
_handle_async_exception._last_alert = now
try: try:
log_incident(cfg, text, category="system") log_incident(cfg, text, category="system")
except Exception: except Exception:
@@ -66,12 +82,15 @@ async def main():
asyncio.create_task(monitor_resources(cfg, notify, bot, ADMIN_ID)) asyncio.create_task(monitor_resources(cfg, notify, bot, ADMIN_ID))
if cfg.get("alerts", {}).get("smart_enabled", True): if cfg.get("alerts", {}).get("smart_enabled", True):
asyncio.create_task(monitor_smart(cfg, notify, bot, ADMIN_ID)) asyncio.create_task(monitor_smart(cfg, notify, bot, ADMIN_ID))
if cfg.get("alerts", {}).get("raid_enabled", True):
asyncio.create_task(monitor_raid(cfg, notify, bot, ADMIN_ID))
if cfg.get("npmplus", {}).get("alerts", {}).get("enabled", True): if cfg.get("npmplus", {}).get("alerts", {}).get("enabled", True):
asyncio.create_task(monitor_ssl(cfg, notify, bot, ADMIN_ID)) asyncio.create_task(monitor_ssl(cfg, notify, bot, ADMIN_ID))
if cfg.get("external_checks", {}).get("enabled", True): if cfg.get("external_checks", {}).get("enabled", True):
asyncio.create_task(monitor_external(cfg)) asyncio.create_task(monitor_external(cfg))
state.METRICS_STORE = MetricsStore() state.METRICS_STORE = MetricsStore()
asyncio.create_task(start_sampler(state.METRICS_STORE, interval=5)) asyncio.create_task(start_sampler(state.METRICS_STORE, interval=5))
queue_configure(cfg.get("queue", {}), cfg)
asyncio.create_task(queue_worker()) asyncio.create_task(queue_worker())
asyncio.create_task(weekly_reporter(cfg, bot, ADMIN_IDS, state.DOCKER_MAP)) asyncio.create_task(weekly_reporter(cfg, bot, ADMIN_IDS, state.DOCKER_MAP))
asyncio.create_task(schedule_selftest(cfg, bot, ADMIN_IDS, state.DOCKER_MAP)) asyncio.create_task(schedule_selftest(cfg, bot, ADMIN_IDS, state.DOCKER_MAP))

View File

@@ -1,7 +1,7 @@
import asyncio import asyncio
import time import time
import psutil import psutil
from system_checks import list_disks, smart_health, disk_temperature from system_checks import list_disks, smart_health, disk_temperature, list_md_arrays, md_array_status
from services.system import worst_disk_usage from services.system import worst_disk_usage
from services.disk_report import build_disk_report from services.disk_report import build_disk_report
@@ -130,3 +130,54 @@ async def monitor_smart(cfg, notify, bot, chat_id):
continue continue
await asyncio.sleep(interval) await asyncio.sleep(interval)
async def monitor_raid(cfg, notify, bot, chat_id):
    """Background monitor for md RAID arrays.

    Polls list_md_arrays()/md_array_status() every raid_interval_sec and
    alerts when a status string contains "inactive" (critical) or
    "degraded" (warn), with a per-device cooldown. Optionally announces
    recovery once when a device leaves the bad state.
    """
    alerts_cfg = cfg.get("alerts", {})
    interval = int(alerts_cfg.get("raid_interval_sec", 300))
    cooldown = int(alerts_cfg.get("raid_cooldown_sec", 1800))
    notify_recovery = bool(alerts_cfg.get("notify_recovery", True))
    last_sent: dict[str, float] = {}  # device -> timestamp of last alert
    bad_state: dict[str, bool] = {}  # device -> currently known bad
    while True:
        now = time.time()
        for dev in list_md_arrays():
            status = md_array_status(dev)
            lower = status.lower()
            level = None
            key_suffix = None
            # "inactive" takes priority over "degraded".
            if "inactive" in lower:
                level = "critical"
                key_suffix = "inactive"
            elif "degraded" in lower:
                level = "warn"
                key_suffix = "degraded"
            if level:
                # Alert on the first bad transition, then re-alert after cooldown.
                if not bad_state.get(dev) or (now - last_sent.get(dev, 0.0) >= cooldown):
                    icon = "🔴" if level == "critical" else "🟡"
                    await notify(
                        bot,
                        chat_id,
                        f"{icon} RAID {dev}: {status}",
                        level=level,
                        key=f"raid_{key_suffix}:{dev}",
                        category="raid",
                    )
                    last_sent[dev] = now
                    bad_state[dev] = True
            else:
                # Healthy again: announce recovery once if we had alerted.
                if bad_state.get(dev) and notify_recovery:
                    await notify(
                        bot,
                        chat_id,
                        f"🟢 RAID {dev}: {status}",
                        level="info",
                        key=f"raid_ok:{dev}",
                        category="raid",
                    )
                bad_state[dev] = False
        await asyncio.sleep(interval)

View File

@@ -9,7 +9,9 @@ def validate_cfg(cfg: dict[str, Any]) -> Tuple[List[str], List[str]]:
tg = cfg.get("telegram", {}) tg = cfg.get("telegram", {})
if not tg.get("token"): if not tg.get("token"):
errors.append("telegram.token is missing") errors.append("telegram.token is missing")
if not tg.get("admin_id"): admin_ids = tg.get("admin_ids")
has_admin_ids = isinstance(admin_ids, list) and len(admin_ids) > 0
if not tg.get("admin_id") and not has_admin_ids:
errors.append("telegram.admin_id is missing") errors.append("telegram.admin_id is missing")
thresholds = cfg.get("thresholds", {}) thresholds = cfg.get("thresholds", {})

View File

@@ -1,11 +1,48 @@
import os import os
import re
from typing import Any from typing import Any
from services.runner import run_cmd from services.runner import run_cmd
def _top_dirs_cmd(path: str, limit: int) -> list[str]: def _top_dirs_cmd(path: str, limit: int) -> list[str]:
return ["bash", "-lc", f"du -xhd1 {path} 2>/dev/null | sort -h | tail -n {limit}"] _ = limit
return ["du", "-x", "-h", "-d", "1", path]
_SIZE_RE = re.compile(r"^\s*([0-9]+(?:\.[0-9]+)?)([KMGTP]?)(i?B?)?$", re.IGNORECASE)
def _size_to_bytes(value: str) -> float:
m = _SIZE_RE.match(value.strip())
if not m:
return -1.0
num = float(m.group(1))
unit = (m.group(2) or "").upper()
mul = {
"": 1,
"K": 1024,
"M": 1024**2,
"G": 1024**3,
"T": 1024**4,
"P": 1024**5,
}.get(unit, 1)
return num * mul
def _format_top_dirs(raw: str, limit: int) -> str:
    """Parse `du` output and keep the `limit` largest entries, ascending by size."""
    parsed: list[tuple[float, str]] = []
    for raw_line in raw.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        pieces = stripped.split(maxsplit=1)
        if len(pieces) != 2:
            continue
        size_str, path = pieces
        parsed.append((_size_to_bytes(size_str), f"{size_str}\t{path}"))
    parsed.sort(key=lambda entry: entry[0])
    keep = max(1, limit)
    return "\n".join(text for _size, text in parsed[-keep:])
async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str: async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
@@ -15,24 +52,27 @@ async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
rc, out = await run_cmd(_top_dirs_cmd(mount, limit), timeout=30) rc, out = await run_cmd(_top_dirs_cmd(mount, limit), timeout=30)
if rc == 0 and out.strip(): if rc == 0 and out.strip():
top_out = _format_top_dirs(out, limit)
lines.append("") lines.append("")
lines.append("Top directories:") lines.append("Top directories:")
lines.append(out.strip()) lines.append(top_out)
docker_dir = cfg.get("disk_report", {}).get("docker_dir", "/var/lib/docker") docker_dir = cfg.get("disk_report", {}).get("docker_dir", "/var/lib/docker")
if docker_dir and os.path.exists(docker_dir): if docker_dir and os.path.exists(docker_dir):
rc2, out2 = await run_cmd(_top_dirs_cmd(docker_dir, limit), timeout=30) rc2, out2 = await run_cmd(_top_dirs_cmd(docker_dir, limit), timeout=30)
if rc2 == 0 and out2.strip(): if rc2 == 0 and out2.strip():
top_out2 = _format_top_dirs(out2, limit)
lines.append("") lines.append("")
lines.append(f"Docker dir: {docker_dir}") lines.append(f"Docker dir: {docker_dir}")
lines.append(out2.strip()) lines.append(top_out2)
logs_dir = cfg.get("disk_report", {}).get("logs_dir", "/var/log") logs_dir = cfg.get("disk_report", {}).get("logs_dir", "/var/log")
if logs_dir and os.path.exists(logs_dir): if logs_dir and os.path.exists(logs_dir):
rc3, out3 = await run_cmd(_top_dirs_cmd(logs_dir, limit), timeout=30) rc3, out3 = await run_cmd(_top_dirs_cmd(logs_dir, limit), timeout=30)
if rc3 == 0 and out3.strip(): if rc3 == 0 and out3.strip():
top_out3 = _format_top_dirs(out3, limit)
lines.append("") lines.append("")
lines.append(f"Logs dir: {logs_dir}") lines.append(f"Logs dir: {logs_dir}")
lines.append(out3.strip()) lines.append(top_out3)
return "\n".join(lines) return "\n".join(lines)

View File

@@ -1,4 +1,4 @@
import os import os
import ssl import ssl
import subprocess import subprocess
import psutil import psutil
@@ -38,7 +38,9 @@ def _npm_api_base(cfg) -> str | None:
def health(cfg, container_map: dict | None = None) -> str: def health(cfg, container_map: dict | None = None) -> str:
lines = ["🩺 Health check\n"] lines = ["🩺 Health check\n"]
thresholds = cfg.get("thresholds", {})
disk_warn = int(thresholds.get("disk_warn", 80))
load_warn = float(thresholds.get("load_warn", 2.0))
try: try:
env = os.environ.copy() env = os.environ.copy()
env.update(RESTIC_ENV) env.update(RESTIC_ENV)
@@ -91,12 +93,13 @@ def health(cfg, container_map: dict | None = None) -> str:
usage, mount = worst_disk_usage() usage, mount = worst_disk_usage()
if usage is None: if usage is None:
lines.append("⚠️ Disk n/a") lines.append("⚠️ Disk n/a")
elif usage > cfg["thresholds"]["disk_warn"]: elif usage > disk_warn:
lines.append(f"🟡 Disk {usage}% ({mount})") lines.append(f"🟡 Disk {usage}% ({mount})")
else: else:
lines.append(f"🟢 Disk {usage}% ({mount})") lines.append(f"🟢 Disk {usage}% ({mount})")
load = psutil.getloadavg()[0] load = psutil.getloadavg()[0]
lines.append(f"{'🟢' if load < cfg['thresholds']['load_warn'] else '🟡'} Load {load}") lines.append(f"{'🟢' if load < load_warn else '🟡'} Load {load}")
return "\n".join(lines) return "\n".join(lines)

View File

@@ -4,6 +4,7 @@ from collections import deque
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
from typing import Any from typing import Any
from services import runtime_state
def _get_path(cfg: dict[str, Any]) -> str: def _get_path(cfg: dict[str, Any]) -> str:
@@ -65,6 +66,10 @@ def _parse_line(line: str) -> tuple[datetime | None, str]:
def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]: def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]:
return [f"{dt:%Y-%m-%d %H:%M} {msg}" for dt, msg in read_raw(cfg, hours, limit=limit)]
def read_raw(cfg: dict[str, Any], hours: int, limit: int = 200, *, include_old: bool = False) -> list[tuple[datetime, str]]:
path = _get_path(cfg) path = _get_path(cfg)
if not os.path.exists(path): if not os.path.exists(path):
return [] return []
@@ -74,7 +79,40 @@ def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]:
with open(path, "r", encoding="utf-8", errors="replace") as f: with open(path, "r", encoding="utf-8", errors="replace") as f:
for line in f: for line in f:
dt, msg = _parse_line(line.rstrip()) dt, msg = _parse_line(line.rstrip())
if dt is None or dt < since: if dt is None:
continue continue
lines.append(f"{dt:%Y-%m-%d %H:%M} {msg}") if not include_old and dt < since:
continue
lines.append((dt, msg))
return list(lines) return list(lines)
def infer_category(text: str) -> str | None:
lower = text.lower()
if "category=" in lower:
import re
m = re.search(r"category=([a-z0-9_-]+)", lower)
if m:
return m.group(1)
if "load" in lower:
return "load"
if "docker" in lower:
return "docker"
if "restic" in lower or "backup" in lower:
return "backup"
if "smart" in lower:
return "smart"
if "ssl" in lower or "cert" in lower:
return "ssl"
if "npmplus" in lower:
return "npmplus"
if "gitea" in lower:
return "gitea"
if "openwrt" in lower:
return "openwrt"
if "queue" in lower:
return "queue"
if "selftest" in lower:
return "selftest"
return None

View File

@@ -47,13 +47,21 @@ async def notify(
category: str | None = None, category: str | None = None,
): ):
alerts_cfg = cfg.get("alerts", {}) alerts_cfg = cfg.get("alerts", {})
suppressed_reason = None
if category and is_muted(category): if category and is_muted(category):
return suppressed_reason = "muted"
if category and is_auto_muted(cfg, category): elif category and is_auto_muted(cfg, category):
return suppressed_reason = "auto_mute"
if _in_quiet_hours(alerts_cfg): elif _in_quiet_hours(alerts_cfg):
allow_critical = bool(alerts_cfg.get("quiet_hours", {}).get("allow_critical", True)) allow_critical = bool(alerts_cfg.get("quiet_hours", {}).get("allow_critical", True))
if not (allow_critical and level == "critical"): if not (allow_critical and level == "critical"):
suppressed_reason = "quiet_hours"
if suppressed_reason:
try:
log_incident(cfg, f"[suppressed:{suppressed_reason}] {text}", category=category)
except Exception:
pass
return return
dedup_sec = int(alerts_cfg.get("notify_cooldown_sec", alerts_cfg.get("cooldown_sec", 900))) dedup_sec = int(alerts_cfg.get("notify_cooldown_sec", alerts_cfg.get("cooldown_sec", 900)))

View File

@@ -460,3 +460,45 @@ async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str:
if mode == "leases": if mode == "leases":
return "\n".join(header + lease_section) return "\n".join(header + lease_section)
return "\n".join(header + wifi_section + lease_section) return "\n".join(header + wifi_section + lease_section)
async def fetch_openwrt_leases(cfg: dict[str, Any]) -> list[str]:
"""
Fetch DHCP leases as list of strings "IP hostname (MAC)".
"""
ow_cfg = cfg.get("openwrt", {})
host = ow_cfg.get("host")
user = ow_cfg.get("user", "root")
port = ow_cfg.get("port", 22)
identity_file = ow_cfg.get("identity_file")
timeout_sec = ow_cfg.get("timeout_sec", 8)
strict = ow_cfg.get("strict_host_key_checking", True)
if not host:
raise RuntimeError("OpenWrt host not configured")
ssh_cmd = [
"ssh",
"-o",
"BatchMode=yes",
"-o",
f"ConnectTimeout={timeout_sec}",
"-o",
"LogLevel=ERROR",
]
if not strict:
ssh_cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
if identity_file:
ssh_cmd += ["-i", str(identity_file)]
ssh_cmd += ["-p", str(port), f"{user}@{host}"]
remote = "ubus call luci-rpc getDHCPLeases '{\"family\":4}' 2>/dev/null || cat /tmp/dhcp.leases"
rc, out = await run_cmd_full(ssh_cmd + ["sh", "-c", remote], timeout=timeout_sec + 10)
if rc == 124:
raise RuntimeError("timeout")
if rc != 0:
raise RuntimeError(out.strip() or f"ssh rc={rc}")
leases = _safe_json_load(out)
if leases:
return _extract_leases(leases)
return _parse_leases_fallback(out)

View File

@@ -1,8 +1,10 @@
import asyncio import asyncio
import logging
import time import time
from collections import deque from collections import deque
from typing import Awaitable, Callable, Any from typing import Awaitable, Callable, Any
from services import runtime_state from services import runtime_state
from services.incidents import log_incident
_queue: asyncio.Queue = asyncio.Queue() _queue: asyncio.Queue = asyncio.Queue()
@@ -17,6 +19,14 @@ _stats: dict[str, Any] = runtime_state.get("queue_stats", {}) or {
"last_finished_at": 0.0, "last_finished_at": 0.0,
} }
_history: deque[dict[str, Any]] = deque(runtime_state.get("queue_history", []) or [], maxlen=50) _history: deque[dict[str, Any]] = deque(runtime_state.get("queue_history", []) or [], maxlen=50)
_alert_cfg: dict[str, Any] = {
"max_pending": None,
"avg_wait": None,
"cooldown": 300,
"last_sent": 0.0,
}
_cfg: dict[str, Any] | None = None
_logger = logging.getLogger("queue")
def _save_stats(): def _save_stats():
@@ -24,10 +34,39 @@ def _save_stats():
runtime_state.set_state("queue_history", list(_history)) runtime_state.set_state("queue_history", list(_history))
def configure(queue_cfg: dict[str, Any], cfg: dict[str, Any]):
global _cfg
_cfg = cfg
_alert_cfg["max_pending"] = queue_cfg.get("max_pending_alert")
_alert_cfg["avg_wait"] = queue_cfg.get("avg_wait_alert")
_alert_cfg["cooldown"] = queue_cfg.get("cooldown_sec", 300)
def _check_congestion(pending_count: int, avg_wait: float | None):
max_pending = _alert_cfg.get("max_pending")
avg_wait_thr = _alert_cfg.get("avg_wait")
cooldown = _alert_cfg.get("cooldown", 300)
now = time.time()
if now - _alert_cfg.get("last_sent", 0) < cooldown:
return
reason = None
if max_pending and pending_count >= max_pending:
reason = f"pending={pending_count} >= {max_pending}"
if avg_wait_thr and avg_wait is not None and avg_wait >= avg_wait_thr:
reason = reason or f"avg_wait={avg_wait:.1f}s >= {avg_wait_thr}s"
if reason and _cfg:
try:
log_incident(_cfg, f"queue_congested {reason}", category="queue")
except Exception:
pass
_alert_cfg["last_sent"] = now
async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int: async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int:
enqueued_at = time.time() enqueued_at = time.time()
await _queue.put((label, job, enqueued_at)) await _queue.put((label, job, enqueued_at))
_pending.append((label, enqueued_at)) _pending.append((label, enqueued_at))
_check_congestion(len(_pending), None)
return len(_pending) return len(_pending)
@@ -48,8 +87,18 @@ async def worker():
status = "ok" status = "ok"
try: try:
await job() await job()
except Exception: except Exception as e:
status = "err" status = "err"
_logger.exception("Queue job failed: label=%s", label)
if _cfg:
try:
log_incident(
_cfg,
f"queue_job_failed label={label} error={type(e).__name__}: {e}",
category="queue",
)
except Exception:
pass
finally: finally:
finished_at = time.time() finished_at = time.time()
if _current_meta: if _current_meta:
@@ -75,6 +124,7 @@ async def worker():
} }
) )
_save_stats() _save_stats()
_check_congestion(len(_pending), _stats.get("avg_wait_sec"))
_current_label = None _current_label = None
_current_meta = None _current_meta = None
_queue.task_done() _queue.task_done()
@@ -135,3 +185,25 @@ def format_details(limit: int = 10) -> str:
f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)" f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)"
) )
return "\n".join(lines) return "\n".join(lines)
def format_history(limit: int = 20) -> str:
lines = ["🗂 Queue history"]
if not _history:
lines.append("(empty)")
return "\n".join(lines)
for item in list(_history)[:limit]:
t = time.strftime("%m-%d %H:%M:%S", time.localtime(item["finished_at"]))
lines.append(
f"{t} {item['label']} {item['status']} "
f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)"
)
return "\n".join(lines)
def get_history_raw() -> list[dict[str, Any]]:
return list(_history)
def get_stats() -> dict[str, Any]:
return dict(_stats)

View File

@@ -1,9 +1,13 @@
import json import json
import os import os
import threading
import tempfile
from typing import Any, Dict from typing import Any, Dict
_PATH = "/var/server-bot/runtime.json" _PATH = "/var/server-bot/runtime.json"
_STATE: Dict[str, Any] = {} _STATE: Dict[str, Any] = {}
_LOCK = threading.RLock()
_LOADED = False
def configure(path: str | None): def configure(path: str | None):
@@ -13,40 +17,57 @@ def configure(path: str | None):
def _load_from_disk(): def _load_from_disk():
global _STATE global _STATE, _LOADED
if not os.path.exists(_PATH): if not os.path.exists(_PATH):
_STATE = {} _STATE = {}
_LOADED = True
return return
try: try:
with open(_PATH, "r", encoding="utf-8") as f: with open(_PATH, "r", encoding="utf-8") as f:
_STATE = json.load(f) _STATE = json.load(f)
except Exception: except Exception:
_STATE = {} _STATE = {}
_LOADED = True
def _save(): def _save():
os.makedirs(os.path.dirname(_PATH), exist_ok=True) directory = os.path.dirname(_PATH) or "."
os.makedirs(directory, exist_ok=True)
try: try:
with open(_PATH, "w", encoding="utf-8") as f: fd, tmp_path = tempfile.mkstemp(prefix=".runtime.", suffix=".json", dir=directory)
json.dump(_STATE, f) try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(_STATE, f, ensure_ascii=False)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, _PATH)
finally:
if os.path.exists(tmp_path):
try:
os.unlink(tmp_path)
except Exception:
pass
except Exception: except Exception:
pass pass
def get_state() -> Dict[str, Any]: def get_state() -> Dict[str, Any]:
if not _STATE: with _LOCK:
if not _LOADED:
_load_from_disk() _load_from_disk()
return _STATE return _STATE
def set_state(key: str, value: Any): def set_state(key: str, value: Any):
if not _STATE: with _LOCK:
if not _LOADED:
_load_from_disk() _load_from_disk()
_STATE[key] = value _STATE[key] = value
_save() _save()
def get(key: str, default: Any = None) -> Any: def get(key: str, default: Any = None) -> Any:
if not _STATE: with _LOCK:
if not _LOADED:
_load_from_disk() _load_from_disk()
return _STATE.get(key, default) return _STATE.get(key, default)

View File

@@ -5,10 +5,20 @@ from typing import Any
from services.health import health from services.health import health
from services.runner import run_cmd_full from services.runner import run_cmd_full
from services.incidents import log_incident
from services import runtime_state
async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str: def _save_history(entry: dict[str, Any]) -> None:
hist = runtime_state.get("selftest_history", [])
hist = hist[:50] if isinstance(hist, list) else []
hist.insert(0, entry)
runtime_state.set_state("selftest_history", hist[:20])
async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> tuple[str, bool]:
lines = ["🧪 Self-test"] lines = ["🧪 Self-test"]
ok = True
# health # health
try: try:
@@ -18,6 +28,7 @@ async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str:
lines.append(f"🟢 Health: {brief}") lines.append(f"🟢 Health: {brief}")
except Exception as e: except Exception as e:
lines.append(f"🔴 Health failed: {e}") lines.append(f"🔴 Health failed: {e}")
ok = False
# restic snapshots check # restic snapshots check
rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40) rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
@@ -35,8 +46,21 @@ async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str:
lines.append("🟡 Restic snapshots: invalid JSON") lines.append("🟡 Restic snapshots: invalid JSON")
else: else:
lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}") lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}")
ok = False
return "\n".join(lines) result_text = "\n".join(lines)
try:
_save_history(
{
"ts": datetime.now().isoformat(),
"ok": ok,
"summary": result_text.splitlines()[1] if len(lines) > 1 else "",
}
)
except Exception:
pass
return result_text, ok
async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]): async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]):
@@ -58,9 +82,14 @@ async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], dock
if run_at <= now: if run_at <= now:
run_at += timedelta(days=1) run_at += timedelta(days=1)
await asyncio.sleep((run_at - now).total_seconds()) await asyncio.sleep((run_at - now).total_seconds())
text = await run_selftest(cfg, docker_map) text, ok = await run_selftest(cfg, docker_map)
for chat_id in admin_ids: for chat_id in admin_ids:
try: try:
await bot.send_message(chat_id, text) await bot.send_message(chat_id, text)
except Exception: except Exception:
pass pass
if not ok:
try:
log_incident(cfg, "selftest failed", category="selftest")
except Exception:
pass

View File

@@ -1,5 +1,6 @@
import subprocess import subprocess
import os import os
import re
def _cmd(cmd: str) -> str: def _cmd(cmd: str) -> str:
@@ -82,6 +83,62 @@ def list_disks() -> list[str]:
return disks return disks
def list_md_arrays() -> list[str]:
# Prefer /proc/mdstat: it reliably lists active md arrays
# even when lsblk tree/filters differ across distros.
out = _cmd("cat /proc/mdstat")
arrays: set[str] = set()
for line in out.splitlines():
m = re.match(r"^\s*(md\d+)\s*:", line)
if m:
arrays.add(f"/dev/{m.group(1)}")
if arrays:
return sorted(arrays)
# Fallback for environments where mdstat parsing is unavailable.
out = _cmd("ls -1 /dev/md* 2>/dev/null")
for line in out.splitlines():
dev = line.strip()
if dev and re.match(r"^/dev/md\d+$", dev):
arrays.add(dev)
return sorted(arrays)
def md_array_status(dev: str) -> str:
out = _cmd("cat /proc/mdstat")
if not out or "ERROR:" in out:
return "⚠️ n/a"
name = dev.rsplit("/", 1)[-1]
lines = out.splitlines()
header = None
idx = -1
for i, line in enumerate(lines):
s = line.strip()
if s.startswith(f"{name} :"):
header = s
idx = i
break
if not header:
return "⚠️ not found in /proc/mdstat"
if "inactive" in header:
return "🔴 inactive"
# Typical mdstat health marker: [UU] for healthy mirrors/raid members.
block = [header]
for line in lines[idx + 1:]:
if not line.strip():
break
block.append(line.strip())
block_text = " ".join(block)
if "[U_" in block_text or "[_U" in block_text:
return "🟡 degraded"
return "🟢 active"
def smart_health(dev: str) -> str: def smart_health(dev: str) -> str:
out = _cmd(f"smartctl -H {dev}") out = _cmd(f"smartctl -H {dev}")
@@ -138,8 +195,9 @@ def smart_last_test(dev: str) -> str:
def disks() -> str: def disks() -> str:
disks = list_disks() disks = list_disks()
md_arrays = list_md_arrays()
if not disks: if not disks and not md_arrays:
return "💽 Disks\n\n❌ No disks found" return "💽 Disks\n\n❌ No disks found"
lines = ["💽 Disks (SMART)\n"] lines = ["💽 Disks (SMART)\n"]
@@ -158,6 +216,12 @@ def disks() -> str:
lines.append(f"{icon} {d}{health}, 🌡 {temp}") lines.append(f"{icon} {d}{health}, 🌡 {temp}")
if md_arrays:
lines.append("")
lines.append("🧱 RAID (md)")
for md in md_arrays:
lines.append(f"{md}{md_array_status(md)}")
return "\n".join(lines) return "\n".join(lines)

View File

@@ -0,0 +1,20 @@
import unittest
from services.config_check import validate_cfg
class ConfigCheckTests(unittest.TestCase):
def test_admin_ids_without_admin_id_is_valid(self):
cfg = {
"telegram": {
"token": "x",
"admin_ids": [1, 2],
}
}
errors, warnings = validate_cfg(cfg)
self.assertEqual(errors, [])
self.assertIsInstance(warnings, list)
if __name__ == "__main__":
unittest.main()

21
tests/test_disk_report.py Normal file
View File

@@ -0,0 +1,21 @@
import unittest
import types
import sys
# Avoid runtime import of real app/aiogram in services.runner.
sys.modules.setdefault("app", types.SimpleNamespace(RESTIC_ENV={}))
from services.disk_report import _top_dirs_cmd
class DiskReportTests(unittest.TestCase):
def test_top_dirs_cmd_uses_exec_args_without_shell(self):
cmd = _top_dirs_cmd("/tmp/path with spaces", 5)
self.assertEqual(cmd[:4], ["du", "-x", "-h", "-d"])
self.assertNotIn("bash", cmd)
self.assertNotIn("-lc", cmd)
self.assertEqual(cmd[-1], "/tmp/path with spaces")
if __name__ == "__main__":
unittest.main()

59
tests/test_queue.py Normal file
View File

@@ -0,0 +1,59 @@
import asyncio
import tempfile
import unittest
from services import runtime_state
from services import queue as queue_service
class QueueTests(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.tmp = tempfile.TemporaryDirectory()
runtime_state.configure(f"{self.tmp.name}/runtime.json")
queue_service._pending.clear() # type: ignore[attr-defined]
queue_service._history.clear() # type: ignore[attr-defined]
queue_service._stats = { # type: ignore[attr-defined]
"processed": 0,
"avg_wait_sec": 0.0,
"avg_runtime_sec": 0.0,
"last_label": "",
"last_finished_at": 0.0,
}
queue_service._cfg = {"incidents": {"enabled": True}} # type: ignore[attr-defined]
async def asyncTearDown(self):
self.tmp.cleanup()
async def test_worker_logs_failed_job_to_incidents(self):
logged = []
def fake_log_incident(cfg, text, category=None):
logged.append((text, category))
orig = queue_service.log_incident
queue_service.log_incident = fake_log_incident
async def boom():
raise RuntimeError("boom")
worker_task = asyncio.create_task(queue_service.worker())
try:
await queue_service.enqueue("broken-job", boom)
await asyncio.wait_for(queue_service._queue.join(), timeout=2.0) # type: ignore[attr-defined]
finally:
worker_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await worker_task
queue_service.log_incident = orig
self.assertEqual(queue_service._stats.get("processed"), 1) # type: ignore[attr-defined]
self.assertTrue(any("queue_job_failed label=broken-job" in t for t, _c in logged))
self.assertTrue(any(c == "queue" for _t, c in logged))
import contextlib
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,28 @@
import json
import tempfile
import unittest
from pathlib import Path
from services import runtime_state
class RuntimeStateTests(unittest.TestCase):
def test_set_and_get_persist_between_loads(self):
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "runtime.json"
runtime_state.configure(str(path))
runtime_state.set_state("foo", {"bar": 1})
self.assertEqual(runtime_state.get("foo"), {"bar": 1})
# Force a fresh in-memory state and load from disk again.
runtime_state._STATE = {} # type: ignore[attr-defined]
runtime_state._LOADED = False # type: ignore[attr-defined]
self.assertEqual(runtime_state.get("foo"), {"bar": 1})
raw = json.loads(path.read_text(encoding="utf-8"))
self.assertEqual(raw.get("foo"), {"bar": 1})
if __name__ == "__main__":
unittest.main()