60 lines
1.9 KiB
Python
60 lines
1.9 KiB
Python
import asyncio
|
|
import tempfile
|
|
import unittest
|
|
|
|
from services import runtime_state
|
|
from services import queue as queue_service
|
|
|
|
|
|
class QueueTests(unittest.IsolatedAsyncioTestCase):
    """Tests for the ``services.queue`` worker, run against scratch state.

    Every change made to shared module state is undone through
    ``addCleanup``. Unlike ``asyncTearDown``, cleanups also run when
    ``asyncSetUp`` fails part-way, so neither the temporary directory nor a
    patched module attribute can leak into later tests.
    """

    def _patch_queue_attr(self, name, value):
        """Set ``queue_service.<name>`` to *value* and restore it on cleanup.

        Args:
            name: Attribute name on the ``queue_service`` module.
            value: Replacement object installed for the duration of the test.
        """
        missing = object()
        previous = getattr(queue_service, name, missing)
        setattr(queue_service, name, value)
        if previous is missing:
            # The attribute did not exist before; remove it again afterwards.
            self.addCleanup(delattr, queue_service, name)
        else:
            self.addCleanup(setattr, queue_service, name, previous)

    async def asyncSetUp(self):
        """Point runtime state at a scratch file and reset the queue globals."""
        self.tmp = tempfile.TemporaryDirectory()
        # Registered immediately so the directory is removed even if one of
        # the following set-up steps raises.
        self.addCleanup(self.tmp.cleanup)
        runtime_state.configure(f"{self.tmp.name}/runtime.json")

        queue_service._pending.clear()  # type: ignore[attr-defined]
        queue_service._history.clear()  # type: ignore[attr-defined]
        self._patch_queue_attr(
            "_stats",
            {
                "processed": 0,
                "avg_wait_sec": 0.0,
                "avg_runtime_sec": 0.0,
                "last_label": "",
                "last_finished_at": 0.0,
            },
        )
        self._patch_queue_attr("_cfg", {"incidents": {"enabled": True}})

    async def test_worker_logs_failed_job_to_incidents(self):
        """A job that raises is counted as processed and logged as an incident.

        The real ``log_incident`` is replaced with a recorder; the test then
        checks that the recorded text names the failed job and that the
        incident category is ``"queue"``.
        """
        logged = []

        def fake_log_incident(cfg, text, category=None):
            # Record only what the assertions below inspect.
            logged.append((text, category))

        self._patch_queue_attr("log_incident", fake_log_incident)

        async def boom():
            raise RuntimeError("boom")

        worker_task = asyncio.create_task(queue_service.worker())
        try:
            await queue_service.enqueue("broken-job", boom)
            # Wait for the queue to report the job as finished; the timeout
            # keeps a stuck worker from hanging the whole suite.
            await asyncio.wait_for(
                queue_service._queue.join(),  # type: ignore[attr-defined]
                timeout=2.0,
            )
        finally:
            # Always stop the background worker, including on failure above.
            worker_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await worker_task

        self.assertEqual(queue_service._stats.get("processed"), 1)  # type: ignore[attr-defined]
        self.assertTrue(
            any("queue_job_failed label=broken-job" in t for t, _c in logged)
        )
        self.assertTrue(any(c == "queue" for _t, c in logged))
|
|
|
|
|
|
import contextlib
|
|
|
|
|
|
# Allow this module to be executed directly as a script; unittest's
# command-line runner then discovers and runs the test cases it defines.
if __name__ == "__main__":
    unittest.main()
|