| """FastAPI service — `POST /analyze`, `POST /neighbors`, `GET /health`. |
| |
| The lifespan startup loads CLAP **and** the corpus (`corpus.json` + |
| `embeddings.npy` + `segment_embeddings.npz`) once into memory so similarity |
| queries are microsecond-fast. |
| |
| Endpoints: |
| - `/analyze` — single-track scoring (Soundcheck): the technical-quality gate. |
| - `/neighbors` — similarity audit (Twin Check): given an uploaded track, return |
| the top-k most similar tracks already in the catalog with mean-pooled and |
| max-segment similarity metrics. |
| |
| Errors are returned as `{"error": "<code>"}` to match the frontend's `api.js`: |
| - `unsupported_media` (415) — wrong mime / extension |
| - `empty_file` (422) — zero-byte upload |
| - `file_too_large` (413) — > MAX_UPLOAD_BYTES (~50 MB) |
| - `decode_failed` (422) — librosa couldn't decode |
| - `empty_audio` (422) — decoded but no samples |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import io |
| import json |
| import os |
| import threading |
| from contextlib import asynccontextmanager |
| from pathlib import Path |
|
|
| import librosa |
| import numpy as np |
| import soundfile as sf |
| from fastapi import FastAPI, File, UploadFile |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
| from pydantic import BaseModel, Field |
|
|
| |
| |
| |
| |
| from . import __version__, acrcloud_engine, context_token, muq_engine, narrative_telemetry, clap_windowed, config, mir_features, similarity |
| from .librosa_engine import analyze_array |
| from .scoring import compute_report |
|
|
| |
| _sentry_dsn = os.getenv("SENTRY_DSN", "").strip() |
| if _sentry_dsn: |
| import sentry_sdk |
| sentry_sdk.init( |
| dsn=_sentry_dsn, |
| traces_sample_rate=float(os.getenv("SENTRY_TRACES_SAMPLE_RATE", "0.1")), |
| environment=os.getenv("SENTRY_ENVIRONMENT", "production"), |
| release=__version__, |
| ) |
|
|
| |
| _clap_lock = threading.Lock() |
|
|
| |
| _corpus_tracks: list[dict] = [] |
| _corpus_embeddings: np.ndarray | None = None |
| _corpus_by_id: dict[str, dict] = {} |
| _flat_catalog: similarity.FlatCatalog | None = None |
| _catalog_cosine_distribution: np.ndarray | None = None |
| _model_sha: str = "" |
| _catalog_sha: str = "" |
| _threshold_default: float = config.SIMILARITY_THRESHOLD_DEFAULT |
|
|
|
|
| def _default_corpus_dir() -> Path: |
| """Search for the corpus next to the repo's quality-scorer/.""" |
| here = Path(__file__).resolve() |
| |
| return here.parents[2] / "quality-scorer" / "public" / "corpus" |
|
|
|
|
| def _load_corpus() -> None: |
| """Populate corpus globals from disk if all corpus artifacts are present.""" |
| global _corpus_tracks, _corpus_embeddings, _corpus_by_id, _flat_catalog |
| global _catalog_cosine_distribution |
| global _model_sha, _catalog_sha, _threshold_default |
| corpus_dir = Path(os.getenv("CORPUS_DIR", str(_default_corpus_dir()))) |
| cpath = corpus_dir / "corpus.json" |
| epath = corpus_dir / "embeddings.npy" |
| spath = corpus_dir / "segment_embeddings.npz" |
| mpath = corpus_dir / "manifest.json" |
| missing = [p.name for p in (cpath, epath, spath, mpath) if not p.exists()] |
| if missing: |
| print( |
| f"[api] corpus not found at {corpus_dir} " |
| f"(missing: {', '.join(missing)}) " |
| f"— /neighbors will return no_corpus" |
| ) |
| _corpus_tracks = [] |
| _corpus_embeddings = None |
| _corpus_by_id = {} |
| _flat_catalog = None |
| _model_sha = "" |
| _catalog_sha = "" |
| _threshold_default = config.SIMILARITY_THRESHOLD_DEFAULT |
| return |
| try: |
| data = json.loads(cpath.read_text()) |
| _corpus_tracks = data if isinstance(data, list) else data.get("tracks", []) |
| _corpus_embeddings = np.load(epath).astype(np.float32) |
| with np.load(spath) as npz: |
| segment_embeddings = {k: npz[k].astype(np.float32) for k in npz.files} |
| manifest_bytes = mpath.read_bytes() |
| manifest = json.loads(manifest_bytes.decode("utf-8")) |
| _model_sha = str(manifest.get("model_sha") or "unpinned") |
| if _model_sha == "unpinned": |
| print("[api] WARNING manifest missing model_sha; using 'unpinned'") |
| |
| |
| |
| |
| _catalog_sha = hashlib.sha256(manifest_bytes).hexdigest() |
| _threshold_default = similarity.threshold_from_manifest(manifest) |
| _flat_catalog = similarity.build_flat_catalog(_corpus_tracks, _corpus_embeddings, segment_embeddings) |
| _catalog_cosine_distribution = similarity.compute_catalog_distribution(_flat_catalog) |
| _corpus_by_id = {str(row["track_id"]): row for row in _corpus_tracks if row.get("track_id")} |
| if _corpus_embeddings.shape[0] != len(_corpus_tracks): |
| print( |
| f"[api] WARNING corpus length {len(_corpus_tracks)} ≠ embeddings rows " |
| f"{_corpus_embeddings.shape[0]} — /neighbors may be inconsistent" |
| ) |
| print( |
| f"[api] corpus loaded: {len(_corpus_tracks)} tracks · " |
| f"embeddings {_corpus_embeddings.shape} · segments {_flat_catalog.segs_flat.shape[0]}" |
| ) |
| except Exception as e: |
| print(f"[api] corpus load failed: {e!r}") |
| _corpus_tracks = [] |
| _corpus_embeddings = None |
| _corpus_by_id = {} |
| _flat_catalog = None |
| _catalog_cosine_distribution = None |
| _model_sha = "" |
| _catalog_sha = "" |
| _threshold_default = config.SIMILARITY_THRESHOLD_DEFAULT |
|
|
|
|
| @asynccontextmanager |
| async def lifespan(_app): |
| muq_engine.load() |
| _load_corpus() |
| yield |
|
|
|
|
| app = FastAPI(title="PiedPiper", version=__version__, lifespan=lifespan) |
|
|
| _CORS_ORIGIN = os.getenv("CORS_ORIGIN", "http://localhost:5173") |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=[_CORS_ORIGIN], |
| allow_origin_regex=r"https://.*\.vercel\.app", |
| allow_methods=["POST", "GET"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| |
|
|
| def _err(status: int, code: str) -> JSONResponse: |
| return JSONResponse(status_code=status, content={"error": code}) |
|
|
|
|
| def _validate_upload(file: UploadFile, raw: bytes) -> JSONResponse | None: |
| ext = Path(file.filename or "").suffix.lower() |
| mime = (file.content_type or "").lower() |
| if ext not in config.ALLOWED_EXTENSIONS and not mime.startswith(config.ALLOWED_MIME_PREFIX): |
| return _err(415, "unsupported_media") |
| if not raw: |
| return _err(422, "empty_file") |
| if len(raw) > config.MAX_UPLOAD_BYTES: |
| return _err(413, "file_too_large") |
| return None |
|
|
|
|
| def _decode_and_pipeline(raw: bytes, ext: str = "") -> dict | JSONResponse: |
| """Decode bytes; run librosa + CLAP; return all artifacts (analysis, embedding, genres, report). |
| |
| Returns a dict or a JSONResponse on error. |
| |
| `ext` is the upload's file extension (e.g. ".m4a"). It's used only as the |
| suffix on the temp-file fallback path: when librosa.load on a BytesIO |
| fails for an AAC-LC `.m4a` upload (libsndfile can't decode AAC, and |
| audioread's ffmpeg fallback requires a path not a BytesIO), we write the |
| bytes to a temp file with the right suffix and retry. The suffix matters |
| because ffmpeg's format dispatch is partially extension-driven. |
| """ |
| try: |
| duration_full = float(sf.info(io.BytesIO(raw)).duration) |
| except Exception: |
| duration_full = None |
| try: |
| y, sr = librosa.load(io.BytesIO(raw), sr=config.ANALYSIS_SR, mono=False) |
| except Exception: |
| |
| |
| |
| import tempfile |
| suffix = ext if ext and ext.startswith(".") else "" |
| try: |
| with tempfile.NamedTemporaryFile(suffix=suffix, delete=True) as tmp: |
| tmp.write(raw) |
| tmp.flush() |
| y, sr = librosa.load(tmp.name, sr=config.ANALYSIS_SR, mono=False) |
| except Exception: |
| return _err(422, "decode_failed") |
| if (y if y.ndim == 1 else y).shape[-1] == 0: |
| return _err(422, "empty_audio") |
|
|
| analysis = analyze_array(y, sr, duration_override=duration_full) |
| mono = librosa.to_mono(y) if y.ndim > 1 else y |
| cap_n = int(config.CLIP_CAP_S * sr) |
| if mono.shape[-1] > cap_n: |
| mono = mono[:cap_n] |
| acrcloud_n = int(15 * sr) |
| acrcloud_slice = mono[:acrcloud_n] |
| acrcloud_buf = io.BytesIO() |
| sf.write(acrcloud_buf, acrcloud_slice, sr, format="WAV", subtype="PCM_16") |
| with _clap_lock: |
| emb, segment_embeddings = clap_windowed.encode_windowed(mono, sr, max_seconds=None) |
| genres = muq_engine.top_genres(emb) |
| report = compute_report(analysis["raw"]) |
|
|
| |
| |
| |
| |
| try: |
| query_mir = mir_features.compute(mono, sr) |
| except Exception as exc: |
| print(f"[api] mir_features.compute failed: {exc!r}") |
| query_mir = None |
|
|
| return { |
| "analysis": analysis, |
| "report": report, |
| "genres": genres, |
| "emb": emb, |
| "segment_embeddings": segment_embeddings, |
| "mir": query_mir, |
| "acrcloud_audio": acrcloud_buf.getvalue(), |
| } |
|
|
|
|
| def _build_criteria_block(query_mir, match_mir_dict) -> dict | None: |
| """Per ADR-0004: compose the four-criterion comparison block for one neighbor. |
| |
| Args: |
| query_mir: MirFeatures dataclass (or None) computed on the upload. |
| match_mir_dict: dict from corpus.json `mir_features` field, or None. |
| |
| Returns: |
| Dict with `tempo`/`key`/`harmonic`/`timbre` entries, or None when |
| either side is missing MIR data (which is the case for un-backfilled |
| catalog tracks during the rollout window). |
| """ |
| if query_mir is None or not match_mir_dict: |
| return None |
| try: |
| match_mir = mir_features.MirFeatures.from_dict(match_mir_dict) |
| except (KeyError, TypeError, ValueError): |
| return None |
|
|
| tempo_cmp = similarity.compare_tempos(query_mir.tempo_bpm, match_mir.tempo_bpm) |
| key_cmp = similarity.compare_keys(query_mir.key, query_mir.mode, match_mir.key, match_mir.mode) |
| chroma_cmp = similarity.compare_chroma_vectors(query_mir.chroma_mean, match_mir.chroma_mean) |
| timbre_cmp = similarity.compare_timbre_vectors(query_mir.timbre_mean, match_mir.timbre_mean) |
|
|
| return { |
| "tempo": { |
| "queryValue": round(float(query_mir.tempo_bpm), 1), |
| "matchValue": round(float(match_mir.tempo_bpm), 1), |
| "agreement": float(tempo_cmp["agreement"]), |
| "label": str(tempo_cmp["label"]), |
| }, |
| "key": { |
| "queryValue": f"{query_mir.key} {query_mir.mode}", |
| "matchValue": f"{match_mir.key} {match_mir.mode}", |
| "agreement": float(key_cmp["agreement"]), |
| "label": str(key_cmp["label"]), |
| }, |
| "harmonic": { |
| "agreement": float(chroma_cmp["agreement"]), |
| "label": str(chroma_cmp["label"]), |
| }, |
| "timbre": { |
| "agreement": float(timbre_cmp["agreement"]), |
| "label": str(timbre_cmp["label"]), |
| }, |
| } |
|
|
|
|
| def _build_track(file: UploadFile, pipeline: dict, *, source: str, id_: str) -> dict: |
| return { |
| "id": id_, |
| "title": Path(file.filename or id_).stem or id_, |
| "genre": pipeline["genres"][0][0] if pipeline["genres"] else None, |
| "genres": [{"label": lbl, "score": float(s)} for lbl, s in pipeline["genres"]], |
| "durationSec": pipeline["analysis"]["durationSec"], |
| "source": source, |
| "waveform": pipeline["analysis"]["waveform"], |
| "problems": pipeline["analysis"]["problems"], |
| **pipeline["report"], |
| } |
|
|
|
|
| |
|
|
| @app.get("/health") |
| def health() -> dict: |
| return { |
| "ok": True, |
| "model": muq_engine.model_id(), |
| "modelSha": _model_sha, |
| "version": __version__, |
| "corpus": len(_corpus_tracks), |
| "segments": int(_flat_catalog.segs_flat.shape[0]) if _flat_catalog else 0, |
| "acrcloudEnabled": acrcloud_engine.is_enabled(), |
| } |
|
|
|
|
| @app.post("/analyze") |
| async def analyze_endpoint(file: UploadFile = File(...)): |
| raw = await file.read() |
| if (err := _validate_upload(file, raw)) is not None: |
| return err |
| ext = Path(file.filename or "").suffix.lower() |
| pipeline = _decode_and_pipeline(raw, ext=ext) |
| if isinstance(pipeline, JSONResponse): |
| return pipeline |
| return _build_track(file, pipeline, source="upload", id_="upload") |
|
|
|
|
| @app.post("/neighbors") |
| async def neighbors_endpoint(file: UploadFile = File(...), k: int = 5): |
| """Similarity audit: top-k nearest tracks in the catalog.""" |
| raw = await file.read() |
| if (err := _validate_upload(file, raw)) is not None: |
| return err |
| |
| |
| |
| query_fingerprint = hashlib.sha256(raw).hexdigest() |
| ext = Path(file.filename or "").suffix.lower() |
| pipeline = _decode_and_pipeline(raw, ext=ext) |
| if isinstance(pipeline, JSONResponse): |
| return pipeline |
| query_track = _build_track(file, pipeline, source="upload", id_="upload") |
|
|
| if _flat_catalog is None: |
| return { |
| "query": query_track, |
| "neighbors": [], |
| "verdict": "no_corpus", |
| "topMeanPooledSimilarity": 0.0, |
| "topMaxSegmentSimilarity": 0.0, |
| "modelSha": _model_sha, |
| "thresholdDefault": _threshold_default, |
| "acrcloud": acrcloud_engine.to_response_dict(acrcloud_engine.disabled_response()), |
| "queryFingerprint": query_fingerprint, |
| "contextToken": None, |
| } |
|
|
| neighbors = similarity.top_k_neighbors( |
| pipeline["emb"].astype(np.float32), |
| pipeline["segment_embeddings"].astype(np.float32), |
| _flat_catalog, |
| k=k, |
| ) |
|
|
| |
| |
| distribution = _catalog_cosine_distribution if _catalog_cosine_distribution is not None else np.empty((0,), dtype=np.float32) |
| for nb in neighbors: |
| nb["track"] = _corpus_by_id.get(nb["trackId"], {}) |
| raw = float(nb["meanPooledSimilarity"]) |
| seg = float(nb["maxSegmentSimilarity"]) |
| pct = similarity.cosine_to_percentile(raw, distribution) |
| nb["rawCosine"] = raw |
| nb["percentileRank"] = float(pct) |
| nb["similarityLabel"] = similarity.similarity_label(pct) |
| nb["segmentSupport"] = seg |
| |
| nb["calibratedScore"] = float(pct) |
| |
| |
| |
| |
| q_win = int(nb.pop("matchQueryWindow", 0)) |
| c_win = int(nb.pop("matchCatalogWindow", 0)) |
| win_s = float(config.CLAP_WINDOW_SECONDS) |
| nb["matchTimestamp"] = { |
| "queryStartSec": q_win * win_s, |
| "queryEndSec": (q_win + 1) * win_s, |
| "catalogStartSec": c_win * win_s, |
| "catalogEndSec": (c_win + 1) * win_s, |
| "windowSeconds": win_s, |
| } |
|
|
| |
| |
| |
| |
| nb["criteria"] = _build_criteria_block(pipeline.get("mir"), nb["track"].get("mir_features")) |
|
|
| specificity = float(similarity.query_specificity(pipeline["emb"].astype(np.float32), _flat_catalog)) |
| acr = acrcloud_engine.call_for_query(pipeline["acrcloud_audio"]) |
| acr_response = acrcloud_engine.to_response_dict(acr) |
|
|
| |
| |
| |
| |
| ctx_token = None |
| if context_token.is_configured(): |
| neighbor_fragments: dict[str, dict] = {} |
| for nb in neighbors: |
| track = nb.get("track") or {} |
| ts = nb.get("matchTimestamp") or {} |
| neighbor_fragments[str(nb["trackId"])] = context_token.neighbor_context_fragment( |
| track_id=str(nb["trackId"]), |
| title=str(track.get("title") or nb["trackId"]), |
| artist=track.get("artist"), |
| query_window=( |
| float(ts.get("queryStartSec", 0.0)), |
| float(ts.get("queryEndSec", 0.0)), |
| ), |
| match_window=( |
| float(ts.get("catalogStartSec", 0.0)), |
| float(ts.get("catalogEndSec", 0.0)), |
| ), |
| raw_cosine=float(nb.get("rawCosine", 0.0)), |
| criteria=_criteria_to_token_fragment(nb.get("criteria")), |
| ) |
| ctx_token = context_token.issue( |
| query_fingerprint=query_fingerprint, |
| model_sha=_model_sha or "unpinned", |
| catalog_sha=_catalog_sha or "no-catalog", |
| neighbors=neighbor_fragments, |
| acrcloud_cover_song_id=acr_response.get("coverSongId"), |
| ) |
|
|
| return { |
| "query": query_track, |
| "neighbors": neighbors, |
| "topMeanPooledSimilarity": float(neighbors[0]["meanPooledSimilarity"]) if neighbors else 0.0, |
| "topMaxSegmentSimilarity": float(neighbors[0]["maxSegmentSimilarity"]) if neighbors else 0.0, |
| "topPercentileRank": float(neighbors[0]["percentileRank"]) if neighbors else 0.0, |
| "topSimilarityLabel": neighbors[0]["similarityLabel"] if neighbors else "weak", |
| "querySpecificity": specificity, |
| "modelSha": _model_sha, |
| "thresholdDefault": _threshold_default, |
| "acrcloud": acr_response, |
| "queryFingerprint": query_fingerprint, |
| "contextToken": ctx_token, |
| } |
|
|
|
|
| def _criteria_to_token_fragment(criteria_block: dict | None) -> list[dict] | None: |
| """Reshape /neighbors' criteria block into the list-of-CriterionContext |
| form Codex's rag_narrative module expects. |
| |
| The /neighbors response groups criteria by id under a top-level dict; |
| NarrativeContext takes a flat list of {id, queryValue, matchValue, |
| agreement, label}. Convert here so the token payload matches the |
| NarrativeContext shape directly. |
| """ |
| if not criteria_block: |
| return None |
| out: list[dict] = [] |
| for cid in ("tempo", "key", "harmonic", "timbre"): |
| entry = criteria_block.get(cid) |
| if not entry: |
| continue |
| |
| |
| |
| |
| q_val = entry.get("queryValue") |
| m_val = entry.get("matchValue") |
| if cid in ("harmonic", "timbre") and q_val is None and m_val is None: |
| q_val = {"vector": "elided"} |
| m_val = {"vector": "elided"} |
| out.append({ |
| "id": cid, |
| "queryValue": q_val, |
| "matchValue": m_val, |
| "agreement": float(entry.get("agreement", 0.0)), |
| "label": str(entry.get("label", "")), |
| }) |
| return out or None |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| class NarrativeRequest(BaseModel): |
| contextToken: str = Field(..., min_length=1) |
| trackId: str = Field(..., min_length=1) |
| mode: str = Field(..., min_length=1) |
|
|
|
|
| _TOKEN_ERROR_TO_HTTP = { |
| "malformed": (400, "malformed-token"), |
| "invalid-signature": (401, "invalid-token"), |
| "token-expired": (412, "token-expired"), |
| "stale-model": (412, "stale-token"), |
| "stale-catalog": (412, "stale-token"), |
| "hmac-key-missing": (503, "narrative-disabled"), |
| } |
|
|
|
|
| @app.post("/narrative") |
| async def narrative_endpoint(req: NarrativeRequest): |
| """RAG explanatory layer — see ADR-0005 for the full spec.""" |
| with narrative_telemetry.measure_call(req.mode) as tel: |
| |
| if not os.getenv("OPENAI_API_KEY", "").strip(): |
| tel.set(error_code="narrative-disabled") |
| return _err(503, "narrative-disabled") |
| |
| if not context_token.is_configured(): |
| tel.set(error_code="narrative-disabled") |
| return _err(503, "narrative-disabled") |
| |
| if req.mode not in ("whySimilar", "creatorAdvice"): |
| tel.set(error_code="unsupported-mode") |
| return _err(422, "unsupported-mode") |
|
|
| |
| try: |
| verified = context_token.verify( |
| req.contextToken, |
| expected_model_sha=_model_sha or "unpinned", |
| expected_catalog_sha=_catalog_sha or "no-catalog", |
| ) |
| except context_token.TokenError as exc: |
| status, code = _TOKEN_ERROR_TO_HTTP.get(exc.code, (400, "malformed-token")) |
| tel.set(error_code=code) |
| return _err(status, code) |
|
|
| |
| fragment = verified.neighbors.get(req.trackId) |
| if not fragment: |
| tel.set(error_code="not-in-context", trackId=req.trackId) |
| return _err(404, "not-in-context") |
|
|
| |
| |
| |
| |
| try: |
| from . import rag_narrative |
| except ImportError: |
| tel.set(error_code="narrative-disabled") |
| return _err(503, "narrative-disabled") |
|
|
| |
| |
| try: |
| context = rag_narrative.NarrativeContext( |
| queryFingerprint=verified.queryFingerprint, |
| trackId=fragment["trackId"], |
| title=fragment.get("title", ""), |
| artist=fragment.get("artist"), |
| queryWindow=tuple(fragment["queryWindow"]), |
| matchWindow=tuple(fragment["matchWindow"]), |
| rawCosine=float(fragment["rawCosine"]), |
| criteria=[ |
| rag_narrative.CriterionContext(**c) |
| for c in (fragment.get("criteria") or []) |
| ], |
| acrcloudCoverSongId=verified.acrcloudCoverSongId, |
| ) |
| except Exception: |
| |
| |
| tel.set(error_code="malformed-context", trackId=req.trackId) |
| return _err(422, "malformed-context") |
|
|
| model_id = os.getenv("OPENAI_MODEL_ID", "gpt-4o-mini") |
| try: |
| result = rag_narrative.generate_narrative( |
| context, |
| req.mode, |
| model_sha=_model_sha or "unpinned", |
| catalog_sha=_catalog_sha or "no-catalog", |
| model_id=model_id, |
| ) |
| except Exception as exc: |
| print(f"[api] /narrative generate_narrative raised: {exc!r}") |
| tel.set(error_code="narrative-error", trackId=req.trackId) |
| return _err(500, "narrative-error") |
|
|
| |
| |
| |
| |
| |
| result_kind = getattr(result, "kind", None) |
| completion_chars = 0 |
| if result_kind == "narrative": |
| completion_chars = len(getattr(result, "prose", "") or "") |
| |
| |
| |
| prompt_chars_estimate = len(fragment.get("title", "")) + 600 |
| tel.set( |
| result_kind=result_kind, |
| openai_called=(result_kind == "narrative" or result_kind == "unavailable"), |
| gate_short_circuit=(result_kind == "low_confidence"), |
| prompt_chars=prompt_chars_estimate, |
| completion_chars=completion_chars, |
| trackId=req.trackId, |
| ) |
|
|
| |
| |
| |
| if hasattr(result, "model_dump"): |
| return result.model_dump() |
| return result |
|
|
|
|
| @app.get("/narrative/stats") |
| def narrative_stats_endpoint() -> dict: |
| """Return the in-process counters snapshot for the /narrative layer. |
| |
| Senior-reviewer-friendly visibility into what's actually happening in |
| production — call counts, latency percentiles, mode distribution, |
| error distribution, rough cost estimate. Counters reset on restart; |
| this is not a long-term metrics store, it's a "right now" snapshot. |
| |
| Cost estimate is char-based × GPT-4o-mini pricing — directional, not |
| accounting-grade. The honest framing from ADR-0005 holds. |
| """ |
| return narrative_telemetry.snapshot() |
|
|
|
|
| def run() -> None: |
| """Convenience launcher: `python -m backend.api` or `uvicorn backend.api:app`.""" |
| import uvicorn |
|
|
| uvicorn.run( |
| "backend.api:app", |
| host="0.0.0.0", |
| port=int(os.getenv("PORT", "8000")), |
| reload=False, |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| run() |
|
|