Spaces:
Sleeping
Sleeping
github-actions[bot]
Sync backend to Hugging Face Space (commit: 39b5c807918249fa80049d49f4b6a74d6a0ed1fc)
6d86412 | """JWT + bcrypt authentication for Orsync Scenarist v7.0. | |
| Uses HS256 signing via python-jose and bcrypt password hashing. | |
| Provides a FastAPI dependency (`get_current_user`) for protecting routes. | |
| """ | |
| from __future__ import annotations | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any | |
| import hmac | |
| import logging | |
| import bcrypt | |
| from fastapi import Depends, Header, HTTPException, status | |
| from fastapi.security import OAuth2PasswordBearer | |
| from jose import JWTError, jwt | |
| from backend.app.core.config import settings | |
# Bearer-token extractor used as the dependency of `get_current_user`;
# `tokenUrl` names the login route that issues tokens.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# ── Password hashing ─────────────────────────────────────────────────────────
def hash_password(plain: str) -> str:
    """Hash ``plain`` with bcrypt using a freshly generated salt.

    The password is UTF-8 encoded before hashing and the resulting hash is
    returned as a UTF-8 decoded string.
    """
    secret = plain.encode("utf-8")
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(secret, salt)
    return digest.decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
    """Report whether ``plain`` matches the bcrypt hash ``hashed``.

    Both arguments are UTF-8 encoded and handed to ``bcrypt.checkpw``.
    """
    candidate = plain.encode("utf-8")
    stored = hashed.encode("utf-8")
    return bcrypt.checkpw(candidate, stored)
# ── JWT token operations ─────────────────────────────────────────────────────
def create_access_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str:
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is shallow-copied, so the caller's
            object is not modified.
        expires_delta: Token lifetime measured from the current UTC time.
            When ``None``, ``settings.jwt_access_token_expire_minutes`` is
            used instead.

    Returns:
        The token produced by ``jwt.encode`` with the configured secret key
        and algorithm.
    """
    # Compare against None explicitly: ``timedelta(0)`` is falsy, so an
    # ``expires_delta or default`` expression would silently replace an
    # explicit zero lifetime with the configured default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.jwt_access_token_expire_minutes)

    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, settings.effective_jwt_secret_key, algorithm=settings.jwt_algorithm)
def decode_access_token(token: str) -> dict[str, Any]:
    """Decode ``token`` with the configured secret key and return its claims.

    Only the algorithm named by ``settings.jwt_algorithm`` is accepted.
    Errors raised by ``jwt.decode`` propagate to the caller.
    """
    secret_key = settings.effective_jwt_secret_key
    accepted_algorithms = [settings.jwt_algorithm]
    return jwt.decode(token, secret_key, algorithms=accepted_algorithms)
# ── FastAPI dependency ───────────────────────────────────────────────────────
def get_current_user(token: str = Depends(oauth2_scheme)) -> dict[str, Any]:
    """Decode the bearer token and return the user payload.

    Args:
        token: Bearer token supplied through ``oauth2_scheme``.

    Returns:
        The token's claims with ``exp`` removed; ``sub`` is always present.

    Raises:
        HTTPException: 401 if the token cannot be decoded (``JWTError``) or
            carries no ``sub`` claim.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or expired token",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep the try body to the one call that can raise JWTError, and chain the
    # cause so the original decoding failure stays visible in tracebacks.
    try:
        payload = decode_access_token(token)
    except JWTError as exc:
        raise credentials_exception from exc

    sub: str | None = payload.get("sub")
    if sub is None:
        raise credentials_exception

    # "sub" goes first, followed by every other claim except "exp".
    claims: dict[str, Any] = {"sub": sub}
    claims.update((key, value) for key, value in payload.items() if key != "exp")
    return claims
def require_admin_mutation(
    x_admin_token: str | None = Header(default=None, alias="X-Admin-Token"),
    authorization: str | None = Header(default=None),
) -> bool:
    """Minimal MVP guard for destructive/admin mutation endpoints.

    The caller's token is taken from the ``X-Admin-Token`` header or, if that
    header is absent, from an ``Authorization: Bearer <token>`` header.

    Args:
        x_admin_token: Value of the ``X-Admin-Token`` header, if any.
        authorization: Value of the ``Authorization`` header, if any.

    Returns:
        ``True`` when the request is allowed to proceed.

    Raises:
        HTTPException: 403 when admin mutations are disabled, when the
            supplied token does not match the configured one, or when no
            token is configured while ``settings.is_production`` is true.
    """
    if not settings.enable_admin_mutations:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin mutation endpoints are disabled by default. Set ENABLE_ADMIN_MUTATIONS=true to enable.",
        )

    # Treat a missing (None) admin token the same as an empty one instead of
    # failing on ``None.strip()``.
    configured = (settings.admin_token or "").strip()

    bearer = ""
    if authorization and authorization.lower().startswith("bearer "):
        bearer = authorization.split(" ", 1)[1].strip()
    supplied = (x_admin_token or bearer or "").strip()

    if configured:
        # compare_digest only accepts ASCII-only str; a header containing
        # non-ASCII characters would raise TypeError (a 500 instead of a 403).
        # Comparing UTF-8 bytes keeps the constant-time check and accepts any
        # input.
        if not supplied or not hmac.compare_digest(
            supplied.encode("utf-8"), configured.encode("utf-8")
        ):
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid admin token")
    elif settings.is_production:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="ADMIN_TOKEN is required in production")
    else:
        logger.warning("Admin mutations are enabled without ADMIN_TOKEN in development.")
    return True