""" Jira RAG Test Case Generator — HuggingFace Spaces Edition Inference backend : StableHorde (free, crowdsourced GPU) Embedding backend : fastembed — BAAI/bge-base-en-v1.5 (local ONNX, no API needed) Retrieval backend : Pinecone vector search Endpoints: POST /generate → JSON test cases POST /stream → SSE streaming (text/event-stream) POST /gentest → Slack slash command handler GET /health → liveness + model info """ import asyncio import hashlib import hmac import json import os import re import sys import threading import time import httpx from dotenv import load_dotenv from fastapi import BackgroundTasks, FastAPI, Request, Response from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel os.environ.setdefault("PYTHONUNBUFFERED", "1") print(f"[boot] Python {sys.version} starting up", flush=True) load_dotenv() # ── Config ───────────────────────────────────────────────────────────────────── PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY", "") INDEX_NAME = "jira-qwen3-06b" TOP_K = 5 MAX_CONTEXT_CHARS = 1400 MIN_SCORE = 0.35 # ── Embedding model ──────────────────────────────────────────────────────────── EMBED_MODEL = "Qwen/Qwen3-Embedding-0.6B" # ── StableHorde inference ────────────────────────────────────────────────────── STABLEHORDE_API = "https://stablehorde.net/api/v2" STABLEHORDE_API_KEY = os.environ.get("STABLEHORDE_API_KEY", "") STABLEHORDE_MODEL = os.environ.get("STABLEHORDE_MODEL", "TheDrummer/Cydonia-24B-v4.3") STABLEHORDE_MAX_LEN = int(os.environ.get("STABLEHORDE_MAX_LEN", "2048")) # ── Groq inference (primary — free tier, fast) ───────────────────────────────── GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "") GROQ_MODEL = os.environ.get("GROQ_MODEL", "qwen/qwen3-32b") GROQ_API = "https://api.groq.com/openai/v1" # ── Privacy helpers ──────────────────────────────────────────────────────────── _TICKET_RE = re.compile(r"\b[A-Z][A-Z0-9]{1,5}-\d+\b") _JIRA_MENTION_RE = re.compile(r"\[~[^\]]+\]") _SLACK_MENTION_RE 
= re.compile(r"<@[A-Z0-9]+>") def sanitize(text: str) -> str: t = _TICKET_RE.sub("", text) t = _JIRA_MENTION_RE.sub("", t) t = _SLACK_MENTION_RE.sub("", t) return re.sub(r" +", " ", t) def _anonymize(text: str) -> str: text = re.sub(r"\b(?:Assignee|Reporter):\s*[^|\n]+", lambda m: m.group(0).split(":")[0] + ": [person]", text) return re.sub(r"Comment by [^:\n]+:", "Comment by [person]:", text) # ── Team → Pinecone filter mapping ──────────────────────────────────────────── TEAMS = ["casting", "talent", "agency", "be", "mobile"] _TEAM_PROJECTS: dict[str, list[str]] = { "casting": ["CAV1"], "agency": ["AAV1", "AAV2"], "be": ["BE"], "talent": ["TWV2", "PA"], "mobile": ["TM2"], } _TEAM_COMPONENTS: dict[str, list[str]] = { "casting": ["Casting Web", "Studio App"], "agency": ["Agent Web"], "be": ["General Microservice", "Login Microservice"], "talent": ["Talent Web"], "mobile": ["Talent App - iOS", "Talent App - Android Kotlin"], } def _team_filter(team: str) -> dict | None: t = team.lower() if t not in TEAMS: return None conds: list[dict] = [] non_ca = _TEAM_PROJECTS.get(t, []) if non_ca: conds.append({"project": {"$in": non_ca}}) comps = _TEAM_COMPONENTS.get(t, []) if comps: conds.append({"$and": [{"project": {"$eq": "CA"}}, {"components": {"$in": comps}}]}) if not conds: return None return {"$or": conds} if len(conds) > 1 else conds[0] # ── Prompts ──────────────────────────────────────────────────────────────────── _PRIVACY_RULE = ( "NEVER include ticket IDs or issue keys (e.g. CAV-123, CA-456, BE-789, " "AAV1-101, TWV2-202, or any other KEY-NUMBER pattern) in your response. " "NEVER include any person's name." ) _BASE_RULES = f"""\ RULES: 1. Use the context as your grounding — extract exact field names, error messages, \ platform versions, and behaviors described there. 2. Synthesize and extend coverage to edge cases, negative paths, and boundary \ conditions the context implies. 3. Do NOT repeat existing test cases verbatim — always add QA value. 4. 
Generate at least 5 test cases; do NOT stop early. 5. {_PRIVACY_RULE} 6. Only generate test cases for the team indicated in the feature request.""" SYSTEM_PROMPT_STREAM = f"""\ You are a senior QA engineer. Use the numbered context snippets as your \ primary knowledge base. {_BASE_RULES} For EACH test case use exactly this structure: *Test Case N: * *Steps:* 1. <step> 2. <step> *Expected Result:* • <expected outcome> --- Cover happy path, edge cases, negative cases, and boundary conditions.""" SYSTEM_PROMPT_JSON = f"""\ You are a senior QA engineer. Use the numbered context snippets as your \ primary knowledge base. {_BASE_RULES} Output ONLY lines in this exact format — no JSON, no markdown, nothing else: TC: <title> | S: <step1>;<step2>;<step3> | E: <result1>;<result2> Rules: - Each test case is ONE line starting with "TC:" - Steps separated by semicolons after "S:" - Expected results separated by semicolons after "E:" - Keep each step and result under 10 words - Generate 5+ test cases covering happy path, edge cases, negatives""" def _build_prompt(feature: str, matches: list, team: str | None = None) -> str: seen: set[str] = set() parts: list[str] = [] filtered = 0 for m in matches: if m.id in seen: continue seen.add(m.id) if getattr(m, "score", 1.0) < MIN_SCORE: filtered += 1 continue text = sanitize(_anonymize(m.metadata.get("text", ""))) meta = m.metadata or {} hdr = [] if meta.get("issue_type"): hdr.append(meta["issue_type"]) if meta.get("priority"): hdr.append(f"Priority:{meta['priority']}") if meta.get("status"): hdr.append(f"Status:{meta['status']}") if meta.get("components"): comps = meta["components"] if isinstance(meta["components"], list) else [meta["components"]] hdr.append(f"Component:{'/'.join(comps)}") if meta.get("labels"): labels = meta["labels"] if isinstance(meta["labels"], list) else [meta["labels"]] hdr.append(f"Labels:{','.join(labels)}") header = f"[{' | '.join(hdr)}]\n" if hdr else "" snippet = text[:MAX_CONTEXT_CHARS].strip() if len(text) 
> MAX_CONTEXT_CHARS: snippet += "..." if snippet: parts.append(f"[{len(parts) + 1}] {header}{snippet}") if parts: note = f"{len(parts)} snippet(s)" if filtered: note += f"; {filtered} low-relevance excluded" context = "\n\n".join(parts) else: note = "no relevant context found" context = "⚠️ No relevant context retrieved. Generate generic test cases and note the lack of grounding." team_note = f" | Team: {team.upper()}" if team else "" return ( f"## Context ({note}{team_note}):\n{context}\n\n" f"---\n## Feature to test:\n{feature}\n\n" f"---\nGenerate test cases now:" ) # ── StableHorde sampling params ──────────────────────────────────────────────── # stop_sequence requires trusted user status on StableHorde — omit it _HORDE_PARAMS = dict( temperature=0.6, top_k=20, top_p=0.95, rep_pen=1.05, max_length=STABLEHORDE_MAX_LEN, max_context_length=4096, ) # ── App state ────────────────────────────────────────────────────────────────── _index = None _embedder = None _models_ready = threading.Event() # set when models are loaded _models_lock = threading.Lock() _models_error: str | None = None def _ensure_models() -> None: """Lazy-load sentence-transformers + Pinecone on first call. 
Thread-safe.""" global _index, _embedder, _models_error if _models_ready.is_set(): return with _models_lock: if _models_ready.is_set(): return try: from sentence_transformers import SentenceTransformer print(f"[lazy-load] loading embedding model {EMBED_MODEL} …", flush=True) _embedder = SentenceTransformer(EMBED_MODEL, device="cpu") _embedder.max_seq_length = 256 # cap context to save RAM on cpu-basic _embedder.encode(["warmup"], normalize_embeddings=True) print("[lazy-load] embedding model ready", flush=True) from pinecone import Pinecone # deferred — avoids gRPC init on startup print("[lazy-load] connecting to Pinecone …", flush=True) _index = Pinecone(api_key=PINECONE_API_KEY).Index(INDEX_NAME) print(f"[lazy-load] ready ({STABLEHORDE_MODEL})", flush=True) except Exception as exc: _models_error = str(exc) print(f"[lazy-load] FATAL: {exc}", flush=True) finally: _models_ready.set() app = FastAPI() CORS_HEADERS = { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Headers": "Content-Type", } @app.options("/{path:path}") async def preflight(_: str): return JSONResponse(None, status_code=204, headers=CORS_HEADERS) # ── Core helpers ─────────────────────────────────────────────────────────────── async def _embed(text: str) -> list[float]: """Embed text — lazy-loads sentence-transformers model on first call.""" loop = asyncio.get_event_loop() await loop.run_in_executor(None, _ensure_models) if _embedder is None: raise RuntimeError(f"Embedding model failed to load: {_models_error}") QUERY_INSTR = "Instruct: Retrieve relevant Jira tickets for QA test generation.\nQuery: " prompted = QUERY_INSTR + text if "Qwen3-Embedding" in EMBED_MODEL else text result = await loop.run_in_executor( None, lambda: _embedder.encode([prompted], normalize_embeddings=True)[0] ) return result.tolist() async def _retrieve(feature: str, team: str | None = None) -> list: vec = await _embed(feature) f = _team_filter(team) if team else 
None k = TOP_K * 2 if f else TOP_K return _index.query(vector=vec, top_k=k, include_metadata=True, filter=f).matches async def _groq_generate(system: str, user: str) -> str: """Call Groq API and return generated text (strips thinking blocks).""" is_thinking = "qwen3" in GROQ_MODEL # Groq's qwen3 uses /think or /no_think suffix to control thinking mode user_msg = user + "\n/think" if is_thinking else user async with httpx.AsyncClient(timeout=120.0) as client: r = await client.post( f"{GROQ_API}/chat/completions", json={ "model": GROQ_MODEL, "messages": [ {"role": "system", "content": system}, {"role": "user", "content": user_msg}, ], "max_tokens": 8192 if is_thinking else 2048, "temperature": 0.6, "top_p": 0.95, }, headers={"Authorization": f"Bearer {GROQ_API_KEY}"}, ) r.raise_for_status() text = r.json()["choices"][0]["message"]["content"] return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip() async def _groq_stream(system: str, user: str): """Async generator: streams Groq response, skipping thinking blocks.""" is_thinking = "qwen3" in GROQ_MODEL user_msg = user + "\n/think" if is_thinking else user in_think = False think_buf = "" async with httpx.AsyncClient(timeout=120.0) as client: async with client.stream( "POST", f"{GROQ_API}/chat/completions", json={ "model": GROQ_MODEL, "messages": [ {"role": "system", "content": system}, {"role": "user", "content": user_msg}, ], "max_tokens": 8192 if is_thinking else 2048, "temperature": 0.6, "top_p": 0.95, "stream": True, }, headers={"Authorization": f"Bearer {GROQ_API_KEY}"}, ) as resp: resp.raise_for_status() async for line in resp.aiter_lines(): if not line.startswith("data: "): continue data = line[6:] if data == "[DONE]": return try: chunk = json.loads(data) delta = chunk["choices"][0]["delta"].get("content", "") or "" if not delta: continue # Strip <think>...</think> blocks from stream if is_thinking: think_buf += delta while True: if in_think: end = think_buf.find("</think>") if end == -1: think_buf 
= "" break think_buf = think_buf[end + 8:] in_think = False else: start = think_buf.find("<think>") if start == -1: yield think_buf think_buf = "" break if start > 0: yield think_buf[:start] think_buf = think_buf[start + 7:] in_think = True else: yield delta except Exception: continue async def _horde_generate(prompt: str) -> str: """Submit a generation job to StableHorde and return the raw text.""" async with httpx.AsyncClient(timeout=300.0) as client: r = await client.post( f"{STABLEHORDE_API}/generate/text/async", json={"prompt": prompt, "params": {**_HORDE_PARAMS}, "models": [STABLEHORDE_MODEL]}, headers={"apikey": STABLEHORDE_API_KEY}, ) r.raise_for_status() job_id = r.json()["id"] while True: await asyncio.sleep(2.0) r = await client.get( f"{STABLEHORDE_API}/generate/text/status/{job_id}", headers={"apikey": STABLEHORDE_API_KEY}, ) data = r.json() if data.get("faulted"): raise RuntimeError("StableHorde job faulted") if data.get("done"): gens = data.get("generations", []) return gens[0]["text"] if gens else "" def _parse_compact(text: str) -> list[dict]: """Parse compact TC: ... | S: ... | E: ... 
lines into dicts.""" cases = [] for line in text.splitlines(): line = line.strip() if not line.upper().startswith("TC:"): continue parts = [p.strip() for p in line.split("|")] if len(parts) < 3: continue title = parts[0][3:].strip() steps_raw = next((p[2:].strip() for p in parts if p.strip().upper().startswith("S:")), "") expect_raw = next((p[2:].strip() for p in parts if p.strip().upper().startswith("E:")), "") steps = [s.strip() for s in steps_raw.split(";") if s.strip()] expect = [e.strip() for e in expect_raw.split(";") if e.strip()] if title and steps and expect: cases.append({"title": title, "steps": steps, "expected_result": expect}) return cases def _fmt_slack_case(i: int, tc: dict) -> str: """Format a single parsed test case as Slack markdown.""" lines = [f"*Test Case {i}: {tc['title']}*", "", "*Steps:*"] for j, step in enumerate(tc["steps"], 1): lines.append(f"{j}. {step}") lines += ["", "*Expected Result:*"] for exp in tc["expected_result"]: lines.append(f"• {exp}") lines += ["", "---", ""] return "\n".join(lines) def _make_prompt(system: str, user: str, assistant_prefix: str = "") -> str: """Format as ChatML for StableHorde workers. assistant_prefix seeds the assistant turn to steer the model's output format. """ return ( f"<|im_start|>system\n{system}\n<|im_end|>\n" f"<|im_start|>user\n{user}<|im_end|>\n" f"<|im_start|>assistant\n{assistant_prefix}" ) async def _generate_full(system: str, user: str, assistant_prefix: str = "") -> str: """Generate text — uses Groq if GROQ_API_KEY is set, else StableHorde.""" if GROQ_API_KEY: return sanitize(await _groq_generate(system, user)) prompt = _make_prompt(system, user, assistant_prefix) text = await _horde_generate(prompt) return sanitize((assistant_prefix + text).strip()) async def _horde_stream(system: str, user: str, assistant_prefix: str = ""): """Async generator: streams response. 
Uses Groq if available, else StableHorde.""" if GROQ_API_KEY: async for token in _groq_stream(system, user): yield token return prompt = _make_prompt(system, user, assistant_prefix) async with httpx.AsyncClient(timeout=300.0) as client: r = await client.post( f"{STABLEHORDE_API}/generate/text/async", json={"prompt": prompt, "params": {**_HORDE_PARAMS}, "models": [STABLEHORDE_MODEL]}, headers={"apikey": STABLEHORDE_API_KEY}, ) r.raise_for_status() job_id = r.json()["id"] prev_pos = None while True: await asyncio.sleep(2.0) r = await client.get( f"{STABLEHORDE_API}/generate/text/status/{job_id}", headers={"apikey": STABLEHORDE_API_KEY}, ) data = r.json() if data.get("faulted"): yield "❌ StableHorde generation faulted." return if data.get("done"): gens = data.get("generations", []) text = sanitize(gens[0]["text"]) if gens else "" step = 20 for i in range(0, len(text), step): yield text[i: i + step] return pos = data.get("queue_position") wait = data.get("wait_time", "?") if pos != prev_pos: yield f"<think>⏳ Queue position {pos} (~{wait}s)…</think>\n" prev_pos = pos def _fmt_think(raw: str) -> str: """ Replace <think>...</think> blocks with an italicised single-line summary. 
An unclosed <think> (model still reasoning) becomes _🤔 thinking..._ """ result = re.sub( r"<think>(.*?)</think>", lambda m: ( "_" + " ".join(ln.strip() for ln in m.group(1).strip().splitlines() if ln.strip()) + "_\n\n" if m.group(1).strip() else "" ), raw, flags=re.DOTALL, ) open_idx = result.rfind("<think>") if open_idx != -1: result = result[:open_idx] + "_🤔 thinking..._" return result # ── Request / response models ────────────────────────────────────────────────── class GenerateRequest(BaseModel): feature: str team: str | None = None # casting | talent | agency | be | mobile class TestCase(BaseModel): title: str steps: list[str] expected_result: list[str] @classmethod def validate_list(cls, raw: list) -> tuple[list["TestCase"], list[str]]: valid, errors = [], [] required = ("title", "steps", "expected_result") for i, item in enumerate(raw): if not isinstance(item, dict): errors.append(f"item {i}: not an object") continue missing = [k for k in required if k not in item] if missing: errors.append(f"item {i}: missing {missing}") continue try: valid.append(cls(**item)) except Exception as e: errors.append(f"item {i}: {e}") return valid, errors # ── Routes ───────────────────────────────────────────────────────────────────── @app.get("/") async def root(): """Root endpoint — HF Spaces probes this to detect a live container.""" ready = _models_ready.is_set() and _embedder is not None return JSONResponse( {"status": "ok" if ready else "starting", "service": "ai-qa-api"}, headers=CORS_HEADERS, ) @app.get("/health") async def health(): ready = _models_ready.is_set() and _embedder is not None llm_backend = "groq" if GROQ_API_KEY else "stablehorde" llm_model = GROQ_MODEL if GROQ_API_KEY else STABLEHORDE_MODEL return JSONResponse( { "status": "ok" if ready else ("error" if _models_error else "starting"), "embed_model": EMBED_MODEL, "llm": llm_backend, "model": llm_model, **({"error": _models_error} if _models_error else {}), }, headers=CORS_HEADERS, ) 
def _parse_request(req: GenerateRequest) -> tuple[str, str | None, JSONResponse | None]:
    """Validate a generate/stream request.

    Returns ``(feature, team, error)``; *error* is a ready-to-return 400
    response when the feature is empty or the team is unknown, else ``None``.
    """
    feature = req.feature.strip()
    if not feature:
        err = JSONResponse({"error": "feature is required"}, status_code=400, headers=CORS_HEADERS)
        return feature, None, err

    team = req.team.lower().strip() if req.team else None
    if team and team not in TEAMS:
        err = JSONResponse(
            {"error": f"unknown team '{team}'. Valid: {', '.join(TEAMS)}"},
            status_code=400,
            headers=CORS_HEADERS,
        )
        return feature, team, err
    return feature, team, None


@app.post("/generate")
async def generate_http(req: GenerateRequest):
    """Generate test cases for a feature and return them as JSON.

    Responds 400 on invalid input and 502 when retrieval or generation fails.
    The raw model output is included when no test case could be parsed.
    """
    feature, team, error = _parse_request(req)
    if error is not None:
        return error

    try:
        matches = await _retrieve(feature, team)
    except Exception as e:
        return JSONResponse({"error": f"retrieval failed: {e}"}, status_code=502, headers=CORS_HEADERS)

    user_content = _build_prompt(feature, matches, team)

    try:
        raw = await _generate_full(SYSTEM_PROMPT_JSON, user_content, assistant_prefix="TC: ")
    except Exception as e:
        return JSONResponse({"error": f"generation failed: {e}"}, status_code=502, headers=CORS_HEADERS)

    parsed = _parse_compact(raw)
    valid_cases, schema_errors = TestCase.validate_list(parsed)

    resp: dict = {
        "feature": feature,
        "team": team,
        "test_cases": [tc.model_dump() for tc in valid_cases],
    }
    if not valid_cases:
        resp["raw"] = raw
    if schema_errors:
        resp["warnings"] = schema_errors
    return JSONResponse(resp, headers=CORS_HEADERS)


@app.post("/stream")
async def stream_sse(req: GenerateRequest):
    """Stream generated test cases as server-sent events.

    Events are JSON objects of type ``output`` (one per token), ``error``
    (generation failed mid-stream) and a final ``done``.
    """
    feature, team, error = _parse_request(req)
    if error is not None:
        return error

    try:
        matches = await _retrieve(feature, team)
    except Exception as e:
        return JSONResponse({"error": f"retrieval failed: {e}"}, status_code=502, headers=CORS_HEADERS)

    user_content = _build_prompt(feature, matches, team)

    async def event_generator():
        try:
            async for token in _horde_stream(SYSTEM_PROMPT_STREAM, user_content):
                yield f"data: {json.dumps({'type': 'output', 'token': token})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    headers = {**CORS_HEADERS, "Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)


# ── Slack /gentest slash command ───────────────────────────────────────────────
SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", "")
SLACK_SIGNING_SECRET = os.environ.get("SLACK_SIGNING_SECRET", "")
SLACK_API = "https://slack.com/api"
_MSG_LIMIT = 2_800
_SLACK_INTERVAL = 1.1  # seconds between chat.update calls (Slack rate-limit safe)
_SLACK_MAX_SKEW = 300  # seconds a request timestamp may differ from local time
# Status notices the StableHorde path wraps in <think> tags; they are not test cases.
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)


def _verify_slack(body: bytes, ts: str, sig: str) -> bool:
    """Check the signature of an incoming Slack request.

    Returns False for a non-numeric or stale timestamp or a signature
    mismatch. The signed base string is built from the raw body bytes, so an
    undecodable body cannot raise.

    NOTE(review): when SLACK_SIGNING_SECRET is unset every request is
    accepted. Kept for backward compatibility — confirm this is intended
    outside local development.
    """
    if not SLACK_SIGNING_SECRET:
        return True
    try:
        sent_at = int(ts)
    except (TypeError, ValueError):
        return False
    if abs(time.time() - sent_at) > _SLACK_MAX_SKEW:
        return False
    base = b"v0:" + ts.encode() + b":" + body
    digest = hmac.new(SLACK_SIGNING_SECRET.encode(), base, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(f"v0={digest}".encode(), sig.encode())


def _split(text: str, limit: int = _MSG_LIMIT) -> tuple[str, str]:
    """Split *text* into ``(head, rest)`` with *head* at most *limit* chars.

    The cut is made at the last newline before *limit* when that newline lies
    in the second half of the window; otherwise exactly at *limit*.
    """
    if len(text) <= limit:
        return text, ""
    cut = text.rfind("\n", 0, limit)
    cut = cut if cut > limit // 2 else limit
    return text[:cut], text[cut:].lstrip()


async def _slack_post(
    c: httpx.AsyncClient, ch: str, text: str, thread_ts: str | None = None
) -> str:
    """Post a message (optionally as a thread reply) and return its ``ts``.

    Raises:
        RuntimeError: if Slack answers with ``ok: false``.
    """
    payload: dict = {"channel": ch, "text": text}
    if thread_ts:
        payload["thread_ts"] = thread_ts
    r = await c.post(f"{SLACK_API}/chat.postMessage", json=payload)
    d = r.json()
    if not d.get("ok"):
        raise RuntimeError(f"postMessage: {d.get('error')}")
    return d["ts"]


async def _slack_update(c: httpx.AsyncClient, ch: str, ts: str, text: str):
    """Replace the text of an existing message; the Slack response is not checked."""
    await c.post(f"{SLACK_API}/chat.update", json={"channel": ch, "ts": ts, "text": text})


def _is_complete_case(block: str) -> bool:
    """True when *block* appears to contain both steps and an expected result."""
    lowered = block.lower()
    has_steps = "step" in lowered or re.search(r"^\d+\.", block, re.MULTILINE) is not None
    has_expected = "expected" in lowered or "•" in block
    return has_steps and has_expected


async def _run_gentest(feature: str, channel_id: str, team: str | None = None):
    """Background task behind ``/gentest``: generate test cases into Slack.

    Posts a header in the channel, streams a live preview into a threaded
    reply, then rewrites that reply with the final test cases, opening further
    thread messages when the character limit is reached. Failures are reported
    in the thread where possible and never propagate to the caller.
    """
    hdrs = {"Authorization": f"Bearer {SLACK_BOT_TOKEN}"}
    async with httpx.AsyncClient(headers=hdrs, timeout=300.0) as c:
        tag = f" _(team: {team})_" if team else ""

        try:
            # ── Header — stays in main channel ────────────────────────────────
            header_ts = await _slack_post(c, channel_id, f"🧪 *Test cases:* {feature}{tag}")
            # ── Live stream — threaded reply under the header ─────────────────
            cur_ts = await _slack_post(c, channel_id, "▌", thread_ts=header_ts)
        except Exception as exc:
            # Nothing to update yet, so the failure can only be logged.
            print(f"[gentest] could not open Slack thread: {exc}", flush=True)
            return

        last_update = time.time()

        async def safe_update(ts: str, text: str) -> None:
            """Update a message, spacing calls at least _SLACK_INTERVAL apart."""
            nonlocal last_update
            gap = _SLACK_INTERVAL - (time.time() - last_update)
            if gap > 0:
                await asyncio.sleep(gap)
            await _slack_update(c, channel_id, ts, text)
            last_update = time.time()

        try:
            matches = await _retrieve(feature, team)
            user_content = _build_prompt(feature, matches, team)

            # Stream and accumulate — update Slack every 2s with live preview
            raw = ""
            async for token in _horde_stream(SYSTEM_PROMPT_STREAM, user_content):
                raw += token
                if time.time() - last_update >= 2.0:
                    # Show <think> status notices as italic one-liners.
                    shown = _fmt_think(raw)
                    preview = shown[-_MSG_LIMIT:].lstrip() if len(shown) > _MSG_LIMIT else shown
                    await safe_update(cur_ts, preview.rstrip() + "\n▌")

            # Status notices are not part of the result.
            clean = _THINK_BLOCK_RE.sub("", raw).strip()
            if not clean:
                await safe_update(cur_ts, "❌ No output received.")
                return

            # Parse into blocks (one per test case, separated by ---)
            blocks = [b.strip() for b in re.split(r"\n-{3,}\n?", clean) if b.strip()]
            if not blocks:
                blocks = [clean]

            truncated = len(blocks) > 1 and not _is_complete_case(blocks[-1])
            if truncated:
                blocks = blocks[:-1]

            # Progressive reveal — reuse the single cursor message, only open new
            # messages when the current one hits the Slack character limit
            buf = ""
            total = len(blocks)
            for idx, block in enumerate(blocks):
                is_last = idx == total - 1
                entry = block if is_last else block + "\n\n---\n\n"
                if buf and len(buf) + len(entry) >= _MSG_LIMIT:
                    # Finalise current message without cursor, start fresh
                    await safe_update(cur_ts, buf.rstrip())
                    cur_ts = await _slack_post(c, channel_id, "▌", thread_ts=header_ts)
                    last_update = time.time()
                    buf = ""
                buf += entry
                await safe_update(cur_ts, buf.rstrip() if is_last else buf.rstrip() + "\n▌")

            if truncated:
                await _slack_post(
                    c,
                    channel_id,
                    "⚠️ _Generation was cut off — last test case was incomplete and omitted. "
                    "Try a more specific feature description._",
                    thread_ts=header_ts,
                )

        except Exception as exc:
            try:
                await _slack_update(c, channel_id, cur_ts, f"❌ Error: {str(exc)[:300]}")
            except Exception as report_exc:
                # Best effort: reporting itself failed, so log both errors.
                print(
                    f"[gentest] failed: {exc}; could not report to Slack: {report_exc}",
                    flush=True,
                )


@app.post("/gentest")
async def gentest_slack(request: Request, background_tasks: BackgroundTasks):
    """Slack slash-command handler: ``/gentest [team] <feature>``.

    Verifies the request signature, answers usage and configuration problems
    with an ephemeral message, and otherwise schedules generation as a
    background task and acknowledges with an empty 200.
    """
    body = await request.body()
    ts = request.headers.get("X-Slack-Request-Timestamp", "0")
    sig = request.headers.get("X-Slack-Signature", "")
    if not _verify_slack(body, ts, sig):
        return Response(status_code=403)

    from urllib.parse import parse_qs

    params = parse_qs(body.decode(errors="replace"), keep_blank_values=True)
    raw_text = (params.get("text", [""])[0] or "").strip()
    channel_id = (params.get("channel_id", [""])[0] or "").strip()

    if not raw_text:
        return JSONResponse({
            "response_type": "ephemeral",
            "text": f"Usage: `/gentest [team] <feature>`\nTeams: {', '.join(TEAMS)}",
        })
    if not SLACK_BOT_TOKEN:
        return JSONResponse({
            "response_type": "ephemeral",
            "text": "❌ `SLACK_BOT_TOKEN` not configured.",
        })

    # A leading word that names a known team selects that team.
    words = raw_text.split()
    if words[0].lower() in TEAMS:
        team = words[0].lower()
        feature = " ".join(words[1:]).strip()
    else:
        team = None
        feature = raw_text

    if not feature:
        return JSONResponse({
            "response_type": "ephemeral",
            "text": "Please add a feature description after the team name.",
        })

    background_tasks.add_task(_run_gentest, feature, channel_id, team)
    return Response(status_code=200)