Spaces:
Running on Zero
Running on Zero
| """Deterministic codebook analysis for coding-agent narrative traces.""" | |
| from __future__ import annotations | |
| import re | |
| from collections import Counter | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Iterable | |
| from model_runtime import MODEL_CHOICES, run_model_assist | |
| from parser import parse_trace | |
| from redaction import redact_text | |
| from schemas import AnalysisResult, DifficultyEpisode, MessageSpan, NarrativeMessage | |
# Plain-language statement of what the analyzer reads. It is copied verbatim
# into every AnalysisResult as ``analysis_scope``.
ANALYSIS_SCOPE = (
    "assistant narrative messages only, "
    "with user prompts included only as optional context; "
    "raw tool-call contents are ignored by default"
)
# Lower-case phrases that hint at a reported difficulty. ``signal_score``
# checks them against lower-cased message text; ``identify_episodes`` uses the
# resulting count to decide whether a message opens an episode.
DIFFICULTY_SIGNALS = {
    # failures and defects
    "error", "failed", "failure", "fails", "problem", "issue", "bug",
    "regression", "test failing",
    # blockers
    "blocked", "blocker", "cannot", "can't", "could not",
    # uncertainty and risk
    "unclear", "ambiguous", "not sure", "risk",
    # contrastive connectives
    "however", "but", "unfortunately",
    # gaps
    "missing", "incomplete",
    # environment
    "permission", "auth", "timeout", "dependency",
    # disagreement between expectation and observation
    "conflict", "mismatch", "unexpected",
    # verification
    "verify", "verification",
}
# Lower-case phrases that mark a stated plan or intention. Several entries are
# longer forms of others ("i will inspect" vs "i will"); callers in this file
# only test whether the score is non-zero, so the overlap is harmless there.
INTENTION_SIGNALS = {
    # first-person commitments
    "i will", "i'll", "i am going to", "i'm going to", "i'm going",
    "i will inspect", "i'll inspect", "i'm checking", "let me",
    # planning vocabulary
    "next", "plan", "goal",
    # stated needs
    "need to", "i need", "i should",
}
# Lower-case phrases that suggest the agent changed strategy. Any hit adds one
# point to a message's episode score in ``identify_episodes``.
SHIFT_SIGNALS = {
    # switching route
    "instead", "alternative", "switch", "change approach", "try another",
    "different route", "workaround", "safer",
    # shrinking or splitting the task
    "narrow", "smaller", "decompose", "split", "break down",
    # undoing work
    "rollback", "revert",
    # changing how the result is checked
    "verify with",
}
# Lower-case phrases that read as an outcome statement (positive, negative or
# hedged). ``next_episode_end`` stops extending an episode at the first later
# assistant message that contains one.
OUTCOME_SIGNALS = {
    # success claims
    "done", "fixed", "resolved", "complete", "implemented", "verified",
    "passes", "works",
    # failure statements
    "unable", "could not", "still failing",
    # hedged or partial outcomes
    "needs verification", "not verified", "caveat", "partial", "partially",
}
def analyze_trace_file(
    path: str | Path,
    *,
    include_user_context: bool = True,
    redact_secrets: bool = True,
    ignore_tool_calls: bool = True,
    report_style: str = "field_notes",
    analysis_engine: str = "deterministic",
    hf_token: str | None = None,
) -> tuple[AnalysisResult, str]:
    """Parse a trace file, optionally redact it, and run the codebook analysis.

    Args:
        path: Location of the uploaded trace; forwarded to ``parse_trace``.
        include_user_context: Forwarded to ``parse_trace``.
        redact_secrets: When true, every message text is passed through
            ``redact_text`` before analysis and export.
        ignore_tool_calls: Forwarded to ``parse_trace``; also adds a privacy note.
        report_style: Accepted for the caller's benefit; not read here.
        analysis_engine: ``"deterministic"`` skips the model assist. Any other
            value must be a key of ``MODEL_CHOICES`` to trigger it.
        hf_token: Passed to ``run_model_assist`` as ``token``.

    Returns:
        ``(analysis, narrative_text)`` where ``narrative_text`` is the rendered
        (and, if requested, redacted) narrative.
    """
    parsed_messages, agent_type = parse_trace(
        path,
        include_user_context=include_user_context,
        ignore_tool_calls=ignore_tool_calls,
    )

    privacy_notes = [
        "Uploaded traces are processed for this request only; the app exports a redacted narrative text file.",
        "The analysis uses visible messages and does not inspect hidden reasoning.",
    ]
    if ignore_tool_calls:
        privacy_notes.append("Tool-call contents were ignored before analysis.")

    redaction_count = 0
    if not redact_secrets:
        messages = parsed_messages
        privacy_notes.append("Secret redaction was disabled by the user.")
    else:
        messages = []
        label_totals: Counter[str] = Counter()
        for original in parsed_messages:
            redaction = redact_text(original.text)
            redaction_count += redaction.count
            # Notes are split on ": " into a label and a count; a missing
            # count contributes zero.
            for note in redaction.notes:
                label, _, raw_count = note.partition(": ")
                label_totals[label] += int(raw_count or 0)
            messages.append(
                NarrativeMessage(
                    index=original.index,
                    role=original.role,
                    text=redaction.text,
                    timestamp=original.timestamp,
                    source=original.source,
                )
            )
        if label_totals:
            listing = ", ".join(
                f"{label} ({count})" for label, count in sorted(label_totals.items())
            )
            privacy_notes.append("Redactions applied: " + listing + ".")
        else:
            privacy_notes.append("No likely secrets matched the built-in redaction patterns.")

    episodes = identify_episodes(messages)
    analysis = AnalysisResult(
        trace_title=derive_trace_title(path, agent_type),
        agent_type_guess=agent_type,
        analysis_scope=ANALYSIS_SCOPE,
        privacy_notes=privacy_notes,
        episodes=episodes,
        overall_patterns=summarize_patterns(episodes, messages),
        narrative_message_count=len(messages),
        redaction_count=redaction_count,
        engine="deterministic-codebook",
    )
    narrative_text = render_redacted_narrative(messages)

    if analysis_engine == "deterministic":
        return analysis, narrative_text

    if analysis_engine not in MODEL_CHOICES:
        analysis.model_notes.append(
            f"Unknown analysis engine {analysis_engine!r}; deterministic analysis was returned."
        )
        return analysis, narrative_text

    # The model assist is best-effort: any failure is recorded as a note and
    # the deterministic result is returned unchanged.
    try:
        assist = run_model_assist(
            engine=analysis_engine,
            result=analysis,
            narrative_text=narrative_text,
            token=hf_token,
        )
    except Exception as exc:
        analysis.model_notes.append(
            "Small-model assist was requested but unavailable: "
            f"{type(exc).__name__}: {exc}. Deterministic analysis was returned."
        )
    else:
        analysis.engine = f"deterministic-codebook + {assist.model_id}"
        analysis.model_memo = assist.memo
        analysis.model_notes.append(assist.note)
    return analysis, narrative_text
def derive_trace_title(path: str | Path, agent_type: str) -> str:
    """Build a display title from the agent type and the trace file's stem.

    A falsy ``path`` yields the placeholder stem ``"uploaded trace"``; an
    agent type without a known label is shown as ``"Agent"``.
    """
    if path:
        stem = Path(path).stem
    else:
        stem = "uploaded trace"

    agent_labels = {
        "codex": "Codex",
        "claude_code": "Claude Code",
        "pi": "Pi Agent",
        "unknown": "Agent",
    }
    agent_label = agent_labels.get(agent_type, "Agent")
    return f"{agent_label} trace: {stem}"
def identify_episodes(messages: list[NarrativeMessage]) -> list[DifficultyEpisode]:
    """Find difficulty episodes in the assistant messages of a trace.

    An assistant message becomes a candidate when its difficulty-signal count,
    plus one if it contains any shift signal, reaches 2. Each candidate is
    widened backwards (up to 2 positions) and forwards (up to 3 positions),
    overlapping or adjacent spans are merged, and at most the first 12 merged
    spans are turned into episodes.

    Returns:
        The episodes in span order; an empty list when there is no assistant
        message or no candidate.
    """
    if not any(message.role == "assistant" for message in messages):
        return []

    spans: list[tuple[int, int]] = []
    for position, candidate in enumerate(messages):
        if candidate.role != "assistant":
            continue
        difficulty_hits = signal_score(candidate.text, DIFFICULTY_SIGNALS)
        shift_bonus = 1 if signal_score(candidate.text, SHIFT_SIGNALS) else 0
        if difficulty_hits + shift_bonus >= 2:
            span_start = previous_assistant_index(messages, position, max_distance=2)
            span_end = next_episode_end(messages, position, max_distance=3)
            spans.append((span_start, span_end))

    # With no candidates the merge yields nothing and the loop body never runs.
    episodes: list[DifficultyEpisode] = []
    for number, (first, last) in enumerate(merge_spans(spans)[:12], start=1):
        members = [
            message
            for message in messages
            if first <= message.index <= last and message.role == "assistant"
        ]
        if members:
            episodes.append(build_episode(number, first, last, members))
    return episodes
def previous_assistant_index(
    messages: list[NarrativeMessage],
    index: int,
    *,
    max_distance: int,
) -> int:
    """Return the ``.index`` at which an episode around ``messages[index]`` starts.

    Looks back over at most ``max_distance`` list positions, nearest first,
    for an assistant message containing an intention signal. If none is found
    the message at ``index`` is its own start.
    """
    earliest = max(0, index - max_distance)
    for position in range(index - 1, earliest - 1, -1):
        earlier = messages[position]
        if earlier.role == "assistant" and signal_score(earlier.text, INTENTION_SIGNALS):
            return earlier.index
    return messages[index].index
def next_episode_end(
    messages: list[NarrativeMessage],
    index: int,
    *,
    max_distance: int,
) -> int:
    """Return the ``.index`` at which an episode around ``messages[index]`` ends.

    Walks forward over at most ``max_distance`` further list positions. Every
    assistant message extends the episode; the walk stops at the first *later*
    assistant message that contains an outcome signal (that message included).
    """
    last_position = min(len(messages), index + max_distance + 1)
    end = messages[index].index
    for position in range(index, last_position):
        current = messages[position]
        if current.role != "assistant":
            continue
        end = current.index
        # The triggering message itself never closes the episode.
        if position > index and signal_score(current.text, OUTCOME_SIGNALS):
            return end
    return end
def merge_spans(spans: Iterable[tuple[int, int]]) -> list[tuple[int, int]]:
    """Merge overlapping or directly adjacent inclusive ``(start, end)`` spans.

    Spans are processed in sorted order. A span is folded into the previous
    one when its start is at most one past the previous end.
    """
    merged: list[tuple[int, int]] = []
    for start, end in sorted(spans):
        if merged and start <= merged[-1][1] + 1:
            last_start, last_end = merged[-1]
            merged[-1] = (last_start, end if end > last_end else last_end)
        else:
            merged.append((start, end))
    return merged
def build_episode(
    episode_number: int,
    start: int,
    end: int,
    span_messages: list[NarrativeMessage],
) -> DifficultyEpisode:
    """Assemble a ``DifficultyEpisode`` from the assistant messages of one span.

    The message texts are joined with blank lines and the joined text feeds
    both the quote extraction and every classifier.

    Args:
        episode_number: Used for the ``E01``-style episode id.
        start: Value stored as the span's ``start_index``.
        end: Value stored as the span's ``end_index``.
        span_messages: Non-empty list of messages; the first and last supply
            the span timestamps.
    """
    narrative = "\n\n".join(message.text for message in span_messages)
    opening_sentence = first_sentence(narrative)

    # Representative sentences; difficulty and intention fall back to the
    # opening sentence, shift and outcome may stay empty.
    difficulty_quote = first_sentence_with(narrative, DIFFICULTY_SIGNALS) or opening_sentence
    intention_quote = first_sentence_with(narrative, INTENTION_SIGNALS) or opening_sentence
    shift_quote = first_sentence_with(narrative, SHIFT_SIGNALS)
    outcome_quote = first_sentence_with(narrative, OUTCOME_SIGNALS)

    # Codebook labels; recovery depends on the detour and outcome labels.
    difficulty_type = classify_difficulty(narrative)
    appraisal = classify_appraisal(narrative)
    detour_type = classify_detour(narrative)
    outcome_claim = classify_outcome(narrative)
    recovery_pattern = classify_recovery(narrative, detour_type, outcome_claim)

    first_time = span_messages[0].timestamp
    last_time = span_messages[-1].timestamp
    span = MessageSpan(
        start_index=start,
        end_index=end,
        start_time=first_time,
        end_time=last_time,
        duration_label=duration_label(first_time, last_time),
    )

    strategy_after_source = (
        shift_quote or outcome_quote or "No explicit strategy shift was visible."
    )

    return DifficultyEpisode(
        episode_id=f"E{episode_number:02d}",
        title=make_episode_title(difficulty_type, difficulty_quote),
        message_span=span,
        initial_intention=trim_sentence(intention_quote, max_words=36),
        reported_difficulty=trim_sentence(difficulty_quote, max_words=40),
        difficulty_type=difficulty_type,
        appraisal=appraisal,
        strategy_before=infer_strategy_before(narrative),
        strategy_after=trim_sentence(strategy_after_source, max_words=36),
        detour_type=detour_type,
        resolution_mode=classify_resolution(narrative),
        recovery_pattern=recovery_pattern,
        outcome_claim=outcome_claim,
        productive_detour=classify_productive_detour(
            detour_type, outcome_claim, recovery_pattern
        ),
        evidence_quotes=compact_quotes([difficulty_quote, shift_quote, outcome_quote]),
        analyst_memo=make_analyst_memo(
            difficulty_type,
            appraisal,
            detour_type,
            recovery_pattern,
            outcome_claim,
        ),
    )
def signal_score(text: str, signals: set[str]) -> int:
    """Count how many of ``signals`` occur in ``text``.

    Matching is case-insensitive on the text side (signals are expected to be
    lower-case already) and each signal counts at most once. A signal must
    begin at the start of a word but may be followed by more letters, so
    ``"error"`` still matches ``"errors"`` while ``"bug"`` no longer matches
    ``"debug"``, ``"but"`` no longer matches ``"attribute"`` and ``"complete"``
    no longer matches ``"incomplete"``.

    Args:
        text: Message or sentence to scan.
        signals: Lower-case words or phrases to look for.

    Returns:
        The number of distinct signals found.
    """
    lowered = text.lower()
    return sum(
        1
        for signal in signals
        # The lookbehind rejects hits that start in the middle of a word.
        if re.search(r"(?<![a-z0-9])" + re.escape(signal), lowered)
    )
def first_sentence(text: str) -> str:
    """Return the first sentence of ``text``, or ``""`` when there is none."""
    # Split once; the text was previously split twice per call.
    sentences = split_sentences(text)
    return sentences[0] if sentences else ""
def first_sentence_with(text: str, signals: set[str]) -> str:
    """Return the first sentence of ``text`` that contains any of ``signals``.

    Returns ``""`` when no sentence scores above zero.
    """
    matching = (
        sentence
        for sentence in split_sentences(text)
        if signal_score(sentence, signals)
    )
    return next(matching, "")
def split_sentences(text: str) -> list[str]:
    """Split ``text`` into trimmed sentences.

    Line breaks always separate sentences, so bullet items and the blank-line
    boundaries between joined messages are kept apart even without closing
    punctuation. Within a line, a sentence ends at ``.``, ``!`` or ``?``
    followed by whitespace. Runs of whitespace are collapsed, and leading or
    trailing spaces and dashes (bullet markers) are stripped.

    Returns:
        The non-empty sentences in order; ``[]`` for blank input.
    """
    sentences: list[str] = []
    # Break on newlines first. Collapsing all whitespace up front would erase
    # them and make the line-level split impossible.
    for line in re.split(r"[\r\n]+", text):
        normalized = re.sub(r"\s+", " ", line).strip()
        if not normalized:
            continue
        for part in re.split(r"(?<=[.!?])\s+", normalized):
            cleaned = part.strip(" -")
            if cleaned:
                sentences.append(cleaned)
    return sentences
def classify_difficulty(text: str) -> str:
    """Label the kind of difficulty described in ``text``.

    The codebook is checked top to bottom against the lower-cased text and the
    first code with a matching needle wins; ``"unknown"`` if none matches.
    """
    # Insertion order is the priority order.
    codebook: dict[str, tuple[str, ...]] = {
        "environment_blocker": (
            "dependency", "install", "permission", "auth", "network",
            "timeout", "sandbox", "environment", "ci", "build fail",
        ),
        "verification_difficulty": (
            "verify", "verification", "test", "reproduce", "confirmed",
            "validate", "cannot run", "not able to run",
        ),
        "compatibility_risk": (
            "regression", "break", "compatibility", "existing behavior",
            "side effect", "risk", "backward",
        ),
        "requirement_uncertainty": (
            "requirement", "spec", "unclear", "ambiguous", "user intent",
            "not specified", "scope unclear",
        ),
        "localization_difficulty": (
            "where", "locate", "which file", "module", "root cause", "grep",
            "search", "trace through",
        ),
        # NOTE(review): "dependency" already matches environment_blocker
        # above, so it can never select this code.
        "architecture_complexity": (
            "architecture", "dependency", "shared", "coupling",
            "system structure", "cross-module", "data flow",
        ),
        "implementation_difficulty": (
            "implement", "tricky", "complex", "not sure how", "hard to",
            "edge case",
        ),
        "insufficient_context": (
            "more context", "missing context", "need context",
            "cannot inspect", "not enough information",
        ),
        "conflicting_assumptions": (
            "assumption", "expected", "actually", "mismatch", "conflict",
            "turns out",
        ),
    }
    return first_matching_code(text.lower(), list(codebook.items()))
def classify_appraisal(text: str) -> str:
    """Label how the narrative in ``text`` appraises its difficulty.

    The codebook is checked top to bottom against the lower-cased text and the
    first code with a matching needle wins; ``"unknown"`` if none matches.
    """
    # Insertion order is the priority order.
    codebook: dict[str, tuple[str, ...]] = {
        "cannot_reliably_verify": (
            "cannot verify", "can't verify", "not verified", "need to verify",
            "cannot run", "unable to run",
        ),
        "needs_more_context": (
            "need more context", "need to inspect", "need more information",
            "missing context",
        ),
        "initial_hypothesis_wrong": (
            "hypothesis", "assumption", "i thought", "turns out", "actually",
        ),
        "risk_is_higher_than_expected": (
            "risk", "regression", "side effect", "break existing",
            "higher than expected",
        ),
        "scope_too_large": ("too large", "scope", "narrow", "smaller", "limit this"),
        "needs_alternative_path": (
            "alternative", "instead", "different approach", "try another",
            "workaround",
        ),
        "task_boundary_unclear": ("boundary", "unclear", "ambiguous", "not specified"),
        "local_fix_possible": ("local", "small patch", "focused", "straightforward", "fix"),
    }
    return first_matching_code(text.lower(), list(codebook.items()))
def classify_detour(text: str) -> str:
    """Label the kind of detour (strategy shift) visible in ``text``.

    Closure words are checked first. When they match, the result is
    ``"premature_closure"`` only if the text also scores at least 2 on the
    difficulty signals, and ``"direct_continuation"`` otherwise. The remaining
    codes are not consulted in that case. With no match at all the result is
    also ``"direct_continuation"``.
    """
    # Insertion order is the priority order.
    codebook: dict[str, tuple[str, ...]] = {
        "premature_closure": ("done", "complete", "fixed", "should work"),
        "rollback_or_reversal": ("rollback", "roll back", "revert", "abandon", "undo"),
        "verification_shift": (
            "verify with", "instead test", "different verification",
            "check by", "validate by",
        ),
        "hypothesis_switch": (
            "new hypothesis", "different hypothesis", "assumption was", "turns out",
        ),
        "workaround": (
            "workaround", "bypass", "skip the issue", "without fixing",
            "temporary fix",
        ),
        "scope_narrowing": ("narrow", "smaller", "limit", "focus only", "minimal"),
        "decomposition": ("decompose", "break down", "split", "step by step"),
        "alternative_path": (
            "alternative", "instead", "switch", "try another", "different approach",
        ),
    }
    code = first_matching_code(text.lower(), list(codebook.items()))

    if code == "premature_closure":
        if signal_score(text, DIFFICULTY_SIGNALS) >= 2:
            return "premature_closure"
        return "direct_continuation"
    if code == "unknown":
        return "direct_continuation"
    return code
def classify_resolution(text: str) -> str:
    """Label how the difficulty in ``text`` was (or was not) resolved.

    The codebook is checked top to bottom against the lower-cased text and the
    first code with a matching needle wins; ``"unknown"`` if none matches.
    """
    # Insertion order is the priority order.
    codebook: dict[str, tuple[str, ...]] = {
        "explicit_limitation": ("could not", "unable", "limitation", "caveat", "not verified"),
        "goal_reduction": ("partial", "partially", "narrow", "smaller scope", "only"),
        "structural_change": (
            "refactor", "architecture", "new module", "extract", "centralize",
            "schema",
        ),
        "defensive_handling": (
            "guard", "validate", "fallback", "error handling", "defensive",
            "sanitize",
        ),
        "alternative_implementation": (
            "alternative implementation", "different implementation",
            "switch to", "instead",
        ),
        "problem_reframing": ("reframe", "actually", "not a", "instead of treating"),
        "information_gathering": ("inspect", "search", "read", "looked at", "context"),
        "minimal_patch": ("small patch", "focused change", "minimal", "fix"),
    }
    return first_matching_code(text.lower(), list(codebook.items()))
def classify_outcome(text: str) -> str:
    """Label the outcome the narrative in ``text`` claims.

    Checks run in priority order: unresolved, needs verification, partial,
    resolved with caveat, resolved (or premature success claim), uncertain.
    ``"unknown"`` is returned when nothing matches.

    Success words ("done", "fixed", "resolved", "complete", ...) only count
    when they begin a word. Without that rule, negated or unrelated words such
    as "unresolved", "incomplete", "unverified", "abandoned" or "bypasses"
    would be read as success claims. "unresolved" and "unverified" are also
    recognised explicitly, next to their two-word forms.
    """
    lowered = text.lower()

    def mentions(*phrases: str) -> bool:
        # Plain substring test, used for the negative and hedging phrases.
        return any(phrase in lowered for phrase in phrases)

    def claims(*words: str) -> bool:
        # Word-start test for success vocabulary; suffixes stay allowed.
        pattern = r"(?<![a-z0-9])(?:" + "|".join(re.escape(word) for word in words) + ")"
        return re.search(pattern, lowered) is not None

    if mentions("not resolved", "unresolved", "still failing", "could not", "unable to"):
        return "not_resolved"
    if mentions(
        "need to verify",
        "needs verification",
        "not verified",
        "unverified",
        "cannot verify",
        "can't verify",
    ):
        return "needs_verification"
    if mentions("partial", "partially", "some of", "subset"):
        return "partially_resolved"
    if mentions("caveat", "assuming", "should", "likely", "not run") and claims(
        "done", "fixed", "implemented", "resolved", "complete"
    ):
        return "resolved_with_caveat"
    if claims("done", "fixed", "implemented", "resolved", "complete", "verified", "passes"):
        # A success claim amid heavy difficulty language, with no mention of
        # verification, is treated as premature.
        backed_by_check = claims("verified", "passes")
        if not backed_by_check and signal_score(text, DIFFICULTY_SIGNALS) >= 3:
            return "premature_success_claim"
        return "resolved_with_confidence"
    if mentions("uncertain", "not sure", "proceed"):
        return "uncertain_but_proceeding"
    return "unknown"
def classify_recovery(text: str, detour_type: str, outcome_claim: str) -> str:
    """Label the recovery pattern from the outcome, the detour and the text.

    Outcome-driven labels take precedence, then reflective wording, then the
    detour type, then retry wording, and finally a plain resolved outcome.
    Returns ``"unknown"`` when none applies.
    """
    lowered = text.lower()

    outcome_driven = {
        "not_resolved": "failed_recovery",
        "needs_verification": "partial_recovery",
        "premature_success_claim": "overconfident_recovery",
    }
    if outcome_claim in outcome_driven:
        return outcome_driven[outcome_claim]

    reflective_words = ("assumption", "turns out", "actually", "hypothesis")
    if any(word in lowered for word in reflective_words):
        return "reflective_recovery"

    if detour_type in {"alternative_path", "workaround", "scope_narrowing", "verification_shift"}:
        return "detour_recovery"

    retry_words = ("retry", "again", "iterate", "second", "another attempt")
    if any(word in lowered for word in retry_words):
        return "iterative_recovery"

    if outcome_claim in {"resolved_with_confidence", "resolved_with_caveat"}:
        return "smooth_recovery"
    return "unknown"
def classify_productive_detour(detour_type: str, outcome_claim: str, recovery_pattern: str) -> str:
    """Judge whether a detour paid off: ``"yes"``, ``"no"``, ``"mixed"`` or ``"unknown"``.

    Without a real detour the answer is ``"unknown"``. A failed, overconfident
    or avoidant recovery gives ``"no"``; a hedged or partial outcome gives
    ``"mixed"``; everything else gives ``"yes"``.
    """
    no_detour = {"direct_continuation", "unknown"}
    unproductive_recoveries = {"overconfident_recovery", "failed_recovery", "avoidant_recovery"}
    hedged_outcomes = {"partially_resolved", "needs_verification", "resolved_with_caveat"}

    if detour_type in no_detour:
        verdict = "unknown"
    elif recovery_pattern in unproductive_recoveries:
        verdict = "no"
    elif outcome_claim in hedged_outcomes:
        verdict = "mixed"
    else:
        verdict = "yes"
    return verdict
def first_matching_code(lowered_text: str, checks: list[tuple[str, tuple[str, ...]]]) -> str:
    """Return the first code in ``checks`` with a needle found in ``lowered_text``.

    Needles are tested as plain substrings, in the order given. Returns
    ``"unknown"`` when no code matches.
    """
    matching_codes = (
        code
        for code, needles in checks
        if any(needle in lowered_text for needle in needles)
    )
    return next(matching_codes, "unknown")
def infer_strategy_before(text: str) -> str:
    """Describe the strategy in place before the difficulty.

    Uses the first sentence containing an intention signal, trimmed to 36
    words, or a fixed fallback sentence when there is none.
    """
    stated_intention = first_sentence_with(text, INTENTION_SIGNALS)
    if not stated_intention:
        return "The agent appears to continue from the prior task context."
    return trim_sentence(stated_intention, max_words=36)
def make_episode_title(difficulty_type: str, sentence: str) -> str:
    """Build an episode title: the title-cased difficulty code plus a short topic.

    The topic is ``sentence`` trimmed to 8 words; when it is empty the title
    is the label alone.
    """
    label = difficulty_type.replace("_", " ").title()
    topic = trim_sentence(sentence, max_words=8)
    if not topic:
        return label
    return f"{label}: {topic}"
def compact_quotes(sentences: list[str | None]) -> list[str]:
    """Turn candidate sentences into at most three distinct evidence quotes.

    Empty or missing sentences are skipped, each remaining one is trimmed to
    30 words, and duplicates (after trimming) are dropped, keeping first
    occurrences in order.
    """
    # A dict keeps insertion order and gives de-duplication for free.
    unique_quotes: dict[str, None] = {}
    for sentence in sentences:
        if not sentence:
            continue
        quote = trim_sentence(sentence, max_words=30)
        if quote:
            unique_quotes.setdefault(quote, None)
    return list(unique_quotes)[:3]
def trim_sentence(text: str, *, max_words: int) -> str:
    """Collapse whitespace in ``text`` and cap it at ``max_words`` words.

    A falsy ``text`` is treated as empty. When words are cut, trailing
    ``,.;:`` is removed from the kept part and ``...`` is appended.
    """
    words = re.sub(r"\s+", " ", text or "").strip().split()
    if len(words) > max_words:
        kept = " ".join(words[:max_words])
        return kept.rstrip(",.;:") + "..."
    return " ".join(words)
def make_analyst_memo(
    difficulty_type: str,
    appraisal: str,
    detour_type: str,
    recovery_pattern: str,
    outcome_claim: str,
) -> str:
    """Compose a two-sentence memo from the five codebook labels.

    Underscores in each label are replaced by spaces before insertion.
    """

    def spaced(code: str) -> str:
        return code.replace("_", " ")

    framing = f"The visible narrative frames this as {spaced(difficulty_type)}; "
    path = (
        f"the appraisal is {spaced(appraisal)}, with {spaced(detour_type)} "
        f"and {spaced(recovery_pattern)}. "
    )
    outcome = f"The outcome claim reads as {spaced(outcome_claim)}."
    return framing + path + outcome
def summarize_patterns(
    episodes: list[DifficultyEpisode],
    messages: list[NarrativeMessage],
) -> dict[str, str]:
    """Summarise the dominant codes across all episodes of a trace.

    Args:
        episodes: Episodes to aggregate.
        messages: Not read by this function.

    Returns:
        A dict with the keys ``difficulty_style``, ``detour_style``,
        ``recovery_style`` and ``risk_or_caveat``. With no episodes, fixed
        "nothing detected" sentences are returned.
    """
    if not episodes:
        return {
            "difficulty_style": "No explicit difficulty episode was detected in the visible assistant narrative.",
            "detour_style": "No strategy shift or detour was visible enough to classify.",
            "recovery_style": "No recovery pattern can be inferred from the available narrative.",
            "risk_or_caveat": "The analyzer only inspects visible narrative messages, so absence of evidence is not proof that the session was difficulty-free.",
        }

    def top_codes(attribute: str) -> str:
        # Most frequent values of one episode attribute, as readable text.
        tally = Counter(getattr(episode, attribute) for episode in episodes)
        return readable_count_summary(tally)

    flagged_outcomes = {"needs_verification", "premature_success_claim", "not_resolved"}
    flagged_ids = [
        episode.episode_id
        for episode in episodes
        if episode.outcome_claim in flagged_outcomes
    ]
    if flagged_ids:
        caveat = f"Watch {', '.join(flagged_ids)}: these episodes end with unresolved, unverifiable, or overconfident claims."
    else:
        caveat = f"Outcome claims lean toward {top_codes('outcome_claim')}."

    return {
        "difficulty_style": f"Main difficulty pattern: {top_codes('difficulty_type')}.",
        "detour_style": f"Main detour pattern: {top_codes('detour_type')}.",
        "recovery_style": f"Main recovery pattern: {top_codes('recovery_pattern')}.",
        "risk_or_caveat": caveat,
    }
def readable_count_summary(counter: Counter[str]) -> str:
    """Render the three most common codes as ``"code name (count), ..."``.

    Underscores in codes are shown as spaces. An empty counter gives
    ``"unknown"``.
    """
    top_three = counter.most_common(3)
    if not top_three:
        return "unknown"
    rendered = []
    for code, count in top_three:
        rendered.append(f"{code.replace('_', ' ')} ({count})")
    return ", ".join(rendered)
def duration_label(start_time: str | None, end_time: str | None) -> str:
    """Format the time between two timestamps as ``"42s"``, ``"3m 5s"`` or ``"1h 4m"``.

    Returns ``"unknown"`` when either timestamp is missing or unparseable, or
    when the end lies before the start.
    """
    if not (start_time and end_time):
        return "unknown"

    began = parse_timestamp(start_time)
    finished = parse_timestamp(end_time)
    if began is None or finished is None or finished < began:
        return "unknown"

    total_seconds = int((finished - began).total_seconds())
    whole_minutes, seconds = divmod(total_seconds, 60)
    if whole_minutes == 0:
        return f"{total_seconds}s"
    hours, minutes = divmod(whole_minutes, 60)
    if hours == 0:
        return f"{whole_minutes}m {seconds}s"
    return f"{hours}h {minutes}m"
def parse_timestamp(value: str) -> datetime | None:
    """Parse an ISO-8601 timestamp into a timezone-aware ``datetime``.

    Surrounding whitespace is ignored and a trailing ``Z`` is read as UTC.
    Timestamps without an offset are assumed to be UTC. Returns ``None`` when
    the text is not valid ISO-8601.
    """
    stripped = value.strip()
    iso_text = stripped[:-1] + "+00:00" if stripped.endswith("Z") else stripped
    try:
        moment = datetime.fromisoformat(iso_text)
    except ValueError:
        return None
    return moment if moment.tzinfo is not None else moment.replace(tzinfo=timezone.utc)
def render_redacted_narrative(messages: list[NarrativeMessage]) -> str:
    """Render messages as a plain-text document, one ``##`` section per message.

    Each section header holds the zero-padded message index, the role and,
    when present, the timestamp in brackets. Sections are separated by blank
    lines and the result always ends with exactly one newline.
    """

    def section(message: NarrativeMessage) -> str:
        stamp = f" [{message.timestamp}]" if message.timestamp else ""
        header = f"## {message.index:04d} {message.role}{stamp}"
        return f"{header}\n\n{message.text}"

    document = "\n\n".join(map(section, messages))
    return document.strip() + "\n"