import asyncio import time from collections import deque from typing import Awaitable, Callable, Any _queue: asyncio.Queue = asyncio.Queue() _current_label: str | None = None _current_meta: dict[str, Any] | None = None _pending: deque[tuple[str, float]] = deque() async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int: enqueued_at = time.time() await _queue.put((label, job, enqueued_at)) _pending.append((label, enqueued_at)) return len(_pending) async def worker(): global _current_label, _current_meta while True: label, job, enqueued_at = await _queue.get() if _pending: if _pending[0] == (label, enqueued_at): _pending.popleft() else: try: _pending.remove((label, enqueued_at)) except ValueError: pass _current_label = label _current_meta = {"enqueued_at": enqueued_at, "started_at": time.time()} try: await job() finally: _current_label = None _current_meta = None _queue.task_done() def format_status() -> str: pending = list(_pending) lines = ["๐Ÿงพ Queue"] lines.append(f"๐Ÿ”„ Running: {_current_label or 'idle'}") lines.append(f"โณ Pending: {len(pending)}") if pending: preview = ", ".join([p[0] for p in pending[:5]]) lines.append(f"โžก๏ธ Next: {preview}") return "\n".join(lines) def format_details(limit: int = 10) -> str: now = time.time() lines = ["๐Ÿงพ Queue details"] if _current_label: started_at = _current_meta.get("started_at") if _current_meta else None runtime = f"{int(now - started_at)}s" if started_at else "n/a" lines.append(f"๐Ÿ”„ Running: {_current_label} ({runtime})") else: lines.append("๐Ÿ”„ Running: idle") pending = list(_pending) lines.append(f"โณ Pending: {len(pending)}") if pending: lines.append("๐Ÿ”ข Position | Label | Wait") for i, (label, enqueued_at) in enumerate(pending[:limit], start=1): wait = int(now - enqueued_at) lines.append(f"{i:>3} | {label} | {wait}s") return "\n".join(lines)