FastAPI/Jinja2 web app for viewing and rebooting TP-Link Omada APs across all sites. Authentik OIDC auth, SQLite audit log, Docker deploy. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
215 lines
8.9 KiB
Python
215 lines
8.9 KiB
Python
import os
|
|
import time
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
|
|
# Module-level logger, named after the importing module path.
logger = logging.getLogger(__name__)

# --- Controller settings, read once from the environment at import time ---

# Base URL of the controller; trailing slashes are removed so that paths can
# always be appended as f"{OMADA_BASE_URL}/...".
OMADA_BASE_URL = os.environ.get(
    "OMADA_BASE_URL",
    "https://192.168.1.1:8043",
).rstrip("/")

# Credentials for the session-based web (v2) API login. Empty when unset.
OMADA_USERNAME = os.environ.get("OMADA_USERNAME", "")
OMADA_PASSWORD = os.environ.get("OMADA_PASSWORD", "")

# TLS verification stays enabled unless the variable is "false"
# (compared after trimming whitespace and lower-casing).
OMADA_VERIFY_SSL = (
    os.environ.get("OMADA_VERIFY_SSL", "true").strip().lower() != "false"
)

# Client credentials for the OpenAPI token endpoint. Empty when unset.
OMADA_CLIENT_ID = os.environ.get("OMADA_CLIENT_ID", "")
OMADA_CLIENT_SECRET = os.environ.get("OMADA_CLIENT_SECRET", "")
|
|
|
|
|
|
class OmadaClient:
    """Async client for a TP-Link Omada controller.

    Two independent HTTP clients are kept:

    * a session client for the web ``/api/v2`` endpoints, authenticated by a
      username/password login and a ``Csrf-Token`` header;
    * a cookie-free client for the ``/openapi`` endpoints, authenticated by
      an access token obtained with the configured client ID and secret.

    The HTTP clients, controller ID, tokens and site map are all created or
    fetched lazily on first use and cached on the instance.
    """

    def __init__(self):
        """Initialise empty caches; no network I/O is performed here."""
        # Session client for the web v2 API (created by _get_client).
        self._client: Optional[httpx.AsyncClient] = None
        # Controller ID from /api/info; part of every v2 and OpenAPI URL.
        self._omadac_id: Optional[str] = None
        # Token returned by the web login, sent back as the Csrf-Token header.
        self._token: Optional[str] = None
        # {site_name: site_key}
        self._sites: Optional[dict[str, str]] = None
        # OpenAPI OAuth2
        self._openapi_client: Optional[httpx.AsyncClient] = None
        self._openapi_token: Optional[str] = None
        # time.time() value at which the cached OpenAPI token expires.
        self._openapi_token_expires: float = 0.0

    def _get_client(self) -> httpx.AsyncClient:
        """Session client — carries web-login cookies for the v2 API."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                verify=OMADA_VERIFY_SSL,
                timeout=30.0,
                follow_redirects=True,
            )
        return self._client

    def _get_openapi_client(self) -> httpx.AsyncClient:
        """Cookie-free client for OpenAPI calls — session cookies must not bleed in."""
        if self._openapi_client is None or self._openapi_client.is_closed:
            self._openapi_client = httpx.AsyncClient(
                verify=OMADA_VERIFY_SSL,
                timeout=30.0,
                follow_redirects=True,
                cookies={},
            )
        return self._openapi_client

    def _auth_headers(self) -> dict:
        """Return the ``Csrf-Token`` header dict, or ``{}`` when not logged in."""
        return {"Csrf-Token": self._token} if self._token else {}

    async def _fetch_omadac_id(self) -> str:
        """Fetch the controller ID from ``/api/info``, cache it and return it.

        Raises:
            httpx.HTTPStatusError: on a non-success HTTP status.
            RuntimeError: if ``errorCode`` is missing or non-zero.
        """
        resp = await self._get_client().get(f"{OMADA_BASE_URL}/api/info")
        resp.raise_for_status()
        data = resp.json()
        if data.get("errorCode", -1) != 0:
            raise RuntimeError(f"Omada /api/info error: {data.get('msg')}")
        self._omadac_id = data["result"]["omadacId"]
        logger.info("Omada controller ID: %s", self._omadac_id)
        return self._omadac_id

    async def _login(self) -> str:
        """Log in to the web v2 API, cache the returned token and return it.

        Fetches the controller ID first if it is not cached yet.

        Raises:
            httpx.HTTPStatusError: on a non-success HTTP status.
            RuntimeError: if ``errorCode`` is missing or non-zero.
        """
        if not self._omadac_id:
            await self._fetch_omadac_id()
        resp = await self._get_client().post(
            f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/login",
            json={"username": OMADA_USERNAME, "password": OMADA_PASSWORD},
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("errorCode", -1) != 0:
            raise RuntimeError(f"Omada login failed: {data.get('msg')}")
        self._token = data["result"]["token"]
        logger.info("Omada web-API login successful")
        return self._token

    async def _fetch_sites(self) -> dict[str, str]:
        """Return {site_name: site_key} for all sites accessible to this user.

        The map is read from ``result.privilege.sites`` of ``users/current``
        and cached on the instance.

        Raises:
            httpx.HTTPStatusError: on a non-success HTTP status.
            RuntimeError: if ``errorCode`` is missing or non-zero.
        """
        resp = await self._get_client().get(
            f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/users/current",
            headers=self._auth_headers(),
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("errorCode", -1) != 0:
            raise RuntimeError(f"Omada users/current error: {data.get('msg')}")
        # Any level may be absent *or* JSON null; treat both as "no sites"
        # instead of failing with AttributeError on None.
        result = data.get("result") or {}
        privilege = result.get("privilege") or {}
        raw = privilege.get("sites") or []
        self._sites = {s["name"]: s["key"] for s in raw}
        logger.info("Loaded %d Omada sites", len(self._sites))
        return self._sites

    async def _ensure_ready(self):
        """Make sure a session token and the site map are available."""
        if not self._token:
            await self._login()
        if self._sites is None:
            await self._fetch_sites()

    async def _request_with_retry(self, method: str, url: str, **kwargs):
        """Send an authenticated v2 request, re-logging in once if needed.

        If the first response carries one of the error codes -1006, -1003 or
        -30109, the cached token and site map are dropped, a fresh login is
        performed and the request is sent one more time.

        Args:
            method: HTTP method name.
            url: Absolute request URL.
            **kwargs: Passed through to ``httpx.AsyncClient.request``. Must
                not contain ``headers``; the auth headers are supplied here.

        Returns:
            The decoded JSON response body.

        Raises:
            httpx.HTTPStatusError: on a non-success HTTP status.
            RuntimeError: if the final ``errorCode`` is non-zero (a missing
                ``errorCode`` is treated as success).
        """
        client = self._get_client()
        resp = await client.request(method, url, headers=self._auth_headers(), **kwargs)
        resp.raise_for_status()
        data = resp.json()
        if data.get("errorCode", 0) in (-1006, -1003, -30109):
            # NOTE: this clears self._sites; callers that keep using the site
            # map after this call must hold their own reference to it.
            self._token = None
            self._sites = None
            await self._login()
            # _auth_headers() is re-evaluated so the new token is sent.
            resp = await client.request(method, url, headers=self._auth_headers(), **kwargs)
            resp.raise_for_status()
            data = resp.json()
        if data.get("errorCode", 0) != 0:
            raise RuntimeError(f"Omada API error {data.get('errorCode')}: {data.get('msg')}")
        return data

    # ── OpenAPI OAuth2 (commands) ──────────────────────────────────────────────

    async def _get_openapi_token(self) -> str:
        """Return an OpenAPI access token, requesting a new one when needed.

        A cached token is reused while it has more than 60 seconds of
        validity left. The expiry is taken from ``expiresIn`` in the response
        (3600 seconds if absent).

        Raises:
            httpx.HTTPStatusError: on a non-success HTTP status.
            RuntimeError: if ``errorCode`` is missing or non-zero.
        """
        if self._openapi_token and time.time() < self._openapi_token_expires - 60:
            return self._openapi_token
        if not self._omadac_id:
            await self._fetch_omadac_id()
        # NOTE(review): the credentials are sent form-encoded with camelCase
        # field names — confirm this against the controller's OpenAPI
        # documentation for the client-credentials grant.
        resp = await self._get_openapi_client().post(
            f"{OMADA_BASE_URL}/openapi/authorize/token",
            data={
                "grantType": "client_credentials",
                "clientId": OMADA_CLIENT_ID,
                "clientSecret": OMADA_CLIENT_SECRET,
            },
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("errorCode", -1) != 0:
            raise RuntimeError(f"Omada OpenAPI token error: {data.get('msg')}")
        self._openapi_token = data["result"]["accessToken"]
        self._openapi_token_expires = time.time() + data["result"].get("expiresIn", 3600)
        logger.info("Omada OpenAPI token acquired")
        return self._openapi_token

    async def _openapi_post(self, path: str, body: dict) -> dict:
        """POST a JSON body to ``/openapi/v1/<controller-id>/<path>``.

        Args:
            path: Path below the controller ID, without a leading slash.
            body: JSON-serialisable request body.

        Returns:
            The decoded JSON response body.

        Raises:
            httpx.HTTPStatusError: on a non-success HTTP status.
            RuntimeError: if ``errorCode`` is non-zero.
        """
        token = await self._get_openapi_token()
        resp = await self._get_openapi_client().post(
            f"{OMADA_BASE_URL}/openapi/v1/{self._omadac_id}/{path}",
            headers={"Authorization": f"AccessToken={token}"},
            json=body,
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("errorCode", 0) != 0:
            raise RuntimeError(f"Omada OpenAPI error {data.get('errorCode')}: {data.get('msg')}")
        return data

    # ── Public methods ─────────────────────────────────────────────────────────

    async def get_all_sites(self) -> dict[str, str]:
        """Return the cached {site_name: site_key} map, loading it if needed."""
        await self._ensure_ready()
        return self._sites

    async def get_aps(self) -> list[dict]:
        """Fetch APs from every accessible site and return a combined list.

        Each returned device dict is the controller's device record with two
        keys added: ``_site_name`` and ``_site_key``. A site whose device
        listing fails is logged as a warning and skipped, so the result may
        be partial.
        """
        await self._ensure_ready()
        # Hold a local reference: a session-expiry re-login inside
        # _request_with_retry sets self._sites to None, which would otherwise
        # make the len() in the summary log below raise TypeError.
        sites = self._sites or {}
        all_aps: list[dict] = []
        for site_name, site_key in sites.items():
            try:
                data = await self._request_with_retry(
                    "GET",
                    f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/sites/{site_key}/devices",
                )
                devices = data.get("result") or []
                for d in devices:
                    if d.get("type") == "ap":
                        d["_site_name"] = site_name
                        d["_site_key"] = site_key
                        all_aps.append(d)
            except Exception as exc:
                # Deliberate best effort: one failing site must not hide the rest.
                logger.warning("Failed to fetch devices for site '%s': %s", site_name, exc)
        logger.info("Fetched %d APs across %d sites", len(all_aps), len(sites))
        return all_aps

    async def _get_ap_uptime(self, mac: str, site_key: str) -> int:
        """Return current uptimeLong (seconds) for a specific AP.

        A missing or null ``uptimeLong`` is reported as 0.

        Raises:
            RuntimeError: if no device of type ``ap`` with this exact MAC
                string is listed for the site, or the API reports an error.
        """
        data = await self._request_with_retry(
            "GET",
            f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/sites/{site_key}/devices",
        )
        for device in (data.get("result") or []):
            if device.get("mac") == mac and device.get("type") == "ap":
                # "or 0" also covers an explicit JSON null, which int() rejects.
                return int(device.get("uptimeLong") or 0)
        raise RuntimeError(f"AP {mac} not found in site {site_key}")

    async def reboot_ap(self, mac: str, site_key: str, min_uptime: int = 300) -> dict:
        """Reboot one AP, refusing if it has been up for too short a time.

        Args:
            mac: MAC address of the AP, in the same format the controller
                reports in its device list.
            site_key: Key of the site the AP belongs to.
            min_uptime: Minimum uptime in seconds required before a reboot
                is sent.

        Returns:
            The decoded JSON response of the reboot command.

        Raises:
            httpx.HTTPStatusError: on a non-success HTTP status.
            RuntimeError: if the AP is not found, its uptime is below
                ``min_uptime``, or the controller returns an error code
                other than 0 or -39009.
        """
        await self._ensure_ready()
        uptime = await self._get_ap_uptime(mac, site_key)
        if uptime < min_uptime:
            mins = uptime // 60
            secs = uptime % 60
            raise RuntimeError(
                f"AP has only been online for {mins}m {secs}s. "
                f"A minimum uptime of {min_uptime // 60} minutes is required before rebooting."
            )
        # Sent directly rather than through _request_with_retry, which would
        # treat the -39009 reply as a failure.
        resp = await self._get_client().post(
            f"{OMADA_BASE_URL}/{self._omadac_id}/api/v2/sites/{site_key}/cmd/devices/{mac}/reboot",
            headers=self._auth_headers(),
            json={},
        )
        resp.raise_for_status()
        data = resp.json()
        # -39009 = "Rebooting... Please wait." — Omada's success code for this command
        if data.get("errorCode", 0) not in (0, -39009):
            raise RuntimeError(f"Omada API error {data.get('errorCode')}: {data.get('msg')}")
        return data
|
|
|
|
|
|
# Shared module-level client instance, created at import time. Construction
# only initialises empty caches; no HTTP client is opened until a method is awaited.
omada_client = OmadaClient()
|