"""
Jira RAG Test Case Generator — HuggingFace Spaces Edition
Inference backend : StableHorde (free, crowdsourced GPU)
Embedding backend : fastembed — BAAI/bge-base-en-v1.5 (local ONNX, no API needed)
Retrieval backend : Pinecone vector search
Endpoints:
POST /generate → JSON test cases
POST /stream → SSE streaming (text/event-stream)
POST /gentest → Slack slash command handler
GET /health → liveness + model info
"""
import asyncio
import hashlib
import hmac
import json
import os
import re
import sys
import threading
import time
import httpx
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
os.environ.setdefault("PYTHONUNBUFFERED", "1")
print(f"[boot] Python {sys.version} starting up", flush=True)
load_dotenv()
# ── Config ─────────────────────────────────────────────────────────────────────
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY", "")
INDEX_NAME = "jira-qwen3-06b"
TOP_K = 5
MAX_CONTEXT_CHARS = 1400
MIN_SCORE = 0.35
# ── Embedding model ────────────────────────────────────────────────────────────
EMBED_MODEL = "Qwen/Qwen3-Embedding-0.6B"
# ── StableHorde inference ──────────────────────────────────────────────────────
STABLEHORDE_API = "https://stablehorde.net/api/v2"
STABLEHORDE_API_KEY = os.environ.get("STABLEHORDE_API_KEY", "")
STABLEHORDE_MODEL = os.environ.get("STABLEHORDE_MODEL", "TheDrummer/Cydonia-24B-v4.3")
STABLEHORDE_MAX_LEN = int(os.environ.get("STABLEHORDE_MAX_LEN", "2048"))
# ── Groq inference (primary — free tier, fast) ─────────────────────────────────
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
GROQ_MODEL = os.environ.get("GROQ_MODEL", "qwen/qwen3-32b")
GROQ_API = "https://api.groq.com/openai/v1"
# ── Privacy helpers ────────────────────────────────────────────────────────────
_TICKET_RE = re.compile(r"\b[A-Z][A-Z0-9]{1,5}-\d+\b")
_JIRA_MENTION_RE = re.compile(r"\[~[^\]]+\]")
_SLACK_MENTION_RE = re.compile(r"<@[A-Z0-9]+>")
def sanitize(text: str) -> str:
t = _TICKET_RE.sub("", text)
t = _JIRA_MENTION_RE.sub("", t)
t = _SLACK_MENTION_RE.sub("", t)
return re.sub(r" +", " ", t)
def _anonymize(text: str) -> str:
text = re.sub(r"\b(?:Assignee|Reporter):\s*[^|\n]+",
lambda m: m.group(0).split(":")[0] + ": [person]", text)
return re.sub(r"Comment by [^:\n]+:", "Comment by [person]:", text)
# ── Team → Pinecone filter mapping ────────────────────────────────────────────
TEAMS = ["casting", "talent", "agency", "be", "mobile"]
_TEAM_PROJECTS: dict[str, list[str]] = {
"casting": ["CAV1"],
"agency": ["AAV1", "AAV2"],
"be": ["BE"],
"talent": ["TWV2", "PA"],
"mobile": ["TM2"],
}
_TEAM_COMPONENTS: dict[str, list[str]] = {
"casting": ["Casting Web", "Studio App"],
"agency": ["Agent Web"],
"be": ["General Microservice", "Login Microservice"],
"talent": ["Talent Web"],
"mobile": ["Talent App - iOS", "Talent App - Android Kotlin"],
}
def _team_filter(team: str) -> dict | None:
t = team.lower()
if t not in TEAMS:
return None
conds: list[dict] = []
non_ca = _TEAM_PROJECTS.get(t, [])
if non_ca:
conds.append({"project": {"$in": non_ca}})
comps = _TEAM_COMPONENTS.get(t, [])
if comps:
conds.append({"$and": [{"project": {"$eq": "CA"}}, {"components": {"$in": comps}}]})
if not conds:
return None
return {"$or": conds} if len(conds) > 1 else conds[0]
# ── Prompts ────────────────────────────────────────────────────────────────────
_PRIVACY_RULE = (
"NEVER include ticket IDs or issue keys (e.g. CAV-123, CA-456, BE-789, "
"AAV1-101, TWV2-202, or any other KEY-NUMBER pattern) in your response. "
"NEVER include any person's name."
)
_BASE_RULES = f"""\
RULES:
1. Use the context as your grounding — extract exact field names, error messages, \
platform versions, and behaviors described there.
2. Synthesize and extend coverage to edge cases, negative paths, and boundary \
conditions the context implies.
3. Do NOT repeat existing test cases verbatim — always add QA value.
4. Generate at least 5 test cases; do NOT stop early.
5. {_PRIVACY_RULE}
6. Only generate test cases for the team indicated in the feature request."""
SYSTEM_PROMPT_STREAM = f"""\
You are a senior QA engineer. Use the numbered context snippets as your \
primary knowledge base.
{_BASE_RULES}
For EACH test case use exactly this structure:
*Test Case N:
*
*Steps:*
1.
2.
*Expected Result:*
•
---
Cover happy path, edge cases, negative cases, and boundary conditions."""
SYSTEM_PROMPT_JSON = f"""\
You are a senior QA engineer. Use the numbered context snippets as your \
primary knowledge base.
{_BASE_RULES}
Output ONLY lines in this exact format — no JSON, no markdown, nothing else:
TC: | S: ;; | E: ;
Rules:
- Each test case is ONE line starting with "TC:"
- Steps separated by semicolons after "S:"
- Expected results separated by semicolons after "E:"
- Keep each step and result under 10 words
- Generate 5+ test cases covering happy path, edge cases, negatives"""
def _build_prompt(feature: str, matches: list, team: str | None = None) -> str:
seen: set[str] = set()
parts: list[str] = []
filtered = 0
for m in matches:
if m.id in seen:
continue
seen.add(m.id)
if getattr(m, "score", 1.0) < MIN_SCORE:
filtered += 1
continue
text = sanitize(_anonymize(m.metadata.get("text", "")))
meta = m.metadata or {}
hdr = []
if meta.get("issue_type"):
hdr.append(meta["issue_type"])
if meta.get("priority"):
hdr.append(f"Priority:{meta['priority']}")
if meta.get("status"):
hdr.append(f"Status:{meta['status']}")
if meta.get("components"):
comps = meta["components"] if isinstance(meta["components"], list) else [meta["components"]]
hdr.append(f"Component:{'/'.join(comps)}")
if meta.get("labels"):
labels = meta["labels"] if isinstance(meta["labels"], list) else [meta["labels"]]
hdr.append(f"Labels:{','.join(labels)}")
header = f"[{' | '.join(hdr)}]\n" if hdr else ""
snippet = text[:MAX_CONTEXT_CHARS].strip()
if len(text) > MAX_CONTEXT_CHARS:
snippet += "..."
if snippet:
parts.append(f"[{len(parts) + 1}] {header}{snippet}")
if parts:
note = f"{len(parts)} snippet(s)"
if filtered:
note += f"; {filtered} low-relevance excluded"
context = "\n\n".join(parts)
else:
note = "no relevant context found"
context = "⚠️ No relevant context retrieved. Generate generic test cases and note the lack of grounding."
team_note = f" | Team: {team.upper()}" if team else ""
return (
f"## Context ({note}{team_note}):\n{context}\n\n"
f"---\n## Feature to test:\n{feature}\n\n"
f"---\nGenerate test cases now:"
)
# ── StableHorde sampling params ────────────────────────────────────────────────
# stop_sequence requires trusted user status on StableHorde — omit it
_HORDE_PARAMS = dict(
temperature=0.6,
top_k=20,
top_p=0.95,
rep_pen=1.05,
max_length=STABLEHORDE_MAX_LEN,
max_context_length=4096,
)
# ── App state ──────────────────────────────────────────────────────────────────
_index = None
_embedder = None
_models_ready = threading.Event() # set when models are loaded
_models_lock = threading.Lock()
_models_error: str | None = None
def _ensure_models() -> None:
"""Lazy-load sentence-transformers + Pinecone on first call. Thread-safe."""
global _index, _embedder, _models_error
if _models_ready.is_set():
return
with _models_lock:
if _models_ready.is_set():
return
try:
from sentence_transformers import SentenceTransformer
print(f"[lazy-load] loading embedding model {EMBED_MODEL} …", flush=True)
_embedder = SentenceTransformer(EMBED_MODEL, device="cpu")
_embedder.max_seq_length = 256 # cap context to save RAM on cpu-basic
_embedder.encode(["warmup"], normalize_embeddings=True)
print("[lazy-load] embedding model ready", flush=True)
from pinecone import Pinecone # deferred — avoids gRPC init on startup
print("[lazy-load] connecting to Pinecone …", flush=True)
_index = Pinecone(api_key=PINECONE_API_KEY).Index(INDEX_NAME)
print(f"[lazy-load] ready ({STABLEHORDE_MODEL})", flush=True)
except Exception as exc:
_models_error = str(exc)
print(f"[lazy-load] FATAL: {exc}", flush=True)
finally:
_models_ready.set()
app = FastAPI()
CORS_HEADERS = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
}
@app.options("/{path:path}")
async def preflight(_: str):
return JSONResponse(None, status_code=204, headers=CORS_HEADERS)
# ── Core helpers ───────────────────────────────────────────────────────────────
async def _embed(text: str) -> list[float]:
"""Embed text — lazy-loads sentence-transformers model on first call."""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _ensure_models)
if _embedder is None:
raise RuntimeError(f"Embedding model failed to load: {_models_error}")
QUERY_INSTR = "Instruct: Retrieve relevant Jira tickets for QA test generation.\nQuery: "
prompted = QUERY_INSTR + text if "Qwen3-Embedding" in EMBED_MODEL else text
result = await loop.run_in_executor(
None, lambda: _embedder.encode([prompted], normalize_embeddings=True)[0]
)
return result.tolist()
async def _retrieve(feature: str, team: str | None = None) -> list:
vec = await _embed(feature)
f = _team_filter(team) if team else None
k = TOP_K * 2 if f else TOP_K
return _index.query(vector=vec, top_k=k, include_metadata=True, filter=f).matches
async def _groq_generate(system: str, user: str) -> str:
"""Call Groq API and return generated text (strips thinking blocks)."""
is_thinking = "qwen3" in GROQ_MODEL
# Groq's qwen3 uses /think or /no_think suffix to control thinking mode
user_msg = user + "\n/think" if is_thinking else user
async with httpx.AsyncClient(timeout=120.0) as client:
r = await client.post(
f"{GROQ_API}/chat/completions",
json={
"model": GROQ_MODEL,
"messages": [
{"role": "system", "content": system},
{"role": "user", "content": user_msg},
],
"max_tokens": 8192 if is_thinking else 2048,
"temperature": 0.6,
"top_p": 0.95,
},
headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
)
r.raise_for_status()
text = r.json()["choices"][0]["message"]["content"]
return re.sub(r".*?", "", text, flags=re.DOTALL).strip()
async def _groq_stream(system: str, user: str):
"""Async generator: streams Groq response, skipping thinking blocks."""
is_thinking = "qwen3" in GROQ_MODEL
user_msg = user + "\n/think" if is_thinking else user
in_think = False
think_buf = ""
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream(
"POST", f"{GROQ_API}/chat/completions",
json={
"model": GROQ_MODEL,
"messages": [
{"role": "system", "content": system},
{"role": "user", "content": user_msg},
],
"max_tokens": 8192 if is_thinking else 2048,
"temperature": 0.6,
"top_p": 0.95,
"stream": True,
},
headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
) as resp:
resp.raise_for_status()
async for line in resp.aiter_lines():
if not line.startswith("data: "):
continue
data = line[6:]
if data == "[DONE]":
return
try:
chunk = json.loads(data)
delta = chunk["choices"][0]["delta"].get("content", "") or ""
if not delta:
continue
# Strip ... blocks from stream
if is_thinking:
think_buf += delta
while True:
if in_think:
end = think_buf.find("")
if end == -1:
think_buf = ""
break
think_buf = think_buf[end + 8:]
in_think = False
else:
start = think_buf.find("")
if start == -1:
yield think_buf
think_buf = ""
break
if start > 0:
yield think_buf[:start]
think_buf = think_buf[start + 7:]
in_think = True
else:
yield delta
except Exception:
continue
async def _horde_generate(prompt: str) -> str:
"""Submit a generation job to StableHorde and return the raw text."""
async with httpx.AsyncClient(timeout=300.0) as client:
r = await client.post(
f"{STABLEHORDE_API}/generate/text/async",
json={"prompt": prompt, "params": {**_HORDE_PARAMS}, "models": [STABLEHORDE_MODEL]},
headers={"apikey": STABLEHORDE_API_KEY},
)
r.raise_for_status()
job_id = r.json()["id"]
while True:
await asyncio.sleep(2.0)
r = await client.get(
f"{STABLEHORDE_API}/generate/text/status/{job_id}",
headers={"apikey": STABLEHORDE_API_KEY},
)
data = r.json()
if data.get("faulted"):
raise RuntimeError("StableHorde job faulted")
if data.get("done"):
gens = data.get("generations", [])
return gens[0]["text"] if gens else ""
def _parse_compact(text: str) -> list[dict]:
"""Parse compact TC: ... | S: ... | E: ... lines into dicts."""
cases = []
for line in text.splitlines():
line = line.strip()
if not line.upper().startswith("TC:"):
continue
parts = [p.strip() for p in line.split("|")]
if len(parts) < 3:
continue
title = parts[0][3:].strip()
steps_raw = next((p[2:].strip() for p in parts if p.strip().upper().startswith("S:")), "")
expect_raw = next((p[2:].strip() for p in parts if p.strip().upper().startswith("E:")), "")
steps = [s.strip() for s in steps_raw.split(";") if s.strip()]
expect = [e.strip() for e in expect_raw.split(";") if e.strip()]
if title and steps and expect:
cases.append({"title": title, "steps": steps, "expected_result": expect})
return cases
def _fmt_slack_case(i: int, tc: dict) -> str:
"""Format a single parsed test case as Slack markdown."""
lines = [f"*Test Case {i}: {tc['title']}*", "", "*Steps:*"]
for j, step in enumerate(tc["steps"], 1):
lines.append(f"{j}. {step}")
lines += ["", "*Expected Result:*"]
for exp in tc["expected_result"]:
lines.append(f"• {exp}")
lines += ["", "---", ""]
return "\n".join(lines)
def _make_prompt(system: str, user: str, assistant_prefix: str = "") -> str:
"""Format as ChatML for StableHorde workers.
assistant_prefix seeds the assistant turn to steer the model's output format.
"""
return (
f"<|im_start|>system\n{system}\n<|im_end|>\n"
f"<|im_start|>user\n{user}<|im_end|>\n"
f"<|im_start|>assistant\n{assistant_prefix}"
)
async def _generate_full(system: str, user: str, assistant_prefix: str = "") -> str:
"""Generate text — uses Groq if GROQ_API_KEY is set, else StableHorde."""
if GROQ_API_KEY:
return sanitize(await _groq_generate(system, user))
prompt = _make_prompt(system, user, assistant_prefix)
text = await _horde_generate(prompt)
return sanitize((assistant_prefix + text).strip())
async def _horde_stream(system: str, user: str, assistant_prefix: str = ""):
"""Async generator: streams response. Uses Groq if available, else StableHorde."""
if GROQ_API_KEY:
async for token in _groq_stream(system, user):
yield token
return
prompt = _make_prompt(system, user, assistant_prefix)
async with httpx.AsyncClient(timeout=300.0) as client:
r = await client.post(
f"{STABLEHORDE_API}/generate/text/async",
json={"prompt": prompt, "params": {**_HORDE_PARAMS}, "models": [STABLEHORDE_MODEL]},
headers={"apikey": STABLEHORDE_API_KEY},
)
r.raise_for_status()
job_id = r.json()["id"]
prev_pos = None
while True:
await asyncio.sleep(2.0)
r = await client.get(
f"{STABLEHORDE_API}/generate/text/status/{job_id}",
headers={"apikey": STABLEHORDE_API_KEY},
)
data = r.json()
if data.get("faulted"):
yield "❌ StableHorde generation faulted."
return
if data.get("done"):
gens = data.get("generations", [])
text = sanitize(gens[0]["text"]) if gens else ""
step = 20
for i in range(0, len(text), step):
yield text[i: i + step]
return
pos = data.get("queue_position")
wait = data.get("wait_time", "?")
if pos != prev_pos:
yield f"⏳ Queue position {pos} (~{wait}s)…\n"
prev_pos = pos
def _fmt_think(raw: str) -> str:
"""
Replace ... blocks with an italicised single-line summary.
An unclosed (model still reasoning) becomes _🤔 thinking..._
"""
result = re.sub(
r"(.*?)",
lambda m: (
"_" + " ".join(ln.strip() for ln in m.group(1).strip().splitlines() if ln.strip()) + "_\n\n"
if m.group(1).strip() else ""
),
raw,
flags=re.DOTALL,
)
open_idx = result.rfind("")
if open_idx != -1:
result = result[:open_idx] + "_🤔 thinking..._"
return result
# ── Request / response models ──────────────────────────────────────────────────
class GenerateRequest(BaseModel):
feature: str
team: str | None = None # casting | talent | agency | be | mobile
class TestCase(BaseModel):
title: str
steps: list[str]
expected_result: list[str]
@classmethod
def validate_list(cls, raw: list) -> tuple[list["TestCase"], list[str]]:
valid, errors = [], []
required = ("title", "steps", "expected_result")
for i, item in enumerate(raw):
if not isinstance(item, dict):
errors.append(f"item {i}: not an object")
continue
missing = [k for k in required if k not in item]
if missing:
errors.append(f"item {i}: missing {missing}")
continue
try:
valid.append(cls(**item))
except Exception as e:
errors.append(f"item {i}: {e}")
return valid, errors
# ── Routes ─────────────────────────────────────────────────────────────────────
@app.get("/")
async def root():
"""Root endpoint — HF Spaces probes this to detect a live container."""
ready = _models_ready.is_set() and _embedder is not None
return JSONResponse(
{"status": "ok" if ready else "starting", "service": "ai-qa-api"},
headers=CORS_HEADERS,
)
@app.get("/health")
async def health():
ready = _models_ready.is_set() and _embedder is not None
llm_backend = "groq" if GROQ_API_KEY else "stablehorde"
llm_model = GROQ_MODEL if GROQ_API_KEY else STABLEHORDE_MODEL
return JSONResponse(
{
"status": "ok" if ready else ("error" if _models_error else "starting"),
"embed_model": EMBED_MODEL,
"llm": llm_backend,
"model": llm_model,
**({"error": _models_error} if _models_error else {}),
},
headers=CORS_HEADERS,
)
@app.post("/generate")
async def generate_http(req: GenerateRequest):
feature = req.feature.strip()
if not feature:
return JSONResponse({"error": "feature is required"}, status_code=400, headers=CORS_HEADERS)
team = req.team.lower().strip() if req.team else None
if team and team not in TEAMS:
return JSONResponse(
{"error": f"unknown team '{team}'. Valid: {', '.join(TEAMS)}"},
status_code=400, headers=CORS_HEADERS,
)
try:
matches = await _retrieve(feature, team)
except Exception as e:
return JSONResponse({"error": f"retrieval failed: {e}"}, status_code=502, headers=CORS_HEADERS)
user_content = _build_prompt(feature, matches, team)
try:
raw = await _generate_full(SYSTEM_PROMPT_JSON, user_content, assistant_prefix="TC: ")
except Exception as e:
return JSONResponse({"error": f"generation failed: {e}"}, status_code=502, headers=CORS_HEADERS)
parsed = _parse_compact(raw)
valid_cases, schema_errors = TestCase.validate_list(parsed)
resp: dict = {
"feature": feature,
"team": team,
"test_cases": [tc.model_dump() for tc in valid_cases],
}
if not valid_cases:
resp["raw"] = raw
if schema_errors:
resp["warnings"] = schema_errors
return JSONResponse(resp, headers=CORS_HEADERS)
@app.post("/stream")
async def stream_sse(req: GenerateRequest):
feature = req.feature.strip()
if not feature:
return JSONResponse({"error": "feature is required"}, status_code=400, headers=CORS_HEADERS)
team = req.team.lower().strip() if req.team else None
if team and team not in TEAMS:
return JSONResponse(
{"error": f"unknown team '{team}'. Valid: {', '.join(TEAMS)}"},
status_code=400, headers=CORS_HEADERS,
)
try:
matches = await _retrieve(feature, team)
except Exception as e:
return JSONResponse({"error": f"retrieval failed: {e}"}, status_code=502, headers=CORS_HEADERS)
user_content = _build_prompt(feature, matches, team)
async def event_generator():
try:
async for token in _horde_stream(SYSTEM_PROMPT_STREAM, user_content):
yield f"data: {json.dumps({'type': 'output', 'token': token})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
yield f"data: {json.dumps({'type': 'done'})}\n\n"
headers = {**CORS_HEADERS, "Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)
# ── Slack /gentest slash command ───────────────────────────────────────────────
SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", "")
SLACK_SIGNING_SECRET = os.environ.get("SLACK_SIGNING_SECRET", "")
SLACK_API = "https://slack.com/api"
_MSG_LIMIT = 2_800
_SLACK_INTERVAL = 1.1 # seconds between chat.update calls (Slack rate-limit safe)
def _verify_slack(body: bytes, ts: str, sig: str) -> bool:
if not SLACK_SIGNING_SECRET:
return True
if abs(time.time() - int(ts)) > 300:
return False
base = f"v0:{ts}:{body.decode()}"
digest = hmac.new(SLACK_SIGNING_SECRET.encode(), base.encode(), hashlib.sha256).hexdigest()
return hmac.compare_digest(f"v0={digest}", sig)
def _split(text: str, limit: int = _MSG_LIMIT) -> tuple[str, str]:
if len(text) <= limit:
return text, ""
cut = text.rfind("\n", 0, limit)
cut = cut if cut > limit // 2 else limit
return text[:cut], text[cut:].lstrip()
async def _slack_post(
c: httpx.AsyncClient, ch: str, text: str, thread_ts: str | None = None
) -> str:
payload: dict = {"channel": ch, "text": text}
if thread_ts:
payload["thread_ts"] = thread_ts
r = await c.post(f"{SLACK_API}/chat.postMessage", json=payload)
d = r.json()
if not d.get("ok"):
raise RuntimeError(f"postMessage: {d.get('error')}")
return d["ts"]
async def _slack_update(c: httpx.AsyncClient, ch: str, ts: str, text: str):
await c.post(f"{SLACK_API}/chat.update", json={"channel": ch, "ts": ts, "text": text})
async def _run_gentest(feature: str, channel_id: str, team: str | None = None):
hdrs = {"Authorization": f"Bearer {SLACK_BOT_TOKEN}"}
async with httpx.AsyncClient(headers=hdrs, timeout=300.0) as c:
tag = f" _(team: {team})_" if team else ""
# ── Header — stays in main channel ────────────────────────────────────
header_ts = await _slack_post(c, channel_id, f"🧪 *Test cases:* {feature}{tag}")
# ── Live stream — threaded reply under the header ──────────────────────
cur_ts = await _slack_post(c, channel_id, "▌", thread_ts=header_ts)
last_update = time.time()
async def safe_update(ts: str, text: str) -> None:
nonlocal last_update
gap = _SLACK_INTERVAL - (time.time() - last_update)
if gap > 0:
await asyncio.sleep(gap)
await _slack_update(c, channel_id, ts, text)
last_update = time.time()
try:
matches = await _retrieve(feature, team)
user_content = _build_prompt(feature, matches, team)
# Stream and accumulate — update Slack every 2s with live preview
raw = ""
async for token in _horde_stream(SYSTEM_PROMPT_STREAM, user_content):
raw += token
if time.time() - last_update >= 2.0:
preview = raw[-_MSG_LIMIT:].lstrip() if len(raw) > _MSG_LIMIT else raw
await safe_update(cur_ts, preview.rstrip() + "\n▌")
if not raw:
await safe_update(cur_ts, "❌ No output received.")
return
# Parse into blocks (one per test case, separated by ---)
blocks = [b.strip() for b in re.split(r'\n-{3,}\n?', raw.strip()) if b.strip()]
if not blocks:
blocks = [raw.strip()]
def _is_complete(block: str) -> bool:
b = block.lower()
return ("step" in b or re.search(r'^\d+\.', block, re.MULTILINE)) \
and ("expected" in b or "•" in b)
truncated = len(blocks) > 1 and not _is_complete(blocks[-1])
if truncated:
blocks = blocks[:-1]
# Progressive reveal — reuse the single cursor message, only open new
# messages when the current one hits the Slack character limit
buf = ""
total = len(blocks)
for idx, block in enumerate(blocks):
entry = block + "\n\n---\n\n" if idx < total - 1 else block
if buf and len(buf) + len(entry) >= _MSG_LIMIT:
# Finalise current message without cursor, start fresh
await safe_update(cur_ts, buf.rstrip())
cur_ts = await _slack_post(c, channel_id, "▌", thread_ts=header_ts)
last_update = time.time()
buf = ""
buf += entry
is_last = idx == total - 1
await safe_update(cur_ts, buf.rstrip() if is_last else buf.rstrip() + "\n▌")
if truncated:
await _slack_post(
c, channel_id,
"⚠️ _Generation was cut off — last test case was incomplete and omitted. "
"Try a more specific feature description._",
thread_ts=header_ts)
except Exception as exc:
try:
await _slack_update(c, channel_id, cur_ts, f"❌ Error: {str(exc)[:300]}")
except Exception:
pass
@app.post("/gentest")
async def gentest_slack(request: Request, background_tasks: BackgroundTasks):
body = await request.body()
ts = request.headers.get("X-Slack-Request-Timestamp", "0")
sig = request.headers.get("X-Slack-Signature", "")
if not _verify_slack(body, ts, sig):
return Response(status_code=403)
from urllib.parse import parse_qs
params = parse_qs(body.decode(), keep_blank_values=True)
raw_text = (params.get("text", [""])[0] or "").strip()
channel_id = (params.get("channel_id", [""])[0] or "").strip()
if not raw_text:
return JSONResponse({"response_type": "ephemeral",
"text": f"Usage: `/gentest [team] `\nTeams: {', '.join(TEAMS)}"})
if not SLACK_BOT_TOKEN:
return JSONResponse({"response_type": "ephemeral",
"text": "❌ `SLACK_BOT_TOKEN` not configured."})
words = raw_text.split()
if words[0].lower() in TEAMS:
team = words[0].lower()
feature = " ".join(words[1:]).strip()
else:
team = None
feature = raw_text
if not feature:
return JSONResponse({"response_type": "ephemeral",
"text": "Please add a feature description after the team name."})
background_tasks.add_task(_run_gentest, feature, channel_id, team)
return Response(status_code=200)