Files
tg-admin-bot/services/npmplus.py

172 lines
5.4 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import ssl
from datetime import datetime, timezone, timedelta
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
import state
def _parse_expiry(value: str | None) -> datetime | None:
if not value:
return None
cleaned = value.strip()
try:
if cleaned.endswith("Z"):
dt = datetime.fromisoformat(cleaned.replace("Z", "+00:00"))
else:
dt = datetime.fromisoformat(cleaned)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
try:
return datetime.strptime(cleaned, fmt).replace(tzinfo=timezone.utc)
except ValueError:
continue
return None
def _request_json(
url: str,
headers: dict[str, str],
data: dict[str, Any] | None,
verify_tls: bool,
) -> Any:
body = None
if data is not None:
body = json.dumps(data).encode("utf-8")
req = Request(url, headers=headers, data=body)
context = None
if not verify_tls:
context = ssl._create_unverified_context() # nosec - config-controlled
try:
with urlopen(req, timeout=10, context=context) as resp:
payload = resp.read().decode("utf-8")
except HTTPError as e:
raise RuntimeError(f"HTTP {e.code}") from e
except URLError as e:
raise RuntimeError(str(e.reason)) from e
return json.loads(payload)
def _get_token(cfg: dict[str, Any]) -> str:
    """Return a bearer token for the NPMplus API, reusing the cache if possible.

    Sources are tried in this order:

    1. the token cached in ``state.NPMPLUS_TOKEN`` while it remains valid for
       more than one minute;
    2. a refresh via ``{base_url}/tokens`` authenticated with the cached
       token (best effort, any failure falls through to the next source);
    3. a login at ``{base_url}/tokens`` with the configured identity/secret;
    4. the static ``token`` from the configuration.

    Tokens obtained in steps 2 and 3 are stored in ``state.NPMPLUS_TOKEN``;
    when the response has no parseable ``expires`` value, one hour from now
    is used.

    Raises:
        RuntimeError: The login request failed or returned no token.
        ValueError: Neither identity/secret nor a static token is usable.
    """
    settings = cfg.get("npmplus", {})
    api_root = (settings.get("base_url") or "").rstrip("/")
    login_identity = settings.get("identity")
    login_secret = settings.get("secret")
    fallback_token = settings.get("token")
    check_tls = settings.get("verify_tls", True)

    started = datetime.now(timezone.utc)
    default_expiry = started + timedelta(hours=1)

    cache = state.NPMPLUS_TOKEN
    cached_token = cache.get("token")
    cached_expiry = cache.get("expires_at")

    # 1. Cached token with a comfortable safety margin.
    if cached_token and isinstance(cached_expiry, datetime):
        if cached_expiry - started > timedelta(minutes=1):
            return cached_token

    token_endpoint = f"{api_root}/tokens"

    # 2. Try to renew the cached token; errors are deliberately ignored so
    #    that a full login can still be attempted below.
    if cached_token and api_root:
        refresh_headers = {
            "Authorization": f"Bearer {cached_token}",
            "User-Agent": "tg-admin-bot",
        }
        try:
            refreshed = _request_json(token_endpoint, refresh_headers, None, check_tls)
            renewed = refreshed.get("token")
            if renewed:
                state.NPMPLUS_TOKEN = {
                    "token": renewed,
                    "expires_at": _parse_expiry(refreshed.get("expires")) or default_expiry,
                }
                return renewed
        except Exception:
            pass

    # 4. Without login credentials only the static token can help.
    if not (login_identity and login_secret and api_root):
        if fallback_token:
            return fallback_token
        raise ValueError("NPMplus identity/secret or token not configured")

    # 3. Full login with identity and secret.
    login_headers = {
        "Content-Type": "application/json",
        "User-Agent": "tg-admin-bot",
    }
    credentials = {"identity": login_identity, "secret": login_secret}
    issued = _request_json(token_endpoint, login_headers, credentials, check_tls)
    fresh_token = issued.get("token")
    if not fresh_token:
        raise RuntimeError("Token not returned")
    state.NPMPLUS_TOKEN = {
        "token": fresh_token,
        "expires_at": _parse_expiry(issued.get("expires")) or default_expiry,
    }
    return fresh_token
def fetch_certificates(cfg: dict[str, Any]) -> list[dict[str, Any]]:
    """Fetch the certificate list from ``{base_url}/nginx/certificates``.

    Args:
        cfg: Application configuration; the ``npmplus`` section supplies
            ``base_url`` and ``verify_tls`` (default ``True``).

    Returns:
        The list returned by the API, unmodified.

    Raises:
        ValueError: ``base_url`` is missing or empty.
        RuntimeError: The API answered with something other than a list.
    """
    settings = cfg.get("npmplus", {})
    api_root = (settings.get("base_url") or "").rstrip("/")
    check_tls = settings.get("verify_tls", True)
    if not api_root:
        raise ValueError("NPMplus base_url not configured")

    bearer = _get_token(cfg)
    result = _request_json(
        f"{api_root}/nginx/certificates",
        {
            "Authorization": f"Bearer {bearer}",
            "User-Agent": "tg-admin-bot",
        },
        None,
        check_tls,
    )
    if isinstance(result, list):
        return result
    raise RuntimeError("Unexpected API response")
def format_certificates(certs: list[dict[str, Any]]) -> str:
    """Render certificates as a text report, soonest expiry first.

    Each certificate is labelled with its ``nice_name``, falling back to the
    joined ``domain_names`` and finally to ``"unknown"``. The status marker
    depends on the whole days left until ``expires_on``: expired, up to 7
    days, up to 30 days, or more. Certificates whose expiry cannot be parsed
    are listed last.

    Args:
        certs: Certificate records as returned by the API.

    Returns:
        The report text; a placeholder message when *certs* is empty.
    """
    if not certs:
        return "🔒 SSL certificates\n\n No certificates found"

    now = datetime.now(timezone.utc)
    # Sort key for entries without a known expiry so they end up at the bottom.
    far_future = datetime.max.replace(tzinfo=timezone.utc)
    items: list[tuple[datetime, str]] = []

    for cert in certs:
        name = cert.get("nice_name")
        if not name:
            domains = cert.get("domain_names") or []
            if isinstance(domains, list):
                # str() guards against non-string entries breaking join().
                name = ", ".join(str(domain) for domain in domains)
        if not name:
            name = "unknown"

        expiry = _parse_expiry(cert.get("expires_on"))
        if expiry is None:
            items.append((far_future, f"⚠️ {name}: unknown expiry"))
            continue

        date_str = expiry.astimezone(timezone.utc).strftime("%Y-%m-%d")
        if expiry < now:
            # Measure elapsed time as a positive interval: timedelta.days
            # floors negative values, which would report a certificate that
            # expired an hour ago as "1d ago".
            elapsed = (now - expiry).days
            line = f"🔴 {name}: expired {elapsed}d ago ({date_str})"
        else:
            days = (expiry - now).days
            if days <= 7:
                line = f"🟠 {name}: {days}d ({date_str})"
            elif days <= 30:
                line = f"🟡 {name}: {days}d ({date_str})"
            else:
                line = f"🟢 {name}: {days}d ({date_str})"
        items.append((expiry, line))

    items.sort(key=lambda item: item[0])
    lines = ["🔒 SSL certificates\n"]
    lines.extend(line for _, line in items)
    return "\n".join(lines)