tebyan-medical-backend / setup_auth.py
رغد
feat: complete platform — auth, deployment, hardening
344e369
raw
history blame
15.8 kB
"""
Tebyan Medical — Auth Setup & Verification Script
--------------------------------------------------
Run ONCE after adding SUPABASE_JWT_SECRET and SUPABASE_SERVICE_KEY to .env
python setup_auth.py
What it does:
1. Validates all env vars are present
2. Creates a test user (bypasses email confirmation)
3. Tests login / JWT / session refresh / logout
4. Enables RLS on analyses and chat_messages via the Supabase REST rpc/exec endpoint (prints the SQL to run manually if that call fails)
5. Verifies RLS blocks anon REST access
6. Tests backend JWT verification
7. Cleans up test user
8. Prints a final pass/fail report
"""
from __future__ import annotations
import base64
import json
import os
import re
import secrets
import sys
import time
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "backend"))
from dotenv import load_dotenv
load_dotenv(os.path.join(os.path.dirname(__file__), ".env"))
try:
import requests
except ImportError:
sys.exit("pip install requests")
# ── Config ────────────────────────────────────────────────────────────────────
# Supabase settings are read from the environment; a variable that is not set
# falls back to an empty string.
SUPABASE_URL = os.getenv("SUPABASE_URL", "")
ANON_KEY = os.getenv("SUPABASE_KEY", "")
SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY", "")
JWT_SECRET = os.getenv("SUPABASE_JWT_SECRET", "")
BACKEND = "http://localhost:8000"
# The timestamp gives every run its own throw-away test address.
TEST_EMAIL = f"tebyan_test_{int(time.time())}@mailnull.com"
# Per-run random password: the test account is created with a confirmed email,
# so a fixed, published password would leave a guessable login behind whenever
# the script stops before its cleanup step. The literal prefix guarantees an
# upper-case letter, a lower-case letter, a digit and a symbol.
TEST_PASS = f"Tb9!#{secrets.token_urlsafe(18)}"
# Outcome log shared by every step: one (name, status, detail) tuple per check,
# in the order the checks ran. The final report is built from this list.
results: list[tuple[str, str, str]] = []


def ok(name, detail=""):
    """Log check *name* as passed, with an optional free-text *detail*."""
    entry = (name, "PASS", detail)
    results.append(entry)


def fail(name, detail=""):
    """Log check *name* as failed, with an optional free-text *detail*."""
    entry = (name, "FAIL", detail)
    results.append(entry)


def warn(name, detail=""):
    """Log check *name* as a warning, with an optional free-text *detail*."""
    entry = (name, "WARN", detail)
    results.append(entry)
def sb_headers(key=None):
    """Return the JSON request headers for a Supabase HTTP call.

    Args:
        key: API key to send. Any falsy value (the default ``None`` or an
            empty string) selects the module-level ``ANON_KEY`` instead.

    Returns:
        A new dict carrying the key both as ``apikey`` and as a bearer
        ``Authorization`` token, plus a JSON ``Content-Type``.
    """
    api_key = ANON_KEY if not key else key
    headers = {"apikey": api_key}
    headers["Authorization"] = f"Bearer {api_key}"
    headers["Content-Type"] = "application/json"
    return headers
# ── Pre-flight ────────────────────────────────────────────────────────────────
# Stop before any network call when a required Supabase setting is missing.
print("\n━━━ Tebyan Auth Setup & Verification ━━━\n")
missing = [v for v in ["SUPABASE_URL","SUPABASE_KEY","SUPABASE_SERVICE_KEY","SUPABASE_JWT_SECRET"] if not os.getenv(v)]
if missing:
    # Show the .env location next to this script (the file load_dotenv reads)
    # instead of a path that only exists on the original author's machine.
    _env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
    print("❌ MISSING env vars:", missing)
    print(f"\nOpen {_env_path} and fill in:")
    print(" SUPABASE_JWT_SECRET β†’ Supabase Dashboard β†’ Project Settings β†’ API β†’ JWT Settings")
    print(" SUPABASE_SERVICE_KEY β†’ Supabase Dashboard β†’ Project Settings β†’ API β†’ service_role key")
    sys.exit(1)
print("βœ“ All env vars present\n")
# ── Step 1: Create test user (admin API with service_role key) ─────────────
# The user is created through the admin endpoint with email_confirm set, i.e.
# without going through email confirmation. user_id is kept for the cleanup
# step and stays None when creation fails.
print("[1] Creating test user via admin API...")
_admin_headers = {**sb_headers(SERVICE_KEY), "Authorization": f"Bearer {SERVICE_KEY}"}
_new_user = {
    "email": TEST_EMAIL,
    "password": TEST_PASS,
    "email_confirm": True,
    "user_metadata": {"full_name": "Tebyan Audit Bot"},
}
r = requests.post(
    f"{SUPABASE_URL}/auth/v1/admin/users",
    headers=_admin_headers,
    json=_new_user,
    timeout=15,
)
user_id = None
if r.status_code not in (200, 201):
    _create_err = f"{r.status_code}: {r.text[:200]}"
    fail("Create test user", f"HTTP {_create_err}")
    print(f" βœ— {_create_err}")
else:
    user_id = r.json().get("id")
    ok("Create test user", f"id={user_id}")
    print(f" βœ“ Created: {TEST_EMAIL} id={user_id}")
# ── Step 2: Login ─────────────────────────────────────────────────────────────
# Password-grant login using the anon key. The tokens captured here are reused
# by the JWT, refresh and logout steps below.
print("\n[2] Testing login...")
_credentials = {"email": TEST_EMAIL, "password": TEST_PASS}
r2 = requests.post(
    f"{SUPABASE_URL}/auth/v1/token?grant_type=password",
    headers=sb_headers(),
    json=_credentials,
    timeout=15,
)
# Empty strings mean "no session"; later steps test these for truthiness.
access_tok = refresh_tok = ""
if r2.status_code != 200:
    _login_err = f"{r2.status_code}: {r2.text[:200]}"
    fail("Login", f"HTTP {_login_err}")
    print(f" βœ— {_login_err}")
else:
    data = r2.json()
    access_tok = data.get("access_token", "")
    refresh_tok = data.get("refresh_token", "")
    expires_in = data.get("expires_in", 0)
    ok("Login", f"expires_in={expires_in}s")
    # Tokens are truncated so full credentials never reach the console.
    _shown = (
        ("access_token", f"{access_tok[:40]}..."),
        ("refresh_token", f"{refresh_tok[:20]}..."),
        ("expires_in", f"{expires_in}s"),
    )
    for _label, _value in _shown:
        print(f" βœ“ {_label}: {_value}")
# ── Step 3: JWT structure decode ──────────────────────────────────────────────
# Inspect the payload (second dot-separated segment) of the access token
# WITHOUT verifying its signature; signature verification is a separate step.
print("\n[3] Decoding JWT payload...")
if access_tok:
    try:
        parts = access_tok.split(".")
        # JWT segments are base64url with the "=" padding stripped. Restore the
        # 0-3 missing pad characters and decode with the URL-safe alphabet:
        # plain b64decode silently drops "-" and "_" and corrupts the payload.
        padded = parts[1] + "=" * (-len(parts[1]) % 4)
        payload = json.loads(base64.urlsafe_b64decode(padded))
    except (IndexError, ValueError) as exc:
        # IndexError: fewer than two segments. ValueError covers bad base64,
        # non-UTF-8 bytes and invalid JSON.
        payload = None
        fail("JWT structure", f"could not decode payload: {exc}")
        print(f" could not decode payload: {exc}")
    if isinstance(payload, dict):
        print(f" sub = {payload.get('sub')}")
        print(f" email= {payload.get('email')}")
        print(f" role = {payload.get('role')}")
        print(f" aud = {payload.get('aud')}")
        if payload.get("aud") == "authenticated" and payload.get("role") == "authenticated":
            # A missing "sub" must not crash the report line.
            _sub = str(payload.get("sub") or "")
            ok("JWT structure", f"aud=authenticated role=authenticated sub={_sub[:8]}...")
        else:
            fail("JWT structure", f"unexpected aud/role: {payload}")
    elif payload is not None:
        fail("JWT structure", f"payload is not a JSON object: {payload!r}")
else:
    warn("JWT structure", "skipped β€” no access_token")
# ── Step 4: Backend JWT verification ──────────────────────────────────────────
# Verify the access token's signature and audience locally with PyJWT. The key
# source depends on the algorithm named in the token header.
print("\n[4] Testing backend JWT verification...")
if not access_tok:
    # No login means nothing to verify. Report a skip, as the other
    # token-dependent steps do, rather than a misleading decode failure.
    warn("Backend JWT verify", "skipped β€” no access_token")
else:
    # Broad catch on purpose: a missing PyJWT, a JWKS fetch error or any
    # verification error is recorded as one failed check, not a crash.
    try:
        import jwt as pyjwt
        from jwt import PyJWKClient
        header = pyjwt.get_unverified_header(access_tok)
        alg = header.get("alg", "HS256")
        print(f" JWT algorithm: {alg}")
        if alg == "ES256":
            # Newer Supabase projects use ES256 with JWKS
            jwks_client = PyJWKClient(f"{SUPABASE_URL}/auth/v1/.well-known/jwks.json")
            signing_key = jwks_client.get_signing_key_from_jwt(access_tok)
            decoded = pyjwt.decode(access_tok, signing_key.key, algorithms=["ES256"], audience="authenticated")
            ok("Backend JWT verify (ES256/JWKS)", f"sub={decoded.get('sub','')[:8]}...")
            print(f" βœ“ JWT verified via JWKS public key (ES256)")
        else:
            # Every other algorithm is checked as HS256 with the shared secret.
            decoded = pyjwt.decode(access_tok, JWT_SECRET, algorithms=["HS256"], audience="authenticated")
            ok("Backend JWT verify (HS256)", f"sub={decoded.get('sub','')[:8]}...")
            print(f" βœ“ JWT verified with SUPABASE_JWT_SECRET (HS256)")
    except Exception as e:
        fail("Backend JWT verify", str(e))
        print(f" βœ— {e}")
# ── Step 5: Invalid credentials rejected ──────────────────────────────────────
# A login with a wrong password must come back as HTTP 400 with
# "invalid_credentials" somewhere in the response body.
print("\n[5] Testing invalid credentials rejection...")
_bad_login = {"email": TEST_EMAIL, "password": "WrongPassword!!"}
r5 = requests.post(
    f"{SUPABASE_URL}/auth/v1/token?grant_type=password",
    headers=sb_headers(),
    json=_bad_login,
    timeout=15,
)
_check_name = "Invalid credentials β†’ 400"
_http_note = f"HTTP {r5.status_code}"
_was_rejected = r5.status_code == 400 and "invalid_credentials" in r5.text
if not _was_rejected:
    fail(_check_name, _http_note)
else:
    ok(_check_name, _http_note)
    print(f" βœ“ {_http_note}: {r5.json().get('error_code')}")
# ── Step 6: Token refresh ─────────────────────────────────────────────────────
# Exchange the refresh token from the login step for a new access token.
print("\n[6] Testing token refresh...")
if not refresh_tok:
    warn("Token refresh", "skipped β€” no refresh_token")
else:
    r6 = requests.post(
        f"{SUPABASE_URL}/auth/v1/token?grant_type=refresh_token",
        headers=sb_headers(),
        json={"refresh_token": refresh_tok},
        timeout=15,
    )
    # The body is parsed only for a 200 response, as error bodies are reported raw.
    _fresh_token = r6.json().get("access_token") if r6.status_code == 200 else None
    if _fresh_token:
        ok("Token refresh", "new access_token received")
        print(f" βœ“ New access_token: {_fresh_token[:40]}...")
    else:
        _refresh_err = f"{r6.status_code}: {r6.text[:100]}"
        fail("Token refresh", f"HTTP {_refresh_err}")
        print(f" βœ— {_refresh_err}")
# ── Step 7: Protected vs. unprotected backend endpoints ───────────────────────
# Both requests go to the local backend at BACKEND. A backend that is down or
# slow must not abort the script, because the cleanup step still has to delete
# the test user; connection errors are recorded as check results instead.
print("\n[7] Testing backend endpoint security...")
# /health — public
try:
    r7a = requests.get(f"{BACKEND}/health", timeout=10)
except requests.RequestException as exc:
    r7a = None
    fail("/health public", f"backend unreachable: {str(exc)[:200]}")
    print(f" backend unreachable at {BACKEND}: {str(exc)[:200]}")
else:
    if r7a.status_code == 200:
        ok("/health public", f"HTTP {r7a.status_code}")
        print(f" βœ“ /health β†’ {r7a.status_code}")
    else:
        fail("/health public", f"HTTP {r7a.status_code}")
# /api/analyses/save — no auth (current app is anonymous-first, this is expected)
try:
    r7b = requests.post(f"{BACKEND}/api/analyses/save",
                        json={"session_id":"audit_verify","findings":[],"summary":"audit test"},
                        timeout=10)
except requests.RequestException as exc:
    r7b = None
    warn("/api/analyses/save no-auth", f"skipped - request failed: {str(exc)[:200]}")
    print(f" /api/analyses/save request failed: {str(exc)[:200]}")
else:
    if r7b.status_code == 200:
        # An unauthenticated 200 is reported as a warning, not a failure.
        warn("/api/analyses/save no-auth", "returns 200 (anonymous-first design)")
        print(f" ⚠ /api/analyses/save (no auth) β†’ {r7b.status_code} β€” anonymous-first design")
    else:
        ok("/api/analyses/save blocked w/o auth", f"HTTP {r7b.status_code}")
        print(f" βœ“ /api/analyses/save (no auth) β†’ {r7b.status_code}")
# ── Step 8: Enable RLS on protected tables ────────────────────────────────────
# Turns on row level security for both tables and installs a policy whose
# USING clause is false for the anon and authenticated roles.
print("\n[8] Enabling RLS via Supabase REST (rpc exec)...")
rls_sql = """
ALTER TABLE analyses ENABLE ROW LEVEL SECURITY;
ALTER TABLE chat_messages ENABLE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS analyses_deny_anon ON analyses;
DROP POLICY IF EXISTS chat_deny_anon ON chat_messages;
CREATE POLICY analyses_deny_anon
ON analyses FOR ALL TO anon, authenticated USING (false);
CREATE POLICY chat_deny_anon
ON chat_messages FOR ALL TO anon, authenticated USING (false);
"""
# Try to run the SQL through an `exec` RPC function; when that call does not
# succeed, print the SQL so the user can run it manually.
r8 = requests.post(
    f"{SUPABASE_URL}/rest/v1/rpc/exec",
    headers={**sb_headers(SERVICE_KEY), "Authorization": f"Bearer {SERVICE_KEY}"},
    json={"sql": rls_sql},
    timeout=15,
)
if r8.status_code in (200, 204):
    ok("Enable RLS (analyses + chat_messages)", "via Supabase rpc/exec")
    print(" βœ“ RLS enabled via rpc/exec")
else:
    warn("Enable RLS", f"rpc/exec returned {r8.status_code} β€” must run SQL manually")
    print(f" ⚠ rpc/exec β†’ {r8.status_code}: {r8.text[:150]}")
    print(" β†’ Run this SQL in Supabase SQL Editor manually:")
    # Link to the SQL editor of the project under test instead of a hard-coded
    # project: hosted Supabase URLs look like https://<project-ref>.supabase.co.
    # NOTE(review): "_" is assumed to make the dashboard prompt for a project
    # when the URL has another shape (custom domain, self-hosted); confirm.
    _ref_match = re.match(r"https?://([^./]+)\.supabase\.", SUPABASE_URL)
    _project_ref = _ref_match.group(1) if _ref_match else "_"
    print(f" https://supabase.com/dashboard/project/{_project_ref}/sql/new")
    print()
    for line in rls_sql.strip().splitlines():
        print(f" {line}")
# ── Step 9: Verify RLS blocks anon direct access ──────────────────────────────
# Read the analyses table over REST with only the anon key. An empty list or a
# 401/403 counts as blocked; anything else is reported as a failure.
print("\n[9] Verifying RLS blocks anon REST access...")
time.sleep(1)  # allow RLS to propagate
r9 = requests.get(
    f"{SUPABASE_URL}/rest/v1/analyses?select=*&limit=3",
    headers=sb_headers(ANON_KEY),
    timeout=10,
)
_anon_status = r9.status_code
if _anon_status == 200 and r9.json() == []:
    ok("RLS: anon sees no analyses", "returns []")
    print(" βœ“ Anon key returns [] (RLS enforced)")
elif _anon_status in (403, 401):
    ok("RLS: anon blocked", f"HTTP {_anon_status}")
    print(f" βœ“ Anon blocked with HTTP {_anon_status}")
else:
    # The row count is only meaningful for a successful response.
    _row_count = len(r9.json()) if r9.ok else "?"
    fail("RLS: anon can still read analyses", f"HTTP {_anon_status}, rows={_row_count}")
    print(f" βœ— RLS not enforced: {_anon_status}, {r9.text[:100]}")
# ── Step 10: Logout ───────────────────────────────────────────────────────────
# Call the logout endpoint as the test user: the anon key goes in `apikey`,
# the user's access token in the bearer Authorization header.
print("\n[10] Testing logout...")
if not access_tok:
    warn("Logout", "skipped")
else:
    _user_headers = {**sb_headers(), "Authorization": f"Bearer {access_tok}"}
    r10 = requests.post(
        f"{SUPABASE_URL}/auth/v1/logout",
        headers=_user_headers,
        timeout=10,
    )
    _logout_note = f"HTTP {r10.status_code}"
    if r10.status_code not in (200, 204):
        fail("Logout", _logout_note)
        print(f" βœ— {r10.status_code}: {r10.text[:100]}")
    else:
        ok("Logout", _logout_note)
        print(f" βœ“ Logout β†’ {r10.status_code}")
# ── Step 11: Expired token rejected ───────────────────────────────────────────
# Local PyJWT checks: a self-signed token whose `exp` is in the past, and a
# string that is not a JWT at all, must both be refused by decode().
print("\n[11] Testing expired/invalid token rejection...")
import datetime
try:
    import jwt as pyjwt
    from jwt import PyJWKClient as _PyJWKClient
except ImportError as exc:
    # A missing PyJWT must not abort the run before the cleanup step.
    pyjwt = None
    warn("Expired token", f"skipped - PyJWT not importable: {exc}")
    warn("Invalid token", f"skipped - PyJWT not importable: {exc}")
if pyjwt is not None:
    # Use HS256 with JWT_SECRET to forge an expired token (works for this test regardless of project alg)
    _test_secret = JWT_SECRET if JWT_SECRET else "test-secret-for-expired-check"
    expired_payload = {
        "iss": "supabase",
        "sub": "00000000-0000-0000-0000-000000000000",
        "aud": "authenticated",
        "role": "authenticated",
        "iat": int(time.time()) - 7200,
        "exp": int(time.time()) - 3600,  # expired 1 hour ago
    }
    expired_token = pyjwt.encode(expired_payload, _test_secret, algorithm="HS256")
    try:
        pyjwt.decode(expired_token, _test_secret, algorithms=["HS256"], audience="authenticated")
    except pyjwt.ExpiredSignatureError:
        ok("Expired token rejected", "ExpiredSignatureError raised correctly")
        print(" βœ“ Expired token β†’ ExpiredSignatureError")
    except Exception as e:
        # Rejected, but not for the expected reason.
        warn("Expired token", str(e))
    else:
        fail("Expired token rejected", "was NOT rejected β€” security bug")
        print(" βœ— Expired token was accepted β€” BUG!")
    # Completely invalid token. Only PyJWT's own InvalidTokenError family counts
    # as a pass, so an unrelated error cannot be reported as a correct rejection.
    try:
        pyjwt.decode("not.a.valid.token", "fake", algorithms=["HS256", "ES256"], audience="authenticated")
    except pyjwt.InvalidTokenError:
        ok("Invalid token rejected", "InvalidTokenError raised correctly")
        print(" βœ“ Invalid token β†’ InvalidTokenError")
    except Exception as e:
        warn("Invalid token", str(e))
    else:
        fail("Invalid token rejected", "was NOT rejected β€” security bug")
# ── Step 12: Cleanup ──────────────────────────────────────────────────────────
# Remove what the run created: the test user from step 1 (if it was created)
# and any analyses rows written under the "audit_verify" session id.
print("\n[12] Cleaning up test user and data...")
_cleanup_headers = {**sb_headers(SERVICE_KEY), "Authorization": f"Bearer {SERVICE_KEY}"}
if user_id:
    r_del_user = requests.delete(
        f"{SUPABASE_URL}/auth/v1/admin/users/{user_id}",
        headers=_cleanup_headers,
        timeout=10,
    )
    if r_del_user.status_code in (200, 204):
        ok("Test user deleted", user_id)
        print(f" βœ“ Test user deleted: {user_id}")
    else:
        warn("Test user delete", f"HTTP {r_del_user.status_code}")
# Delete audit test analyses rows
r_del_rows = requests.delete(
    f"{SUPABASE_URL}/rest/v1/analyses?session_id=eq.audit_verify",
    headers=_cleanup_headers,
    timeout=10,
)
if r_del_rows.status_code not in (200, 204):
    # Surface a failed row cleanup rather than discarding the response, so
    # leftover audit rows do not go unnoticed.
    warn("Audit rows delete", f"HTTP {r_del_rows.status_code}: {r_del_rows.text[:100]}")
    print(f" audit rows not deleted: HTTP {r_del_rows.status_code}: {r_del_rows.text[:100]}")
# ── Final report ──────────────────────────────────────────────────────────────
# Print one line per recorded check, the per-status totals and a percentage
# score (share of PASS entries). Exit with status 1 when any check failed.
_rule = "━" * 50
print(f"\n{_rule}")
print("FINAL AUDIT REPORT")
print(_rule)

# Bucket the results by status while printing them.
passed, warned, failed = [], [], []
_buckets = {"PASS": passed, "WARN": warned, "FAIL": failed}
_icons = {"PASS": "βœ“", "WARN": "⚠"}
for _entry in results:
    _check, _state, _note = _entry
    if _state in _buckets:
        _buckets[_state].append(_entry)
    _line = f" {_icons.get(_state, 'βœ—')} [{_state}] {_check}"
    if _note:
        _line += f" β€” {_note}"
    print(_line)

print()
print(f" PASS: {len(passed)} WARN: {len(warned)} FAIL: {len(failed)}")
# max(..., 1) avoids a division by zero when no check was recorded.
score = int(100 * len(passed) / max(len(results), 1))
print(f" Production readiness: {score}%")
print(_rule)

if not failed:
    print("\nβœ“ All critical checks passed.")
else:
    print("\n⚠ Failing checks must be resolved before production deployment.")
    sys.exit(1)