Spaces:
Running
Running
| """ | |
| Tebyan Medical — Auth Setup & Verification Script | |
| -------------------------------------------------- | |
| Run ONCE after adding SUPABASE_JWT_SECRET and SUPABASE_SERVICE_KEY to .env | |
| python setup_auth.py | |
| What it does: | |
| 1. Validates all env vars are present | |
| 2. Creates a test user (bypasses email confirmation) | |
| 3. Tests login / JWT / session refresh / logout | |
| 4. Enables RLS on analyses and chat_messages via the Supabase REST rpc/exec call (prints the SQL to run manually if that call is rejected) | |
| 5. Verifies RLS blocks anon REST access | |
| 6. Tests backend JWT verification | |
| 7. Cleans up test user | |
| 8. Prints a final pass/fail report | |
| """ | |
| from __future__ import annotations | |
| import sys, os, json, time, re, base64 | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "backend")) | |
| from dotenv import load_dotenv | |
| load_dotenv(os.path.join(os.path.dirname(__file__), ".env")) | |
| try: | |
| import requests | |
| except ImportError: | |
| sys.exit("pip install requests") | |
# -- Config -------------------------------------------------------------------
# Supabase project settings, read from the environment. Trailing slashes are
# stripped from the project URL so that f"{SUPABASE_URL}/auth/v1/..." never
# produces a "//auth/..." path.
SUPABASE_URL = os.getenv("SUPABASE_URL", "").rstrip("/")
ANON_KEY = os.getenv("SUPABASE_KEY", "")
SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY", "")
JWT_SECRET = os.getenv("SUPABASE_JWT_SECRET", "")

# Base URL of the backend under test. Defaults to the local dev server and can
# be overridden with BACKEND_URL when the API listens elsewhere.
BACKEND = os.getenv("BACKEND_URL", "http://localhost:8000").rstrip("/")

# Credentials of the throwaway account used by the checks. The timestamp keeps
# the address unique from one run to the next.
TEST_EMAIL = f"tebyan_test_{int(time.time())}@mailnull.com"
TEST_PASS = "TebyanTest2026!#"
# Outcome of every check, in execution order, as (name, status, detail).
results: list[tuple[str, str, str]] = []


def _record(status, name, detail):
    """Append one (name, status, detail) entry to the shared results list."""
    results.append((name, status, detail))


def ok(name, detail=""):
    """Record check *name* as PASS, with an optional detail string."""
    _record("PASS", name, detail)


def fail(name, detail=""):
    """Record check *name* as FAIL, with an optional detail string."""
    _record("FAIL", name, detail)


def warn(name, detail=""):
    """Record check *name* as WARN, with an optional detail string."""
    _record("WARN", name, detail)
def sb_headers(key=None):
    """Return the JSON request headers for a Supabase REST/Auth call.

    *key* is sent both as the ``apikey`` header and as the bearer token.
    When *key* is missing or empty, the module-level ``ANON_KEY`` is used.
    """
    token = key if key else ANON_KEY
    headers = {"apikey": token}
    headers["Authorization"] = f"Bearer {token}"
    headers["Content-Type"] = "application/json"
    return headers
# -- Pre-flight ---------------------------------------------------------------
# Stop before touching Supabase when any required setting is absent or empty.
print("\nβββ Tebyan Auth Setup & Verification βββ\n")
missing = [
    v
    for v in [
        "SUPABASE_URL",
        "SUPABASE_KEY",
        "SUPABASE_SERVICE_KEY",
        "SUPABASE_JWT_SECRET",
    ]
    if not os.getenv(v)
]
if missing:
    print("β MISSING env vars:", missing)
    # Name the .env file that sits next to this script (the one the script
    # loads) instead of a path that only exists on one developer's machine.
    env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
    print(f"\nOpen {env_path} and fill in:")
    print(" SUPABASE_JWT_SECRET β Supabase Dashboard β Project Settings β API β JWT Settings")
    print(" SUPABASE_SERVICE_KEY β Supabase Dashboard β Project Settings β API β service_role key")
    sys.exit(1)
print("β All env vars present\n")
# -- Step 1: Create test user (admin API with service_role key) ---------------
# The account is created with email_confirm=True so the login check that
# follows does not depend on an email round-trip.
print("[1] Creating test user via admin API...")
admin_headers = sb_headers(SERVICE_KEY)
# Set explicitly so the bearer token is always the service key.
admin_headers["Authorization"] = f"Bearer {SERVICE_KEY}"
new_user = {
    "email": TEST_EMAIL,
    "password": TEST_PASS,
    "email_confirm": True,
    "user_metadata": {"full_name": "Tebyan Audit Bot"},
}
r = requests.post(
    f"{SUPABASE_URL}/auth/v1/admin/users",
    headers=admin_headers,
    json=new_user,
    timeout=15,
)
if r.status_code not in (200, 201):
    fail("Create test user", f"HTTP {r.status_code}: {r.text[:200]}")
    print(f" β {r.status_code}: {r.text[:200]}")
    user_id = None  # later steps use this to skip the delete
else:
    user_id = r.json().get("id")
    ok("Create test user", f"id={user_id}")
    print(f" β Created: {TEST_EMAIL} id={user_id}")
# -- Step 2: Login --------------------------------------------------------------
# Password-grant sign-in with the anon key; the tokens obtained here feed the
# JWT, refresh and logout checks.
print("\n[2] Testing login...")
login_body = {"email": TEST_EMAIL, "password": TEST_PASS}
r2 = requests.post(
    f"{SUPABASE_URL}/auth/v1/token?grant_type=password",
    headers=sb_headers(),
    json=login_body,
    timeout=15,
)
if r2.status_code != 200:
    fail("Login", f"HTTP {r2.status_code}: {r2.text[:200]}")
    print(f" β {r2.status_code}: {r2.text[:200]}")
    # Empty tokens make the dependent steps report "skipped".
    access_tok = refresh_tok = ""
else:
    data = r2.json()
    access_tok = data.get("access_token", "")
    refresh_tok = data.get("refresh_token", "")
    expires_in = data.get("expires_in", 0)
    ok("Login", f"expires_in={expires_in}s")
    # Only a prefix of each token is echoed.
    print(f" β access_token: {access_tok[:40]}...")
    print(f" β refresh_token: {refresh_tok[:20]}...")
    print(f" β expires_in: {expires_in}s")
# -- Step 3: JWT structure decode -----------------------------------------------
# Inspect the claims of the access token WITHOUT verifying the signature and
# check that both "aud" and "role" are "authenticated".
print("\n[3] Decoding JWT payload...")
if access_tok:
    try:
        # JWT segments are unpadded base64url. Restore only the padding that
        # is missing, and decode with the URL-safe alphabet: plain b64decode
        # silently discards "-" and "_" and would corrupt the payload.
        segment = access_tok.split(".")[1]
        padded = segment + "=" * (-len(segment) % 4)
        payload = json.loads(base64.urlsafe_b64decode(padded))
        if not isinstance(payload, dict):
            raise ValueError("payload is not a JSON object")
    except (IndexError, ValueError) as e:
        # IndexError: fewer than two segments. ValueError covers bad base64,
        # bad UTF-8 and bad JSON (all are ValueError subclasses).
        payload = None
        fail("JWT structure", f"could not decode payload: {e}")
        print(f" could not decode JWT payload: {e}")
    if payload is not None:
        print(f" sub = {payload.get('sub')}")
        print(f" email= {payload.get('email')}")
        print(f" role = {payload.get('role')}")
        print(f" aud = {payload.get('aud')}")
        # A missing "sub" must not crash the slice below.
        sub = str(payload.get("sub") or "")
        if payload.get("aud") == "authenticated" and payload.get("role") == "authenticated":
            ok("JWT structure", f"aud=authenticated role=authenticated sub={sub[:8]}...")
        else:
            fail("JWT structure", f"unexpected aud/role: {payload}")
else:
    warn("JWT structure", "skipped β no access_token")
# -- Step 4: Backend JWT verification -------------------------------------------
# Verify the access token's signature and audience locally with PyJWT.
# Asymmetric tokens (ES256/RS256) are checked against the project's JWKS
# endpoint; anything else is checked as HS256 with SUPABASE_JWT_SECRET.
print("\n[4] Testing backend JWT verification...")
if not access_tok:
    # Same convention as the other token-dependent steps: no token, no check.
    warn("Backend JWT verify", "skipped - no access_token")
else:
    try:
        import jwt as pyjwt
        from jwt import PyJWKClient
    except ImportError as e:
        fail("Backend JWT verify", f"PyJWT is not installed: {e}")
        print(f" PyJWT is not installed: {e}")
    else:
        try:
            header = pyjwt.get_unverified_header(access_tok)
            alg = header.get("alg", "HS256")
            print(f" JWT algorithm: {alg}")
            # The header is untrusted input: only algorithms on this fixed
            # allow-list are ever passed on to decode().
            if alg in ("ES256", "RS256"):
                # Newer Supabase projects sign with an asymmetric key published via JWKS
                jwks_client = PyJWKClient(f"{SUPABASE_URL}/auth/v1/.well-known/jwks.json")
                signing_key = jwks_client.get_signing_key_from_jwt(access_tok)
                decoded = pyjwt.decode(
                    access_tok,
                    signing_key.key,
                    algorithms=[alg],
                    audience="authenticated",
                )
                ok(f"Backend JWT verify ({alg}/JWKS)", f"sub={decoded.get('sub','')[:8]}...")
                print(f" β JWT verified via JWKS public key ({alg})")
            else:
                decoded = pyjwt.decode(
                    access_tok,
                    JWT_SECRET,
                    algorithms=["HS256"],
                    audience="authenticated",
                )
                ok("Backend JWT verify (HS256)", f"sub={decoded.get('sub','')[:8]}...")
                print(f" β JWT verified with SUPABASE_JWT_SECRET (HS256)")
        except Exception as e:
            # Reporting boundary: any verification or JWKS-fetch problem is
            # recorded as a failed check instead of aborting the run.
            fail("Backend JWT verify", str(e))
            print(f" β {e}")
# -- Step 5: Invalid credentials rejected ---------------------------------------
# A wrong password must come back as HTTP 400 with "invalid_credentials"
# somewhere in the response body.
print("\n[5] Testing invalid credentials rejection...")
bad_login = {"email": TEST_EMAIL, "password": "WrongPassword!!"}
r5 = requests.post(
    f"{SUPABASE_URL}/auth/v1/token?grant_type=password",
    headers=sb_headers(),
    json=bad_login,
    timeout=15,
)
rejected = r5.status_code == 400 and "invalid_credentials" in r5.text
if not rejected:
    fail("Invalid credentials β 400", f"HTTP {r5.status_code}")
else:
    ok("Invalid credentials β 400", f"HTTP {r5.status_code}")
    print(f" β HTTP {r5.status_code}: {r5.json().get('error_code')}")
# -- Step 6: Token refresh --------------------------------------------------------
# Exchange the refresh token from the login step for a new access token.
print("\n[6] Testing token refresh...")
if not refresh_tok:
    warn("Token refresh", "skipped β no refresh_token")
else:
    r6 = requests.post(
        f"{SUPABASE_URL}/auth/v1/token?grant_type=refresh_token",
        headers=sb_headers(),
        json={"refresh_token": refresh_tok},
        timeout=15,
    )
    # The body is only parsed on HTTP 200, so an error page is never fed to
    # the JSON decoder.
    new_tok = r6.json().get("access_token") if r6.status_code == 200 else None
    if new_tok:
        ok("Token refresh", "new access_token received")
        print(f" β New access_token: {new_tok[:40]}...")
    else:
        fail("Token refresh", f"HTTP {r6.status_code}: {r6.text[:100]}")
        print(f" β {r6.status_code}: {r6.text[:100]}")
# -- Step 7: Protected vs. unprotected backend endpoints -------------------------
# Both probes hit the local backend. An unreachable backend is recorded as a
# check result instead of raising: an uncaught connection error here would
# abort the script before the cleanup step deletes the test user.
print("\n[7] Testing backend endpoint security...")

# /health - public
try:
    r7a = requests.get(f"{BACKEND}/health", timeout=10)
except requests.RequestException as e:
    r7a = None
    fail("/health public", f"backend unreachable: {e}")
    print(f" backend unreachable at {BACKEND}: {e}")
else:
    if r7a.status_code == 200:
        ok("/health public", f"HTTP {r7a.status_code}")
        print(f" β /health β {r7a.status_code}")
    else:
        fail("/health public", f"HTTP {r7a.status_code}")

# /api/analyses/save - no auth (current app is anonymous-first, this is expected)
try:
    r7b = requests.post(
        f"{BACKEND}/api/analyses/save",
        json={"session_id": "audit_verify", "findings": [], "summary": "audit test"},
        timeout=10,
    )
except requests.RequestException as e:
    r7b = None
    warn("/api/analyses/save no-auth", f"skipped - backend unreachable: {e}")
    print(f" /api/analyses/save not tested, backend unreachable: {e}")
else:
    if r7b.status_code == 200:
        warn("/api/analyses/save no-auth", "returns 200 (anonymous-first design)")
        print(f" β /api/analyses/save (no auth) β {r7b.status_code} β anonymous-first design")
    else:
        ok("/api/analyses/save blocked w/o auth", f"HTTP {r7b.status_code}")
        print(f" β /api/analyses/save (no auth) β {r7b.status_code}")
# -- Step 8: Enable RLS on protected tables ---------------------------------------
# Enables row level security on both tables and installs a policy whose
# USING (false) clause applies to the anon and authenticated roles.
print("\n[8] Enabling RLS via Supabase REST (rpc exec)...")
rls_sql = """
ALTER TABLE analyses ENABLE ROW LEVEL SECURITY;
ALTER TABLE chat_messages ENABLE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS analyses_deny_anon ON analyses;
DROP POLICY IF EXISTS chat_deny_anon ON chat_messages;
CREATE POLICY analyses_deny_anon
ON analyses FOR ALL TO anon, authenticated USING (false);
CREATE POLICY chat_deny_anon
ON chat_messages FOR ALL TO anon, authenticated USING (false);
"""

# SQL editor link for the manual fallback, built from the configured project
# URL (https://<ref>.supabase.<tld>) rather than a hard-coded project ref.
_ref_match = re.match(r"https?://([^./]+)\.supabase\.", SUPABASE_URL)
if _ref_match:
    sql_editor_url = f"https://supabase.com/dashboard/project/{_ref_match.group(1)}/sql/new"
else:
    # Custom domain or self-hosted: the ref cannot be derived from the URL.
    sql_editor_url = "https://supabase.com/dashboard (open your project, then SQL Editor)"

# Try to run the SQL through an "exec" RPC function if the project exposes
# one; otherwise guide the user to run it by hand.
rls_request_error = ""
try:
    r8 = requests.post(
        f"{SUPABASE_URL}/rest/v1/rpc/exec",
        headers={**sb_headers(SERVICE_KEY), "Authorization": f"Bearer {SERVICE_KEY}"},
        json={"sql": rls_sql},
        timeout=15,
    )
except requests.RequestException as e:
    r8 = None
    rls_request_error = f"rpc/exec request failed: {e}"

if r8 is not None and r8.status_code in (200, 204):
    ok("Enable RLS (analyses + chat_messages)", "via Supabase rpc/exec")
    print(" β RLS enabled via rpc/exec")
else:
    if r8 is not None:
        warn("Enable RLS", f"rpc/exec returned {r8.status_code} β must run SQL manually")
        print(f" β rpc/exec β {r8.status_code}: {r8.text[:150]}")
    else:
        warn("Enable RLS", f"{rls_request_error} - must run SQL manually")
        print(f" {rls_request_error}")
    print(" β Run this SQL in Supabase SQL Editor manually:")
    print(f" {sql_editor_url}")
    print()
    for line in rls_sql.strip().splitlines():
        print(f" {line}")
# -- Step 9: Verify RLS blocks anon direct access -----------------------------------
# Read the analyses table through the REST API with the anon key only. The
# check passes on HTTP 401/403, or on HTTP 200 with an empty list.
print("\n[9] Verifying RLS blocks anon REST access...")
time.sleep(1)  # allow RLS to propagate
r9 = requests.get(
    f"{SUPABASE_URL}/rest/v1/analyses?select=*&limit=3",
    headers=sb_headers(ANON_KEY),
    timeout=10,
)
status9 = r9.status_code
if status9 in (403, 401):
    ok("RLS: anon blocked", f"HTTP {r9.status_code}")
    print(f" β Anon blocked with HTTP {r9.status_code}")
elif status9 == 200 and r9.json() == []:
    ok("RLS: anon sees no analyses", "returns []")
    print(" β Anon key returns [] (RLS enforced)")
else:
    # The row count is only computed for a successful response.
    fail("RLS: anon can still read analyses", f"HTTP {r9.status_code}, rows={len(r9.json()) if r9.ok else '?'}")
    print(f" β RLS not enforced: {r9.status_code}, {r9.text[:100]}")
# -- Step 10: Logout ------------------------------------------------------------------
# Call the logout endpoint with the user's own access token as the bearer.
print("\n[10] Testing logout...")
if not access_tok:
    warn("Logout", "skipped")
else:
    logout_headers = sb_headers()
    # apikey stays the anon key; the bearer must be the user's token.
    logout_headers["Authorization"] = f"Bearer {access_tok}"
    r10 = requests.post(
        f"{SUPABASE_URL}/auth/v1/logout",
        headers=logout_headers,
        timeout=10,
    )
    if r10.status_code not in (200, 204):
        fail("Logout", f"HTTP {r10.status_code}")
        print(f" β {r10.status_code}: {r10.text[:100]}")
    else:
        ok("Logout", f"HTTP {r10.status_code}")
        print(f" β Logout β {r10.status_code}")
# -- Step 11: Expired token rejected ----------------------------------------------------
# Local PyJWT checks only (no network): an expired token and a garbage string
# must both be refused by decode().
print("\n[11] Testing expired/invalid token rejection...")
try:
    import jwt as pyjwt
except ImportError as e:
    pyjwt = None
    warn("Expired/invalid token checks", f"skipped - PyJWT is not installed: {e}")
    print(f" skipped - PyJWT is not installed: {e}")

if pyjwt is not None:
    # Sign an already-expired token with HS256. The secret only needs to be
    # the same for encode and decode, so a placeholder is fine when
    # JWT_SECRET is empty.
    _test_secret = JWT_SECRET if JWT_SECRET else "test-secret-for-expired-check"
    _now = int(time.time())
    expired_payload = {
        "iss": "supabase",
        "sub": "00000000-0000-0000-0000-000000000000",
        "aud": "authenticated",
        "role": "authenticated",
        "iat": _now - 7200,
        "exp": _now - 3600,  # expired 1 hour ago
    }
    expired_token = pyjwt.encode(expired_payload, _test_secret, algorithm="HS256")
    try:
        pyjwt.decode(expired_token, _test_secret, algorithms=["HS256"], audience="authenticated")
    except pyjwt.ExpiredSignatureError:
        ok("Expired token rejected", "ExpiredSignatureError raised correctly")
        print(" β Expired token β ExpiredSignatureError")
    except Exception as e:
        # Rejected, but not for the reason under test.
        warn("Expired token", str(e))
    else:
        fail("Expired token rejected", "was NOT rejected β security bug")
        print(" β Expired token was accepted β BUG!")

    # A string that is not a JWT at all must fail to decode. Only PyJWT's own
    # InvalidTokenError counts as a rejection; any other exception is
    # reported separately instead of being mistaken for a pass.
    try:
        pyjwt.decode("not.a.valid.token", "fake", algorithms=["HS256", "ES256"], audience="authenticated")
    except pyjwt.InvalidTokenError:
        ok("Invalid token rejected", "InvalidTokenError raised correctly")
        print(" β Invalid token β InvalidTokenError")
    except Exception as e:
        warn("Invalid token rejected", f"unexpected error: {e}")
    else:
        fail("Invalid token rejected", "was NOT rejected β security bug")
# -- Step 12: Cleanup -----------------------------------------------------------------------
# Remove what the run created. Each delete is attempted independently and a
# failure is recorded as a warning, so one failed request cannot prevent the
# other cleanup or the final report.
print("\n[12] Cleaning up test user and data...")
_cleanup_headers = {**sb_headers(SERVICE_KEY), "Authorization": f"Bearer {SERVICE_KEY}"}

if user_id:
    try:
        r_del_user = requests.delete(
            f"{SUPABASE_URL}/auth/v1/admin/users/{user_id}",
            headers=_cleanup_headers,
            timeout=10,
        )
    except requests.RequestException as e:
        warn("Test user delete", f"request failed: {e}")
    else:
        if r_del_user.status_code in (200, 204):
            ok("Test user deleted", user_id)
            print(f" β Test user deleted: {user_id}")
        else:
            warn("Test user delete", f"HTTP {r_del_user.status_code}")

# Delete audit test analyses rows. This runs whether or not the test user was
# created, because the backend probe may have inserted a row either way.
try:
    r_del_rows = requests.delete(
        f"{SUPABASE_URL}/rest/v1/analyses?session_id=eq.audit_verify",
        headers=_cleanup_headers,
        timeout=10,
    )
except requests.RequestException as e:
    warn("Audit rows delete", f"request failed: {e}")
else:
    # Success stays silent; only a failed delete is surfaced.
    if r_del_rows.status_code not in (200, 204):
        warn("Audit rows delete", f"HTTP {r_del_rows.status_code}")
# -- Final report -----------------------------------------------------------------------------
# Print every recorded check, the per-status totals and a score (share of
# PASS entries among all entries). Exit with status 1 if anything failed.
_rule = "β" * 50
print("\n" + _rule)
print("FINAL AUDIT REPORT")
print(_rule)

_by_status = {"PASS": [], "WARN": [], "FAIL": []}
for _entry in results:
    if _entry[1] in _by_status:
        _by_status[_entry[1]].append(_entry)
passed = _by_status["PASS"]
warned = _by_status["WARN"]
failed = _by_status["FAIL"]

_icons = {"PASS": "β", "WARN": "β "}
for name, status, detail in results:
    icon = _icons.get(status, "β")
    suffix = f" β {detail}" if detail else ""
    print(f" {icon} [{status}] {name}" + suffix)
print()
print(f" PASS: {len(passed)} WARN: {len(warned)} FAIL: {len(failed)}")
# max(..., 1) avoids dividing by zero when no check was recorded.
score = int(100 * len(passed) / max(len(results), 1))
print(f" Production readiness: {score}%")
print(_rule)

if not failed:
    print("\nβ All critical checks passed.")
else:
    print("\nβ Failing checks must be resolved before production deployment.")
    sys.exit(1)