import json from typing import Any from services.runner import run_cmd def _format_uptime(seconds: int | float | None) -> str: if seconds is None: return "unknown" total = int(seconds) days, rem = divmod(total, 86400) hours, rem = divmod(rem, 3600) minutes, _ = divmod(rem, 60) if days > 0: return f"{days}d {hours:02d}:{minutes:02d}" return f"{hours:02d}:{minutes:02d}" def _format_load(load: list[Any] | None) -> str: if not load or not isinstance(load, list): return "unknown" values = [] for raw in load[:3]: try: values.append(float(raw) / 65536.0) except (TypeError, ValueError): values.append(0.0) return " ".join(f"{val:.2f}" for val in values) def _format_rate(rate: Any) -> str: try: val = float(rate) except (TypeError, ValueError): return "?" if val <= 0: return "?" return f"{val / 1000:.1f}M" def _extract_wan_ip(wan: dict[str, Any]) -> str | None: if not isinstance(wan, dict): return None addrs = wan.get("ipv4-address") or [] if isinstance(addrs, list): for item in addrs: if isinstance(item, dict): ip = item.get("address") if ip: return str(ip) return None def _extract_wifi_clients(wireless: dict[str, Any]) -> list[str]: clients: list[str] = [] if not isinstance(wireless, dict): return clients for radio in wireless.values(): if not isinstance(radio, dict): continue for iface in radio.get("interfaces", []) or []: if not isinstance(iface, dict): continue ifname = iface.get("ifname") or "wifi" assoclist = iface.get("assoclist") or {} if not isinstance(assoclist, dict): continue for mac, meta in assoclist.items(): if not isinstance(meta, dict): continue signal = meta.get("signal") rx = _format_rate((meta.get("rx") or {}).get("rate")) tx = _format_rate((meta.get("tx") or {}).get("rate")) sig = f"{signal}dBm" if isinstance(signal, (int, float)) else "?" clients.append(f"{ifname} {mac} {sig} rx:{rx} tx:{tx}") return clients def _extract_leases(leases: dict[str, Any]) -> list[str]: items = leases.get("leases") if isinstance(leases, dict) else None if not isinstance(items, list): return [] out = [] for lease in items: if not isinstance(lease, dict): continue ipaddr = lease.get("ipaddr") or "?" host = lease.get("hostname") or "unknown" mac = lease.get("macaddr") or "?" out.append(f"{ipaddr} {host} ({mac})") return out async def get_openwrt_status(cfg: dict[str, Any]) -> str: ow_cfg = cfg.get("openwrt", {}) host = ow_cfg.get("host") user = ow_cfg.get("user", "root") port = ow_cfg.get("port", 22) identity_file = ow_cfg.get("identity_file") timeout_sec = ow_cfg.get("timeout_sec", 8) strict = ow_cfg.get("strict_host_key_checking", True) if not host: return "⚠️ OpenWrt host not configured" ssh_cmd = [ "ssh", "-o", "BatchMode=yes", "-o", f"ConnectTimeout={timeout_sec}", ] if not strict: ssh_cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"] if identity_file: ssh_cmd += ["-i", str(identity_file)] ssh_cmd += ["-p", str(port), f"{user}@{host}"] remote = ( "ubus call system info; echo __SEP__;" "ubus call network.interface.wan status; echo __SEP__;" "ubus call network.wireless status; echo __SEP__;" "ubus call dhcp ipv4leases" ) cmd = ssh_cmd + ["sh", "-c", remote] rc, out = await run_cmd(cmd, timeout=timeout_sec + 5) if rc != 0: return f"⚠️ OpenWrt SSH error: {out.strip() or 'unknown error'}" parts = [p.strip() for p in out.split("__SEP__")] if len(parts) < 4: return "⚠️ OpenWrt response incomplete" try: sys_info = json.loads(parts[0]) wan_status = json.loads(parts[1]) wireless = json.loads(parts[2]) leases = json.loads(parts[3]) except json.JSONDecodeError: return "⚠️ OpenWrt response parse error" uptime = _format_uptime(sys_info.get("uptime") if isinstance(sys_info, dict) else None) load = _format_load(sys_info.get("load") if isinstance(sys_info, dict) else None) wan_ip = _extract_wan_ip(wan_status) or "unknown" wan_up = wan_status.get("up") if isinstance(wan_status, dict) else None wan_state = "up" if wan_up else "down" wifi_clients = _extract_wifi_clients(wireless) leases_list = _extract_leases(leases) lines = [ "📡 OpenWrt", f"🕒 Uptime: {uptime}", f"⚙️ Load: {load}", f"🌐 WAN: {wan_ip} ({wan_state})", "", f"📶 Wi-Fi clients: {len(wifi_clients)}", ] if wifi_clients: for line in wifi_clients[:20]: lines.append(f" - {line}") if len(wifi_clients) > 20: lines.append(f" … and {len(wifi_clients) - 20} more") else: lines.append(" (none)") lines += ["", f"🧾 DHCP leases: {len(leases_list)}"] if leases_list: for line in leases_list[:20]: lines.append(f" - {line}") if len(leases_list) > 20: lines.append(f" … and {len(leases_list) - 20} more") else: lines.append(" (none)") return "\n".join(lines)