Compare commits
4 Commits
568cd86844
...
7c56430f32
| Author | SHA1 | Date | |
|---|---|---|---|
| 7c56430f32 | |||
| b54a094185 | |||
| 6d5fb9c258 | |||
| 5099ae4fe2 |
2
app.py
2
app.py
@@ -15,7 +15,7 @@ else:
|
|||||||
|
|
||||||
paths_cfg = cfg.get("paths", {})
|
paths_cfg = cfg.get("paths", {})
|
||||||
runtime_state.configure(paths_cfg.get("runtime_state", "/var/server-bot/runtime.json"))
|
runtime_state.configure(paths_cfg.get("runtime_state", "/var/server-bot/runtime.json"))
|
||||||
ARTIFACT_STATE = paths_cfg["artifact_state"]
|
ARTIFACT_STATE = paths_cfg.get("artifact_state", "/opt/tg-bot/state.json")
|
||||||
RESTIC_ENV = load_env(paths_cfg.get("restic_env", "/etc/restic/restic.env"))
|
RESTIC_ENV = load_env(paths_cfg.get("restic_env", "/etc/restic/restic.env"))
|
||||||
|
|
||||||
DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80))
|
DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80))
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import asyncio
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from aiogram import F
|
from aiogram import F
|
||||||
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
|
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
|
||||||
from app import dp, cfg
|
from app import dp, cfg, ADMIN_IDS
|
||||||
from auth import is_admin_msg
|
from auth import is_admin_msg
|
||||||
from keyboards import docker_kb, arcane_kb
|
from keyboards import docker_kb, arcane_kb
|
||||||
from services.arcane import list_projects, restart_project, set_project_state, get_project_details
|
from services.arcane import list_projects, restart_project, set_project_state, get_project_details
|
||||||
@@ -115,7 +115,7 @@ async def arcane_refresh(msg: Message):
|
|||||||
|
|
||||||
@dp.callback_query(F.data == "arcane:refresh")
|
@dp.callback_query(F.data == "arcane:refresh")
|
||||||
async def arcane_refresh_inline(cb: CallbackQuery):
|
async def arcane_refresh_inline(cb: CallbackQuery):
|
||||||
if cb.from_user.id != cfg["telegram"]["admin_id"]:
|
if cb.from_user.id not in ADMIN_IDS:
|
||||||
return
|
return
|
||||||
await cb.answer()
|
await cb.answer()
|
||||||
await cmd_arcane_projects(cb.message, edit=True)
|
await cmd_arcane_projects(cb.message, edit=True)
|
||||||
@@ -123,7 +123,7 @@ async def arcane_refresh_inline(cb: CallbackQuery):
|
|||||||
|
|
||||||
@dp.callback_query(F.data.startswith("arcane:page:"))
|
@dp.callback_query(F.data.startswith("arcane:page:"))
|
||||||
async def arcane_page(cb: CallbackQuery):
|
async def arcane_page(cb: CallbackQuery):
|
||||||
if cb.from_user.id != cfg["telegram"]["admin_id"]:
|
if cb.from_user.id not in ADMIN_IDS:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
page = int(cb.data.split(":", 2)[2])
|
page = int(cb.data.split(":", 2)[2])
|
||||||
@@ -141,7 +141,7 @@ async def arcane_page(cb: CallbackQuery):
|
|||||||
|
|
||||||
@dp.callback_query(F.data.startswith("arcane:restart:"))
|
@dp.callback_query(F.data.startswith("arcane:restart:"))
|
||||||
async def arcane_restart(cb: CallbackQuery):
|
async def arcane_restart(cb: CallbackQuery):
|
||||||
if cb.from_user.id != cfg["telegram"]["admin_id"]:
|
if cb.from_user.id not in ADMIN_IDS:
|
||||||
return
|
return
|
||||||
|
|
||||||
_, _, pid = cb.data.split(":", 2)
|
_, _, pid = cb.data.split(":", 2)
|
||||||
@@ -160,7 +160,7 @@ async def arcane_restart(cb: CallbackQuery):
|
|||||||
|
|
||||||
@dp.callback_query(F.data.startswith("arcane:details:"))
|
@dp.callback_query(F.data.startswith("arcane:details:"))
|
||||||
async def arcane_details(cb: CallbackQuery):
|
async def arcane_details(cb: CallbackQuery):
|
||||||
if cb.from_user.id != cfg["telegram"]["admin_id"]:
|
if cb.from_user.id not in ADMIN_IDS:
|
||||||
return
|
return
|
||||||
|
|
||||||
_, _, pid = cb.data.split(":", 2)
|
_, _, pid = cb.data.split(":", 2)
|
||||||
@@ -208,7 +208,7 @@ async def arcane_details(cb: CallbackQuery):
|
|||||||
|
|
||||||
@dp.callback_query(F.data.startswith("arcane:deploy:"))
|
@dp.callback_query(F.data.startswith("arcane:deploy:"))
|
||||||
async def arcane_deploy_status(cb: CallbackQuery):
|
async def arcane_deploy_status(cb: CallbackQuery):
|
||||||
if cb.from_user.id != cfg["telegram"]["admin_id"]:
|
if cb.from_user.id not in ADMIN_IDS:
|
||||||
return
|
return
|
||||||
|
|
||||||
_, _, pid = cb.data.split(":", 2)
|
_, _, pid = cb.data.split(":", 2)
|
||||||
@@ -254,7 +254,7 @@ async def arcane_deploy_status(cb: CallbackQuery):
|
|||||||
|
|
||||||
@dp.callback_query(F.data.startswith("arcane:up:"))
|
@dp.callback_query(F.data.startswith("arcane:up:"))
|
||||||
async def arcane_up(cb: CallbackQuery):
|
async def arcane_up(cb: CallbackQuery):
|
||||||
if cb.from_user.id != cfg["telegram"]["admin_id"]:
|
if cb.from_user.id not in ADMIN_IDS:
|
||||||
return
|
return
|
||||||
|
|
||||||
_, _, pid = cb.data.split(":", 2)
|
_, _, pid = cb.data.split(":", 2)
|
||||||
@@ -273,7 +273,7 @@ async def arcane_up(cb: CallbackQuery):
|
|||||||
|
|
||||||
@dp.callback_query(F.data.startswith("arcane:down:"))
|
@dp.callback_query(F.data.startswith("arcane:down:"))
|
||||||
async def arcane_down(cb: CallbackQuery):
|
async def arcane_down(cb: CallbackQuery):
|
||||||
if cb.from_user.id != cfg["telegram"]["admin_id"]:
|
if cb.from_user.id not in ADMIN_IDS:
|
||||||
return
|
return
|
||||||
|
|
||||||
_, _, pid = cb.data.split(":", 2)
|
_, _, pid = cb.data.split(":", 2)
|
||||||
|
|||||||
@@ -15,8 +15,15 @@ async def docker_callback(cb: CallbackQuery):
|
|||||||
if cb.from_user.id != ADMIN_ID:
|
if cb.from_user.id != ADMIN_ID:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
_, action, alias = cb.data.split(":", 2)
|
_, action, alias = cb.data.split(":", 2)
|
||||||
real = DOCKER_MAP[alias]
|
except ValueError:
|
||||||
|
await cb.answer("Bad request")
|
||||||
|
return
|
||||||
|
real = DOCKER_MAP.get(alias)
|
||||||
|
if not real:
|
||||||
|
await cb.answer("Container not found")
|
||||||
|
return
|
||||||
|
|
||||||
if action == "restart":
|
if action == "restart":
|
||||||
await cb.answer("Restarting…")
|
await cb.answer("Restarting…")
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import os
|
|||||||
from datetime import datetime, timezone, timedelta
|
from datetime import datetime, timezone, timedelta
|
||||||
from aiogram import F
|
from aiogram import F
|
||||||
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile
|
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile
|
||||||
from app import dp, cfg
|
from app import dp, cfg, ADMIN_IDS
|
||||||
from auth import is_admin_msg
|
from auth import is_admin_msg
|
||||||
from keyboards import (
|
from keyboards import (
|
||||||
system_info_kb,
|
system_info_kb,
|
||||||
@@ -223,13 +223,6 @@ async def openwrt_status(msg: Message):
|
|||||||
asyncio.create_task(worker())
|
asyncio.create_task(worker())
|
||||||
|
|
||||||
|
|
||||||
@dp.message(F.text == "/openwrt")
|
|
||||||
async def openwrt_cmd(msg: Message):
|
|
||||||
if not is_admin_msg(msg):
|
|
||||||
return
|
|
||||||
await openwrt_status(msg)
|
|
||||||
|
|
||||||
|
|
||||||
@dp.message(F.text == "/openwrt_wan")
|
@dp.message(F.text == "/openwrt_wan")
|
||||||
async def openwrt_wan(msg: Message):
|
async def openwrt_wan(msg: Message):
|
||||||
if not is_admin_msg(msg):
|
if not is_admin_msg(msg):
|
||||||
@@ -1029,7 +1022,7 @@ async def updates_page(cb: CallbackQuery):
|
|||||||
|
|
||||||
@dp.callback_query(F.data == "upgrade:confirm")
|
@dp.callback_query(F.data == "upgrade:confirm")
|
||||||
async def upgrade_confirm(cb: CallbackQuery):
|
async def upgrade_confirm(cb: CallbackQuery):
|
||||||
if cb.from_user.id != cfg["telegram"]["admin_id"]:
|
if cb.from_user.id not in ADMIN_IDS:
|
||||||
return
|
return
|
||||||
await cb.answer()
|
await cb.answer()
|
||||||
|
|
||||||
@@ -1052,7 +1045,7 @@ async def upgrade_cancel(cb: CallbackQuery):
|
|||||||
|
|
||||||
@dp.callback_query(F.data == "reboot:confirm")
|
@dp.callback_query(F.data == "reboot:confirm")
|
||||||
async def reboot_confirm(cb: CallbackQuery):
|
async def reboot_confirm(cb: CallbackQuery):
|
||||||
if cb.from_user.id != cfg["telegram"]["admin_id"]:
|
if cb.from_user.id not in ADMIN_IDS:
|
||||||
return
|
return
|
||||||
await cb.answer()
|
await cb.answer()
|
||||||
REBOOT_PENDING[cb.from_user.id] = {}
|
REBOOT_PENDING[cb.from_user.id] = {}
|
||||||
@@ -1067,7 +1060,7 @@ async def reboot_cancel(cb: CallbackQuery):
|
|||||||
|
|
||||||
@dp.callback_query(F.data.startswith("npmplus:"))
|
@dp.callback_query(F.data.startswith("npmplus:"))
|
||||||
async def npmplus_toggle(cb: CallbackQuery):
|
async def npmplus_toggle(cb: CallbackQuery):
|
||||||
if cb.from_user.id != cfg["telegram"]["admin_id"]:
|
if cb.from_user.id not in ADMIN_IDS:
|
||||||
return
|
return
|
||||||
parts = cb.data.split(":")
|
parts = cb.data.split(":")
|
||||||
if len(parts) != 3:
|
if len(parts) != 3:
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
|
|
||||||
LOCK_DIR = Path("/var/run/tg-bot")
|
LOCK_DIR = Path("/var/run/tg-bot")
|
||||||
@@ -11,9 +12,14 @@ def lock_path(name: str) -> Path:
|
|||||||
|
|
||||||
def acquire_lock(name: str) -> bool:
|
def acquire_lock(name: str) -> bool:
|
||||||
p = lock_path(name)
|
p = lock_path(name)
|
||||||
if p.exists():
|
try:
|
||||||
|
fd = os.open(str(p), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
|
||||||
|
except FileExistsError:
|
||||||
return False
|
return False
|
||||||
p.write_text(str(time.time()))
|
try:
|
||||||
|
os.write(fd, str(time.time()).encode("ascii", errors="ignore"))
|
||||||
|
finally:
|
||||||
|
os.close(fd)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,9 @@ def validate_cfg(cfg: dict[str, Any]) -> Tuple[List[str], List[str]]:
|
|||||||
tg = cfg.get("telegram", {})
|
tg = cfg.get("telegram", {})
|
||||||
if not tg.get("token"):
|
if not tg.get("token"):
|
||||||
errors.append("telegram.token is missing")
|
errors.append("telegram.token is missing")
|
||||||
if not tg.get("admin_id"):
|
admin_ids = tg.get("admin_ids")
|
||||||
|
has_admin_ids = isinstance(admin_ids, list) and len(admin_ids) > 0
|
||||||
|
if not tg.get("admin_id") and not has_admin_ids:
|
||||||
errors.append("telegram.admin_id is missing")
|
errors.append("telegram.admin_id is missing")
|
||||||
|
|
||||||
thresholds = cfg.get("thresholds", {})
|
thresholds = cfg.get("thresholds", {})
|
||||||
|
|||||||
@@ -1,11 +1,48 @@
|
|||||||
import os
|
import os
|
||||||
|
import re
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from services.runner import run_cmd
|
from services.runner import run_cmd
|
||||||
|
|
||||||
|
|
||||||
def _top_dirs_cmd(path: str, limit: int) -> list[str]:
|
def _top_dirs_cmd(path: str, limit: int) -> list[str]:
|
||||||
return ["bash", "-lc", f"du -xhd1 {path} 2>/dev/null | sort -h | tail -n {limit}"]
|
_ = limit
|
||||||
|
return ["du", "-x", "-h", "-d", "1", path]
|
||||||
|
|
||||||
|
|
||||||
|
_SIZE_RE = re.compile(r"^\s*([0-9]+(?:\.[0-9]+)?)([KMGTP]?)(i?B?)?$", re.IGNORECASE)
|
||||||
|
|
||||||
|
|
||||||
|
def _size_to_bytes(value: str) -> float:
|
||||||
|
m = _SIZE_RE.match(value.strip())
|
||||||
|
if not m:
|
||||||
|
return -1.0
|
||||||
|
num = float(m.group(1))
|
||||||
|
unit = (m.group(2) or "").upper()
|
||||||
|
mul = {
|
||||||
|
"": 1,
|
||||||
|
"K": 1024,
|
||||||
|
"M": 1024**2,
|
||||||
|
"G": 1024**3,
|
||||||
|
"T": 1024**4,
|
||||||
|
"P": 1024**5,
|
||||||
|
}.get(unit, 1)
|
||||||
|
return num * mul
|
||||||
|
|
||||||
|
|
||||||
|
def _format_top_dirs(raw: str, limit: int) -> str:
|
||||||
|
rows: list[tuple[float, str]] = []
|
||||||
|
for line in raw.splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
parts = line.split(maxsplit=1)
|
||||||
|
if len(parts) != 2:
|
||||||
|
continue
|
||||||
|
size, name = parts
|
||||||
|
rows.append((_size_to_bytes(size), f"{size}\t{name}"))
|
||||||
|
rows.sort(key=lambda x: x[0])
|
||||||
|
return "\n".join(line for _sz, line in rows[-max(1, limit):])
|
||||||
|
|
||||||
|
|
||||||
async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
|
async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
|
||||||
@@ -15,24 +52,27 @@ async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
|
|||||||
|
|
||||||
rc, out = await run_cmd(_top_dirs_cmd(mount, limit), timeout=30)
|
rc, out = await run_cmd(_top_dirs_cmd(mount, limit), timeout=30)
|
||||||
if rc == 0 and out.strip():
|
if rc == 0 and out.strip():
|
||||||
|
top_out = _format_top_dirs(out, limit)
|
||||||
lines.append("")
|
lines.append("")
|
||||||
lines.append("Top directories:")
|
lines.append("Top directories:")
|
||||||
lines.append(out.strip())
|
lines.append(top_out)
|
||||||
|
|
||||||
docker_dir = cfg.get("disk_report", {}).get("docker_dir", "/var/lib/docker")
|
docker_dir = cfg.get("disk_report", {}).get("docker_dir", "/var/lib/docker")
|
||||||
if docker_dir and os.path.exists(docker_dir):
|
if docker_dir and os.path.exists(docker_dir):
|
||||||
rc2, out2 = await run_cmd(_top_dirs_cmd(docker_dir, limit), timeout=30)
|
rc2, out2 = await run_cmd(_top_dirs_cmd(docker_dir, limit), timeout=30)
|
||||||
if rc2 == 0 and out2.strip():
|
if rc2 == 0 and out2.strip():
|
||||||
|
top_out2 = _format_top_dirs(out2, limit)
|
||||||
lines.append("")
|
lines.append("")
|
||||||
lines.append(f"Docker dir: {docker_dir}")
|
lines.append(f"Docker dir: {docker_dir}")
|
||||||
lines.append(out2.strip())
|
lines.append(top_out2)
|
||||||
|
|
||||||
logs_dir = cfg.get("disk_report", {}).get("logs_dir", "/var/log")
|
logs_dir = cfg.get("disk_report", {}).get("logs_dir", "/var/log")
|
||||||
if logs_dir and os.path.exists(logs_dir):
|
if logs_dir and os.path.exists(logs_dir):
|
||||||
rc3, out3 = await run_cmd(_top_dirs_cmd(logs_dir, limit), timeout=30)
|
rc3, out3 = await run_cmd(_top_dirs_cmd(logs_dir, limit), timeout=30)
|
||||||
if rc3 == 0 and out3.strip():
|
if rc3 == 0 and out3.strip():
|
||||||
|
top_out3 = _format_top_dirs(out3, limit)
|
||||||
lines.append("")
|
lines.append("")
|
||||||
lines.append(f"Logs dir: {logs_dir}")
|
lines.append(f"Logs dir: {logs_dir}")
|
||||||
lines.append(out3.strip())
|
lines.append(top_out3)
|
||||||
|
|
||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import os
|
import os
|
||||||
import ssl
|
import ssl
|
||||||
import subprocess
|
import subprocess
|
||||||
import psutil
|
import psutil
|
||||||
@@ -37,15 +37,17 @@ def _npm_api_base(cfg) -> str | None:
|
|||||||
|
|
||||||
|
|
||||||
def health(cfg, container_map: dict | None = None) -> str:
|
def health(cfg, container_map: dict | None = None) -> str:
|
||||||
lines = ["🩺 Health check\n"]
|
lines = ["рџ©є Health check\n"]
|
||||||
|
thresholds = cfg.get("thresholds", {})
|
||||||
|
disk_warn = int(thresholds.get("disk_warn", 80))
|
||||||
|
load_warn = float(thresholds.get("load_warn", 2.0))
|
||||||
try:
|
try:
|
||||||
env = os.environ.copy()
|
env = os.environ.copy()
|
||||||
env.update(RESTIC_ENV)
|
env.update(RESTIC_ENV)
|
||||||
subprocess.check_output(["restic", "snapshots"], timeout=10, env=env)
|
subprocess.check_output(["restic", "snapshots"], timeout=10, env=env)
|
||||||
lines.append("🟢 Backup repo reachable")
|
lines.append("рџџў Backup repo reachable")
|
||||||
except Exception:
|
except Exception:
|
||||||
lines.append("🔴 Backup repo unreachable")
|
lines.append("🔴 Backup repo unreachable")
|
||||||
|
|
||||||
containers = container_map if container_map is not None else _containers_from_cfg(cfg)
|
containers = container_map if container_map is not None else _containers_from_cfg(cfg)
|
||||||
for alias, real in containers.items():
|
for alias, real in containers.items():
|
||||||
@@ -53,20 +55,20 @@ def health(cfg, container_map: dict | None = None) -> str:
|
|||||||
f"docker inspect -f '{{{{.State.Status}}}}' {real}"
|
f"docker inspect -f '{{{{.State.Status}}}}' {real}"
|
||||||
)
|
)
|
||||||
if out.strip() != "running":
|
if out.strip() != "running":
|
||||||
lines.append(f"🔴 {alias} down")
|
lines.append(f"🔴 {alias} down")
|
||||||
else:
|
else:
|
||||||
lines.append(f"🟢 {alias} OK")
|
lines.append(f"рџџў {alias} OK")
|
||||||
|
|
||||||
npm_cfg = cfg.get("npmplus", {})
|
npm_cfg = cfg.get("npmplus", {})
|
||||||
npm_base = _npm_api_base(cfg)
|
npm_base = _npm_api_base(cfg)
|
||||||
if npm_base:
|
if npm_base:
|
||||||
npm_status = _request_status(npm_base, npm_cfg.get("verify_tls", True))
|
npm_status = _request_status(npm_base, npm_cfg.get("verify_tls", True))
|
||||||
if npm_status == 200:
|
if npm_status == 200:
|
||||||
lines.append("🟢 NPMplus API OK")
|
lines.append("рџџў NPMplus API OK")
|
||||||
elif npm_status is None:
|
elif npm_status is None:
|
||||||
lines.append("🔴 NPMplus API unreachable")
|
lines.append("🔴 NPMplus API unreachable")
|
||||||
else:
|
else:
|
||||||
lines.append(f"🟡 NPMplus API HTTP {npm_status}")
|
lines.append(f"рџџЎ NPMplus API HTTP {npm_status}")
|
||||||
|
|
||||||
g_cfg = cfg.get("gitea", {})
|
g_cfg = cfg.get("gitea", {})
|
||||||
g_base = (g_cfg.get("base_url") or "").rstrip("/")
|
g_base = (g_cfg.get("base_url") or "").rstrip("/")
|
||||||
@@ -82,21 +84,22 @@ def health(cfg, container_map: dict | None = None) -> str:
|
|||||||
g_status = status
|
g_status = status
|
||||||
break
|
break
|
||||||
if g_status == 200:
|
if g_status == 200:
|
||||||
lines.append("🟢 Gitea API OK")
|
lines.append("рџџў Gitea API OK")
|
||||||
elif g_status is None:
|
elif g_status is None:
|
||||||
lines.append("🔴 Gitea API unreachable")
|
lines.append("🔴 Gitea API unreachable")
|
||||||
else:
|
else:
|
||||||
lines.append(f"🟡 Gitea API HTTP {g_status}")
|
lines.append(f"рџџЎ Gitea API HTTP {g_status}")
|
||||||
|
|
||||||
usage, mount = worst_disk_usage()
|
usage, mount = worst_disk_usage()
|
||||||
if usage is None:
|
if usage is None:
|
||||||
lines.append("⚠️ Disk n/a")
|
lines.append("вљ пёЏ Disk n/a")
|
||||||
elif usage > cfg["thresholds"]["disk_warn"]:
|
elif usage > disk_warn:
|
||||||
lines.append(f"🟡 Disk {usage}% ({mount})")
|
lines.append(f"рџџЎ Disk {usage}% ({mount})")
|
||||||
else:
|
else:
|
||||||
lines.append(f"🟢 Disk {usage}% ({mount})")
|
lines.append(f"рџџў Disk {usage}% ({mount})")
|
||||||
|
|
||||||
load = psutil.getloadavg()[0]
|
load = psutil.getloadavg()[0]
|
||||||
lines.append(f"{'🟢' if load < cfg['thresholds']['load_warn'] else '🟡'} Load {load}")
|
lines.append(f"{'рџџў' if load < load_warn else 'рџџЎ'} Load {load}")
|
||||||
|
|
||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import logging
|
||||||
import time
|
import time
|
||||||
from collections import deque
|
from collections import deque
|
||||||
from typing import Awaitable, Callable, Any
|
from typing import Awaitable, Callable, Any
|
||||||
@@ -25,6 +26,7 @@ _alert_cfg: dict[str, Any] = {
|
|||||||
"last_sent": 0.0,
|
"last_sent": 0.0,
|
||||||
}
|
}
|
||||||
_cfg: dict[str, Any] | None = None
|
_cfg: dict[str, Any] | None = None
|
||||||
|
_logger = logging.getLogger("queue")
|
||||||
|
|
||||||
|
|
||||||
def _save_stats():
|
def _save_stats():
|
||||||
@@ -85,8 +87,18 @@ async def worker():
|
|||||||
status = "ok"
|
status = "ok"
|
||||||
try:
|
try:
|
||||||
await job()
|
await job()
|
||||||
except Exception:
|
except Exception as e:
|
||||||
status = "err"
|
status = "err"
|
||||||
|
_logger.exception("Queue job failed: label=%s", label)
|
||||||
|
if _cfg:
|
||||||
|
try:
|
||||||
|
log_incident(
|
||||||
|
_cfg,
|
||||||
|
f"queue_job_failed label={label} error={type(e).__name__}: {e}",
|
||||||
|
category="queue",
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
finally:
|
finally:
|
||||||
finished_at = time.time()
|
finished_at = time.time()
|
||||||
if _current_meta:
|
if _current_meta:
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import threading
|
||||||
|
import tempfile
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
_PATH = "/var/server-bot/runtime.json"
|
_PATH = "/var/server-bot/runtime.json"
|
||||||
_STATE: Dict[str, Any] = {}
|
_STATE: Dict[str, Any] = {}
|
||||||
|
_LOCK = threading.RLock()
|
||||||
|
_LOADED = False
|
||||||
|
|
||||||
|
|
||||||
def configure(path: str | None):
|
def configure(path: str | None):
|
||||||
@@ -13,40 +17,57 @@ def configure(path: str | None):
|
|||||||
|
|
||||||
|
|
||||||
def _load_from_disk():
|
def _load_from_disk():
|
||||||
global _STATE
|
global _STATE, _LOADED
|
||||||
if not os.path.exists(_PATH):
|
if not os.path.exists(_PATH):
|
||||||
_STATE = {}
|
_STATE = {}
|
||||||
|
_LOADED = True
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
with open(_PATH, "r", encoding="utf-8") as f:
|
with open(_PATH, "r", encoding="utf-8") as f:
|
||||||
_STATE = json.load(f)
|
_STATE = json.load(f)
|
||||||
except Exception:
|
except Exception:
|
||||||
_STATE = {}
|
_STATE = {}
|
||||||
|
_LOADED = True
|
||||||
|
|
||||||
|
|
||||||
def _save():
|
def _save():
|
||||||
os.makedirs(os.path.dirname(_PATH), exist_ok=True)
|
directory = os.path.dirname(_PATH) or "."
|
||||||
|
os.makedirs(directory, exist_ok=True)
|
||||||
try:
|
try:
|
||||||
with open(_PATH, "w", encoding="utf-8") as f:
|
fd, tmp_path = tempfile.mkstemp(prefix=".runtime.", suffix=".json", dir=directory)
|
||||||
json.dump(_STATE, f)
|
try:
|
||||||
|
with os.fdopen(fd, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(_STATE, f, ensure_ascii=False)
|
||||||
|
f.flush()
|
||||||
|
os.fsync(f.fileno())
|
||||||
|
os.replace(tmp_path, _PATH)
|
||||||
|
finally:
|
||||||
|
if os.path.exists(tmp_path):
|
||||||
|
try:
|
||||||
|
os.unlink(tmp_path)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def get_state() -> Dict[str, Any]:
|
def get_state() -> Dict[str, Any]:
|
||||||
if not _STATE:
|
with _LOCK:
|
||||||
|
if not _LOADED:
|
||||||
_load_from_disk()
|
_load_from_disk()
|
||||||
return _STATE
|
return _STATE
|
||||||
|
|
||||||
|
|
||||||
def set_state(key: str, value: Any):
|
def set_state(key: str, value: Any):
|
||||||
if not _STATE:
|
with _LOCK:
|
||||||
|
if not _LOADED:
|
||||||
_load_from_disk()
|
_load_from_disk()
|
||||||
_STATE[key] = value
|
_STATE[key] = value
|
||||||
_save()
|
_save()
|
||||||
|
|
||||||
|
|
||||||
def get(key: str, default: Any = None) -> Any:
|
def get(key: str, default: Any = None) -> Any:
|
||||||
if not _STATE:
|
with _LOCK:
|
||||||
|
if not _LOADED:
|
||||||
_load_from_disk()
|
_load_from_disk()
|
||||||
return _STATE.get(key, default)
|
return _STATE.get(key, default)
|
||||||
|
|||||||
Reference in New Issue
Block a user