Initial release — Salus by Stranto v1.6.1.0

FastAPI/Jinja2 web app for viewing and rebooting TP-Link Omada APs
across all sites. Authentik OIDC auth, SQLite audit log, Docker deploy.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-27 14:36:02 +02:00
commit 284924e86d
17 changed files with 1646 additions and 0 deletions

118
app/auth.py Normal file
View File

@@ -0,0 +1,118 @@
import json
import base64
import secrets
import os
import logging
from typing import Optional
from urllib.parse import urlencode
import httpx
from fastapi import Request, HTTPException
from fastapi.responses import RedirectResponse
logger = logging.getLogger(__name__)
AUTHENTIK_ISSUER = os.getenv("AUTHENTIK_ISSUER", "").rstrip("/")
AUTHENTIK_CLIENT_ID = os.getenv("AUTHENTIK_CLIENT_ID", "")
AUTHENTIK_CLIENT_SECRET = os.getenv("AUTHENTIK_CLIENT_SECRET", "")
AUTHENTIK_REDIRECT_URI = os.getenv("AUTHENTIK_REDIRECT_URI", "http://localhost:8080/auth/callback")
SESSION_SECRET_KEY = os.getenv("SESSION_SECRET_KEY", "changeme_please_set_in_env")
_oidc_config: Optional[dict] = None
async def get_oidc_config() -> dict:
global _oidc_config
if _oidc_config is None:
url = f"{AUTHENTIK_ISSUER}/.well-known/openid-configuration"
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(url)
resp.raise_for_status()
_oidc_config = resp.json()
logger.info("OIDC config loaded from %s", url)
return _oidc_config
async def build_login_url(request: Request) -> str:
config = await get_oidc_config()
state = secrets.token_urlsafe(16)
nonce = secrets.token_urlsafe(16)
request.session["oauth_state"] = state
request.session["oauth_nonce"] = nonce
params = {
"response_type": "code",
"client_id": AUTHENTIK_CLIENT_ID,
"redirect_uri": AUTHENTIK_REDIRECT_URI,
"scope": "openid profile email",
"state": state,
"nonce": nonce,
}
return f"{config['authorization_endpoint']}?{urlencode(params)}"
def _decode_jwt_payload(token: str) -> dict:
"""Decode JWT payload without signature verification (Authentik is trusted)."""
try:
payload_b64 = token.split(".")[1]
padding = "=" * (4 - len(payload_b64) % 4)
return json.loads(base64.urlsafe_b64decode(payload_b64 + padding))
except Exception:
return {}
async def exchange_code(request: Request) -> dict:
"""Handle OIDC callback: exchange code for tokens, extract user info."""
config = await get_oidc_config()
code = request.query_params.get("code")
state = request.query_params.get("state")
error = request.query_params.get("error")
if error:
raise HTTPException(status_code=400, detail=f"OAuth error: {error}")
if not code:
raise HTTPException(status_code=400, detail="Missing authorization code")
if state != request.session.get("oauth_state"):
raise HTTPException(status_code=400, detail="OAuth state mismatch")
async with httpx.AsyncClient(timeout=15.0) as client:
token_resp = await client.post(
config["token_endpoint"],
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": AUTHENTIK_REDIRECT_URI,
"client_id": AUTHENTIK_CLIENT_ID,
"client_secret": AUTHENTIK_CLIENT_SECRET,
},
)
token_resp.raise_for_status()
tokens = token_resp.json()
claims = _decode_jwt_payload(tokens.get("id_token", ""))
user = {
"username": claims.get("preferred_username") or claims.get("sub", "unknown"),
"email": claims.get("email", ""),
"name": claims.get("name", ""),
"sub": claims.get("sub", ""),
}
request.session["user"] = user
request.session.pop("oauth_state", None)
request.session.pop("oauth_nonce", None)
logger.info("User logged in: %s", user["username"])
return user
def get_current_user(request: Request) -> Optional[dict]:
return request.session.get("user")
def require_auth(request: Request) -> dict:
user = get_current_user(request)
if not user:
raise HTTPException(
status_code=302,
headers={"Location": "/auth/login"},
)
return user