diff --git a/config.example.yaml b/config.example.yaml index 97e552c..5db000c 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -27,6 +27,14 @@ arcane: api_key: "arc_..." env_id: 0 +npmplus: + base_url: "https://10.10.10.10:81/api" + identity: "your@email.com" + secret: "yourPassword" + # Optional static token (not recommended if it expires) + token: "" + verify_tls: true + security: reboot_password: "CHANGE_ME" diff --git a/handlers/system.py b/handlers/system.py index 57df58e..489c726 100644 --- a/handlers/system.py +++ b/handlers/system.py @@ -9,6 +9,7 @@ from services.http_checks import get_url_checks, check_url from services.queue import enqueue from services.updates import list_updates, apply_updates from services.runner import run_cmd +from services.npmplus import fetch_certificates, format_certificates import state from state import UPDATES_CACHE, REBOOT_PENDING from services.metrics import summarize @@ -67,6 +68,24 @@ async def metrics(msg: Message): await msg.answer(summarize(state.METRICS_STORE, minutes=15), reply_markup=system_kb) +@dp.message(F.text == "πŸ”’ SSL") +async def ssl_certs(msg: Message): + if not is_admin_msg(msg): + return + + await msg.answer("⏳ Checking SSL certificates…", reply_markup=system_kb) + + async def worker(): + try: + certs = await asyncio.to_thread(fetch_certificates, cfg) + text = format_certificates(certs) + except Exception as e: + text = f"⚠️ NPMplus error: {e}" + await msg.answer(text, reply_markup=system_kb) + + asyncio.create_task(worker()) + + @dp.message(F.text == "πŸ“¦ Updates") async def updates_list(msg: Message): if not is_admin_msg(msg): diff --git a/keyboards.py b/keyboards.py index 0639a6b..b659300 100644 --- a/keyboards.py +++ b/keyboards.py @@ -55,7 +55,7 @@ artifacts_kb = ReplyKeyboardMarkup( system_kb = ReplyKeyboardMarkup( keyboard=[ [KeyboardButton(text="πŸ’½ Disks"), KeyboardButton(text="πŸ” Security")], - [KeyboardButton(text="🌐 URLs"), KeyboardButton(text="πŸ“ˆ Metrics")], + [KeyboardButton(text="🌐 URLs"), KeyboardButton(text="πŸ“ˆ Metrics"), KeyboardButton(text="πŸ”’ SSL")], [KeyboardButton(text="πŸ“¦ Updates"), KeyboardButton(text="⬆️ Upgrade")], [KeyboardButton(text="🧱 Hardware"), KeyboardButton(text="πŸ”„ Reboot"), KeyboardButton(text="⬅️ Назад")], ], diff --git a/services/npmplus.py b/services/npmplus.py new file mode 100644 index 0000000..828ae0b --- /dev/null +++ b/services/npmplus.py @@ -0,0 +1,167 @@ +import json +import ssl +from datetime import datetime, timezone, timedelta +from typing import Any +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + +import state + + +def _parse_expiry(value: str | None) -> datetime | None: + if not value: + return None + cleaned = value.strip() + try: + if cleaned.endswith("Z"): + return datetime.fromisoformat(cleaned.replace("Z", "+00:00")) + return datetime.fromisoformat(cleaned) + except ValueError: + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"): + try: + return datetime.strptime(cleaned, fmt).replace(tzinfo=timezone.utc) + except ValueError: + continue + return None + + +def _request_json( + url: str, + headers: dict[str, str], + data: dict[str, Any] | None, + verify_tls: bool, +) -> Any: + body = None + if data is not None: + body = json.dumps(data).encode("utf-8") + req = Request(url, headers=headers, data=body) + + context = None + if not verify_tls: + context = ssl._create_unverified_context() # nosec - config-controlled + + try: + with urlopen(req, timeout=10, context=context) as resp: + payload = resp.read().decode("utf-8") + except HTTPError as e: + raise RuntimeError(f"HTTP {e.code}") from e + except URLError as e: + raise RuntimeError(str(e.reason)) from e + + return json.loads(payload) + + +def _get_token(cfg: dict[str, Any]) -> str: + npm_cfg = cfg.get("npmplus", {}) + base_url = (npm_cfg.get("base_url") or "").rstrip("/") + identity = npm_cfg.get("identity") + secret = npm_cfg.get("secret") + static_token = npm_cfg.get("token") + verify_tls = npm_cfg.get("verify_tls", True) + + now = datetime.now(timezone.utc) + cached = state.NPMPLUS_TOKEN + token = cached.get("token") + expires_at = cached.get("expires_at") + if token and isinstance(expires_at, datetime) and expires_at - now > timedelta(minutes=1): + return token + + if token and base_url: + url = f"{base_url}/tokens" + headers = { + "Authorization": f"Bearer {token}", + "User-Agent": "tg-admin-bot", + } + try: + payload = _request_json(url, headers, None, verify_tls) + new_token = payload.get("token") + if new_token: + expires = _parse_expiry(payload.get("expires")) + if expires is None: + expires = now + timedelta(hours=1) + state.NPMPLUS_TOKEN = {"token": new_token, "expires_at": expires} + return new_token + except Exception: + pass + + if identity and secret and base_url: + url = f"{base_url}/tokens" + headers = { + "Content-Type": "application/json", + "User-Agent": "tg-admin-bot", + } + data = {"identity": identity, "secret": secret} + payload = _request_json(url, headers, data, verify_tls) + token = payload.get("token") + if not token: + raise RuntimeError("Token not returned") + expires_at = _parse_expiry(payload.get("expires")) + if expires_at is None: + expires_at = now + timedelta(hours=1) + state.NPMPLUS_TOKEN = {"token": token, "expires_at": expires_at} + return token + + if static_token: + return static_token + + raise ValueError("NPMplus identity/secret or token not configured") + + +def fetch_certificates(cfg: dict[str, Any]) -> list[dict[str, Any]]: + npm_cfg = cfg.get("npmplus", {}) + base_url = (npm_cfg.get("base_url") or "").rstrip("/") + verify_tls = npm_cfg.get("verify_tls", True) + + if not base_url: + raise ValueError("NPMplus base_url not configured") + + token = _get_token(cfg) + url = f"{base_url}/nginx/certificates" + headers = { + "Authorization": f"Bearer {token}", + "User-Agent": "tg-admin-bot", + } + + data = _request_json(url, headers, None, verify_tls) + if not isinstance(data, list): + raise RuntimeError("Unexpected API response") + return data + + +def format_certificates(certs: list[dict[str, Any]]) -> str: + if not certs: + return "πŸ”’ SSL certificates\n\nℹ️ No certificates found" + + now = datetime.now(timezone.utc) + items: list[tuple[datetime | None, str]] = [] + + for cert in certs: + name = cert.get("nice_name") + if not name: + domains = cert.get("domain_names") or [] + if isinstance(domains, list): + name = ", ".join(domains) + if not name: + name = "unknown" + + expiry = _parse_expiry(cert.get("expires_on")) + if expiry is None: + items.append((expiry, f"⚠️ {name}: unknown expiry")) + continue + + days = (expiry - now).days + date_str = expiry.astimezone(timezone.utc).strftime("%Y-%m-%d") + if days < 0: + line = f"πŸ”΄ {name}: expired {-days}d ago ({date_str})" + elif days <= 7: + line = f"🟠 {name}: {days}d ({date_str})" + elif days <= 30: + line = f"🟑 {name}: {days}d ({date_str})" + else: + line = f"🟒 {name}: {days}d ({date_str})" + items.append((expiry, line)) + + items.sort(key=lambda item: item[0] or datetime.max.replace(tzinfo=timezone.utc)) + lines = ["πŸ”’ SSL certificates\n"] + lines.extend(line for _, line in items) + return "\n".join(lines) diff --git a/state.py b/state.py index f057b7a..37b4865 100644 --- a/state.py +++ b/state.py @@ -6,3 +6,4 @@ UPDATES_CACHE: Dict[int, dict] = {} ARCANE_CACHE: Dict[int, dict] = {} REBOOT_PENDING: Dict[int, dict] = {} METRICS_STORE = None +NPMPLUS_TOKEN: Dict[str, object] = {}