Add lightweight unittest coverage for stability fixes

This commit is contained in:
2026-02-15 01:25:11 +03:00
parent 7c56430f32
commit 678332e6d0
4 changed files with 128 additions and 0 deletions

View File

@@ -0,0 +1,20 @@
import unittest
from services.config_check import validate_cfg
class ConfigCheckTests(unittest.TestCase):
def test_admin_ids_without_admin_id_is_valid(self):
cfg = {
"telegram": {
"token": "x",
"admin_ids": [1, 2],
}
}
errors, warnings = validate_cfg(cfg)
self.assertEqual(errors, [])
self.assertIsInstance(warnings, list)
if __name__ == "__main__":
unittest.main()

21
tests/test_disk_report.py Normal file
View File

@@ -0,0 +1,21 @@
import unittest
import types
import sys
# Avoid runtime import of real app/aiogram in services.runner.
sys.modules.setdefault("app", types.SimpleNamespace(RESTIC_ENV={}))
from services.disk_report import _top_dirs_cmd
class DiskReportTests(unittest.TestCase):
def test_top_dirs_cmd_uses_exec_args_without_shell(self):
cmd = _top_dirs_cmd("/tmp/path with spaces", 5)
self.assertEqual(cmd[:4], ["du", "-x", "-h", "-d"])
self.assertNotIn("bash", cmd)
self.assertNotIn("-lc", cmd)
self.assertEqual(cmd[-1], "/tmp/path with spaces")
if __name__ == "__main__":
unittest.main()

59
tests/test_queue.py Normal file
View File

@@ -0,0 +1,59 @@
import asyncio
import tempfile
import unittest
from services import runtime_state
from services import queue as queue_service
class QueueTests(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.tmp = tempfile.TemporaryDirectory()
runtime_state.configure(f"{self.tmp.name}/runtime.json")
queue_service._pending.clear() # type: ignore[attr-defined]
queue_service._history.clear() # type: ignore[attr-defined]
queue_service._stats = { # type: ignore[attr-defined]
"processed": 0,
"avg_wait_sec": 0.0,
"avg_runtime_sec": 0.0,
"last_label": "",
"last_finished_at": 0.0,
}
queue_service._cfg = {"incidents": {"enabled": True}} # type: ignore[attr-defined]
async def asyncTearDown(self):
self.tmp.cleanup()
async def test_worker_logs_failed_job_to_incidents(self):
logged = []
def fake_log_incident(cfg, text, category=None):
logged.append((text, category))
orig = queue_service.log_incident
queue_service.log_incident = fake_log_incident
async def boom():
raise RuntimeError("boom")
worker_task = asyncio.create_task(queue_service.worker())
try:
await queue_service.enqueue("broken-job", boom)
await asyncio.wait_for(queue_service._queue.join(), timeout=2.0) # type: ignore[attr-defined]
finally:
worker_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await worker_task
queue_service.log_incident = orig
self.assertEqual(queue_service._stats.get("processed"), 1) # type: ignore[attr-defined]
self.assertTrue(any("queue_job_failed label=broken-job" in t for t, _c in logged))
self.assertTrue(any(c == "queue" for _t, c in logged))
import contextlib
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,28 @@
import json
import tempfile
import unittest
from pathlib import Path
from services import runtime_state
class RuntimeStateTests(unittest.TestCase):
def test_set_and_get_persist_between_loads(self):
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "runtime.json"
runtime_state.configure(str(path))
runtime_state.set_state("foo", {"bar": 1})
self.assertEqual(runtime_state.get("foo"), {"bar": 1})
# Force a fresh in-memory state and load from disk again.
runtime_state._STATE = {} # type: ignore[attr-defined]
runtime_state._LOADED = False # type: ignore[attr-defined]
self.assertEqual(runtime_state.get("foo"), {"bar": 1})
raw = json.loads(path.read_text(encoding="utf-8"))
self.assertEqual(raw.get("foo"), {"bar": 1})
if __name__ == "__main__":
unittest.main()