Harden OpenWrt JSON parsing
This commit is contained in:
@@ -124,16 +124,28 @@ def _extract_ifnames(wireless: dict[str, Any]) -> list[str]:
|
|||||||
return ifnames
|
return ifnames
|
||||||
|
|
||||||
|
|
||||||
def _parse_hostapd_clients(raw: str, ifname: str) -> list[str]:
|
def _safe_json_load(raw: str) -> Any | None:
|
||||||
|
if not raw:
|
||||||
|
return None
|
||||||
try:
|
try:
|
||||||
payload = json.loads(raw)
|
return json.loads(raw)
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
|
start = raw.find("{")
|
||||||
|
end = raw.rfind("}")
|
||||||
|
if start == -1 or end == -1 or end <= start:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return json.loads(raw[start : end + 1])
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_hostapd_clients(payload: Any, ifname: str) -> list[str]:
|
||||||
|
if not isinstance(payload, dict):
|
||||||
return []
|
return []
|
||||||
data = payload.get("clients") if isinstance(payload, dict) else None
|
data = payload.get("clients")
|
||||||
if isinstance(data, dict):
|
if isinstance(data, dict):
|
||||||
items = data.items()
|
items = data.items()
|
||||||
elif isinstance(payload, dict):
|
|
||||||
items = payload.items()
|
|
||||||
else:
|
else:
|
||||||
return []
|
return []
|
||||||
clients: list[str] = []
|
clients: list[str] = []
|
||||||
@@ -229,22 +241,13 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str:
|
|||||||
wireless = None
|
wireless = None
|
||||||
leases = None
|
leases = None
|
||||||
leases_fallback = ""
|
leases_fallback = ""
|
||||||
try:
|
sys_info = _safe_json_load(parts[0])
|
||||||
sys_info = json.loads(parts[0])
|
if sys_info is None:
|
||||||
except json.JSONDecodeError:
|
|
||||||
sys_info = None
|
sys_info = None
|
||||||
try:
|
wan_status = _safe_json_load(parts[1]) or {}
|
||||||
wan_status = json.loads(parts[1])
|
wireless = _safe_json_load(parts[2]) or {}
|
||||||
except json.JSONDecodeError:
|
leases = _safe_json_load(parts[3])
|
||||||
wan_status = {}
|
if leases is None:
|
||||||
try:
|
|
||||||
wireless = json.loads(parts[2])
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
wireless = {}
|
|
||||||
try:
|
|
||||||
leases = json.loads(parts[3])
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
leases = None
|
|
||||||
leases_fallback = parts[3]
|
leases_fallback = parts[3]
|
||||||
|
|
||||||
if isinstance(sys_info, dict):
|
if isinstance(sys_info, dict):
|
||||||
@@ -270,8 +273,9 @@ async def get_openwrt_status(cfg: dict[str, Any]) -> str:
|
|||||||
rc2, out2 = await run_cmd_full(cmd_clients, timeout=timeout_sec + 15)
|
rc2, out2 = await run_cmd_full(cmd_clients, timeout=timeout_sec + 15)
|
||||||
if rc2 == 124:
|
if rc2 == 124:
|
||||||
return f"⚠️ OpenWrt SSH error (wifi clients {ifname}): timeout"
|
return f"⚠️ OpenWrt SSH error (wifi clients {ifname}): timeout"
|
||||||
if rc2 == 0 and out2.strip():
|
payload = _safe_json_load(out2)
|
||||||
wifi_clients.extend(_parse_hostapd_clients(out2.strip(), ifname))
|
if payload:
|
||||||
|
wifi_clients.extend(_parse_hostapd_clients(payload, ifname))
|
||||||
|
|
||||||
if leases:
|
if leases:
|
||||||
leases_list = _extract_leases(leases)
|
leases_list = _extract_leases(leases)
|
||||||
|
|||||||
Reference in New Issue
Block a user