Compare commits

..

62 Commits

Author SHA1 Message Date
b84107463c Add dedicated RAID alert category and monitor 2026-02-25 01:43:10 +03:00
ee361abb99 Detect md arrays via /proc/mdstat for RAID status 2026-02-25 01:39:11 +03:00
2ad423fb6a Fix md RAID detection for lsblk raid* types 2026-02-25 01:36:59 +03:00
efa5dd9644 Fix mojibake text and add md RAID checks 2026-02-25 01:32:55 +03:00
678332e6d0 Add lightweight unittest coverage for stability fixes 2026-02-15 01:25:11 +03:00
7c56430f32 Unify admin callback checks and log queue job failures 2026-02-15 01:20:55 +03:00
b54a094185 Add safe config fallbacks for app init and health checks 2026-02-15 01:16:58 +03:00
6d5fb9c258 Harden docker callback parsing and remove duplicate /openwrt handler 2026-02-15 01:12:45 +03:00
5099ae4fe2 Fix critical race conditions and unsafe disk report command 2026-02-15 01:12:41 +03:00
568cd86844 Fix heatmap button args 2026-02-15 00:51:09 +03:00
b138ee316d Import backup keyboard for SLA handlers 2026-02-15 00:46:53 +03:00
fa98a96b34 Route any SLA text to corresponding handler 2026-02-15 00:45:56 +03:00
1dba6d4a0f Match SLA buttons via regex 2026-02-15 00:44:14 +03:00
b784deb02b Ack SLA requests immediately 2026-02-15 00:35:32 +03:00
5ae54618e8 Broaden SLA button matching 2026-02-15 00:32:09 +03:00
3fc99bdcfc Handle SLA buttons without emojis 2026-02-15 00:30:39 +03:00
c1d69adbc5 Make incidents diff resilient and send sample if empty 2026-02-09 04:21:27 +03:00
a14fb8fccd Show recent sample when incidents diff is empty 2026-02-09 04:19:59 +03:00
4ba8f48228 Auto-reset incidents diff marker if ahead of log 2026-02-09 04:18:05 +03:00
10bf265c29 Add reset option to /incidents_diff 2026-02-09 04:16:28 +03:00
fd179d24e8 Remove Incidents entry from main keyboard 2026-02-09 04:13:47 +03:00
2905528677 Keep incidents summary inside logs keyboard 2026-02-09 04:12:44 +03:00
2b87ce04a3 Keep backup/queue SLA and OpenWrt leases diff in their menus 2026-02-09 04:10:04 +03:00
02b8e2bb55 Keep docker restarts inside docker keyboard 2026-02-09 04:08:27 +03:00
f0fb2aad0e Split OpenWrt menu vs full status actions 2026-02-09 04:06:49 +03:00
219776c642 Disambiguate OpenWrt menu vs full status button 2026-02-09 04:05:25 +03:00
28caa551bd Narrow /docker_health match to avoid summary collisions 2026-02-09 04:03:17 +03:00
783f4abd98 Use icon buttons for incidents, queue and OpenWrt actions 2026-02-09 04:00:04 +03:00
f71c02835a Adjust keyboards with incidents and OpenWrt submenus 2026-02-09 03:45:13 +03:00
f7081b78e1 Add incident exports, queue SLA, and OpenWrt diff utilities 2026-02-09 02:57:16 +03:00
0fbd374823 Log docker restarts as incidents 2026-02-09 02:45:06 +03:00
c3db70160c Use semicolon delimiter in incidents_export CSV 2026-02-09 02:32:50 +03:00
1b9d260530 Use BufferedInputFile for incidents_export 2026-02-09 02:31:24 +03:00
040a6c96e4 Seek to start before sending incidents export files 2026-02-09 02:30:17 +03:00
4f6d6dd549 Fix incidents_export file delivery 2026-02-09 02:28:49 +03:00
2e0bf0c6ea Add incidents export, queue alerts, and health summaries 2026-02-09 02:24:08 +03:00
5a4234f59d Log incidents even when alerts are muted 2026-02-09 02:09:32 +03:00
1d24caa2a2 Fix docker_status log_incident indentation 2026-02-09 02:04:15 +03:00
c91c961134 Tag incidents with categories for summaries 2026-02-09 02:03:04 +03:00
75113b6182 Add selftest scheduler, queue history, and OpenWrt signal stats 2026-02-09 01:56:27 +03:00
aa7bd85687 Filter restic forget parsing to ignore summary rows 2026-02-09 01:41:11 +03:00
ff65e15509 Beautify restic forget table in backup history 2026-02-09 01:39:06 +03:00
08fa95dffd Trim backup history output to fit Telegram 2026-02-09 01:35:41 +03:00
b0a4413671 Add runtime state, auto-mute schedules, and backup retries 2026-02-09 01:14:37 +03:00
9399be4168 Update help with alert shortcuts and docker/openwrt commands 2026-02-08 23:34:27 +03:00
2e35885a5e Fix cfg import in docker handler 2026-02-08 23:31:38 +03:00
4d4e3767bc Add weekly report, multi-admin, docker health cmd, backup tail, openwrt filters 2026-02-08 23:27:23 +03:00
b78dc3cd5c Limit /alerts handler to exact command (fix alias collisions) 2026-02-08 23:09:09 +03:00
20cd56a8c0 Add inline alerts menu with callbacks 2026-02-08 23:07:39 +03:00
7d251a7078 Fix alerts command dispatch indentation 2026-02-08 23:04:35 +03:00
2ee9756d12 Add shortcut commands for alerts, backup, docker, openwrt 2026-02-08 23:01:33 +03:00
77571da4d9 Add /help alias for inline help 2026-02-08 22:54:50 +03:00
d4a19d309f Add multi-page inline help 2026-02-08 22:52:40 +03:00
972c8eb6a7 Add alert tools, mutes, short status, and backup summary 2026-02-08 22:43:16 +03:00
ae2d085214 Allow critical-only load alerts 2026-02-08 18:51:45 +03:00
5da7125fbb Filter status network to enp interfaces 2026-02-08 04:30:57 +03:00
65682ca162 Add quiet hours, health checks, and logging 2026-02-08 04:19:28 +03:00
8bcc3c6878 Preserve restic env for backup commands 2026-02-08 04:02:35 +03:00
ab58592523 Use full restic JSON output 2026-02-08 03:56:15 +03:00
a98292604a Harden backup JSON parsing and fix queue display 2026-02-08 03:54:51 +03:00
97524b92a2 Fix 2026-02-08 03:48:45 +03:00
0a761e5799 Fix OpenWrt rate/lease mapping and queue pending 2026-02-08 03:48:04 +03:00
40 changed files with 2890 additions and 141 deletions

View File

@@ -6,10 +6,12 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `token` (string, required): Telegram bot token.
- `admin_id` (int, required): Telegram user id with admin access.
- `admin_ids` (list<int>): Optional list of admins (first is primary for alerts).
## paths
- `artifact_state` (string): JSON file for artifact state.
- `runtime_state` (string): File for runtime state (mutes, metrics, etc.).
- `restic_env` (string): Path to a file with RESTIC_* environment variables.
## thresholds
@@ -23,11 +25,26 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `enabled` (bool): Enable resource alerts.
- `interval_sec` (int): Poll interval.
- `cooldown_sec` (int): Cooldown between alerts.
- `notify_cooldown_sec` (int): Global alert dedup cooldown (defaults to `cooldown_sec`).
- `load_only_critical` (bool): Only send critical load alerts (no warn/OK).
- `quiet_hours` (object): Quiet hours for noncritical alerts.
- `enabled` (bool): Enable quiet hours.
- `start` (string): Start time `HH:MM` (e.g. `23:00`).
- `end` (string): End time `HH:MM` (e.g. `08:00`).
- `allow_critical` (bool): Allow critical alerts during quiet hours.
- `auto_mute` (list): Per-category auto mutes by time window.
- `category` (string): load/disk/smart/raid/ssl/docker/test.
- `start` (string): Start `HH:MM`.
- `end` (string): End `HH:MM` (can wrap over midnight).
- `auto_mute_on_high_load_sec` (int): auto-mute `load` category for N seconds on critical load (0 disables).
- `notify_recovery` (bool): Send recovery notifications.
- `smart_enabled` (bool): Enable SMART health polling.
- `smart_interval_sec` (int): SMART poll interval.
- `smart_cooldown_sec` (int): SMART alert cooldown.
- `smart_temp_warn` (int): SMART temperature warning (C).
- `raid_enabled` (bool): Enable md RAID polling (`/proc/mdstat`).
- `raid_interval_sec` (int): RAID poll interval.
- `raid_cooldown_sec` (int): RAID alert cooldown.
## disk_report
@@ -51,6 +68,34 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `rotate_when` (string): Rotation schedule for `TimedRotatingFileHandler`. Example `W0` for weekly on Monday.
- `backup_count` (int): How many rotated files to keep.
## logging
- `enabled` (bool): Enable bot logging.
- `path` (string): Log file path. Default `/var/server-bot/bot.log`.
- `rotate_when` (string): Rotation schedule for `TimedRotatingFileHandler`. Example `W0` for weekly on Monday.
- `backup_count` (int): How many rotated files to keep.
- `level` (string): Log level (`INFO`, `WARNING`, `ERROR`).
## safety
- `dry_run` (bool): If `true`, dangerous actions (upgrade/reboot/backup) are skipped.
## reports
- `weekly.enabled` (bool): Enable weekly report.
- `weekly.day` (string): Weekday `Mon`..`Sun` (default `Sun`).
- `weekly.time` (string): Local time `HH:MM` (default `08:00`).
## selftest
- `schedule.enabled` (bool): Enable auto self-test.
- `schedule.time` (string): Local time `HH:MM` (default `03:30`).
## queue
- `max_pending_alert` (int): Alert if pending tasks >= this value.
- `avg_wait_alert` (int): Alert if average wait exceeds N seconds.
- `cooldown_sec` (int): Cooldown between queue alerts (default 300s).
## external_checks
- `enabled` (bool): Enable background checks.

View File

@@ -6,10 +6,12 @@
- `token` (string, обяз.): токен бота.
- `admin_id` (int, обяз.): Telegram user id администратора.
- `admin_ids` (list<int>): список админов (первый используется как основной для уведомлений).
## paths
- `artifact_state` (string): JSON файл состояния артефактов.
- `runtime_state` (string): файл с runtime-состоянием (мьюты, метрики и т.п.).
- `restic_env` (string): путь к файлу с RESTIC_* переменными.
## thresholds
@@ -23,11 +25,26 @@
- `enabled` (bool): включить алерты.
- `interval_sec` (int): интервал опроса.
- `cooldown_sec` (int): кулдаун между алертами.
- `notify_cooldown_sec` (int): глобальный дедуп алертов (по умолчанию `cooldown_sec`).
- `load_only_critical` (bool): слать только критичные алерты по нагрузке (без warn/OK).
- `quiet_hours` (object): тихие часы для не‑критичных уведомлений.
- `enabled` (bool): включить тихие часы.
- `start` (string): начало, формат `HH:MM` (например `23:00`).
- `end` (string): конец, формат `HH:MM` (например `08:00`).
- `allow_critical` (bool): слать критичные алерты в тишину.
- `auto_mute` (list): авто‑мьюты по категориям и времени.
- `category` (string): load/disk/smart/raid/ssl/docker/test.
- `start` (string): начало `HH:MM`.
- `end` (string): конец `HH:MM` (интервал может пересекать ночь).
- `auto_mute_on_high_load_sec` (int): при critical load автоматически мьютить категорию `load` на N секунд (0 — выкл).
- `notify_recovery` (bool): уведомлять о восстановлении.
- `smart_enabled` (bool): SMART проверки.
- `smart_interval_sec` (int): интервал SMART.
- `smart_cooldown_sec` (int): кулдаун SMART.
- `smart_temp_warn` (int): порог температуры (C).
- `raid_enabled` (bool): RAID проверки (`/proc/mdstat`).
- `raid_interval_sec` (int): интервал RAID.
- `raid_cooldown_sec` (int): кулдаун RAID алертов.
## disk_report
@@ -51,6 +68,35 @@
- `rotate_when` (string): режим ротации (`TimedRotatingFileHandler`), например `W0`.
- `backup_count` (int): сколько файлов хранить.
## logging
- `enabled` (bool): включить лог бота.
- `path` (string): путь к лог-файлу. По умолчанию `/var/server-bot/bot.log`.
- `rotate_when` (string): режим ротации (`TimedRotatingFileHandler`), например `W0`.
- `backup_count` (int): сколько файлов хранить.
- `level` (string): уровень логирования (`INFO`, `WARNING`, `ERROR`).
## safety
- `dry_run` (bool): если `true`, опасные действия (upgrade/reboot/backup) не выполняются.
## reports
- `weekly.enabled` (bool): включить еженедельный отчёт.
- `weekly.day` (string): день недели (`Mon`..`Sun`), по умолчанию `Sun`.
- `weekly.time` (string): локальное время `HH:MM`, по умолчанию `08:00`.
## selftest
- `schedule.enabled` (bool): включить авто self-test.
- `schedule.time` (string): локальное время `HH:MM`, по умолчанию `03:30`.
## queue
- `max_pending_alert` (int): алерт, если задач в очереди >= этому значению.
- `avg_wait_alert` (int): алерт, если среднее ожидание превышает N секунд.
- `cooldown_sec` (int): кулдаун между алертами очереди, по умолчанию 300с.
## external_checks
- `enabled` (bool): включить фоновые проверки.

View File

@@ -8,8 +8,9 @@ Telegram admin bot for Linux servers. Provides quick status checks, backup contr
- Arcane: list projects, refresh, up/down, restart.
- Backups (restic): snapshots, repo stats, run backup, queue, restic check, weekly report.
- System: disks, security, URLs health, metrics, package updates, upgrade, reboot, hardware info, SSL cert status (NPMplus).
- Alerts: disk/load and SMART monitoring with cooldown.
- Alerts: disk/load/SMART with cooldown and quiet hours.
- Audit log: all button presses and messages (weekly rotation).
- Logs: bot log rotation and incidents.
## Requirements
@@ -68,4 +69,5 @@ GNU GPL v3.0. Full text in `LICENSE`.
- For NPMplus with self-signed TLS, set `npmplus.verify_tls: false`.
- The bot uses `sudo` for certain actions (reboot, upgrade, backup scripts). Ensure the service user has the required permissions.
- Enable `safety.dry_run` if you want a safe mode without actions.
- Audit log default path is `/var/server-bot/audit.log`.

View File

@@ -8,8 +8,9 @@ Telegram-бот администратора для Linux-серверов. Да
- Arcane: список проектов, refresh, up/down, restart.
- Бэкапы (restic): снапшоты, статистика репозитория, запуск бэкапа, очередь, restic check, weekly report.
- Система: диски, безопасность, проверка URL, метрики, обновления, upgrade, reboot, железо, SSL (NPMplus).
- Алерты: диск/нагрузка и SMART с cooldown.
- Алерты: диск/нагрузка/SMART с cooldown и quiet hours.
- Аудит: все нажатия и сообщения (ротация раз в неделю).
- Логи: ротация логов бота и инциденты.
## Требования
@@ -68,4 +69,5 @@ GNU GPL v3.0. Полный текст в `LICENSE`.
- Для NPMplus с self-signed TLS установи `npmplus.verify_tls: false`.
- Бот использует `sudo` для части операций — настрой права.
- Включи `safety.dry_run`, если хочешь безопасный режим без действий.
- Аудит по умолчанию пишется в `/var/server-bot/audit.log`.

15
app.py
View File

@@ -1,13 +1,22 @@
from aiogram import Bot, Dispatcher
from config import load_cfg, load_env
from services import runtime_state
cfg = load_cfg()
TOKEN = cfg["telegram"]["token"]
ADMIN_ID = cfg["telegram"]["admin_id"]
admin_ids_cfg = cfg["telegram"].get("admin_ids")
if isinstance(admin_ids_cfg, list) and admin_ids_cfg:
ADMIN_IDS = [int(x) for x in admin_ids_cfg]
ADMIN_ID = ADMIN_IDS[0]
else:
ADMIN_ID = int(cfg["telegram"]["admin_id"])
ADMIN_IDS = [ADMIN_ID]
ARTIFACT_STATE = cfg["paths"]["artifact_state"]
RESTIC_ENV = load_env(cfg["paths"].get("restic_env", "/etc/restic/restic.env"))
paths_cfg = cfg.get("paths", {})
runtime_state.configure(paths_cfg.get("runtime_state", "/var/server-bot/runtime.json"))
ARTIFACT_STATE = paths_cfg.get("artifact_state", "/opt/tg-bot/state.json")
RESTIC_ENV = load_env(paths_cfg.get("restic_env", "/etc/restic/restic.env"))
DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80))
LOAD_WARN = float(cfg.get("thresholds", {}).get("load_warn", 2.0))

View File

@@ -1,10 +1,10 @@
from aiogram.types import Message, CallbackQuery
from app import ADMIN_ID
from app import ADMIN_IDS
def is_admin_msg(msg: Message) -> bool:
return msg.from_user and msg.from_user.id == ADMIN_ID
return msg.from_user and msg.from_user.id in ADMIN_IDS
def is_admin_cb(cb: CallbackQuery) -> bool:
return cb.from_user and cb.from_user.id == ADMIN_ID
return cb.from_user and cb.from_user.id in ADMIN_IDS

View File

@@ -1,10 +1,14 @@
telegram:
token: "YOUR_TELEGRAM_BOT_TOKEN"
admin_id: 123456789
# Optional list of admins (first is primary for alerts)
admin_ids:
- 123456789
paths:
# JSON state file for artifacts
artifact_state: "/opt/tg-bot/state.json"
runtime_state: "/var/server-bot/runtime.json"
# Optional env file with RESTIC_* variables
restic_env: "/etc/restic/restic.env"
@@ -17,11 +21,31 @@ alerts:
enabled: true
interval_sec: 60
cooldown_sec: 900
# Optional global dedup cooldown for notify() calls
notify_cooldown_sec: 900
# If true, only critical load alerts are sent (no warn/OK)
load_only_critical: false
# Optional auto-mute windows per category
auto_mute:
- category: "load"
start: "23:00"
end: "08:00"
# Auto-mute load when critical load fires (seconds)
auto_mute_on_high_load_sec: 600
quiet_hours:
enabled: false
start: "23:00"
end: "08:00"
# Allow critical alerts during quiet hours
allow_critical: true
notify_recovery: true
smart_enabled: true
smart_interval_sec: 3600
smart_cooldown_sec: 21600
smart_temp_warn: 50
raid_enabled: true
raid_interval_sec: 300
raid_cooldown_sec: 1800
disk_report:
threshold: 90
@@ -42,6 +66,33 @@ incidents:
rotate_when: "W0"
backup_count: 8
logging:
enabled: true
path: "/var/server-bot/bot.log"
rotate_when: "W0"
backup_count: 8
level: "INFO"
safety:
# If true, dangerous actions will be skipped
dry_run: false
reports:
weekly:
enabled: false
day: "Sun" # Mon/Tue/Wed/Thu/Fri/Sat/Sun
time: "08:00" # HH:MM server local time
selftest:
schedule:
enabled: false
time: "03:30"
queue:
max_pending_alert: 5
avg_wait_alert: 120
cooldown_sec: 300
external_checks:
enabled: true
state_path: "/var/server-bot/external_checks.json"

9
deploy.sh Normal file
View File

@@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Deploy the bot: fast-forward the checkout on the server and restart the service.
set -euo pipefail

remote="root@10.10.10.10"
port="1090"
app_dir="/opt/tg-bot"

# Single remote invocation: a failed pull aborts before the restart runs.
ssh -p "$port" "$remote" "cd \"$app_dir\" && git pull --ff-only && systemctl restart tg-bot"

162
handlers/alerts_admin.py Normal file
View File

@@ -0,0 +1,162 @@
import time
from datetime import datetime, timedelta, timezone

from aiogram import F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton

from app import dp, bot, cfg, ADMIN_ID
from auth import is_admin_msg, is_admin_cb
from services.alert_mute import set_mute, clear_mute, list_mutes
from services.incidents import read_recent, log_incident
from services.notify import notify
# Usage text replied to a bare `/alerts` or any unknown subcommand.
HELP_TEXT = (
    "Alerts:\n"
    "/alerts test <critical|warn|info> - send test alert\n"
    "/alerts mute <category> <minutes> - mute alerts for category\n"
    "/alerts unmute <category> - unmute category\n"
    "/alerts list - show active mutes\n"
    "/alerts recent [hours] - show incidents log (default 24h)\n"
    "Categories: load, disk, smart, raid, ssl, docker, test\n"
)
def _dispatch(msg: Message, action: str, args: list[str]):
return {"action": action, "args": args}
async def _handle_alerts(msg: Message, action: str, args: list[str]):
    """Dispatch one /alerts subcommand (test / mute / unmute / list / recent).

    Anything unrecognized falls through to HELP_TEXT. Assumes the caller has
    already verified admin rights.
    """
    if action == "test":
        level = args[0].lower() if args else "info"
        if level not in ("critical", "warn", "info"):
            level = "info"  # unknown level falls back to the least noisy one
        # Timestamped key so the notify() dedup/cooldown never swallows a test alert.
        key = f"test:{level}:{int(time.time())}"
        await notify(bot, msg.chat.id, f"[TEST] {level.upper()} alert", level=level, key=key, category="test")
        await msg.answer(f"Sent test alert: {level}")
        log_incident(cfg, f"alert_test level={level} by {msg.from_user.id}", category="test")
        return
    if action == "mute":
        if len(args) < 1:
            await msg.answer("Usage: /alerts mute <category> <minutes>")
            return
        category = args[0].lower()
        minutes = 60  # default mute duration; also the fallback for bad input
        if len(args) >= 2:
            try:
                minutes = max(1, int(args[1]))
            except ValueError:
                minutes = 60
        until = set_mute(category, minutes * 60)
        # Convert the epoch deadline to server-local time for display.
        dt = datetime.fromtimestamp(until, tz=timezone.utc).astimezone()
        await msg.answer(f"🔕 Muted {category} for {minutes}m (until {dt:%Y-%m-%d %H:%M:%S})")
        log_incident(cfg, f"alert_mute category={category} minutes={minutes} by {msg.from_user.id}", category=category)
        return
    if action == "unmute":
        if len(args) < 1:
            await msg.answer("Usage: /alerts unmute <category>")
            return
        category = args[0].lower()
        clear_mute(category)
        await msg.answer(f"🔔 Unmuted {category}")
        log_incident(cfg, f"alert_unmute category={category} by {msg.from_user.id}", category=category)
        return
    if action in ("list", "mutes"):
        mutes = list_mutes()
        if not mutes:
            await msg.answer("🔔 No active mutes")
            return
        lines = ["🔕 Active mutes:"]
        for cat, secs in mutes.items():
            # list_mutes() values are remaining seconds; clamp to avoid negatives.
            mins = max(0, secs) // 60
            lines.append(f"- {cat}: {mins}m left")
        await msg.answer("\n".join(lines))
        return
    if action == "recent":
        hours = 24  # default look-back window; also the fallback for bad input
        if args:
            try:
                hours = max(1, int(args[0]))
            except ValueError:
                hours = 24
        rows = read_recent(cfg, hours, limit=50)
        if not rows:
            await msg.answer(f"No incidents in last {hours}h")
            return
        await msg.answer("🧾 Incidents:\n" + "\n".join(rows))
        return
    # Unknown action: show usage.
    await msg.answer(HELP_TEXT)
# Inline keyboard with one-tap shortcuts for the most common /alerts actions.
# Callback payloads use the "alerts:<action>[:<arg>...]" format parsed by alerts_cb.
ALERTS_KB = InlineKeyboardMarkup(
    inline_keyboard=[
        [
            InlineKeyboardButton(text="List", callback_data="alerts:list"),
            InlineKeyboardButton(text="Recent 24h", callback_data="alerts:recent:24"),
        ],
        [
            InlineKeyboardButton(text="Mute load 60m", callback_data="alerts:mute:load:60"),
            InlineKeyboardButton(text="Unmute load", callback_data="alerts:unmute:load"),
        ],
        [
            InlineKeyboardButton(text="Test CRIT", callback_data="alerts:test:critical"),
            InlineKeyboardButton(text="Test WARN", callback_data="alerts:test:warn"),
            InlineKeyboardButton(text="Test INFO", callback_data="alerts:test:info"),
        ],
    ]
)
@dp.message(F.text.regexp(r"^/alerts(\\s|$)"))
async def alerts_cmd(msg: Message):
if not is_admin_msg(msg):
return
parts = msg.text.split()
if len(parts) < 2:
await msg.answer(HELP_TEXT, reply_markup=ALERTS_KB)
return
action = parts[1].lower()
args = parts[2:]
await _handle_alerts(msg, action, args)
@dp.message(F.text == "/alerts_list")
async def alerts_list(msg: Message):
if not is_admin_msg(msg):
return
await _handle_alerts(msg, "list", [])
@dp.message(F.text == "/alerts_recent")
async def alerts_recent(msg: Message):
if not is_admin_msg(msg):
return
await _handle_alerts(msg, "recent", ["24"])
@dp.message(F.text == "/alerts_mute_load")
async def alerts_mute_load(msg: Message):
if not is_admin_msg(msg):
return
await _handle_alerts(msg, "mute", ["load", "60"])
@dp.callback_query(F.data.startswith("alerts:"))
async def alerts_cb(cb: CallbackQuery):
if cb.from_user.id != ADMIN_ID:
await cb.answer()
return
parts = cb.data.split(":")
# formats: alerts:action or alerts:action:arg1:arg2
if len(parts) < 2:
await cb.answer()
return
action = parts[1]
args = parts[2:] if len(parts) > 2 else []
await _handle_alerts(cb.message, action, args)
await cb.answer()

View File

@@ -2,7 +2,7 @@ import asyncio
from datetime import datetime
from aiogram import F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from app import dp, cfg
from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg
from keyboards import docker_kb, arcane_kb
from services.arcane import list_projects, restart_project, set_project_state, get_project_details
@@ -115,7 +115,7 @@ async def arcane_refresh(msg: Message):
@dp.callback_query(F.data == "arcane:refresh")
async def arcane_refresh_inline(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
await cb.answer()
await cmd_arcane_projects(cb.message, edit=True)
@@ -123,7 +123,7 @@ async def arcane_refresh_inline(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:page:"))
async def arcane_page(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
try:
page = int(cb.data.split(":", 2)[2])
@@ -141,7 +141,7 @@ async def arcane_page(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:restart:"))
async def arcane_restart(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
@@ -160,7 +160,7 @@ async def arcane_restart(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:details:"))
async def arcane_details(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
@@ -208,7 +208,7 @@ async def arcane_details(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:deploy:"))
async def arcane_deploy_status(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
@@ -254,7 +254,7 @@ async def arcane_deploy_status(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:up:"))
async def arcane_up(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
@@ -273,7 +273,7 @@ async def arcane_up(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:down:"))
async def arcane_down(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)

View File

@@ -1,15 +1,17 @@
import asyncio
import json
import os
from datetime import datetime
from aiogram import F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from app import dp
from app import dp, cfg
from auth import is_admin_msg, is_admin_cb
from keyboards import backup_kb
from lock_utils import acquire_lock, release_lock
from services.queue import enqueue, format_status, format_details
from services.queue import enqueue, format_status, format_details, format_history
from services.backup import backup_badge, restore_help
from services.runner import run_cmd
from services.runner import run_cmd, run_cmd_full
from services.incidents import log_incident
def _parse_systemctl_kv(raw: str) -> dict[str, str]:
@@ -30,6 +32,156 @@ async def _unit_status(unit: str, props: list[str]) -> dict[str, str]:
return _parse_systemctl_kv(out)
def _sudo_cmd(cmd: list[str]) -> list[str]:
if os.geteuid() == 0:
return cmd
return ["sudo", "-E"] + cmd
def _format_backup_result(rc: int, out: str) -> str:
log_path = "/var/log/backup-auto.log"
header = "✅ Backup finished" if rc == 0 else "❌ Backup failed"
lines = out.strip().splitlines()
body = "\n".join(lines[:20])
if len(lines) > 20:
body += f"\n… trimmed {len(lines) - 20} lines"
extra = ""
if rc != 0 and os.path.exists(log_path):
try:
tail = ""
with open(log_path, "r", encoding="utf-8", errors="replace") as f:
tail_lines = f.readlines()[-40:]
tail = "".join(tail_lines).strip()
if tail:
extra = "\n\nLog tail:\n" + tail
except Exception:
pass
base = f"{header} (rc={rc})\nlog: {log_path}"
if body:
base += "\n\n" + body
if extra:
base += extra
return base
def _tail(path: str, lines: int = 120) -> str:
if not os.path.exists(path):
return f"⚠️ Log not found: {path}"
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
data = f.readlines()[-lines:]
except Exception as e:
return f"⚠️ Failed to read log: {e}"
return "".join(data).strip() or "(empty)"
def _beautify_restic_forget(raw: str) -> str | None:
    """
    Parse restic forget output tables into a compact bullet list.

    Returns None when *raw* does not look like a restic forget table so the
    caller can fall back to the raw text.
    """
    # Quick reject: both column names must appear somewhere in the output.
    if "Reasons" not in raw or "Paths" not in raw:
        return None
    import re
    lines = raw.splitlines()
    # Indexes of table header rows ("ID ... Reasons ... Paths ...").
    headers = []
    for idx, line in enumerate(lines):
        if line.startswith("ID") and "Reasons" in line and "Paths" in line:
            headers.append(idx)
    if not headers:
        return None

    def _valid_id(val: str) -> bool:
        # restic snapshot ids are 7..64 lowercase hex chars.
        return bool(re.fullmatch(r"[0-9a-f]{7,64}", val.strip()))

    def parse_block(start_idx: int, end_idx: int) -> list[dict]:
        # Slice each data row by the column-start offsets found in the header
        # line; restic aligns its table columns by padding with spaces.
        header = lines[start_idx]
        cols = ["ID", "Time", "Host", "Tags", "Reasons", "Paths", "Size"]
        positions = []
        for name in cols:
            pos = header.find(name)
            if pos == -1:
                return []  # not the expected table layout
            positions.append(pos)
        positions.append(len(header))
        entries: list[dict] = []
        current: dict | None = None
        # start_idx + 2 skips the header row and the separator line under it.
        for line in lines[start_idx + 2 : end_idx]:
            if not line.strip():
                continue
            segments = []
            for i in range(len(cols)):
                segments.append(line[positions[i] : positions[i + 1]].strip())
            row = dict(zip(cols, segments))
            if row["ID"] and _valid_id(row["ID"]):
                # A row with a valid ID starts a new snapshot entry.
                current = {
                    "id": row["ID"],
                    "time": row["Time"],
                    "host": row["Host"],
                    "size": row["Size"],
                    "tags": row["Tags"],
                    "reasons": [],
                    "paths": [],
                }
                if row["Reasons"]:
                    current["reasons"].append(row["Reasons"])
                if row["Paths"]:
                    current["paths"].append(row["Paths"])
                entries.append(current)
            elif current:
                # Continuation row: extra reasons/paths for the current entry.
                # Values starting with "-" are table rules/summary noise — skip.
                if row["Reasons"] and not row["Reasons"].startswith("-"):
                    current["reasons"].append(row["Reasons"])
                if row["Paths"] and not row["Paths"].startswith("-"):
                    current["paths"].append(row["Paths"])
        return entries

    # Pair each table with a label taken from the line(s) just above its header
    # (restic prints e.g. "keep 3 snapshots:" / "remove 2 snapshots:" there).
    blocks = []
    for i, start in enumerate(headers):
        end = headers[i + 1] if i + 1 < len(headers) else len(lines)
        entries = parse_block(start, end)
        if not entries:
            continue
        label = "Plan"
        prev_line = lines[start - 1].lower() if start - 1 >= 0 else ""
        prev2 = lines[start - 2].lower() if start - 2 >= 0 else ""
        if "keep" in prev_line:
            label = prev_line.strip()
        elif "keep" in prev2:
            label = prev2.strip()
        elif "snapshots" in prev_line:
            label = prev_line.strip()
        blocks.append((label, entries))
    if not blocks:
        return None
    # Render: one 📦 line per table, one 🧉 line per snapshot, reasons/paths below.
    out_lines = []
    for label, entries in blocks:
        out_lines.append(f"📦 {label}")
        for e in entries:
            head = f"🧉 {e['id']} | {e['time']} | {e['host']} | {e['size'] or 'n/a'}"
            out_lines.append(head)
            if e["reasons"]:
                out_lines.append(" 📌 " + "; ".join(e["reasons"]))
            if e["paths"]:
                for p in e["paths"]:
                    out_lines.append(f"{p}")
            # NOTE(review): blank separator placed after each snapshot entry —
            # the original indentation was ambiguous in this paste; verify it was
            # not meant per-block instead.
            out_lines.append("")
    return "\n".join(out_lines).rstrip()
def _load_json(raw: str, label: str) -> tuple[bool, object | None, str]:
if not raw or not raw.strip():
return False, None, f"? {label} returned empty output"
try:
return True, json.loads(raw), ""
except json.JSONDecodeError:
preview = raw.strip().splitlines()
head = preview[0] if preview else "invalid output"
return False, None, f"? {label} invalid JSON: {head}"
async def send_backup_jobs_status(msg: Message):
services = [
("backup-auto", "backup-auto.timer"),
@@ -69,7 +221,7 @@ async def cmd_repo_stats(msg: Message):
await msg.answer("⏳ Loading repo stats…", reply_markup=backup_kb)
# --- restore-size stats ---
rc1, raw1 = await run_cmd(
rc1, raw1 = await run_cmd_full(
["restic", "stats", "--json"],
use_restic_env=True,
timeout=30
@@ -78,10 +230,14 @@ async def cmd_repo_stats(msg: Message):
await msg.answer(raw1, reply_markup=backup_kb)
return
restore = json.loads(raw1)
ok, restore, err = _load_json(raw1, "restic stats")
if not ok:
await msg.answer(err, reply_markup=backup_kb)
return
# --- raw-data stats ---
rc2, raw2 = await run_cmd(
rc2, raw2 = await run_cmd_full(
["restic", "stats", "--json", "--mode", "raw-data"],
use_restic_env=True,
timeout=30
@@ -90,15 +246,26 @@ async def cmd_repo_stats(msg: Message):
await msg.answer(raw2, reply_markup=backup_kb)
return
raw = json.loads(raw2)
ok, raw, err = _load_json(raw2, "restic stats raw-data")
if not ok:
await msg.answer(err, reply_markup=backup_kb)
return
# --- snapshots count ---
rc3, raw_snaps = await run_cmd(
rc3, raw_snaps = await run_cmd_full(
["restic", "snapshots", "--json"],
use_restic_env=True,
timeout=20
)
snaps = len(json.loads(raw_snaps)) if rc3 == 0 else "n/a"
if rc3 != 0:
snaps = "n/a"
else:
ok, snap_data, err = _load_json(raw_snaps, "restic snapshots")
if ok and isinstance(snap_data, list):
snaps = len(snap_data)
else:
snaps = "n/a"
msg_text = (
"📦 **Repository stats**\n\n"
@@ -115,7 +282,7 @@ async def cmd_backup_status(msg: Message):
await msg.answer("⏳ Loading snapshots…", reply_markup=backup_kb)
async def worker():
rc, raw = await run_cmd(
rc, raw = await run_cmd_full(
["restic", "snapshots", "--json"],
use_restic_env=True,
timeout=30
@@ -124,7 +291,10 @@ async def cmd_backup_status(msg: Message):
await msg.answer(raw, reply_markup=backup_kb)
return
snaps = json.loads(raw)
ok, snaps, err = _load_json(raw, "restic snapshots")
if not ok or not isinstance(snaps, list):
await msg.answer(err, reply_markup=backup_kb)
return
if not snaps:
await msg.answer("📦 Snapshots: none", reply_markup=backup_kb)
return
@@ -163,7 +333,14 @@ async def cmd_backup_status(msg: Message):
async def cmd_backup_now(msg: Message):
await schedule_backup(msg)
async def schedule_backup(msg: Message):
async def job():
if cfg.get("safety", {}).get("dry_run", False):
await msg.answer("🧪 Dry-run: backup skipped", reply_markup=backup_kb)
return
if not acquire_lock("backup"):
await msg.answer("⚠️ Backup уже выполняется", reply_markup=backup_kb)
return
@@ -171,20 +348,36 @@ async def cmd_backup_now(msg: Message):
await msg.answer("▶️ Backup запущен", reply_markup=backup_kb)
try:
rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "restic-backup"], timeout=6 * 3600)
await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb)
rc, out = await run_cmd(
_sudo_cmd(["/usr/local/bin/backup.py", "restic-backup"]),
use_restic_env=True,
timeout=6 * 3600,
)
kb = backup_kb
if rc != 0:
kb = InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(text="🔁 Retry backup", callback_data="backup:retry")]
]
)
await msg.answer(_format_backup_result(rc, out), reply_markup=kb)
finally:
release_lock("backup")
pos = await enqueue("backup", job)
await msg.answer(f"🕓 Backup queued (#{pos})", reply_markup=backup_kb)
try:
from services.incidents import log_incident
log_incident(cfg, f"backup_queued by {msg.from_user.id}", category="backup")
except Exception:
pass
async def cmd_last_snapshot(msg: Message):
await msg.answer("⏳ Loading last snapshot…", reply_markup=backup_kb)
async def worker():
rc, raw = await run_cmd(
rc, raw = await run_cmd_full(
["restic", "snapshots", "--json"],
use_restic_env=True,
timeout=20
@@ -193,7 +386,10 @@ async def cmd_last_snapshot(msg: Message):
await msg.answer(raw, reply_markup=backup_kb)
return
snaps = json.loads(raw)
ok, snaps, err = _load_json(raw, "restic snapshots")
if not ok or not isinstance(snaps, list):
await msg.answer(err, reply_markup=backup_kb)
return
if not snaps:
await msg.answer("📦 Snapshots: none", reply_markup=backup_kb)
return
@@ -203,7 +399,7 @@ async def cmd_last_snapshot(msg: Message):
t = datetime.fromisoformat(s["time"].replace("Z", "+00:00"))
short_id = s["short_id"]
rc2, raw2 = await run_cmd(
rc2, raw2 = await run_cmd_full(
["restic", "stats", short_id, "--json"],
use_restic_env=True,
timeout=20
@@ -212,7 +408,10 @@ async def cmd_last_snapshot(msg: Message):
await msg.answer(raw2, reply_markup=backup_kb)
return
stats = json.loads(raw2)
ok, stats, err = _load_json(raw2, f"restic stats {short_id}")
if not ok or not isinstance(stats, dict):
await msg.answer(err, reply_markup=backup_kb)
return
msg_text = (
"📦 **Last snapshot**\n\n"
@@ -269,6 +468,12 @@ async def br(msg: Message):
await cmd_backup_now(msg)
@dp.message(F.text == "/backup_run")
async def br_cmd(msg: Message):
if is_admin_msg(msg):
await schedule_backup(msg)
@dp.message(F.text == "🧪 Restic check")
async def rc(msg: Message):
if not is_admin_msg(msg):
@@ -276,8 +481,19 @@ async def rc(msg: Message):
async def job():
await msg.answer("🧪 Restic check запущен", reply_markup=backup_kb)
rc2, out = await run_cmd(["sudo", "/usr/local/bin/restic-check.sh"], timeout=6 * 3600)
await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb)
rc2, out = await run_cmd(
_sudo_cmd(["/usr/local/bin/restic-check.sh"]),
use_restic_env=True,
timeout=6 * 3600,
)
kb = backup_kb
if rc2 != 0:
kb = InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(text="🔁 Retry restic check", callback_data="backup:retry_check")]
]
)
await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=kb)
pos = await enqueue("restic-check", job)
await msg.answer(f"🕓 Restic check queued (#{pos})", reply_markup=backup_kb)
@@ -290,7 +506,11 @@ async def wr(msg: Message):
async def job():
await msg.answer("📬 Weekly report запущен", reply_markup=backup_kb)
rc2, out = await run_cmd(["sudo", "/usr/local/bin/weekly-report.sh"], timeout=3600)
rc2, out = await run_cmd(
_sudo_cmd(["/usr/local/bin/weekly-report.sh"]),
use_restic_env=True,
timeout=3600,
)
await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb)
pos = await enqueue("weekly-report", job)
@@ -301,3 +521,55 @@ async def wr(msg: Message):
async def rh(msg: Message):
if is_admin_msg(msg):
await msg.answer(restore_help(), reply_markup=backup_kb)
@dp.message(F.text == "📜 History")
@dp.message(F.text == "/backup_history")
async def backup_history(msg: Message):
if not is_admin_msg(msg):
return
log_path = "/var/log/backup-auto.log"
content = _tail(log_path, lines=160)
if content.startswith("⚠️"):
await msg.answer(content, reply_markup=backup_kb)
return
pretty = _beautify_restic_forget(content)
trimmed = False
max_len = 3500
if len(content) > max_len:
content = content[-max_len:]
trimmed = True
header = "📜 Backup history (tail)"
if trimmed:
header += " (trimmed)"
if pretty:
await msg.answer(f"{header}\n`{log_path}`\n\n{pretty}", reply_markup=backup_kb)
else:
await msg.answer(
f"{header}\n`{log_path}`\n```\n{content}\n```",
reply_markup=backup_kb,
parse_mode="Markdown",
)
@dp.message(F.text == "/queue_history")
async def queue_history(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer(format_history(), reply_markup=backup_kb)
@dp.callback_query(F.data == "backup:retry")
async def backup_retry(cb: CallbackQuery):
if not is_admin_cb(cb):
return
await cb.answer("Queuing backup…")
await schedule_backup(cb.message)
@dp.callback_query(F.data == "backup:retry_check")
async def backup_retry_check(cb: CallbackQuery):
if not is_admin_cb(cb):
return
await cb.answer("Queuing restic check…")
await rc(cb.message)

View File

@@ -2,8 +2,10 @@ import json
import time
from aiogram import F
from aiogram.types import CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from app import dp, ADMIN_ID
from app import dp, ADMIN_ID, cfg
from services.docker import docker_cmd
from services.incidents import log_incident
from services.runner import run_cmd
from state import DOCKER_MAP, LOG_FILTER_PENDING
from handlers.backup import cmd_backup_status
@@ -13,8 +15,15 @@ async def docker_callback(cb: CallbackQuery):
if cb.from_user.id != ADMIN_ID:
return
_, action, alias = cb.data.split(":", 2)
real = DOCKER_MAP[alias]
try:
_, action, alias = cb.data.split(":", 2)
except ValueError:
await cb.answer("Bad request")
return
real = DOCKER_MAP.get(alias)
if not real:
await cb.answer("Container not found")
return
if action == "restart":
await cb.answer("Restarting…")
@@ -24,6 +33,10 @@ async def docker_callback(cb: CallbackQuery):
f"🔄 **{alias} restarted**\n```{out}```",
parse_mode="Markdown"
)
try:
log_incident(cfg, f"docker_restart {alias}", category="docker")
except Exception:
pass
elif action == "logs":
await cb.answer()
@@ -54,7 +67,7 @@ async def snapshot_details(cb: CallbackQuery):
snap_id = cb.data.split(":", 1)[1]
await cb.answer("Loading snapshot…")
# получаем статистику snapshot
# получаем статистику snapshot
rc, raw = await run_cmd(
["restic", "stats", snap_id, "--json"],
use_restic_env=True,

24
handlers/config_check.py Normal file
View File

@@ -0,0 +1,24 @@
from aiogram import F
from aiogram.types import Message
from app import dp, cfg
from auth import is_admin_msg
from services.config_check import validate_cfg
@dp.message(F.text == "/config_check")
async def config_check(msg: Message):
if not is_admin_msg(msg):
return
errors, warnings = validate_cfg(cfg)
lines = []
if errors:
lines.append("❌ Config errors:")
lines += [f"- {e}" for e in errors]
if warnings:
if lines:
lines.append("")
lines.append("⚠️ Warnings:")
lines += [f"- {w}" for w in warnings]
if not lines:
lines.append("✅ Config looks OK")
await msg.answer("\n".join(lines))

View File

@@ -1,11 +1,13 @@
from aiogram import F
from aiogram.types import Message
from app import dp
from app import dp, cfg
from auth import is_admin_msg
from keyboards import docker_kb, docker_inline_kb
from services.docker import container_uptime, docker_cmd
from services.incidents import log_incident
from state import DOCKER_MAP, LOG_FILTER_PENDING
import time
import json
async def cmd_docker_status(msg: Message):
@@ -42,7 +44,7 @@ async def cmd_docker_status(msg: Message):
lines.append(f"{icon} {alias}: {status} ({up})")
await msg.answer("\n".join(lines), reply_markup=docker_kb)
log_incident(cfg, f"docker_status by {msg.from_user.id}", category="docker")
except Exception as e:
# ⬅️ КРИТИЧЕСКИ ВАЖНО
await msg.answer(
@@ -77,6 +79,82 @@ async def ds(msg: Message):
await cmd_docker_status(msg)
@dp.message(F.text == "/docker_status")
async def ds_cmd(msg: Message):
if is_admin_msg(msg):
await cmd_docker_status(msg)
@dp.message(F.text, F.func(lambda m: (m.text or "").split()[0] == "/docker_health"))
async def docker_health(msg: Message):
if not is_admin_msg(msg):
return
parts = msg.text.split()
if len(parts) < 2:
await msg.answer("Usage: /docker_health <alias>")
return
alias = parts[1]
real = DOCKER_MAP.get(alias)
if not real:
await msg.answer(f"⚠️ Unknown container: {alias}", reply_markup=docker_kb)
return
rc, out = await docker_cmd(["inspect", "-f", "{{json .State.Health}}", real], timeout=10)
if rc != 0 or not out.strip():
await msg.answer(f"⚠️ Failed to get health for {alias}", reply_markup=docker_kb)
return
try:
data = json.loads(out)
except json.JSONDecodeError:
await msg.answer(f"⚠️ Invalid health JSON for {alias}", reply_markup=docker_kb)
return
status = data.get("Status", "n/a")
fail = data.get("FailingStreak", "n/a")
logs = data.get("Log") or []
lines = [f"🐳 {alias} health", f"Status: {status}", f"Failing streak: {fail}"]
if logs:
lines.append("Recent logs:")
for entry in logs[-5:]:
if not isinstance(entry, dict):
continue
ts = entry.get("Start") or entry.get("End") or ""
exitc = entry.get("ExitCode", "")
out_line = entry.get("Output", "").strip()
lines.append(f"- {ts} rc={exitc} {out_line}")
await msg.answer("\n".join(lines), reply_markup=docker_kb)
log_incident(cfg, f"docker_health alias={alias} by {msg.from_user.id}", category="docker")
@dp.message(F.text == "/docker_health_summary")
async def docker_health_summary(msg: Message):
if not is_admin_msg(msg):
return
if not DOCKER_MAP:
await msg.answer("⚠️ DOCKER_MAP пуст", reply_markup=docker_kb)
return
problems = []
total = len(DOCKER_MAP)
for alias, real in DOCKER_MAP.items():
rc, out = await docker_cmd(["inspect", "-f", "{{json .State}}", real], timeout=10)
if rc != 0:
problems.append(f"{alias}: inspect error")
continue
try:
state = json.loads(out)
except Exception:
problems.append(f"{alias}: bad JSON")
continue
status = state.get("Status", "n/a")
health = (state.get("Health") or {}).get("Status", "n/a")
if status != "running" or health not in ("healthy", "none"):
problems.append(f"{alias}: {status}/{health}")
ok = total - len(problems)
lines = [f"🐳 Docker health: 🟢 {ok}/{total} healthy, 🔴 {len(problems)} issues"]
if problems:
lines.append("Problems:")
lines.extend([f"- {p}" for p in problems])
await msg.answer("\n".join(lines), reply_markup=docker_kb)
@dp.message(F.text == "📈 Stats")
async def dstats(msg: Message):
if not is_admin_msg(msg):

View File

@@ -1,24 +1,164 @@
from aiogram import F
from aiogram.types import Message
from app import dp
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from app import dp, ADMIN_ID
from auth import is_admin_msg
from keyboards import menu_kb
@dp.message(F.text.in_({" Help", " Help", "Help"}))
HELP_PAGES = [
(
"Overview",
" **Help — Overview**\n\n"
"🩺 *Health* — быстрый health-check.\n"
"📊 *Статус* — общая загрузка.\n"
"📋 */status_short* — кратко (load/RAM/диски).\n"
"🩺 */health_short* — краткий health.\n"
"🧪 */selftest* — health + restic snapshot probe.\n"
"🔧 Разделы: Docker, Backup, Artifacts, System, OpenWrt.",
),
(
"Alerts",
"🚨 **Alerts & Mute**\n\n"
"Команды:\n"
"• `/alerts test <critical|warn|info>`\n"
"• `/alerts mute <cat> <minutes>` / `/alerts unmute <cat>` / `/alerts list`\n"
"• `/alerts recent [hours]`\n"
"Шорткаты: `/alerts_list`, `/alerts_recent`, `/alerts_mute_load` (60м).\n"
"Категории: load, disk, smart, raid, ssl, docker, test.\n"
"Quiet hours: `alerts.quiet_hours` для не‑критичных.\n"
"Авто-мьют: `alerts.auto_mute` со слотами времени.\n"
"Только красные load: `alerts.load_only_critical: true`.\n"
"Валидатор конфига: `/config_check`.",
),
(
"Backup",
"💾 **Backup (restic)**\n\n"
"Кнопки: Status, Last snapshot, Repo stats, Run backup, Queue, Restic check, Weekly report, History.\n"
"History — хвост `/var/log/backup-auto.log`.\n"
"Fail → кнопка Retry (backup/check).\n"
"Run backup/Check учитывают `safety.dry_run`.\n"
"После бэкапа приходит TL;DR + путь к логу `/var/log/backup-auto.log`.\n"
"Queue → Details показывает отложенные задачи.",
),
(
"Docker & System",
"🐳 **Docker**\n"
"Status/Restart/Logs/Stats — клавиатура Docker.\n"
"Команды: `/docker_status`, `/docker_health <alias>`.\n\n"
"🖥 **System**\n"
"Info: Disks/Security/Metrics/Hardware/SMART/OpenWrt.\n"
"Ops: Updates/Upgrade/Reboot.\n"
"Logs: Audit/Incidents/Security/Integrations/Processes.\n"
"OpenWrt: `/openwrt`, `/openwrt_wan`, `/openwrt_clients`, `/openwrt_leases`.",
),
(
"Admin",
"🛠 **Admin & Deploy**\n\n"
"Config: `/config_check`, файл `config.yaml` (см. config.example.yaml).\n"
"Deploy: `deploy.sh` (ssh 10.10.10.10:1090 → git pull → systemctl restart tg-bot).\n"
"Incidents: `/incidents_summary`, `/incidents_diff [hours]`.\n"
"Export: `/incidents_export [hours] [csv|json]`, `/export_all [hours]` (zip).\n"
"Alerts log/heatmap: `/alerts_log [hours]`, `/alerts_heatmap [hours] [cat]`.\n"
"Backup SLA: `/backup_sla`; Docker restarts: `/docker_restarts [hours]`.\n"
"Disk snapshot: `/disk_snapshot`.\n"
"Queue: `/queue_history`, `/queue_sla`.\n"
"Self-test history: `/selftest_history`.\n"
"OpenWrt leases diff: `/openwrt_leases_diff`.\n"
"BotFather list: `/botfather_list`.\n"
"Безопасность: `safety.dry_run: true` блокирует опасные действия.\n"
"OpenWrt: кнопка в System → Info.",
),
]
def _help_kb(idx: int) -> InlineKeyboardMarkup:
buttons = []
if idx > 0:
buttons.append(InlineKeyboardButton(text="◀️ Prev", callback_data=f"help:{idx-1}"))
buttons.append(InlineKeyboardButton(text=f"{idx+1}/{len(HELP_PAGES)}", callback_data="help:noop"))
if idx < len(HELP_PAGES) - 1:
buttons.append(InlineKeyboardButton(text="Next ▶️", callback_data=f"help:{idx+1}"))
return InlineKeyboardMarkup(inline_keyboard=[buttons])
def _help_text(idx: int) -> str:
_title, body = HELP_PAGES[idx]
return body
@dp.message(F.text.in_({" Help", " Help", "Help", "/help"}))
async def help_cmd(msg: Message):
if not is_admin_msg(msg):
return
idx = 0
await msg.answer(
" **Help / Справка**\n\n"
"🩺 Health — быстрый health-check сервера\n"
"📊 Статус — общая загрузка сервера\n"
"🐳 Docker — управление контейнерами\n"
"📦 Backup — restic бэкапы\n"
"🧉 Artifacts — критичные образы (Clonezilla, NAND)\n"
"⚙️ System — подменю: Info / Ops / Logs\n\n"
"Inline-кнопки используются для выбора контейнеров.",
reply_markup=menu_kb,
_help_text(idx),
reply_markup=_help_kb(idx),
parse_mode="Markdown",
)
@dp.callback_query(F.data.startswith("help:"))
async def help_cb(cb: CallbackQuery):
if cb.from_user.id != ADMIN_ID:
await cb.answer()
return
payload = cb.data.split(":", 1)[1]
if payload == "noop":
await cb.answer()
return
try:
idx = int(payload)
except ValueError:
await cb.answer()
return
idx = max(0, min(idx, len(HELP_PAGES) - 1))
await cb.message.edit_text(
_help_text(idx),
reply_markup=_help_kb(idx),
parse_mode="Markdown",
)
await cb.answer()
BOTFATHER_LIST = """\
help - Show help pages
status_short - Compact host status
health_short - Compact health report
selftest - Health + restic snapshot probe
alerts - Manage alerts
alerts_list - List active mutes
alerts_recent - Show recent incidents (24h)
alerts_mute_load - Mute load alerts for 60m
alerts_log - Show suppressed alerts
alerts_heatmap - Hourly incidents heatmap
backup_run - Run backup (queued)
backup_history - Show backup log tail
queue_history - Show queue recent jobs
queue_sla - Queue SLA stats
docker_status - Docker summary
docker_health - Docker inspect/health by alias
docker_health_summary - Docker health summary (problems only)
openwrt - Full OpenWrt status
openwrt_wan - OpenWrt WAN only
openwrt_clients - OpenWrt wifi clients
openwrt_leases - OpenWrt DHCP leases
openwrt_fast - OpenWrt quick WAN view
openwrt_leases_diff - OpenWrt DHCP diff
incidents_summary - Incidents counters (24h/7d)
incidents_export - Export incidents (hours fmt)
incidents_diff - Show incidents since last check
export_all - Zip with incidents/queue/selftest
backup_sla - Backup SLA check
docker_restarts - Docker restart history
selftest_history - Self-test history
disk_snapshot - Disk usage snapshot
config_check - Validate config
"""
@dp.message(F.text == "/botfather_list")
async def botfather_list(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer(f"Commands for BotFather:\n```\n{BOTFATHER_LIST}\n```", parse_mode="Markdown")

View File

@@ -1,4 +1,5 @@
import asyncio
import json
import socket
import time
import psutil
@@ -10,6 +11,8 @@ from keyboards import menu_kb
from services.system import format_disks
from services.health import health
from state import DOCKER_MAP
from services.runner import run_cmd_full
from services.selftest import run_selftest
async def cmd_status(msg: Message):
@@ -76,6 +79,59 @@ async def st(msg: Message):
await cmd_status(msg)
@dp.message(F.text == "/status_short")
async def st_short(msg: Message):
if not is_admin_msg(msg):
return
now = time.time()
uptime_sec = int(now - psutil.boot_time())
days, rem = divmod(uptime_sec, 86400)
hours, rem = divmod(rem, 3600)
minutes, _ = divmod(rem, 60)
load1, load5, load15 = psutil.getloadavg()
mem = psutil.virtual_memory()
disks = format_disks().splitlines()
disk_line = disks[1] if len(disks) > 1 else "Disks: n/a"
await msg.answer(
"📋 **Status (short)**\n"
f"🖥 `{socket.gethostname()}`\n"
f"⏱ Uptime: {days}d {hours}h {minutes}m\n"
f"⚙️ Load: {load1:.2f} {load5:.2f} {load15:.2f}\n"
f"🧠 RAM: {mem.percent}% ({mem.used // (1024**3)} / {mem.total // (1024**3)} GiB)\n"
f"💾 {disk_line}",
reply_markup=menu_kb,
parse_mode="Markdown",
)
@dp.message(F.text == "/health_short")
async def health_short(msg: Message):
if not is_admin_msg(msg):
return
try:
text = await asyncio.to_thread(health, cfg, DOCKER_MAP)
except Exception as e:
await msg.answer(f"❌ Health failed: {type(e).__name__}: {e}", reply_markup=menu_kb)
return
lines = [ln for ln in text.splitlines() if ln.strip()]
brief = " | ".join(lines[1:5]) if len(lines) > 1 else text
await msg.answer(f"🩺 Health (short)\n{brief}", reply_markup=menu_kb)
@dp.message(F.text.in_({"🧪 Self-test", "/selftest"}))
async def selftest(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer("⏳ Self-test…", reply_markup=menu_kb)
async def worker():
text, _ok = await run_selftest(cfg, DOCKER_MAP)
await msg.answer(text, reply_markup=menu_kb)
asyncio.create_task(worker())
def _rate_str(value: float) -> str:
if value >= 1024 * 1024:
return f"{value / (1024 * 1024):.2f} MiB/s"
@@ -93,6 +149,8 @@ async def _network_snapshot(interval: float = 1.0) -> str:
for nic, s in end.items():
if nic.startswith("lo"):
continue
if not nic.startswith("enp"):
continue
e = start.get(nic)
if not e:
continue

View File

@@ -1,8 +1,9 @@
import asyncio
import os
from datetime import datetime, timezone, timedelta
from aiogram import F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from app import dp, cfg
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile
from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg
from keyboards import (
system_info_kb,
@@ -10,21 +11,31 @@ from keyboards import (
system_logs_audit_kb,
system_logs_security_kb,
system_logs_integrations_kb,
system_logs_kb,
openwrt_kb,
docker_kb, backup_kb,
)
from system_checks import security, disks, hardware, list_disks, smart_last_test
from services.http_checks import get_url_checks, check_url
from services.queue import enqueue
from services.queue import enqueue, get_history_raw, get_stats
from services.updates import list_updates, apply_updates
from services.runner import run_cmd
from services.runner import run_cmd, run_cmd_full
from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host
from services.gitea import get_gitea_health
from services.openwrt import get_openwrt_status
from services.openwrt import get_openwrt_status, fetch_openwrt_leases
from services.system import worst_disk_usage
import state
from state import UPDATES_CACHE, REBOOT_PENDING
from services.metrics import summarize
from services.audit import read_audit_tail
from services.incidents import read_recent, incidents_path
from services.incidents import read_recent, incidents_path, read_raw, infer_category
from services.external_checks import format_report
from services.disk_report import build_disk_report
from services import runtime_state
import io
import json
import csv
import zipfile
@dp.message(F.text == "💽 Disks")
@@ -195,19 +206,91 @@ async def smart_status(msg: Message):
await msg.answer("\n".join(lines), reply_markup=system_info_kb)
@dp.message(F.text == "📡 OpenWrt")
@dp.message(F.text.in_({"/openwrt", "📡 Full status"}))
async def openwrt_status(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer("⏳ Checking OpenWrt…", reply_markup=system_info_kb)
await msg.answer("⏳ Checking OpenWrt…", reply_markup=openwrt_kb)
async def worker():
try:
text = await get_openwrt_status(cfg)
except Exception as e:
text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=system_info_kb)
await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker())
@dp.message(F.text == "/openwrt_wan")
async def openwrt_wan(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer("⏳ Checking OpenWrt WAN…", reply_markup=openwrt_kb)
async def worker():
try:
text = await get_openwrt_status(cfg, mode="wan")
except Exception as e:
text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker())
@dp.message(F.text == "📡 OpenWrt")
async def openwrt_menu(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer("📡 OpenWrt menu", reply_markup=openwrt_kb)
@dp.message(F.text.in_({"/openwrt_clients", "📶 Wi-Fi clients"}))
async def openwrt_clients(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer("⏳ Checking OpenWrt clients…", reply_markup=openwrt_kb)
async def worker():
try:
text = await get_openwrt_status(cfg, mode="clients")
except Exception as e:
text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker())
@dp.message(F.text.in_({"/openwrt_leases", "🧾 Leases"}))
async def openwrt_leases(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer("⏳ Checking OpenWrt leases…", reply_markup=openwrt_kb)
async def worker():
try:
text = await get_openwrt_status(cfg, mode="leases")
except Exception as e:
text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker())
@dp.message(F.text == "/openwrt_fast")
@dp.message(F.text == "🌐 WAN fast")
async def openwrt_fast(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer("⏳ OpenWrt fast…", reply_markup=openwrt_kb)
async def worker():
try:
text = await get_openwrt_status(cfg, mode="wan")
except Exception as e:
text = f"⚠️ OpenWrt error: {e}"
await msg.answer(text, reply_markup=openwrt_kb)
asyncio.create_task(worker())
@@ -253,6 +336,512 @@ async def incidents(msg: Message):
await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown")
@dp.message(F.text == "🧾 Incidents")
async def incidents_entry(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer(
"📣 Incidents menu:\n"
"- Summary\n- Diff\n- Heatmap\n- Export/All\n- Alerts log",
reply_markup=system_logs_audit_kb,
)
@dp.message(F.text.in_({"/incidents_summary", "📣 Summary"}))
async def incidents_summary(msg: Message):
if not is_admin_msg(msg):
return
last_24h = read_raw(cfg, hours=24, limit=2000)
last_7d = read_raw(cfg, hours=24 * 7, limit=4000)
def summarize(items):
total = len(items)
cats = {}
suppressed = {}
last_seen = {}
for dt, msg in items:
cat = infer_category(msg) or "n/a"
cats[cat] = cats.get(cat, 0) + 1
last_seen[cat] = dt
if "[suppressed" in msg.lower():
suppressed[cat] = suppressed.get(cat, 0) + 1
def fmt_top(d):
return ", ".join(f"{k}:{v}" for k, v in sorted(d.items(), key=lambda x: x[1], reverse=True)[:5]) or "n/a"
top = fmt_top(cats)
top_supp = fmt_top(suppressed)
last_parts = []
for k, dt in sorted(last_seen.items(), key=lambda x: x[1], reverse=True)[:5]:
last_parts.append(f"{k}:{dt.astimezone().strftime('%Y-%m-%d %H:%M')}")
last_str = ", ".join(last_parts) or "n/a"
return total, top, top_supp, last_str
t24, top24, supp24, last24 = summarize(last_24h)
t7, top7, supp7, last7 = summarize(last_7d)
text = (
"📣 Incidents summary\n\n"
f"24h: {t24} (top: {top24}; suppressed: {supp24}; last: {last24})\n"
f"7d: {t7} (top: {top7}; suppressed: {supp7}; last: {last7})"
)
await msg.answer(text, reply_markup=system_logs_kb)
@dp.message(F.text.startswith("/incidents_diff"))
@dp.message(F.text == "🆕 Diff")
async def incidents_diff(msg: Message):
if not is_admin_msg(msg):
return
try:
parts = msg.text.split()
hours = 24
reset = False
if len(parts) >= 2:
if parts[1].lower() in {"reset", "clear"}:
reset = True
else:
try:
hours = max(1, int(parts[1]))
except ValueError:
hours = 24
if reset:
runtime_state.set_state("incidents_diff_last_ts", None)
await msg.answer(
"📣 Diff marker reset. Next run will show all within window.",
reply_markup=system_logs_audit_kb,
)
return
last_iso = runtime_state.get("incidents_diff_last_ts")
last_dt = None
if isinstance(last_iso, str):
try:
last_dt = datetime.fromisoformat(last_iso)
except Exception:
last_dt = None
rows = read_raw(cfg, hours=hours, limit=5000, include_old=True)
def collect(from_dt):
fresh: list[tuple[datetime, str]] = []
for dt, line in rows:
if from_dt and dt <= from_dt:
continue
fresh.append((dt, line))
return fresh
fresh = collect(last_dt)
# auto-reset if marker is ahead of all rows
if not fresh and last_dt and rows and last_dt >= rows[-1][0]:
last_dt = None
runtime_state.set_state("incidents_diff_last_ts", None)
fresh = collect(None)
def fmt_lines(items: list[tuple[datetime, str]], limit_chars: int = 3500) -> str:
buf = []
total_len = 0
for dt, line in items:
entry = f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}"
if total_len + len(entry) + 1 > limit_chars:
break
buf.append(entry)
total_len += len(entry) + 1
return "\n".join(buf)
if not fresh:
note = f"since {last_dt.astimezone().strftime('%Y-%m-%d %H:%M')}" if last_dt else "for the period"
if rows:
sample = rows[-50:][::-1] # newest first
body = fmt_lines(sample)
await msg.answer(
f"📣 No new incidents {note} (window {hours}h).\nRecent sample:\n```\n{body}\n```",
reply_markup=system_logs_audit_kb,
parse_mode="Markdown",
)
else:
await msg.answer(
f"📣 No new incidents {note} (window {hours}h)",
reply_markup=system_logs_audit_kb,
)
return
fresh.sort(key=lambda x: x[0], reverse=True)
body = fmt_lines(fresh)
await msg.answer(
f"📣 New incidents (since last mark, window {hours}h): {len(fresh)}\n```\n{body}\n```",
reply_markup=system_logs_audit_kb,
parse_mode="Markdown",
)
try:
runtime_state.set_state("incidents_diff_last_ts", fresh[0][0].isoformat())
except Exception:
pass
except Exception as e:
await msg.answer(f"⚠️ Diff error: {type(e).__name__}: {e}", reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/alerts_heatmap"))
@dp.message(F.text == "🔥 Heatmap")
async def alerts_heatmap(msg: Message):
if not is_admin_msg(msg):
return
hours = 48
category = None
if msg.text.startswith("/alerts_heatmap"):
parts = msg.text.split()
for part in parts[1:]:
if part.isdigit():
hours = max(1, int(part))
else:
category = part.lower()
rows = read_raw(cfg, hours=hours, limit=8000, include_old=True)
buckets: dict[datetime, int] = {}
for dt, line in rows:
cat = infer_category(line) or "n/a"
if category and cat != category:
continue
local = dt.astimezone()
hour = local.replace(minute=0, second=0, microsecond=0)
buckets[hour] = buckets.get(hour, 0) + 1
if not buckets:
await msg.answer(f"⚠️ No incidents for heatmap (hours={hours}, category={category or 'any'})")
return
start = datetime.now().astimezone() - timedelta(hours=hours - 1)
timeline = []
current = start.replace(minute=0, second=0, microsecond=0)
while current <= datetime.now().astimezone():
timeline.append((current, buckets.get(current, 0)))
current += timedelta(hours=1)
max_count = max(c for _, c in timeline) or 1
lines = [f"📊 Alerts heatmap (hours={hours}, category={category or 'any'})"]
for ts, cnt in timeline[-72:]:
bar_len = max(1, int(cnt / max_count * 12)) if cnt else 0
bar = "" * bar_len if cnt else ""
lines.append(f"{ts:%m-%d %H}: {cnt:>3} {bar}")
await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb)
@dp.message(F.text == "/disk_snapshot")
async def disk_snapshot(msg: Message):
if not is_admin_msg(msg):
return
usage, mount = worst_disk_usage()
mount = mount or "/"
try:
report = await build_disk_report(cfg, mount, usage or 0)
except Exception as e:
await msg.answer(f"⚠️ Disk snapshot error: {e}")
return
await msg.answer(f"💽 Disk snapshot ({mount})\n\n{report}", reply_markup=system_info_kb)
@dp.message(F.text.startswith("/alerts_log"))
async def alerts_log(msg: Message):
if not is_admin_msg(msg):
return
parts = msg.text.split()
hours = 24
if len(parts) >= 2:
try:
hours = max(1, int(parts[1]))
except ValueError:
hours = 24
rows = read_raw(cfg, hours=hours, limit=2000)
suppressed = [(dt, m) for dt, m in rows if "[suppressed" in m.lower()]
sent = [(dt, m) for dt, m in rows if "[suppressed" not in m.lower()]
lines = [f"📣 Alerts log ({hours}h)"]
lines.append(f"Sent: {len(sent)}, Suppressed: {len(suppressed)}")
if suppressed:
lines.append("\nSuppressed:")
for dt, m in suppressed[-20:]:
lines.append(f"{dt:%m-%d %H:%M} {m}")
await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/incidents_export"))
@dp.message(F.text == "📤 Export")
async def incidents_export(msg: Message):
if not is_admin_msg(msg):
return
parts = msg.text.split()
hours = 24
fmt = "csv"
if len(parts) >= 2:
try:
hours = max(1, int(parts[1]))
except ValueError:
hours = 24
if len(parts) >= 3:
fmt = parts[2].lower()
rows = read_raw(cfg, hours=hours, limit=20000, include_old=False)
data = []
for dt, msg_line in rows:
data.append({
"timestamp": dt.astimezone().isoformat(),
"category": infer_category(msg_line) or "n/a",
"message": msg_line,
})
if fmt == "json":
payload = json.dumps(data, ensure_ascii=False, indent=2)
file_bytes = payload.encode("utf-8")
fname = f"incidents_{hours}h.json"
else:
sio = io.StringIO()
writer = csv.DictWriter(sio, fieldnames=["timestamp", "category", "message"], delimiter=";")
writer.writeheader()
for row in data:
writer.writerow(row)
file_bytes = sio.getvalue().encode("utf-8")
fname = f"incidents_{hours}h.csv"
summary = f"📤 Incidents export ({hours}h): {len(data)} rows, format {fmt}"
await msg.answer(summary)
await msg.answer_document(document=BufferedInputFile(file_bytes, filename=fname))
@dp.message(F.text.in_({"/backup_sla", "📉 Backup SLA", "Backup SLA"}))
@dp.message(F.text.contains("Backup SLA"))
@dp.message(F.text.regexp(r"(?i)backup.*sla|sla.*backup"))
@dp.message(F.text.func(lambda t: isinstance(t, str) and "backup" in t.lower() and "sla" in t.lower()))
async def backup_sla(msg: Message):
if not is_admin_msg(msg):
return
await msg.answer("⏳ Checking Backup SLA…", reply_markup=backup_kb)
rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
if rc != 0:
await msg.answer(f"⚠️ Restic error: {out.strip() or rc}", reply_markup=backup_kb)
return
try:
snaps = json.loads(out)
except Exception as e:
await msg.answer(f"⚠️ Invalid restic JSON: {e}", reply_markup=backup_kb)
return
if not isinstance(snaps, list) or not snaps:
await msg.answer("⚠️ No snapshots found", reply_markup=backup_kb)
return
snaps.sort(key=lambda s: s.get("time", ""), reverse=True)
last_time_raw = snaps[0].get("time")
if last_time_raw:
last_dt = datetime.fromisoformat(last_time_raw.replace("Z", "+00:00"))
age_h = (datetime.now(tz=last_dt.tzinfo) - last_dt).total_seconds() / 3600
else:
last_dt = None
age_h = None
sla = float(cfg.get("backup", {}).get("sla_hours", 26))
if age_h is None:
status = "🟡"
elif age_h <= sla:
status = "🟢"
elif age_h <= sla * 1.5:
status = "🟡"
else:
status = "🔴"
last_str = last_dt.astimezone().strftime("%Y-%m-%d %H:%M") if last_dt else "n/a"
age_str = f"{age_h:.1f}h" if age_h is not None else "n/a"
await msg.answer(
f"{status} Backup SLA\n"
f"Snapshots: {len(snaps)}\n"
f"Last: {last_str} (age {age_str})\n"
f"SLA: {sla}h",
reply_markup=backup_kb,
)
@dp.message(F.text.startswith("/docker_restarts"))
@dp.message(F.text == "♻️ Restarts")
async def docker_restarts(msg: Message):
    """List docker_restart incidents in the requested window (default 168h)."""
    if not is_admin_msg(msg):
        return
    args = msg.text.split()
    hours = 168
    if len(args) >= 2:
        try:
            hours = max(1, int(args[1]))
        except ValueError:
            hours = 168
    rows = read_raw(cfg, hours=hours, limit=5000, include_old=True)
    restarts = [(dt, line) for dt, line in rows if "docker_restart" in line]
    if not restarts:
        await msg.answer(f"🐳 No docker restarts in last {hours}h", reply_markup=docker_kb)
        return
    counts: dict[str, int] = {}
    for _dt, line in restarts:
        tokens = line.split()
        # The container alias is logged as the trailing token of the line.
        alias = tokens[-1] if len(tokens) >= 2 else "unknown"
        counts[alias] = counts.get(alias, 0) + 1
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    top = ", ".join(f"{name}:{cnt}" for name, cnt in ranked)
    body = "\n".join(
        f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}"
        for dt, line in restarts[-50:]
    )
    await msg.answer(
        f"🐳 Docker restarts ({hours}h): {len(restarts)} ({top})\n```\n{body}\n```",
        reply_markup=docker_kb,
        parse_mode="Markdown",
    )
# Strong references to in-flight diff workers: asyncio keeps only weak
# references to tasks, so without this set a running worker could be
# garbage-collected mid-flight and silently never reply.
_LEASES_DIFF_TASKS: set = set()


@dp.message(F.text.startswith("/openwrt_leases_diff"))
@dp.message(F.text == "🔀 Leases diff")
async def openwrt_leases_diff(msg: Message):
    """Show OpenWrt DHCP leases added/removed since the previous invocation."""
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ OpenWrt leases diff…", reply_markup=openwrt_kb)

    async def worker():
        try:
            leases_now = await fetch_openwrt_leases(cfg)
        except Exception as e:
            await msg.answer(f"⚠️ OpenWrt error: {e}", reply_markup=openwrt_kb)
            return
        prev = runtime_state.get("openwrt_leases_prev", [])
        if not prev:
            # First run: persist a baseline to diff against next time.
            runtime_state.set_state("openwrt_leases_prev", leases_now)
            await msg.answer(f"Baseline saved ({len(leases_now)} leases)", reply_markup=openwrt_kb)
            return
        prev_set = set(prev)
        now_set = set(leases_now)
        added = sorted(now_set - prev_set)
        removed = sorted(prev_set - now_set)
        lines = [f"🧾 OpenWrt leases diff ({len(added)} added, {len(removed)} removed)"]
        if added:
            lines.append(" Added:")
            lines.extend(f" - {x}" for x in added[:50])
        if removed:
            lines.append(" Removed:")
            lines.extend(f" - {x}" for x in removed[:50])
        if len(added) > 50 or len(removed) > 50:
            lines.append("… trimmed")
        runtime_state.set_state("openwrt_leases_prev", leases_now)
        await msg.answer("\n".join(lines), reply_markup=openwrt_kb)

    task = asyncio.create_task(worker())
    _LEASES_DIFF_TASKS.add(task)
    task.add_done_callback(_LEASES_DIFF_TASKS.discard)
@dp.message(F.text.in_({"/queue_sla", "📊 Queue SLA", "Queue SLA"}))
@dp.message(F.text.contains("Queue SLA"))
@dp.message(F.text.regexp(r"(?i)queue.*sla|sla.*queue"))
@dp.message(F.text.func(lambda t: isinstance(t, str) and "queue" in t.lower() and "sla" in t.lower()))
async def queue_sla(msg: Message):
    """Summarize queue wait/runtime maxima and p95 from the job history."""
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Calculating Queue SLA…", reply_markup=backup_kb)
    hist = get_history_raw()
    if not hist:
        await msg.answer("🧾 Queue SLA: history is empty", reply_markup=backup_kb)
        return

    def p95(values: list[int]) -> float:
        # Nearest-rank 95th percentile over the sorted samples.
        if not values:
            return 0.0
        ordered = sorted(values)
        return float(ordered[int(0.95 * (len(ordered) - 1))])

    waits = [entry.get("wait_sec", 0) for entry in hist]
    runs = [entry.get("runtime_sec", 0) for entry in hist]
    lines = [
        "🧾 Queue SLA",
        f"Samples: {len(hist)}",
        f"Max wait: {max(waits)}s, p95 {p95(waits):.1f}s",
        f"Max run: {max(runs)}s, p95 {p95(runs):.1f}s",
    ]
    stats = get_stats()
    if stats:
        lines.append(
            f"Avg wait: {stats.get('avg_wait_sec', 0):.1f}s, "
            f"avg run: {stats.get('avg_runtime_sec', 0):.1f}s"
        )
    await msg.answer("\n".join(lines), reply_markup=backup_kb)
# Fallback router: any message mentioning "sla" is dispatched to backup or queue SLA.
@dp.message(F.text.regexp(r"(?i)sla"))
async def sla_fallback(msg: Message):
    """Route stray SLA-mentioning messages to the matching SLA handler."""
    if not is_admin_msg(msg):
        return
    lowered = (msg.text or "").lower()
    if "queue" in lowered:
        await queue_sla(msg)
    elif "backup" in lowered:
        await backup_sla(msg)
@dp.message(F.text == "/selftest_history")
async def selftest_history(msg: Message):
    """Show up to 15 recent self-test runs recorded in runtime state."""
    if not is_admin_msg(msg):
        return
    hist = runtime_state.get("selftest_history", [])
    if not hist:
        await msg.answer("🧪 Self-test history is empty", reply_markup=system_logs_audit_kb)
        return
    lines = ["🧪 Self-test history"]
    for entry in hist[:15]:
        marker = "🟢" if entry.get("ok", False) else "🔴"
        lines.append(f"{marker} {entry.get('ts', '')[:16]} {entry.get('summary', '')}")
    await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb)
def _csv_text(header: list[str], rows) -> str:
    """Render an iterable of row sequences as a ';'-separated CSV string."""
    sio = io.StringIO()
    writer = csv.writer(sio, delimiter=";")
    writer.writerow(header)
    writer.writerows(rows)
    return sio.getvalue()


@dp.message(F.text.startswith("/export_all"))
@dp.message(F.text == "📦 Export all")
async def export_all(msg: Message):
    """Export incidents, queue history and self-test history as one ZIP of CSVs."""
    if not is_admin_msg(msg):
        return
    hours = 168
    parts = msg.text.split()
    if len(parts) >= 2:
        try:
            hours = max(1, int(parts[1]))
        except ValueError:
            hours = 168
    incidents_rows = read_raw(cfg, hours=hours, limit=20000, include_old=True)
    incidents_csv = _csv_text(
        ["timestamp", "category", "message"],
        (
            [dt.astimezone().isoformat(), infer_category(line) or "n/a", line]
            for dt, line in incidents_rows
        ),
    )
    queue_rows = get_history_raw()
    queue_csv = _csv_text(
        ["finished_at", "label", "status", "wait_sec", "runtime_sec"],
        (
            [
                datetime.fromtimestamp(item.get("finished_at", 0)).isoformat()
                if item.get("finished_at")
                else "",
                item.get("label", ""),
                item.get("status", ""),
                item.get("wait_sec", ""),
                item.get("runtime_sec", ""),
            ]
            for item in queue_rows
        ),
    )
    st_hist = runtime_state.get("selftest_history", [])
    selftest_csv = _csv_text(
        ["timestamp", "ok", "summary"],
        (
            [entry.get("ts", ""), entry.get("ok", False), entry.get("summary", "")]
            for entry in st_hist
        ),
    )
    mem_zip = io.BytesIO()
    with zipfile.ZipFile(mem_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(f"incidents_{hours}h.csv", incidents_csv)
        zf.writestr("queue_history.csv", queue_csv)
        zf.writestr("selftest_history.csv", selftest_csv)
    mem_zip.seek(0)
    summary = (
        f"📦 Export all\n"
        f"Incidents: {len(incidents_rows)} rows ({hours}h)\n"
        f"Queue history: {len(queue_rows)} rows\n"
        f"Selftest: {len(st_hist)} rows"
    )
    await msg.answer(summary)
    await msg.answer_document(document=BufferedInputFile(mem_zip.read(), filename="export_all.zip"))
@dp.message(F.text == "🔒 SSL")
async def ssl_certs(msg: Message):
if not is_admin_msg(msg):
@@ -433,11 +1022,14 @@ async def updates_page(cb: CallbackQuery):
@dp.callback_query(F.data == "upgrade:confirm")
async def upgrade_confirm(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
await cb.answer()
async def job():
if cfg.get("safety", {}).get("dry_run", False):
await cb.message.answer("🧪 Dry-run: upgrade skipped", reply_markup=system_ops_kb)
return
text = await apply_updates()
await cb.message.answer(text, reply_markup=system_ops_kb, parse_mode="Markdown")
@@ -453,7 +1045,7 @@ async def upgrade_cancel(cb: CallbackQuery):
@dp.callback_query(F.data == "reboot:confirm")
async def reboot_confirm(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
await cb.answer()
REBOOT_PENDING[cb.from_user.id] = {}
@@ -468,7 +1060,7 @@ async def reboot_cancel(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("npmplus:"))
async def npmplus_toggle(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
parts = cb.data.split(":")
if len(parts) != 3:
@@ -506,6 +1098,9 @@ async def reboot_password(msg: Message):
return
async def job():
if cfg.get("safety", {}).get("dry_run", False):
await msg.answer("🧪 Dry-run: reboot skipped", reply_markup=system_ops_kb)
return
await msg.answer("🔄 Rebooting…", reply_markup=system_ops_kb)
await run_cmd(["sudo", "reboot"], timeout=10)

View File

@@ -10,7 +10,7 @@ menu_kb = ReplyKeyboardMarkup(
keyboard=[
[KeyboardButton(text="🩺 Health"), KeyboardButton(text="📊 Статус")],
[KeyboardButton(text="🐳 Docker"), KeyboardButton(text="📦 Backup")],
[KeyboardButton(text="🧉 Artifacts"), KeyboardButton(text="⚙️ System")],
[KeyboardButton(text="⚙️ System")],
[KeyboardButton(text=" Help")],
],
resize_keyboard=True,
@@ -20,7 +20,8 @@ docker_kb = ReplyKeyboardMarkup(
keyboard=[
[KeyboardButton(text="🐳 Status"), KeyboardButton(text="🧰 Arcane")],
[KeyboardButton(text="🔄 Restart"), KeyboardButton(text="📜 Logs")],
[KeyboardButton(text="📈 Stats"), KeyboardButton(text=" Назад")],
[KeyboardButton(text="📈 Stats"), KeyboardButton(text=" Restarts")],
[KeyboardButton(text="⬅️ Назад")],
],
resize_keyboard=True,
)
@@ -37,8 +38,8 @@ backup_kb = ReplyKeyboardMarkup(
keyboard=[
[KeyboardButton(text="📦 Status"), KeyboardButton(text="📦 Last snapshot")],
[KeyboardButton(text="📊 Repo stats"), KeyboardButton(text="🧯 Restore help")],
[KeyboardButton(text="▶️ Run backup"), KeyboardButton(text="🧾 Queue")],
[KeyboardButton(text="🧪 Restic check"), KeyboardButton(text="📬 Weekly report"), KeyboardButton(text="⬅️ Назад")],
[KeyboardButton(text="▶️ Run backup"), KeyboardButton(text="🧾 Queue"), KeyboardButton(text="📊 Queue SLA")],
[KeyboardButton(text="📉 Backup SLA"), KeyboardButton(text="📜 History"), KeyboardButton(text="⬅️ Назад")],
],
resize_keyboard=True,
)
@@ -83,6 +84,7 @@ system_logs_kb = ReplyKeyboardMarkup(
keyboard=[
[KeyboardButton(text="🧾 Audit/Incidents"), KeyboardButton(text="🔒 Security")],
[KeyboardButton(text="🧩 Integrations"), KeyboardButton(text="🧰 Processes")],
[KeyboardButton(text="📣 Summary"), KeyboardButton(text="🔥 Heatmap")],
[KeyboardButton(text="⬅️ System")],
],
resize_keyboard=True,
@@ -91,6 +93,8 @@ system_logs_kb = ReplyKeyboardMarkup(
system_logs_audit_kb = ReplyKeyboardMarkup(
keyboard=[
[KeyboardButton(text="🧾 Audit"), KeyboardButton(text="📣 Incidents")],
[KeyboardButton(text="🆕 Diff"), KeyboardButton(text="📤 Export")],
[KeyboardButton(text="📦 Export all"), KeyboardButton(text="🧰 Alerts log")],
[KeyboardButton(text="⬅️ Logs")],
],
resize_keyboard=True,
@@ -121,6 +125,17 @@ system_logs_tools_kb = ReplyKeyboardMarkup(
resize_keyboard=True,
)
# OpenWrt submenu (4 rows)
openwrt_kb = ReplyKeyboardMarkup(
    keyboard=[
        [KeyboardButton(text="🌐 WAN fast"), KeyboardButton(text="📡 Full status")],
        [KeyboardButton(text="📶 Wi-Fi clients"), KeyboardButton(text="🧾 Leases")],
        [KeyboardButton(text="🔀 Leases diff")],
        [KeyboardButton(text="⬅️ System")],
    ],
    resize_keyboard=True,
)
def docker_inline_kb(action: str) -> InlineKeyboardMarkup:
rows = []

View File

@@ -1,4 +1,5 @@
from pathlib import Path
import os
import time
LOCK_DIR = Path("/var/run/tg-bot")
@@ -11,9 +12,14 @@ def lock_path(name: str) -> Path:
def acquire_lock(name: str) -> bool:
p = lock_path(name)
if p.exists():
try:
fd = os.open(str(p), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
except FileExistsError:
return False
p.write_text(str(time.time()))
try:
os.write(fd, str(time.time()).encode("ascii", errors="ignore"))
finally:
os.close(fd)
return True

51
main.py
View File

@@ -1,16 +1,20 @@
import asyncio
import logging
import socket
from datetime import datetime
from app import bot, dp, cfg, ADMIN_ID
from app import bot, dp, cfg, ADMIN_ID, ADMIN_IDS
from keyboards import menu_kb
from services.docker import discover_containers, docker_watchdog
from services.alerts import monitor_resources, monitor_smart
from services.alerts import monitor_resources, monitor_smart, monitor_raid
from services.metrics import MetricsStore, start_sampler
from services.queue import worker as queue_worker
from services.queue import worker as queue_worker, configure as queue_configure
from services.notify import notify
from services.audit import AuditMiddleware, audit_start
from services.ssl_alerts import monitor_ssl
from services.external_checks import monitor_external
from services.incidents import log_incident
from services.logging_setup import setup_logging
from services.selftest import schedule_selftest
import state
import handlers.menu
import handlers.status
@@ -22,6 +26,39 @@ import handlers.help
import handlers.callbacks
import handlers.arcane
import handlers.processes
from services.weekly_report import weekly_reporter
import handlers.alerts_admin
import handlers.config_check
def _handle_async_exception(_loop, context):
    """asyncio loop exception handler: record unhandled errors as incidents.

    Keeps a rolling one-hour window of exception timestamps as function
    attributes and, when 3+ unhandled exceptions land within an hour, logs
    one rate-limited "exception_flood" incident (at most once per hour).
    """
    msg = context.get("message") or "Unhandled exception"
    exc = context.get("exception")
    if exc:
        text = f"{msg}: {type(exc).__name__}: {exc}"
    else:
        text = f"{msg}"
    now = datetime.now()
    # Lazily initialise the per-process flood-detection state on first call.
    if not hasattr(_handle_async_exception, "_recent"):
        _handle_async_exception._recent = []
        _handle_async_exception._last_alert = None
    recent = _handle_async_exception._recent
    recent.append(now)
    # keep last hour
    _handle_async_exception._recent = [t for t in recent if (now - t).total_seconds() < 3600]
    if len(_handle_async_exception._recent) >= 3:
        last_alert = getattr(_handle_async_exception, "_last_alert", None)
        if not last_alert or (now - last_alert).total_seconds() > 3600:
            try:
                log_incident(cfg, "exception_flood", category="system")
            except Exception:
                # Incident logging is best-effort; never raise from the handler.
                pass
            _handle_async_exception._last_alert = now
    try:
        log_incident(cfg, text, category="system")
    except Exception:
        pass
    logging.getLogger("asyncio").error(text)
async def notify_start():
@@ -33,6 +70,7 @@ async def notify_start():
async def main():
setup_logging(cfg)
dp.message.middleware(AuditMiddleware(cfg))
dp.callback_query.middleware(AuditMiddleware(cfg))
audit_start(cfg)
@@ -44,13 +82,20 @@ async def main():
asyncio.create_task(monitor_resources(cfg, notify, bot, ADMIN_ID))
if cfg.get("alerts", {}).get("smart_enabled", True):
asyncio.create_task(monitor_smart(cfg, notify, bot, ADMIN_ID))
if cfg.get("alerts", {}).get("raid_enabled", True):
asyncio.create_task(monitor_raid(cfg, notify, bot, ADMIN_ID))
if cfg.get("npmplus", {}).get("alerts", {}).get("enabled", True):
asyncio.create_task(monitor_ssl(cfg, notify, bot, ADMIN_ID))
if cfg.get("external_checks", {}).get("enabled", True):
asyncio.create_task(monitor_external(cfg))
state.METRICS_STORE = MetricsStore()
asyncio.create_task(start_sampler(state.METRICS_STORE, interval=5))
queue_configure(cfg.get("queue", {}), cfg)
asyncio.create_task(queue_worker())
asyncio.create_task(weekly_reporter(cfg, bot, ADMIN_IDS, state.DOCKER_MAP))
asyncio.create_task(schedule_selftest(cfg, bot, ADMIN_IDS, state.DOCKER_MAP))
loop = asyncio.get_running_loop()
loop.set_exception_handler(_handle_async_exception)
await notify_start()
await dp.start_polling(bot)

93
services/alert_mute.py Normal file
View File

@@ -0,0 +1,93 @@
import time
from typing import Dict
from services.runtime_state import get_state, set_state
# category -> unix timestamp until muted
def _mutes() -> Dict[str, float]:
    """Return the mute table (category -> unix timestamp until muted)."""
    return get_state().get("mutes", {})


def _save(mutes: Dict[str, float]):
    """Persist the mute table back into runtime state."""
    set_state("mutes", mutes)


def _cleanup() -> None:
    """Drop expired mutes; persist only when something actually changed."""
    mutes = _mutes()
    now = time.time()
    expired = [k for k, until in mutes.items() if until <= now]
    for k in expired:
        mutes.pop(k, None)
    if expired:
        # Skip the redundant state write when nothing expired.
        _save(mutes)
def set_mute(category: str, seconds: int) -> float:
    """Mute *category* for *seconds* seconds; return the expiry timestamp."""
    _cleanup()
    mutes = _mutes()
    expiry = time.time() + max(0, seconds)
    mutes[category] = expiry
    _save(mutes)
    return expiry
def clear_mute(category: str) -> None:
    """Remove any mute for *category* (no-op if it is not muted)."""
    mutes = _mutes()
    mutes.pop(category, None)
    _save(mutes)
def is_muted(category: str | None) -> bool:
    """Return True if *category* currently has an active manual mute.

    Expired entries are pruned (and persisted) as a side effect.
    """
    if not category:
        return False
    _cleanup()
    mutes = _mutes()
    until = mutes.get(category)
    if until is None:
        return False
    if until > time.time():
        return True
    # The entry expired between _cleanup() and now: drop it eagerly.
    mutes.pop(category, None)
    _save(mutes)
    return False
def list_mutes() -> dict[str, int]:
    """Return active mutes as category -> remaining seconds."""
    _cleanup()
    now = time.time()
    mutes = _mutes()
    return {k: int(until - now) for k, until in mutes.items()}
def is_auto_muted(cfg: dict, category: str | None) -> bool:
if not category:
return False
auto_list = cfg.get("alerts", {}).get("auto_mute", [])
if not isinstance(auto_list, list):
return False
now = time.localtime()
now_minutes = now.tm_hour * 60 + now.tm_min
for item in auto_list:
if not isinstance(item, dict):
continue
cat = item.get("category")
if cat != category:
continue
start = item.get("start", "00:00")
end = item.get("end", "00:00")
try:
sh, sm = [int(x) for x in start.split(":")]
eh, em = [int(x) for x in end.split(":")]
except Exception:
continue
start_min = sh * 60 + sm
end_min = eh * 60 + em
if start_min == end_min:
continue
if start_min < end_min:
if start_min <= now_minutes < end_min:
return True
else:
if now_minutes >= start_min or now_minutes < end_min:
return True
return False

View File

@@ -1,7 +1,7 @@
import asyncio
import time
import psutil
from system_checks import list_disks, smart_health, disk_temperature
from system_checks import list_disks, smart_health, disk_temperature, list_md_arrays, md_array_status
from services.system import worst_disk_usage
from services.disk_report import build_disk_report
@@ -11,6 +11,8 @@ async def monitor_resources(cfg, notify, bot, chat_id):
interval = int(alerts_cfg.get("interval_sec", 60))
cooldown = int(alerts_cfg.get("cooldown_sec", 900))
notify_recovery = bool(alerts_cfg.get("notify_recovery", True))
load_only_critical = bool(alerts_cfg.get("load_only_critical", False))
auto_mute_high_load_sec = int(alerts_cfg.get("auto_mute_on_high_load_sec", 0))
disk_warn = int(cfg.get("thresholds", {}).get("disk_warn", 80))
snapshot_warn = int(cfg.get("disk_report", {}).get("threshold", disk_warn))
@@ -27,27 +29,27 @@ async def monitor_resources(cfg, notify, bot, chat_id):
usage, mount = worst_disk_usage()
if usage is None:
if not state["disk_na"] or now - last_sent["disk_na"] >= cooldown:
await notify(bot, chat_id, "⚠️ Disk usage n/a")
await notify(bot, chat_id, "⚠️ Disk usage n/a", level="warn", key="disk_na", category="disk")
state["disk_na"] = True
last_sent["disk_na"] = now
else:
if state["disk_na"] and notify_recovery:
await notify(bot, chat_id, f"🟢 Disk usage OK ({usage}% {mount})")
if state["disk_na"] and notify_recovery and not load_only_critical:
await notify(bot, chat_id, f"🟢 Disk usage OK ({usage}% {mount})", level="info", key="disk_ok", category="disk")
state["disk_na"] = False
if usage >= disk_warn:
if not state["disk_high"] or now - last_sent["disk"] >= cooldown:
await notify(bot, chat_id, f"🟡 Disk usage {usage}% ({mount})")
await notify(bot, chat_id, f"🟡 Disk usage {usage}% ({mount})", level="warn", key="disk_high", category="disk")
state["disk_high"] = True
last_sent["disk"] = now
else:
if state["disk_high"] and notify_recovery:
await notify(bot, chat_id, f"🟢 Disk usage OK ({usage}% {mount})")
if state["disk_high"] and notify_recovery and not load_only_critical:
await notify(bot, chat_id, f"🟢 Disk usage OK ({usage}% {mount})", level="info", key="disk_ok", category="disk")
state["disk_high"] = False
if usage >= snapshot_warn and now - last_sent["disk_report"] >= snapshot_cooldown:
report = await build_disk_report(cfg, mount or "/", usage)
await notify(bot, chat_id, f"📦 Disk snapshot\n\n{report}")
await notify(bot, chat_id, f"📦 Disk snapshot\n\n{report}", level="info", key="disk_snapshot", category="disk")
last_sent["disk_report"] = now
load = psutil.getloadavg()[0]
@@ -57,16 +59,24 @@ async def monitor_resources(cfg, notify, bot, chat_id):
level = 1
else:
level = 0
if load_only_critical and level == 1:
level = 0
if level == 0:
if state["load_level"] > 0 and notify_recovery:
await notify(bot, chat_id, f"🟢 Load OK: {load:.2f}")
if state["load_level"] > 0 and notify_recovery and not load_only_critical:
await notify(bot, chat_id, f"🟢 Load OK: {load:.2f}", level="info", key="load_ok", category="load")
state["load_level"] = 0
else:
if level != state["load_level"] or now - last_sent["load"] >= cooldown:
icon = "🔴" if level == 2 else "🟡"
await notify(bot, chat_id, f"{icon} Load high: {load:.2f}")
level_name = "critical" if level == 2 else "warn"
key = "load_high_crit" if level == 2 else "load_high_warn"
await notify(bot, chat_id, f"{icon} Load high: {load:.2f}", level=level_name, key=key, category="load")
last_sent["load"] = now
if level == 2 and auto_mute_high_load_sec > 0:
from services.alert_mute import set_mute
set_mute("load", auto_mute_high_load_sec)
state["load_level"] = level
await asyncio.sleep(interval)
@@ -91,7 +101,14 @@ async def monitor_smart(cfg, notify, bot, chat_id):
continue
if "FAILED" in health:
await notify(bot, chat_id, f"🔴 SMART FAIL {dev}: {health}, 🌡 {temp}")
await notify(
bot,
chat_id,
f"🔴 SMART FAIL {dev}: {health}, 🌡 {temp}",
level="critical",
key=f"smart_fail:{dev}",
category="smart",
)
last_sent[key] = now
continue
@@ -101,8 +118,66 @@ async def monitor_smart(cfg, notify, bot, chat_id):
except ValueError:
t = None
if t is not None and t >= temp_warn:
await notify(bot, chat_id, f"🟡 SMART HOT {dev}: {health}, 🌡 {temp}")
await notify(
bot,
chat_id,
f"🟡 SMART HOT {dev}: {health}, 🌡 {temp}",
level="warn",
key=f"smart_hot:{dev}",
category="smart",
)
last_sent[key] = now
continue
await asyncio.sleep(interval)
async def monitor_raid(cfg, notify, bot, chat_id):
    """Background task: poll md RAID arrays and alert on bad states.

    Sends a warn-level alert for a degraded array and a critical one for an
    inactive array, re-alerting at most once per cooldown while the array
    stays bad; once healthy again, sends one info-level recovery notice
    (when notify_recovery is enabled).
    """
    alerts_cfg = cfg.get("alerts", {})
    interval = int(alerts_cfg.get("raid_interval_sec", 300))  # poll period, seconds
    cooldown = int(alerts_cfg.get("raid_cooldown_sec", 1800))  # re-alert period per device
    notify_recovery = bool(alerts_cfg.get("notify_recovery", True))
    last_sent: dict[str, float] = {}  # device -> time of last alert
    bad_state: dict[str, bool] = {}  # device -> currently known bad
    while True:
        now = time.time()
        for dev in list_md_arrays():
            status = md_array_status(dev)
            lower = status.lower()
            level = None
            key_suffix = None
            if "inactive" in lower:
                level = "critical"
                key_suffix = "inactive"
            elif "degraded" in lower:
                level = "warn"
                key_suffix = "degraded"
            if level:
                # Alert on the first transition to bad, then once per cooldown.
                if not bad_state.get(dev) or (now - last_sent.get(dev, 0.0) >= cooldown):
                    icon = "🔴" if level == "critical" else "🟡"
                    await notify(
                        bot,
                        chat_id,
                        f"{icon} RAID {dev}: {status}",
                        level=level,
                        key=f"raid_{key_suffix}:{dev}",
                        category="raid",
                    )
                    last_sent[dev] = now
                bad_state[dev] = True
            else:
                # Recovered: announce once, only if we previously saw it bad.
                if bad_state.get(dev) and notify_recovery:
                    await notify(
                        bot,
                        chat_id,
                        f"🟢 RAID {dev}: {status}",
                        level="info",
                        key=f"raid_ok:{dev}",
                        category="raid",
                    )
                bad_state[dev] = False
        await asyncio.sleep(interval)

35
services/config_check.py Normal file
View File

@@ -0,0 +1,35 @@
import os
from typing import Any, Tuple, List
def validate_cfg(cfg: dict[str, Any]) -> Tuple[List[str], List[str]]:
    """Sanity-check the bot configuration.

    Returns ``(errors, warnings)``: errors are settings without which the
    bot cannot run (token, admin id); warnings are suspicious but
    survivable gaps.
    """
    errors: List[str] = []
    warnings: List[str] = []

    tg = cfg.get("telegram", {})
    if not tg.get("token"):
        errors.append("telegram.token is missing")
    admin_ids = tg.get("admin_ids")
    has_admin_ids = isinstance(admin_ids, list) and len(admin_ids) > 0
    if not tg.get("admin_id") and not has_admin_ids:
        errors.append("telegram.admin_id is missing")

    thresholds = cfg.get("thresholds", {})
    warnings.extend(
        f"thresholds.{name} not set"
        for name in ("disk_warn", "load_warn", "high_load_warn")
        if name not in thresholds
    )

    env_path = cfg.get("paths", {}).get("restic_env")
    if env_path and not os.path.exists(env_path):
        warnings.append(f"paths.restic_env not found: {env_path}")

    npm = cfg.get("npmplus", {})
    if npm and not npm.get("token") and (not npm.get("identity") or not npm.get("secret")):
        warnings.append("npmplus: token missing and identity/secret missing")

    ow = cfg.get("openwrt", {})
    if ow and not ow.get("host"):
        warnings.append("openwrt.host is missing")
    return errors, warnings

View File

@@ -1,11 +1,48 @@
import os
import re
from typing import Any
from services.runner import run_cmd
def _top_dirs_cmd(path: str, limit: int) -> list[str]:
return ["bash", "-lc", f"du -xhd1 {path} 2>/dev/null | sort -h | tail -n {limit}"]
_ = limit
return ["du", "-x", "-h", "-d", "1", path]
_SIZE_RE = re.compile(r"^\s*([0-9]+(?:\.[0-9]+)?)([KMGTP]?)(i?B?)?$", re.IGNORECASE)
def _size_to_bytes(value: str) -> float:
m = _SIZE_RE.match(value.strip())
if not m:
return -1.0
num = float(m.group(1))
unit = (m.group(2) or "").upper()
mul = {
"": 1,
"K": 1024,
"M": 1024**2,
"G": 1024**3,
"T": 1024**4,
"P": 1024**5,
}.get(unit, 1)
return num * mul
def _format_top_dirs(raw: str, limit: int) -> str:
rows: list[tuple[float, str]] = []
for line in raw.splitlines():
line = line.strip()
if not line:
continue
parts = line.split(maxsplit=1)
if len(parts) != 2:
continue
size, name = parts
rows.append((_size_to_bytes(size), f"{size}\t{name}"))
rows.sort(key=lambda x: x[0])
return "\n".join(line for _sz, line in rows[-max(1, limit):])
async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
@@ -15,24 +52,27 @@ async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
rc, out = await run_cmd(_top_dirs_cmd(mount, limit), timeout=30)
if rc == 0 and out.strip():
top_out = _format_top_dirs(out, limit)
lines.append("")
lines.append("Top directories:")
lines.append(out.strip())
lines.append(top_out)
docker_dir = cfg.get("disk_report", {}).get("docker_dir", "/var/lib/docker")
if docker_dir and os.path.exists(docker_dir):
rc2, out2 = await run_cmd(_top_dirs_cmd(docker_dir, limit), timeout=30)
if rc2 == 0 and out2.strip():
top_out2 = _format_top_dirs(out2, limit)
lines.append("")
lines.append(f"Docker dir: {docker_dir}")
lines.append(out2.strip())
lines.append(top_out2)
logs_dir = cfg.get("disk_report", {}).get("logs_dir", "/var/log")
if logs_dir and os.path.exists(logs_dir):
rc3, out3 = await run_cmd(_top_dirs_cmd(logs_dir, limit), timeout=30)
if rc3 == 0 and out3.strip():
top_out3 = _format_top_dirs(out3, limit)
lines.append("")
lines.append(f"Logs dir: {logs_dir}")
lines.append(out3.strip())
lines.append(top_out3)
return "\n".join(lines)

View File

@@ -144,8 +144,22 @@ async def docker_watchdog(container_map, notify, bot, chat_id):
reply_markup=kb,
)
elif health not in ("healthy", "n/a"):
await notify(bot, chat_id, f"⚠️ {alias} health: {health}")
await notify(
bot,
chat_id,
f"⚠️ {alias} health: {health}",
level="warn",
key=f"docker_health:{alias}",
category="docker",
)
else:
await notify(bot, chat_id, f"🐳 {alias}: {status}")
await notify(
bot,
chat_id,
f"🐳 {alias}: {status}",
level="info",
key=f"docker_status:{alias}:{status}",
category="docker",
)
last[alias] = (status, health)
await asyncio.sleep(120)

View File

@@ -1,6 +1,9 @@
import os
import os
import ssl
import subprocess
import psutil
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from app import RESTIC_ENV
from services.system import worst_disk_usage
@@ -9,9 +12,35 @@ def _containers_from_cfg(cfg) -> dict:
return cfg.get("docker", {}).get("containers", {})
def _request_status(url: str, verify_tls: bool) -> int | None:
    """HTTP GET *url* and return its status code, or None if unreachable."""
    ssl_context = None
    if not verify_tls:
        # Certificate validation is deliberately skipped when config says so.
        ssl_context = ssl._create_unverified_context()  # nosec - config-controlled
    request = Request(url, headers={"User-Agent": "tg-admin-bot"})
    try:
        with urlopen(request, timeout=8, context=ssl_context) as response:
            return int(response.status)
    except HTTPError as err:
        # An HTTP error response still carries a meaningful status code.
        return int(err.code)
    except URLError:
        return None
def _npm_api_base(cfg) -> str | None:
npm_cfg = cfg.get("npmplus", {})
base = (npm_cfg.get("base_url") or "").rstrip("/")
if not base:
return None
if not base.endswith("/api"):
base = f"{base}/api"
return base
def health(cfg, container_map: dict | None = None) -> str:
lines = ["🩺 Health check\n"]
thresholds = cfg.get("thresholds", {})
disk_warn = int(thresholds.get("disk_warn", 80))
load_warn = float(thresholds.get("load_warn", 2.0))
try:
env = os.environ.copy()
env.update(RESTIC_ENV)
@@ -30,15 +59,47 @@ def health(cfg, container_map: dict | None = None) -> str:
else:
lines.append(f"🟢 {alias} OK")
npm_cfg = cfg.get("npmplus", {})
npm_base = _npm_api_base(cfg)
if npm_base:
npm_status = _request_status(npm_base, npm_cfg.get("verify_tls", True))
if npm_status == 200:
lines.append("🟢 NPMplus API OK")
elif npm_status is None:
lines.append("🔴 NPMplus API unreachable")
else:
lines.append(f"🟡 NPMplus API HTTP {npm_status}")
g_cfg = cfg.get("gitea", {})
g_base = (g_cfg.get("base_url") or "").rstrip("/")
if g_base:
health_paths = ["/api/healthz", "/api/v1/healthz"]
g_status = None
for path in health_paths:
status = _request_status(f"{g_base}{path}", g_cfg.get("verify_tls", True))
if status == 200:
g_status = status
break
if status not in (404, 405):
g_status = status
break
if g_status == 200:
lines.append("🟢 Gitea API OK")
elif g_status is None:
lines.append("🔴 Gitea API unreachable")
else:
lines.append(f"🟡 Gitea API HTTP {g_status}")
usage, mount = worst_disk_usage()
if usage is None:
lines.append("⚠️ Disk n/a")
elif usage > cfg["thresholds"]["disk_warn"]:
elif usage > disk_warn:
lines.append(f"🟡 Disk {usage}% ({mount})")
else:
lines.append(f"🟢 Disk {usage}% ({mount})")
load = psutil.getloadavg()[0]
lines.append(f"{'🟢' if load < cfg['thresholds']['load_warn'] else '🟡'} Load {load}")
lines.append(f"{'🟢' if load < load_warn else '🟡'} Load {load}")
return "\n".join(lines)

View File

@@ -4,6 +4,7 @@ from collections import deque
from datetime import datetime, timedelta, timezone
from logging.handlers import TimedRotatingFileHandler
from typing import Any
from services import runtime_state
def _get_path(cfg: dict[str, Any]) -> str:
@@ -44,9 +45,11 @@ def _get_logger(cfg: dict[str, Any]) -> logging.Logger:
return logger
def log_incident(cfg: dict[str, Any], text: str) -> None:
def log_incident(cfg: dict[str, Any], text: str, category: str | None = None) -> None:
if not cfg.get("incidents", {}).get("enabled", True):
return
if category and "category=" not in text:
text = f"category={category} {text}"
logger = _get_logger(cfg)
logger.info(text)
@@ -63,6 +66,10 @@ def _parse_line(line: str) -> tuple[datetime | None, str]:
def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]:
return [f"{dt:%Y-%m-%d %H:%M} {msg}" for dt, msg in read_raw(cfg, hours, limit=limit)]
def read_raw(cfg: dict[str, Any], hours: int, limit: int = 200, *, include_old: bool = False) -> list[tuple[datetime, str]]:
path = _get_path(cfg)
if not os.path.exists(path):
return []
@@ -72,7 +79,40 @@ def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]:
with open(path, "r", encoding="utf-8", errors="replace") as f:
for line in f:
dt, msg = _parse_line(line.rstrip())
if dt is None or dt < since:
if dt is None:
continue
lines.append(f"{dt:%Y-%m-%d %H:%M} {msg}")
if not include_old and dt < since:
continue
lines.append((dt, msg))
return list(lines)
def infer_category(text: str) -> str | None:
lower = text.lower()
if "category=" in lower:
import re
m = re.search(r"category=([a-z0-9_-]+)", lower)
if m:
return m.group(1)
if "load" in lower:
return "load"
if "docker" in lower:
return "docker"
if "restic" in lower or "backup" in lower:
return "backup"
if "smart" in lower:
return "smart"
if "ssl" in lower or "cert" in lower:
return "ssl"
if "npmplus" in lower:
return "npmplus"
if "gitea" in lower:
return "gitea"
if "openwrt" in lower:
return "openwrt"
if "queue" in lower:
return "queue"
if "selftest" in lower:
return "selftest"
return None

35
services/logging_setup.py Normal file
View File

@@ -0,0 +1,35 @@
import logging
import os
from logging.handlers import TimedRotatingFileHandler
def setup_logging(cfg: dict) -> None:
    """Attach a timed-rotating file handler to the root logger.

    Controlled by the ``logging`` config section (enabled / path /
    rotate_when / backup_count / level). Idempotent: when a rotating
    handler for the same file is already attached, nothing is added.
    """
    log_cfg = cfg.get("logging", {})
    if not log_cfg.get("enabled", True):
        return
    path = log_cfg.get("path", "/var/server-bot/bot.log")
    os.makedirs(os.path.dirname(path), exist_ok=True)
    root = logging.getLogger()
    already_attached = any(
        isinstance(h, TimedRotatingFileHandler) and h.baseFilename == path
        for h in root.handlers
    )
    if already_attached:
        return
    handler = TimedRotatingFileHandler(
        path,
        when=log_cfg.get("rotate_when", "W0"),
        interval=1,
        backupCount=int(log_cfg.get("backup_count", 8)),
        encoding="utf-8",
        utc=True,
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s\t%(levelname)s\t%(name)s\t%(message)s")
    )
    root.setLevel(str(log_cfg.get("level", "INFO")).upper())
    root.addHandler(handler)

View File

@@ -1,14 +1,83 @@
import time
from datetime import datetime
from aiogram import Bot
from app import cfg
from services.alert_mute import is_muted, is_auto_muted
from services.incidents import log_incident
async def notify(bot: Bot, chat_id: int, text: str):
_LAST_SENT: dict[str, float] = {}
def _parse_hhmm(value: str) -> int | None:
try:
hours, minutes = value.strip().split(":", 1)
h = int(hours)
m = int(minutes)
except Exception:
return None
if not (0 <= h <= 23 and 0 <= m <= 59):
return None
return h * 60 + m
def _in_quiet_hours(alerts_cfg: dict) -> bool:
quiet = alerts_cfg.get("quiet_hours", {})
if not quiet.get("enabled", False):
return False
start_min = _parse_hhmm(quiet.get("start", "23:00"))
end_min = _parse_hhmm(quiet.get("end", "08:00"))
if start_min is None or end_min is None:
return False
if start_min == end_min:
return False
now = datetime.now()
now_min = now.hour * 60 + now.minute
if start_min < end_min:
return start_min <= now_min < end_min
return now_min >= start_min or now_min < end_min
async def notify(
    bot: Bot,
    chat_id: int,
    text: str,
    level: str = "info",
    key: str | None = None,
    category: str | None = None,
):
    """Send an alert to *chat_id*, honoring mutes, quiet hours and dedup.

    Suppressed alerts are still written to the incident log with a
    ``[suppressed:<reason>]`` prefix so nothing is silently lost.
    *key* lets callers deduplicate alerts whose text varies (timestamps
    etc.); without it the full text is the dedup key.
    """
    alerts_cfg = cfg.get("alerts", {})
    suppressed_reason = None
    if category and is_muted(category):
        suppressed_reason = "muted"
    elif category and is_auto_muted(cfg, category):
        suppressed_reason = "auto_mute"
    elif _in_quiet_hours(alerts_cfg):
        # Critical alerts may pierce quiet hours when allow_critical is set.
        allow_critical = bool(alerts_cfg.get("quiet_hours", {}).get("allow_critical", True))
        if not (allow_critical and level == "critical"):
            suppressed_reason = "quiet_hours"
    if suppressed_reason:
        try:
            log_incident(cfg, f"[suppressed:{suppressed_reason}] {text}", category=category)
        except Exception:
            pass
        return
    # Drop repeats of the same alert within the cooldown window.
    dedup_sec = int(alerts_cfg.get("notify_cooldown_sec", alerts_cfg.get("cooldown_sec", 900)))
    if dedup_sec > 0:
        dedup_key = key or text
        now = time.time()
        last_time = _LAST_SENT.get(dedup_key, 0)
        if now - last_time < dedup_sec:
            return
        _LAST_SENT[dedup_key] = now
    # Delivery and incident logging are both best-effort.
    try:
        await bot.send_message(chat_id, text)
    except Exception:
        pass
    try:
        log_incident(cfg, text, category=category)
    except Exception:
        pass

View File

@@ -38,7 +38,11 @@ def _format_rate(rate: Any) -> str:
return "?"
if val <= 0:
return "?"
return f"{val / 1000:.1f}M"
if val >= 1_000_000:
return f"{val / 1_000_000:.1f}M"
if val >= 1_000:
return f"{val / 1_000:.1f}K"
return f"{val:.0f}b"
def _extract_wan_ip(wan: dict[str, Any]) -> str | None:
@@ -128,6 +132,18 @@ def _extract_lease_name_map(leases: Any) -> dict[str, str]:
return out
def _extract_lease_name_map_fallback(raw: str) -> dict[str, str]:
out: dict[str, str] = {}
for line in raw.splitlines():
parts = line.strip().split()
if len(parts) < 4:
continue
_expiry, mac, _ipaddr, host = parts[:4]
host = host if host != "*" else "unknown"
out[str(mac).lower()] = str(host)
return out
def _extract_ifnames(wireless: dict[str, Any]) -> list[str]:
ifnames: list[str] = []
if not isinstance(wireless, dict):
@@ -189,6 +205,19 @@ def _extract_hostapd_ifnames(raw: str) -> list[str]:
return ifnames
def _net_label_for_ifname(ifname: str, ifname_meta: dict[str, dict[str, str]]) -> str:
meta = ifname_meta.get(ifname, {})
ssid = meta.get("ssid") or ""
band = meta.get("band") or ""
if ssid and band:
return f"{ssid} ({band})"
if ssid:
return ssid
if band:
return band
return ifname
def _safe_json_load(raw: str) -> Any | None:
if not raw:
return None
@@ -211,7 +240,7 @@ def _parse_hostapd_clients(
*,
name_map: dict[str, str] | None = None,
ifname_meta: dict[str, dict[str, str]] | None = None,
) -> list[str]:
) -> list[tuple[str, int | None, str]]:
if not isinstance(payload, dict):
return []
data = payload.get("clients")
@@ -219,7 +248,7 @@ def _parse_hostapd_clients(
items = data.items()
else:
return []
clients: list[str] = []
clients: list[tuple[str, int | None, str]] = []
name_map = name_map or {}
meta = (ifname_meta or {}).get(ifname, {})
ssid = meta.get("ssid") or ""
@@ -245,7 +274,8 @@ def _parse_hostapd_clients(
client_label = host
else:
client_label = str(mac)
clients.append(f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}")
line = f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}"
clients.append((line, signal if isinstance(signal, (int, float)) else None, net_label))
return clients
@@ -279,7 +309,7 @@ def _parse_leases_fallback(raw: str) -> list[str]:
return out
async def get_openwrt_status(cfg: dict[str, Any]) -> str:
async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str:
ow_cfg = cfg.get("openwrt", {})
host = ow_cfg.get("host")
user = ow_cfg.get("user", "root")
@@ -324,19 +354,11 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str:
if len(parts) < 4:
return "⚠️ OpenWrt response incomplete"
sys_info = None
wan_status = None
wireless = None
leases = None
leases_fallback = ""
sys_info = _safe_json_load(parts[0])
if sys_info is None:
sys_info = None
wan_status = _safe_json_load(parts[1]) or {}
wireless = _safe_json_load(parts[2]) or {}
leases = _safe_json_load(parts[3])
if leases is None:
leases_fallback = parts[3]
leases_fallback = "" if leases is not None else parts[3]
if isinstance(sys_info, dict):
uptime_raw = sys_info.get("uptime")
@@ -360,6 +382,10 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str:
ifnames.extend(_extract_hostapd_ifnames(out_l))
ifnames = sorted({name for name in ifnames if name})
lease_name_map = _extract_lease_name_map(leases or {})
if leases_fallback:
lease_name_map.update(_extract_lease_name_map_fallback(leases_fallback))
wifi_net_counts: dict[str, int] = {}
wifi_signals: dict[str, list[int]] = {}
if ifnames:
for ifname in ifnames:
cmd_clients = ssh_cmd + ["ubus", "call", f"hostapd.{ifname}", "get_clients"]
@@ -369,43 +395,110 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str:
if rc2 == 0:
payload = _safe_json_load(out2)
if payload:
wifi_clients.extend(
_parse_hostapd_clients(
payload,
ifname,
name_map=lease_name_map,
ifname_meta=ifname_meta,
)
clients_payload = payload.get("clients") if isinstance(payload, dict) else None
if isinstance(clients_payload, dict):
label = _net_label_for_ifname(ifname, ifname_meta)
wifi_net_counts[label] = wifi_net_counts.get(label, 0) + len(clients_payload)
parsed = _parse_hostapd_clients(
payload,
ifname,
name_map=lease_name_map,
ifname_meta=ifname_meta,
)
wifi_clients.extend([p[0] for p in parsed])
for _line, sig, net_label in parsed:
if sig is not None and net_label:
wifi_signals.setdefault(net_label, []).append(sig)
if leases:
leases_list = _extract_leases(leases)
else:
leases_list = _parse_leases_fallback(leases_fallback)
lines = [
header = [
"📡 OpenWrt",
f"🕒 Uptime: {uptime}",
f"⚙️ Load: {load}",
f"🌐 WAN: {wan_ip} ({wan_state})",
"",
f"📶 Wi-Fi clients: {len(wifi_clients)}",
]
wifi_section: list[str] = []
if wifi_net_counts:
wifi_section.append("📶 Wi-Fi networks:")
for label, count in sorted(wifi_net_counts.items()):
sigs = wifi_signals.get(label) or []
if sigs:
avg_sig = sum(sigs) / len(sigs)
min_sig = min(sigs)
wifi_section.append(f" - {label}: {count} (avg {avg_sig:.0f}dBm, min {min_sig}dBm)")
else:
wifi_section.append(f" - {label}: {count}")
wifi_section.append("")
wifi_section.append(f"📶 Wi-Fi clients: {len(wifi_clients)}")
if wifi_clients:
for line in wifi_clients[:20]:
lines.append(f" - {line}")
wifi_section.append(f" - {line}")
if len(wifi_clients) > 20:
lines.append(f" … and {len(wifi_clients) - 20} more")
wifi_section.append(f" … and {len(wifi_clients) - 20} more")
else:
lines.append(" (none)")
wifi_section.append(" (none)")
lines += ["", f"🧾 DHCP leases: {len(leases_list)}"]
lease_section: list[str] = ["", f"🧾 DHCP leases: {len(leases_list)}"]
if leases_list:
for line in leases_list[:20]:
lines.append(f" - {line}")
lease_section.append(f" - {line}")
if len(leases_list) > 20:
lines.append(f" … and {len(leases_list) - 20} more")
lease_section.append(f" … and {len(leases_list) - 20} more")
else:
lines.append(" (none)")
lease_section.append(" (none)")
return "\n".join(lines)
if mode == "wan":
return "\n".join(header)
if mode == "clients":
return "\n".join(header + wifi_section)
if mode == "leases":
return "\n".join(header + lease_section)
return "\n".join(header + wifi_section + lease_section)
async def fetch_openwrt_leases(cfg: dict[str, Any]) -> list[str]:
    """
    Fetch DHCP leases as list of strings "IP hostname (MAC)".

    Tries the luci-rpc ubus endpoint first and falls back to parsing the
    raw /tmp/dhcp.leases output. Raises RuntimeError when the host is
    unconfigured, the SSH call times out (rc 124) or fails.
    """
    ow_cfg = cfg.get("openwrt", {})
    host = ow_cfg.get("host")
    user = ow_cfg.get("user", "root")
    port = ow_cfg.get("port", 22)
    identity_file = ow_cfg.get("identity_file")
    timeout_sec = ow_cfg.get("timeout_sec", 8)
    strict = ow_cfg.get("strict_host_key_checking", True)
    if not host:
        raise RuntimeError("OpenWrt host not configured")
    ssh_cmd = [
        "ssh",
        "-o",
        "BatchMode=yes",
        "-o",
        f"ConnectTimeout={timeout_sec}",
        "-o",
        "LogLevel=ERROR",
    ]
    if not strict:
        ssh_cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
    if identity_file:
        ssh_cmd += ["-i", str(identity_file)]
    ssh_cmd += ["-p", str(port), f"{user}@{host}"]
    remote = "ubus call luci-rpc getDHCPLeases '{\"family\":4}' 2>/dev/null || cat /tmp/dhcp.leases"
    # Pass the command as a single ssh argument: ssh joins its argv with
    # spaces and hands the result to the remote login shell. Wrapping it
    # in an unquoted `sh -c` would make the remote shell treat only the
    # first word ("ubus") as the -c script, silently breaking the ubus
    # path and always hitting the fallback.
    rc, out = await run_cmd_full(ssh_cmd + [remote], timeout=timeout_sec + 10)
    if rc == 124:
        raise RuntimeError("timeout")
    if rc != 0:
        raise RuntimeError(out.strip() or f"ssh rc={rc}")
    leases = _safe_json_load(out)
    if leases:
        return _extract_leases(leases)
    return _parse_leases_fallback(out)

View File

@@ -1,40 +1,149 @@
import asyncio
import logging
import time
from collections import deque
from typing import Awaitable, Callable, Any
from services import runtime_state
from services.incidents import log_incident
_queue: asyncio.Queue = asyncio.Queue()
_current_label: str | None = None
_current_meta: dict[str, Any] | None = None
_pending: deque[tuple[str, float]] = deque()
_stats: dict[str, Any] = runtime_state.get("queue_stats", {}) or {
"processed": 0,
"avg_wait_sec": 0.0,
"avg_runtime_sec": 0.0,
"last_label": "",
"last_finished_at": 0.0,
}
_history: deque[dict[str, Any]] = deque(runtime_state.get("queue_history", []) or [], maxlen=50)
_alert_cfg: dict[str, Any] = {
"max_pending": None,
"avg_wait": None,
"cooldown": 300,
"last_sent": 0.0,
}
_cfg: dict[str, Any] | None = None
_logger = logging.getLogger("queue")
def _save_stats():
    """Persist aggregate queue stats and the job history to runtime_state."""
    runtime_state.set_state("queue_stats", _stats)
    # deque is not JSON-serializable; snapshot it as a plain list.
    runtime_state.set_state("queue_history", list(_history))
def configure(queue_cfg: dict[str, Any], cfg: dict[str, Any]):
    """Store the app config and the queue congestion-alert thresholds.

    Thresholds absent from *queue_cfg* default to None (disabled);
    the alert cooldown defaults to 300 seconds.
    """
    global _cfg
    _cfg = cfg
    _alert_cfg.update(
        max_pending=queue_cfg.get("max_pending_alert"),
        avg_wait=queue_cfg.get("avg_wait_alert"),
        cooldown=queue_cfg.get("cooldown_sec", 300),
    )
def _check_congestion(pending_count: int, avg_wait: float | None):
    """Log a "queue_congested" incident when the backlog size or average
    wait crosses its configured threshold, rate-limited by a cooldown.

    pending_count: number of queued-but-not-started jobs.
    avg_wait: rolling average wait in seconds, or None when unknown
        (enqueue calls this before any job has finished).
    """
    max_pending = _alert_cfg.get("max_pending")
    avg_wait_thr = _alert_cfg.get("avg_wait")
    cooldown = _alert_cfg.get("cooldown", 300)
    now = time.time()
    # Stay silent while inside the cooldown window since the last alert.
    if now - _alert_cfg.get("last_sent", 0) < cooldown:
        return
    reason = None
    if max_pending and pending_count >= max_pending:
        reason = f"pending={pending_count} >= {max_pending}"
    if avg_wait_thr and avg_wait is not None and avg_wait >= avg_wait_thr:
        # Keep the pending-count reason when both thresholds tripped.
        reason = reason or f"avg_wait={avg_wait:.1f}s >= {avg_wait_thr}s"
    if reason and _cfg:
        try:
            log_incident(_cfg, f"queue_congested {reason}", category="queue")
        except Exception:
            pass
        # NOTE(review): cooldown marker reconstructed as updated only on a
        # successfully detected reason — confirm original nesting.
        _alert_cfg["last_sent"] = now
async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int:
    """Add *job* to the work queue and return the new pending count.

    The enqueue timestamp is stored both in the queue item and in the
    _pending mirror so status reporting never has to reach into the
    private internals of asyncio.Queue.
    """
    enqueued_at = time.time()
    await _queue.put((label, job, enqueued_at))
    _pending.append((label, enqueued_at))
    # avg_wait is unknown at enqueue time; only the backlog size is checked.
    _check_congestion(len(_pending), None)
    return len(_pending)
async def worker():
    """Consume queued jobs forever, maintaining stats/history and logging
    failures both to the module logger and to the incident log."""
    global _current_label, _current_meta
    while True:
        label, job, enqueued_at = await _queue.get()
        # Remove the matching entry from the _pending mirror: fast path
        # pops the head; out-of-order entries are removed by value.
        if _pending:
            if _pending[0] == (label, enqueued_at):
                _pending.popleft()
            else:
                try:
                    _pending.remove((label, enqueued_at))
                except ValueError:
                    pass
        _current_label = label
        _current_meta = {"enqueued_at": enqueued_at, "started_at": time.time()}
        status = "ok"
        try:
            await job()
        except Exception as e:
            status = "err"
            _logger.exception("Queue job failed: label=%s", label)
            if _cfg:
                try:
                    log_incident(
                        _cfg,
                        f"queue_job_failed label={label} error={type(e).__name__}: {e}",
                        category="queue",
                    )
                except Exception:
                    pass
        finally:
            finished_at = time.time()
            if _current_meta:
                wait_sec = max(0.0, _current_meta["started_at"] - _current_meta["enqueued_at"])
                runtime_sec = max(0.0, finished_at - _current_meta["started_at"])
                # Incremental running average:
                # new_avg = (old_avg * n_prev + sample) / (n_prev + 1)
                n_prev = int(_stats.get("processed", 0))
                _stats["processed"] = n_prev + 1
                _stats["avg_wait_sec"] = (
                    (_stats.get("avg_wait_sec", 0.0) * n_prev) + wait_sec
                ) / _stats["processed"]
                _stats["avg_runtime_sec"] = (
                    (_stats.get("avg_runtime_sec", 0.0) * n_prev) + runtime_sec
                ) / _stats["processed"]
                _stats["last_label"] = label
                _stats["last_finished_at"] = finished_at
                # History is newest-first; deque maxlen bounds its size.
                _history.appendleft(
                    {
                        "label": label,
                        "wait_sec": int(wait_sec),
                        "runtime_sec": int(runtime_sec),
                        "finished_at": int(finished_at),
                        "status": status,
                    }
                )
                _save_stats()
                _check_congestion(len(_pending), _stats.get("avg_wait_sec"))
            _current_label = None
            _current_meta = None
            _queue.task_done()
def format_status() -> str:
    """Return a short queue summary: running job, backlog preview, totals.

    Reads the _pending mirror rather than asyncio.Queue's private
    ``_queue`` attribute.
    """
    pending = list(_pending)
    lines = ["🧾 Queue"]
    lines.append(f"🔄 Running: {_current_label or 'idle'}")
    lines.append(f"⏳ Pending: {len(pending)}")
    if pending:
        # Show at most the next five labels.
        preview = ", ".join([p[0] for p in pending[:5]])
        lines.append(f"➡️ Next: {preview}")
    if _stats.get("processed"):
        lines.append(
            f"📈 Done: {_stats.get('processed')} | "
            f"avg wait {int(_stats.get('avg_wait_sec', 0))}s | "
            f"avg run {int(_stats.get('avg_runtime_sec', 0))}s"
        )
    return "\n".join(lines)
@@ -48,11 +157,53 @@ def format_details(limit: int = 10) -> str:
else:
lines.append("🔄 Running: idle")
pending = list(_queue._queue)
pending = list(_pending)
lines.append(f"⏳ Pending: {len(pending)}")
if pending:
lines.append("🔢 Position | Label | Wait")
for i, (label, _job, enqueued_at) in enumerate(pending[:limit], start=1):
for i, (label, enqueued_at) in enumerate(pending[:limit], start=1):
wait = int(now - enqueued_at)
lines.append(f"{i:>3} | {label} | {wait}s")
if _stats.get("processed"):
lines.append("")
lines.append(
"📈 Stats: "
f"{_stats.get('processed')} done, "
f"avg wait {int(_stats.get('avg_wait_sec', 0))}s, "
f"avg run {int(_stats.get('avg_runtime_sec', 0))}s"
)
last_label = _stats.get("last_label")
if last_label:
lines.append(f"Last: {last_label}")
if _history:
lines.append("")
lines.append("🗂 Last jobs:")
for item in list(_history)[:5]:
t = time.strftime("%H:%M:%S", time.localtime(item["finished_at"]))
lines.append(
f"- {t} {item['label']} {item['status']} "
f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)"
)
return "\n".join(lines)
def format_history(limit: int = 20) -> str:
    """Render the most recently finished jobs (newest first), up to *limit*."""
    out = ["🗂 Queue history"]
    if not _history:
        out.append("(empty)")
        return "\n".join(out)
    for job in list(_history)[:limit]:
        stamp = time.strftime("%m-%d %H:%M:%S", time.localtime(job["finished_at"]))
        out.append(
            f"{stamp} {job['label']} {job['status']} "
            f"(wait {job['wait_sec']}s, run {job['runtime_sec']}s)"
        )
    return "\n".join(out)
def get_history_raw() -> list[dict[str, Any]]:
    """Return a shallow copy of the finished-job history (newest first)."""
    return list(_history)
def get_stats() -> dict[str, Any]:
    """Return a shallow copy of the aggregate queue statistics."""
    return dict(_stats)

73
services/runtime_state.py Normal file
View File

@@ -0,0 +1,73 @@
import json
import os
import threading
import tempfile
from typing import Any, Dict
_PATH = "/var/server-bot/runtime.json"
_STATE: Dict[str, Any] = {}
_LOCK = threading.RLock()
_LOADED = False
def configure(path: str | None):
    """Override the on-disk location of the runtime state file.

    A falsy *path* keeps the current (default) location.
    """
    global _PATH
    if path:
        _PATH = path
def _load_from_disk():
    """(Re)load _STATE from _PATH; any failure resets it to an empty dict.

    Always marks the cache as loaded so later get/set calls don't retry a
    broken file on every access. Caller is expected to hold _LOCK.
    """
    global _STATE, _LOADED
    if not os.path.exists(_PATH):
        _STATE = {}
        _LOADED = True
        return
    try:
        with open(_PATH, "r", encoding="utf-8") as f:
            _STATE = json.load(f)
    except Exception:
        # Corrupt or unreadable state is discarded rather than crashing.
        _STATE = {}
    _LOADED = True
def _save():
    """Atomically persist _STATE to _PATH (temp file + fsync + rename).

    Best-effort: all I/O errors are swallowed because runtime state is a
    cache and must never take the bot down. Caller should hold _LOCK.
    """
    directory = os.path.dirname(_PATH) or "."
    os.makedirs(directory, exist_ok=True)
    try:
        # Temp file in the same directory so os.replace stays on one
        # filesystem and is atomic.
        fd, tmp_path = tempfile.mkstemp(prefix=".runtime.", suffix=".json", dir=directory)
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(_STATE, f, ensure_ascii=False)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, _PATH)
        finally:
            # After a successful replace the temp file no longer exists;
            # this only cleans up when the write or rename failed midway.
            if os.path.exists(tmp_path):
                try:
                    os.unlink(tmp_path)
                except Exception:
                    pass
    except Exception:
        pass
def get_state() -> Dict[str, Any]:
    """Return the live state dict (not a copy) after lazy-loading it.

    NOTE(review): callers receive the internal dict; mutating it directly
    bypasses persistence until the next set_state call.
    """
    with _LOCK:
        if not _LOADED:
            _load_from_disk()
        return _STATE
def set_state(key: str, value: Any):
    """Set *key* to *value* and synchronously persist the whole state."""
    with _LOCK:
        if not _LOADED:
            _load_from_disk()
        _STATE[key] = value
        _save()
def get(key: str, default: Any = None) -> Any:
    """Return state[key] (lazy-loading first), or *default* when absent."""
    with _LOCK:
        if not _LOADED:
            _load_from_disk()
        return _STATE.get(key, default)

95
services/selftest.py Normal file
View File

@@ -0,0 +1,95 @@
import json
from datetime import datetime, timedelta
import asyncio
from typing import Any
from services.health import health
from services.runner import run_cmd_full
from services.incidents import log_incident
from services import runtime_state
def _save_history(entry: dict[str, Any]) -> None:
    """Prepend *entry* to the persisted self-test history, keeping at most
    the 20 newest entries; non-list persisted values are discarded."""
    previous = runtime_state.get("selftest_history", [])
    previous = previous[:50] if isinstance(previous, list) else []
    runtime_state.set_state("selftest_history", ([entry] + previous)[:20])
async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> tuple[str, bool]:
    """Run quick health and backup sanity checks.

    Returns (report_text, ok); ok is False when the health check raised
    or the restic call failed (an *empty* snapshot list is only a warning).
    The outcome is also appended to the persisted self-test history.
    """
    lines = ["🧪 Self-test"]
    ok = True
    # health — run the blocking health() check off the event loop.
    try:
        htext = await asyncio.to_thread(health, cfg, docker_map)
        h_lines = [ln for ln in htext.splitlines() if ln.strip()]
        # Compress multi-line health output into one short summary line.
        brief = " | ".join(h_lines[1:5]) if len(h_lines) > 1 else h_lines[0] if h_lines else "n/a"
        lines.append(f"🟢 Health: {brief}")
    except Exception as e:
        lines.append(f"🔴 Health failed: {e}")
        ok = False
    # restic snapshots check
    rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
    if rc == 0:
        try:
            snaps = json.loads(out)
            if isinstance(snaps, list) and snaps:
                snaps.sort(key=lambda s: s.get("time", ""), reverse=True)
                last = snaps[0]
                # "2024-01-02T03:04:05Z" -> "2024-01-02 03:04" for display.
                t = last.get("time", "?").replace("Z", "").replace("T", " ")[:16]
                lines.append(f"🟢 Restic snapshots: {len(snaps)}, last {t}")
            else:
                lines.append("🟡 Restic snapshots: empty")
        except Exception:
            lines.append("🟡 Restic snapshots: invalid JSON")
    else:
        lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}")
        ok = False
    result_text = "\n".join(lines)
    # Persisting history is best-effort; a failure here must not mask the
    # self-test result itself.
    try:
        _save_history(
            {
                "ts": datetime.now().isoformat(),
                "ok": ok,
                "summary": result_text.splitlines()[1] if len(lines) > 1 else "",
            }
        )
    except Exception:
        pass
    return result_text, ok
async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]):
    """
    Run selftest daily at configured time.

    Exits immediately unless selftest.schedule.enabled is true. Each run's
    report is sent to every admin chat; a failed run additionally logs a
    "selftest failed" incident.
    """
    sched_cfg = cfg.get("selftest", {}).get("schedule", {})
    if not sched_cfg.get("enabled", False):
        return
    time_str = sched_cfg.get("time", "03:30")
    try:
        hh, mm = [int(x) for x in time_str.split(":")]
    except Exception:
        # Malformed time string falls back to 03:30.
        hh, mm = 3, 30
    while True:
        # Sleep until the next occurrence of hh:mm (today or tomorrow).
        now = datetime.now()
        run_at = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
        if run_at <= now:
            run_at += timedelta(days=1)
        await asyncio.sleep((run_at - now).total_seconds())
        text, ok = await run_selftest(cfg, docker_map)
        for chat_id in admin_ids:
            try:
                await bot.send_message(chat_id, text)
            except Exception:
                pass
        if not ok:
            try:
                log_incident(cfg, "selftest failed", category="selftest")
            except Exception:
                pass

View File

@@ -46,10 +46,14 @@ async def monitor_ssl(cfg: dict[str, Any], notify, bot, chat_id: int):
key = f"{name}:{threshold}"
last_time = last_sent.get(key, 0)
if time.time() - last_time >= cooldown:
level = "critical" if days_left <= 1 else "warn"
await notify(
bot,
chat_id,
f"⚠️ SSL `{name}` expires in {days_left}d (threshold {threshold}d)",
level=level,
key=f"ssl:{name}:{threshold}",
category="ssl",
)
last_sent[key] = time.time()
break

107
services/weekly_report.py Normal file
View File

@@ -0,0 +1,107 @@
import asyncio
import socket
from datetime import datetime, timedelta
import psutil
from services.system import worst_disk_usage
from services.alert_mute import list_mutes
from services.incidents import read_recent
from services.docker import docker_cmd
def _parse_hhmm(value: str) -> tuple[int, int]:
try:
h, m = value.split(":", 1)
h = int(h)
m = int(m)
if 0 <= h <= 23 and 0 <= m <= 59:
return h, m
except Exception:
pass
return 8, 0
def _next_run(day: str, time_str: str) -> datetime:
    """Return the next datetime strictly in the future that falls on *day*
    (e.g. "Sun") at *time_str* ("HH:MM"). Unknown day names map to Sunday."""
    weekday_by_name = {"mon": 0, "tue": 1, "wed": 2, "thu": 3, "fri": 4, "sat": 5, "sun": 6}
    target_wd = weekday_by_name.get((day or "Sun").lower()[:3], 6)
    hour, minute = _parse_hhmm(time_str or "08:00")
    now = datetime.now()
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # Step forward one day at a time until the candidate is in the future
    # AND lands on the requested weekday.
    while candidate <= now or candidate.weekday() != target_wd:
        candidate = (candidate + timedelta(days=1)).replace(
            hour=hour, minute=minute, second=0, microsecond=0
        )
    return candidate
async def _docker_running_counts(docker_map: dict) -> tuple[int, int]:
    """Return (running, total) for the containers in *docker_map*
    (alias -> real container name), inspecting each via the docker CLI."""
    total = len(docker_map)
    running = 0
    for container in docker_map.values():
        rc, raw = await docker_cmd(["inspect", "-f", "{{.State.Status}}", container], timeout=10)
        if rc == 0 and raw.strip() == "running":
            running += 1
    return running, total
def _format_uptime(seconds: int) -> str:
days, rem = divmod(seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, _ = divmod(rem, 60)
return f"{days}d {hours:02d}:{minutes:02d}"
async def build_weekly_report(cfg, docker_map: dict) -> str:
    """Assemble the weekly status summary: host, uptime, load, RAM, worst
    disk usage, docker container counts, incident totals and alert mutes."""
    host = socket.gethostname()
    uptime = int(datetime.now().timestamp() - psutil.boot_time())
    load1, load5, load15 = psutil.getloadavg()
    mem = psutil.virtual_memory()
    disk_usage, disk_mount = worst_disk_usage()
    running, total = await _docker_running_counts(docker_map)
    mutes = list_mutes()
    # Incident counts for the last day and week (capped by read limits).
    incidents_24 = len(read_recent(cfg, 24, limit=1000))
    incidents_7d = len(read_recent(cfg, 24 * 7, limit=2000))
    lines = [
        f"🧾 Weekly report — {host}",
        f"⏱ Uptime: {_format_uptime(uptime)}",
        f"⚙️ Load: {load1:.2f} {load5:.2f} {load15:.2f}",
        f"🧠 RAM: {mem.percent}%",
    ]
    if disk_usage is None:
        lines.append("💾 Disk: n/a")
    else:
        lines.append(f"💾 Disk: {disk_usage}% ({disk_mount})")
    lines.append(f"🐳 Docker: {running}/{total} running")
    lines.append(f"📓 Incidents: 24h={incidents_24}, 7d={incidents_7d}")
    if mutes:
        lines.append("🔕 Active mutes:")
        for cat, secs in mutes.items():
            # Remaining mute time, clamped to zero, shown in whole minutes.
            mins = max(0, secs) // 60
            lines.append(f"- {cat}: {mins}m left")
    else:
        lines.append("🔔 Mutes: none")
    return "\n".join(lines)
async def weekly_reporter(cfg, bot, admin_ids: list[int], docker_map: dict):
    """Send the weekly report to every admin at the configured day/time.

    Exits immediately unless reports.weekly.enabled is true; otherwise
    loops forever, sleeping until each next scheduled run. Report build
    and delivery errors are swallowed so the loop never dies.
    """
    reports_cfg = cfg.get("reports", {}).get("weekly", {})
    if not reports_cfg.get("enabled", False):
        return
    day = reports_cfg.get("day", "Sun")
    time_str = reports_cfg.get("time", "08:00")
    while True:
        target = _next_run(day, time_str)
        wait_sec = (target - datetime.now()).total_seconds()
        if wait_sec > 0:
            await asyncio.sleep(wait_sec)
        try:
            text = await build_weekly_report(cfg, docker_map)
            for admin_id in admin_ids:
                await bot.send_message(admin_id, text)
        except Exception:
            pass
        await asyncio.sleep(60)  # small delay to avoid tight loop if time skew

View File

@@ -1,5 +1,6 @@
import subprocess
import os
import re
def _cmd(cmd: str) -> str:
@@ -82,6 +83,62 @@ def list_disks() -> list[str]:
return disks
def list_md_arrays() -> list[str]:
    """Return sorted device paths (/dev/mdN) of Linux md RAID arrays.

    /proc/mdstat is the primary source: it reliably lists active md
    arrays even when lsblk tree/filters differ across distros. Globbing
    /dev/md* is the fallback when mdstat yields nothing.
    """
    found: set[str] = set()
    mdstat = _cmd("cat /proc/mdstat")
    for row in mdstat.splitlines():
        match = re.match(r"^\s*(md\d+)\s*:", row)
        if match:
            found.add(f"/dev/{match.group(1)}")
    if found:
        return sorted(found)
    # Fallback for environments where mdstat parsing is unavailable.
    listing = _cmd("ls -1 /dev/md* 2>/dev/null")
    for row in listing.splitlines():
        device = row.strip()
        if device and re.match(r"^/dev/md\d+$", device):
            found.add(device)
    return sorted(found)
def md_array_status(dev: str) -> str:
    """Return an icon + status word for md array *dev* from /proc/mdstat.

    🟢 active, 🟡 degraded (any failed member), 🔴 inactive, or a ⚠️
    marker when mdstat is unreadable or the array is not listed.
    """
    out = _cmd("cat /proc/mdstat")
    if not out or "ERROR:" in out:
        return "⚠️ n/a"
    name = dev.rsplit("/", 1)[-1]
    lines = out.splitlines()
    header = None
    idx = -1
    for i, line in enumerate(lines):
        s = line.strip()
        if s.startswith(f"{name} :"):
            header = s
            idx = i
            break
    if not header:
        return "⚠️ not found in /proc/mdstat"
    if "inactive" in header:
        return "🔴 inactive"
    # Collect the array's continuation lines (mdstat entries end at a
    # blank line) and inspect the member-state marker, e.g. [UU] for a
    # healthy mirror. Any "_" inside the brackets is a failed/missing
    # member. Matching only the "[U_"/"[_U" substrings would miss
    # larger arrays such as [UU_], so scan the whole bracket group.
    block = [header]
    for line in lines[idx + 1:]:
        if not line.strip():
            break
        block.append(line.strip())
    block_text = " ".join(block)
    marker = re.search(r"\[([U_]+)\]", block_text)
    if marker and "_" in marker.group(1):
        return "🟡 degraded"
    return "🟢 active"
def smart_health(dev: str) -> str:
out = _cmd(f"smartctl -H {dev}")
@@ -138,8 +195,9 @@ def smart_last_test(dev: str) -> str:
def disks() -> str:
disks = list_disks()
md_arrays = list_md_arrays()
if not disks:
if not disks and not md_arrays:
return "💽 Disks\n\n❌ No disks found"
lines = ["💽 Disks (SMART)\n"]
@@ -158,6 +216,12 @@ def disks() -> str:
lines.append(f"{icon} {d}{health}, 🌡 {temp}")
if md_arrays:
lines.append("")
lines.append("🧱 RAID (md)")
for md in md_arrays:
lines.append(f"{md}{md_array_status(md)}")
return "\n".join(lines)

View File

@@ -0,0 +1,20 @@
import unittest
from services.config_check import validate_cfg
class ConfigCheckTests(unittest.TestCase):
    """Regression tests for services.config_check.validate_cfg."""
    def test_admin_ids_without_admin_id_is_valid(self):
        # A config supplying only admin_ids (no legacy admin_id field)
        # must validate without errors.
        cfg = {
            "telegram": {
                "token": "x",
                "admin_ids": [1, 2],
            }
        }
        errors, warnings = validate_cfg(cfg)
        self.assertEqual(errors, [])
        self.assertIsInstance(warnings, list)
if __name__ == "__main__":
    unittest.main()

21
tests/test_disk_report.py Normal file
View File

@@ -0,0 +1,21 @@
import unittest
import types
import sys
# Avoid runtime import of real app/aiogram in services.runner.
sys.modules.setdefault("app", types.SimpleNamespace(RESTIC_ENV={}))
from services.disk_report import _top_dirs_cmd
class DiskReportTests(unittest.TestCase):
    """Guards against reintroducing a shell-interpolated du invocation."""
    def test_top_dirs_cmd_uses_exec_args_without_shell(self):
        # A path with spaces must survive as a single argv element; the
        # command must not be routed through "bash -lc" string building.
        cmd = _top_dirs_cmd("/tmp/path with spaces", 5)
        self.assertEqual(cmd[:4], ["du", "-x", "-h", "-d"])
        self.assertNotIn("bash", cmd)
        self.assertNotIn("-lc", cmd)
        self.assertEqual(cmd[-1], "/tmp/path with spaces")
if __name__ == "__main__":
    unittest.main()

59
tests/test_queue.py Normal file
View File

@@ -0,0 +1,59 @@
import asyncio
import tempfile
import unittest
from services import runtime_state
from services import queue as queue_service
import contextlib


class QueueTests(unittest.IsolatedAsyncioTestCase):
    """Exercise the queue worker's failure path and incident logging."""

    async def asyncSetUp(self):
        # Point runtime_state at a throwaway file and reset queue globals
        # so tests never touch real persisted state.
        self.tmp = tempfile.TemporaryDirectory()
        runtime_state.configure(f"{self.tmp.name}/runtime.json")
        queue_service._pending.clear()  # type: ignore[attr-defined]
        queue_service._history.clear()  # type: ignore[attr-defined]
        queue_service._stats = {  # type: ignore[attr-defined]
            "processed": 0,
            "avg_wait_sec": 0.0,
            "avg_runtime_sec": 0.0,
            "last_label": "",
            "last_finished_at": 0.0,
        }
        queue_service._cfg = {"incidents": {"enabled": True}}  # type: ignore[attr-defined]

    async def asyncTearDown(self):
        self.tmp.cleanup()

    async def test_worker_logs_failed_job_to_incidents(self):
        logged = []

        def fake_log_incident(cfg, text, category=None):
            logged.append((text, category))

        orig = queue_service.log_incident
        queue_service.log_incident = fake_log_incident

        async def boom():
            raise RuntimeError("boom")

        worker_task = asyncio.create_task(queue_service.worker())
        try:
            await queue_service.enqueue("broken-job", boom)
            await asyncio.wait_for(queue_service._queue.join(), timeout=2.0)  # type: ignore[attr-defined]
        finally:
            worker_task.cancel()
            # contextlib is imported at the top of this block — the
            # original file imported it after the class body, which works
            # at runtime but hides the dependency.
            with contextlib.suppress(asyncio.CancelledError):
                await worker_task
            queue_service.log_incident = orig
        self.assertEqual(queue_service._stats.get("processed"), 1)  # type: ignore[attr-defined]
        self.assertTrue(any("queue_job_failed label=broken-job" in t for t, _c in logged))
        self.assertTrue(any(c == "queue" for _t, c in logged))


if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,28 @@
import json
import tempfile
import unittest
from pathlib import Path
from services import runtime_state
class RuntimeStateTests(unittest.TestCase):
    """Round-trip persistence test for services.runtime_state."""
    def test_set_and_get_persist_between_loads(self):
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "runtime.json"
            runtime_state.configure(str(path))
            runtime_state.set_state("foo", {"bar": 1})
            self.assertEqual(runtime_state.get("foo"), {"bar": 1})
            # Force a fresh in-memory state and load from disk again.
            runtime_state._STATE = {}  # type: ignore[attr-defined]
            runtime_state._LOADED = False  # type: ignore[attr-defined]
            self.assertEqual(runtime_state.get("foo"), {"bar": 1})
            # The on-disk JSON must match what was stored.
            raw = json.loads(path.read_text(encoding="utf-8"))
            self.assertEqual(raw.get("foo"), {"bar": 1})
if __name__ == "__main__":
    unittest.main()