Compare commits

...

4 Commits

10 changed files with 146 additions and 62 deletions

2
app.py
View File

@@ -15,7 +15,7 @@ else:
paths_cfg = cfg.get("paths", {}) paths_cfg = cfg.get("paths", {})
runtime_state.configure(paths_cfg.get("runtime_state", "/var/server-bot/runtime.json")) runtime_state.configure(paths_cfg.get("runtime_state", "/var/server-bot/runtime.json"))
ARTIFACT_STATE = paths_cfg["artifact_state"] ARTIFACT_STATE = paths_cfg.get("artifact_state", "/opt/tg-bot/state.json")
RESTIC_ENV = load_env(paths_cfg.get("restic_env", "/etc/restic/restic.env")) RESTIC_ENV = load_env(paths_cfg.get("restic_env", "/etc/restic/restic.env"))
DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80)) DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80))

View File

@@ -2,7 +2,7 @@ import asyncio
from datetime import datetime from datetime import datetime
from aiogram import F from aiogram import F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from app import dp, cfg from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg from auth import is_admin_msg
from keyboards import docker_kb, arcane_kb from keyboards import docker_kb, arcane_kb
from services.arcane import list_projects, restart_project, set_project_state, get_project_details from services.arcane import list_projects, restart_project, set_project_state, get_project_details
@@ -115,7 +115,7 @@ async def arcane_refresh(msg: Message):
@dp.callback_query(F.data == "arcane:refresh") @dp.callback_query(F.data == "arcane:refresh")
async def arcane_refresh_inline(cb: CallbackQuery): async def arcane_refresh_inline(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
await cb.answer() await cb.answer()
await cmd_arcane_projects(cb.message, edit=True) await cmd_arcane_projects(cb.message, edit=True)
@@ -123,7 +123,7 @@ async def arcane_refresh_inline(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:page:")) @dp.callback_query(F.data.startswith("arcane:page:"))
async def arcane_page(cb: CallbackQuery): async def arcane_page(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
try: try:
page = int(cb.data.split(":", 2)[2]) page = int(cb.data.split(":", 2)[2])
@@ -141,7 +141,7 @@ async def arcane_page(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:restart:")) @dp.callback_query(F.data.startswith("arcane:restart:"))
async def arcane_restart(cb: CallbackQuery): async def arcane_restart(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -160,7 +160,7 @@ async def arcane_restart(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:details:")) @dp.callback_query(F.data.startswith("arcane:details:"))
async def arcane_details(cb: CallbackQuery): async def arcane_details(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -208,7 +208,7 @@ async def arcane_details(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:deploy:")) @dp.callback_query(F.data.startswith("arcane:deploy:"))
async def arcane_deploy_status(cb: CallbackQuery): async def arcane_deploy_status(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -254,7 +254,7 @@ async def arcane_deploy_status(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:up:")) @dp.callback_query(F.data.startswith("arcane:up:"))
async def arcane_up(cb: CallbackQuery): async def arcane_up(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)
@@ -273,7 +273,7 @@ async def arcane_up(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("arcane:down:")) @dp.callback_query(F.data.startswith("arcane:down:"))
async def arcane_down(cb: CallbackQuery): async def arcane_down(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
_, _, pid = cb.data.split(":", 2) _, _, pid = cb.data.split(":", 2)

View File

@@ -15,8 +15,15 @@ async def docker_callback(cb: CallbackQuery):
if cb.from_user.id != ADMIN_ID: if cb.from_user.id != ADMIN_ID:
return return
_, action, alias = cb.data.split(":", 2) try:
real = DOCKER_MAP[alias] _, action, alias = cb.data.split(":", 2)
except ValueError:
await cb.answer("Bad request")
return
real = DOCKER_MAP.get(alias)
if not real:
await cb.answer("Container not found")
return
if action == "restart": if action == "restart":
await cb.answer("Restarting…") await cb.answer("Restarting…")

View File

@@ -3,7 +3,7 @@ import os
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from aiogram import F from aiogram import F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile
from app import dp, cfg from app import dp, cfg, ADMIN_IDS
from auth import is_admin_msg from auth import is_admin_msg
from keyboards import ( from keyboards import (
system_info_kb, system_info_kb,
@@ -223,13 +223,6 @@ async def openwrt_status(msg: Message):
asyncio.create_task(worker()) asyncio.create_task(worker())
@dp.message(F.text == "/openwrt")
async def openwrt_cmd(msg: Message):
if not is_admin_msg(msg):
return
await openwrt_status(msg)
@dp.message(F.text == "/openwrt_wan") @dp.message(F.text == "/openwrt_wan")
async def openwrt_wan(msg: Message): async def openwrt_wan(msg: Message):
if not is_admin_msg(msg): if not is_admin_msg(msg):
@@ -1029,7 +1022,7 @@ async def updates_page(cb: CallbackQuery):
@dp.callback_query(F.data == "upgrade:confirm") @dp.callback_query(F.data == "upgrade:confirm")
async def upgrade_confirm(cb: CallbackQuery): async def upgrade_confirm(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
await cb.answer() await cb.answer()
@@ -1052,7 +1045,7 @@ async def upgrade_cancel(cb: CallbackQuery):
@dp.callback_query(F.data == "reboot:confirm") @dp.callback_query(F.data == "reboot:confirm")
async def reboot_confirm(cb: CallbackQuery): async def reboot_confirm(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
await cb.answer() await cb.answer()
REBOOT_PENDING[cb.from_user.id] = {} REBOOT_PENDING[cb.from_user.id] = {}
@@ -1067,7 +1060,7 @@ async def reboot_cancel(cb: CallbackQuery):
@dp.callback_query(F.data.startswith("npmplus:")) @dp.callback_query(F.data.startswith("npmplus:"))
async def npmplus_toggle(cb: CallbackQuery): async def npmplus_toggle(cb: CallbackQuery):
if cb.from_user.id != cfg["telegram"]["admin_id"]: if cb.from_user.id not in ADMIN_IDS:
return return
parts = cb.data.split(":") parts = cb.data.split(":")
if len(parts) != 3: if len(parts) != 3:

View File

@@ -1,4 +1,5 @@
from pathlib import Path from pathlib import Path
import os
import time import time
LOCK_DIR = Path("/var/run/tg-bot") LOCK_DIR = Path("/var/run/tg-bot")
@@ -11,9 +12,14 @@ def lock_path(name: str) -> Path:
def acquire_lock(name: str) -> bool: def acquire_lock(name: str) -> bool:
p = lock_path(name) p = lock_path(name)
if p.exists(): try:
fd = os.open(str(p), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
except FileExistsError:
return False return False
p.write_text(str(time.time())) try:
os.write(fd, str(time.time()).encode("ascii", errors="ignore"))
finally:
os.close(fd)
return True return True

View File

@@ -9,7 +9,9 @@ def validate_cfg(cfg: dict[str, Any]) -> Tuple[List[str], List[str]]:
tg = cfg.get("telegram", {}) tg = cfg.get("telegram", {})
if not tg.get("token"): if not tg.get("token"):
errors.append("telegram.token is missing") errors.append("telegram.token is missing")
if not tg.get("admin_id"): admin_ids = tg.get("admin_ids")
has_admin_ids = isinstance(admin_ids, list) and len(admin_ids) > 0
if not tg.get("admin_id") and not has_admin_ids:
errors.append("telegram.admin_id is missing") errors.append("telegram.admin_id is missing")
thresholds = cfg.get("thresholds", {}) thresholds = cfg.get("thresholds", {})

View File

@@ -1,11 +1,48 @@
import os import os
import re
from typing import Any from typing import Any
from services.runner import run_cmd from services.runner import run_cmd
def _top_dirs_cmd(path: str, limit: int) -> list[str]: def _top_dirs_cmd(path: str, limit: int) -> list[str]:
return ["bash", "-lc", f"du -xhd1 {path} 2>/dev/null | sort -h | tail -n {limit}"] _ = limit
return ["du", "-x", "-h", "-d", "1", path]
_SIZE_RE = re.compile(r"^\s*([0-9]+(?:\.[0-9]+)?)([KMGTP]?)(i?B?)?$", re.IGNORECASE)
def _size_to_bytes(value: str) -> float:
m = _SIZE_RE.match(value.strip())
if not m:
return -1.0
num = float(m.group(1))
unit = (m.group(2) or "").upper()
mul = {
"": 1,
"K": 1024,
"M": 1024**2,
"G": 1024**3,
"T": 1024**4,
"P": 1024**5,
}.get(unit, 1)
return num * mul
def _format_top_dirs(raw: str, limit: int) -> str:
rows: list[tuple[float, str]] = []
for line in raw.splitlines():
line = line.strip()
if not line:
continue
parts = line.split(maxsplit=1)
if len(parts) != 2:
continue
size, name = parts
rows.append((_size_to_bytes(size), f"{size}\t{name}"))
rows.sort(key=lambda x: x[0])
return "\n".join(line for _sz, line in rows[-max(1, limit):])
async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str: async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
@@ -15,24 +52,27 @@ async def build_disk_report(cfg: dict[str, Any], mount: str, usage: int) -> str:
rc, out = await run_cmd(_top_dirs_cmd(mount, limit), timeout=30) rc, out = await run_cmd(_top_dirs_cmd(mount, limit), timeout=30)
if rc == 0 and out.strip(): if rc == 0 and out.strip():
top_out = _format_top_dirs(out, limit)
lines.append("") lines.append("")
lines.append("Top directories:") lines.append("Top directories:")
lines.append(out.strip()) lines.append(top_out)
docker_dir = cfg.get("disk_report", {}).get("docker_dir", "/var/lib/docker") docker_dir = cfg.get("disk_report", {}).get("docker_dir", "/var/lib/docker")
if docker_dir and os.path.exists(docker_dir): if docker_dir and os.path.exists(docker_dir):
rc2, out2 = await run_cmd(_top_dirs_cmd(docker_dir, limit), timeout=30) rc2, out2 = await run_cmd(_top_dirs_cmd(docker_dir, limit), timeout=30)
if rc2 == 0 and out2.strip(): if rc2 == 0 and out2.strip():
top_out2 = _format_top_dirs(out2, limit)
lines.append("") lines.append("")
lines.append(f"Docker dir: {docker_dir}") lines.append(f"Docker dir: {docker_dir}")
lines.append(out2.strip()) lines.append(top_out2)
logs_dir = cfg.get("disk_report", {}).get("logs_dir", "/var/log") logs_dir = cfg.get("disk_report", {}).get("logs_dir", "/var/log")
if logs_dir and os.path.exists(logs_dir): if logs_dir and os.path.exists(logs_dir):
rc3, out3 = await run_cmd(_top_dirs_cmd(logs_dir, limit), timeout=30) rc3, out3 = await run_cmd(_top_dirs_cmd(logs_dir, limit), timeout=30)
if rc3 == 0 and out3.strip(): if rc3 == 0 and out3.strip():
top_out3 = _format_top_dirs(out3, limit)
lines.append("") lines.append("")
lines.append(f"Logs dir: {logs_dir}") lines.append(f"Logs dir: {logs_dir}")
lines.append(out3.strip()) lines.append(top_out3)
return "\n".join(lines) return "\n".join(lines)

View File

@@ -1,4 +1,4 @@
import os import os
import ssl import ssl
import subprocess import subprocess
import psutil import psutil
@@ -37,15 +37,17 @@ def _npm_api_base(cfg) -> str | None:
def health(cfg, container_map: dict | None = None) -> str: def health(cfg, container_map: dict | None = None) -> str:
lines = ["🩺 Health check\n"] lines = ["рџ©є Health check\n"]
thresholds = cfg.get("thresholds", {})
disk_warn = int(thresholds.get("disk_warn", 80))
load_warn = float(thresholds.get("load_warn", 2.0))
try: try:
env = os.environ.copy() env = os.environ.copy()
env.update(RESTIC_ENV) env.update(RESTIC_ENV)
subprocess.check_output(["restic", "snapshots"], timeout=10, env=env) subprocess.check_output(["restic", "snapshots"], timeout=10, env=env)
lines.append("🟢 Backup repo reachable") lines.append("рџџў Backup repo reachable")
except Exception: except Exception:
lines.append("🔴 Backup repo unreachable") lines.append("🔴 Backup repo unreachable")
containers = container_map if container_map is not None else _containers_from_cfg(cfg) containers = container_map if container_map is not None else _containers_from_cfg(cfg)
for alias, real in containers.items(): for alias, real in containers.items():
@@ -53,20 +55,20 @@ def health(cfg, container_map: dict | None = None) -> str:
f"docker inspect -f '{{{{.State.Status}}}}' {real}" f"docker inspect -f '{{{{.State.Status}}}}' {real}"
) )
if out.strip() != "running": if out.strip() != "running":
lines.append(f"🔴 {alias} down") lines.append(f"🔴 {alias} down")
else: else:
lines.append(f"🟢 {alias} OK") lines.append(f"рџџў {alias} OK")
npm_cfg = cfg.get("npmplus", {}) npm_cfg = cfg.get("npmplus", {})
npm_base = _npm_api_base(cfg) npm_base = _npm_api_base(cfg)
if npm_base: if npm_base:
npm_status = _request_status(npm_base, npm_cfg.get("verify_tls", True)) npm_status = _request_status(npm_base, npm_cfg.get("verify_tls", True))
if npm_status == 200: if npm_status == 200:
lines.append("🟢 NPMplus API OK") lines.append("рџџў NPMplus API OK")
elif npm_status is None: elif npm_status is None:
lines.append("🔴 NPMplus API unreachable") lines.append("🔴 NPMplus API unreachable")
else: else:
lines.append(f"🟡 NPMplus API HTTP {npm_status}") lines.append(f"рџџЎ NPMplus API HTTP {npm_status}")
g_cfg = cfg.get("gitea", {}) g_cfg = cfg.get("gitea", {})
g_base = (g_cfg.get("base_url") or "").rstrip("/") g_base = (g_cfg.get("base_url") or "").rstrip("/")
@@ -82,21 +84,22 @@ def health(cfg, container_map: dict | None = None) -> str:
g_status = status g_status = status
break break
if g_status == 200: if g_status == 200:
lines.append("🟢 Gitea API OK") lines.append("рџџў Gitea API OK")
elif g_status is None: elif g_status is None:
lines.append("🔴 Gitea API unreachable") lines.append("🔴 Gitea API unreachable")
else: else:
lines.append(f"🟡 Gitea API HTTP {g_status}") lines.append(f"рџџЎ Gitea API HTTP {g_status}")
usage, mount = worst_disk_usage() usage, mount = worst_disk_usage()
if usage is None: if usage is None:
lines.append("⚠️ Disk n/a") lines.append("вљ пёЏ Disk n/a")
elif usage > cfg["thresholds"]["disk_warn"]: elif usage > disk_warn:
lines.append(f"🟡 Disk {usage}% ({mount})") lines.append(f"рџџЎ Disk {usage}% ({mount})")
else: else:
lines.append(f"🟢 Disk {usage}% ({mount})") lines.append(f"рџџў Disk {usage}% ({mount})")
load = psutil.getloadavg()[0] load = psutil.getloadavg()[0]
lines.append(f"{'🟢' if load < cfg['thresholds']['load_warn'] else '🟡'} Load {load}") lines.append(f"{'рџџў' if load < load_warn else 'рџџЎ'} Load {load}")
return "\n".join(lines) return "\n".join(lines)

View File

@@ -1,4 +1,5 @@
import asyncio import asyncio
import logging
import time import time
from collections import deque from collections import deque
from typing import Awaitable, Callable, Any from typing import Awaitable, Callable, Any
@@ -25,6 +26,7 @@ _alert_cfg: dict[str, Any] = {
"last_sent": 0.0, "last_sent": 0.0,
} }
_cfg: dict[str, Any] | None = None _cfg: dict[str, Any] | None = None
_logger = logging.getLogger("queue")
def _save_stats(): def _save_stats():
@@ -85,8 +87,18 @@ async def worker():
status = "ok" status = "ok"
try: try:
await job() await job()
except Exception: except Exception as e:
status = "err" status = "err"
_logger.exception("Queue job failed: label=%s", label)
if _cfg:
try:
log_incident(
_cfg,
f"queue_job_failed label={label} error={type(e).__name__}: {e}",
category="queue",
)
except Exception:
pass
finally: finally:
finished_at = time.time() finished_at = time.time()
if _current_meta: if _current_meta:

View File

@@ -1,9 +1,13 @@
import json import json
import os import os
import threading
import tempfile
from typing import Any, Dict from typing import Any, Dict
_PATH = "/var/server-bot/runtime.json" _PATH = "/var/server-bot/runtime.json"
_STATE: Dict[str, Any] = {} _STATE: Dict[str, Any] = {}
_LOCK = threading.RLock()
_LOADED = False
def configure(path: str | None): def configure(path: str | None):
@@ -13,40 +17,57 @@ def configure(path: str | None):
def _load_from_disk(): def _load_from_disk():
global _STATE global _STATE, _LOADED
if not os.path.exists(_PATH): if not os.path.exists(_PATH):
_STATE = {} _STATE = {}
_LOADED = True
return return
try: try:
with open(_PATH, "r", encoding="utf-8") as f: with open(_PATH, "r", encoding="utf-8") as f:
_STATE = json.load(f) _STATE = json.load(f)
except Exception: except Exception:
_STATE = {} _STATE = {}
_LOADED = True
def _save(): def _save():
os.makedirs(os.path.dirname(_PATH), exist_ok=True) directory = os.path.dirname(_PATH) or "."
os.makedirs(directory, exist_ok=True)
try: try:
with open(_PATH, "w", encoding="utf-8") as f: fd, tmp_path = tempfile.mkstemp(prefix=".runtime.", suffix=".json", dir=directory)
json.dump(_STATE, f) try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(_STATE, f, ensure_ascii=False)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, _PATH)
finally:
if os.path.exists(tmp_path):
try:
os.unlink(tmp_path)
except Exception:
pass
except Exception: except Exception:
pass pass
def get_state() -> Dict[str, Any]: def get_state() -> Dict[str, Any]:
if not _STATE: with _LOCK:
_load_from_disk() if not _LOADED:
return _STATE _load_from_disk()
return _STATE
def set_state(key: str, value: Any): def set_state(key: str, value: Any):
if not _STATE: with _LOCK:
_load_from_disk() if not _LOADED:
_STATE[key] = value _load_from_disk()
_save() _STATE[key] = value
_save()
def get(key: str, default: Any = None) -> Any: def get(key: str, default: Any = None) -> Any:
if not _STATE: with _LOCK:
_load_from_disk() if not _LOADED:
return _STATE.get(key, default) _load_from_disk()
return _STATE.get(key, default)