"""JWT + bcrypt authentication for Orsync Scenarist v7.0. Uses HS256 signing via python-jose and bcrypt password hashing. Provides a FastAPI dependency (`get_current_user`) for protecting routes. """ from __future__ import annotations from datetime import datetime, timedelta, timezone from typing import Any import hmac import logging import bcrypt from fastapi import Depends, Header, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from backend.app.core.config import settings oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login") logger = logging.getLogger(__name__) # ── Password hashing ───────────────────────────────────────────────────────── def hash_password(plain: str) -> str: return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) # ── JWT token operations ────────────────────────────────────────────────────── def create_access_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str: to_encode = data.copy() expire = datetime.now(timezone.utc) + ( expires_delta or timedelta(minutes=settings.jwt_access_token_expire_minutes) ) to_encode["exp"] = expire return jwt.encode(to_encode, settings.effective_jwt_secret_key, algorithm=settings.jwt_algorithm) def decode_access_token(token: str) -> dict[str, Any]: return jwt.decode(token, settings.effective_jwt_secret_key, algorithms=[settings.jwt_algorithm]) # ── FastAPI dependency ──────────────────────────────────────────────────────── def get_current_user(token: str = Depends(oauth2_scheme)) -> dict[str, Any]: """Decode the bearer token and return the user payload. Raises HTTP 401 if the token is invalid or expired. """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = decode_access_token(token) sub: str | None = payload.get("sub") if sub is None: raise credentials_exception return {"sub": sub, **{k: v for k, v in payload.items() if k != "exp"}} except JWTError: raise credentials_exception def require_admin_mutation( x_admin_token: str | None = Header(default=None, alias="X-Admin-Token"), authorization: str | None = Header(default=None), ) -> bool: """Minimal MVP guard for destructive/admin mutation endpoints.""" if not settings.enable_admin_mutations: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin mutation endpoints are disabled by default. Set ENABLE_ADMIN_MUTATIONS=true to enable.", ) configured = settings.admin_token.strip() bearer = "" if authorization and authorization.lower().startswith("bearer "): bearer = authorization.split(" ", 1)[1].strip() supplied = (x_admin_token or bearer or "").strip() if configured: if not supplied or not hmac.compare_digest(supplied, configured): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid admin token") elif settings.is_production: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="ADMIN_TOKEN is required in production") else: logger.warning("Admin mutations are enabled without ADMIN_TOKEN in development.") return True