from __future__ import annotations import time from concurrent.futures import ThreadPoolExecutor from unittest.mock import patch import fakeredis import pytest from fastapi.testclient import TestClient from backend.app.api.routes.mohp import MOHPEvaluateRequest, evaluate_mohp from backend.app.main import app from backend.app.services.session_store import add_emotion_sample, add_objection, add_turn, create_session, get_session from backend.app.services.webrtc_simulation import process_simulation_turn, start_simulation_session @pytest.fixture(autouse=True) def _patch_redis(): fake = fakeredis.FakeRedis(decode_responses=True) with patch("backend.app.db.redis_client.get_redis_client", return_value=fake): with patch("backend.app.services.session_store.get_redis_client", return_value=fake): with patch("backend.app.services.webrtc_simulation.get_redis_client", return_value=fake): yield fake @pytest.fixture() def client() -> TestClient: return TestClient(app) def test_sessions_list_is_empty_when_no_real_sessions_exist(client: TestClient) -> None: response = client.get("/api/analytics/sessions") assert response.status_code == 200 assert response.json()["sessions"] == [] def test_simulation_start_turn_and_end_are_visible_in_analytics(client: TestClient) -> None: start = client.post("/api/simulation/start", json={"persona_id": "HCP-05-010"}) assert start.status_code == 200 session_id = start.json()["session_id"] sessions = client.get("/api/analytics/sessions") assert sessions.status_code == 200 summaries = sessions.json()["sessions"] assert summaries[0]["session_id"] == session_id assert summaries[0]["cluster_id"] == 5 assert summaries[0]["turn_count"] == 0 with patch("backend.app.services.webrtc_simulation._best_semantic_cache_hit", return_value=None): with patch("backend.app.services.webrtc_simulation._cache_response_entry", return_value="cache-test"): with patch("backend.app.services.webrtc_simulation._generate_response_text", return_value="Please show me the evidence and safety data."): turn = client.post( "/api/simulation/turn", json={ "session_id": session_id, "input_text": "Can we discuss efficacy, safety, and biomarker-selected adoption?", }, ) assert turn.status_code == 200 detail = client.get(f"/api/analytics/session/{session_id}") assert detail.status_code == 200 assert len(detail.json()["conversation"]) >= 2 ended = client.post("/api/simulation/end", json={"session_id": session_id}) assert ended.status_code == 200 assert ended.json()["ended"] is True sessions_after_end = client.get("/api/analytics/sessions").json()["sessions"] assert sessions_after_end[0]["session_id"] == session_id assert sessions_after_end[0]["ended_at"] is not None def test_session_detail_backfills_missing_analytics(client: TestClient) -> None: create_session("analytics-backfill", "HCP-00-010", cluster_id=0) add_turn("analytics-backfill", "user", "Can you show me the efficacy and safety evidence?") add_turn("analytics-backfill", "assistant", "The Phase III dataset showed strong efficacy and manageable safety across eligible patients.") response = client.get("/api/analytics/session/analytics-backfill") assert response.status_code == 200 body = response.json() assert body["emotionTimeline"] assert body["objections"] assert body["totalPoints"] == 1 assert body["deliveredPoints"] == 1 def test_simulation_turn_persists_cluster_and_emotion_data() -> None: session = start_simulation_session({"persona_id": "HCP-02-010"}) process_simulation_turn( { "session_id": session["session_id"], "input_text": "We have strong efficacy, survival, and better access support.", } ) data = get_session(session["session_id"]) assert data is not None assert data["cluster_id"] == 2 assert len(data["emotion_timeline"]) == 1 assert data["emotion_timeline"][0]["metric_source"] == "heuristic" assert data["emotion_timeline"][0]["metric_confidence"] == "low" assert data["emotion_timeline"][0]["provider_enabled"] is False def test_concurrent_session_turn_appends_do_not_drop_data(_patch_redis) -> None: create_session("concurrent-session", "HCP-00-010", cluster_id=0) def append_turn(i: int) -> None: time.sleep(0.001) add_turn("concurrent-session", "user", f"turn {i}") with ThreadPoolExecutor(max_workers=8) as executor: list(executor.map(append_turn, range(40))) data = get_session("concurrent-session") assert data is not None assert len(data["conversation"]) == 40 def test_concurrent_session_analytics_appends_do_not_drop_data(_patch_redis) -> None: create_session("concurrent-analytics", "HCP-00-010", cluster_id=0) def append_payload(i: int) -> None: time.sleep(0.001) if i % 2 == 0: add_objection("concurrent-analytics", f"objection {i}", severity="medium") else: add_emotion_sample("concurrent-analytics", 0.1, 0.2, 0.3) with ThreadPoolExecutor(max_workers=8) as executor: list(executor.map(append_payload, range(40))) data = get_session("concurrent-analytics") assert data is not None assert len(data["objections"]) == 20 assert len(data["emotion_timeline"]) == 20 def test_mohp_evaluation_persists_objections_to_session_store() -> None: create_session("mohp-session", "HCP-00-010", cluster_id=0) response = evaluate_mohp( MOHPEvaluateRequest( session_id="mohp-session", input_text="We have efficacy and survival evidence from the trial.", cluster_id=0, persona_id="HCP-00-010", ) ) data = get_session("mohp-session") assert response["count"] >= 1 assert data is not None assert len(data["objections"]) >= 1