"""In-process asyncio job queue with running statistics and status text.

Jobs are awaited one at a time by ``worker()``. Alongside the real
``asyncio.Queue`` the module keeps a lightweight ``_pending`` mirror holding
``(label, enqueued_at)`` pairs so the status formatters can list waiting jobs
without touching the queue's internals.
"""

import asyncio
import logging
import time
from collections import deque
from typing import Awaitable, Callable, Any

from services import runtime_state

_logger = logging.getLogger(__name__)

# Unbounded FIFO of (label, job, enqueued_at) tuples consumed by worker().
_queue: asyncio.Queue = asyncio.Queue()
# Label and timing info of the job currently being awaited; None when idle.
_current_label: str | None = None
_current_meta: dict[str, Any] | None = None
# Mirror of the jobs still waiting in _queue, used only for reporting.
_pending: deque[tuple[str, float]] = deque()
# Aggregate statistics; start from whatever runtime_state returns for
# "queue_stats", falling back to zeroed defaults when that is empty.
_stats: dict[str, Any] = runtime_state.get("queue_stats", {}) or {
    "processed": 0,
    "avg_wait_sec": 0.0,
    "avg_runtime_sec": 0.0,
    "last_label": "",
    "last_finished_at": 0.0,
}


def _save_stats():
    """Store the current statistics in runtime_state under "queue_stats"."""
    runtime_state.set_state("queue_stats", _stats)


async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int:
    """Add *job* to the queue and return how many jobs are now waiting.

    Args:
        label: Human-readable name shown in the status output.
        job: Zero-argument callable returning an awaitable; awaited by
            ``worker()`` when its turn comes.

    Returns:
        The number of pending jobs, including the one just added.
    """
    enqueued_at = time.time()
    # The queue is unbounded, so put() never suspends here; the worker
    # therefore cannot dequeue the item before its _pending record exists.
    await _queue.put((label, job, enqueued_at))
    _pending.append((label, enqueued_at))
    return len(_pending)


def _forget_pending(label: str, enqueued_at: float) -> None:
    """Remove the (label, enqueued_at) record of a dequeued job from _pending."""
    entry = (label, enqueued_at)
    if not _pending:
        return
    if _pending[0] == entry:
        # Common case: jobs leave in the same order they were recorded.
        _pending.popleft()
        return
    try:
        _pending.remove(entry)
    except ValueError:
        # No matching record; nothing to clean up.
        pass


def _record_finished(
    label: str, enqueued_at: float, started_at: float, finished_at: float
) -> None:
    """Fold one finished job into the running averages and save the stats."""
    wait_sec = max(0.0, started_at - enqueued_at)
    runtime_sec = max(0.0, finished_at - started_at)
    n_prev = int(_stats.get("processed", 0))
    n_now = n_prev + 1
    _stats["processed"] = n_now
    # Incremental mean: new_avg = (old_avg * n_prev + sample) / n_now.
    _stats["avg_wait_sec"] = (
        (_stats.get("avg_wait_sec", 0.0) * n_prev) + wait_sec
    ) / n_now
    _stats["avg_runtime_sec"] = (
        (_stats.get("avg_runtime_sec", 0.0) * n_prev) + runtime_sec
    ) / n_now
    _stats["last_label"] = label
    _stats["last_finished_at"] = finished_at
    _save_stats()


async def worker():
    """Consume the queue forever, awaiting one job at a time.

    A job that raises is logged and counted like any other finished job; the
    loop then moves on to the next one, so a single failing job no longer
    stops the queue. Cancellation is not intercepted (``CancelledError`` is
    not an ``Exception`` subclass), so cancelling the worker task still ends
    the loop after the bookkeeping below has run.
    """
    global _current_label, _current_meta
    while True:
        label, job, enqueued_at = await _queue.get()
        _forget_pending(label, enqueued_at)
        started_at = time.time()
        _current_label = label
        _current_meta = {"enqueued_at": enqueued_at, "started_at": started_at}
        try:
            await job()
        except Exception:
            _logger.exception("Queued job %r failed", label)
        finally:
            try:
                _record_finished(label, enqueued_at, started_at, time.time())
            except Exception:
                # A stats/persistence problem must not wedge the queue.
                _logger.exception(
                    "Could not record queue stats for job %r", label
                )
            finally:
                # Always clear the "running" state and balance the get()
                # above, whatever happened to the job or the stats.
                _current_label = None
                _current_meta = None
                _queue.task_done()


def format_status() -> str:
    """Return a short multi-line summary of the queue.

    Shows the running job (or ``idle``), the pending count, the labels of up
    to five upcoming jobs and, once at least one job has been processed, the
    aggregate statistics.
    """
    pending = list(_pending)
    lines = [
        "🧾 Queue",
        f"🔄 Running: {_current_label or 'idle'}",
        f"⏳ Pending: {len(pending)}",
    ]
    if pending:
        preview = ", ".join(label for label, _ in pending[:5])
        lines.append(f"➡️ Next: {preview}")
    if _stats.get("processed"):
        lines.append(
            f"📈 Done: {_stats.get('processed')} | "
            f"avg wait {int(_stats.get('avg_wait_sec', 0))}s | "
            f"avg run {int(_stats.get('avg_runtime_sec', 0))}s"
        )
    return "\n".join(lines)


def format_details(limit: int = 10) -> str:
    """Return a detailed multi-line report of the queue.

    Args:
        limit: Maximum number of pending jobs to list individually.

    Returns:
        Text with the running job and its elapsed time, the pending count, a
        ``Position | Label | Wait`` table of the first *limit* pending jobs
        and, once at least one job has been processed, the aggregate
        statistics and the label of the last finished job.
    """
    now = time.time()
    lines = ["🧾 Queue details"]
    if _current_label:
        started_at = _current_meta.get("started_at") if _current_meta else None
        runtime = f"{int(now - started_at)}s" if started_at is not None else "n/a"
        lines.append(f"🔄 Running: {_current_label} ({runtime})")
    else:
        lines.append("🔄 Running: idle")

    pending = list(_pending)
    lines.append(f"⏳ Pending: {len(pending)}")
    if pending:
        lines.append("🔢 Position | Label | Wait")
        for i, (label, enqueued_at) in enumerate(pending[:limit], start=1):
            wait = int(now - enqueued_at)
            lines.append(f"{i:>3} | {label} | {wait}s")

    if _stats.get("processed"):
        lines.append("")
        lines.append(
            "📈 Stats: "
            f"{_stats.get('processed')} done, "
            f"avg wait {int(_stats.get('avg_wait_sec', 0))}s, "
            f"avg run {int(_stats.get('avg_runtime_sec', 0))}s"
        )
        last_label = _stats.get("last_label")
        if last_label:
            lines.append(f"Last: {last_label}")
    return "\n".join(lines)