Improve OpenWrt Wi-Fi client display
This commit is contained in:
@@ -108,6 +108,26 @@ def _extract_leases(leases: dict[str, Any]) -> list[str]:
|
||||
return out
|
||||
|
||||
|
||||
def _extract_lease_name_map(leases: Any) -> dict[str, str]:
|
||||
items = None
|
||||
if isinstance(leases, dict):
|
||||
items = leases.get("leases") or leases.get("dhcp_leases") or leases.get("ipv4_leases")
|
||||
elif isinstance(leases, list):
|
||||
items = leases
|
||||
if not isinstance(items, list):
|
||||
return {}
|
||||
out: dict[str, str] = {}
|
||||
for lease in items:
|
||||
if not isinstance(lease, dict):
|
||||
continue
|
||||
mac = lease.get("macaddr")
|
||||
if not mac:
|
||||
continue
|
||||
host = lease.get("hostname") or "unknown"
|
||||
out[str(mac).lower()] = str(host)
|
||||
return out
|
||||
|
||||
|
||||
def _extract_ifnames(wireless: dict[str, Any]) -> list[str]:
|
||||
ifnames: list[str] = []
|
||||
if not isinstance(wireless, dict):
|
||||
@@ -124,6 +144,41 @@ def _extract_ifnames(wireless: dict[str, Any]) -> list[str]:
|
||||
return ifnames
|
||||
|
||||
|
||||
def _extract_ifname_meta(wireless: dict[str, Any]) -> dict[str, dict[str, str]]:
|
||||
meta: dict[str, dict[str, str]] = {}
|
||||
if not isinstance(wireless, dict):
|
||||
return meta
|
||||
for radio in wireless.values():
|
||||
if not isinstance(radio, dict):
|
||||
continue
|
||||
band = None
|
||||
cfg = radio.get("config") or {}
|
||||
if isinstance(cfg, dict):
|
||||
band = cfg.get("band")
|
||||
band_label = None
|
||||
if band == "2g":
|
||||
band_label = "2.4GHz"
|
||||
elif band == "5g":
|
||||
band_label = "5GHz"
|
||||
elif band:
|
||||
band_label = str(band)
|
||||
for iface in radio.get("interfaces", []) or []:
|
||||
if not isinstance(iface, dict):
|
||||
continue
|
||||
ifname = iface.get("ifname")
|
||||
if not ifname:
|
||||
continue
|
||||
iface_cfg = iface.get("config") or {}
|
||||
ssid = None
|
||||
if isinstance(iface_cfg, dict):
|
||||
ssid = iface_cfg.get("ssid")
|
||||
meta[str(ifname)] = {
|
||||
"ssid": str(ssid) if ssid else "",
|
||||
"band": band_label or "",
|
||||
}
|
||||
return meta
|
||||
|
||||
|
||||
def _extract_hostapd_ifnames(raw: str) -> list[str]:
|
||||
ifnames: list[str] = []
|
||||
for line in raw.splitlines():
|
||||
@@ -150,7 +205,13 @@ def _safe_json_load(raw: str) -> Any | None:
|
||||
return None
|
||||
|
||||
|
||||
def _parse_hostapd_clients(payload: Any, ifname: str) -> list[str]:
|
||||
def _parse_hostapd_clients(
|
||||
payload: Any,
|
||||
ifname: str,
|
||||
*,
|
||||
name_map: dict[str, str] | None = None,
|
||||
ifname_meta: dict[str, dict[str, str]] | None = None,
|
||||
) -> list[str]:
|
||||
if not isinstance(payload, dict):
|
||||
return []
|
||||
data = payload.get("clients")
|
||||
@@ -159,6 +220,18 @@ def _parse_hostapd_clients(payload: Any, ifname: str) -> list[str]:
|
||||
else:
|
||||
return []
|
||||
clients: list[str] = []
|
||||
name_map = name_map or {}
|
||||
meta = (ifname_meta or {}).get(ifname, {})
|
||||
ssid = meta.get("ssid") or ""
|
||||
band = meta.get("band") or ""
|
||||
if ssid and band:
|
||||
net_label = f"{ssid} ({band})"
|
||||
elif ssid:
|
||||
net_label = ssid
|
||||
elif band:
|
||||
net_label = band
|
||||
else:
|
||||
net_label = ifname
|
||||
for mac, meta in items:
|
||||
if not isinstance(meta, dict):
|
||||
continue
|
||||
@@ -167,7 +240,12 @@ def _parse_hostapd_clients(payload: Any, ifname: str) -> list[str]:
|
||||
rx = _format_rate((rate or {}).get("rx"))
|
||||
tx = _format_rate((rate or {}).get("tx"))
|
||||
sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?"
|
||||
clients.append(f"{ifname} {mac} {sig} rx:{rx} tx:{tx}")
|
||||
host = name_map.get(str(mac).lower())
|
||||
if host and host != "unknown":
|
||||
client_label = f"{host} ({mac})"
|
||||
else:
|
||||
client_label = str(mac)
|
||||
clients.append(f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}")
|
||||
return clients
|
||||
|
||||
|
||||
@@ -273,13 +351,15 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str:
|
||||
|
||||
wifi_clients = _extract_wifi_clients(wireless)
|
||||
ifnames = _extract_ifnames(wireless)
|
||||
ifname_meta = _extract_ifname_meta(wireless)
|
||||
rc_l, out_l = await run_cmd_full(
|
||||
ssh_cmd + ["sh", "-c", "ubus -S list | awk -F. '/^hostapd\\.phy/{print $2}'"],
|
||||
ssh_cmd + ["sh", "-c", "ubus -S list | awk -F. '/^hostapd\.phy/{print $2}'"],
|
||||
timeout=timeout_sec + 15,
|
||||
)
|
||||
if rc_l == 0 and out_l.strip():
|
||||
ifnames.extend(_extract_hostapd_ifnames(out_l))
|
||||
ifnames = sorted({name for name in ifnames if name})
|
||||
lease_name_map = _extract_lease_name_map(leases or {})
|
||||
if ifnames:
|
||||
for ifname in ifnames:
|
||||
cmd_clients = ssh_cmd + ["ubus", "call", f"hostapd.{ifname}", "get_clients"]
|
||||
@@ -289,7 +369,14 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str:
|
||||
if rc2 == 0:
|
||||
payload = _safe_json_load(out2)
|
||||
if payload:
|
||||
wifi_clients.extend(_parse_hostapd_clients(payload, ifname))
|
||||
wifi_clients.extend(
|
||||
_parse_hostapd_clients(
|
||||
payload,
|
||||
ifname,
|
||||
name_map=lease_name_map,
|
||||
ifname_meta=ifname_meta,
|
||||
)
|
||||
)
|
||||
|
||||
if leases:
|
||||
leases_list = _extract_leases(leases)
|
||||
|
||||
Reference in New Issue
Block a user