Files
tg-admin-bot/services/openwrt.py

463 lines
16 KiB
Python

import json
from typing import Any
from services.runner import run_cmd, run_cmd_full
def _format_uptime(seconds: int | float | None) -> str:
if seconds is None:
return "unknown"
total = int(seconds)
days, rem = divmod(total, 86400)
hours, rem = divmod(rem, 3600)
minutes, _ = divmod(rem, 60)
if days > 0:
return f"{days}d {hours:02d}:{minutes:02d}"
return f"{hours:02d}:{minutes:02d}"
def _format_load(load: list[Any] | None) -> str:
if not load or not isinstance(load, list):
return "unknown"
values = []
for raw in load[:3]:
try:
values.append(float(raw))
except (TypeError, ValueError):
values.append(0.0)
scale = 1.0
if values and max(values) > 1000:
scale = 1 / 65536.0
return " ".join(f"{val * scale:.2f}" for val in values)
def _format_rate(rate: Any) -> str:
try:
val = float(rate)
except (TypeError, ValueError):
return "?"
if val <= 0:
return "?"
if val >= 1_000_000:
return f"{val / 1_000_000:.1f}M"
if val >= 1_000:
return f"{val / 1_000:.1f}K"
return f"{val:.0f}b"
def _extract_wan_ip(wan: dict[str, Any]) -> str | None:
if not isinstance(wan, dict):
return None
addrs = wan.get("ipv4-address") or []
if isinstance(addrs, list):
for item in addrs:
if isinstance(item, dict):
ip = item.get("address")
if ip:
return str(ip)
return None
def _extract_wifi_clients(wireless: dict[str, Any]) -> list[str]:
clients: list[str] = []
if not isinstance(wireless, dict):
return clients
for radio in wireless.values():
if not isinstance(radio, dict):
continue
for iface in radio.get("interfaces", []) or []:
if not isinstance(iface, dict):
continue
ifname = iface.get("ifname") or "wifi"
assoclist = iface.get("assoclist")
stations = iface.get("stations")
if isinstance(assoclist, dict):
for mac, meta in assoclist.items():
if not isinstance(meta, dict):
continue
signal = meta.get("signal")
rx = _format_rate((meta.get("rx") or {}).get("rate"))
tx = _format_rate((meta.get("tx") or {}).get("rate"))
sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?"
clients.append(f"{ifname} {mac} {sig} rx:{rx} tx:{tx}")
elif isinstance(stations, list):
for meta in stations:
if not isinstance(meta, dict):
continue
mac = meta.get("mac") or "?"
signal = meta.get("signal")
rx = _format_rate((meta.get("rx") or {}).get("rate"))
tx = _format_rate((meta.get("tx") or {}).get("rate"))
sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?"
clients.append(f"{ifname} {mac} {sig} rx:{rx} tx:{tx}")
return clients
def _extract_leases(leases: dict[str, Any]) -> list[str]:
items = None
if isinstance(leases, dict):
items = leases.get("leases") or leases.get("dhcp_leases") or leases.get("ipv4_leases")
elif isinstance(leases, list):
items = leases
if not isinstance(items, list):
return []
out = []
for lease in items:
if not isinstance(lease, dict):
continue
ipaddr = lease.get("ipaddr") or "?"
host = lease.get("hostname") or "unknown"
mac = lease.get("macaddr") or "?"
out.append(f"{ipaddr} {host} ({mac})")
return out
def _extract_lease_name_map(leases: Any) -> dict[str, str]:
items = None
if isinstance(leases, dict):
items = leases.get("leases") or leases.get("dhcp_leases") or leases.get("ipv4_leases")
elif isinstance(leases, list):
items = leases
if not isinstance(items, list):
return {}
out: dict[str, str] = {}
for lease in items:
if not isinstance(lease, dict):
continue
mac = lease.get("macaddr")
if not mac:
continue
host = lease.get("hostname") or "unknown"
out[str(mac).lower()] = str(host)
return out
def _extract_lease_name_map_fallback(raw: str) -> dict[str, str]:
out: dict[str, str] = {}
for line in raw.splitlines():
parts = line.strip().split()
if len(parts) < 4:
continue
_expiry, mac, _ipaddr, host = parts[:4]
host = host if host != "*" else "unknown"
out[str(mac).lower()] = str(host)
return out
def _extract_ifnames(wireless: dict[str, Any]) -> list[str]:
ifnames: list[str] = []
if not isinstance(wireless, dict):
return ifnames
for radio in wireless.values():
if not isinstance(radio, dict):
continue
for iface in radio.get("interfaces", []) or []:
if not isinstance(iface, dict):
continue
ifname = iface.get("ifname")
if ifname:
ifnames.append(str(ifname))
return ifnames
def _extract_ifname_meta(wireless: dict[str, Any]) -> dict[str, dict[str, str]]:
meta: dict[str, dict[str, str]] = {}
if not isinstance(wireless, dict):
return meta
for radio in wireless.values():
if not isinstance(radio, dict):
continue
band = None
cfg = radio.get("config") or {}
if isinstance(cfg, dict):
band = cfg.get("band")
band_label = None
if band == "2g":
band_label = "2.4GHz"
elif band == "5g":
band_label = "5GHz"
elif band:
band_label = str(band)
for iface in radio.get("interfaces", []) or []:
if not isinstance(iface, dict):
continue
ifname = iface.get("ifname")
if not ifname:
continue
iface_cfg = iface.get("config") or {}
ssid = None
if isinstance(iface_cfg, dict):
ssid = iface_cfg.get("ssid")
meta[str(ifname)] = {
"ssid": str(ssid) if ssid else "",
"band": band_label or "",
}
return meta
def _extract_hostapd_ifnames(raw: str) -> list[str]:
ifnames: list[str] = []
for line in raw.splitlines():
name = line.strip()
if not name or name == "hostapd":
continue
ifnames.append(name)
return ifnames
def _net_label_for_ifname(ifname: str, ifname_meta: dict[str, dict[str, str]]) -> str:
meta = ifname_meta.get(ifname, {})
ssid = meta.get("ssid") or ""
band = meta.get("band") or ""
if ssid and band:
return f"{ssid} ({band})"
if ssid:
return ssid
if band:
return band
return ifname
def _safe_json_load(raw: str) -> Any | None:
if not raw:
return None
try:
return json.loads(raw)
except json.JSONDecodeError:
start = raw.find("{")
end = raw.rfind("}")
if start == -1 or end == -1 or end <= start:
return None
try:
return json.loads(raw[start : end + 1])
except json.JSONDecodeError:
return None
def _parse_hostapd_clients(
payload: Any,
ifname: str,
*,
name_map: dict[str, str] | None = None,
ifname_meta: dict[str, dict[str, str]] | None = None,
) -> list[tuple[str, int | None, str]]:
if not isinstance(payload, dict):
return []
data = payload.get("clients")
if isinstance(data, dict):
items = data.items()
else:
return []
clients: list[tuple[str, int | None, str]] = []
name_map = name_map or {}
meta = (ifname_meta or {}).get(ifname, {})
ssid = meta.get("ssid") or ""
band = meta.get("band") or ""
if ssid and band:
net_label = f"{ssid} ({band})"
elif ssid:
net_label = ssid
elif band:
net_label = band
else:
net_label = ifname
for mac, meta in items:
if not isinstance(meta, dict):
continue
signal = meta.get("signal")
rate = meta.get("rate") or {}
rx = _format_rate((rate or {}).get("rx"))
tx = _format_rate((rate or {}).get("tx"))
sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?"
host = name_map.get(str(mac).lower())
if host and host != "unknown":
client_label = host
else:
client_label = str(mac)
line = f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}"
clients.append((line, signal if isinstance(signal, (int, float)) else None, net_label))
return clients
def _parse_proc_fallback(raw: str) -> tuple[int | None, list[float] | None]:
uptime = None
load = None
for line in raw.splitlines():
parts = line.split()
if len(parts) >= 2 and uptime is None:
try:
uptime = int(float(parts[0]))
except ValueError:
uptime = None
if len(parts) >= 3 and load is None:
try:
load = [float(parts[0]), float(parts[1]), float(parts[2])]
except ValueError:
load = None
return uptime, load
def _parse_leases_fallback(raw: str) -> list[str]:
out = []
for line in raw.splitlines():
parts = line.strip().split()
if len(parts) < 4:
continue
_expiry, mac, ipaddr, host = parts[:4]
host = host if host != "*" else "unknown"
out.append(f"{ipaddr} {host} ({mac})")
return out
async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str:
ow_cfg = cfg.get("openwrt", {})
host = ow_cfg.get("host")
user = ow_cfg.get("user", "root")
port = ow_cfg.get("port", 22)
identity_file = ow_cfg.get("identity_file")
timeout_sec = ow_cfg.get("timeout_sec", 8)
strict = ow_cfg.get("strict_host_key_checking", True)
if not host:
return "⚠️ OpenWrt host not configured"
ssh_cmd = [
"ssh",
"-o",
"BatchMode=yes",
"-o",
f"ConnectTimeout={timeout_sec}",
"-o",
"LogLevel=ERROR",
]
if not strict:
ssh_cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
if identity_file:
ssh_cmd += ["-i", str(identity_file)]
ssh_cmd += ["-p", str(port), f"{user}@{host}"]
remote = (
"ubus call system info 2>/dev/null || (cat /proc/uptime; echo; cat /proc/loadavg); "
"echo __SEP__;"
"ubus call network.interface.wan status 2>/dev/null; echo __SEP__;"
"ubus call network.wireless status 2>/dev/null; echo __SEP__;"
"ubus call luci-rpc getDHCPLeases '{\"family\":4}' 2>/dev/null || cat /tmp/dhcp.leases"
)
cmd = ssh_cmd + ["sh", "-c", remote]
rc, out = await run_cmd_full(cmd, timeout=timeout_sec + 15)
if rc == 124:
return "⚠️ OpenWrt SSH error: timeout"
if rc != 0:
return f"⚠️ OpenWrt SSH error: {out.strip() or 'unknown error'}"
parts = [p.strip() for p in out.split("__SEP__")]
if len(parts) < 4:
return "⚠️ OpenWrt response incomplete"
sys_info = _safe_json_load(parts[0])
wan_status = _safe_json_load(parts[1]) or {}
wireless = _safe_json_load(parts[2]) or {}
leases = _safe_json_load(parts[3])
leases_fallback = "" if leases is not None else parts[3]
if isinstance(sys_info, dict):
uptime_raw = sys_info.get("uptime")
load_raw = sys_info.get("load")
else:
uptime_raw, load_raw = _parse_proc_fallback(parts[0])
uptime = _format_uptime(uptime_raw)
load = _format_load(load_raw)
wan_ip = _extract_wan_ip(wan_status) or "unknown"
wan_up = wan_status.get("up") if isinstance(wan_status, dict) else None
wan_state = "up" if wan_up else "down"
wifi_clients = _extract_wifi_clients(wireless)
ifnames = _extract_ifnames(wireless)
ifname_meta = _extract_ifname_meta(wireless)
rc_l, out_l = await run_cmd_full(
ssh_cmd + ["sh", "-c", r"ubus -S list | awk -F. '/^hostapd\.phy/{print $2}'"],
timeout=timeout_sec + 15,
)
if rc_l == 0 and out_l.strip():
ifnames.extend(_extract_hostapd_ifnames(out_l))
ifnames = sorted({name for name in ifnames if name})
lease_name_map = _extract_lease_name_map(leases or {})
if leases_fallback:
lease_name_map.update(_extract_lease_name_map_fallback(leases_fallback))
wifi_net_counts: dict[str, int] = {}
wifi_signals: dict[str, list[int]] = {}
if ifnames:
for ifname in ifnames:
cmd_clients = ssh_cmd + ["ubus", "call", f"hostapd.{ifname}", "get_clients"]
rc2, out2 = await run_cmd_full(cmd_clients, timeout=timeout_sec + 15)
if rc2 == 124:
return f"⚠️ OpenWrt SSH error (wifi clients {ifname}): timeout"
if rc2 == 0:
payload = _safe_json_load(out2)
if payload:
clients_payload = payload.get("clients") if isinstance(payload, dict) else None
if isinstance(clients_payload, dict):
label = _net_label_for_ifname(ifname, ifname_meta)
wifi_net_counts[label] = wifi_net_counts.get(label, 0) + len(clients_payload)
parsed = _parse_hostapd_clients(
payload,
ifname,
name_map=lease_name_map,
ifname_meta=ifname_meta,
)
wifi_clients.extend([p[0] for p in parsed])
for _line, sig, net_label in parsed:
if sig is not None and net_label:
wifi_signals.setdefault(net_label, []).append(sig)
if leases:
leases_list = _extract_leases(leases)
else:
leases_list = _parse_leases_fallback(leases_fallback)
header = [
"📡 OpenWrt",
f"🕒 Uptime: {uptime}",
f"⚙️ Load: {load}",
f"🌐 WAN: {wan_ip} ({wan_state})",
"",
]
wifi_section: list[str] = []
if wifi_net_counts:
wifi_section.append("📶 Wi-Fi networks:")
for label, count in sorted(wifi_net_counts.items()):
sigs = wifi_signals.get(label) or []
if sigs:
avg_sig = sum(sigs) / len(sigs)
min_sig = min(sigs)
wifi_section.append(f" - {label}: {count} (avg {avg_sig:.0f}dBm, min {min_sig}dBm)")
else:
wifi_section.append(f" - {label}: {count}")
wifi_section.append("")
wifi_section.append(f"📶 Wi-Fi clients: {len(wifi_clients)}")
if wifi_clients:
for line in wifi_clients[:20]:
wifi_section.append(f" - {line}")
if len(wifi_clients) > 20:
wifi_section.append(f" … and {len(wifi_clients) - 20} more")
else:
wifi_section.append(" (none)")
lease_section: list[str] = ["", f"🧾 DHCP leases: {len(leases_list)}"]
if leases_list:
for line in leases_list[:20]:
lease_section.append(f" - {line}")
if len(leases_list) > 20:
lease_section.append(f" … and {len(leases_list) - 20} more")
else:
lease_section.append(" (none)")
if mode == "wan":
return "\n".join(header)
if mode == "clients":
return "\n".join(header + wifi_section)
if mode == "leases":
return "\n".join(header + lease_section)
return "\n".join(header + wifi_section + lease_section)