Files
Salus/app/omada.py
Christoph Gasser b6082e2c60 Fix stale session causing persistent empty AP list
Two changes:

1. _request_with_retry: after a session-error re-login, also re-fetch
   sites and refresh the client reference so the retry uses valid state.

2. get_aps: if every site fails with an exception, clear the token and
   sites cache so the very next page reload triggers a fresh login.
   Previously the broken state was sticky until a container restart.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 09:11:55 +02:00

270 lines
12 KiB
Python

import os
import time
import logging
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
OMADA_BASE_URL = os.getenv("OMADA_BASE_URL", "https://192.168.1.1:8043").rstrip("/")
OMADA_USERNAME = os.getenv("OMADA_USERNAME", "")
OMADA_PASSWORD = os.getenv("OMADA_PASSWORD", "")
OMADA_VERIFY_SSL = os.getenv("OMADA_VERIFY_SSL", "true").strip().lower() != "false"
OMADA_CLIENT_ID = os.getenv("OMADA_CLIENT_ID", "")
OMADA_CLIENT_SECRET = os.getenv("OMADA_CLIENT_SECRET", "")
class OmadaClient:
def __init__(self):
self._client: Optional[httpx.AsyncClient] = None
self._omadac_id: Optional[str] = None
self._token: Optional[str] = None
# {site_name: site_key}
self._sites: Optional[dict[str, str]] = None
# OpenAPI OAuth2
self._openapi_client: Optional[httpx.AsyncClient] = None
self._openapi_token: Optional[str] = None
self._openapi_token_expires: float = 0.0
def _get_client(self) -> httpx.AsyncClient:
"""Session client — carries web-login cookies for the v2 API."""
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
verify=OMADA_VERIFY_SSL,
timeout=30.0,
follow_redirects=True,
)
return self._client
def _get_openapi_client(self) -> httpx.AsyncClient:
"""Cookie-free client for OpenAPI calls — session cookies must not bleed in."""
if self._openapi_client is None or self._openapi_client.is_closed:
self._openapi_client = httpx.AsyncClient(
verify=OMADA_VERIFY_SSL,
timeout=30.0,
follow_redirects=True,
cookies={},
)
return self._openapi_client
def _auth_headers(self) -> dict:
return {"Csrf-Token": self._token} if self._token else {}
async def _fetch_omadac_id(self) -> str:
resp = await self._get_client().get(f"{OMADA_BASE_URL}/api/info")
resp.raise_for_status()
data = resp.json()
if data.get("errorCode", -1) != 0:
raise RuntimeError(f"Omada /api/info error: {data.get('msg')}")
self._omadac_id = data["result"]["omadacId"]
logger.info("Omada controller ID: %s", self._omadac_id)
return self._omadac_id
async def _login(self) -> str:
if not self._omadac_id:
await self._fetch_omadac_id()
resp = await self._get_client().post(
f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/login",
json={"username": OMADA_USERNAME, "password": OMADA_PASSWORD},
)
resp.raise_for_status()
data = resp.json()
if data.get("errorCode", -1) != 0:
raise RuntimeError(f"Omada login failed: {data.get('msg')}")
self._token = data["result"]["token"]
logger.info("Omada web-API login successful")
return self._token
async def _fetch_sites(self) -> dict[str, str]:
"""Return {site_name: site_key} for all sites accessible to this user."""
resp = await self._get_client().get(
f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/users/current",
headers=self._auth_headers(),
)
resp.raise_for_status()
data = resp.json()
if data.get("errorCode", -1) != 0:
raise RuntimeError(f"Omada users/current error: {data.get('msg')}")
raw = data.get("result", {}).get("privilege", {}).get("sites", [])
self._sites = {s["name"]: s["key"] for s in raw}
logger.info("Loaded %d Omada sites", len(self._sites))
return self._sites
async def _ensure_ready(self):
if not self._token:
await self._login()
if self._sites is None:
await self._fetch_sites()
async def _request_with_retry(self, method: str, url: str, **kwargs):
client = self._get_client()
resp = await client.request(method, url, headers=self._auth_headers(), **kwargs)
resp.raise_for_status()
data = resp.json()
if data.get("errorCode", 0) in (-1006, -1003, -30109):
logger.info("Session error %s — re-authenticating", data.get("errorCode"))
self._token = None
self._sites = None
await self._login()
await self._fetch_sites()
client = self._get_client()
resp = await client.request(method, url, headers=self._auth_headers(), **kwargs)
resp.raise_for_status()
data = resp.json()
if data.get("errorCode", 0) != 0:
raise RuntimeError(f"Omada API error {data.get('errorCode')}: {data.get('msg')}")
return data
# ── OpenAPI OAuth2 (commands) ──────────────────────────────────────────────
async def _get_openapi_token(self) -> str:
if self._openapi_token and time.time() < self._openapi_token_expires - 60:
return self._openapi_token
if not self._omadac_id:
await self._fetch_omadac_id()
resp = await self._get_openapi_client().post(
f"{OMADA_BASE_URL}/openapi/authorize/token",
data={
"grantType": "client_credentials",
"clientId": OMADA_CLIENT_ID,
"clientSecret": OMADA_CLIENT_SECRET,
},
)
resp.raise_for_status()
data = resp.json()
if data.get("errorCode", -1) != 0:
raise RuntimeError(f"Omada OpenAPI token error: {data.get('msg')}")
self._openapi_token = data["result"]["accessToken"]
self._openapi_token_expires = time.time() + data["result"].get("expiresIn", 3600)
logger.info("Omada OpenAPI token acquired")
return self._openapi_token
async def _openapi_post(self, path: str, body: dict) -> dict:
token = await self._get_openapi_token()
resp = await self._get_openapi_client().post(
f"{OMADA_BASE_URL}/openapi/v1/{self._omadac_id}/{path}",
headers={"Authorization": f"AccessToken={token}"},
json=body,
)
resp.raise_for_status()
data = resp.json()
if data.get("errorCode", 0) != 0:
raise RuntimeError(f"Omada OpenAPI error {data.get('errorCode')}: {data.get('msg')}")
return data
# ── Public methods ─────────────────────────────────────────────────────────
async def get_all_sites(self) -> dict[str, str]:
await self._ensure_ready()
return self._sites
async def get_aps(self) -> list[dict]:
"""Fetch APs from every accessible site and return a combined list."""
await self._ensure_ready()
all_aps = []
failed_sites = 0
total_sites = len(self._sites)
for site_name, site_key in self._sites.items():
try:
data = await self._request_with_retry(
"GET",
f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/sites/{site_key}/devices",
)
devices = data.get("result") or []
for d in devices:
if d.get("type") == "ap":
d["_site_name"] = site_name
d["_site_key"] = site_key
all_aps.append(d)
except Exception as exc:
logger.warning("Failed to fetch devices for site '%s': %s", site_name, exc)
failed_sites += 1
if failed_sites > 0 and failed_sites == total_sites:
# Every site failed — session is likely stale with an unrecognised error code.
# Clear auth state so the next request triggers a full re-login instead of
# requiring a container restart.
logger.warning("All %d site(s) failed — clearing session state to force re-auth on next request", total_sites)
self._token = None
self._sites = None
logger.info("Fetched %d APs across %d sites", len(all_aps), total_sites)
return all_aps
async def _get_ap_uptime(self, mac: str, site_key: str) -> int:
"""Return current uptimeLong (seconds) for a specific AP."""
data = await self._request_with_retry(
"GET",
f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/sites/{site_key}/devices",
)
for device in (data.get("result") or []):
if device.get("mac") == mac and device.get("type") == "ap":
return int(device.get("uptimeLong", 0))
raise RuntimeError(f"AP {mac} not found in site {site_key}")
async def get_all_clients(self) -> dict[str, list[dict]]:
"""Return {ap_mac: [clients]} for every site (wireless clients only)."""
await self._ensure_ready()
ap_clients: dict[str, list] = {}
for site_name, site_key in self._sites.items():
try:
data = await self._request_with_retry(
"GET",
f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/sites/{site_key}/clients",
params={"page": 1, "pageSize": 1000, "filters.active": "true"},
)
result = data.get("result", {})
clients = result.get("data", []) if isinstance(result, dict) else (result if isinstance(result, list) else [])
for c in clients:
mac = c.get("apMac", "").upper().replace("-", ":")
if mac:
ap_clients.setdefault(mac, []).append(c)
except Exception as exc:
logger.warning("Failed to fetch clients for site '%s': %s", site_name, exc)
return ap_clients
async def get_ap_clients(self, ap_mac: str, site_key: str) -> list[dict]:
"""Fetch wireless clients currently connected to a specific AP."""
await self._ensure_ready()
norm_mac = ap_mac.upper().replace("-", ":")
try:
data = await self._request_with_retry(
"GET",
f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/sites/{site_key}/clients",
params={"page": 1, "pageSize": 200, "filters.active": "true", "filters.apMac": ap_mac},
)
except RuntimeError:
# Older firmware may not support filters.apMac — fall back and filter client-side
data = await self._request_with_retry(
"GET",
f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/sites/{site_key}/clients",
params={"page": 1, "pageSize": 200, "filters.active": "true"},
)
result = data.get("result", {})
clients = result.get("data", []) if isinstance(result, dict) else (result if isinstance(result, list) else [])
return [c for c in clients if c.get("apMac", "").upper().replace("-", ":") == norm_mac]
async def reboot_ap(self, mac: str, site_key: str, min_uptime: int = 300) -> dict:
await self._ensure_ready()
uptime = await self._get_ap_uptime(mac, site_key)
if uptime < min_uptime:
mins = uptime // 60
secs = uptime % 60
raise RuntimeError(
f"AP has only been online for {mins}m {secs}s. "
f"A minimum uptime of {min_uptime // 60} minutes is required before rebooting."
)
resp = await self._get_client().post(
f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/sites/{site_key}/cmd/devices/{mac}/reboot",
headers=self._auth_headers(),
json={},
)
resp.raise_for_status()
data = resp.json()
# -39009 = "Rebooting... Please wait." — Omada's success code for this command
if data.get("errorCode", 0) not in (0, -39009):
raise RuntimeError(f"Omada API error {data.get('errorCode')}: {data.get('msg')}")
return data
omada_client = OmadaClient()