Compare commits

..

10 Commits

22 changed files with 402 additions and 64 deletions

View File

@@ -33,7 +33,7 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `end` (string): End time `HH:MM` (e.g. `08:00`).
- `allow_critical` (bool): Allow critical alerts during quiet hours.
- `auto_mute` (list): Per-category auto mutes by time window.
- `category` (string): load/disk/smart/ssl/docker/test.
- `category` (string): load/disk/smart/raid/ssl/docker/test.
- `start` (string): Start `HH:MM`.
- `end` (string): End `HH:MM` (can wrap over midnight).
- `auto_mute_on_high_load_sec` (int): auto-mute `load` category for N seconds on critical load (0 disables).
@@ -42,6 +42,9 @@ This project uses `config.yaml`. Start from `config.example.yaml`.
- `smart_interval_sec` (int): SMART poll interval.
- `smart_cooldown_sec` (int): SMART alert cooldown.
- `smart_temp_warn` (int): SMART temperature warning (C).
- `raid_enabled` (bool): Enable md RAID polling (`/proc/mdstat`).
- `raid_interval_sec` (int): RAID poll interval.
- `raid_cooldown_sec` (int): RAID alert cooldown.
## disk_report

View File

@@ -33,7 +33,7 @@
- `end` (string): конец, формат `HH:MM` (например `08:00`).
- `allow_critical` (bool): слать критичные алерты в тишину.
- `auto_mute` (list): авто‑мьюты по категориям и времени.
- `category` (string): load/disk/smart/ssl/docker/test.
- `category` (string): load/disk/smart/raid/ssl/docker/test.
- `start` (string): начало `HH:MM`.
- `end` (string): конец `HH:MM` (интервал может пересекать ночь).
- `auto_mute_on_high_load_sec` (int): при critical load автоматически мьютить категорию `load` на N секунд (0 — выкл).
@@ -42,6 +42,9 @@
- `smart_interval_sec` (int): интервал SMART.
- `smart_cooldown_sec` (int): кулдаун SMART.
- `smart_temp_warn` (int): порог температуры (C).
- `raid_enabled` (bool): RAID проверки (`/proc/mdstat`).
- `raid_interval_sec` (int): интервал RAID.
- `raid_cooldown_sec` (int): кулдаун RAID алертов.
## disk_report

2
app.py
View File

@@ -15,7 +15,7 @@ else:
paths_cfg = cfg.get("paths", {})
runtime_state.configure(paths_cfg.get("runtime_state", "/var/server-bot/runtime.json"))
ARTIFACT_STATE = paths_cfg["artifact_state"]
ARTIFACT_STATE = paths_cfg.get("artifact_state", "/opt/tg-bot/state.json")
RESTIC_ENV = load_env(paths_cfg.get("restic_env", "/etc/restic/restic.env"))
DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80))

View File

@@ -43,6 +43,9 @@ alerts:
smart_interval_sec: 3600
smart_cooldown_sec: 21600
smart_temp_warn: 50
raid_enabled: true
raid_interval_sec: 300
raid_cooldown_sec: 1800
disk_report:
threshold: 90

View File

@@ -16,7 +16,7 @@ HELP_TEXT = (
"/alerts unmute <category> - unmute category\n"
"/alerts list - show active mutes\n"
"/alerts recent [hours] - show incidents log (default 24h)\n"
"Categories: load, disk, smart, ssl, docker, test\n"
"Categories: load, disk, smart, raid, ssl, docker, test\n"
)

View File

@@ -2,7 +2,7 @@ import asyncio
from datetime import datetime
from aiogram import F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from app import dp, cfg
from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg
from keyboards import docker_kb, arcane_kb
from services.arcane import list_projects, restart_project, set_project_state, get_project_details
@@ -115,7 +115,7 @@ async def arcane_refresh(msg: Message):
@dp.callback_query(F.data == "arcane:refresh")
async def arcane_refresh_inline(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
await cb.answer()
await cmd_arcane_projects(cb.message, edit=True)
@@ -123,7 +123,7 @@ async def arcane_refresh_inline(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:page:"))
async def arcane_page(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
try:
page = int(cb.data.split(":", 2)[2])
@@ -141,7 +141,7 @@ async def arcane_page(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:restart:"))
async def arcane_restart(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
@@ -160,7 +160,7 @@ async def arcane_restart(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:details:"))
async def arcane_details(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
@@ -208,7 +208,7 @@ async def arcane_details(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:deploy:"))
async def arcane_deploy_status(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
@@ -254,7 +254,7 @@ async def arcane_deploy_status(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:up:"))
async def arcane_up(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)
@@ -273,7 +273,7 @@ async def arcane_up(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:down:"))
async def arcane_down(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
_, _, pid = cb.data.split(":", 2)

View File

@@ -15,8 +15,15 @@ async def docker_callback(cb: CallbackQuery):
if cb.from_user.id != ADMIN_ID:
return
_, action, alias = cb.data.split(":", 2)
real = DOCKER_MAP[alias]
try:
_, action, alias = cb.data.split(":", 2)
except ValueError:
await cb.answer("Bad request")
return
real = DOCKER_MAP.get(alias)
if not real:
await cb.answer("Container not found")
return
if action == "restart":
await cb.answer("Restarting…")
@@ -60,7 +67,7 @@ async def snapshot_details(cb: CallbackQuery):
snap_id = cb.data.split(":", 1)[1]
await cb.answer("Loading snapshot…")
# получаем статистику snapshot
# получаем статистику snapshot
rc, raw = await run_cmd(
["restic", "stats", snap_id, "--json"],
use_restic_env=True,

View File

@@ -24,7 +24,7 @@ HELP_PAGES = [
"• `/alerts mute <cat> <minutes>` / `/alerts unmute <cat>` / `/alerts list`\n"
"• `/alerts recent [hours]`\n"
"Шорткаты: `/alerts_list`, `/alerts_recent`, `/alerts_mute_load` (60м).\n"
"Категории: load, disk, smart, ssl, docker, test.\n"
"Категории: load, disk, smart, raid, ssl, docker, test.\n"
"Quiet hours: `alerts.quiet_hours` для не‑критичных.\n"
"Авто-мьют: `alerts.auto_mute` со слотами времени.\n"
"Только красные load: `alerts.load_only_critical: true`.\n"

View File

@@ -3,7 +3,7 @@ import os
from datetime import datetime, timezone, timedelta
from aiogram import F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile
from app import dp, cfg
from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg
from keyboards import (
system_info_kb,
@@ -13,8 +13,7 @@ from keyboards import (
system_logs_integrations_kb,
system_logs_kb,
openwrt_kb,
docker_kb,
backup_kb,
docker_kb, backup_kb,
)
from system_checks import security, disks, hardware, list_disks, smart_last_test
from services.http_checks import get_url_checks, check_url
@@ -224,13 +223,6 @@ async def openwrt_status(msg: Message):
asyncio.create_task(worker())
@dp.message(F.text == "/openwrt")
async def openwrt_cmd(msg: Message):
if not is_admin_msg(msg):
return
await openwrt_status(msg)
@dp.message(F.text == "/openwrt_wan")
async def openwrt_wan(msg: Message):
if not is_admin_msg(msg):
@@ -490,14 +482,15 @@ async def incidents_diff(msg: Message):
async def alerts_heatmap(msg: Message):
if not is_admin_msg(msg):
return
parts = msg.text.split()
hours = 48
category = None
for part in parts[1:]:
if part.isdigit():
hours = max(1, int(part))
else:
category = part.lower()
if msg.text.startswith("/alerts_heatmap"):
parts = msg.text.split()
for part in parts[1:]:
if part.isdigit():
hours = max(1, int(part))
else:
category = part.lower()
rows = read_raw(cfg, hours=hours, limit=8000, include_old=True)
buckets: dict[datetime, int] = {}
for dt, line in rows:
@@ -1029,7 +1022,7 @@ async def updates_page(cb: CallbackQuery):
@dp.callback_query(F.data == "upgrade:confirm")
async def upgrade_confirm(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
await cb.answer()
@@ -1052,7 +1045,7 @@ async def upgrade_cancel(cb: CallbackQuery):
@dp.callback_query(F.data == "reboot:confirm")
async def reboot_confirm(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
await cb.answer()
REBOOT_PENDING[cb.from_user.id] = {}
@@ -1067,7 +1060,7 @@ async def reboot_cancel(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("npmplus:"))
async def npmplus_toggle(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]:
if cb.from_user.id not in ADMIN_IDS:
return
parts = cb.data.split(":")
if len(parts) != 3:

View File

@@ -1,4 +1,5 @@
from pathlib import Path
import os
import time
LOCK_DIR = Path("/var/run/tg-bot")
@@ -11,9 +12,14 @@ def lock_path(name: str) -> Path:
def acquire_lock(name: str) -> bool:
p = lock_path(name)
if p.exists():
try:
fd = os.open(str(p), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
except FileExistsError:
return False
p.write_text(str(time.time()))
try:
os.write(fd, str(time.time()).encode("ascii", errors="ignore"))
finally:
os.close(fd)
return True

View File

@@ -5,7 +5,7 @@ from datetime import datetime
from app import bot, dp, cfg, ADMIN_ID, ADMIN_IDS
from keyboards import menu_kb
from services.docker import discover_containers, docker_watchdog
from services.alerts import monitor_resources, monitor_smart
from services.alerts import monitor_resources, monitor_smart, monitor_raid
from services.metrics import MetricsStore, start_sampler
from services.queue import worker as queue_worker, configure as queue_configure
from services.notify import notify
@@ -82,6 +82,8 @@ async def main():
asyncio.create_task(monitor_resources(cfg, notify, bot, ADMIN_ID))
if cfg.get("alerts", {}).get("smart_enabled", True):
asyncio.create_task(monitor_smart(cfg, notify, bot, ADMIN_ID))
if cfg.get("alerts", {}).get("raid_enabled", True):
asyncio.create_task(monitor_raid(cfg, notify, bot, ADMIN_ID))
if cfg.get("npmplus", {}).get("alerts", {}).get("enabled", True):
asyncio.create_task(monitor_ssl(cfg, notify, bot, ADMIN_ID))
if cfg.get("external_checks", {}).get("enabled", True):

View File

@@ -1,7 +1,7 @@
import asyncio
import time
import psutil
from system_checks import list_disks, smart_health, disk_temperature
from system_checks import list_disks, smart_health, disk_temperature, list_md_arrays, md_array_status
from services.system import worst_disk_usage
from services.disk_report import build_disk_report
@@ -130,3 +130,54 @@ async def monitor_smart(cfg, notify, bot, chat_id):
continue
await asyncio.sleep(interval)
async def monitor_raid(cfg, notify, bot, chat_id):
alerts_cfg = cfg.get("alerts", {})
interval = int(alerts_cfg.get("raid_interval_sec", 300))
cooldown = int(alerts_cfg.get("raid_cooldown_sec", 1800))
notify_recovery = bool(alerts_cfg.get("notify_recovery", True))
last_sent: dict[str, float] = {}
bad_state: dict[str, bool] = {}
while True:
now = time.time()
for dev in list_md_arrays():
status = md_array_status(dev)
lower = status.lower()
level = None
key_suffix = None
if "inactive" in lower:
level = "critical"
key_suffix = "inactive"
elif "degraded" in lower:
level = "warn"
key_suffix = "degraded"
if level:
if not bad_state.get(dev) or (now - last_sent.get(dev, 0.0) >= cooldown):
icon = "🔴" if level == "critical" else "🟡"
await notify(
bot,
chat_id,
f"{icon} RAID {dev}: {status}",
level=level,
key=f"raid_{key_suffix}:{dev}",
category="raid",
)
last_sent[dev] = now
bad_state[dev] = True
else:
if bad_state.get(dev) and notify_recovery:
await notify(
bot,
chat_id,
f"🟢 RAID {dev}: {status}",
level="info",
key=f"raid_ok:{dev}",
category="raid",
)
bad_state[dev] = False
await asyncio.sleep(interval)

View File

@@ -9,7 +9,9 @@ def validate_cfg(cfg: dict[str, Any]) -> Tuple[List[str], List[str]]:
tg = cfg.get("telegram", {})
if not tg.get("token"):
errors.append("telegram.token is missing")
if not tg.get("admin_id"):
admin_ids = tg.get("admin_ids")
has_admin_ids = isinstance(admin_ids, list) and len(admin_ids) > 0
if not tg.get("admin_id") and not has_admin_ids:
errors.append("telegram.admin_id is missing")
thresholds = cfg.get("thresholds", {})

View File

@@ -1,11 +1,48 @@
import os
import re
from typing import Any
from services.runner import run_cmd
def _top_dirs_cmd(path: str, limit: int) -> list[str]:
return ["bash", "-lc", f"du -xhd1 {path} 2>/dev/null | sort -h | tail -n {limit}"]
_ = limit
return ["du", "-x", "-h", "-d", "1", path]
_SIZE_RE = re.compile(r"^\s*([0-9]+(?:\.[0-9]+)?)([KMGTP]?)(i?B?)?$", re.IGNORECASE)
def _size_to_bytes(value: str) -> float:
m = _SIZE_RE.match(value.strip())
if not m:
return -1.0
num = float(m.group(1))
unit = (m.group(2) or "").upper()
mul = {
"": 1,
"K": 1024,
"M": 1024**2,
"G": 1024**3,
"T": 1024**4,
"P": 1024**5,
}.get(unit, 1)
return num * mul
def _format_top_dirs(raw: str, limit: int) -> str:
rows: list[tuple[float, str]] = []
for line in raw.splitlines():
line = line.strip()
if not line:
continue
parts = line.split(maxsplit=1)
if len(parts) != 2:
continue
size, name = parts
rows.append((_size_to_bytes(size), f"{size}\t{name}"))
rows.sort(key=lambda x: x[0])
return "\n".join(line for _sz, line in rows[-max(1, limit):])
async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
@@ -15,24 +52,27 @@ async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
rc, out = await run_cmd(_top_dirs_cmd(mount, limit), timeout=30)
if rc == 0 and out.strip():
top_out = _format_top_dirs(out, limit)
lines.append("")
lines.append("Top directories:")
lines.append(out.strip())
lines.append(top_out)
docker_dir = cfg.get("disk_report", {}).get("docker_dir", "/var/lib/docker")
if docker_dir and os.path.exists(docker_dir):
rc2, out2 = await run_cmd(_top_dirs_cmd(docker_dir, limit), timeout=30)
if rc2 == 0 and out2.strip():
top_out2 = _format_top_dirs(out2, limit)
lines.append("")
lines.append(f"Docker dir: {docker_dir}")
lines.append(out2.strip())
lines.append(top_out2)
logs_dir = cfg.get("disk_report", {}).get("logs_dir", "/var/log")
if logs_dir and os.path.exists(logs_dir):
rc3, out3 = await run_cmd(_top_dirs_cmd(logs_dir, limit), timeout=30)
if rc3 == 0 and out3.strip():
top_out3 = _format_top_dirs(out3, limit)
lines.append("")
lines.append(f"Logs dir: {logs_dir}")
lines.append(out3.strip())
lines.append(top_out3)
return "\n".join(lines)

View File

@@ -1,4 +1,4 @@
import os
import os
import ssl
import subprocess
import psutil
@@ -38,7 +38,9 @@ def _npm_api_base(cfg) -> str | None:
def health(cfg, container_map: dict | None = None) -> str:
lines = ["🩺 Health check\n"]
thresholds = cfg.get("thresholds", {})
disk_warn = int(thresholds.get("disk_warn", 80))
load_warn = float(thresholds.get("load_warn", 2.0))
try:
env = os.environ.copy()
env.update(RESTIC_ENV)
@@ -91,12 +93,13 @@ def health(cfg, container_map: dict | None = None) -> str:
usage, mount = worst_disk_usage()
if usage is None:
lines.append("⚠️ Disk n/a")
elif usage > cfg["thresholds"]["disk_warn"]:
elif usage > disk_warn:
lines.append(f"🟡 Disk {usage}% ({mount})")
else:
lines.append(f"🟢 Disk {usage}% ({mount})")
load = psutil.getloadavg()[0]
lines.append(f"{'🟢' if load < cfg['thresholds']['load_warn'] else '🟡'} Load {load}")
lines.append(f"{'🟢' if load < load_warn else '🟡'} Load {load}")
return "\n".join(lines)

View File

@@ -1,4 +1,5 @@
import asyncio
import logging
import time
from collections import deque
from typing import Awaitable, Callable, Any
@@ -25,6 +26,7 @@ _alert_cfg: dict[str, Any] = {
"last_sent": 0.0,
}
_cfg: dict[str, Any] | None = None
_logger = logging.getLogger("queue")
def _save_stats():
@@ -85,8 +87,18 @@ async def worker():
status = "ok"
try:
await job()
except Exception:
except Exception as e:
status = "err"
_logger.exception("Queue job failed: label=%s", label)
if _cfg:
try:
log_incident(
_cfg,
f"queue_job_failed label={label} error={type(e).__name__}: {e}",
category="queue",
)
except Exception:
pass
finally:
finished_at = time.time()
if _current_meta:

View File

@@ -1,9 +1,13 @@
import json
import os
import threading
import tempfile
from typing import Any, Dict
_PATH = "/var/server-bot/runtime.json"
_STATE: Dict[str, Any] = {}
_LOCK = threading.RLock()
_LOADED = False
def configure(path: str | None):
@@ -13,40 +17,57 @@ def configure(path: str | None):
def _load_from_disk():
global _STATE
global _STATE, _LOADED
if not os.path.exists(_PATH):
_STATE = {}
_LOADED = True
return
try:
with open(_PATH, "r", encoding="utf-8") as f:
_STATE = json.load(f)
except Exception:
_STATE = {}
_LOADED = True
def _save():
os.makedirs(os.path.dirname(_PATH), exist_ok=True)
directory = os.path.dirname(_PATH) or "."
os.makedirs(directory, exist_ok=True)
try:
with open(_PATH, "w", encoding="utf-8") as f:
json.dump(_STATE, f)
fd, tmp_path = tempfile.mkstemp(prefix=".runtime.", suffix=".json", dir=directory)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(_STATE, f, ensure_ascii=False)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, _PATH)
finally:
if os.path.exists(tmp_path):
try:
os.unlink(tmp_path)
except Exception:
pass
except Exception:
pass
def get_state() -> Dict[str, Any]:
if not _STATE:
_load_from_disk()
return _STATE
with _LOCK:
if not _LOADED:
_load_from_disk()
return _STATE
def set_state(key: str, value: Any):
if not _STATE:
_load_from_disk()
_STATE[key] = value
_save()
with _LOCK:
if not _LOADED:
_load_from_disk()
_STATE[key] = value
_save()
def get(key: str, default: Any = None) -> Any:
if not _STATE:
_load_from_disk()
return _STATE.get(key, default)
with _LOCK:
if not _LOADED:
_load_from_disk()
return _STATE.get(key, default)

View File

@@ -1,5 +1,6 @@
import subprocess
import os
import re
def _cmd(cmd: str) -> str:
@@ -82,6 +83,62 @@ def list_disks() -> list[str]:
return disks
def list_md_arrays() -> list[str]:
# Prefer /proc/mdstat: it reliably lists active md arrays
# even when lsblk tree/filters differ across distros.
out = _cmd("cat /proc/mdstat")
arrays: set[str] = set()
for line in out.splitlines():
m = re.match(r"^\s*(md\d+)\s*:", line)
if m:
arrays.add(f"/dev/{m.group(1)}")
if arrays:
return sorted(arrays)
# Fallback for environments where mdstat parsing is unavailable.
out = _cmd("ls -1 /dev/md* 2>/dev/null")
for line in out.splitlines():
dev = line.strip()
if dev and re.match(r"^/dev/md\d+$", dev):
arrays.add(dev)
return sorted(arrays)
def md_array_status(dev: str) -> str:
out = _cmd("cat /proc/mdstat")
if not out or "ERROR:" in out:
return "⚠️ n/a"
name = dev.rsplit("/", 1)[-1]
lines = out.splitlines()
header = None
idx = -1
for i, line in enumerate(lines):
s = line.strip()
if s.startswith(f"{name} :"):
header = s
idx = i
break
if not header:
return "⚠️ not found in /proc/mdstat"
if "inactive" in header:
return "🔴 inactive"
# Typical mdstat health marker: [UU] for healthy mirrors/raid members.
block = [header]
for line in lines[idx + 1:]:
if not line.strip():
break
block.append(line.strip())
block_text = " ".join(block)
if "[U_" in block_text or "[_U" in block_text:
return "🟡 degraded"
return "🟢 active"
def smart_health(dev: str) -> str:
out = _cmd(f"smartctl -H {dev}")
@@ -138,8 +195,9 @@ def smart_last_test(dev: str) -> str:
def disks() -> str:
disks = list_disks()
md_arrays = list_md_arrays()
if not disks:
if not disks and not md_arrays:
return "💽 Disks\n\n❌ No disks found"
lines = ["💽 Disks (SMART)\n"]
@@ -158,6 +216,12 @@ def disks() -> str:
lines.append(f"{icon} {d}{health}, 🌡 {temp}")
if md_arrays:
lines.append("")
lines.append("🧱 RAID (md)")
for md in md_arrays:
lines.append(f"{md}{md_array_status(md)}")
return "\n".join(lines)

View File

@@ -0,0 +1,20 @@
import unittest
from services.config_check import validate_cfg
class ConfigCheckTests(unittest.TestCase):
def test_admin_ids_without_admin_id_is_valid(self):
cfg = {
"telegram": {
"token": "x",
"admin_ids": [1, 2],
}
}
errors, warnings = validate_cfg(cfg)
self.assertEqual(errors, [])
self.assertIsInstance(warnings, list)
if __name__ == "__main__":
unittest.main()

21
tests/test_disk_report.py Normal file
View File

@@ -0,0 +1,21 @@
import unittest
import types
import sys
# Avoid runtime import of real app/aiogram in services.runner.
sys.modules.setdefault("app", types.SimpleNamespace(RESTIC_ENV={}))
from services.disk_report import _top_dirs_cmd
class DiskReportTests(unittest.TestCase):
def test_top_dirs_cmd_uses_exec_args_without_shell(self):
cmd = _top_dirs_cmd("/tmp/path with spaces", 5)
self.assertEqual(cmd[:4], ["du", "-x", "-h", "-d"])
self.assertNotIn("bash", cmd)
self.assertNotIn("-lc", cmd)
self.assertEqual(cmd[-1], "/tmp/path with spaces")
if __name__ == "__main__":
unittest.main()

59
tests/test_queue.py Normal file
View File

@@ -0,0 +1,59 @@
import asyncio
import tempfile
import unittest
from services import runtime_state
from services import queue as queue_service
class QueueTests(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.tmp = tempfile.TemporaryDirectory()
runtime_state.configure(f"{self.tmp.name}/runtime.json")
queue_service._pending.clear() # type: ignore[attr-defined]
queue_service._history.clear() # type: ignore[attr-defined]
queue_service._stats = { # type: ignore[attr-defined]
"processed": 0,
"avg_wait_sec": 0.0,
"avg_runtime_sec": 0.0,
"last_label": "",
"last_finished_at": 0.0,
}
queue_service._cfg = {"incidents": {"enabled": True}} # type: ignore[attr-defined]
async def asyncTearDown(self):
self.tmp.cleanup()
async def test_worker_logs_failed_job_to_incidents(self):
logged = []
def fake_log_incident(cfg, text, category=None):
logged.append((text, category))
orig = queue_service.log_incident
queue_service.log_incident = fake_log_incident
async def boom():
raise RuntimeError("boom")
worker_task = asyncio.create_task(queue_service.worker())
try:
await queue_service.enqueue("broken-job", boom)
await asyncio.wait_for(queue_service._queue.join(), timeout=2.0) # type: ignore[attr-defined]
finally:
worker_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await worker_task
queue_service.log_incident = orig
self.assertEqual(queue_service._stats.get("processed"), 1) # type: ignore[attr-defined]
self.assertTrue(any("queue_job_failed label=broken-job" in t for t, _c in logged))
self.assertTrue(any(c == "queue" for _t, c in logged))
import contextlib
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,28 @@
import json
import tempfile
import unittest
from pathlib import Path
from services import runtime_state
class RuntimeStateTests(unittest.TestCase):
def test_set_and_get_persist_between_loads(self):
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "runtime.json"
runtime_state.configure(str(path))
runtime_state.set_state("foo", {"bar": 1})
self.assertEqual(runtime_state.get("foo"), {"bar": 1})
# Force a fresh in-memory state and load from disk again.
runtime_state._STATE = {} # type: ignore[attr-defined]
runtime_state._LOADED = False # type: ignore[attr-defined]
self.assertEqual(runtime_state.get("foo"), {"bar": 1})
raw = json.loads(path.read_text(encoding="utf-8"))
self.assertEqual(raw.get("foo"), {"bar": 1})
if __name__ == "__main__":
unittest.main()