Files
tg-admin-bot/services/npmplus.py

235 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import ssl
from datetime import datetime, timezone, timedelta
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
import state
def _parse_expiry(value: str | None) -> datetime | None:
if not value:
return None
cleaned = value.strip()
try:
if cleaned.endswith("Z"):
dt = datetime.fromisoformat(cleaned.replace("Z", "+00:00"))
else:
dt = datetime.fromisoformat(cleaned)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
try:
return datetime.strptime(cleaned, fmt).replace(tzinfo=timezone.utc)
except ValueError:
continue
return None
def _request_json(
url: str,
headers: dict[str, str],
data: dict[str, Any] | None,
verify_tls: bool,
method: str | None = None,
) -> Any:
body = None
if data is not None:
body = json.dumps(data).encode("utf-8")
req = Request(url, headers=headers, data=body, method=method)
context = None
if not verify_tls:
context = ssl._create_unverified_context() # nosec - config-controlled
try:
with urlopen(req, timeout=10, context=context) as resp:
payload = resp.read().decode("utf-8")
except HTTPError as e:
detail = f"HTTP {e.code}"
try:
payload = e.read().decode("utf-8").strip()
except Exception:
payload = ""
if payload:
payload = " ".join(payload.split())
if len(payload) > 300:
payload = payload[:300] + "..."
detail = f"{detail}: {payload}"
raise RuntimeError(f"{detail} ({url})") from e
except URLError as e:
raise RuntimeError(str(e.reason)) from e
return json.loads(payload)
def _api_base(cfg: dict[str, Any]) -> str:
npm_cfg = cfg.get("npmplus", {})
base = (npm_cfg.get("base_url") or "").rstrip("/")
if not base:
return ""
if not base.endswith("/api"):
base = f"{base}/api"
return base
def _get_token(cfg: dict[str, Any]) -> str:
npm_cfg = cfg.get("npmplus", {})
base_url = _api_base(cfg)
identity = npm_cfg.get("identity")
secret = npm_cfg.get("secret")
static_token = npm_cfg.get("token")
verify_tls = npm_cfg.get("verify_tls", True)
now = datetime.now(timezone.utc)
cached = state.NPMPLUS_TOKEN
token = cached.get("token")
expires_at = cached.get("expires_at")
if token and isinstance(expires_at, datetime) and expires_at - now > timedelta(minutes=1):
return token
if token and base_url:
url = f"{base_url}/tokens"
headers = {
"Authorization": f"Bearer {token}",
"User-Agent": "tg-admin-bot",
}
try:
payload = _request_json(url, headers, None, verify_tls)
new_token = payload.get("token")
if new_token:
expires = _parse_expiry(payload.get("expires"))
if expires is None:
expires = now + timedelta(hours=1)
state.NPMPLUS_TOKEN = {"token": new_token, "expires_at": expires}
return new_token
except Exception:
pass
if identity and secret and base_url:
url = f"{base_url}/tokens"
headers = {
"Content-Type": "application/json",
"User-Agent": "tg-admin-bot",
}
data = {"identity": identity, "secret": secret}
payload = _request_json(url, headers, data, verify_tls)
token = payload.get("token")
if not token:
raise RuntimeError("Token not returned")
expires_at = _parse_expiry(payload.get("expires"))
if expires_at is None:
expires_at = now + timedelta(hours=1)
state.NPMPLUS_TOKEN = {"token": token, "expires_at": expires_at}
return token
if static_token:
return static_token
raise ValueError("NPMplus identity/secret or token not configured")
def fetch_certificates(cfg: dict[str, Any]) -> list[dict[str, Any]]:
npm_cfg = cfg.get("npmplus", {})
base_url = _api_base(cfg)
verify_tls = npm_cfg.get("verify_tls", True)
if not base_url:
raise ValueError("NPMplus base_url not configured")
token = _get_token(cfg)
url = f"{base_url}/nginx/certificates"
headers = {
"Authorization": f"Bearer {token}",
"User-Agent": "tg-admin-bot",
}
data = _request_json(url, headers, None, verify_tls)
if not isinstance(data, list):
raise RuntimeError("Unexpected API response")
return data
def list_proxy_hosts(cfg: dict[str, Any]) -> list[dict[str, Any]]:
npm_cfg = cfg.get("npmplus", {})
base_url = _api_base(cfg)
verify_tls = npm_cfg.get("verify_tls", True)
if not base_url:
raise ValueError("NPMplus base_url not configured")
token = _get_token(cfg)
url = f"{base_url}/nginx/proxy-hosts"
headers = {
"Authorization": f"Bearer {token}",
"User-Agent": "tg-admin-bot",
}
data = _request_json(url, headers, None, verify_tls)
if not isinstance(data, list):
raise RuntimeError("Unexpected API response")
return data
def set_proxy_host(cfg: dict[str, Any], host_id: int, enabled: bool) -> tuple[bool, str]:
npm_cfg = cfg.get("npmplus", {})
base_url = _api_base(cfg)
verify_tls = npm_cfg.get("verify_tls", True)
if not base_url:
return False, "NPMplus base_url not configured"
token = _get_token(cfg)
action = "enable" if enabled else "disable"
url = f"{base_url}/nginx/proxy-hosts/{host_id}/{action}"
headers = {
"Authorization": f"Bearer {token}",
"User-Agent": "tg-admin-bot",
}
try:
payload = _request_json(url, headers, None, verify_tls, method="POST")
except Exception as e:
return False, str(e)
if payload is True or (isinstance(payload, dict) and payload.get("success", True)):
return True, "OK"
return False, "API returned error"
def format_certificates(certs: list[dict[str, Any]]) -> str:
if not certs:
return "🔒 SSL certificates\n\n No certificates found"
now = datetime.now(timezone.utc)
items: list[tuple[datetime | None, str]] = []
for cert in certs:
name = cert.get("nice_name")
if not name:
domains = cert.get("domain_names") or []
if isinstance(domains, list):
name = ", ".join(domains)
if not name:
name = "unknown"
expiry = _parse_expiry(cert.get("expires_on"))
if expiry is None:
items.append((expiry, f"⚠️ {name}: unknown expiry"))
continue
days = (expiry - now).days
date_str = expiry.astimezone(timezone.utc).strftime("%Y-%m-%d")
if days < 0:
line = f"🔴 {name}: expired {-days}d ago ({date_str})"
elif days <= 7:
line = f"🟠 {name}: {days}d ({date_str})"
elif days <= 30:
line = f"🟡 {name}: {days}d ({date_str})"
else:
line = f"🟢 {name}: {days}d ({date_str})"
items.append((expiry, line))
items.sort(key=lambda item: item[0] or datetime.max.replace(tzinfo=timezone.utc))
lines = ["🔒 SSL certificates\n"]
lines.extend(line for _, line in items)
return "\n".join(lines)