import json import ssl from datetime import datetime, timezone, timedelta from typing import Any from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen import state def _parse_expiry(value: str | None) -> datetime | None: if not value: return None cleaned = value.strip() try: if cleaned.endswith("Z"): dt = datetime.fromisoformat(cleaned.replace("Z", "+00:00")) else: dt = datetime.fromisoformat(cleaned) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"): try: return datetime.strptime(cleaned, fmt).replace(tzinfo=timezone.utc) except ValueError: continue return None def _request_json( url: str, headers: dict[str, str], data: dict[str, Any] | None, verify_tls: bool, ) -> Any: body = None if data is not None: body = json.dumps(data).encode("utf-8") req = Request(url, headers=headers, data=body) context = None if not verify_tls: context = ssl._create_unverified_context() # nosec - config-controlled try: with urlopen(req, timeout=10, context=context) as resp: payload = resp.read().decode("utf-8") except HTTPError as e: raise RuntimeError(f"HTTP {e.code}") from e except URLError as e: raise RuntimeError(str(e.reason)) from e return json.loads(payload) def _get_token(cfg: dict[str, Any]) -> str: npm_cfg = cfg.get("npmplus", {}) base_url = (npm_cfg.get("base_url") or "").rstrip("/") identity = npm_cfg.get("identity") secret = npm_cfg.get("secret") static_token = npm_cfg.get("token") verify_tls = npm_cfg.get("verify_tls", True) now = datetime.now(timezone.utc) cached = state.NPMPLUS_TOKEN token = cached.get("token") expires_at = cached.get("expires_at") if token and isinstance(expires_at, datetime) and expires_at - now > timedelta(minutes=1): return token if token and base_url: url = f"{base_url}/tokens" headers = { "Authorization": f"Bearer {token}", "User-Agent": "tg-admin-bot", } try: payload = _request_json(url, headers, None, verify_tls) new_token = payload.get("token") if new_token: expires = _parse_expiry(payload.get("expires")) if expires is None: expires = now + timedelta(hours=1) state.NPMPLUS_TOKEN = {"token": new_token, "expires_at": expires} return new_token except Exception: pass if identity and secret and base_url: url = f"{base_url}/tokens" headers = { "Content-Type": "application/json", "User-Agent": "tg-admin-bot", } data = {"identity": identity, "secret": secret} payload = _request_json(url, headers, data, verify_tls) token = payload.get("token") if not token: raise RuntimeError("Token not returned") expires_at = _parse_expiry(payload.get("expires")) if expires_at is None: expires_at = now + timedelta(hours=1) state.NPMPLUS_TOKEN = {"token": token, "expires_at": expires_at} return token if static_token: return static_token raise ValueError("NPMplus identity/secret or token not configured") def fetch_certificates(cfg: dict[str, Any]) -> list[dict[str, Any]]: npm_cfg = cfg.get("npmplus", {}) base_url = (npm_cfg.get("base_url") or "").rstrip("/") verify_tls = npm_cfg.get("verify_tls", True) if not base_url: raise ValueError("NPMplus base_url not configured") token = _get_token(cfg) url = f"{base_url}/nginx/certificates" headers = { "Authorization": f"Bearer {token}", "User-Agent": "tg-admin-bot", } data = _request_json(url, headers, None, verify_tls) if not isinstance(data, list): raise RuntimeError("Unexpected API response") return data def format_certificates(certs: list[dict[str, Any]]) -> str: if not certs: return "šŸ”’ SSL certificates\n\nā„¹ļø No certificates found" now = datetime.now(timezone.utc) items: list[tuple[datetime | None, str]] = [] for cert in certs: name = cert.get("nice_name") if not name: domains = cert.get("domain_names") or [] if isinstance(domains, list): name = ", ".join(domains) if not name: name = "unknown" expiry = _parse_expiry(cert.get("expires_on")) if expiry is None: items.append((expiry, f"āš ļø {name}: unknown expiry")) continue days = (expiry - now).days date_str = expiry.astimezone(timezone.utc).strftime("%Y-%m-%d") if days < 0: line = f"šŸ”“ {name}: expired {-days}d ago ({date_str})" elif days <= 7: line = f"🟠 {name}: {days}d ({date_str})" elif days <= 30: line = f"🟔 {name}: {days}d ({date_str})" else: line = f"🟢 {name}: {days}d ({date_str})" items.append((expiry, line)) items.sort(key=lambda item: item[0] or datetime.max.replace(tzinfo=timezone.utc)) lines = ["šŸ”’ SSL certificates\n"] lines.extend(line for _, line in items) return "\n".join(lines)