"""OIDC (authorization-code flow) login helpers for an Authentik provider.

The helpers keep per-login data (`oauth_state`, `oauth_nonce`) and the
logged-in user in ``request.session``.
"""

import base64
import json
import logging
import os
import secrets
from typing import Optional
from urllib.parse import urlencode

import httpx
from fastapi import Request, HTTPException
from fastapi.responses import RedirectResponse

logger = logging.getLogger(__name__)

# Provider / client settings, read once from the environment at import time.
AUTHENTIK_ISSUER = os.getenv("AUTHENTIK_ISSUER", "").rstrip("/")
AUTHENTIK_CLIENT_ID = os.getenv("AUTHENTIK_CLIENT_ID", "")
AUTHENTIK_CLIENT_SECRET = os.getenv("AUTHENTIK_CLIENT_SECRET", "")
AUTHENTIK_REDIRECT_URI = os.getenv(
    "AUTHENTIK_REDIRECT_URI", "http://localhost:8080/auth/callback"
)
# NOTE(review): not used in this module; presumably consumed where the session
# middleware is configured. The fallback value is a placeholder and must be
# overridden in any real deployment.
SESSION_SECRET_KEY = os.getenv("SESSION_SECRET_KEY", "changeme_please_set_in_env")

# Discovery document, fetched lazily and cached for the process lifetime.
_oidc_config: Optional[dict] = None


async def get_oidc_config() -> dict:
    """Return the provider's OIDC discovery document, fetching it on first use.

    The document is downloaded from
    ``{AUTHENTIK_ISSUER}/.well-known/openid-configuration`` and cached in a
    module-level variable, so later calls perform no network I/O.

    Raises:
        httpx.HTTPError: if the request fails or returns an error status.
    """
    global _oidc_config
    if _oidc_config is None:
        url = f"{AUTHENTIK_ISSUER}/.well-known/openid-configuration"
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(url)
            resp.raise_for_status()
            _oidc_config = resp.json()
        logger.info("OIDC config loaded from %s", url)
    return _oidc_config


async def build_login_url(request: Request) -> str:
    """Build the provider authorization URL and start a login attempt.

    Fresh random ``state`` and ``nonce`` values are stored in the session
    (keys ``oauth_state`` / ``oauth_nonce``) and sent to the provider so that
    ``exchange_code`` can verify them when the callback arrives.
    """
    config = await get_oidc_config()
    state = secrets.token_urlsafe(16)
    nonce = secrets.token_urlsafe(16)
    request.session["oauth_state"] = state
    request.session["oauth_nonce"] = nonce
    params = {
        "response_type": "code",
        "client_id": AUTHENTIK_CLIENT_ID,
        "redirect_uri": AUTHENTIK_REDIRECT_URI,
        "scope": "openid profile email",
        "state": state,
        "nonce": nonce,
    }
    return f"{config['authorization_endpoint']}?{urlencode(params)}"


def _decode_jwt_payload(token: str) -> dict:
    """Decode a JWT's payload segment without verifying the signature.

    The signature is deliberately not checked: the only caller passes an ID
    token obtained directly from the provider's token endpoint, not one
    supplied by the browser.

    Returns:
        The payload as a dict, or ``{}`` when the token is missing, malformed,
        or its payload is not a JSON object.
    """
    try:
        payload_b64 = token.split(".")[1]
        # Restore the base64 padding that JWTs strip (0-3 characters).
        padding = "=" * (-len(payload_b64) % 4)
        payload = json.loads(base64.urlsafe_b64decode(payload_b64 + padding))
    except (AttributeError, IndexError, TypeError, ValueError):
        # ValueError covers binascii.Error, UnicodeDecodeError and
        # json.JSONDecodeError; AttributeError/TypeError cover a non-str token.
        return {}
    # Callers use .get(); anything other than a JSON object is unusable.
    return payload if isinstance(payload, dict) else {}


def _tokens_match(expected: object, received: object) -> bool:
    """Return True only if both values are equal, non-empty strings.

    Missing values never match, so "nothing stored" cannot be satisfied by
    "nothing sent". The comparison is constant-time.
    """
    if not isinstance(expected, str) or not isinstance(received, str):
        return False
    if not expected or not received:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return secrets.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))


async def exchange_code(request: Request) -> dict:
    """Handle the OIDC callback: validate it, exchange the code, log the user in.

    Steps: reject provider errors and missing codes, verify ``state`` against
    the session, exchange the code at the token endpoint, decode the ID token,
    verify its ``nonce`` against the session, then store the user in
    ``request.session["user"]``.

    Returns:
        A dict with ``username``, ``email``, ``name`` and ``sub``.

    Raises:
        HTTPException: 400 on a provider error, missing code, state mismatch
            or nonce mismatch; 502 when the ID token is missing or undecodable.
        httpx.HTTPError: if the token request fails or returns an error status.
    """
    config = await get_oidc_config()
    code = request.query_params.get("code")
    state = request.query_params.get("state")
    error = request.query_params.get("error")

    if error:
        raise HTTPException(status_code=400, detail=f"OAuth error: {error}")
    if not code:
        raise HTTPException(status_code=400, detail="Missing authorization code")

    # State and nonce are single-use: remove them before validating so a
    # failed or replayed callback cannot reuse them.
    expected_state = request.session.pop("oauth_state", None)
    expected_nonce = request.session.pop("oauth_nonce", None)

    # A plain `!=` would accept a callback with no state when the session has
    # none either (None == None); require a real, matching value.
    if not _tokens_match(expected_state, state):
        raise HTTPException(status_code=400, detail="OAuth state mismatch")

    async with httpx.AsyncClient(timeout=15.0) as client:
        token_resp = await client.post(
            config["token_endpoint"],
            data={
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": AUTHENTIK_REDIRECT_URI,
                "client_id": AUTHENTIK_CLIENT_ID,
                "client_secret": AUTHENTIK_CLIENT_SECRET,
            },
        )
        token_resp.raise_for_status()
        tokens = token_resp.json()

    claims = _decode_jwt_payload(tokens.get("id_token", ""))
    if not claims:
        # Without claims there is no identity; do not create an "unknown" user.
        raise HTTPException(status_code=502, detail="Invalid ID token received")

    # The nonce sent in the authorization request must come back in the ID
    # token; this binds the token to this browser session.
    if not _tokens_match(expected_nonce, claims.get("nonce")):
        raise HTTPException(status_code=400, detail="OAuth nonce mismatch")

    user = {
        "username": claims.get("preferred_username") or claims.get("sub", "unknown"),
        "email": claims.get("email", ""),
        "name": claims.get("name", ""),
        "sub": claims.get("sub", ""),
    }
    request.session["user"] = user
    logger.info("User logged in: %s", user["username"])
    return user


def get_current_user(request: Request) -> Optional[dict]:
    """Return the user dict stored in the session, or ``None`` if not logged in."""
    return request.session.get("user")


def require_auth(request: Request) -> dict:
    """Return the logged-in user, or redirect to the login page.

    Raises:
        HTTPException: status 302 with ``Location: /auth/login`` when the
            session holds no user.
    """
    user = get_current_user(request)
    if not user:
        raise HTTPException(
            status_code=302,
            headers={"Location": "/auth/login"},
        )
    return user