import asyncio import time from typing import Awaitable, Callable, Any _queue: asyncio.Queue = asyncio.Queue() _current_label: str | None = None _current_meta: dict[str, Any] | None = None async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int: await _queue.put((label, job, time.time())) return _queue.qsize() async def worker(): global _current_label, _current_meta while True: label, job, enqueued_at = await _queue.get() _current_label = label _current_meta = {"enqueued_at": enqueued_at, "started_at": time.time()} try: await job() finally: _current_label = None _current_meta = None _queue.task_done() def format_status() -> str: pending = list(_queue._queue) lines = ["๐Ÿงพ Queue"] lines.append(f"๐Ÿ”„ Running: {_current_label or 'idle'}") lines.append(f"โณ Pending: {len(pending)}") if pending: preview = ", ".join([p[0] for p in pending[:5]]) lines.append(f"โžก๏ธ Next: {preview}") return "\n".join(lines) def format_details(limit: int = 10) -> str: now = time.time() lines = ["๐Ÿงพ Queue details"] if _current_label: started_at = _current_meta.get("started_at") if _current_meta else None runtime = f"{int(now - started_at)}s" if started_at else "n/a" lines.append(f"๐Ÿ”„ Running: {_current_label} ({runtime})") else: lines.append("๐Ÿ”„ Running: idle") pending = list(_queue._queue) lines.append(f"โณ Pending: {len(pending)}") if pending: lines.append("๐Ÿ”ข Position | Label | Wait") for i, (label, _job, enqueued_at) in enumerate(pending[:limit], start=1): wait = int(now - enqueued_at) lines.append(f"{i:>3} | {label} | {wait}s") return "\n".join(lines)