454 lines
15 KiB
Python
454 lines
15 KiB
Python
import json
|
|
from typing import Any
|
|
|
|
from services.runner import run_cmd, run_cmd_full
|
|
|
|
|
|
def _format_uptime(seconds: int | float | None) -> str:
|
|
if seconds is None:
|
|
return "unknown"
|
|
total = int(seconds)
|
|
days, rem = divmod(total, 86400)
|
|
hours, rem = divmod(rem, 3600)
|
|
minutes, _ = divmod(rem, 60)
|
|
if days > 0:
|
|
return f"{days}d {hours:02d}:{minutes:02d}"
|
|
return f"{hours:02d}:{minutes:02d}"
|
|
|
|
|
|
def _format_load(load: list[Any] | None) -> str:
|
|
if not load or not isinstance(load, list):
|
|
return "unknown"
|
|
values = []
|
|
for raw in load[:3]:
|
|
try:
|
|
values.append(float(raw))
|
|
except (TypeError, ValueError):
|
|
values.append(0.0)
|
|
scale = 1.0
|
|
if values and max(values) > 1000:
|
|
scale = 1 / 65536.0
|
|
return " ".join(f"{val * scale:.2f}" for val in values)
|
|
|
|
|
|
def _format_rate(rate: Any) -> str:
|
|
try:
|
|
val = float(rate)
|
|
except (TypeError, ValueError):
|
|
return "?"
|
|
if val <= 0:
|
|
return "?"
|
|
if val >= 1_000_000:
|
|
return f"{val / 1_000_000:.1f}M"
|
|
if val >= 1_000:
|
|
return f"{val / 1_000:.1f}K"
|
|
return f"{val:.0f}b"
|
|
|
|
|
|
def _extract_wan_ip(wan: dict[str, Any]) -> str | None:
|
|
if not isinstance(wan, dict):
|
|
return None
|
|
addrs = wan.get("ipv4-address") or []
|
|
if isinstance(addrs, list):
|
|
for item in addrs:
|
|
if isinstance(item, dict):
|
|
ip = item.get("address")
|
|
if ip:
|
|
return str(ip)
|
|
return None
|
|
|
|
|
|
def _format_station(ifname: str, mac: Any, meta: dict[str, Any]) -> str:
    """Render one station as ``"<ifname> <mac> <signal> rx:<rate> tx:<rate>"``."""
    rates = []
    for direction in ("rx", "tx"):
        info = meta.get(direction)
        # A missing or malformed (non-dict) entry degrades to "?" rather than
        # raising AttributeError on ``.get``.
        rates.append(_format_rate(info.get("rate") if isinstance(info, dict) else None))
    signal = meta.get("signal")
    sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?"
    return f"{ifname} {mac} {sig} rx:{rates[0]} tx:{rates[1]}"


def _extract_wifi_clients(wireless: dict[str, Any]) -> list[str]:
    """Collect station descriptions embedded in a wireless status structure.

    Walks ``wireless[<radio>]["interfaces"][*]`` and, per interface, reads
    either an ``"assoclist"`` dict (MAC -> station info) or, failing that, a
    ``"stations"`` list (station info dicts carrying their own ``"mac"``).
    Anything that does not have the expected container type is skipped.

    Args:
        wireless: Parsed wireless status; non-dict input yields no clients.

    Returns:
        One formatted line per station, in encounter order. Interfaces
        without an ``"ifname"`` are reported as ``"wifi"``; stations without
        a ``"mac"`` (list form) are reported as ``"?"``.
    """
    clients: list[str] = []
    if not isinstance(wireless, dict):
        return clients
    for radio in wireless.values():
        if not isinstance(radio, dict):
            continue
        interfaces = radio.get("interfaces")
        if not isinstance(interfaces, list):
            # Covers a missing/None value and malformed scalars alike.
            continue
        for iface in interfaces:
            if not isinstance(iface, dict):
                continue
            ifname = iface.get("ifname") or "wifi"
            assoclist = iface.get("assoclist")
            stations = iface.get("stations")
            if isinstance(assoclist, dict):
                clients.extend(
                    _format_station(ifname, mac, meta)
                    for mac, meta in assoclist.items()
                    if isinstance(meta, dict)
                )
            elif isinstance(stations, list):
                clients.extend(
                    _format_station(ifname, meta.get("mac") or "?", meta)
                    for meta in stations
                    if isinstance(meta, dict)
                )
    return clients
|
|
|
|
|
|
def _extract_leases(leases: dict[str, Any]) -> list[str]:
|
|
items = None
|
|
if isinstance(leases, dict):
|
|
items = leases.get("leases") or leases.get("dhcp_leases") or leases.get("ipv4_leases")
|
|
elif isinstance(leases, list):
|
|
items = leases
|
|
if not isinstance(items, list):
|
|
return []
|
|
out = []
|
|
for lease in items:
|
|
if not isinstance(lease, dict):
|
|
continue
|
|
ipaddr = lease.get("ipaddr") or "?"
|
|
host = lease.get("hostname") or "unknown"
|
|
mac = lease.get("macaddr") or "?"
|
|
out.append(f"{ipaddr} {host} ({mac})")
|
|
return out
|
|
|
|
|
|
def _extract_lease_name_map(leases: Any) -> dict[str, str]:
|
|
items = None
|
|
if isinstance(leases, dict):
|
|
items = leases.get("leases") or leases.get("dhcp_leases") or leases.get("ipv4_leases")
|
|
elif isinstance(leases, list):
|
|
items = leases
|
|
if not isinstance(items, list):
|
|
return {}
|
|
out: dict[str, str] = {}
|
|
for lease in items:
|
|
if not isinstance(lease, dict):
|
|
continue
|
|
mac = lease.get("macaddr")
|
|
if not mac:
|
|
continue
|
|
host = lease.get("hostname") or "unknown"
|
|
out[str(mac).lower()] = str(host)
|
|
return out
|
|
|
|
|
|
def _extract_lease_name_map_fallback(raw: str) -> dict[str, str]:
|
|
out: dict[str, str] = {}
|
|
for line in raw.splitlines():
|
|
parts = line.strip().split()
|
|
if len(parts) < 4:
|
|
continue
|
|
_expiry, mac, _ipaddr, host = parts[:4]
|
|
host = host if host != "*" else "unknown"
|
|
out[str(mac).lower()] = str(host)
|
|
return out
|
|
|
|
|
|
def _extract_ifnames(wireless: dict[str, Any]) -> list[str]:
|
|
ifnames: list[str] = []
|
|
if not isinstance(wireless, dict):
|
|
return ifnames
|
|
for radio in wireless.values():
|
|
if not isinstance(radio, dict):
|
|
continue
|
|
for iface in radio.get("interfaces", []) or []:
|
|
if not isinstance(iface, dict):
|
|
continue
|
|
ifname = iface.get("ifname")
|
|
if ifname:
|
|
ifnames.append(str(ifname))
|
|
return ifnames
|
|
|
|
|
|
def _extract_ifname_meta(wireless: dict[str, Any]) -> dict[str, dict[str, str]]:
|
|
meta: dict[str, dict[str, str]] = {}
|
|
if not isinstance(wireless, dict):
|
|
return meta
|
|
for radio in wireless.values():
|
|
if not isinstance(radio, dict):
|
|
continue
|
|
band = None
|
|
cfg = radio.get("config") or {}
|
|
if isinstance(cfg, dict):
|
|
band = cfg.get("band")
|
|
band_label = None
|
|
if band == "2g":
|
|
band_label = "2.4GHz"
|
|
elif band == "5g":
|
|
band_label = "5GHz"
|
|
elif band:
|
|
band_label = str(band)
|
|
for iface in radio.get("interfaces", []) or []:
|
|
if not isinstance(iface, dict):
|
|
continue
|
|
ifname = iface.get("ifname")
|
|
if not ifname:
|
|
continue
|
|
iface_cfg = iface.get("config") or {}
|
|
ssid = None
|
|
if isinstance(iface_cfg, dict):
|
|
ssid = iface_cfg.get("ssid")
|
|
meta[str(ifname)] = {
|
|
"ssid": str(ssid) if ssid else "",
|
|
"band": band_label or "",
|
|
}
|
|
return meta
|
|
|
|
|
|
def _extract_hostapd_ifnames(raw: str) -> list[str]:
|
|
ifnames: list[str] = []
|
|
for line in raw.splitlines():
|
|
name = line.strip()
|
|
if not name or name == "hostapd":
|
|
continue
|
|
ifnames.append(name)
|
|
return ifnames
|
|
|
|
|
|
def _net_label_for_ifname(ifname: str, ifname_meta: dict[str, dict[str, str]]) -> str:
|
|
meta = ifname_meta.get(ifname, {})
|
|
ssid = meta.get("ssid") or ""
|
|
band = meta.get("band") or ""
|
|
if ssid and band:
|
|
return f"{ssid} ({band})"
|
|
if ssid:
|
|
return ssid
|
|
if band:
|
|
return band
|
|
return ifname
|
|
|
|
|
|
def _safe_json_load(raw: str) -> Any | None:
|
|
if not raw:
|
|
return None
|
|
try:
|
|
return json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
start = raw.find("{")
|
|
end = raw.rfind("}")
|
|
if start == -1 or end == -1 or end <= start:
|
|
return None
|
|
try:
|
|
return json.loads(raw[start : end + 1])
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
def _parse_hostapd_clients(
    payload: Any,
    ifname: str,
    *,
    name_map: dict[str, str] | None = None,
    ifname_meta: dict[str, dict[str, str]] | None = None,
) -> list[str]:
    """Render the stations in a hostapd ``get_clients`` style payload.

    Args:
        payload: Parsed reply; must be a dict whose ``"clients"`` value is a
            dict of MAC -> station info. Any other shape yields ``[]``.
        ifname: Interface the payload was queried for.
        name_map: Optional lower-cased MAC -> hostname map. A hit other than
            ``"unknown"`` replaces the MAC in the output.
        ifname_meta: Optional interface metadata used to build the network
            label (SSID and band); the interface name is used when absent.

    Returns:
        One ``"<network> <client> <signal> rx:<rate> tx:<rate>"`` line per
        station whose info is a dict, in payload order.
    """
    if not isinstance(payload, dict):
        return []
    stations = payload.get("clients")
    if not isinstance(stations, dict):
        return []

    names = name_map or {}
    # Share the label rules with the per-network counters instead of
    # re-implementing them here.
    net_label = _net_label_for_ifname(ifname, ifname_meta or {})

    clients: list[str] = []
    for mac, info in stations.items():
        if not isinstance(info, dict):
            continue
        rate = info.get("rate")
        if not isinstance(rate, dict):
            # Missing or malformed rate block: both directions become "?".
            rate = {}
        rx = _format_rate(rate.get("rx"))
        tx = _format_rate(rate.get("tx"))
        signal = info.get("signal")
        sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?"
        host = names.get(str(mac).lower())
        client_label = host if host and host != "unknown" else str(mac)
        clients.append(f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}")
    return clients
|
|
|
|
|
|
def _parse_proc_fallback(raw: str) -> tuple[int | None, list[float] | None]:
|
|
uptime = None
|
|
load = None
|
|
for line in raw.splitlines():
|
|
parts = line.split()
|
|
if len(parts) >= 2 and uptime is None:
|
|
try:
|
|
uptime = int(float(parts[0]))
|
|
except ValueError:
|
|
uptime = None
|
|
if len(parts) >= 3 and load is None:
|
|
try:
|
|
load = [float(parts[0]), float(parts[1]), float(parts[2])]
|
|
except ValueError:
|
|
load = None
|
|
return uptime, load
|
|
|
|
|
|
def _parse_leases_fallback(raw: str) -> list[str]:
|
|
out = []
|
|
for line in raw.splitlines():
|
|
parts = line.strip().split()
|
|
if len(parts) < 4:
|
|
continue
|
|
_expiry, mac, ipaddr, host = parts[:4]
|
|
host = host if host != "*" else "unknown"
|
|
out.append(f"{ipaddr} {host} ({mac})")
|
|
return out
|
|
|
|
|
|
def _ubus_load_to_float(load: Any) -> Any:
    """Convert fixed-point load integers (value * 65536) to plain floats.

    Non-list input and non-integer samples are passed through untouched.
    """
    if not isinstance(load, list):
        return load
    return [
        raw / 65536.0 if isinstance(raw, int) and not isinstance(raw, bool) else raw
        for raw in load
    ]


def _append_capped(lines: list[str], items: list[str], limit: int = 20) -> None:
    """Append *items* as bullet lines, summarising whatever exceeds *limit*."""
    if not items:
        lines.append(" (none)")
        return
    lines.extend(f" - {item}" for item in items[:limit])
    if len(items) > limit:
        lines.append(f" … and {len(items) - limit} more")


async def get_openwrt_status(cfg: dict[str, Any]) -> str:
    """Collect a human-readable status report from an OpenWrt host over SSH.

    Reads the ``"openwrt"`` section of *cfg*: ``host`` (required), ``user``
    (default ``"root"``), ``port`` (default 22), ``identity_file``,
    ``timeout_sec`` (default 8) and ``strict_host_key_checking`` (default
    ``True``).

    Up to ``2 + N`` SSH invocations are made, one after another: one for
    system/WAN/wireless/lease data, one to list hostapd objects, and one
    ``get_clients`` call per discovered interface.

    Args:
        cfg: Application configuration mapping.

    Returns:
        A multi-line report (uptime, load, WAN address and state, per-network
        Wi-Fi client counts, client list and DHCP leases, each list capped at
        20 entries), or a single ``"⚠️ ..."`` line describing why the report
        could not be produced.
    """
    import shlex  # local: only needed to quote words for the remote shell

    ow_cfg = cfg.get("openwrt", {})
    host = ow_cfg.get("host")
    user = ow_cfg.get("user", "root")
    port = ow_cfg.get("port", 22)
    identity_file = ow_cfg.get("identity_file")
    timeout_sec = ow_cfg.get("timeout_sec", 8)
    strict = ow_cfg.get("strict_host_key_checking", True)

    if not host:
        return "⚠️ OpenWrt host not configured"

    ssh_cmd = [
        "ssh",
        "-o",
        "BatchMode=yes",
        "-o",
        f"ConnectTimeout={timeout_sec}",
        "-o",
        "LogLevel=ERROR",
    ]
    if not strict:
        ssh_cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
    if identity_file:
        ssh_cmd += ["-i", str(identity_file)]
    ssh_cmd += ["-p", str(port), f"{user}@{host}"]
    cmd_timeout = timeout_sec + 15

    remote = (
        "ubus call system info 2>/dev/null || (cat /proc/uptime; echo; cat /proc/loadavg); "
        "echo __SEP__;"
        "ubus call network.interface.wan status 2>/dev/null; echo __SEP__;"
        "ubus call network.wireless status 2>/dev/null; echo __SEP__;"
        "ubus call luci-rpc getDHCPLeases '{\"family\":4}' 2>/dev/null || cat /tmp/dhcp.leases"
    )
    # ssh joins all trailing arguments with spaces and hands the result to
    # the remote shell. An unquoted script after "sh -c" is therefore split
    # again remotely and only its first word ("ubus") becomes the -c script.
    # Quoting keeps the whole script as a single word.
    cmd = ssh_cmd + ["sh", "-c", shlex.quote(remote)]
    rc, out = await run_cmd_full(cmd, timeout=cmd_timeout)
    # NOTE(review): 124 is treated as the runner's timeout code — confirm
    # against run_cmd_full.
    if rc == 124:
        return "⚠️ OpenWrt SSH error: timeout"
    if rc != 0:
        return f"⚠️ OpenWrt SSH error: {out.strip() or 'unknown error'}"

    parts = [p.strip() for p in out.split("__SEP__")]
    if len(parts) < 4:
        return "⚠️ OpenWrt response incomplete"

    sys_info = _safe_json_load(parts[0])
    wan_status = _safe_json_load(parts[1]) or {}
    wireless = _safe_json_load(parts[2]) or {}
    leases = _safe_json_load(parts[3])
    # When the lease section is not JSON it is kept as raw text for the
    # line-based fallback parsers.
    leases_fallback = parts[3] if leases is None else ""

    if isinstance(sys_info, dict):
        uptime_raw = sys_info.get("uptime")
        # Integer load samples are fixed-point (scaled by 65536). Normalise
        # them here so small values (e.g. 655 for a load of 0.01) are never
        # shown unscaled.
        load_raw = _ubus_load_to_float(sys_info.get("load"))
    else:
        uptime_raw, load_raw = _parse_proc_fallback(parts[0])
    uptime = _format_uptime(uptime_raw)
    load = _format_load(load_raw)
    wan_ip = _extract_wan_ip(wan_status) or "unknown"
    wan_up = wan_status.get("up") if isinstance(wan_status, dict) else None
    wan_state = "up" if wan_up else "down"

    wifi_clients = _extract_wifi_clients(wireless)
    ifnames = _extract_ifnames(wireless)
    ifname_meta = _extract_ifname_meta(wireless)

    list_script = r"ubus -S list | awk -F. '/^hostapd\.phy/{print $2}'"
    rc_l, out_l = await run_cmd_full(
        ssh_cmd + ["sh", "-c", shlex.quote(list_script)],
        timeout=cmd_timeout,
    )
    # Best effort: a failed listing just leaves the names found above.
    if rc_l == 0 and out_l.strip():
        ifnames.extend(_extract_hostapd_ifnames(out_l))
    ifnames = sorted({name for name in ifnames if name})

    lease_name_map = _extract_lease_name_map(leases or {})
    if leases_fallback:
        lease_name_map.update(_extract_lease_name_map_fallback(leases_fallback))

    wifi_net_counts: dict[str, int] = {}
    for ifname in ifnames:
        # The name comes from remote output and passes through the remote
        # shell again, so it is quoted.
        cmd_clients = ssh_cmd + ["ubus", "call", shlex.quote(f"hostapd.{ifname}"), "get_clients"]
        rc2, out2 = await run_cmd_full(cmd_clients, timeout=cmd_timeout)
        if rc2 == 124:
            return f"⚠️ OpenWrt SSH error (wifi clients {ifname}): timeout"
        if rc2 != 0:
            continue
        payload = _safe_json_load(out2)
        if not payload:
            continue
        clients_payload = payload.get("clients") if isinstance(payload, dict) else None
        if isinstance(clients_payload, dict):
            label = _net_label_for_ifname(ifname, ifname_meta)
            wifi_net_counts[label] = wifi_net_counts.get(label, 0) + len(clients_payload)
        wifi_clients.extend(
            _parse_hostapd_clients(
                payload,
                ifname,
                name_map=lease_name_map,
                ifname_meta=ifname_meta,
            )
        )

    if leases:
        leases_list = _extract_leases(leases)
    else:
        leases_list = _parse_leases_fallback(leases_fallback)

    lines = [
        "📡 OpenWrt",
        f"🕒 Uptime: {uptime}",
        f"⚙️ Load: {load}",
        f"🌐 WAN: {wan_ip} ({wan_state})",
        "",
    ]
    if wifi_net_counts:
        lines.append("📶 Wi-Fi networks:")
        for label, count in sorted(wifi_net_counts.items()):
            lines.append(f" - {label}: {count}")
        lines.append("")

    lines.append(f"📶 Wi-Fi clients: {len(wifi_clients)}")
    _append_capped(lines, wifi_clients)

    lines += ["", f"🧾 DHCP leases: {len(leases_list)}"]
    _append_capped(lines, leases_list)

    return "\n".join(lines)
|