Queue long-running backup and upload
This commit is contained in:
@@ -9,6 +9,7 @@ from auth import is_admin_msg
|
|||||||
from keyboards import artifacts_kb
|
from keyboards import artifacts_kb
|
||||||
from lock_utils import acquire_lock, release_lock
|
from lock_utils import acquire_lock, release_lock
|
||||||
from services.artifacts import artifact_last
|
from services.artifacts import artifact_last
|
||||||
|
from services.queue import enqueue
|
||||||
from services.runner import run_cmd
|
from services.runner import run_cmd
|
||||||
|
|
||||||
|
|
||||||
@@ -40,20 +41,21 @@ async def cmd_artifacts_last(msg: Message):
|
|||||||
|
|
||||||
|
|
||||||
async def cmd_artifacts_upload(msg: Message):
|
async def cmd_artifacts_upload(msg: Message):
|
||||||
|
async def job():
|
||||||
if not acquire_lock("artifacts"):
|
if not acquire_lock("artifacts"):
|
||||||
await msg.answer("⛔ Upload уже идёт", reply_markup=artifacts_kb)
|
await msg.answer("⚠️ Upload уже идёт", reply_markup=artifacts_kb)
|
||||||
return
|
return
|
||||||
|
|
||||||
await msg.answer("📤 Upload…", reply_markup=artifacts_kb)
|
await msg.answer("📤 Upload…", reply_markup=artifacts_kb)
|
||||||
|
|
||||||
async def worker():
|
|
||||||
try:
|
try:
|
||||||
rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "artifact-upload"], timeout=12 * 3600)
|
rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "artifact-upload"], timeout=12 * 3600)
|
||||||
await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=artifacts_kb)
|
await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=artifacts_kb)
|
||||||
finally:
|
finally:
|
||||||
release_lock("artifacts")
|
release_lock("artifacts")
|
||||||
|
|
||||||
asyncio.create_task(worker())
|
pos = await enqueue(job)
|
||||||
|
await msg.answer(f"🕓 Upload queued (#{pos})", reply_markup=artifacts_kb)
|
||||||
|
|
||||||
|
|
||||||
@dp.message(F.text == "🧉 Status")
|
@dp.message(F.text == "🧉 Status")
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ from app import dp
|
|||||||
from auth import is_admin_msg
|
from auth import is_admin_msg
|
||||||
from keyboards import backup_kb
|
from keyboards import backup_kb
|
||||||
from lock_utils import acquire_lock, release_lock
|
from lock_utils import acquire_lock, release_lock
|
||||||
|
from services.queue import enqueue
|
||||||
from services.backup import backup_badge, restore_help
|
from services.backup import backup_badge, restore_help
|
||||||
from services.runner import run_cmd
|
from services.runner import run_cmd
|
||||||
|
|
||||||
@@ -108,20 +109,21 @@ async def cmd_backup_status(msg: Message):
|
|||||||
|
|
||||||
|
|
||||||
async def cmd_backup_now(msg: Message):
|
async def cmd_backup_now(msg: Message):
|
||||||
|
async def job():
|
||||||
if not acquire_lock("backup"):
|
if not acquire_lock("backup"):
|
||||||
await msg.answer("⚠️ Backup уже выполняется", reply_markup=backup_kb)
|
await msg.answer("⚠️ Backup уже выполняется", reply_markup=backup_kb)
|
||||||
return
|
return
|
||||||
|
|
||||||
await msg.answer("▶️ Backup запущен", reply_markup=backup_kb)
|
await msg.answer("▶️ Backup запущен", reply_markup=backup_kb)
|
||||||
|
|
||||||
async def worker():
|
|
||||||
try:
|
try:
|
||||||
rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "restic-backup"], timeout=6 * 3600)
|
rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "restic-backup"], timeout=6 * 3600)
|
||||||
await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb)
|
await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb)
|
||||||
finally:
|
finally:
|
||||||
release_lock("backup")
|
release_lock("backup")
|
||||||
|
|
||||||
asyncio.create_task(worker())
|
pos = await enqueue(job)
|
||||||
|
await msg.answer(f"🕓 Backup queued (#{pos})", reply_markup=backup_kb)
|
||||||
|
|
||||||
|
|
||||||
async def cmd_last_snapshot(msg: Message):
|
async def cmd_last_snapshot(msg: Message):
|
||||||
|
|||||||
2
main.py
2
main.py
@@ -5,6 +5,7 @@ from app import bot, dp, cfg, ADMIN_ID
|
|||||||
from keyboards import menu_kb
|
from keyboards import menu_kb
|
||||||
from services.docker import discover_containers, docker_watchdog
|
from services.docker import discover_containers, docker_watchdog
|
||||||
from services.alerts import monitor_resources, monitor_smart
|
from services.alerts import monitor_resources, monitor_smart
|
||||||
|
from services.queue import worker as queue_worker
|
||||||
from services.notify import notify
|
from services.notify import notify
|
||||||
import state
|
import state
|
||||||
import handlers.menu
|
import handlers.menu
|
||||||
@@ -34,6 +35,7 @@ async def main():
|
|||||||
asyncio.create_task(monitor_resources(cfg, notify, bot, ADMIN_ID))
|
asyncio.create_task(monitor_resources(cfg, notify, bot, ADMIN_ID))
|
||||||
if cfg.get("alerts", {}).get("smart_enabled", True):
|
if cfg.get("alerts", {}).get("smart_enabled", True):
|
||||||
asyncio.create_task(monitor_smart(cfg, notify, bot, ADMIN_ID))
|
asyncio.create_task(monitor_smart(cfg, notify, bot, ADMIN_ID))
|
||||||
|
asyncio.create_task(queue_worker())
|
||||||
await notify_start()
|
await notify_start()
|
||||||
await dp.start_polling(bot)
|
await dp.start_polling(bot)
|
||||||
|
|
||||||
|
|||||||
19
services/queue.py
Normal file
19
services/queue.py
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
import asyncio
|
||||||
|
from typing import Awaitable, Callable
|
||||||
|
|
||||||
|
|
||||||
|
_queue: asyncio.Queue = asyncio.Queue()
|
||||||
|
|
||||||
|
|
||||||
|
async def enqueue(job: Callable[[], Awaitable[None]]) -> int:
|
||||||
|
await _queue.put(job)
|
||||||
|
return _queue.qsize()
|
||||||
|
|
||||||
|
|
||||||
|
async def worker():
|
||||||
|
while True:
|
||||||
|
job = await _queue.get()
|
||||||
|
try:
|
||||||
|
await job()
|
||||||
|
finally:
|
||||||
|
_queue.task_done()
|
||||||
Reference in New Issue
Block a user