Add queue status command

This commit is contained in:
2026-02-07 22:57:36 +03:00
parent 77801e9123
commit 8d5eda3244
4 changed files with 29 additions and 7 deletions

View File

@@ -3,17 +3,32 @@ from typing import Awaitable, Callable
_queue: asyncio.Queue = asyncio.Queue()
_current_label: str | None = None
async def enqueue(job: Callable[[], Awaitable[None]]) -> int:
await _queue.put(job)
async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int:
await _queue.put((label, job))
return _queue.qsize()
async def worker():
global _current_label
while True:
job = await _queue.get()
label, job = await _queue.get()
_current_label = label
try:
await job()
finally:
_current_label = None
_queue.task_done()
def format_status() -> str:
pending = [label for label, _ in list(_queue._queue)]
lines = ["🧾 Queue"]
lines.append(f"🔄 Running: {_current_label or 'idle'}")
lines.append(f"⏳ Pending: {len(pending)}")
if pending:
preview = ", ".join(pending[:5])
lines.append(f"➡️ Next: {preview}")
return "\n".join(lines)