73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
import asyncio
|
|
import time
|
|
from collections import deque
|
|
from typing import Awaitable, Callable, Any
|
|
|
|
|
|
_queue: asyncio.Queue = asyncio.Queue()
|
|
_current_label: str | None = None
|
|
_current_meta: dict[str, Any] | None = None
|
|
_pending: deque[tuple[str, float]] = deque()
|
|
|
|
|
|
async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int:
    """Add a job to the work queue and record it in the pending list.

    Args:
        label: Human-readable name shown in status output.
        job: Zero-argument callable returning an awaitable; the worker
            calls and awaits it when the job's turn comes.

    Returns:
        The number of entries in the pending list once the job is queued.
    """
    enqueued_at = time.time()
    entry = (label, enqueued_at)

    # Register the entry *before* awaiting put(): if put() ever suspends
    # (e.g. the queue is given a maxsize), the worker could otherwise pick
    # the item up first, fail to find it in _pending, and this function
    # would then append an entry that nothing ever removes.
    _pending.append(entry)
    try:
        await _queue.put((label, job, enqueued_at))
    except BaseException:
        # put() failed or was cancelled: the job never reached the queue,
        # so it must not linger in the pending list.
        try:
            _pending.remove(entry)
        except ValueError:
            pass
        raise
    return len(_pending)
|
|
|
|
|
|
async def worker():
    """Consume jobs from the queue forever, running them one at a time.

    For each item the worker removes the matching entry from the pending
    list, publishes the job's label and timing in the module-level
    "current job" variables, awaits the job, then clears that state and
    marks the queue item as done.

    A job that raises an ordinary exception is logged and skipped so the
    remaining jobs still run; cancellation of the worker task itself is
    not intercepted and ends the loop.
    """
    global _current_label, _current_meta

    # Function-scope import keeps this block self-contained.
    import logging

    log = logging.getLogger(__name__)

    while True:
        label, job, enqueued_at = await _queue.get()

        # Drop the bookkeeping entry for this item. remove() deletes the
        # first match scanning from the left, so the common case (entry at
        # the head) is as cheap as popleft(); a missing entry is tolerated.
        try:
            _pending.remove((label, enqueued_at))
        except ValueError:
            pass

        _current_label = label
        _current_meta = {"enqueued_at": enqueued_at, "started_at": time.time()}
        try:
            await job()
        except Exception:
            # One failing job must not kill the worker: letting the error
            # escape would leave every later item stuck in the queue.
            log.exception("Queued job %r failed", label)
        finally:
            _current_label = None
            _current_meta = None
            _queue.task_done()
|
|
|
|
|
|
def format_status() -> str:
    """Build a short, multi-line summary of the queue.

    The summary has a title line, the label of the running job (or
    ``idle``), the number of pending jobs and, when any are pending, the
    labels of the first five.

    Returns:
        The summary lines joined with newline characters.
    """
    # NOTE(review): the "??" / "?" prefixes look like icons lost in an
    # encoding round-trip; they are kept exactly as found. Confirm the
    # intended glyphs.
    waiting = list(_pending)  # snapshot so the count and preview agree
    lines = [
        "?? Queue",
        f"?? Running: {_current_label or 'idle'}",
        f"? Pending: {len(waiting)}",
    ]
    if waiting:
        preview = ", ".join(label for label, _ in waiting[:5])
        lines.append(f"?? Next: {preview}")
    # The separator is a newline escape; the literal had been split across
    # physical lines, which is not a valid string.
    return "\n".join(lines)
|
|
|
|
|
|
def format_details(limit: int = 10) -> str:
    """Build a detailed, multi-line report of the queue.

    The report shows the running job with how long it has been running
    (or ``idle``), the pending count, and a table of pending jobs with
    their 1-based position, label and time spent waiting.

    Args:
        limit: Maximum number of pending jobs to list in the table.

    Returns:
        The report lines joined with newline characters.
    """
    # NOTE(review): the "??" / "?" prefixes look like icons lost in an
    # encoding round-trip; they are kept exactly as found. Confirm the
    # intended glyphs.
    now = time.time()
    lines = ["?? Queue details"]

    if _current_label:
        started_at = _current_meta.get("started_at") if _current_meta else None
        # Without a recorded start time the runtime cannot be computed.
        runtime = f"{int(now - started_at)}s" if started_at else "n/a"
        lines.append(f"?? Running: {_current_label} ({runtime})")
    else:
        lines.append("?? Running: idle")

    waiting = list(_pending)  # snapshot so the count and table agree
    lines.append(f"? Pending: {len(waiting)}")
    if waiting:
        lines.append("?? Position | Label | Wait")
        lines.extend(
            f"{position:>3} | {label} | {int(now - enqueued_at)}s"
            for position, (label, enqueued_at) in enumerate(waiting[:limit], start=1)
        )
    # The separator is a newline escape; the literal had been split across
    # physical lines, which is not a valid string.
    return "\n".join(lines)
|