Compare commits

..

52 Commits

Author SHA1 Message Date
b84107463c Add dedicated RAID alert category and monitor 2026-02-25 01:43:10 +03:00
ee361abb99 Detect md arrays via /proc/mdstat for RAID status 2026-02-25 01:39:11 +03:00
2ad423fb6a Fix md RAID detection for lsblk raid* types 2026-02-25 01:36:59 +03:00
efa5dd9644 Fix mojibake text and add md RAID checks 2026-02-25 01:32:55 +03:00
678332e6d0 Add lightweight unittest coverage for stability fixes 2026-02-15 01:25:11 +03:00
7c56430f32 Unify admin callback checks and log queue job failures 2026-02-15 01:20:55 +03:00
b54a094185 Add safe config fallbacks for app init and health checks 2026-02-15 01:16:58 +03:00
6d5fb9c258 Harden docker callback parsing and remove duplicate /openwrt handler 2026-02-15 01:12:45 +03:00
5099ae4fe2 Fix critical race conditions and unsafe disk report command 2026-02-15 01:12:41 +03:00
568cd86844 Fix heatmap button args 2026-02-15 00:51:09 +03:00
b138ee316d Import backup keyboard for SLA handlers 2026-02-15 00:46:53 +03:00
fa98a96b34 Route any SLA text to corresponding handler 2026-02-15 00:45:56 +03:00
1dba6d4a0f Match SLA buttons via regex 2026-02-15 00:44:14 +03:00
b784deb02b Ack SLA requests immediately 2026-02-15 00:35:32 +03:00
5ae54618e8 Broaden SLA button matching 2026-02-15 00:32:09 +03:00
3fc99bdcfc Handle SLA buttons without emojis 2026-02-15 00:30:39 +03:00
c1d69adbc5 Make incidents diff resilient and send sample if empty 2026-02-09 04:21:27 +03:00
a14fb8fccd Show recent sample when incidents diff is empty 2026-02-09 04:19:59 +03:00
4ba8f48228 Auto-reset incidents diff marker if ahead of log 2026-02-09 04:18:05 +03:00
10bf265c29 Add reset option to /incidents_diff 2026-02-09 04:16:28 +03:00
fd179d24e8 Remove Incidents entry from main keyboard 2026-02-09 04:13:47 +03:00
2905528677 Keep incidents summary inside logs keyboard 2026-02-09 04:12:44 +03:00
2b87ce04a3 Keep backup/queue SLA and OpenWrt leases diff in their menus 2026-02-09 04:10:04 +03:00
02b8e2bb55 Keep docker restarts inside docker keyboard 2026-02-09 04:08:27 +03:00
f0fb2aad0e Split OpenWrt menu vs full status actions 2026-02-09 04:06:49 +03:00
219776c642 Disambiguate OpenWrt menu vs full status button 2026-02-09 04:05:25 +03:00
28caa551bd Narrow /docker_health match to avoid summary collisions 2026-02-09 04:03:17 +03:00
783f4abd98 Use icon buttons for incidents, queue and OpenWrt actions 2026-02-09 04:00:04 +03:00
f71c02835a Adjust keyboards with incidents and OpenWrt submenus 2026-02-09 03:45:13 +03:00
f7081b78e1 Add incident exports, queue SLA, and OpenWrt diff utilities 2026-02-09 02:57:16 +03:00
0fbd374823 Log docker restarts as incidents 2026-02-09 02:45:06 +03:00
c3db70160c Use semicolon delimiter in incidents_export CSV 2026-02-09 02:32:50 +03:00
1b9d260530 Use BufferedInputFile for incidents_export 2026-02-09 02:31:24 +03:00
040a6c96e4 Seek to start before sending incidents export files 2026-02-09 02:30:17 +03:00
4f6d6dd549 Fix incidents_export file delivery 2026-02-09 02:28:49 +03:00
2e0bf0c6ea Add incidents export, queue alerts, and health summaries 2026-02-09 02:24:08 +03:00
5a4234f59d Log incidents even when alerts are muted 2026-02-09 02:09:32 +03:00
1d24caa2a2 Fix docker_status log_incident indentation 2026-02-09 02:04:15 +03:00
c91c961134 Tag incidents with categories for summaries 2026-02-09 02:03:04 +03:00
75113b6182 Add selftest scheduler, queue history, and OpenWrt signal stats 2026-02-09 01:56:27 +03:00
aa7bd85687 Filter restic forget parsing to ignore summary rows 2026-02-09 01:41:11 +03:00
ff65e15509 Beautify restic forget table in backup history 2026-02-09 01:39:06 +03:00
08fa95dffd Trim backup history output to fit Telegram 2026-02-09 01:35:41 +03:00
b0a4413671 Add runtime state, auto-mute schedules, and backup retries 2026-02-09 01:14:37 +03:00
9399be4168 Update help with alert shortcuts and docker/openwrt commands 2026-02-08 23:34:27 +03:00
2e35885a5e Fix cfg import in docker handler 2026-02-08 23:31:38 +03:00
4d4e3767bc Add weekly report, multi-admin, docker health cmd, backup tail, openwrt filters 2026-02-08 23:27:23 +03:00
b78dc3cd5c Limit /alerts handler to exact command (fix alias collisions) 2026-02-08 23:09:09 +03:00
20cd56a8c0 Add inline alerts menu with callbacks 2026-02-08 23:07:39 +03:00
7d251a7078 Fix alerts command dispatch indentation 2026-02-08 23:04:35 +03:00
2ee9756d12 Add shortcut commands for alerts, backup, docker, openwrt 2026-02-08 23:01:33 +03:00
77571da4d9 Add /help alias for inline help 2026-02-08 22:54:50 +03:00
33 changed files with 2156 additions and 134 deletions

View File

@@ -6,10 +6,12 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `token` (string, required): Telegram bot token. - `token` (string, required): Telegram bot token.
- `admin_id` (int, required): Telegram user id with admin access. - `admin_id` (int, required): Telegram user id with admin access.
- `admin_ids` (list<int>): Optional list of admins (first is primary for alerts).
## paths ## paths
- `artifact_state` (string): JSON file for artifact state. - `artifact_state` (string): JSON file for artifact state.
- `runtime_state` (string): File for runtime state (mutes, metrics, etc.).
- `restic_env` (string): Path to a file with RESTIC_* environment variables. - `restic_env` (string): Path to a file with RESTIC_* environment variables.
## thresholds ## thresholds
@@ -30,11 +32,19 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `start` (string): Start time `HH:MM` (e.g. `23:00`). - `start` (string): Start time `HH:MM` (e.g. `23:00`).
- `end` (string): End time `HH:MM` (e.g. `08:00`). - `end` (string): End time `HH:MM` (e.g. `08:00`).
- `allow_critical` (bool): Allow critical alerts during quiet hours. - `allow_critical` (bool): Allow critical alerts during quiet hours.
- `auto_mute` (list): Per-category auto mutes by time window.
- `category` (string): load/disk/smart/raid/ssl/docker/test.
- `start` (string): Start `HH:MM`.
- `end` (string): End `HH:MM` (can wrap over midnight).
- `auto_mute_on_high_load_sec` (int): Auto-mute the `load` category for N seconds when a critical load alert fires (0 disables).
- `notify_recovery` (bool): Send recovery notifications. - `notify_recovery` (bool): Send recovery notifications.
- `smart_enabled` (bool): Enable SMART health polling. - `smart_enabled` (bool): Enable SMART health polling.
- `smart_interval_sec` (int): SMART poll interval. - `smart_interval_sec` (int): SMART poll interval.
- `smart_cooldown_sec` (int): SMART alert cooldown. - `smart_cooldown_sec` (int): SMART alert cooldown.
- `smart_temp_warn` (int): SMART temperature warning (C). - `smart_temp_warn` (int): SMART temperature warning (C).
- `raid_enabled` (bool): Enable md RAID polling (`/proc/mdstat`).
- `raid_interval_sec` (int): RAID poll interval.
- `raid_cooldown_sec` (int): RAID alert cooldown.
## disk_report ## disk_report
@@ -70,6 +80,22 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `dry_run` (bool): If `true`, dangerous actions (upgrade/reboot/backup) are skipped. - `dry_run` (bool): If `true`, dangerous actions (upgrade/reboot/backup) are skipped.
## reports
- `weekly.enabled` (bool): Enable weekly report.
- `weekly.day` (string): Weekday `Mon`..`Sun` (default `Sun`).
- `weekly.time` (string): Local time `HH:MM` (default `08:00`).
## selftest
- `schedule.enabled` (bool): Enable auto self-test.
- `schedule.time` (string): Local time `HH:MM` (default `03:30`).
## queue
- `max_pending_alert` (int): Alert if pending tasks >= this value.
- `avg_wait_alert` (int): Alert if average wait exceeds N seconds.
- `cooldown_sec` (int): Cooldown between queue alerts (default 300s).
## external_checks ## external_checks
- `enabled` (bool): Enable background checks. - `enabled` (bool): Enable background checks.

View File

@@ -6,10 +6,12 @@
- `token` (string, обяз.): токен бота. - `token` (string, обяз.): токен бота.
- `admin_id` (int, обяз.): Telegram user id администратора. - `admin_id` (int, обяз.): Telegram user id администратора.
- `admin_ids` (list<int>): список админов (первый используется как основной для уведомлений).
## paths ## paths
- `artifact_state` (string): JSON файл состояния артефактов. - `artifact_state` (string): JSON файл состояния артефактов.
- `runtime_state` (string): файл с runtime-состоянием (мьюты, метрики и т.п.).
- `restic_env` (string): путь к файлу с RESTIC_* переменными. - `restic_env` (string): путь к файлу с RESTIC_* переменными.
## thresholds ## thresholds
@@ -30,11 +32,19 @@
- `start` (string): начало, формат `HH:MM` (например `23:00`). - `start` (string): начало, формат `HH:MM` (например `23:00`).
- `end` (string): конец, формат `HH:MM` (например `08:00`). - `end` (string): конец, формат `HH:MM` (например `08:00`).
- `allow_critical` (bool): слать критичные алерты в тишину. - `allow_critical` (bool): слать критичные алерты в тишину.
- `auto_mute` (list): авто‑мьюты по категориям и времени.
- `category` (string): load/disk/smart/raid/ssl/docker/test.
- `start` (string): начало `HH:MM`.
- `end` (string): конец `HH:MM` (интервал может пересекать ночь).
- `auto_mute_on_high_load_sec` (int): при critical load автоматически мьютить категорию `load` на N секунд (0 — выкл).
- `notify_recovery` (bool): уведомлять о восстановлении. - `notify_recovery` (bool): уведомлять о восстановлении.
- `smart_enabled` (bool): SMART проверки. - `smart_enabled` (bool): SMART проверки.
- `smart_interval_sec` (int): интервал SMART. - `smart_interval_sec` (int): интервал SMART.
- `smart_cooldown_sec` (int): кулдаун SMART. - `smart_cooldown_sec` (int): кулдаун SMART.
- `smart_temp_warn` (int): порог температуры (C). - `smart_temp_warn` (int): порог температуры (C).
- `raid_enabled` (bool): RAID проверки (`/proc/mdstat`).
- `raid_interval_sec` (int): интервал RAID.
- `raid_cooldown_sec` (int): кулдаун RAID алертов.
## disk_report ## disk_report
@@ -70,6 +80,23 @@
- `dry_run` (bool): если `true`, опасные действия (upgrade/reboot/backup) не выполняются. - `dry_run` (bool): если `true`, опасные действия (upgrade/reboot/backup) не выполняются.
## reports
- `weekly.enabled` (bool): включить еженедельный отчёт.
- `weekly.day` (string): день недели (`Mon`..`Sun`), по умолчанию `Sun`.
- `weekly.time` (string): локальное время `HH:MM`, по умолчанию `08:00`.
## selftest
- `schedule.enabled` (bool): включить авто self-test.
- `schedule.time` (string): локальное время `HH:MM`, по умолчанию `03:30`.
## queue
- `max_pending_alert` (int): алерт, если задач в очереди >= этому значению.
- `avg_wait_alert` (int): алерт, если среднее ожидание превышает N секунд.
- `cooldown_sec` (int): кулдаун между алертами очереди, по умолчанию 300с.
## external_checks ## external_checks
- `enabled` (bool): включить фоновые проверки. - `enabled` (bool): включить фоновые проверки.

15
app.py
View File

@@ -1,13 +1,22 @@
from aiogram import Bot, Dispatcher from aiogram import Bot, Dispatcher
from config import load_cfg, load_env from config import load_cfg, load_env
from services import runtime_state
cfg = load_cfg() cfg = load_cfg()
TOKEN = cfg["telegram"]["token"] TOKEN = cfg["telegram"]["token"]
ADMIN_ID = cfg["telegram"]["admin_id"] admin_ids_cfg = cfg["telegram"].get("admin_ids")
if isinstance(admin_ids_cfg, list) and admin_ids_cfg:
ADMIN_IDS = [int(x) for x in admin_ids_cfg]
ADMIN_ID = ADMIN_IDS[0]
else:
ADMIN_ID = int(cfg["telegram"]["admin_id"])
ADMIN_IDS = [ADMIN_ID]
ARTIFACT_STATE = cfg["paths"]["artifact_state"] paths_cfg = cfg.get("paths", {})
RESTIC_ENV = load_env(cfg["paths"].get("restic_env", "/etc/restic/restic.env")) runtime_state.configure(paths_cfg.get("runtime_state", "/var/server-bot/runtime.json"))
ARTIFACT_STATE = paths_cfg.get("artifact_state", "/opt/tg-bot/state.json")
RESTIC_ENV = load_env(paths_cfg.get("restic_env", "/etc/restic/restic.env"))
DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80)) DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80))
LOAD_WARN = float(cfg.get("thresholds", {}).get("load_warn", 2.0)) LOAD_WARN = float(cfg.get("thresholds", {}).get("load_warn", 2.0))

View File

@@ -1,10 +1,10 @@
from aiogram.types import Message, CallbackQuery from aiogram.types import Message, CallbackQuery
from app import ADMIN_ID from app import ADMIN_IDS
def is_admin_msg(msg: Message) -> bool: def is_admin_msg(msg: Message) -> bool:
return msg.from_user and msg.from_user.id == ADMIN_ID return msg.from_user and msg.from_user.id in ADMIN_IDS
def is_admin_cb(cb: CallbackQuery) -> bool: def is_admin_cb(cb: CallbackQuery) -> bool:
return cb.from_user and cb.from_user.id == ADMIN_ID return cb.from_user and cb.from_user.id in ADMIN_IDS

View File

@@ -1,10 +1,14 @@
telegram: telegram:
token: "YOUR_TELEGRAM_BOT_TOKEN" token: "YOUR_TELEGRAM_BOT_TOKEN"
admin_id: 123456789 admin_id: 123456789
# Optional list of admins (first is primary for alerts)
admin_ids:
- 123456789
paths: paths:
# JSON state file for artifacts # JSON state file for artifacts
artifact_state: "/opt/tg-bot/state.json" artifact_state: "/opt/tg-bot/state.json"
runtime_state: "/var/server-bot/runtime.json"
# Optional env file with RESTIC_* variables # Optional env file with RESTIC_* variables
restic_env: "/etc/restic/restic.env" restic_env: "/etc/restic/restic.env"
@@ -21,6 +25,13 @@ alerts:
notify_cooldown_sec: 900 notify_cooldown_sec: 900
# If true, only critical load alerts are sent (no warn/OK) # If true, only critical load alerts are sent (no warn/OK)
load_only_critical: false load_only_critical: false
# Optional auto-mute windows per category
auto_mute:
- category: "load"
start: "23:00"
end: "08:00"
# Auto-mute load when critical load fires (seconds)
auto_mute_on_high_load_sec: 600
quiet_hours: quiet_hours:
enabled: false enabled: false
start: "23:00" start: "23:00"
@@ -32,6 +43,9 @@ alerts:
smart_interval_sec: 3600 smart_interval_sec: 3600
smart_cooldown_sec: 21600 smart_cooldown_sec: 21600
smart_temp_warn: 50 smart_temp_warn: 50
raid_enabled: true
raid_interval_sec: 300
raid_cooldown_sec: 1800
disk_report: disk_report:
threshold: 90 threshold: 90
@@ -63,6 +77,22 @@ safety:
# If true, dangerous actions will be skipped # If true, dangerous actions will be skipped
dry_run: false dry_run: false
reports:
weekly:
enabled: false
day: "Sun" # Mon/Tue/Wed/Thu/Fri/Sat/Sun
time: "08:00" # HH:MM server local time
selftest:
schedule:
enabled: false
time: "03:30"
queue:
max_pending_alert: 5
avg_wait_alert: 120
cooldown_sec: 300
external_checks: external_checks:
enabled: true enabled: true
state_path: "/var/server-bot/external_checks.json" state_path: "/var/server-bot/external_checks.json"

View File

@@ -1,11 +1,11 @@
import time import time
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from aiogram import F from aiogram import F
from aiogram.types import Message from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from app import dp, bot, cfg, ADMIN_ID from app import dp, bot, cfg, ADMIN_ID
from auth import is_admin_msg from auth import is_admin_msg
from services.alert_mute import set_mute, clear_mute, list_mutes from services.alert_mute import set_mute, clear_mute, list_mutes
from services.incidents import read_recent from services.incidents import read_recent, log_incident
from services.notify import notify from services.notify import notify
@@ -16,54 +16,50 @@ HELP_TEXT = (
"/alerts unmute <category> - unmute category\n" "/alerts unmute <category> - unmute category\n"
"/alerts list - show active mutes\n" "/alerts list - show active mutes\n"
"/alerts recent [hours] - show incidents log (default 24h)\n" "/alerts recent [hours] - show incidents log (default 24h)\n"
"Categories: load, disk, smart, ssl, docker, test\n" "Categories: load, disk, smart, raid, ssl, docker, test\n"
) )
@dp.message(F.text.startswith("/alerts")) def _dispatch(msg: Message, action: str, args: list[str]):
async def alerts_cmd(msg: Message): return {"action": action, "args": args}
if not is_admin_msg(msg):
return
parts = msg.text.split()
if len(parts) < 2:
await msg.answer(HELP_TEXT)
return
action = parts[1].lower()
async def _handle_alerts(msg: Message, action: str, args: list[str]):
if action == "test": if action == "test":
level = parts[2].lower() if len(parts) >= 3 else "info" level = args[0].lower() if args else "info"
if level not in ("critical", "warn", "info"): if level not in ("critical", "warn", "info"):
level = "info" level = "info"
key = f"test:{level}:{int(time.time())}" key = f"test:{level}:{int(time.time())}"
await notify(bot, msg.chat.id, f"[TEST] {level.upper()} alert", level=level, key=key, category="test") await notify(bot, msg.chat.id, f"[TEST] {level.upper()} alert", level=level, key=key, category="test")
await msg.answer(f"Sent test alert: {level}") await msg.answer(f"Sent test alert: {level}")
log_incident(cfg, f"alert_test level={level} by {msg.from_user.id}", category="test")
return return
if action == "mute": if action == "mute":
if len(parts) < 3: if len(args) < 1:
await msg.answer("Usage: /alerts mute <category> <minutes>") await msg.answer("Usage: /alerts mute <category> <minutes>")
return return
category = parts[2].lower() category = args[0].lower()
minutes = 60 minutes = 60
if len(parts) >= 4: if len(args) >= 2:
try: try:
minutes = max(1, int(parts[3])) minutes = max(1, int(args[1]))
except ValueError: except ValueError:
minutes = 60 minutes = 60
until = set_mute(category, minutes * 60) until = set_mute(category, minutes * 60)
dt = datetime.fromtimestamp(until, tz=timezone.utc).astimezone() dt = datetime.fromtimestamp(until, tz=timezone.utc).astimezone()
await msg.answer(f"🔕 Muted {category} for {minutes}m (until {dt:%Y-%m-%d %H:%M:%S})") await msg.answer(f"🔕 Muted {category} for {minutes}m (until {dt:%Y-%m-%d %H:%M:%S})")
log_incident(cfg, f"alert_mute category={category} minutes={minutes} by {msg.from_user.id}", category=category)
return return
if action == "unmute": if action == "unmute":
if len(parts) < 3: if len(args) < 1:
await msg.answer("Usage: /alerts unmute <category>") await msg.answer("Usage: /alerts unmute <category>")
return return
category = parts[2].lower() category = args[0].lower()
clear_mute(category) clear_mute(category)
await msg.answer(f"🔔 Unmuted {category}") await msg.answer(f"🔔 Unmuted {category}")
log_incident(cfg, f"alert_unmute category={category} by {msg.from_user.id}", category=category)
return return
if action in ("list", "mutes"): if action in ("list", "mutes"):
@@ -80,9 +76,9 @@ async def alerts_cmd(msg: Message):
if action == "recent": if action == "recent":
hours = 24 hours = 24
if len(parts) >= 3: if args:
try: try:
hours = max(1, int(parts[2])) hours = max(1, int(args[0]))
except ValueError: except ValueError:
hours = 24 hours = 24
rows = read_recent(cfg, hours, limit=50) rows = read_recent(cfg, hours, limit=50)
@@ -93,3 +89,74 @@ async def alerts_cmd(msg: Message):
return return
await msg.answer(HELP_TEXT) await msg.answer(HELP_TEXT)
# Inline shortcuts attached to the /alerts help reply. Each row below is a
# tuple of (button label, callback payload) pairs; payloads use the
# "alerts:<action>[:arg...]" format handled by the "alerts:" callback handler.
ALERTS_KB = InlineKeyboardMarkup(
    inline_keyboard=[
        [
            InlineKeyboardButton(text=label, callback_data=payload)
            for label, payload in row
        ]
        for row in (
            (
                ("List", "alerts:list"),
                ("Recent 24h", "alerts:recent:24"),
            ),
            (
                ("Mute load 60m", "alerts:mute:load:60"),
                ("Unmute load", "alerts:unmute:load"),
            ),
            (
                ("Test CRIT", "alerts:test:critical"),
                ("Test WARN", "alerts:test:warn"),
                ("Test INFO", "alerts:test:info"),
            ),
        )
    ]
)
# NOTE: inside a raw string a whitespace class is spelled "\s"; the previous
# "\\s" matched a literal backslash followed by "s", so "/alerts <action> ..."
# never reached this handler. The "(\s|$)" tail still keeps aliases such as
# "/alerts_list" from being captured here.
@dp.message(F.text.regexp(r"^/alerts(\s|$)"))
async def alerts_cmd(msg: Message):
    """Entry point for ``/alerts [action] [args...]``.

    Non-admin senders are ignored. A bare ``/alerts`` replies with the help
    text and the inline shortcut keyboard; otherwise the lower-cased action
    and the remaining whitespace-separated arguments are forwarded to
    ``_handle_alerts``.
    """
    if not is_admin_msg(msg):
        return
    parts = msg.text.split()
    if len(parts) < 2:
        await msg.answer(HELP_TEXT, reply_markup=ALERTS_KB)
        return
    action = parts[1].lower()
    args = parts[2:]
    await _handle_alerts(msg, action, args)
@dp.message(F.text == "/alerts_list")
async def alerts_list(msg: Message):
    """Shortcut command: run the ``list`` alerts action for an admin sender."""
    if is_admin_msg(msg):
        await _handle_alerts(msg, "list", [])
@dp.message(F.text == "/alerts_recent")
async def alerts_recent(msg: Message):
    """Shortcut command: run the ``recent`` alerts action with a 24h window."""
    if is_admin_msg(msg):
        await _handle_alerts(msg, "recent", ["24"])
@dp.message(F.text == "/alerts_mute_load")
async def alerts_mute_load(msg: Message):
    """Shortcut command: mute the ``load`` category for 60 minutes."""
    if is_admin_msg(msg):
        await _handle_alerts(msg, "mute", ["load", "60"])
@dp.callback_query(F.data.startswith("alerts:"))
async def alerts_cb(cb: CallbackQuery):
    """Handle inline-keyboard callbacks of the form ``alerts:<action>[:arg...]``.

    The callback is acknowledged and ignored when the sender is not an admin
    or when the originating message is unavailable; otherwise the action and
    its arguments are forwarded to ``_handle_alerts`` and the callback is
    acknowledged afterwards.
    """
    # Function-scope import: use the shared callback admin check (which honours
    # every configured admin) instead of comparing against the primary ADMIN_ID.
    from auth import is_admin_cb

    if not is_admin_cb(cb) or cb.message is None:
        await cb.answer()
        return

    # formats: alerts:action or alerts:action:arg1:arg2
    # The filter guarantees the "alerts:" prefix, so splitting always yields
    # at least the prefix and an (possibly empty) action.
    _, action, *args = cb.data.split(":")

    # NOTE(review): cb.message is the message carrying the keyboard, so
    # msg.from_user inside _handle_alerts presumably refers to its author
    # rather than the admin who pressed the button - confirm before relying
    # on the "by <id>" part of the logged incidents.
    await _handle_alerts(cb.message, action, args)
    await cb.answer()

View File

@@ -2,7 +2,7 @@ import asyncio
from datetime import datetime from datetime import datetime
from aiogram import F from aiogram import F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from app import dp, cfg from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg from auth import is_admin_msg
from keyboards import docker_kb, arcane_kb from keyboards import docker_kb, arcane_kb
from services.arcane import list_projects, restart_project, set_project_state, get_project_details from services.arcane import list_projects, restart_project, set_project_state, get_project_details
@@ -115,7 +115,7 @@ async def arcane_refresh(msg: Message):
@dp.callback_query(F.data == "arcane:refresh") @dp.callback_query(F.data == "arcane:refresh")
async def arcane_refresh_inline(cb: CallbackQuery): async def arcane_refresh_inline(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
await cb.answer() await cb.answer()
await cmd_arcane_projects(cb.message, edit=True) await cmd_arcane_projects(cb.message, edit=True)
@@ -123,7 +123,7 @@ async def arcane_refresh_inline(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:page:")) @dp.callback_query(F.data.startswith("arcane:page:"))
async def arcane_page(cb: CallbackQuery): async def arcane_page(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
try: try:
page = int(cb.data.split(":", 2)[2]) page = int(cb.data.split(":", 2)[2])
@@ -141,7 +141,7 @@ async def arcane_page(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:restart:")) @dp.callback_query(F.data.startswith("arcane:restart:"))
async def arcane_restart(cb: CallbackQuery): async def arcane_restart(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -160,7 +160,7 @@ async def arcane_restart(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:details:")) @dp.callback_query(F.data.startswith("arcane:details:"))
async def arcane_details(cb: CallbackQuery): async def arcane_details(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -208,7 +208,7 @@ async def arcane_details(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:deploy:")) @dp.callback_query(F.data.startswith("arcane:deploy:"))
async def arcane_deploy_status(cb: CallbackQuery): async def arcane_deploy_status(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -254,7 +254,7 @@ async def arcane_deploy_status(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:up:")) @dp.callback_query(F.data.startswith("arcane:up:"))
async def arcane_up(cb: CallbackQuery): async def arcane_up(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -273,7 +273,7 @@ async def arcane_up(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:down:")) @dp.callback_query(F.data.startswith("arcane:down:"))
async def arcane_down(cb: CallbackQuery): async def arcane_down(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)

View File

@@ -8,9 +8,10 @@ from app import dp, cfg
from auth import is_admin_msg, is_admin_cb from auth import is_admin_msg, is_admin_cb
from keyboards import backup_kb from keyboards import backup_kb
from lock_utils import acquire_lock, release_lock from lock_utils import acquire_lock, release_lock
from services.queue import enqueue, format_status, format_details from services.queue import enqueue, format_status, format_details, format_history
from services.backup import backup_badge, restore_help from services.backup import backup_badge, restore_help
from services.runner import run_cmd, run_cmd_full from services.runner import run_cmd, run_cmd_full
from services.incidents import log_incident
def _parse_systemctl_kv(raw: str) -> dict[str, str]: def _parse_systemctl_kv(raw: str) -> dict[str, str]:
@@ -38,13 +39,136 @@ def _sudo_cmd(cmd: list[str]) -> list[str]:
def _format_backup_result(rc: int, out: str) -> str: def _format_backup_result(rc: int, out: str) -> str:
log_hint = "log: /var/log/backup-auto.log" log_path = "/var/log/backup-auto.log"
header = "✅ Backup finished" if rc == 0 else "❌ Backup failed" header = "✅ Backup finished" if rc == 0 else "❌ Backup failed"
lines = out.strip().splitlines() lines = out.strip().splitlines()
body = "\n".join(lines[:20]) body = "\n".join(lines[:20])
if len(lines) > 20: if len(lines) > 20:
body += f"\n… trimmed {len(lines) - 20} lines" body += f"\n… trimmed {len(lines) - 20} lines"
return f"{header} (rc={rc})\n{log_hint}\n\n{body}" if body else f"{header} (rc={rc})\n{log_hint}" extra = ""
if rc != 0 and os.path.exists(log_path):
try:
tail = ""
with open(log_path, "r", encoding="utf-8", errors="replace") as f:
tail_lines = f.readlines()[-40:]
tail = "".join(tail_lines).strip()
if tail:
extra = "\n\nLog tail:\n" + tail
except Exception:
pass
base = f"{header} (rc={rc})\nlog: {log_path}"
if body:
base += "\n\n" + body
if extra:
base += extra
return base
def _tail(path: str, lines: int = 120) -> str:
    """Return the last ``lines`` lines of the text file at ``path``.

    The file is streamed through a bounded deque, so only the requested
    number of lines is kept in memory regardless of the file size.

    Args:
        path: File to read; decoded as UTF-8 with undecodable bytes replaced.
        lines: Maximum number of trailing lines to return. Values <= 0 yield
            ``"(empty)"`` (previously ``0`` returned the whole file).

    Returns:
        The stripped tail of the file, ``"(empty)"`` when there is nothing to
        show, or a message starting with ``"⚠️"`` when the file is missing or
        cannot be read.
    """
    from collections import deque

    if lines <= 0:
        return "(empty)"
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # deque(maxlen=N) discards older lines while iterating the file.
            data = deque(f, maxlen=lines)
    except (FileNotFoundError, NotADirectoryError):
        return f"⚠️ Log not found: {path}"
    except (OSError, ValueError) as e:
        return f"⚠️ Failed to read log: {e}"
    return "".join(data).strip() or "(empty)"
def _beautify_restic_forget(raw: str) -> str | None:
"""
Parse restic forget output tables into a compact bullet list.
"""
if "Reasons" not in raw or "Paths" not in raw:
return None
import re
lines = raw.splitlines()
headers = []
for idx, line in enumerate(lines):
if line.startswith("ID") and "Reasons" in line and "Paths" in line:
headers.append(idx)
if not headers:
return None
def _valid_id(val: str) -> bool:
return bool(re.fullmatch(r"[0-9a-f]{7,64}", val.strip()))
def parse_block(start_idx: int, end_idx: int) -> list[dict]:
header = lines[start_idx]
cols = ["ID", "Time", "Host", "Tags", "Reasons", "Paths", "Size"]
positions = []
for name in cols:
pos = header.find(name)
if pos == -1:
return []
positions.append(pos)
positions.append(len(header))
entries: list[dict] = []
current: dict | None = None
for line in lines[start_idx + 2 : end_idx]:
if not line.strip():
continue
segments = []
for i in range(len(cols)):
segments.append(line[positions[i] : positions[i + 1]].strip())
row = dict(zip(cols, segments))
if row["ID"] and _valid_id(row["ID"]):
current = {
"id": row["ID"],
"time": row["Time"],
"host": row["Host"],
"size": row["Size"],
"tags": row["Tags"],
"reasons": [],
"paths": [],
}
if row["Reasons"]:
current["reasons"].append(row["Reasons"])
if row["Paths"]:
current["paths"].append(row["Paths"])
entries.append(current)
elif current:
if row["Reasons"] and not row["Reasons"].startswith("-"):
current["reasons"].append(row["Reasons"])
if row["Paths"] and not row["Paths"].startswith("-"):
current["paths"].append(row["Paths"])
return entries
blocks = []
for i, start in enumerate(headers):
end = headers[i + 1] if i + 1 < len(headers) else len(lines)
entries = parse_block(start, end)
if not entries:
continue
label = "Plan"
prev_line = lines[start - 1].lower() if start - 1 >= 0 else ""
prev2 = lines[start - 2].lower() if start - 2 >= 0 else ""
if "keep" in prev_line:
label = prev_line.strip()
elif "keep" in prev2:
label = prev2.strip()
elif "snapshots" in prev_line:
label = prev_line.strip()
blocks.append((label, entries))
if not blocks:
return None
out_lines = []
for label, entries in blocks:
out_lines.append(f"📦 {label}")
for e in entries:
head = f"🧉 {e['id']} | {e['time']} | {e['host']} | {e['size'] or 'n/a'}"
out_lines.append(head)
if e["reasons"]:
out_lines.append(" 📌 " + "; ".join(e["reasons"]))
if e["paths"]:
for p in e["paths"]:
out_lines.append(f"{p}")
out_lines.append("")
return "\n".join(out_lines).rstrip()
def _load_json(raw: str, label: str) -> tuple[bool, object | None, str]: def _load_json(raw: str, label: str) -> tuple[bool, object | None, str]:
@@ -209,6 +333,10 @@ async def cmd_backup_status(msg: Message):
async def cmd_backup_now(msg: Message): async def cmd_backup_now(msg: Message):
await schedule_backup(msg)
async def schedule_backup(msg: Message):
async def job(): async def job():
if cfg.get("safety", {}).get("dry_run", False): if cfg.get("safety", {}).get("dry_run", False):
await msg.answer("🧪 Dry-run: backup skipped", reply_markup=backup_kb) await msg.answer("🧪 Dry-run: backup skipped", reply_markup=backup_kb)
@@ -225,12 +353,24 @@ async def cmd_backup_now(msg: Message):
use_restic_env=True, use_restic_env=True,
timeout=6 * 3600, timeout=6 * 3600,
) )
await msg.answer(_format_backup_result(rc, out), reply_markup=backup_kb) kb = backup_kb
if rc != 0:
kb = InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(text="🔁 Retry backup", callback_data="backup:retry")]
]
)
await msg.answer(_format_backup_result(rc, out), reply_markup=kb)
finally: finally:
release_lock("backup") release_lock("backup")
pos = await enqueue("backup", job) pos = await enqueue("backup", job)
await msg.answer(f"🕓 Backup queued (#{pos})", reply_markup=backup_kb) await msg.answer(f"🕓 Backup queued (#{pos})", reply_markup=backup_kb)
try:
from services.incidents import log_incident
log_incident(cfg, f"backup_queued by {msg.from_user.id}", category="backup")
except Exception:
pass
async def cmd_last_snapshot(msg: Message): async def cmd_last_snapshot(msg: Message):
@@ -328,6 +468,12 @@ async def br(msg: Message):
await cmd_backup_now(msg) await cmd_backup_now(msg)
@dp.message(F.text == "/backup_run")
async def br_cmd(msg: Message):
    """Command alias that queues a backup run; non-admin senders are ignored."""
    if not is_admin_msg(msg):
        return
    await schedule_backup(msg)
@dp.message(F.text == "🧪 Restic check") @dp.message(F.text == "🧪 Restic check")
async def rc(msg: Message): async def rc(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
@@ -340,7 +486,14 @@ async def rc(msg: Message):
use_restic_env=True, use_restic_env=True,
timeout=6 * 3600, timeout=6 * 3600,
) )
await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb) kb = backup_kb
if rc2 != 0:
kb = InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(text="🔁 Retry restic check", callback_data="backup:retry_check")]
]
)
await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=kb)
pos = await enqueue("restic-check", job) pos = await enqueue("restic-check", job)
await msg.answer(f"🕓 Restic check queued (#{pos})", reply_markup=backup_kb) await msg.answer(f"🕓 Restic check queued (#{pos})", reply_markup=backup_kb)
@@ -368,3 +521,55 @@ async def wr(msg: Message):
async def rh(msg: Message): async def rh(msg: Message):
if is_admin_msg(msg): if is_admin_msg(msg):
await msg.answer(restore_help(), reply_markup=backup_kb) await msg.answer(restore_help(), reply_markup=backup_kb)
@dp.message(F.text == "📜 History")
@dp.message(F.text == "/backup_history")
async def backup_history(msg: Message):
    """Reply with the tail of the automatic backup log.

    The last 160 lines of `/var/log/backup-auto.log` are read via `_tail`.
    A result starting with "⚠️" is forwarded unchanged. Otherwise the text
    is passed through `_beautify_restic_forget`; a truthy result is shown
    as-is, and a falsy one falls back to the raw tail inside a Markdown
    code fence.

    Whatever text is actually shown is capped to its last `max_len`
    characters, and the "(trimmed)" marker reflects that same text.
    Previously only the raw tail was capped, so a long beautified body was
    sent uncapped while the header could still claim it was trimmed.
    """
    if not is_admin_msg(msg):
        return

    log_path = "/var/log/backup-auto.log"
    content = _tail(log_path, lines=160)
    if content.startswith("⚠️"):
        await msg.answer(content, reply_markup=backup_kb)
        return

    max_len = 3500
    # NOTE(review): assumes _beautify_restic_forget returns a str (or a
    # falsy value when nothing could be prettified) - confirm.
    pretty = _beautify_restic_forget(content)
    body = pretty if pretty else content

    trimmed = len(body) > max_len
    if trimmed:
        # Keep the newest part of the log, same as the raw-tail behaviour.
        body = body[-max_len:]

    header = "📜 Backup history (tail)"
    if trimmed:
        header += " (trimmed)"

    if pretty:
        await msg.answer(f"{header}\n`{log_path}`\n\n{body}", reply_markup=backup_kb)
    else:
        await msg.answer(
            f"{header}\n`{log_path}`\n```\n{body}\n```",
            reply_markup=backup_kb,
            parse_mode="Markdown",
        )
@dp.message(F.text == "/queue_history")
async def queue_history(msg: Message):
    """Handle `/queue_history`: reply with `format_history()` for admins."""
    if is_admin_msg(msg):
        history_text = format_history()
        await msg.answer(history_text, reply_markup=backup_kb)
@dp.callback_query(F.data == "backup:retry")
async def backup_retry(cb: CallbackQuery):
    """Inline "Retry backup" button: acknowledge and queue a new backup."""
    if is_admin_cb(cb):
        # Answer the callback first so the client stops showing a spinner.
        await cb.answer("Queuing backup…")
        # NOTE(review): schedule_backup receives the message the button was
        # attached to, not a message written by the admin - confirm that it
        # does not depend on the message sender.
        await schedule_backup(cb.message)
@dp.callback_query(F.data == "backup:retry_check")
async def backup_retry_check(cb: CallbackQuery):
    """Inline "Retry restic check" button: acknowledge and re-run the check."""
    if is_admin_cb(cb):
        await cb.answer("Queuing restic check…")
        # NOTE(review): `rc` starts with its own `is_admin_msg(msg)` guard and
        # is given the message that carried the button - confirm that the
        # guard accepts such a message, otherwise this retry is a no-op.
        await rc(cb.message)

View File

@@ -2,8 +2,9 @@ import json
import time import time
from aiogram import F from aiogram import F
from aiogram.types import CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton from aiogram.types import CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from app import dp, ADMIN_ID from app import dp, ADMIN_ID, cfg
from services.docker import docker_cmd from services.docker import docker_cmd
from services.incidents import log_incident
from services.runner import run_cmd from services.runner import run_cmd
from state import DOCKER_MAP, LOG_FILTER_PENDING from state import DOCKER_MAP, LOG_FILTER_PENDING
from handlers.backup import cmd_backup_status from handlers.backup import cmd_backup_status
@@ -14,8 +15,15 @@ async def docker_callback(cb: CallbackQuery):
if cb.from_user.id != ADMIN_ID: if cb.from_user.id != ADMIN_ID:
return return
_, action, alias = cb.data.split(":", 2) try:
real = DOCKER_MAP[alias] _, action, alias = cb.data.split(":", 2)
except ValueError:
await cb.answer("Bad request")
return
real = DOCKER_MAP.get(alias)
if not real:
await cb.answer("Container not found")
return
if action == "restart": if action == "restart":
await cb.answer("Restarting…") await cb.answer("Restarting…")
@@ -25,6 +33,10 @@ async def docker_callback(cb: CallbackQuery):
f"🔄 **{alias} restarted**\n```{out}```", f"🔄 **{alias} restarted**\n```{out}```",
parse_mode="Markdown" parse_mode="Markdown"
) )
try:
log_incident(cfg, f"docker_restart {alias}", category="docker")
except Exception:
pass
elif action == "logs": elif action == "logs":
await cb.answer() await cb.answer()
@@ -55,7 +67,7 @@ async def snapshot_details(cb: CallbackQuery):
snap_id = cb.data.split(":", 1)[1] snap_id = cb.data.split(":", 1)[1]
await cb.answer("Loading snapshot…") await cb.answer("Loading snapshot…")
# получаем статистику snapshot # получаем статистику snapshot
rc, raw = await run_cmd( rc, raw = await run_cmd(
["restic", "stats", snap_id, "--json"], ["restic", "stats", snap_id, "--json"],
use_restic_env=True, use_restic_env=True,

View File

@@ -1,11 +1,13 @@
from aiogram import F from aiogram import F
from aiogram.types import Message from aiogram.types import Message
from app import dp from app import dp, cfg
from auth import is_admin_msg from auth import is_admin_msg
from keyboards import docker_kb, docker_inline_kb from keyboards import docker_kb, docker_inline_kb
from services.docker import container_uptime, docker_cmd from services.docker import container_uptime, docker_cmd
from services.incidents import log_incident
from state import DOCKER_MAP, LOG_FILTER_PENDING from state import DOCKER_MAP, LOG_FILTER_PENDING
import time import time
import json
async def cmd_docker_status(msg: Message): async def cmd_docker_status(msg: Message):
@@ -42,7 +44,7 @@ async def cmd_docker_status(msg: Message):
lines.append(f"{icon} {alias}: {status} ({up})") lines.append(f"{icon} {alias}: {status} ({up})")
await msg.answer("\n".join(lines), reply_markup=docker_kb) await msg.answer("\n".join(lines), reply_markup=docker_kb)
log_incident(cfg, f"docker_status by {msg.from_user.id}", category="docker")
except Exception as e: except Exception as e:
# ⬅️ КРИТИЧЕСКИ ВАЖНО # ⬅️ КРИТИЧЕСКИ ВАЖНО
await msg.answer( await msg.answer(
@@ -77,6 +79,82 @@ async def ds(msg: Message):
await cmd_docker_status(msg) await cmd_docker_status(msg)
@dp.message(F.text == "/docker_status")
async def ds_cmd(msg: Message):
    """Handle `/docker_status`: delegate to `cmd_docker_status` for admins."""
    if not is_admin_msg(msg):
        return
    await cmd_docker_status(msg)
# `split()[:1]` instead of `split()[0]`: a whitespace-only text must not raise
# IndexError inside the filter.
@dp.message(F.text, F.func(lambda m: (m.text or "").split()[:1] == ["/docker_health"]))
async def docker_health(msg: Message):
    """Handle `/docker_health <alias>`: show one container's health state.

    The alias is resolved through `DOCKER_MAP`, then
    `docker inspect -f '{{json .State.Health}}'` is run for the real name.
    The reply lists the status, the failing streak and up to the five most
    recent health log entries. The request is recorded with `log_incident`.

    Failure replies: missing alias argument, unknown alias, inspect failure
    or empty output, undecodable JSON, and a container that exposes no
    health object (docker renders `null` when no healthcheck is defined).
    """
    if not is_admin_msg(msg):
        return

    parts = msg.text.split()
    if len(parts) < 2:
        await msg.answer("Usage: /docker_health <alias>")
        return

    alias = parts[1]
    real = DOCKER_MAP.get(alias)
    if not real:
        await msg.answer(f"⚠️ Unknown container: {alias}", reply_markup=docker_kb)
        return

    rc, out = await docker_cmd(["inspect", "-f", "{{json .State.Health}}", real], timeout=10)
    if rc != 0 or not out.strip():
        await msg.answer(f"⚠️ Failed to get health for {alias}", reply_markup=docker_kb)
        return

    try:
        data = json.loads(out)
    except json.JSONDecodeError:
        await msg.answer(f"⚠️ Invalid health JSON for {alias}", reply_markup=docker_kb)
        return

    if not isinstance(data, dict):
        # Valid JSON that is not an object (typically `null`): calling
        # `.get` on it would raise AttributeError.
        await msg.answer(f"ℹ️ No healthcheck configured for {alias}", reply_markup=docker_kb)
        return

    status = data.get("Status", "n/a")
    fail = data.get("FailingStreak", "n/a")
    logs = data.get("Log") or []

    # Cap each probe output so five entries cannot push the reply past the
    # Telegram message size limit.
    max_output = 500

    lines = [f"🐳 {alias} health", f"Status: {status}", f"Failing streak: {fail}"]
    if logs:
        lines.append("Recent logs:")
        for entry in logs[-5:]:
            if not isinstance(entry, dict):
                continue
            ts = entry.get("Start") or entry.get("End") or ""
            exitc = entry.get("ExitCode", "")
            out_line = str(entry.get("Output") or "").strip()
            if len(out_line) > max_output:
                out_line = out_line[:max_output] + "…"
            lines.append(f"- {ts} rc={exitc} {out_line}")

    await msg.answer("\n".join(lines), reply_markup=docker_kb)
    log_incident(cfg, f"docker_health alias={alias} by {msg.from_user.id}", category="docker")
@dp.message(F.text == "/docker_health_summary")
async def docker_health_summary(msg: Message):
    """Handle `/docker_health_summary`: list containers that look unhealthy.

    Every `DOCKER_MAP` entry is inspected with
    `docker inspect -f '{{json .State}}'`. A container counts as a problem
    when inspect fails, the output is not a JSON object, the state is not
    "running", or its health status is neither "healthy" nor "none".

    A container without a `Health` object is treated as health "none", so
    a running container with no healthcheck is not reported. Previously
    the missing value defaulted to "n/a", which is not in the accepted set.
    """
    if not is_admin_msg(msg):
        return
    if not DOCKER_MAP:
        await msg.answer("⚠️ DOCKER_MAP пуст", reply_markup=docker_kb)
        return

    problems = []
    total = len(DOCKER_MAP)
    for alias, real in DOCKER_MAP.items():
        rc, out = await docker_cmd(["inspect", "-f", "{{json .State}}", real], timeout=10)
        if rc != 0:
            problems.append(f"{alias}: inspect error")
            continue
        try:
            state = json.loads(out)
        except (TypeError, ValueError):
            state = None
        if not isinstance(state, dict):
            # Covers undecodable output and valid JSON that is not an object.
            problems.append(f"{alias}: bad JSON")
            continue

        status = state.get("Status", "n/a")
        health_obj = state.get("Health")
        health = (health_obj.get("Status") if isinstance(health_obj, dict) else None) or "none"
        if status != "running" or health not in ("healthy", "none"):
            problems.append(f"{alias}: {status}/{health}")

    ok = total - len(problems)
    lines = [f"🐳 Docker health: 🟢 {ok}/{total} healthy, 🔴 {len(problems)} issues"]
    if problems:
        lines.append("Problems:")
        lines.extend(f"- {p}" for p in problems)
    await msg.answer("\n".join(lines), reply_markup=docker_kb)
@dp.message(F.text == "📈 Stats") @dp.message(F.text == "📈 Stats")
async def dstats(msg: Message): async def dstats(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):

View File

@@ -13,6 +13,7 @@ HELP_PAGES = [
"📊 *Статус* — общая загрузка.\n" "📊 *Статус* — общая загрузка.\n"
"📋 */status_short* — кратко (load/RAM/диски).\n" "📋 */status_short* — кратко (load/RAM/диски).\n"
"🩺 */health_short* — краткий health.\n" "🩺 */health_short* — краткий health.\n"
"🧪 */selftest* — health + restic snapshot probe.\n"
"🔧 Разделы: Docker, Backup, Artifacts, System, OpenWrt.", "🔧 Разделы: Docker, Backup, Artifacts, System, OpenWrt.",
), ),
( (
@@ -22,15 +23,19 @@ HELP_PAGES = [
"• `/alerts test <critical|warn|info>`\n" "• `/alerts test <critical|warn|info>`\n"
"• `/alerts mute <cat> <minutes>` / `/alerts unmute <cat>` / `/alerts list`\n" "• `/alerts mute <cat> <minutes>` / `/alerts unmute <cat>` / `/alerts list`\n"
"• `/alerts recent [hours]`\n" "• `/alerts recent [hours]`\n"
"Категории: load, disk, smart, ssl, docker, test.\n" "Шорткаты: `/alerts_list`, `/alerts_recent`, `/alerts_mute_load` (60м).\n"
"Категории: load, disk, smart, raid, ssl, docker, test.\n"
"Quiet hours: `alerts.quiet_hours` для не‑критичных.\n" "Quiet hours: `alerts.quiet_hours` для не‑критичных.\n"
"Авто-мьют: `alerts.auto_mute` со слотами времени.\n"
"Только красные load: `alerts.load_only_critical: true`.\n" "Только красные load: `alerts.load_only_critical: true`.\n"
"Валидатор конфига: `/config_check`.", "Валидатор конфига: `/config_check`.",
), ),
( (
"Backup", "Backup",
"💾 **Backup (restic)**\n\n" "💾 **Backup (restic)**\n\n"
"Кнопки: Status, Last snapshot, Repo stats, Run backup, Queue, Restic check, Weekly report.\n" "Кнопки: Status, Last snapshot, Repo stats, Run backup, Queue, Restic check, Weekly report, History.\n"
"History — хвост `/var/log/backup-auto.log`.\n"
"Fail → кнопка Retry (backup/check).\n"
"Run backup/Check учитывают `safety.dry_run`.\n" "Run backup/Check учитывают `safety.dry_run`.\n"
"После бэкапа приходит TL;DR + путь к логу `/var/log/backup-auto.log`.\n" "После бэкапа приходит TL;DR + путь к логу `/var/log/backup-auto.log`.\n"
"Queue → Details показывает отложенные задачи.", "Queue → Details показывает отложенные задачи.",
@@ -38,17 +43,28 @@ HELP_PAGES = [
( (
"Docker & System", "Docker & System",
"🐳 **Docker**\n" "🐳 **Docker**\n"
"Status/Restart/Logs/Stats — клавиатура Docker.\n\n" "Status/Restart/Logs/Stats — клавиатура Docker.\n"
"Команды: `/docker_status`, `/docker_health <alias>`.\n\n"
"🖥 **System**\n" "🖥 **System**\n"
"Info: Disks/Security/Metrics/Hardware/SMART/OpenWrt.\n" "Info: Disks/Security/Metrics/Hardware/SMART/OpenWrt.\n"
"Ops: Updates/Upgrade/Reboot.\n" "Ops: Updates/Upgrade/Reboot.\n"
"Logs: Audit/Incidents/Security/Integrations/Processes.", "Logs: Audit/Incidents/Security/Integrations/Processes.\n"
"OpenWrt: `/openwrt`, `/openwrt_wan`, `/openwrt_clients`, `/openwrt_leases`.",
), ),
( (
"Admin", "Admin",
"🛠 **Admin & Deploy**\n\n" "🛠 **Admin & Deploy**\n\n"
"Config: `/config_check`, файл `config.yaml` (см. config.example.yaml).\n" "Config: `/config_check`, файл `config.yaml` (см. config.example.yaml).\n"
"Deploy: `deploy.sh` (ssh 10.10.10.10:1090 → git pull → systemctl restart tg-bot).\n" "Deploy: `deploy.sh` (ssh 10.10.10.10:1090 → git pull → systemctl restart tg-bot).\n"
"Incidents: `/incidents_summary`, `/incidents_diff [hours]`.\n"
"Export: `/incidents_export [hours] [csv|json]`, `/export_all [hours]` (zip).\n"
"Alerts log/heatmap: `/alerts_log [hours]`, `/alerts_heatmap [hours] [cat]`.\n"
"Backup SLA: `/backup_sla`; Docker restarts: `/docker_restarts [hours]`.\n"
"Disk snapshot: `/disk_snapshot`.\n"
"Queue: `/queue_history`, `/queue_sla`.\n"
"Self-test history: `/selftest_history`.\n"
"OpenWrt leases diff: `/openwrt_leases_diff`.\n"
"BotFather list: `/botfather_list`.\n"
"Безопасность: `safety.dry_run: true` блокирует опасные действия.\n" "Безопасность: `safety.dry_run: true` блокирует опасные действия.\n"
"OpenWrt: кнопка в System → Info.", "OpenWrt: кнопка в System → Info.",
), ),
@@ -70,7 +86,7 @@ def _help_text(idx: int) -> str:
return body return body
@dp.message(F.text.in_({" Help", " Help", "Help"})) @dp.message(F.text.in_({" Help", " Help", "Help", "/help"}))
async def help_cmd(msg: Message): async def help_cmd(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
return return
@@ -103,3 +119,46 @@ async def help_cb(cb: CallbackQuery):
parse_mode="Markdown", parse_mode="Markdown",
) )
await cb.answer() await cb.answer()
# Command list in "<command> - <description>" form, one command per line.
# The /botfather_list handler sends this text inside a code fence so it can be
# copied as a whole. Keep it in sync with the registered command handlers.
BOTFATHER_LIST = """\
help - Show help pages
status_short - Compact host status
health_short - Compact health report
selftest - Health + restic snapshot probe
alerts - Manage alerts
alerts_list - List active mutes
alerts_recent - Show recent incidents (24h)
alerts_mute_load - Mute load alerts for 60m
alerts_log - Show suppressed alerts
alerts_heatmap - Hourly incidents heatmap
backup_run - Run backup (queued)
backup_history - Show backup log tail
queue_history - Show queue recent jobs
queue_sla - Queue SLA stats
docker_status - Docker summary
docker_health - Docker inspect/health by alias
docker_health_summary - Docker health summary (problems only)
openwrt - Full OpenWrt status
openwrt_wan - OpenWrt WAN only
openwrt_clients - OpenWrt wifi clients
openwrt_leases - OpenWrt DHCP leases
openwrt_fast - OpenWrt quick WAN view
openwrt_leases_diff - OpenWrt DHCP diff
incidents_summary - Incidents counters (24h/7d)
incidents_export - Export incidents (hours fmt)
incidents_diff - Show incidents since last check
export_all - Zip with incidents/queue/selftest
backup_sla - Backup SLA check
docker_restarts - Docker restart history
selftest_history - Self-test history
disk_snapshot - Disk usage snapshot
config_check - Validate config
"""
@dp.message(F.text == "/botfather_list")
async def botfather_list(msg: Message):
    """Handle `/botfather_list`: send the command list in a code fence."""
    if is_admin_msg(msg):
        reply = f"Commands for BotFather:\n```\n{BOTFATHER_LIST}\n```"
        await msg.answer(reply, parse_mode="Markdown")

View File

@@ -1,4 +1,5 @@
import asyncio import asyncio
import json
import socket import socket
import time import time
import psutil import psutil
@@ -10,6 +11,8 @@ from keyboards import menu_kb
from services.system import format_disks from services.system import format_disks
from services.health import health from services.health import health
from state import DOCKER_MAP from state import DOCKER_MAP
from services.runner import run_cmd_full
from services.selftest import run_selftest
async def cmd_status(msg: Message): async def cmd_status(msg: Message):
@@ -115,6 +118,20 @@ async def health_short(msg: Message):
await msg.answer(f"🩺 Health (short)\n{brief}", reply_markup=menu_kb) await msg.answer(f"🩺 Health (short)\n{brief}", reply_markup=menu_kb)
@dp.message(F.text.in_({"🧪 Self-test", "/selftest"}))
async def selftest(msg: Message):
    """Run the self-test in the background and reply with its report.

    A "⏳ Self-test…" placeholder is sent immediately. `run_selftest` then
    runs in a separate task so this handler returns without waiting for it.
    If the self-test raises, the error is reported to the chat instead of
    being lost inside the task.
    """
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Self-test…", reply_markup=menu_kb)

    async def worker():
        try:
            text, _ok = await run_selftest(cfg, DOCKER_MAP)
        except Exception as e:
            # Task boundary: without this the user would only ever see the
            # placeholder and the exception would surface as an unretrieved
            # task error.
            text = f"⚠️ Self-test error: {e}"
        await msg.answer(text, reply_markup=menu_kb)

    # NOTE(review): the task reference is not stored, same as the other
    # background workers in these handlers.
    asyncio.create_task(worker())
def _rate_str(value: float) -> str: def _rate_str(value: float) -> str:
if value >= 1024 * 1024: if value >= 1024 * 1024:
return f"{value / (1024 * 1024):.2f} MiB/s" return f"{value / (1024 * 1024):.2f} MiB/s"

View File

@@ -1,8 +1,9 @@
import asyncio import asyncio
import os import os
from datetime import datetime, timezone, timedelta
from aiogram import F from aiogram import F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile
from app import dp, cfg from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg from auth import is_admin_msg
from keyboards import ( from keyboards import (
system_info_kb, system_info_kb,
@@ -10,21 +11,31 @@ from keyboards import (
system_logs_audit_kb, system_logs_audit_kb,
system_logs_security_kb, system_logs_security_kb,
system_logs_integrations_kb, system_logs_integrations_kb,
system_logs_kb,
openwrt_kb,
docker_kb, backup_kb,
) )
from system_checks import security, disks, hardware, list_disks, smart_last_test from system_checks import security, disks, hardware, list_disks, smart_last_test
from services.http_checks import get_url_checks, check_url from services.http_checks import get_url_checks, check_url
from services.queue import enqueue from services.queue import enqueue, get_history_raw, get_stats
from services.updates import list_updates, apply_updates from services.updates import list_updates, apply_updates
from services.runner import run_cmd from services.runner import run_cmd, run_cmd_full
from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host
from services.gitea import get_gitea_health from services.gitea import get_gitea_health
from services.openwrt import get_openwrt_status from services.openwrt import get_openwrt_status, fetch_openwrt_leases
from services.system import worst_disk_usage
import state import state
from state import UPDATES_CACHE, REBOOT_PENDING from state import UPDATES_CACHE, REBOOT_PENDING
from services.metrics import summarize from services.metrics import summarize
from services.audit import read_audit_tail from services.audit import read_audit_tail
from services.incidents import read_recent, incidents_path from services.incidents import read_recent, incidents_path, read_raw, infer_category
from services.external_checks import format_report from services.external_checks import format_report
from services.disk_report import build_disk_report
from services import runtime_state
import io
import json
import csv
import zipfile
@dp.message(F.text == "💽 Disks") @dp.message(F.text == "💽 Disks")
@@ -195,19 +206,91 @@ async def smart_status(msg: Message):
await msg.answer("\n".join(lines), reply_markup=system_info_kb) await msg.answer("\n".join(lines), reply_markup=system_info_kb)
@dp.message(F.text == "📡 OpenWrt") @dp.message(F.text.in_({"/openwrt", "📡 Full status"}))
async def openwrt_status(msg: Message): async def openwrt_status(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
return return
await msg.answer("⏳ Checking OpenWrt…", reply_markup=system_info_kb) await msg.answer("⏳ Checking OpenWrt…", reply_markup=openwrt_kb)
async def worker(): async def worker():
try: try:
text = await get_openwrt_status(cfg) text = await get_openwrt_status(cfg)
except Exception as e: except Exception as e:
text = f"⚠️ OpenWrt error: {e}" text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=system_info_kb) await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker())
@dp.message(F.text == "/openwrt_wan")
async def openwrt_wan(msg: Message):
    """Handle `/openwrt_wan`: fetch the OpenWrt status in "wan" mode."""
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Checking OpenWrt WAN…", reply_markup=openwrt_kb)

    async def _report():
        # Any failure is turned into a warning reply instead of propagating.
        try:
            report = await get_openwrt_status(cfg, mode="wan")
        except Exception as exc:
            report = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(report, reply_markup=openwrt_kb)

    # Run in the background so the handler returns right after the placeholder.
    asyncio.create_task(_report())
@dp.message(F.text == "📡 OpenWrt")
async def openwrt_menu(msg: Message):
    """Open the OpenWrt keyboard when the "📡 OpenWrt" button is pressed."""
    if is_admin_msg(msg):
        await msg.answer("📡 OpenWrt menu", reply_markup=openwrt_kb)
@dp.message(F.text.in_({"/openwrt_clients", "📶 Wi-Fi clients"}))
async def openwrt_clients(msg: Message):
    """Fetch the OpenWrt status in "clients" mode and reply with it."""
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Checking OpenWrt clients…", reply_markup=openwrt_kb)

    async def _report():
        # Any failure is turned into a warning reply instead of propagating.
        try:
            report = await get_openwrt_status(cfg, mode="clients")
        except Exception as exc:
            report = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(report, reply_markup=openwrt_kb)

    asyncio.create_task(_report())
@dp.message(F.text.in_({"/openwrt_leases", "🧾 Leases"}))
async def openwrt_leases(msg: Message):
    """Fetch the OpenWrt status in "leases" mode and reply with it."""
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Checking OpenWrt leases…", reply_markup=openwrt_kb)

    async def _report():
        # Any failure is turned into a warning reply instead of propagating.
        try:
            report = await get_openwrt_status(cfg, mode="leases")
        except Exception as exc:
            report = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(report, reply_markup=openwrt_kb)

    asyncio.create_task(_report())
@dp.message(F.text == "/openwrt_fast")
@dp.message(F.text == "🌐 WAN fast")
async def openwrt_fast(msg: Message):
    """Quick WAN view: requests the OpenWrt status with `mode="wan"`."""
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ OpenWrt fast…", reply_markup=openwrt_kb)

    async def _report():
        # Any failure is turned into a warning reply instead of propagating.
        try:
            report = await get_openwrt_status(cfg, mode="wan")
        except Exception as exc:
            report = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(report, reply_markup=openwrt_kb)

    asyncio.create_task(_report())
@@ -253,6 +336,512 @@ async def incidents(msg: Message):
await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown") await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown")
@dp.message(F.text == "🧾 Incidents")
async def incidents_entry(msg: Message):
    """Show the incidents sub-menu text together with the audit keyboard."""
    if not is_admin_msg(msg):
        return
    menu_text = (
        "📣 Incidents menu:\n"
        "- Summary\n- Diff\n- Heatmap\n- Export/All\n- Alerts log"
    )
    await msg.answer(menu_text, reply_markup=system_logs_audit_kb)
@dp.message(F.text.in_({"/incidents_summary", "📣 Summary"}))
async def incidents_summary(msg: Message):
    """Reply with incident counters for the last 24 hours and 7 days.

    For each window the reply shows the total number of rows, the five most
    frequent categories, the five categories with the most "[suppressed"
    entries, and the five most recently seen categories with timestamps.
    """
    if not is_admin_msg(msg):
        return

    def _top5(counter):
        # Highest counts first; "n/a" when there is nothing to show.
        ranked = sorted(counter.items(), key=lambda kv: kv[1], reverse=True)[:5]
        return ", ".join(f"{name}:{count}" for name, count in ranked) or "n/a"

    def _digest(items):
        per_cat = {}
        per_cat_suppressed = {}
        latest = {}
        for dt, line in items:
            cat = infer_category(line) or "n/a"
            per_cat[cat] = per_cat.get(cat, 0) + 1
            # The last row seen for a category wins.
            latest[cat] = dt
            if "[suppressed" in line.lower():
                per_cat_suppressed[cat] = per_cat_suppressed.get(cat, 0) + 1

        newest_first = sorted(latest.items(), key=lambda kv: kv[1], reverse=True)[:5]
        last_str = ", ".join(
            f"{name}:{when.astimezone().strftime('%Y-%m-%d %H:%M')}"
            for name, when in newest_first
        ) or "n/a"
        return len(items), _top5(per_cat), _top5(per_cat_suppressed), last_str

    day_rows = read_raw(cfg, hours=24, limit=2000)
    week_rows = read_raw(cfg, hours=24 * 7, limit=4000)
    t24, top24, supp24, last24 = _digest(day_rows)
    t7, top7, supp7, last7 = _digest(week_rows)

    text = (
        "📣 Incidents summary\n\n"
        f"24h: {t24} (top: {top24}; suppressed: {supp24}; last: {last24})\n"
        f"7d: {t7} (top: {top7}; suppressed: {supp7}; last: {last7})"
    )
    await msg.answer(text, reply_markup=system_logs_kb)
@dp.message(F.text.startswith("/incidents_diff"))
@dp.message(F.text == "🆕 Diff")
async def incidents_diff(msg: Message):
    """Show incidents recorded since the previous `/incidents_diff` call.

    Usage: `/incidents_diff [hours]` or `/incidents_diff reset|clear`.
    The timestamp of the newest reported incident is stored in runtime
    state under "incidents_diff_last_ts"; later calls list only newer rows
    inside the requested window (default 24h). If nothing is new and the
    stored marker is at or past the newest row, the marker is cleared and
    the whole window is shown. When there is still nothing new, a sample of
    the latest rows is sent instead. Any exception is reported to the chat.
    """
    if not is_admin_msg(msg):
        return
    try:
        argv = msg.text.split()
        hours = 24
        reset = False
        if len(argv) >= 2:
            if argv[1].lower() in {"reset", "clear"}:
                reset = True
            else:
                # A non-numeric argument (including the button label) keeps
                # the default window.
                try:
                    hours = max(1, int(argv[1]))
                except ValueError:
                    hours = 24

        if reset:
            runtime_state.set_state("incidents_diff_last_ts", None)
            await msg.answer(
                "📣 Diff marker reset. Next run will show all within window.",
                reply_markup=system_logs_audit_kb,
            )
            return

        marker_raw = runtime_state.get("incidents_diff_last_ts")
        last_dt = None
        if isinstance(marker_raw, str):
            try:
                last_dt = datetime.fromisoformat(marker_raw)
            except Exception:
                last_dt = None

        rows = read_raw(cfg, hours=hours, limit=5000, include_old=True)

        def newer_than(from_dt):
            # With no marker every row qualifies.
            return [(dt, line) for dt, line in rows if not (from_dt and dt <= from_dt)]

        def render(items: list[tuple[datetime, str]], limit_chars: int = 3500) -> str:
            # Stop before the accumulated text would exceed `limit_chars`.
            chunks = []
            used = 0
            for dt, line in items:
                entry = f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}"
                if used + len(entry) + 1 > limit_chars:
                    break
                chunks.append(entry)
                used += len(entry) + 1
            return "\n".join(chunks)

        fresh = newer_than(last_dt)
        # auto-reset if marker is ahead of all rows
        if not fresh and last_dt and rows and last_dt >= rows[-1][0]:
            last_dt = None
            runtime_state.set_state("incidents_diff_last_ts", None)
            fresh = newer_than(None)

        if not fresh:
            if last_dt:
                note = f"since {last_dt.astimezone().strftime('%Y-%m-%d %H:%M')}"
            else:
                note = "for the period"
            if rows:
                sample = rows[-50:][::-1]  # newest first
                body = render(sample)
                await msg.answer(
                    f"📣 No new incidents {note} (window {hours}h).\nRecent sample:\n```\n{body}\n```",
                    reply_markup=system_logs_audit_kb,
                    parse_mode="Markdown",
                )
            else:
                await msg.answer(
                    f"📣 No new incidents {note} (window {hours}h)",
                    reply_markup=system_logs_audit_kb,
                )
            return

        fresh.sort(key=lambda pair: pair[0], reverse=True)
        body = render(fresh)
        await msg.answer(
            f"📣 New incidents (since last mark, window {hours}h): {len(fresh)}\n```\n{body}\n```",
            reply_markup=system_logs_audit_kb,
            parse_mode="Markdown",
        )
        # Best effort: failing to persist the marker must not fail the reply.
        try:
            runtime_state.set_state("incidents_diff_last_ts", fresh[0][0].isoformat())
        except Exception:
            pass
    except Exception as e:
        await msg.answer(f"⚠️ Diff error: {type(e).__name__}: {e}", reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/alerts_heatmap"))
@dp.message(F.text == "🔥 Heatmap")
async def alerts_heatmap(msg: Message):
    """Render an hourly text heatmap of incidents.

    Usage: `/alerts_heatmap [hours] [category]`. Numeric arguments set the
    window (default 48h, minimum 1); any other argument is taken as a
    lower-cased category filter. Incidents are bucketed by local hour and
    each hour is drawn as a bar scaled to the busiest hour (12 cells at
    most). Only the last 72 hours of the timeline are printed.

    The bar glyph used to be an empty string, so no bar was ever drawn
    regardless of the computed length.
    """
    if not is_admin_msg(msg):
        return

    hours = 48
    category = None
    # The button label carries no arguments; only parse the slash command.
    if msg.text.startswith("/alerts_heatmap"):
        for part in msg.text.split()[1:]:
            if part.isdigit():
                hours = max(1, int(part))
            else:
                category = part.lower()

    rows = read_raw(cfg, hours=hours, limit=8000, include_old=True)
    buckets: dict[datetime, int] = {}
    for dt, line in rows:
        cat = infer_category(line) or "n/a"
        if category and cat != category:
            continue
        hour = dt.astimezone().replace(minute=0, second=0, microsecond=0)
        buckets[hour] = buckets.get(hour, 0) + 1

    if not buckets:
        await msg.answer(f"⚠️ No incidents for heatmap (hours={hours}, category={category or 'any'})")
        return

    # Take "now" once so the timeline bounds cannot drift while it is built.
    now = datetime.now().astimezone()
    current = (now - timedelta(hours=hours - 1)).replace(minute=0, second=0, microsecond=0)
    timeline = []
    while current <= now:
        timeline.append((current, buckets.get(current, 0)))
        current += timedelta(hours=1)

    # `or 1` avoids dividing by zero when no bucket falls inside the timeline.
    max_count = max(c for _, c in timeline) or 1
    lines = [f"📊 Alerts heatmap (hours={hours}, category={category or 'any'})"]
    for ts, cnt in timeline[-72:]:
        # A non-zero hour always gets at least one cell.
        bar = "█" * max(1, int(cnt / max_count * 12)) if cnt else ""
        lines.append(f"{ts:%m-%d %H}: {cnt:>3} {bar}")
    await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb)
@dp.message(F.text == "/disk_snapshot")
async def disk_snapshot(msg: Message):
    """Handle `/disk_snapshot`: report on the mount from `worst_disk_usage`.

    Falls back to "/" and a usage of 0 when `worst_disk_usage` yields falsy
    values. A failure while building the report is sent back as a warning.
    """
    if not is_admin_msg(msg):
        return

    usage, mount = worst_disk_usage()
    target = mount or "/"
    try:
        report = await build_disk_report(cfg, target, usage or 0)
    except Exception as exc:
        await msg.answer(f"⚠️ Disk snapshot error: {exc}")
        return

    await msg.answer(f"💽 Disk snapshot ({target})\n\n{report}", reply_markup=system_info_kb)
@dp.message(F.text.startswith("/alerts_log"))
async def alerts_log(msg: Message):
    """Handle `/alerts_log [hours]`: count sent and suppressed alerts.

    Rows whose text contains "[suppressed" (case-insensitive) are counted
    as suppressed and all others as sent. The last 20 suppressed rows are
    listed. Timestamps are converted with `astimezone()` before formatting,
    matching the other incident views in this module; previously they were
    printed in whatever zone the stored value carried.
    """
    if not is_admin_msg(msg):
        return

    parts = msg.text.split()
    hours = 24
    if len(parts) >= 2:
        try:
            hours = max(1, int(parts[1]))
        except ValueError:
            hours = 24

    rows = read_raw(cfg, hours=hours, limit=2000)
    suppressed = [(dt, m) for dt, m in rows if "[suppressed" in m.lower()]
    sent_count = len(rows) - len(suppressed)

    lines = [f"📣 Alerts log ({hours}h)"]
    lines.append(f"Sent: {sent_count}, Suppressed: {len(suppressed)}")
    if suppressed:
        lines.append("\nSuppressed:")
        lines.extend(f"{dt.astimezone():%m-%d %H:%M} {m}" for dt, m in suppressed[-20:])
    await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/incidents_export"))
@dp.message(F.text == "📤 Export")
async def incidents_export(msg: Message):
    """Export incidents as a document: `/incidents_export [hours] [csv|json]`.

    The window defaults to 24h and the format to CSV (";"-delimited, with
    columns timestamp, category, message). Any format other than "json" is
    normalised to "csv" so the summary line and file name always describe
    the file that is actually produced. Previously an unknown format
    yielded a CSV file while the summary echoed the unknown name.
    """
    if not is_admin_msg(msg):
        return

    parts = msg.text.split()
    hours = 24
    fmt = "csv"
    if len(parts) >= 2:
        # A non-numeric argument (including the button label) keeps 24h.
        try:
            hours = max(1, int(parts[1]))
        except ValueError:
            hours = 24
    if len(parts) >= 3 and parts[2].lower() == "json":
        fmt = "json"

    rows = read_raw(cfg, hours=hours, limit=20000, include_old=False)
    data = [
        {
            "timestamp": dt.astimezone().isoformat(),
            "category": infer_category(msg_line) or "n/a",
            "message": msg_line,
        }
        for dt, msg_line in rows
    ]

    if fmt == "json":
        payload = json.dumps(data, ensure_ascii=False, indent=2)
    else:
        sio = io.StringIO()
        writer = csv.DictWriter(sio, fieldnames=["timestamp", "category", "message"], delimiter=";")
        writer.writeheader()
        writer.writerows(data)
        payload = sio.getvalue()

    file_bytes = payload.encode("utf-8")
    fname = f"incidents_{hours}h.{fmt}"

    summary = f"📤 Incidents export ({hours}h): {len(data)} rows, format {fmt}"
    await msg.answer(summary)
    await msg.answer_document(document=BufferedInputFile(file_bytes, filename=fname))
def _parse_restic_time(raw):
    """Parse a snapshot "time" value into a datetime, or None if unusable.

    A trailing "Z" is rewritten to "+00:00" and the fractional-second part
    is normalised to exactly six digits, because `datetime.fromisoformat`
    on interpreters older than 3.11 rejects other digit counts.
    """
    if not isinstance(raw, str) or not raw:
        return None
    text = raw.strip()
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    head, dot, tail = text.partition(".")
    if dot:
        n_digits = len(tail) - len(tail.lstrip("0123456789"))
        frac = tail[:n_digits][:6].ljust(6, "0")
        text = f"{head}.{frac}{tail[n_digits:]}"
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return None


# NOTE(review): the four stacked registrations overlap - any text matched by
# the first two also satisfies the last two.
@dp.message(F.text.in_({"/backup_sla", "📉 Backup SLA", "Backup SLA"}))
@dp.message(F.text.contains("Backup SLA"))
@dp.message(F.text.regexp(r"(?i)backup.*sla|sla.*backup"))
@dp.message(F.text.func(lambda t: isinstance(t, str) and "backup" in t.lower() and "sla" in t.lower()))
async def backup_sla(msg: Message):
    """Report the age of the newest restic snapshot against the backup SLA.

    Runs `restic snapshots --json`, takes the snapshot with the greatest
    "time" value and compares its age with `backup.sla_hours` from the
    config (default 26): green within the SLA, yellow up to 1.5x the SLA
    or when the age is unknown, red beyond that.

    A timestamp that cannot be parsed is treated as an unknown age instead
    of raising out of the handler.
    """
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Checking Backup SLA…", reply_markup=backup_kb)
    rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
    if rc != 0:
        await msg.answer(f"⚠️ Restic error: {out.strip() or rc}", reply_markup=backup_kb)
        return

    try:
        snaps = json.loads(out)
    except Exception as e:
        await msg.answer(f"⚠️ Invalid restic JSON: {e}", reply_markup=backup_kb)
        return
    if not isinstance(snaps, list) or not snaps:
        await msg.answer("⚠️ No snapshots found", reply_markup=backup_kb)
        return

    # Newest first by the raw "time" string.
    snaps.sort(key=lambda s: s.get("time", ""), reverse=True)
    last_dt = _parse_restic_time(snaps[0].get("time"))
    if last_dt is not None:
        age_h = (datetime.now(tz=last_dt.tzinfo) - last_dt).total_seconds() / 3600
    else:
        age_h = None

    sla = float(cfg.get("backup", {}).get("sla_hours", 26))
    if age_h is None:
        status = "🟡"
    elif age_h <= sla:
        status = "🟢"
    elif age_h <= sla * 1.5:
        status = "🟡"
    else:
        status = "🔴"

    last_str = last_dt.astimezone().strftime("%Y-%m-%d %H:%M") if last_dt else "n/a"
    age_str = f"{age_h:.1f}h" if age_h is not None else "n/a"
    await msg.answer(
        f"{status} Backup SLA\n"
        f"Snapshots: {len(snaps)}\n"
        f"Last: {last_str} (age {age_str})\n"
        f"SLA: {sla}h",
        reply_markup=backup_kb,
    )
@dp.message(F.text.startswith("/docker_restarts"))
@dp.message(F.text == "♻️ Restarts")
async def docker_restarts(msg: Message):
    """Handle `/docker_restarts [hours]`: restart history from the incident log.

    Incident rows containing "docker_restart" inside the window (default
    168h) are counted per alias, where the alias is taken as the last
    whitespace-separated token of the row. The 50 latest rows are printed.
    """
    if not is_admin_msg(msg):
        return

    argv = msg.text.split()
    hours = 168
    if len(argv) >= 2:
        # A non-numeric argument (including the button label) keeps 168h.
        try:
            hours = max(1, int(argv[1]))
        except ValueError:
            hours = 168

    rows = read_raw(cfg, hours=hours, limit=5000, include_old=True)
    restarts = [(dt, line) for dt, line in rows if "docker_restart" in line]
    if not restarts:
        await msg.answer(f"🐳 No docker restarts in last {hours}h", reply_markup=docker_kb)
        return

    counts: dict[str, int] = {}
    for _dt, line in restarts:
        tokens = line.split()
        alias = tokens[-1] if len(tokens) >= 2 else "unknown"
        counts[alias] = counts.get(alias, 0) + 1

    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    top = ", ".join(f"{name}:{count}" for name, count in ranked)
    recent = [
        f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}"
        for dt, line in restarts[-50:]
    ]
    body = "\n".join(recent)
    await msg.answer(
        f"🐳 Docker restarts ({hours}h): {len(restarts)} ({top})\n```\n{body}\n```",
        reply_markup=docker_kb,
        parse_mode="Markdown",
    )
@dp.message(F.text.startswith("/openwrt_leases_diff"))
@dp.message(F.text == "🔀 Leases diff")
async def openwrt_leases_diff(msg: Message):
    """Compare current OpenWrt leases with the previously stored list.

    The first run (or a run with an empty stored list) only saves a
    baseline. Later runs report added and removed entries, at most 50 of
    each, and then store the current list as the new baseline.
    """
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ OpenWrt leases diff…", reply_markup=openwrt_kb)

    async def _compare():
        try:
            current = await fetch_openwrt_leases(cfg)
        except Exception as exc:
            await msg.answer(f"⚠️ OpenWrt error: {exc}", reply_markup=openwrt_kb)
            return

        previous = runtime_state.get("openwrt_leases_prev", [])
        if not previous:
            runtime_state.set_state("openwrt_leases_prev", current)
            await msg.answer(f"Baseline saved ({len(current)} leases)", reply_markup=openwrt_kb)
            return

        # NOTE(review): set() assumes lease entries are hashable - confirm.
        before = set(previous)
        after = set(current)
        added = sorted(after - before)
        removed = sorted(before - after)

        out = [f"🧾 OpenWrt leases diff ({len(added)} added, {len(removed)} removed)"]
        if added:
            out.append(" Added:")
            out.extend(f" - {x}" for x in added[:50])
        if removed:
            out.append(" Removed:")
            out.extend(f" - {x}" for x in removed[:50])
        if len(added) > 50 or len(removed) > 50:
            out.append("… trimmed")

        runtime_state.set_state("openwrt_leases_prev", current)
        await msg.answer("\n".join(out), reply_markup=openwrt_kb)

    asyncio.create_task(_compare())
@dp.message(F.text.in_({"/queue_sla", "📊 Queue SLA", "Queue SLA"}))
@dp.message(F.text.contains("Queue SLA"))
@dp.message(F.text.regexp(r"(?i)queue.*sla|sla.*queue"))
@dp.message(F.text.func(lambda t: isinstance(t, str) and "queue" in t.lower() and "sla" in t.lower()))
async def queue_sla(msg: Message):
    """Reply with wait/run time statistics computed from the queue history.

    Shows the sample count, maximum and 95th-percentile wait and run times
    from `get_history_raw()`, plus the averages from `get_stats()` when
    that returns something truthy.
    """
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Calculating Queue SLA…", reply_markup=backup_kb)
    history = get_history_raw()
    if not history:
        await msg.answer("🧾 Queue SLA: history is empty", reply_markup=backup_kb)
        return

    def p95(values: list[int]) -> float:
        # Element at index floor(0.95 * (n - 1)) of the sorted values.
        if not values:
            return 0.0
        ordered = sorted(values)
        return float(ordered[int(0.95 * (len(ordered) - 1))])

    waits = []
    runs = []
    for item in history:
        waits.append(item.get("wait_sec", 0))
        runs.append(item.get("runtime_sec", 0))

    report = [
        "🧾 Queue SLA",
        f"Samples: {len(history)}",
        f"Max wait: {max(waits)}s, p95 {p95(waits):.1f}s",
        f"Max run: {max(runs)}s, p95 {p95(runs):.1f}s",
    ]
    stats = get_stats()
    if stats:
        avg_wait = stats.get('avg_wait_sec', 0)
        avg_run = stats.get('avg_runtime_sec', 0)
        report.append(f"Avg wait: {avg_wait:.1f}s, avg run: {avg_run:.1f}s")
    await msg.answer("\n".join(report), reply_markup=backup_kb)
# Catch-all router: a message mentioning "sla" is forwarded to the queue or
# backup SLA handler depending on which word it also contains.
@dp.message(F.text.regexp(r"(?i)sla"))
async def sla_fallback(msg: Message):
    """Dispatch an SLA-looking message to ``queue_sla`` or ``backup_sla``.

    Non-admin senders are ignored. "queue" takes precedence over "backup";
    a message containing neither word is silently dropped.
    """
    if not is_admin_msg(msg):
        return
    lowered = (msg.text or "").lower()
    if "queue" in lowered:
        await queue_sla(msg)
        return
    if "backup" in lowered:
        await backup_sla(msg)
@dp.message(F.text == "/selftest_history")
async def selftest_history(msg: Message):
    """Send the first 15 stored self-test results, one line per run.

    Non-admin senders are ignored. Each line shows a green/red marker taken
    from the entry's ``ok`` flag, the first 16 characters of its ``ts`` value
    and its ``summary``.
    """
    if not is_admin_msg(msg):
        return

    records = runtime_state.get("selftest_history", [])
    if not records:
        await msg.answer("🧪 Self-test history is empty", reply_markup=system_logs_audit_kb)
        return

    report = ["🧪 Self-test history"]
    for record in records[:15]:
        stamp = record.get("ts", "")[:16]
        marker = "🟢" if record.get("ok", False) else "🔴"
        report.append(f"{marker} {stamp} {record.get('summary', '')}")

    await msg.answer("\n".join(report), reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/export_all"))
@dp.message(F.text == "📦 Export all")
async def export_all(msg: Message):
    """Send a ZIP with incidents, queue history and self-test history as CSV.

    Non-admin senders are ignored. The optional second word of the message is
    the incident look-back window in hours (minimum 1); anything that is not
    an integer falls back to 168. A text summary with row counts is sent
    before the archive itself.
    """
    if not is_admin_msg(msg):
        return

    hours = 168
    words = msg.text.split()
    if len(words) >= 2:
        try:
            hours = max(1, int(words[1]))
        except ValueError:
            hours = 168

    def render_csv(header, rows) -> str:
        # All three exports share the same ';'-delimited layout.
        buffer = io.StringIO()
        writer = csv.writer(buffer, delimiter=";")
        writer.writerow(header)
        for row in rows:
            writer.writerow(row)
        return buffer.getvalue()

    incidents_rows = read_raw(cfg, hours=hours, limit=20000, include_old=True)
    incidents_csv = render_csv(
        ["timestamp", "category", "message"],
        (
            [dt.astimezone().isoformat(), infer_category(line) or "n/a", line]
            for dt, line in incidents_rows
        ),
    )

    def queue_row(item):
        finished = item.get("finished_at")
        # A missing or zero timestamp is exported as an empty cell.
        finished_text = datetime.fromtimestamp(finished).isoformat() if finished else ""
        return [
            finished_text,
            item.get("label", ""),
            item.get("status", ""),
            item.get("wait_sec", ""),
            item.get("runtime_sec", ""),
        ]

    queue_rows = get_history_raw()
    queue_csv = render_csv(
        ["finished_at", "label", "status", "wait_sec", "runtime_sec"],
        (queue_row(item) for item in queue_rows),
    )

    st_hist = runtime_state.get("selftest_history", [])
    selftest_csv = render_csv(
        ["timestamp", "ok", "summary"],
        (
            [entry.get("ts", ""), entry.get("ok", False), entry.get("summary", "")]
            for entry in st_hist
        ),
    )

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as bundle:
        bundle.writestr(f"incidents_{hours}h.csv", incidents_csv)
        bundle.writestr("queue_history.csv", queue_csv)
        bundle.writestr("selftest_history.csv", selftest_csv)

    summary = "\n".join(
        [
            "📦 Export all",
            f"Incidents: {len(incidents_rows)} rows ({hours}h)",
            f"Queue history: {len(queue_rows)} rows",
            f"Selftest: {len(st_hist)} rows",
        ]
    )
    await msg.answer(summary)
    await msg.answer_document(
        document=BufferedInputFile(archive.getvalue(), filename="export_all.zip")
    )
@dp.message(F.text == "🔒 SSL") @dp.message(F.text == "🔒 SSL")
async def ssl_certs(msg: Message): async def ssl_certs(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
@@ -433,7 +1022,7 @@ async def updates_page(cb: CallbackQuery):
@dp.callback_query(F.data == "upgrade:confirm") @dp.callback_query(F.data == "upgrade:confirm")
async def upgrade_confirm(cb: CallbackQuery): async def upgrade_confirm(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
await cb.answer() await cb.answer()
@@ -456,7 +1045,7 @@ async def upgrade_cancel(cb: CallbackQuery):
@dp.callback_query(F.data == "reboot:confirm") @dp.callback_query(F.data == "reboot:confirm")
async def reboot_confirm(cb: CallbackQuery): async def reboot_confirm(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
await cb.answer() await cb.answer()
REBOOT_PENDING[cb.from_user.id] = {} REBOOT_PENDING[cb.from_user.id] = {}
@@ -471,7 +1060,7 @@ async def reboot_cancel(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("npmplus:")) @dp.callback_query(F.data.startswith("npmplus:"))
async def npmplus_toggle(cb: CallbackQuery): async def npmplus_toggle(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
parts = cb.data.split(":") parts = cb.data.split(":")
if len(parts) != 3: if len(parts) != 3:

View File

@@ -10,7 +10,7 @@ menu_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="🩺 Health"), KeyboardButton(text="📊 Статус")], [KeyboardButton(text="🩺 Health"), KeyboardButton(text="📊 Статус")],
[KeyboardButton(text="🐳 Docker"), KeyboardButton(text="📦 Backup")], [KeyboardButton(text="🐳 Docker"), KeyboardButton(text="📦 Backup")],
[KeyboardButton(text="🧉 Artifacts"), KeyboardButton(text="⚙️ System")], [KeyboardButton(text="⚙️ System")],
[KeyboardButton(text=" Help")], [KeyboardButton(text=" Help")],
], ],
resize_keyboard=True, resize_keyboard=True,
@@ -20,7 +20,8 @@ docker_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="🐳 Status"), KeyboardButton(text="🧰 Arcane")], [KeyboardButton(text="🐳 Status"), KeyboardButton(text="🧰 Arcane")],
[KeyboardButton(text="🔄 Restart"), KeyboardButton(text="📜 Logs")], [KeyboardButton(text="🔄 Restart"), KeyboardButton(text="📜 Logs")],
[KeyboardButton(text="📈 Stats"), KeyboardButton(text=" Назад")], [KeyboardButton(text="📈 Stats"), KeyboardButton(text=" Restarts")],
[KeyboardButton(text="⬅️ Назад")],
], ],
resize_keyboard=True, resize_keyboard=True,
) )
@@ -37,8 +38,8 @@ backup_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="📦 Status"), KeyboardButton(text="📦 Last snapshot")], [KeyboardButton(text="📦 Status"), KeyboardButton(text="📦 Last snapshot")],
[KeyboardButton(text="📊 Repo stats"), KeyboardButton(text="🧯 Restore help")], [KeyboardButton(text="📊 Repo stats"), KeyboardButton(text="🧯 Restore help")],
[KeyboardButton(text="▶️ Run backup"), KeyboardButton(text="🧾 Queue")], [KeyboardButton(text="▶️ Run backup"), KeyboardButton(text="🧾 Queue"), KeyboardButton(text="📊 Queue SLA")],
[KeyboardButton(text="🧪 Restic check"), KeyboardButton(text="📬 Weekly report"), KeyboardButton(text="⬅️ Назад")], [KeyboardButton(text="📉 Backup SLA"), KeyboardButton(text="📜 History"), KeyboardButton(text="⬅️ Назад")],
], ],
resize_keyboard=True, resize_keyboard=True,
) )
@@ -83,6 +84,7 @@ system_logs_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="🧾 Audit/Incidents"), KeyboardButton(text="🔒 Security")], [KeyboardButton(text="🧾 Audit/Incidents"), KeyboardButton(text="🔒 Security")],
[KeyboardButton(text="🧩 Integrations"), KeyboardButton(text="🧰 Processes")], [KeyboardButton(text="🧩 Integrations"), KeyboardButton(text="🧰 Processes")],
[KeyboardButton(text="📣 Summary"), KeyboardButton(text="🔥 Heatmap")],
[KeyboardButton(text="⬅️ System")], [KeyboardButton(text="⬅️ System")],
], ],
resize_keyboard=True, resize_keyboard=True,
@@ -91,6 +93,8 @@ system_logs_kb = ReplyKeyboardMarkup(
system_logs_audit_kb = ReplyKeyboardMarkup( system_logs_audit_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="🧾 Audit"), KeyboardButton(text="📣 Incidents")], [KeyboardButton(text="🧾 Audit"), KeyboardButton(text="📣 Incidents")],
[KeyboardButton(text="🆕 Diff"), KeyboardButton(text="📤 Export")],
[KeyboardButton(text="📦 Export all"), KeyboardButton(text="🧰 Alerts log")],
[KeyboardButton(text="⬅️ Logs")], [KeyboardButton(text="⬅️ Logs")],
], ],
resize_keyboard=True, resize_keyboard=True,
@@ -121,6 +125,17 @@ system_logs_tools_kb = ReplyKeyboardMarkup(
resize_keyboard=True, resize_keyboard=True,
) )
# OpenWrt submenu (4 rows): WAN/full status, Wi-Fi clients/leases,
# leases diff, and a button back to the System menu.
openwrt_kb = ReplyKeyboardMarkup(
keyboard=[
[KeyboardButton(text="🌐 WAN fast"), KeyboardButton(text="📡 Full status")],
[KeyboardButton(text="📶 Wi-Fi clients"), KeyboardButton(text="🧾 Leases")],
[KeyboardButton(text="🔀 Leases diff")],
[KeyboardButton(text="⬅️ System")],
],
resize_keyboard=True,
)
def docker_inline_kb(action: str) -> InlineKeyboardMarkup: def docker_inline_kb(action: str) -> InlineKeyboardMarkup:
rows = [] rows = []

View File

@@ -1,4 +1,5 @@
from pathlib import Path from pathlib import Path
import os
import time import time
LOCK_DIR = Path("/var/run/tg-bot") LOCK_DIR = Path("/var/run/tg-bot")
@@ -11,9 +12,14 @@ def lock_path(name: str) -> Path:
def acquire_lock(name: str) -> bool: def acquire_lock(name: str) -> bool:
p = lock_path(name) p = lock_path(name)
if p.exists(): try:
fd = os.open(str(p), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
except FileExistsError:
return False return False
p.write_text(str(time.time())) try:
os.write(fd, str(time.time()).encode("ascii", errors="ignore"))
finally:
os.close(fd)
return True return True

31
main.py
View File

@@ -2,18 +2,19 @@ import asyncio
import logging import logging
import socket import socket
from datetime import datetime from datetime import datetime
from app import bot, dp, cfg, ADMIN_ID from app import bot, dp, cfg, ADMIN_ID, ADMIN_IDS
from keyboards import menu_kb from keyboards import menu_kb
from services.docker import discover_containers, docker_watchdog from services.docker import discover_containers, docker_watchdog
from services.alerts import monitor_resources, monitor_smart from services.alerts import monitor_resources, monitor_smart, monitor_raid
from services.metrics import MetricsStore, start_sampler from services.metrics import MetricsStore, start_sampler
from services.queue import worker as queue_worker from services.queue import worker as queue_worker, configure as queue_configure
from services.notify import notify from services.notify import notify
from services.audit import AuditMiddleware, audit_start from services.audit import AuditMiddleware, audit_start
from services.ssl_alerts import monitor_ssl from services.ssl_alerts import monitor_ssl
from services.external_checks import monitor_external from services.external_checks import monitor_external
from services.incidents import log_incident from services.incidents import log_incident
from services.logging_setup import setup_logging from services.logging_setup import setup_logging
from services.selftest import schedule_selftest
import state import state
import handlers.menu import handlers.menu
import handlers.status import handlers.status
@@ -25,6 +26,7 @@ import handlers.help
import handlers.callbacks import handlers.callbacks
import handlers.arcane import handlers.arcane
import handlers.processes import handlers.processes
from services.weekly_report import weekly_reporter
import handlers.alerts_admin import handlers.alerts_admin
import handlers.config_check import handlers.config_check
@@ -36,8 +38,24 @@ def _handle_async_exception(_loop, context):
text = f"{msg}: {type(exc).__name__}: {exc}" text = f"{msg}: {type(exc).__name__}: {exc}"
else: else:
text = f"{msg}" text = f"{msg}"
now = datetime.now()
if not hasattr(_handle_async_exception, "_recent"):
_handle_async_exception._recent = []
_handle_async_exception._last_alert = None
recent = _handle_async_exception._recent
recent.append(now)
# keep last hour
_handle_async_exception._recent = [t for t in recent if (now - t).total_seconds() < 3600]
if len(_handle_async_exception._recent) >= 3:
last_alert = getattr(_handle_async_exception, "_last_alert", None)
if not last_alert or (now - last_alert).total_seconds() > 3600:
try:
log_incident(cfg, "exception_flood", category="system")
except Exception:
pass
_handle_async_exception._last_alert = now
try: try:
log_incident(cfg, text) log_incident(cfg, text, category="system")
except Exception: except Exception:
pass pass
logging.getLogger("asyncio").error(text) logging.getLogger("asyncio").error(text)
@@ -64,13 +82,18 @@ async def main():
asyncio.create_task(monitor_resources(cfg, notify, bot, ADMIN_ID)) asyncio.create_task(monitor_resources(cfg, notify, bot, ADMIN_ID))
if cfg.get("alerts", {}).get("smart_enabled", True): if cfg.get("alerts", {}).get("smart_enabled", True):
asyncio.create_task(monitor_smart(cfg, notify, bot, ADMIN_ID)) asyncio.create_task(monitor_smart(cfg, notify, bot, ADMIN_ID))
if cfg.get("alerts", {}).get("raid_enabled", True):
asyncio.create_task(monitor_raid(cfg, notify, bot, ADMIN_ID))
if cfg.get("npmplus", {}).get("alerts", {}).get("enabled", True): if cfg.get("npmplus", {}).get("alerts", {}).get("enabled", True):
asyncio.create_task(monitor_ssl(cfg, notify, bot, ADMIN_ID)) asyncio.create_task(monitor_ssl(cfg, notify, bot, ADMIN_ID))
if cfg.get("external_checks", {}).get("enabled", True): if cfg.get("external_checks", {}).get("enabled", True):
asyncio.create_task(monitor_external(cfg)) asyncio.create_task(monitor_external(cfg))
state.METRICS_STORE = MetricsStore() state.METRICS_STORE = MetricsStore()
asyncio.create_task(start_sampler(state.METRICS_STORE, interval=5)) asyncio.create_task(start_sampler(state.METRICS_STORE, interval=5))
queue_configure(cfg.get("queue", {}), cfg)
asyncio.create_task(queue_worker()) asyncio.create_task(queue_worker())
asyncio.create_task(weekly_reporter(cfg, bot, ADMIN_IDS, state.DOCKER_MAP))
asyncio.create_task(schedule_selftest(cfg, bot, ADMIN_IDS, state.DOCKER_MAP))
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
loop.set_exception_handler(_handle_async_exception) loop.set_exception_handler(_handle_async_exception)
await notify_start() await notify_start()

View File

@@ -1,37 +1,53 @@
import time import time
from typing import Dict from typing import Dict
from services.runtime_state import get_state, set_state
# category -> unix timestamp until muted # category -> unix timestamp until muted
_MUTES: Dict[str, float] = {}
def _mutes() -> Dict[str, float]:
return get_state().get("mutes", {})
def _save(mutes: Dict[str, float]):
set_state("mutes", mutes)
def _cleanup() -> None: def _cleanup() -> None:
mutes = _mutes()
now = time.time() now = time.time()
expired = [k for k, until in _MUTES.items() if until <= now] expired = [k for k, until in mutes.items() if until <= now]
for k in expired: for k in expired:
_MUTES.pop(k, None) mutes.pop(k, None)
_save(mutes)
def set_mute(category: str, seconds: int) -> float: def set_mute(category: str, seconds: int) -> float:
_cleanup() _cleanup()
mutes = _mutes()
until = time.time() + max(0, seconds) until = time.time() + max(0, seconds)
_MUTES[category] = until mutes[category] = until
_save(mutes)
return until return until
def clear_mute(category: str) -> None: def clear_mute(category: str) -> None:
_MUTES.pop(category, None) mutes = _mutes()
mutes.pop(category, None)
_save(mutes)
def is_muted(category: str | None) -> bool: def is_muted(category: str | None) -> bool:
if not category: if not category:
return False return False
_cleanup() _cleanup()
until = _MUTES.get(category) mutes = _mutes()
until = mutes.get(category)
if until is None: if until is None:
return False return False
if until <= time.time(): if until <= time.time():
_MUTES.pop(category, None) mutes.pop(category, None)
_save(mutes)
return False return False
return True return True
@@ -39,4 +55,39 @@ def is_muted(category: str | None) -> bool:
def list_mutes() -> dict[str, int]: def list_mutes() -> dict[str, int]:
_cleanup() _cleanup()
now = time.time() now = time.time()
return {k: int(until - now) for k, until in _MUTES.items()} mutes = _mutes()
return {k: int(until - now) for k, until in mutes.items()}
def is_auto_muted(cfg: dict, category: str | None) -> bool:
if not category:
return False
auto_list = cfg.get("alerts", {}).get("auto_mute", [])
if not isinstance(auto_list, list):
return False
now = time.localtime()
now_minutes = now.tm_hour * 60 + now.tm_min
for item in auto_list:
if not isinstance(item, dict):
continue
cat = item.get("category")
if cat != category:
continue
start = item.get("start", "00:00")
end = item.get("end", "00:00")
try:
sh, sm = [int(x) for x in start.split(":")]
eh, em = [int(x) for x in end.split(":")]
except Exception:
continue
start_min = sh * 60 + sm
end_min = eh * 60 + em
if start_min == end_min:
continue
if start_min < end_min:
if start_min <= now_minutes < end_min:
return True
else:
if now_minutes >= start_min or now_minutes < end_min:
return True
return False

View File

@@ -1,7 +1,7 @@
import asyncio import asyncio
import time import time
import psutil import psutil
from system_checks import list_disks, smart_health, disk_temperature from system_checks import list_disks, smart_health, disk_temperature, list_md_arrays, md_array_status
from services.system import worst_disk_usage from services.system import worst_disk_usage
from services.disk_report import build_disk_report from services.disk_report import build_disk_report
@@ -12,6 +12,7 @@ async def monitor_resources(cfg, notify, bot, chat_id):
cooldown = int(alerts_cfg.get("cooldown_sec", 900)) cooldown = int(alerts_cfg.get("cooldown_sec", 900))
notify_recovery = bool(alerts_cfg.get("notify_recovery", True)) notify_recovery = bool(alerts_cfg.get("notify_recovery", True))
load_only_critical = bool(alerts_cfg.get("load_only_critical", False)) load_only_critical = bool(alerts_cfg.get("load_only_critical", False))
auto_mute_high_load_sec = int(alerts_cfg.get("auto_mute_on_high_load_sec", 0))
disk_warn = int(cfg.get("thresholds", {}).get("disk_warn", 80)) disk_warn = int(cfg.get("thresholds", {}).get("disk_warn", 80))
snapshot_warn = int(cfg.get("disk_report", {}).get("threshold", disk_warn)) snapshot_warn = int(cfg.get("disk_report", {}).get("threshold", disk_warn))
@@ -72,6 +73,10 @@ async def monitor_resources(cfg, notify, bot, chat_id):
key = "load_high_crit" if level == 2 else "load_high_warn" key = "load_high_crit" if level == 2 else "load_high_warn"
await notify(bot, chat_id, f"{icon} Load high: {load:.2f}", level=level_name, key=key, category="load") await notify(bot, chat_id, f"{icon} Load high: {load:.2f}", level=level_name, key=key, category="load")
last_sent["load"] = now last_sent["load"] = now
if level == 2 and auto_mute_high_load_sec > 0:
from services.alert_mute import set_mute
set_mute("load", auto_mute_high_load_sec)
state["load_level"] = level state["load_level"] = level
await asyncio.sleep(interval) await asyncio.sleep(interval)
@@ -125,3 +130,54 @@ async def monitor_smart(cfg, notify, bot, chat_id):
continue continue
await asyncio.sleep(interval) await asyncio.sleep(interval)
def _classify_raid_status(status: str) -> tuple[str | None, str | None]:
    """Map an md status string to ``(alert level, key suffix)``; ``(None, None)`` if healthy."""
    lower = status.lower()
    if "inactive" in lower:
        return "critical", "inactive"
    if "degraded" in lower:
        return "warn", "degraded"
    return None, None


async def monitor_raid(cfg, notify, bot, chat_id):
    """Poll md RAID arrays forever and send alerts on bad or recovered state.

    Reads from ``cfg["alerts"]``: ``raid_interval_sec`` (default 300),
    ``raid_cooldown_sec`` (default 1800) and ``notify_recovery`` (default
    True). An array whose status contains "inactive" is critical, "degraded"
    is a warning. An alert is sent when an array enters a bad state, when its
    bad state changes (e.g. degraded -> inactive), or when the cooldown has
    elapsed since the last alert for that array.

    Args:
        cfg: Application configuration mapping.
        notify: Awaitable called as
            ``notify(bot, chat_id, text, level=..., key=..., category=...)``.
        bot: Passed through to *notify*.
        chat_id: Passed through to *notify*.
    """
    alerts_cfg = cfg.get("alerts", {})
    interval = int(alerts_cfg.get("raid_interval_sec", 300))
    cooldown = int(alerts_cfg.get("raid_cooldown_sec", 1800))
    notify_recovery = bool(alerts_cfg.get("notify_recovery", True))

    last_sent: dict[str, float] = {}
    # dev -> key suffix of the bad state last seen; absent means healthy.
    # Tracking the state itself (not just a flag) lets an escalation bypass
    # the cooldown instead of being suppressed until it expires.
    bad_state: dict[str, str] = {}

    while True:
        now = time.time()
        # NOTE(review): list_md_arrays/md_array_status are called synchronously;
        # if they block on I/O they stall the event loop — confirm.
        for dev in list_md_arrays():
            status = md_array_status(dev)
            level, key_suffix = _classify_raid_status(status)

            if level:
                state_changed = bad_state.get(dev) != key_suffix
                cooldown_over = now - last_sent.get(dev, 0.0) >= cooldown
                if state_changed or cooldown_over:
                    icon = "🔴" if level == "critical" else "🟡"
                    await notify(
                        bot,
                        chat_id,
                        f"{icon} RAID {dev}: {status}",
                        level=level,
                        key=f"raid_{key_suffix}:{dev}",
                        category="raid",
                    )
                    last_sent[dev] = now
                bad_state[dev] = key_suffix
            else:
                if bad_state.get(dev) and notify_recovery:
                    await notify(
                        bot,
                        chat_id,
                        f"🟢 RAID {dev}: {status}",
                        level="info",
                        key=f"raid_ok:{dev}",
                        category="raid",
                    )
                bad_state.pop(dev, None)

        await asyncio.sleep(interval)

View File

@@ -9,7 +9,9 @@ def validate_cfg(cfg: dict[str, Any]) -> Tuple[List[str], List[str]]:
tg = cfg.get("telegram", {}) tg = cfg.get("telegram", {})
if not tg.get("token"): if not tg.get("token"):
errors.append("telegram.token is missing") errors.append("telegram.token is missing")
if not tg.get("admin_id"): admin_ids = tg.get("admin_ids")
has_admin_ids = isinstance(admin_ids, list) and len(admin_ids) > 0
if not tg.get("admin_id") and not has_admin_ids:
errors.append("telegram.admin_id is missing") errors.append("telegram.admin_id is missing")
thresholds = cfg.get("thresholds", {}) thresholds = cfg.get("thresholds", {})

View File

@@ -1,11 +1,48 @@
import os import os
import re
from typing import Any from typing import Any
from services.runner import run_cmd from services.runner import run_cmd
def _top_dirs_cmd(path: str, limit: int) -> list[str]: def _top_dirs_cmd(path: str, limit: int) -> list[str]:
return ["bash", "-lc", f"du -xhd1 {path} 2>/dev/null | sort -h | tail -n {limit}"] _ = limit
return ["du", "-x", "-h", "-d", "1", path]
_SIZE_RE = re.compile(r"^\s*([0-9]+(?:\.[0-9]+)?)([KMGTP]?)(i?B?)?$", re.IGNORECASE)
def _size_to_bytes(value: str) -> float:
m = _SIZE_RE.match(value.strip())
if not m:
return -1.0
num = float(m.group(1))
unit = (m.group(2) or "").upper()
mul = {
"": 1,
"K": 1024,
"M": 1024**2,
"G": 1024**3,
"T": 1024**4,
"P": 1024**5,
}.get(unit, 1)
return num * mul
def _format_top_dirs(raw: str, limit: int) -> str:
    """Return the *limit* largest entries of ``du``-style output, smallest first.

    Each usable input line is ``<size> <name>``; lines that do not split into
    exactly two fields are dropped. Output lines are ``<size>\\t<name>`` in
    ascending size order. At least one entry is kept even if *limit* < 1.
    """
    entries: list[tuple[float, str]] = []
    for raw_line in raw.splitlines():
        fields = raw_line.strip().split(maxsplit=1)
        if len(fields) != 2:
            continue
        size_text, path = fields
        entries.append((_size_to_bytes(size_text), f"{size_text}\t{path}"))

    ranked = sorted(entries, key=lambda entry: entry[0])
    keep = max(1, limit)
    return "\n".join(text for _weight, text in ranked[-keep:])
async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str: async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
@@ -15,24 +52,27 @@ async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
rc, out = await run_cmd(_top_dirs_cmd(mount, limit), timeout=30) rc, out = await run_cmd(_top_dirs_cmd(mount, limit), timeout=30)
if rc == 0 and out.strip(): if rc == 0 and out.strip():
top_out = _format_top_dirs(out, limit)
lines.append("") lines.append("")
lines.append("Top directories:") lines.append("Top directories:")
lines.append(out.strip()) lines.append(top_out)
docker_dir = cfg.get("disk_report", {}).get("docker_dir", "/var/lib/docker") docker_dir = cfg.get("disk_report", {}).get("docker_dir", "/var/lib/docker")
if docker_dir and os.path.exists(docker_dir): if docker_dir and os.path.exists(docker_dir):
rc2, out2 = await run_cmd(_top_dirs_cmd(docker_dir, limit), timeout=30) rc2, out2 = await run_cmd(_top_dirs_cmd(docker_dir, limit), timeout=30)
if rc2 == 0 and out2.strip(): if rc2 == 0 and out2.strip():
top_out2 = _format_top_dirs(out2, limit)
lines.append("") lines.append("")
lines.append(f"Docker dir: {docker_dir}") lines.append(f"Docker dir: {docker_dir}")
lines.append(out2.strip()) lines.append(top_out2)
logs_dir = cfg.get("disk_report", {}).get("logs_dir", "/var/log") logs_dir = cfg.get("disk_report", {}).get("logs_dir", "/var/log")
if logs_dir and os.path.exists(logs_dir): if logs_dir and os.path.exists(logs_dir):
rc3, out3 = await run_cmd(_top_dirs_cmd(logs_dir, limit), timeout=30) rc3, out3 = await run_cmd(_top_dirs_cmd(logs_dir, limit), timeout=30)
if rc3 == 0 and out3.strip(): if rc3 == 0 and out3.strip():
top_out3 = _format_top_dirs(out3, limit)
lines.append("") lines.append("")
lines.append(f"Logs dir: {logs_dir}") lines.append(f"Logs dir: {logs_dir}")
lines.append(out3.strip()) lines.append(top_out3)
return "\n".join(lines) return "\n".join(lines)

View File

@@ -1,4 +1,4 @@
import os import os
import ssl import ssl
import subprocess import subprocess
import psutil import psutil
@@ -38,7 +38,9 @@ def _npm_api_base(cfg) -> str | None:
def health(cfg, container_map: dict | None = None) -> str: def health(cfg, container_map: dict | None = None) -> str:
lines = ["🩺 Health check\n"] lines = ["🩺 Health check\n"]
thresholds = cfg.get("thresholds", {})
disk_warn = int(thresholds.get("disk_warn", 80))
load_warn = float(thresholds.get("load_warn", 2.0))
try: try:
env = os.environ.copy() env = os.environ.copy()
env.update(RESTIC_ENV) env.update(RESTIC_ENV)
@@ -91,12 +93,13 @@ def health(cfg, container_map: dict | None = None) -> str:
usage, mount = worst_disk_usage() usage, mount = worst_disk_usage()
if usage is None: if usage is None:
lines.append("⚠️ Disk n/a") lines.append("⚠️ Disk n/a")
elif usage > cfg["thresholds"]["disk_warn"]: elif usage > disk_warn:
lines.append(f"🟡 Disk {usage}% ({mount})") lines.append(f"🟡 Disk {usage}% ({mount})")
else: else:
lines.append(f"🟢 Disk {usage}% ({mount})") lines.append(f"🟢 Disk {usage}% ({mount})")
load = psutil.getloadavg()[0] load = psutil.getloadavg()[0]
lines.append(f"{'🟢' if load < cfg['thresholds']['load_warn'] else '🟡'} Load {load}") lines.append(f"{'🟢' if load < load_warn else '🟡'} Load {load}")
return "\n".join(lines) return "\n".join(lines)

View File

@@ -4,6 +4,7 @@ from collections import deque
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
from typing import Any from typing import Any
from services import runtime_state
def _get_path(cfg: dict[str, Any]) -> str: def _get_path(cfg: dict[str, Any]) -> str:
@@ -44,9 +45,11 @@ def _get_logger(cfg: dict[str, Any]) -> logging.Logger:
return logger return logger
def log_incident(cfg: dict[str, Any], text: str) -> None: def log_incident(cfg: dict[str, Any], text: str, category: str | None = None) -> None:
if not cfg.get("incidents", {}).get("enabled", True): if not cfg.get("incidents", {}).get("enabled", True):
return return
if category and "category=" not in text:
text = f"category={category} {text}"
logger = _get_logger(cfg) logger = _get_logger(cfg)
logger.info(text) logger.info(text)
@@ -63,6 +66,10 @@ def _parse_line(line: str) -> tuple[datetime | None, str]:
def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]: def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]:
return [f"{dt:%Y-%m-%d %H:%M} {msg}" for dt, msg in read_raw(cfg, hours, limit=limit)]
def read_raw(cfg: dict[str, Any], hours: int, limit: int = 200, *, include_old: bool = False) -> list[tuple[datetime, str]]:
path = _get_path(cfg) path = _get_path(cfg)
if not os.path.exists(path): if not os.path.exists(path):
return [] return []
@@ -72,7 +79,40 @@ def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]:
with open(path, "r", encoding="utf-8", errors="replace") as f: with open(path, "r", encoding="utf-8", errors="replace") as f:
for line in f: for line in f:
dt, msg = _parse_line(line.rstrip()) dt, msg = _parse_line(line.rstrip())
if dt is None or dt < since: if dt is None:
continue continue
lines.append(f"{dt:%Y-%m-%d %H:%M} {msg}") if not include_old and dt < since:
continue
lines.append((dt, msg))
return list(lines) return list(lines)
def infer_category(text: str) -> str | None:
lower = text.lower()
if "category=" in lower:
import re
m = re.search(r"category=([a-z0-9_-]+)", lower)
if m:
return m.group(1)
if "load" in lower:
return "load"
if "docker" in lower:
return "docker"
if "restic" in lower or "backup" in lower:
return "backup"
if "smart" in lower:
return "smart"
if "ssl" in lower or "cert" in lower:
return "ssl"
if "npmplus" in lower:
return "npmplus"
if "gitea" in lower:
return "gitea"
if "openwrt" in lower:
return "openwrt"
if "queue" in lower:
return "queue"
if "selftest" in lower:
return "selftest"
return None

View File

@@ -2,7 +2,7 @@ import time
from datetime import datetime from datetime import datetime
from aiogram import Bot from aiogram import Bot
from app import cfg from app import cfg
from services.alert_mute import is_muted from services.alert_mute import is_muted, is_auto_muted
from services.incidents import log_incident from services.incidents import log_incident
@@ -47,12 +47,22 @@ async def notify(
category: str | None = None, category: str | None = None,
): ):
alerts_cfg = cfg.get("alerts", {}) alerts_cfg = cfg.get("alerts", {})
suppressed_reason = None
if category and is_muted(category): if category and is_muted(category):
return suppressed_reason = "muted"
if _in_quiet_hours(alerts_cfg): elif category and is_auto_muted(cfg, category):
suppressed_reason = "auto_mute"
elif _in_quiet_hours(alerts_cfg):
allow_critical = bool(alerts_cfg.get("quiet_hours", {}).get("allow_critical", True)) allow_critical = bool(alerts_cfg.get("quiet_hours", {}).get("allow_critical", True))
if not (allow_critical and level == "critical"): if not (allow_critical and level == "critical"):
return suppressed_reason = "quiet_hours"
if suppressed_reason:
try:
log_incident(cfg, f"[suppressed:{suppressed_reason}] {text}", category=category)
except Exception:
pass
return
dedup_sec = int(alerts_cfg.get("notify_cooldown_sec", alerts_cfg.get("cooldown_sec", 900))) dedup_sec = int(alerts_cfg.get("notify_cooldown_sec", alerts_cfg.get("cooldown_sec", 900)))
if dedup_sec > 0: if dedup_sec > 0:
@@ -68,6 +78,6 @@ async def notify(
except Exception: except Exception:
pass pass
try: try:
log_incident(cfg, text) log_incident(cfg, text, category=category)
except Exception: except Exception:
pass pass

View File

@@ -240,7 +240,7 @@ def _parse_hostapd_clients(
*, *,
name_map: dict[str, str] | None = None, name_map: dict[str, str] | None = None,
ifname_meta: dict[str, dict[str, str]] | None = None, ifname_meta: dict[str, dict[str, str]] | None = None,
) -> list[str]: ) -> list[tuple[str, int | None, str]]:
if not isinstance(payload, dict): if not isinstance(payload, dict):
return [] return []
data = payload.get("clients") data = payload.get("clients")
@@ -248,7 +248,7 @@ def _parse_hostapd_clients(
items = data.items() items = data.items()
else: else:
return [] return []
clients: list[str] = [] clients: list[tuple[str, int | None, str]] = []
name_map = name_map or {} name_map = name_map or {}
meta = (ifname_meta or {}).get(ifname, {}) meta = (ifname_meta or {}).get(ifname, {})
ssid = meta.get("ssid") or "" ssid = meta.get("ssid") or ""
@@ -274,7 +274,8 @@ def _parse_hostapd_clients(
client_label = host client_label = host
else: else:
client_label = str(mac) client_label = str(mac)
clients.append(f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}") line = f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}"
clients.append((line, signal if isinstance(signal, (int, float)) else None, net_label))
return clients return clients
@@ -308,7 +309,7 @@ def _parse_leases_fallback(raw: str) -> list[str]:
return out return out
async def get_openwrt_status(cfg: dict[str, Any]) -> str: async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str:
ow_cfg = cfg.get("openwrt", {}) ow_cfg = cfg.get("openwrt", {})
host = ow_cfg.get("host") host = ow_cfg.get("host")
user = ow_cfg.get("user", "root") user = ow_cfg.get("user", "root")
@@ -353,19 +354,11 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str:
if len(parts) < 4: if len(parts) < 4:
return "⚠️ OpenWrt response incomplete" return "⚠️ OpenWrt response incomplete"
sys_info = None
wan_status = None
wireless = None
leases = None
leases_fallback = ""
sys_info = _safe_json_load(parts[0]) sys_info = _safe_json_load(parts[0])
if sys_info is None:
sys_info = None
wan_status = _safe_json_load(parts[1]) or {} wan_status = _safe_json_load(parts[1]) or {}
wireless = _safe_json_load(parts[2]) or {} wireless = _safe_json_load(parts[2]) or {}
leases = _safe_json_load(parts[3]) leases = _safe_json_load(parts[3])
if leases is None: leases_fallback = "" if leases is not None else parts[3]
leases_fallback = parts[3]
if isinstance(sys_info, dict): if isinstance(sys_info, dict):
uptime_raw = sys_info.get("uptime") uptime_raw = sys_info.get("uptime")
@@ -392,6 +385,7 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str:
if leases_fallback: if leases_fallback:
lease_name_map.update(_extract_lease_name_map_fallback(leases_fallback)) lease_name_map.update(_extract_lease_name_map_fallback(leases_fallback))
wifi_net_counts: dict[str, int] = {} wifi_net_counts: dict[str, int] = {}
wifi_signals: dict[str, list[int]] = {}
if ifnames: if ifnames:
for ifname in ifnames: for ifname in ifnames:
cmd_clients = ssh_cmd + ["ubus", "call", f"hostapd.{ifname}", "get_clients"] cmd_clients = ssh_cmd + ["ubus", "call", f"hostapd.{ifname}", "get_clients"]
@@ -405,49 +399,106 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str:
if isinstance(clients_payload, dict): if isinstance(clients_payload, dict):
label = _net_label_for_ifname(ifname, ifname_meta) label = _net_label_for_ifname(ifname, ifname_meta)
wifi_net_counts[label] = wifi_net_counts.get(label, 0) + len(clients_payload) wifi_net_counts[label] = wifi_net_counts.get(label, 0) + len(clients_payload)
wifi_clients.extend( parsed = _parse_hostapd_clients(
_parse_hostapd_clients( payload,
payload, ifname,
ifname, name_map=lease_name_map,
name_map=lease_name_map, ifname_meta=ifname_meta,
ifname_meta=ifname_meta,
)
) )
wifi_clients.extend([p[0] for p in parsed])
for _line, sig, net_label in parsed:
if sig is not None and net_label:
wifi_signals.setdefault(net_label, []).append(sig)
if leases: if leases:
leases_list = _extract_leases(leases) leases_list = _extract_leases(leases)
else: else:
leases_list = _parse_leases_fallback(leases_fallback) leases_list = _parse_leases_fallback(leases_fallback)
lines = [ header = [
"📡 OpenWrt", "📡 OpenWrt",
f"🕒 Uptime: {uptime}", f"🕒 Uptime: {uptime}",
f"⚙️ Load: {load}", f"⚙️ Load: {load}",
f"🌐 WAN: {wan_ip} ({wan_state})", f"🌐 WAN: {wan_ip} ({wan_state})",
"", "",
] ]
wifi_section: list[str] = []
if wifi_net_counts: if wifi_net_counts:
lines.append("📶 Wi-Fi networks:") wifi_section.append("📶 Wi-Fi networks:")
for label, count in sorted(wifi_net_counts.items()): for label, count in sorted(wifi_net_counts.items()):
lines.append(f" - {label}: {count}") sigs = wifi_signals.get(label) or []
lines.append("") if sigs:
avg_sig = sum(sigs) / len(sigs)
min_sig = min(sigs)
wifi_section.append(f" - {label}: {count} (avg {avg_sig:.0f}dBm, min {min_sig}dBm)")
else:
wifi_section.append(f" - {label}: {count}")
wifi_section.append("")
lines.append(f"📶 Wi-Fi clients: {len(wifi_clients)}") wifi_section.append(f"📶 Wi-Fi clients: {len(wifi_clients)}")
if wifi_clients: if wifi_clients:
for line in wifi_clients[:20]: for line in wifi_clients[:20]:
lines.append(f" - {line}") wifi_section.append(f" - {line}")
if len(wifi_clients) > 20: if len(wifi_clients) > 20:
lines.append(f" … and {len(wifi_clients) - 20} more") wifi_section.append(f" … and {len(wifi_clients) - 20} more")
else: else:
lines.append(" (none)") wifi_section.append(" (none)")
lines += ["", f"🧾 DHCP leases: {len(leases_list)}"] lease_section: list[str] = ["", f"🧾 DHCP leases: {len(leases_list)}"]
if leases_list: if leases_list:
for line in leases_list[:20]: for line in leases_list[:20]:
lines.append(f" - {line}") lease_section.append(f" - {line}")
if len(leases_list) > 20: if len(leases_list) > 20:
lines.append(f" … and {len(leases_list) - 20} more") lease_section.append(f" … and {len(leases_list) - 20} more")
else: else:
lines.append(" (none)") lease_section.append(" (none)")
return "\n".join(lines) if mode == "wan":
return "\n".join(header)
if mode == "clients":
return "\n".join(header + wifi_section)
if mode == "leases":
return "\n".join(header + lease_section)
return "\n".join(header + wifi_section + lease_section)
async def fetch_openwrt_leases(cfg: dict[str, Any]) -> list[str]:
    """
    Fetch DHCP leases as list of strings "IP hostname (MAC)".

    Connects to the router over SSH using the ``openwrt`` section of *cfg*
    (``host``, ``user``, ``port``, ``identity_file``, ``timeout_sec``,
    ``strict_host_key_checking``). The remote script asks ubus/luci-rpc for
    the IPv4 leases and falls back to dumping ``/tmp/dhcp.leases`` when that
    call fails; JSON output is parsed with ``_extract_leases`` and anything
    else with ``_parse_leases_fallback``.

    Raises:
        RuntimeError: if no host is configured, the command times out
            (exit code 124) or ssh exits with a non-zero status.
    """
    import shlex  # local import: only needed to quote the remote script

    ow_cfg = cfg.get("openwrt", {})
    host = ow_cfg.get("host")
    if not host:
        raise RuntimeError("OpenWrt host not configured")

    user = ow_cfg.get("user", "root")
    port = ow_cfg.get("port", 22)
    identity_file = ow_cfg.get("identity_file")
    timeout_sec = ow_cfg.get("timeout_sec", 8)
    strict = ow_cfg.get("strict_host_key_checking", True)

    ssh_cmd = [
        "ssh",
        "-o",
        "BatchMode=yes",
        "-o",
        f"ConnectTimeout={timeout_sec}",
        "-o",
        "LogLevel=ERROR",
    ]
    if not strict:
        ssh_cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
    if identity_file:
        ssh_cmd += ["-i", str(identity_file)]
    ssh_cmd += ["-p", str(port), f"{user}@{host}"]

    remote = "ubus call luci-rpc getDHCPLeases '{\"family\":4}' 2>/dev/null || cat /tmp/dhcp.leases"
    # ssh joins its command arguments with spaces before handing them to the
    # remote shell, so an unquoted script after ``sh -c`` would be split and
    # only its first word ("ubus") would be executed. Quote it so ``sh -c``
    # receives the whole script as a single argument.
    rc, out = await run_cmd_full(
        ssh_cmd + ["sh", "-c", shlex.quote(remote)],
        timeout=timeout_sec + 10,
    )
    if rc == 124:
        raise RuntimeError("timeout")
    if rc != 0:
        raise RuntimeError(out.strip() or f"ssh rc={rc}")

    leases = _safe_json_load(out)
    if leases:
        return _extract_leases(leases)
    return _parse_leases_fallback(out)

View File

@@ -1,19 +1,72 @@
import asyncio import asyncio
import logging
import time import time
from collections import deque from collections import deque
from typing import Awaitable, Callable, Any from typing import Awaitable, Callable, Any
from services import runtime_state
from services.incidents import log_incident
_queue: asyncio.Queue = asyncio.Queue() _queue: asyncio.Queue = asyncio.Queue()
_current_label: str | None = None _current_label: str | None = None
_current_meta: dict[str, Any] | None = None _current_meta: dict[str, Any] | None = None
_pending: deque[tuple[str, float]] = deque() _pending: deque[tuple[str, float]] = deque()
# Aggregate counters for finished jobs. Restored from the persisted
# "queue_stats" entry when it is non-empty, otherwise zero-initialised.
_stats: dict[str, Any] = runtime_state.get("queue_stats", {}) or {
"processed": 0,
"avg_wait_sec": 0.0,
"avg_runtime_sec": 0.0,
"last_label": "",
"last_finished_at": 0.0,
}
# Finished-job records (newest first), restored from the persisted
# "queue_history" entry and capped at 50 items by the deque.
_history: deque[dict[str, Any]] = deque(runtime_state.get("queue_history", []) or [], maxlen=50)
# Congestion-alert settings: thresholds and cooldown are filled in by
# configure(); "last_sent" is the time.time() stamp written by _check_congestion().
_alert_cfg: dict[str, Any] = {
"max_pending": None,
"avg_wait": None,
"cooldown": 300,
"last_sent": 0.0,
}
# Application config passed to log_incident(); stays None until configure() runs.
_cfg: dict[str, Any] | None = None
_logger = logging.getLogger("queue")
def _save_stats():
    """Persist the queue counters and the job history through ``runtime_state``."""
    snapshot = {
        "queue_stats": _stats,
        # The deque is stored as a plain list.
        "queue_history": list(_history),
    }
    for state_key, payload in snapshot.items():
        runtime_state.set_state(state_key, payload)
def configure(queue_cfg: dict[str, Any], cfg: dict[str, Any]):
    """Store the application config and load congestion-alert thresholds.

    Args:
        queue_cfg: queue settings; reads ``max_pending_alert``,
            ``avg_wait_alert`` and ``cooldown_sec`` (default 300).
        cfg: full application config, kept for later ``log_incident`` calls.
    """
    global _cfg
    _cfg = cfg
    _alert_cfg.update(
        max_pending=queue_cfg.get("max_pending_alert"),
        avg_wait=queue_cfg.get("avg_wait_alert"),
        cooldown=queue_cfg.get("cooldown_sec", 300),
    )
def _check_congestion(pending_count: int, avg_wait: float | None):
    """Log a ``queue_congested`` incident when a configured threshold is hit.

    Nothing happens while the cooldown since the last alert is still running,
    when no threshold is exceeded, or when no config has been set. If both
    thresholds are exceeded, the pending-count reason is the one reported.

    Args:
        pending_count: number of jobs currently waiting.
        avg_wait: average wait in seconds, or None to skip that check.
    """
    now = time.time()
    if now - _alert_cfg.get("last_sent", 0) < _alert_cfg.get("cooldown", 300):
        return

    pending_limit = _alert_cfg.get("max_pending")
    wait_limit = _alert_cfg.get("avg_wait")

    reasons = []
    if pending_limit and pending_count >= pending_limit:
        reasons.append(f"pending={pending_count} >= {pending_limit}")
    if wait_limit and avg_wait is not None and avg_wait >= wait_limit:
        reasons.append(f"avg_wait={avg_wait:.1f}s >= {wait_limit}s")

    if not reasons or not _cfg:
        return

    try:
        log_incident(_cfg, f"queue_congested {reasons[0]}", category="queue")
    except Exception:
        # Incident logging is best-effort; never let it break the queue.
        pass
    # The cooldown restarts even when the incident could not be written.
    _alert_cfg["last_sent"] = now
async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int: async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int:
enqueued_at = time.time() enqueued_at = time.time()
await _queue.put((label, job, enqueued_at)) await _queue.put((label, job, enqueued_at))
_pending.append((label, enqueued_at)) _pending.append((label, enqueued_at))
_check_congestion(len(_pending), None)
return len(_pending) return len(_pending)
@@ -31,9 +84,47 @@ async def worker():
pass pass
_current_label = label _current_label = label
_current_meta = {"enqueued_at": enqueued_at, "started_at": time.time()} _current_meta = {"enqueued_at": enqueued_at, "started_at": time.time()}
status = "ok"
try: try:
await job() await job()
except Exception as e:
status = "err"
_logger.exception("Queue job failed: label=%s", label)
if _cfg:
try:
log_incident(
_cfg,
f"queue_job_failed label={label} error={type(e).__name__}: {e}",
category="queue",
)
except Exception:
pass
finally: finally:
finished_at = time.time()
if _current_meta:
wait_sec = max(0.0, _current_meta["started_at"] - _current_meta["enqueued_at"])
runtime_sec = max(0.0, finished_at - _current_meta["started_at"])
n_prev = int(_stats.get("processed", 0))
_stats["processed"] = n_prev + 1
_stats["avg_wait_sec"] = (
(_stats.get("avg_wait_sec", 0.0) * n_prev) + wait_sec
) / _stats["processed"]
_stats["avg_runtime_sec"] = (
(_stats.get("avg_runtime_sec", 0.0) * n_prev) + runtime_sec
) / _stats["processed"]
_stats["last_label"] = label
_stats["last_finished_at"] = finished_at
_history.appendleft(
{
"label": label,
"wait_sec": int(wait_sec),
"runtime_sec": int(runtime_sec),
"finished_at": int(finished_at),
"status": status,
}
)
_save_stats()
_check_congestion(len(_pending), _stats.get("avg_wait_sec"))
_current_label = None _current_label = None
_current_meta = None _current_meta = None
_queue.task_done() _queue.task_done()
@@ -47,6 +138,12 @@ def format_status() -> str:
if pending: if pending:
preview = ", ".join([p[0] for p in pending[:5]]) preview = ", ".join([p[0] for p in pending[:5]])
lines.append(f"➡️ Next: {preview}") lines.append(f"➡️ Next: {preview}")
if _stats.get("processed"):
lines.append(
f"📈 Done: {_stats.get('processed')} | "
f"avg wait {int(_stats.get('avg_wait_sec', 0))}s | "
f"avg run {int(_stats.get('avg_runtime_sec', 0))}s"
)
return "\n".join(lines) return "\n".join(lines)
@@ -67,4 +164,46 @@ def format_details(limit: int = 10) -> str:
for i, (label, enqueued_at) in enumerate(pending[:limit], start=1): for i, (label, enqueued_at) in enumerate(pending[:limit], start=1):
wait = int(now - enqueued_at) wait = int(now - enqueued_at)
lines.append(f"{i:>3} | {label} | {wait}s") lines.append(f"{i:>3} | {label} | {wait}s")
if _stats.get("processed"):
lines.append("")
lines.append(
"📈 Stats: "
f"{_stats.get('processed')} done, "
f"avg wait {int(_stats.get('avg_wait_sec', 0))}s, "
f"avg run {int(_stats.get('avg_runtime_sec', 0))}s"
)
last_label = _stats.get("last_label")
if last_label:
lines.append(f"Last: {last_label}")
if _history:
lines.append("")
lines.append("🗂 Last jobs:")
for item in list(_history)[:5]:
t = time.strftime("%H:%M:%S", time.localtime(item["finished_at"]))
lines.append(
f"- {t} {item['label']} {item['status']} "
f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)"
)
return "\n".join(lines) return "\n".join(lines)
def format_history(limit: int = 20) -> str:
    """Render up to *limit* finished jobs as text, one job per line.

    Each line shows the local finish time, label, status and the wait/run
    durations; an empty history renders as ``(empty)`` under the title.
    """
    title = "🗂 Queue history"
    if not _history:
        return "\n".join([title, "(empty)"])

    rows = []
    for entry in list(_history)[:limit]:
        stamp = time.strftime("%m-%d %H:%M:%S", time.localtime(entry["finished_at"]))
        rows.append(
            f"{stamp} {entry['label']} {entry['status']} "
            f"(wait {entry['wait_sec']}s, run {entry['runtime_sec']}s)"
        )
    return "\n".join([title, *rows])
def get_history_raw() -> list[dict[str, Any]]:
    """Return the job history as a new list (entries are not copied)."""
    return [*_history]
def get_stats() -> dict[str, Any]:
    """Return a shallow copy of the aggregate queue counters."""
    return {**_stats}

73
services/runtime_state.py Normal file
View File

@@ -0,0 +1,73 @@
import json
import os
import threading
import tempfile
from typing import Any, Dict
# Location of the JSON state file; can be overridden through configure().
_PATH = "/var/server-bot/runtime.json"
# In-memory copy of the persisted state, filled by _load_from_disk().
_STATE: Dict[str, Any] = {}
# Re-entrant lock held by get_state(), set_state() and get().
_LOCK = threading.RLock()
# Set to True once _load_from_disk() has run, so the file is read lazily.
_LOADED = False
def configure(path: str | None):
    """Point the module at a different state file.

    A falsy *path* leaves the current location untouched. When the path
    actually changes, the loaded flag is cleared so the next ``get`` /
    ``set_state`` / ``get_state`` reads the new file. Without this, state
    already loaded from the old location (for example by an import-time
    ``get``) would be written over the new file on the first ``set_state``.
    """
    global _PATH, _LOADED
    if not path:
        return
    with _LOCK:
        if path != _PATH:
            _PATH = path
            # Force a lazy reload from the newly configured file.
            _LOADED = False
def _load_from_disk():
    """Replace the in-memory state with the contents of the state file.

    A missing, unreadable or malformed file yields an empty state. The same
    applies when the file holds valid JSON that is not an object, because
    the accessors rely on ``_STATE`` being a dict. The loaded flag is always
    set afterwards.
    """
    global _STATE, _LOADED
    loaded: Any = {}
    if os.path.exists(_PATH):
        try:
            with open(_PATH, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except Exception:
            # Corrupt or unreadable state is treated as "no state".
            loaded = {}
    _STATE = loaded if isinstance(loaded, dict) else {}
    _LOADED = True
def _save():
    """Write the in-memory state to the state file, best-effort.

    The JSON is written to a temporary file in the target directory, flushed
    and fsynced, then moved over the real file with ``os.replace``. Every
    failure is swallowed, including failure to create the directory, so a
    persistence problem cannot propagate into callers of ``set_state``.
    """
    directory = os.path.dirname(_PATH) or "."
    try:
        # Kept inside the try so an unwritable parent directory is handled
        # the same way as any other write failure.
        os.makedirs(directory, exist_ok=True)
        fd, tmp_path = tempfile.mkstemp(prefix=".runtime.", suffix=".json", dir=directory)
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(_STATE, f, ensure_ascii=False)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, _PATH)
        finally:
            # After a successful replace the temp name no longer exists;
            # it is only left behind when something above failed.
            if os.path.exists(tmp_path):
                try:
                    os.unlink(tmp_path)
                except Exception:
                    pass
    except Exception:
        pass
def get_state() -> Dict[str, Any]:
    """Return the in-memory state mapping itself, loading it on first use."""
    with _LOCK:
        if not _LOADED:
            _load_from_disk()
        # Read after the load: _load_from_disk() rebinds the global.
        current = _STATE
    return current
def set_state(key: str, value: Any):
    """Store *value* under *key* and write the whole state to disk."""
    with _LOCK:
        if not _LOADED:
            # Load first so existing keys in the file are not lost on save.
            _load_from_disk()
        _STATE.update({key: value})
        _save()
def get(key: str, default: Any = None) -> Any:
    """Return the value stored under *key*, or *default* when it is absent."""
    with _LOCK:
        if not _LOADED:
            _load_from_disk()
        if key in _STATE:
            return _STATE[key]
        return default

95
services/selftest.py Normal file
View File

@@ -0,0 +1,95 @@
import json
from datetime import datetime, timedelta
import asyncio
from typing import Any
from services.health import health
from services.runner import run_cmd_full
from services.incidents import log_incident
from services import runtime_state
def _save_history(entry: dict[str, Any]) -> None:
    """Prepend *entry* to the stored self-test history, keeping 20 items."""
    previous = runtime_state.get("selftest_history", [])
    if not isinstance(previous, list):
        # Anything other than a list is discarded and the history restarts.
        previous = []
    # New entry first, followed by at most 19 older ones.
    runtime_state.set_state("selftest_history", [entry, *previous[:19]])
async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> tuple[str, bool]:
    """Run the health and restic checks and return ``(report_text, ok)``.

    ``ok`` becomes False when the health call raises or when
    ``restic snapshots`` exits non-zero. An empty or unparsable snapshot list
    is reported with a yellow marker but does not fail the run. The outcome
    is also appended to the stored self-test history (best-effort).
    """
    report = ["🧪 Self-test"]
    passed = True

    # Health summary, executed through asyncio.to_thread.
    try:
        health_text = await asyncio.to_thread(health, cfg, docker_map)
        non_blank = [row for row in health_text.splitlines() if row.strip()]
        if len(non_blank) > 1:
            # Skip the first line and show at most the next four.
            brief = " | ".join(non_blank[1:5])
        elif non_blank:
            brief = non_blank[0]
        else:
            brief = "n/a"
        report.append(f"🟢 Health: {brief}")
    except Exception as e:
        report.append(f"🔴 Health failed: {e}")
        passed = False

    # Restic snapshot listing.
    rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
    if rc != 0:
        report.append(f"🔴 Restic snapshots error: {out.strip() or rc}")
        passed = False
    else:
        try:
            snaps = json.loads(out)
            if not (isinstance(snaps, list) and snaps):
                report.append("🟡 Restic snapshots: empty")
            else:
                newest = sorted(snaps, key=lambda s: s.get("time", ""), reverse=True)[0]
                stamp = newest.get("time", "?").replace("Z", "").replace("T", " ")[:16]
                report.append(f"🟢 Restic snapshots: {len(snaps)}, last {stamp}")
        except Exception:
            report.append("🟡 Restic snapshots: invalid JSON")

    result_text = "\n".join(report)
    try:
        _save_history(
            {
                "ts": datetime.now().isoformat(),
                "ok": passed,
                # Second text line of the report, i.e. the health result.
                "summary": result_text.splitlines()[1] if len(report) > 1 else "",
            }
        )
    except Exception:
        # History is informational only; ignore persistence problems.
        pass
    return result_text, passed
async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]):
    """
    Run selftest daily at configured time.

    Reads ``cfg["selftest"]["schedule"]`` and returns immediately unless
    ``enabled`` is true. ``time`` is expected as ``HH:MM``. A value that
    cannot be parsed or is outside 00:00-23:59 falls back to 03:30, so an
    out-of-range hour cannot make ``datetime.replace`` raise inside the loop.
    After each run the report is sent to every id in *admin_ids* (send errors
    are ignored per recipient) and a failed run is logged as an incident.
    """
    sched_cfg = cfg.get("selftest", {}).get("schedule", {})
    if not sched_cfg.get("enabled", False):
        return

    time_str = sched_cfg.get("time", "03:30")
    try:
        hh, mm = [int(x) for x in time_str.split(":")]
        if not (0 <= hh <= 23 and 0 <= mm <= 59):
            raise ValueError(f"time out of range: {time_str!r}")
    except Exception:
        hh, mm = 3, 30

    while True:
        now = datetime.now()
        run_at = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
        if run_at <= now:
            # Today's slot has already passed; wait for tomorrow's.
            run_at += timedelta(days=1)
        await asyncio.sleep((run_at - now).total_seconds())

        text, ok = await run_selftest(cfg, docker_map)
        for chat_id in admin_ids:
            try:
                await bot.send_message(chat_id, text)
            except Exception:
                # One unreachable admin must not block the others.
                pass
        if not ok:
            try:
                log_incident(cfg, "selftest failed", category="selftest")
            except Exception:
                pass

107
services/weekly_report.py Normal file
View File

@@ -0,0 +1,107 @@
import asyncio
import socket
from datetime import datetime, timedelta
import psutil
from services.system import worst_disk_usage
from services.alert_mute import list_mutes
from services.incidents import read_recent
from services.docker import docker_cmd
def _parse_hhmm(value: str) -> tuple[int, int]:
try:
h, m = value.split(":", 1)
h = int(h)
m = int(m)
if 0 <= h <= 23 and 0 <= m <= 59:
return h, m
except Exception:
pass
return 8, 0
def _next_run(day: str, time_str: str) -> datetime:
    """Return the next local datetime strictly after now for a weekday/time.

    *day* is matched on its first three letters, case-insensitively; an empty
    or unknown day means Sunday. *time_str* is parsed by ``_parse_hhmm``, with
    an empty value meaning ``08:00``.
    """
    weekday_by_abbr = {"mon": 0, "tue": 1, "wed": 2, "thu": 3, "fri": 4, "sat": 5, "sun": 6}
    day = (day or "Sun").lower()
    target_wd = weekday_by_abbr.get(day[:3], 6)
    hour, minute = _parse_hhmm(time_str or "08:00")

    now = datetime.now()
    today_slot = now.replace(hour=hour, minute=minute, second=0, microsecond=0)

    # Whole days until the target weekday; 0 means "today".
    days_ahead = (target_wd - today_slot.weekday()) % 7
    if days_ahead == 0 and today_slot <= now:
        # Today's slot is already over, so take the same weekday next week.
        days_ahead = 7
    return today_slot + timedelta(days=days_ahead)
async def _docker_running_counts(docker_map: dict) -> tuple[int, int]:
total = len(docker_map)
running = 0
for real in docker_map.values():
rc, raw = await docker_cmd(["inspect", "-f", "{{.State.Status}}", real], timeout=10)
if rc == 0 and raw.strip() == "running":
running += 1
return running, total
def _format_uptime(seconds: int) -> str:
days, rem = divmod(seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, _ = divmod(rem, 60)
return f"{days}d {hours:02d}:{minutes:02d}"
async def build_weekly_report(cfg, docker_map: dict) -> str:
    """Build the weekly report text for this host.

    The report lists uptime, load averages, RAM usage, the worst disk usage,
    running/total Docker containers, incident counts for the last 24 hours
    and 7 days, and any active alert mutes.
    """
    hostname = socket.gethostname()
    uptime_sec = int(datetime.now().timestamp() - psutil.boot_time())
    load1, load5, load15 = psutil.getloadavg()
    memory = psutil.virtual_memory()
    disk_usage, disk_mount = worst_disk_usage()
    running, total = await _docker_running_counts(docker_map)
    mutes = list_mutes()
    incidents_24 = len(read_recent(cfg, 24, limit=1000))
    incidents_7d = len(read_recent(cfg, 24 * 7, limit=2000))

    disk_line = "💾 Disk: n/a" if disk_usage is None else f"💾 Disk: {disk_usage}% ({disk_mount})"
    report = [
        f"🧾 Weekly report — {hostname}",
        f"⏱ Uptime: {_format_uptime(uptime_sec)}",
        f"⚙️ Load: {load1:.2f} {load5:.2f} {load15:.2f}",
        f"🧠 RAM: {memory.percent}%",
        disk_line,
        f"🐳 Docker: {running}/{total} running",
        f"📓 Incidents: 24h={incidents_24}, 7d={incidents_7d}",
    ]

    if not mutes:
        report.append("🔔 Mutes: none")
    else:
        report.append("🔕 Active mutes:")
        # Remaining time is clamped at zero and shown in whole minutes.
        report.extend(
            f"- {cat}: {max(0, secs) // 60}m left" for cat, secs in mutes.items()
        )
    return "\n".join(report)
async def weekly_reporter(cfg, bot, admin_ids: list[int], docker_map: dict):
    """Send the weekly report to every admin at the configured day and time.

    Reads ``cfg["reports"]["weekly"]`` (``enabled``, ``day``, ``time``) and
    returns immediately when the report is disabled. Otherwise it loops
    forever: sleep until the next slot, build the report, deliver it. Building
    and each delivery are guarded separately, so a failed send to one admin no
    longer prevents the remaining admins from receiving the report.
    """
    reports_cfg = cfg.get("reports", {}).get("weekly", {})
    if not reports_cfg.get("enabled", False):
        return

    day = reports_cfg.get("day", "Sun")
    time_str = reports_cfg.get("time", "08:00")

    while True:
        target = _next_run(day, time_str)
        wait_sec = (target - datetime.now()).total_seconds()
        if wait_sec > 0:
            await asyncio.sleep(wait_sec)

        try:
            text = await build_weekly_report(cfg, docker_map)
        except Exception:
            # Best-effort: skip this week's report rather than kill the task.
            text = None

        if text is not None:
            for admin_id in admin_ids:
                try:
                    await bot.send_message(admin_id, text)
                except Exception:
                    # Keep going so the other admins still get the report.
                    pass

        await asyncio.sleep(60)  # small delay to avoid tight loop if time skew

View File

@@ -1,5 +1,6 @@
import subprocess import subprocess
import os import os
import re
def _cmd(cmd: str) -> str: def _cmd(cmd: str) -> str:
@@ -82,6 +83,62 @@ def list_disks() -> list[str]:
return disks return disks
def list_md_arrays() -> list[str]:
    """Return the sorted ``/dev/mdN`` device paths of md arrays.

    ``/proc/mdstat`` is consulted first; only when it yields no array does
    the function fall back to listing ``/dev/md*`` device nodes.
    """
    # Prefer /proc/mdstat: it reliably lists active md arrays
    # even when lsblk tree/filters differ across distros.
    from_mdstat = {
        f"/dev/{hit.group(1)}"
        for row in _cmd("cat /proc/mdstat").splitlines()
        if (hit := re.match(r"^\s*(md\d+)\s*:", row))
    }
    if from_mdstat:
        return sorted(from_mdstat)

    # Fallback for environments where mdstat parsing is unavailable.
    listing = _cmd("ls -1 /dev/md* 2>/dev/null")
    from_dev = {
        node
        for node in (row.strip() for row in listing.splitlines())
        if node and re.match(r"^/dev/md\d+$", node)
    }
    return sorted(from_dev)
def md_array_status(dev: str) -> str:
    """Return a short health label for the md array *dev* from /proc/mdstat.

    Possible results: ``⚠️ n/a`` (mdstat unreadable), ``⚠️ not found in
    /proc/mdstat``, ``🔴 inactive``, ``🟡 degraded`` and ``🟢 active``.

    An array is degraded when any member slot in its ``[UU_U]``-style status
    marker is ``_``. The whole marker is inspected, so a missing member in any
    position is detected, not only in the first two slots.
    """
    out = _cmd("cat /proc/mdstat")
    # NOTE(review): "ERROR:" is presumably the failure prefix produced by
    # _cmd; confirm against its implementation.
    if not out or "ERROR:" in out:
        return "⚠️ n/a"

    name = dev.rsplit("/", 1)[-1]
    lines = out.splitlines()

    header = None
    idx = -1
    for i, line in enumerate(lines):
        s = line.strip()
        if s.startswith(f"{name} :"):
            header = s
            idx = i
            break

    if not header:
        return "⚠️ not found in /proc/mdstat"
    if "inactive" in header:
        return "🔴 inactive"

    # The array's entry runs from its header line to the next blank line.
    block = [header]
    for line in lines[idx + 1:]:
        if not line.strip():
            break
        block.append(line.strip())
    block_text = " ".join(block)

    # Typical mdstat health marker: [UU] for healthy mirrors/raid members.
    # Only bracket groups made purely of "U"/"_" are markers; "[2/2]" or
    # "sda1[0]" do not match.
    for slots in re.findall(r"\[([U_]+)\]", block_text):
        if "_" in slots:
            return "🟡 degraded"
    return "🟢 active"
def smart_health(dev: str) -> str: def smart_health(dev: str) -> str:
out = _cmd(f"smartctl -H {dev}") out = _cmd(f"smartctl -H {dev}")
@@ -138,8 +195,9 @@ def smart_last_test(dev: str) -> str:
def disks() -> str: def disks() -> str:
disks = list_disks() disks = list_disks()
md_arrays = list_md_arrays()
if not disks: if not disks and not md_arrays:
return "💽 Disks\n\n❌ No disks found" return "💽 Disks\n\n❌ No disks found"
lines = ["💽 Disks (SMART)\n"] lines = ["💽 Disks (SMART)\n"]
@@ -158,6 +216,12 @@ def disks() -> str:
lines.append(f"{icon} {d}{health}, 🌡 {temp}") lines.append(f"{icon} {d}{health}, 🌡 {temp}")
if md_arrays:
lines.append("")
lines.append("🧱 RAID (md)")
for md in md_arrays:
lines.append(f"{md}{md_array_status(md)}")
return "\n".join(lines) return "\n".join(lines)

View File

@@ -0,0 +1,20 @@
import unittest
from services.config_check import validate_cfg
class ConfigCheckTests(unittest.TestCase):
    """Checks for ``validate_cfg``."""

    def test_admin_ids_without_admin_id_is_valid(self):
        """A telegram section with only ``admin_ids`` must produce no errors."""
        telegram = {"token": "x", "admin_ids": [1, 2]}
        errors, warnings = validate_cfg({"telegram": telegram})
        self.assertEqual(errors, [])
        self.assertIsInstance(warnings, list)
if __name__ == "__main__":
unittest.main()

21
tests/test_disk_report.py Normal file
View File

@@ -0,0 +1,21 @@
import unittest
import types
import sys
# Avoid runtime import of real app/aiogram in services.runner.
sys.modules.setdefault("app", types.SimpleNamespace(RESTIC_ENV={}))
from services.disk_report import _top_dirs_cmd
class DiskReportTests(unittest.TestCase):
    """Checks for the ``du`` command built by ``_top_dirs_cmd``."""

    def test_top_dirs_cmd_uses_exec_args_without_shell(self):
        """The command is an argv list with the path as its own last item."""
        target = "/tmp/path with spaces"
        argv = _top_dirs_cmd(target, 5)
        self.assertEqual(argv[:4], ["du", "-x", "-h", "-d"])
        # No shell wrapper may appear anywhere in the argument list.
        for shell_token in ("bash", "-lc"):
            self.assertNotIn(shell_token, argv)
        self.assertEqual(argv[-1], target)
if __name__ == "__main__":
unittest.main()

59
tests/test_queue.py Normal file
View File

@@ -0,0 +1,59 @@
import asyncio
import tempfile
import unittest
from services import runtime_state
from services import queue as queue_service
class QueueTests(unittest.IsolatedAsyncioTestCase):
    """Async tests for the job queue worker."""

    async def asyncSetUp(self):
        """Point runtime state at a temp file and reset the queue globals."""
        self.tmp = tempfile.TemporaryDirectory()
        runtime_state.configure(f"{self.tmp.name}/runtime.json")
        queue_service._pending.clear()  # type: ignore[attr-defined]
        queue_service._history.clear()  # type: ignore[attr-defined]
        queue_service._stats = dict(  # type: ignore[attr-defined]
            processed=0,
            avg_wait_sec=0.0,
            avg_runtime_sec=0.0,
            last_label="",
            last_finished_at=0.0,
        )
        queue_service._cfg = {"incidents": {"enabled": True}}  # type: ignore[attr-defined]

    async def asyncTearDown(self):
        """Remove the temporary state directory."""
        self.tmp.cleanup()

    async def test_worker_logs_failed_job_to_incidents(self):
        """A raising job is counted as processed and logged as a queue incident."""
        captured = []

        def fake_log_incident(cfg, text, category=None):
            captured.append((text, category))

        async def boom():
            raise RuntimeError("boom")

        real_log_incident = queue_service.log_incident
        queue_service.log_incident = fake_log_incident
        runner = asyncio.create_task(queue_service.worker())
        try:
            await queue_service.enqueue("broken-job", boom)
            # join() returns once the worker has called task_done() for the job.
            await asyncio.wait_for(queue_service._queue.join(), timeout=2.0)  # type: ignore[attr-defined]
        finally:
            runner.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await runner
            queue_service.log_incident = real_log_incident

        texts = [text for text, _category in captured]
        categories = [category for _text, category in captured]
        self.assertEqual(queue_service._stats.get("processed"), 1)  # type: ignore[attr-defined]
        self.assertTrue(any("queue_job_failed label=broken-job" in text for text in texts))
        self.assertTrue(any(category == "queue" for category in categories))
import contextlib
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,28 @@
import json
import tempfile
import unittest
from pathlib import Path
from services import runtime_state
class RuntimeStateTests(unittest.TestCase):
    """Checks that runtime state survives a reload from disk."""

    def test_set_and_get_persist_between_loads(self):
        """A value written with set_state is readable after a forced reload."""
        expected = {"bar": 1}
        with tempfile.TemporaryDirectory() as tmp:
            state_file = Path(tmp) / "runtime.json"
            runtime_state.configure(str(state_file))

            runtime_state.set_state("foo", {"bar": 1})
            self.assertEqual(runtime_state.get("foo"), expected)

            # Force a fresh in-memory state and load from disk again.
            runtime_state._STATE = {}  # type: ignore[attr-defined]
            runtime_state._LOADED = False  # type: ignore[attr-defined]
            self.assertEqual(runtime_state.get("foo"), expected)

            # The file itself must contain the value as JSON.
            on_disk = json.loads(state_file.read_text(encoding="utf-8"))
            self.assertEqual(on_disk.get("foo"), expected)
if __name__ == "__main__":
unittest.main()