import json from typing import Any from services.runner import run_cmd, run_cmd_full def _format_uptime(seconds: int | float | None) -> str: if seconds is None: return "unknown" total = int(seconds) days, rem = divmod(total, 86400) hours, rem = divmod(rem, 3600) minutes, _ = divmod(rem, 60) if days > 0: return f"{days}d {hours:02d}:{minutes:02d}" return f"{hours:02d}:{minutes:02d}" def _format_load(load: list[Any] | None) -> str: if not load or not isinstance(load, list): return "unknown" values = [] for raw in load[:3]: try: values.append(float(raw)) except (TypeError, ValueError): values.append(0.0) scale = 1.0 if values and max(values) > 1000: scale = 1 / 65536.0 return " ".join(f"{val * scale:.2f}" for val in values) def _format_rate(rate: Any) -> str: try: val = float(rate) except (TypeError, ValueError): return "?" if val <= 0: return "?" if val >= 1_000_000: return f"{val / 1_000_000:.1f}M" if val >= 1_000: return f"{val / 1_000:.1f}K" return f"{val:.0f}b" def _extract_wan_ip(wan: dict[str, Any]) -> str | None: if not isinstance(wan, dict): return None addrs = wan.get("ipv4-address") or [] if isinstance(addrs, list): for item in addrs: if isinstance(item, dict): ip = item.get("address") if ip: return str(ip) return None def _extract_wifi_clients(wireless: dict[str, Any]) -> list[str]: clients: list[str] = [] if not isinstance(wireless, dict): return clients for radio in wireless.values(): if not isinstance(radio, dict): continue for iface in radio.get("interfaces", []) or []: if not isinstance(iface, dict): continue ifname = iface.get("ifname") or "wifi" assoclist = iface.get("assoclist") stations = iface.get("stations") if isinstance(assoclist, dict): for mac, meta in assoclist.items(): if not isinstance(meta, dict): continue signal = meta.get("signal") rx = _format_rate((meta.get("rx") or {}).get("rate")) tx = _format_rate((meta.get("tx") or {}).get("rate")) sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?" clients.append(f"{ifname} {mac} {sig} rx:{rx} tx:{tx}") elif isinstance(stations, list): for meta in stations: if not isinstance(meta, dict): continue mac = meta.get("mac") or "?" signal = meta.get("signal") rx = _format_rate((meta.get("rx") or {}).get("rate")) tx = _format_rate((meta.get("tx") or {}).get("rate")) sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?" clients.append(f"{ifname} {mac} {sig} rx:{rx} tx:{tx}") return clients def _extract_leases(leases: dict[str, Any]) -> list[str]: items = None if isinstance(leases, dict): items = leases.get("leases") or leases.get("dhcp_leases") or leases.get("ipv4_leases") elif isinstance(leases, list): items = leases if not isinstance(items, list): return [] out = [] for lease in items: if not isinstance(lease, dict): continue ipaddr = lease.get("ipaddr") or "?" host = lease.get("hostname") or "unknown" mac = lease.get("macaddr") or "?" out.append(f"{ipaddr} {host} ({mac})") return out def _extract_lease_name_map(leases: Any) -> dict[str, str]: items = None if isinstance(leases, dict): items = leases.get("leases") or leases.get("dhcp_leases") or leases.get("ipv4_leases") elif isinstance(leases, list): items = leases if not isinstance(items, list): return {} out: dict[str, str] = {} for lease in items: if not isinstance(lease, dict): continue mac = lease.get("macaddr") if not mac: continue host = lease.get("hostname") or "unknown" out[str(mac).lower()] = str(host) return out def _extract_lease_name_map_fallback(raw: str) -> dict[str, str]: out: dict[str, str] = {} for line in raw.splitlines(): parts = line.strip().split() if len(parts) < 4: continue _expiry, mac, _ipaddr, host = parts[:4] host = host if host != "*" else "unknown" out[str(mac).lower()] = str(host) return out def _extract_ifnames(wireless: dict[str, Any]) -> list[str]: ifnames: list[str] = [] if not isinstance(wireless, dict): return ifnames for radio in wireless.values(): if not isinstance(radio, dict): continue for iface in radio.get("interfaces", []) or []: if not isinstance(iface, dict): continue ifname = iface.get("ifname") if ifname: ifnames.append(str(ifname)) return ifnames def _extract_ifname_meta(wireless: dict[str, Any]) -> dict[str, dict[str, str]]: meta: dict[str, dict[str, str]] = {} if not isinstance(wireless, dict): return meta for radio in wireless.values(): if not isinstance(radio, dict): continue band = None cfg = radio.get("config") or {} if isinstance(cfg, dict): band = cfg.get("band") band_label = None if band == "2g": band_label = "2.4GHz" elif band == "5g": band_label = "5GHz" elif band: band_label = str(band) for iface in radio.get("interfaces", []) or []: if not isinstance(iface, dict): continue ifname = iface.get("ifname") if not ifname: continue iface_cfg = iface.get("config") or {} ssid = None if isinstance(iface_cfg, dict): ssid = iface_cfg.get("ssid") meta[str(ifname)] = { "ssid": str(ssid) if ssid else "", "band": band_label or "", } return meta def _extract_hostapd_ifnames(raw: str) -> list[str]: ifnames: list[str] = [] for line in raw.splitlines(): name = line.strip() if not name or name == "hostapd": continue ifnames.append(name) return ifnames def _net_label_for_ifname(ifname: str, ifname_meta: dict[str, dict[str, str]]) -> str: meta = ifname_meta.get(ifname, {}) ssid = meta.get("ssid") or "" band = meta.get("band") or "" if ssid and band: return f"{ssid} ({band})" if ssid: return ssid if band: return band return ifname def _safe_json_load(raw: str) -> Any | None: if not raw: return None try: return json.loads(raw) except json.JSONDecodeError: start = raw.find("{") end = raw.rfind("}") if start == -1 or end == -1 or end <= start: return None try: return json.loads(raw[start : end + 1]) except json.JSONDecodeError: return None def _parse_hostapd_clients( payload: Any, ifname: str, *, name_map: dict[str, str] | None = None, ifname_meta: dict[str, dict[str, str]] | None = None, ) -> list[tuple[str, int | None, str]]: if not isinstance(payload, dict): return [] data = payload.get("clients") if isinstance(data, dict): items = data.items() else: return [] clients: list[tuple[str, int | None, str]] = [] name_map = name_map or {} meta = (ifname_meta or {}).get(ifname, {}) ssid = meta.get("ssid") or "" band = meta.get("band") or "" if ssid and band: net_label = f"{ssid} ({band})" elif ssid: net_label = ssid elif band: net_label = band else: net_label = ifname for mac, meta in items: if not isinstance(meta, dict): continue signal = meta.get("signal") rate = meta.get("rate") or {} rx = _format_rate((rate or {}).get("rx")) tx = _format_rate((rate or {}).get("tx")) sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?" host = name_map.get(str(mac).lower()) if host and host != "unknown": client_label = host else: client_label = str(mac) line = f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}" clients.append((line, signal if isinstance(signal, (int, float)) else None, net_label)) return clients def _parse_proc_fallback(raw: str) -> tuple[int | None, list[float] | None]: uptime = None load = None for line in raw.splitlines(): parts = line.split() if len(parts) >= 2 and uptime is None: try: uptime = int(float(parts[0])) except ValueError: uptime = None if len(parts) >= 3 and load is None: try: load = [float(parts[0]), float(parts[1]), float(parts[2])] except ValueError: load = None return uptime, load def _parse_leases_fallback(raw: str) -> list[str]: out = [] for line in raw.splitlines(): parts = line.strip().split() if len(parts) < 4: continue _expiry, mac, ipaddr, host = parts[:4] host = host if host != "*" else "unknown" out.append(f"{ipaddr} {host} ({mac})") return out async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str: ow_cfg = cfg.get("openwrt", {}) host = ow_cfg.get("host") user = ow_cfg.get("user", "root") port = ow_cfg.get("port", 22) identity_file = ow_cfg.get("identity_file") timeout_sec = ow_cfg.get("timeout_sec", 8) strict = ow_cfg.get("strict_host_key_checking", True) if not host: return "⚠️ OpenWrt host not configured" ssh_cmd = [ "ssh", "-o", "BatchMode=yes", "-o", f"ConnectTimeout={timeout_sec}", "-o", "LogLevel=ERROR", ] if not strict: ssh_cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"] if identity_file: ssh_cmd += ["-i", str(identity_file)] ssh_cmd += ["-p", str(port), f"{user}@{host}"] remote = ( "ubus call system info 2>/dev/null || (cat /proc/uptime; echo; cat /proc/loadavg); " "echo __SEP__;" "ubus call network.interface.wan status 2>/dev/null; echo __SEP__;" "ubus call network.wireless status 2>/dev/null; echo __SEP__;" "ubus call luci-rpc getDHCPLeases '{\"family\":4}' 2>/dev/null || cat /tmp/dhcp.leases" ) cmd = ssh_cmd + ["sh", "-c", remote] rc, out = await run_cmd_full(cmd, timeout=timeout_sec + 15) if rc == 124: return "⚠️ OpenWrt SSH error: timeout" if rc != 0: return f"⚠️ OpenWrt SSH error: {out.strip() or 'unknown error'}" parts = [p.strip() for p in out.split("__SEP__")] if len(parts) < 4: return "⚠️ OpenWrt response incomplete" sys_info = _safe_json_load(parts[0]) wan_status = _safe_json_load(parts[1]) or {} wireless = _safe_json_load(parts[2]) or {} leases = _safe_json_load(parts[3]) leases_fallback = "" if leases is not None else parts[3] if isinstance(sys_info, dict): uptime_raw = sys_info.get("uptime") load_raw = sys_info.get("load") else: uptime_raw, load_raw = _parse_proc_fallback(parts[0]) uptime = _format_uptime(uptime_raw) load = _format_load(load_raw) wan_ip = _extract_wan_ip(wan_status) or "unknown" wan_up = wan_status.get("up") if isinstance(wan_status, dict) else None wan_state = "up" if wan_up else "down" wifi_clients = _extract_wifi_clients(wireless) ifnames = _extract_ifnames(wireless) ifname_meta = _extract_ifname_meta(wireless) rc_l, out_l = await run_cmd_full( ssh_cmd + ["sh", "-c", r"ubus -S list | awk -F. '/^hostapd\.phy/{print $2}'"], timeout=timeout_sec + 15, ) if rc_l == 0 and out_l.strip(): ifnames.extend(_extract_hostapd_ifnames(out_l)) ifnames = sorted({name for name in ifnames if name}) lease_name_map = _extract_lease_name_map(leases or {}) if leases_fallback: lease_name_map.update(_extract_lease_name_map_fallback(leases_fallback)) wifi_net_counts: dict[str, int] = {} wifi_signals: dict[str, list[int]] = {} if ifnames: for ifname in ifnames: cmd_clients = ssh_cmd + ["ubus", "call", f"hostapd.{ifname}", "get_clients"] rc2, out2 = await run_cmd_full(cmd_clients, timeout=timeout_sec + 15) if rc2 == 124: return f"⚠️ OpenWrt SSH error (wifi clients {ifname}): timeout" if rc2 == 0: payload = _safe_json_load(out2) if payload: clients_payload = payload.get("clients") if isinstance(payload, dict) else None if isinstance(clients_payload, dict): label = _net_label_for_ifname(ifname, ifname_meta) wifi_net_counts[label] = wifi_net_counts.get(label, 0) + len(clients_payload) parsed = _parse_hostapd_clients( payload, ifname, name_map=lease_name_map, ifname_meta=ifname_meta, ) wifi_clients.extend([p[0] for p in parsed]) for _line, sig, net_label in parsed: if sig is not None and net_label: wifi_signals.setdefault(net_label, []).append(sig) if leases: leases_list = _extract_leases(leases) else: leases_list = _parse_leases_fallback(leases_fallback) header = [ "📡 OpenWrt", f"🕒 Uptime: {uptime}", f"⚙️ Load: {load}", f"🌐 WAN: {wan_ip} ({wan_state})", "", ] wifi_section: list[str] = [] if wifi_net_counts: wifi_section.append("📶 Wi-Fi networks:") for label, count in sorted(wifi_net_counts.items()): sigs = wifi_signals.get(label) or [] if sigs: avg_sig = sum(sigs) / len(sigs) min_sig = min(sigs) wifi_section.append(f" - {label}: {count} (avg {avg_sig:.0f}dBm, min {min_sig}dBm)") else: wifi_section.append(f" - {label}: {count}") wifi_section.append("") wifi_section.append(f"📶 Wi-Fi clients: {len(wifi_clients)}") if wifi_clients: for line in wifi_clients[:20]: wifi_section.append(f" - {line}") if len(wifi_clients) > 20: wifi_section.append(f" … and {len(wifi_clients) - 20} more") else: wifi_section.append(" (none)") lease_section: list[str] = ["", f"🧾 DHCP leases: {len(leases_list)}"] if leases_list: for line in leases_list[:20]: lease_section.append(f" - {line}") if len(leases_list) > 20: lease_section.append(f" … and {len(leases_list) - 20} more") else: lease_section.append(" (none)") if mode == "wan": return "\n".join(header) if mode == "clients": return "\n".join(header + wifi_section) if mode == "leases": return "\n".join(header + lease_section) return "\n".join(header + wifi_section + lease_section) async def fetch_openwrt_leases(cfg: dict[str, Any]) -> list[str]: """ Fetch DHCP leases as list of strings "IP hostname (MAC)". """ ow_cfg = cfg.get("openwrt", {}) host = ow_cfg.get("host") user = ow_cfg.get("user", "root") port = ow_cfg.get("port", 22) identity_file = ow_cfg.get("identity_file") timeout_sec = ow_cfg.get("timeout_sec", 8) strict = ow_cfg.get("strict_host_key_checking", True) if not host: raise RuntimeError("OpenWrt host not configured") ssh_cmd = [ "ssh", "-o", "BatchMode=yes", "-o", f"ConnectTimeout={timeout_sec}", "-o", "LogLevel=ERROR", ] if not strict: ssh_cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"] if identity_file: ssh_cmd += ["-i", str(identity_file)] ssh_cmd += ["-p", str(port), f"{user}@{host}"] remote = "ubus call luci-rpc getDHCPLeases '{\"family\":4}' 2>/dev/null || cat /tmp/dhcp.leases" rc, out = await run_cmd_full(ssh_cmd + ["sh", "-c", remote], timeout=timeout_sec + 10) if rc == 124: raise RuntimeError("timeout") if rc != 0: raise RuntimeError(out.strip() or f"ssh rc={rc}") leases = _safe_json_load(out) if leases: return _extract_leases(leases) return _parse_leases_fallback(out)