scenarist / app /core /security.py
github-actions[bot]
Sync backend to Hugging Face Space (commit: 39b5c807918249fa80049d49f4b6a74d6a0ed1fc)
6d86412
Raw
History Blame Contribute Delete
3.92 kB
"""JWT + bcrypt authentication for Orsync Scenarist v7.0.
Uses HS256 signing via python-jose and bcrypt password hashing.
Provides a FastAPI dependency (`get_current_user`) for protecting routes.
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
import hmac
import logging
import bcrypt
from fastapi import Depends, Header, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from backend.app.core.config import settings
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
logger = logging.getLogger(__name__)
# ── Password hashing ─────────────────────────────────────────────────────────
def hash_password(plain: str) -> str:
return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
# ── JWT token operations ──────────────────────────────────────────────────────
def create_access_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=settings.jwt_access_token_expire_minutes)
)
to_encode["exp"] = expire
return jwt.encode(to_encode, settings.effective_jwt_secret_key, algorithm=settings.jwt_algorithm)
def decode_access_token(token: str) -> dict[str, Any]:
return jwt.decode(token, settings.effective_jwt_secret_key, algorithms=[settings.jwt_algorithm])
# ── FastAPI dependency ────────────────────────────────────────────────────────
def get_current_user(token: str = Depends(oauth2_scheme)) -> dict[str, Any]:
"""Decode the bearer token and return the user payload.
Raises HTTP 401 if the token is invalid or expired.
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_access_token(token)
sub: str | None = payload.get("sub")
if sub is None:
raise credentials_exception
return {"sub": sub, **{k: v for k, v in payload.items() if k != "exp"}}
except JWTError:
raise credentials_exception
def require_admin_mutation(
x_admin_token: str | None = Header(default=None, alias="X-Admin-Token"),
authorization: str | None = Header(default=None),
) -> bool:
"""Minimal MVP guard for destructive/admin mutation endpoints."""
if not settings.enable_admin_mutations:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin mutation endpoints are disabled by default. Set ENABLE_ADMIN_MUTATIONS=true to enable.",
)
configured = settings.admin_token.strip()
bearer = ""
if authorization and authorization.lower().startswith("bearer "):
bearer = authorization.split(" ", 1)[1].strip()
supplied = (x_admin_token or bearer or "").strip()
if configured:
if not supplied or not hmac.compare_digest(supplied, configured):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid admin token")
elif settings.is_production:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="ADMIN_TOKEN is required in production")
else:
logger.warning("Admin mutations are enabled without ADMIN_TOKEN in development.")
return True