From b4a243e72f059e2179a85bfcbc1c64d4b95e3fab Mon Sep 17 00:00:00 2001 From: benya Date: Sun, 8 Feb 2026 03:32:10 +0300 Subject: [PATCH] Improve OpenWrt Wi-Fi client display --- services/openwrt.py | 95 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 91 insertions(+), 4 deletions(-) diff --git a/services/openwrt.py b/services/openwrt.py index 31f8d52..5e5d45b 100644 --- a/services/openwrt.py +++ b/services/openwrt.py @@ -108,6 +108,26 @@ def _extract_leases(leases: dict[str, Any]) -> list[str]: return out +def _extract_lease_name_map(leases: Any) -> dict[str, str]: + items = None + if isinstance(leases, dict): + items = leases.get("leases") or leases.get("dhcp_leases") or leases.get("ipv4_leases") + elif isinstance(leases, list): + items = leases + if not isinstance(items, list): + return {} + out: dict[str, str] = {} + for lease in items: + if not isinstance(lease, dict): + continue + mac = lease.get("macaddr") + if not mac: + continue + host = lease.get("hostname") or "unknown" + out[str(mac).lower()] = str(host) + return out + + def _extract_ifnames(wireless: dict[str, Any]) -> list[str]: ifnames: list[str] = [] if not isinstance(wireless, dict): @@ -124,6 +144,41 @@ def _extract_ifnames(wireless: dict[str, Any]) -> list[str]: return ifnames +def _extract_ifname_meta(wireless: dict[str, Any]) -> dict[str, dict[str, str]]: + meta: dict[str, dict[str, str]] = {} + if not isinstance(wireless, dict): + return meta + for radio in wireless.values(): + if not isinstance(radio, dict): + continue + band = None + cfg = radio.get("config") or {} + if isinstance(cfg, dict): + band = cfg.get("band") + band_label = None + if band == "2g": + band_label = "2.4GHz" + elif band == "5g": + band_label = "5GHz" + elif band: + band_label = str(band) + for iface in radio.get("interfaces", []) or []: + if not isinstance(iface, dict): + continue + ifname = iface.get("ifname") + if not ifname: + continue + iface_cfg = iface.get("config") or {} + ssid = None + if isinstance(iface_cfg, dict): + ssid = iface_cfg.get("ssid") + meta[str(ifname)] = { + "ssid": str(ssid) if ssid else "", + "band": band_label or "", + } + return meta + + def _extract_hostapd_ifnames(raw: str) -> list[str]: ifnames: list[str] = [] for line in raw.splitlines(): @@ -150,7 +205,13 @@ def _safe_json_load(raw: str) -> Any | None: return None -def _parse_hostapd_clients(payload: Any, ifname: str) -> list[str]: +def _parse_hostapd_clients( + payload: Any, + ifname: str, + *, + name_map: dict[str, str] | None = None, + ifname_meta: dict[str, dict[str, str]] | None = None, +) -> list[str]: if not isinstance(payload, dict): return [] data = payload.get("clients") @@ -159,6 +220,18 @@ def _parse_hostapd_clients(payload: Any, ifname: str) -> list[str]: else: return [] clients: list[str] = [] + name_map = name_map or {} + meta = (ifname_meta or {}).get(ifname, {}) + ssid = meta.get("ssid") or "" + band = meta.get("band") or "" + if ssid and band: + net_label = f"{ssid} ({band})" + elif ssid: + net_label = ssid + elif band: + net_label = band + else: + net_label = ifname for mac, meta in items: if not isinstance(meta, dict): continue @@ -167,7 +240,12 @@ def _parse_hostapd_clients(payload: Any, ifname: str) -> list[str]: rx = _format_rate((rate or {}).get("rx")) tx = _format_rate((rate or {}).get("tx")) sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?" - clients.append(f"{ifname} {mac} {sig} rx:{rx} tx:{tx}") + host = name_map.get(str(mac).lower()) + if host and host != "unknown": + client_label = f"{host} ({mac})" + else: + client_label = str(mac) + clients.append(f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}") return clients @@ -273,13 +351,15 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str: wifi_clients = _extract_wifi_clients(wireless) ifnames = _extract_ifnames(wireless) + ifname_meta = _extract_ifname_meta(wireless) rc_l, out_l = await run_cmd_full( - ssh_cmd + ["sh", "-c", "ubus -S list | awk -F. '/^hostapd\\.phy/{print $2}'"], + ssh_cmd + ["sh", "-c", "ubus -S list | awk -F. '/^hostapd\.phy/{print $2}'"], timeout=timeout_sec + 15, ) if rc_l == 0 and out_l.strip(): ifnames.extend(_extract_hostapd_ifnames(out_l)) ifnames = sorted({name for name in ifnames if name}) + lease_name_map = _extract_lease_name_map(leases or {}) if ifnames: for ifname in ifnames: cmd_clients = ssh_cmd + ["ubus", "call", f"hostapd.{ifname}", "get_clients"] @@ -289,7 +369,14 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str: if rc2 == 0: payload = _safe_json_load(out2) if payload: - wifi_clients.extend(_parse_hostapd_clients(payload, ifname)) + wifi_clients.extend( + _parse_hostapd_clients( + payload, + ifname, + name_map=lease_name_map, + ifname_meta=ifname_meta, + ) + ) if leases: leases_list = _extract_leases(leases)