"""Single-consumer asyncio job queue with running statistics and alerts.

Jobs are submitted with :func:`enqueue` and executed one at a time by
:func:`worker`.  The module keeps running averages of wait/run time and a
bounded history of finished jobs, hands both to ``runtime_state`` after every
job, and reports congestion through ``log_incident`` when configured
thresholds are exceeded.
"""
import asyncio
import logging
import time
from collections import deque
from typing import Any, Awaitable, Callable

from services import runtime_state
from services.incidents import log_incident

# Work items are (label, job, enqueued_at) tuples.
_queue: asyncio.Queue = asyncio.Queue()

# Label and timing metadata of the job currently executing (None when idle).
_current_label: str | None = None
_current_meta: dict[str, Any] | None = None

# Mirror of the jobs still waiting, as (label, enqueued_at); asyncio.Queue
# offers no way to peek at its contents, so status output reads this instead.
_pending: deque[tuple[str, float]] = deque()

# Running totals, seeded from runtime_state when a non-empty value is stored.
_stats: dict[str, Any] = runtime_state.get("queue_stats", {}) or {
    "processed": 0,
    "avg_wait_sec": 0.0,
    "avg_runtime_sec": 0.0,
    "last_label": "",
    "last_finished_at": 0.0,
}

# Most recent finished jobs, newest first, capped at 50 entries.
_history: deque[dict[str, Any]] = deque(
    runtime_state.get("queue_history", []) or [],
    maxlen=50,
)

# Congestion alert thresholds plus the timestamp of the last alert sent.
_alert_cfg: dict[str, Any] = {
    "max_pending": None,
    "avg_wait": None,
    "cooldown": 300,
    "last_sent": 0.0,
}

# Application config passed through to log_incident; set by configure().
_cfg: dict[str, Any] | None = None

_logger = logging.getLogger("queue")

_DEFAULT_COOLDOWN_SEC = 300


def _save_stats() -> None:
    """Hand the current statistics and history to ``runtime_state``."""
    runtime_state.set_state("queue_stats", _stats)
    runtime_state.set_state("queue_history", list(_history))


def configure(queue_cfg: dict[str, Any], cfg: dict[str, Any]) -> None:
    """Install alert thresholds and the config used for incident logging.

    Args:
        queue_cfg: Queue settings; reads ``max_pending_alert``,
            ``avg_wait_alert`` and ``cooldown_sec``.
        cfg: Application config forwarded to ``log_incident``.
    """
    global _cfg
    _cfg = cfg
    _alert_cfg["max_pending"] = queue_cfg.get("max_pending_alert")
    _alert_cfg["avg_wait"] = queue_cfg.get("avg_wait_alert")
    # An explicit null in the config must not reach the numeric comparison in
    # _check_congestion, so it falls back to the default like a missing key.
    cooldown = queue_cfg.get("cooldown_sec")
    _alert_cfg["cooldown"] = (
        _DEFAULT_COOLDOWN_SEC if cooldown is None else cooldown
    )


def _check_congestion(pending_count: int, avg_wait: float | None) -> None:
    """Log a ``queue_congested`` incident when a threshold is exceeded.

    At most one alert is emitted per cooldown window.  Nothing is reported
    until :func:`configure` has supplied a non-empty config.

    Args:
        pending_count: Number of jobs currently waiting.
        avg_wait: Average wait in seconds, or ``None`` to skip that check.
    """
    cooldown = _alert_cfg.get("cooldown")
    if cooldown is None:
        cooldown = _DEFAULT_COOLDOWN_SEC
    now = time.time()
    if now - _alert_cfg.get("last_sent", 0) < cooldown:
        return

    max_pending = _alert_cfg.get("max_pending")
    avg_wait_thr = _alert_cfg.get("avg_wait")

    reason = None
    if max_pending and pending_count >= max_pending:
        reason = f"pending={pending_count} >= {max_pending}"
    if avg_wait_thr and avg_wait is not None and avg_wait >= avg_wait_thr:
        # The pending-count reason wins when both thresholds are exceeded.
        reason = reason or f"avg_wait={avg_wait:.1f}s >= {avg_wait_thr}s"

    if not (reason and _cfg):
        return

    try:
        log_incident(_cfg, f"queue_congested {reason}", category="queue")
    except Exception:
        # Alerting is best-effort, but a broken incident sink should be visible.
        _logger.exception("Failed to log queue congestion incident")
    # The cooldown starts even if logging failed, so a broken sink is not
    # retried on every enqueue.
    _alert_cfg["last_sent"] = now


async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int:
    """Queue *job* for execution by :func:`worker`.

    Args:
        label: Human-readable name shown in status output and history.
        job: Zero-argument callable returning an awaitable.

    Returns:
        The number of pending jobs after this one was added.
    """
    enqueued_at = time.time()
    await _queue.put((label, job, enqueued_at))
    _pending.append((label, enqueued_at))
    _check_congestion(len(_pending), None)
    return len(_pending)


def _drop_pending(label: str, enqueued_at: float) -> None:
    """Remove the first matching entry from the pending mirror, if present."""
    try:
        _pending.remove((label, enqueued_at))
    except ValueError:
        # The mirror is informational only; a missing entry is not an error.
        pass


def _record_result(
    label: str,
    enqueued_at: float,
    started_at: float,
    finished_at: float,
    status: str,
) -> None:
    """Fold one finished job into the averages and history, then save."""
    wait_sec = max(0.0, started_at - enqueued_at)
    runtime_sec = max(0.0, finished_at - started_at)

    # Incremental mean: new_avg = (old_avg * n_prev + sample) / (n_prev + 1).
    n_prev = int(_stats.get("processed", 0))
    n_now = n_prev + 1
    _stats["processed"] = n_now
    _stats["avg_wait_sec"] = (
        _stats.get("avg_wait_sec", 0.0) * n_prev + wait_sec
    ) / n_now
    _stats["avg_runtime_sec"] = (
        _stats.get("avg_runtime_sec", 0.0) * n_prev + runtime_sec
    ) / n_now
    _stats["last_label"] = label
    _stats["last_finished_at"] = finished_at

    _history.appendleft(
        {
            "label": label,
            "wait_sec": int(wait_sec),
            "runtime_sec": int(runtime_sec),
            "finished_at": int(finished_at),
            "status": status,
        }
    )
    _save_stats()
    _check_congestion(len(_pending), _stats.get("avg_wait_sec"))


async def worker() -> None:
    """Run queued jobs forever, one at a time.

    A job that raises is logged (and reported through ``log_incident`` when
    configured) and recorded with status ``"err"``; the loop then continues.
    If the worker is cancelled while a job is running, the job is recorded
    with status ``"cancelled"`` and the cancellation propagates.

    Bookkeeping failures are logged rather than raised, and ``task_done()``
    is always called, so a failing stats save can neither stop the worker nor
    leave ``Queue.join()`` waiting forever.
    """
    global _current_label, _current_meta
    while True:
        label, job, enqueued_at = await _queue.get()
        try:
            _drop_pending(label, enqueued_at)
            started_at = time.time()
            _current_label = label
            _current_meta = {
                "enqueued_at": enqueued_at,
                "started_at": started_at,
            }
            status = "ok"
            try:
                await job()
            except asyncio.CancelledError:
                # CancelledError is not an Exception subclass; without this
                # branch an interrupted job would be recorded as "ok".
                status = "cancelled"
                raise
            except Exception as e:
                status = "err"
                _logger.exception("Queue job failed: label=%s", label)
                if _cfg:
                    try:
                        log_incident(
                            _cfg,
                            f"queue_job_failed label={label} "
                            f"error={type(e).__name__}: {e}",
                            category="queue",
                        )
                    except Exception:
                        _logger.exception(
                            "Failed to log incident for failed job: label=%s",
                            label,
                        )
            finally:
                try:
                    _record_result(
                        label, enqueued_at, started_at, time.time(), status
                    )
                except Exception:
                    _logger.exception(
                        "Queue bookkeeping failed: label=%s", label
                    )
        finally:
            _current_label = None
            _current_meta = None
            _queue.task_done()


def _history_line(item: dict[str, Any], time_fmt: str) -> str:
    """Render one history entry with its finish time formatted by *time_fmt*."""
    finished = time.strftime(time_fmt, time.localtime(item["finished_at"]))
    return (
        f"{finished} {item['label']} {item['status']} "
        f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)"
    )


# NOTE: the emoji in the labels below were restored from mis-decoded UTF-8
# byte sequences (mojibake) in the previous revision of this file.
def format_status() -> str:
    """Return a short multi-line summary: running job, backlog and averages."""
    pending = list(_pending)
    lines = [
        "🧾 Queue",
        f"🔄 Running: {_current_label or 'idle'}",
        f"⏳ Pending: {len(pending)}",
    ]
    if pending:
        preview = ", ".join(entry[0] for entry in pending[:5])
        lines.append(f"➡️ Next: {preview}")
    if _stats.get("processed"):
        lines.append(
            f"📈 Done: {_stats.get('processed')} | "
            f"avg wait {int(_stats.get('avg_wait_sec', 0))}s | "
            f"avg run {int(_stats.get('avg_runtime_sec', 0))}s"
        )
    return "\n".join(lines)


def format_details(limit: int = 10) -> str:
    """Return a detailed report of the queue.

    Args:
        limit: Maximum number of pending jobs listed in the table.

    Returns:
        Multi-line text with the running job and its elapsed time, the first
        *limit* pending jobs with their current wait, aggregate statistics,
        and the five most recently finished jobs.
    """
    now = time.time()
    lines = ["🧾 Queue details"]

    if _current_label:
        started_at = _current_meta.get("started_at") if _current_meta else None
        runtime = f"{int(now - started_at)}s" if started_at else "n/a"
        lines.append(f"🔄 Running: {_current_label} ({runtime})")
    else:
        lines.append("🔄 Running: idle")

    pending = list(_pending)
    lines.append(f"⏳ Pending: {len(pending)}")
    if pending:
        lines.append("🔢 Position | Label | Wait")
        for position, (label, enqueued_at) in enumerate(
            pending[:limit], start=1
        ):
            waited = int(now - enqueued_at)
            lines.append(f"{position:>3} | {label} | {waited}s")

    if _stats.get("processed"):
        lines.append("")
        lines.append(
            "📈 Stats: "
            f"{_stats.get('processed')} done, "
            f"avg wait {int(_stats.get('avg_wait_sec', 0))}s, "
            f"avg run {int(_stats.get('avg_runtime_sec', 0))}s"
        )
        last_label = _stats.get("last_label")
        if last_label:
            lines.append(f"Last: {last_label}")

    if _history:
        lines.append("")
        lines.append("🗂 Last jobs:")
        lines.extend(
            f"- {_history_line(item, '%H:%M:%S')}"
            for item in list(_history)[:5]
        )
    return "\n".join(lines)


def format_history(limit: int = 20) -> str:
    """Return the newest *limit* finished jobs, one per line, newest first."""
    lines = ["🗂 Queue history"]
    if not _history:
        lines.append("(empty)")
        return "\n".join(lines)
    lines.extend(
        _history_line(item, "%m-%d %H:%M:%S")
        for item in list(_history)[:limit]
    )
    return "\n".join(lines)


def get_history_raw() -> list[dict[str, Any]]:
    """Return a list copy of the history; the entry dicts are not copied."""
    return list(_history)


def get_stats() -> dict[str, Any]:
    """Return a shallow copy of the running statistics."""
    return dict(_stats)