Files
Salus/app/auth.py
Christoph Gasser 284924e86d Initial release — Salus by Stranto v1.6.1.0
FastAPI/Jinja2 web app for viewing and rebooting TP-Link Omada APs
across all sites. Authentik OIDC auth, SQLite audit log, Docker deploy.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 14:36:02 +02:00

119 lines
3.9 KiB
Python

import json
import base64
import secrets
import os
import logging
from typing import Optional
from urllib.parse import urlencode
import httpx
from fastapi import Request, HTTPException
from fastapi.responses import RedirectResponse
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# --- OIDC client settings, read once from the environment at import time ---

# Issuer base URL; trailing slashes are removed so that paths can be appended
# with a single "/" separator.
AUTHENTIK_ISSUER = os.environ.get("AUTHENTIK_ISSUER", "").rstrip("/")

# OAuth2 client credentials; both default to the empty string when unset.
AUTHENTIK_CLIENT_ID = os.environ.get("AUTHENTIK_CLIENT_ID", "")
AUTHENTIK_CLIENT_SECRET = os.environ.get("AUTHENTIK_CLIENT_SECRET", "")

# Callback URL sent to the provider in both the authorization and token requests.
AUTHENTIK_REDIRECT_URI = os.environ.get(
    "AUTHENTIK_REDIRECT_URI",
    "http://localhost:8080/auth/callback",
)

# NOTE(review): falls back to a fixed placeholder when the variable is unset.
# Presumably this key signs the session cookie elsewhere in the app -- confirm,
# and make sure it is always set explicitly in real deployments.
SESSION_SECRET_KEY = os.environ.get("SESSION_SECRET_KEY", "changeme_please_set_in_env")

# Cached OIDC discovery document; stays None until get_oidc_config() fills it.
_oidc_config: Optional[dict] = None
async def get_oidc_config() -> dict:
    """Return the provider's OIDC discovery document, fetching it on first use.

    The document is requested from
    ``{AUTHENTIK_ISSUER}/.well-known/openid-configuration`` and stored in the
    module-level ``_oidc_config``; once that is populated, later calls return
    it without making another request.

    Returns:
        The parsed JSON body of the discovery endpoint.

    Raises:
        httpx.HTTPStatusError: if the discovery endpoint answers with an
            error status (via ``raise_for_status``).
    """
    global _oidc_config

    # Fast path: the document has already been fetched.
    if _oidc_config is not None:
        return _oidc_config

    discovery_url = f"{AUTHENTIK_ISSUER}/.well-known/openid-configuration"
    async with httpx.AsyncClient(timeout=10.0) as http:
        response = await http.get(discovery_url)
        response.raise_for_status()
        _oidc_config = response.json()
    logger.info("OIDC config loaded from %s", discovery_url)
    return _oidc_config
async def build_login_url(request: Request) -> str:
    """Build the provider authorization URL that starts the login flow.

    Generates a fresh random ``state`` and ``nonce``, stores them in the
    session under ``"oauth_state"`` / ``"oauth_nonce"``, and returns the
    discovery document's ``authorization_endpoint`` with the authorization
    code flow parameters appended as a query string.

    Args:
        request: Incoming request; its ``session`` mapping is written to.

    Returns:
        The absolute URL the browser should be redirected to.
    """
    oidc = await get_oidc_config()

    # One-time values for this login attempt, remembered server-side in the session.
    session = request.session
    state_token = secrets.token_urlsafe(16)
    nonce_token = secrets.token_urlsafe(16)
    session["oauth_state"] = state_token
    session["oauth_nonce"] = nonce_token

    query_string = urlencode(
        {
            "response_type": "code",
            "client_id": AUTHENTIK_CLIENT_ID,
            "redirect_uri": AUTHENTIK_REDIRECT_URI,
            "scope": "openid profile email",
            "state": state_token,
            "nonce": nonce_token,
        }
    )
    return oidc["authorization_endpoint"] + "?" + query_string
def _decode_jwt_payload(token: str) -> dict:
"""Decode JWT payload without signature verification (Authentik is trusted)."""
try:
payload_b64 = token.split(".")[1]
padding = "=" * (4 - len(payload_b64) % 4)
return json.loads(base64.urlsafe_b64decode(payload_b64 + padding))
except Exception:
return {}
async def exchange_code(request: Request) -> dict:
    """Handle the OIDC callback: validate it, redeem the code, log the user in.

    Steps:
      1. Reject the callback when the provider reported an ``error``, when no
         authorization ``code`` is present, or when ``state`` does not match
         the value stored in the session under ``"oauth_state"``.
      2. POST the code to the ``token_endpoint`` from the discovery document.
      3. Decode the ID token payload, require a ``sub`` claim, and compare the
         token's ``nonce`` with the one stored under ``"oauth_nonce"``.
      4. Store the user dict in ``request.session["user"]``.

    The ID token signature is not verified here; the token is read straight
    from the token endpoint response rather than from the browser.

    Args:
        request: The callback request; its query parameters are read and its
            ``session`` mapping is read and modified.

    Returns:
        A dict with the keys ``username``, ``email``, ``name`` and ``sub``.

    Raises:
        HTTPException: 400 for a provider error, missing code, state mismatch
            or nonce mismatch; 502 when the token response carries no usable
            ID token.
        httpx.HTTPStatusError: if the token endpoint answers with an error
            status (via ``raise_for_status``).
    """
    config = await get_oidc_config()
    code = request.query_params.get("code")
    state = request.query_params.get("state")
    error = request.query_params.get("error")
    if error:
        raise HTTPException(status_code=400, detail=f"OAuth error: {error}")
    if not code:
        raise HTTPException(status_code=400, detail="Missing authorization code")

    # Both sides must actually be present: comparing with a plain "!=" would
    # accept a callback without "state" when the session has no pending login
    # (None == None), which allows login CSRF.
    expected_state = request.session.get("oauth_state")
    state_ok = (
        isinstance(expected_state, str)
        and bool(expected_state)
        and bool(state)
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        and secrets.compare_digest(state.encode("utf-8"), expected_state.encode("utf-8"))
    )
    if not state_ok:
        raise HTTPException(status_code=400, detail="OAuth state mismatch")

    # state/nonce are single-use: drop them now so this callback cannot be
    # replayed against the same session, whatever happens below.
    request.session.pop("oauth_state", None)
    expected_nonce = request.session.pop("oauth_nonce", None)

    async with httpx.AsyncClient(timeout=15.0) as client:
        token_resp = await client.post(
            config["token_endpoint"],
            data={
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": AUTHENTIK_REDIRECT_URI,
                "client_id": AUTHENTIK_CLIENT_ID,
                "client_secret": AUTHENTIK_CLIENT_SECRET,
            },
        )
        token_resp.raise_for_status()
        tokens = token_resp.json()

    claims = _decode_jwt_payload(tokens.get("id_token", ""))

    # Without a subject there is no identity to log in; never fall back to a
    # shared placeholder account.
    if not isinstance(claims, dict) or not claims.get("sub"):
        raise HTTPException(status_code=502, detail="Invalid or missing ID token")

    # The nonce sent in the authorization request must come back unchanged in
    # the ID token; this binds the token to this browser session.
    if expected_nonce and claims.get("nonce") != expected_nonce:
        raise HTTPException(status_code=400, detail="OAuth nonce mismatch")

    user = {
        "username": claims.get("preferred_username") or claims["sub"],
        "email": claims.get("email", ""),
        "name": claims.get("name", ""),
        "sub": claims["sub"],
    }
    request.session["user"] = user
    logger.info("User logged in: %s", user["username"])
    return user
def get_current_user(request: Request) -> Optional[dict]:
    """Return the value stored under ``"user"`` in the session, or ``None`` if absent."""
    session_user = request.session.get("user")
    return session_user
def require_auth(request: Request) -> dict:
    """Return the logged-in user, or send the client to the login page.

    Args:
        request: Incoming request whose session is inspected.

    Returns:
        The user dict stored in the session.

    Raises:
        HTTPException: status 302 with a ``Location: /auth/login`` header when
            the session holds no (or an empty) user entry.
    """
    current = get_current_user(request)
    if current:
        return current

    # Not logged in: the redirect is expressed as an exception carrying the
    # Location header.
    login_redirect = {"Location": "/auth/login"}
    raise HTTPException(status_code=302, headers=login_redirect)