Compare commits

...

2 Commits

Author SHA1 Message Date
8d5eda3244 Add queue status command 2026-02-07 22:57:36 +03:00
77801e9123 Queue long-running backup and upload 2026-02-07 22:55:34 +03:00
5 changed files with 60 additions and 13 deletions

View File

@@ -9,6 +9,7 @@ from auth import is_admin_msg
from keyboards import artifacts_kb from keyboards import artifacts_kb
from lock_utils import acquire_lock, release_lock from lock_utils import acquire_lock, release_lock
from services.artifacts import artifact_last from services.artifacts import artifact_last
from services.queue import enqueue
from services.runner import run_cmd from services.runner import run_cmd
@@ -40,20 +41,21 @@ async def cmd_artifacts_last(msg: Message):
async def cmd_artifacts_upload(msg: Message): async def cmd_artifacts_upload(msg: Message):
if not acquire_lock("artifacts"): async def job():
await msg.answer("⛔ Upload уже идёт", reply_markup=artifacts_kb) if not acquire_lock("artifacts"):
return await msg.answer("⚠️ Upload уже идёт", reply_markup=artifacts_kb)
return
await msg.answer("📤 Upload…", reply_markup=artifacts_kb) await msg.answer("📤 Upload…", reply_markup=artifacts_kb)
async def worker():
try: try:
rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "artifact-upload"], timeout=12 * 3600) rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "artifact-upload"], timeout=12 * 3600)
await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=artifacts_kb) await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=artifacts_kb)
finally: finally:
release_lock("artifacts") release_lock("artifacts")
asyncio.create_task(worker()) pos = await enqueue("artifact-upload", job)
await msg.answer(f"🕓 Upload queued (#{pos})", reply_markup=artifacts_kb)
@dp.message(F.text == "🧉 Status") @dp.message(F.text == "🧉 Status")

View File

@@ -7,6 +7,7 @@ from app import dp
from auth import is_admin_msg from auth import is_admin_msg
from keyboards import backup_kb from keyboards import backup_kb
from lock_utils import acquire_lock, release_lock from lock_utils import acquire_lock, release_lock
from services.queue import enqueue, format_status
from services.backup import backup_badge, restore_help from services.backup import backup_badge, restore_help
from services.runner import run_cmd from services.runner import run_cmd
@@ -108,20 +109,21 @@ async def cmd_backup_status(msg: Message):
async def cmd_backup_now(msg: Message): async def cmd_backup_now(msg: Message):
if not acquire_lock("backup"): async def job():
await msg.answer("⚠️ Backup уже выполняется", reply_markup=backup_kb) if not acquire_lock("backup"):
return await msg.answer("⚠️ Backup уже выполняется", reply_markup=backup_kb)
return
await msg.answer("▶️ Backup запущен", reply_markup=backup_kb) await msg.answer("▶️ Backup запущен", reply_markup=backup_kb)
async def worker():
try: try:
rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "restic-backup"], timeout=6 * 3600) rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "restic-backup"], timeout=6 * 3600)
await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb) await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb)
finally: finally:
release_lock("backup") release_lock("backup")
asyncio.create_task(worker()) pos = await enqueue("backup", job)
await msg.answer(f"🕓 Backup queued (#{pos})", reply_markup=backup_kb)
async def cmd_last_snapshot(msg: Message): async def cmd_last_snapshot(msg: Message):
@@ -188,6 +190,12 @@ async def ls(msg: Message):
await cmd_last_snapshot(msg) await cmd_last_snapshot(msg)
@dp.message(F.text == "🧾 Queue")
async def qb(msg: Message):
if is_admin_msg(msg):
await msg.answer(format_status(), reply_markup=backup_kb)
@dp.message(F.text == "▶️ Run backup") @dp.message(F.text == "▶️ Run backup")
async def br(msg: Message): async def br(msg: Message):
if is_admin_msg(msg): if is_admin_msg(msg):

View File

@@ -29,7 +29,8 @@ backup_kb = ReplyKeyboardMarkup(
keyboard=[ keyboard=[
[KeyboardButton(text="📦 Status"), KeyboardButton(text="📦 Last snapshot")], [KeyboardButton(text="📦 Status"), KeyboardButton(text="📦 Last snapshot")],
[KeyboardButton(text="📊 Repo stats"), KeyboardButton(text="🧯 Restore help")], [KeyboardButton(text="📊 Repo stats"), KeyboardButton(text="🧯 Restore help")],
[KeyboardButton(text="▶️ Run backup"), KeyboardButton(text="⬅️ Назад")], [KeyboardButton(text="▶️ Run backup"), KeyboardButton(text="🧾 Queue")],
[KeyboardButton(text="⬅️ Назад")],
], ],
resize_keyboard=True, resize_keyboard=True,
) )

View File

@@ -5,6 +5,7 @@ from app import bot, dp, cfg, ADMIN_ID
from keyboards import menu_kb from keyboards import menu_kb
from services.docker import discover_containers, docker_watchdog from services.docker import discover_containers, docker_watchdog
from services.alerts import monitor_resources, monitor_smart from services.alerts import monitor_resources, monitor_smart
from services.queue import worker as queue_worker
from services.notify import notify from services.notify import notify
import state import state
import handlers.menu import handlers.menu
@@ -34,6 +35,7 @@ async def main():
asyncio.create_task(monitor_resources(cfg, notify, bot, ADMIN_ID)) asyncio.create_task(monitor_resources(cfg, notify, bot, ADMIN_ID))
if cfg.get("alerts", {}).get("smart_enabled", True): if cfg.get("alerts", {}).get("smart_enabled", True):
asyncio.create_task(monitor_smart(cfg, notify, bot, ADMIN_ID)) asyncio.create_task(monitor_smart(cfg, notify, bot, ADMIN_ID))
asyncio.create_task(queue_worker())
await notify_start() await notify_start()
await dp.start_polling(bot) await dp.start_polling(bot)

34
services/queue.py Normal file
View File

@@ -0,0 +1,34 @@
import asyncio
from typing import Awaitable, Callable
_queue: asyncio.Queue = asyncio.Queue()
_current_label: str | None = None
async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int:
await _queue.put((label, job))
return _queue.qsize()
async def worker():
global _current_label
while True:
label, job = await _queue.get()
_current_label = label
try:
await job()
finally:
_current_label = None
_queue.task_done()
def format_status() -> str:
pending = [label for label, _ in list(_queue._queue)]
lines = ["🧾 Queue"]
lines.append(f"🔄 Running: {_current_label or 'idle'}")
lines.append(f"⏳ Pending: {len(pending)}")
if pending:
preview = ", ".join(pending[:5])
lines.append(f"➡️ Next: {preview}")
return "\n".join(lines)