ropedia-xperience-10m-task-baselines / scripts /omni /build_task_suite_enhancement_128.py
cy0307's picture
Publish Ropedia Xperience-10M task baseline cards
7c58b77 verified
Raw
History Blame
27.2 kB
#!/usr/bin/env python3
"""Build a public-safe 128-episode task-suite enhancement pack.
This does not train a new model and does not overwrite prior result packages.
It turns the current verified 128-episode evidence into concrete next-run
contracts: dense-window sizing, hierarchical action targets, bottleneck
priorities, and experiment cards for pushing the task suite harder without
adding more raw episodes.
"""
from __future__ import annotations
import argparse
import csv
import json
import statistics
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[2]
DEFAULT_RUN_ID = "task_suite_enhancement_128_v1_20260608"
DEFAULT_OUTPUT_ROOT = ROOT / "results/omni_finetune"
PUBLIC_JSON = ROOT / "docs/data/task_suite_enhancement_128.json"
PUBLIC_MD = ROOT / "TASK_SUITE_ENHANCEMENT_128.md"
WINDOWS_CSV = ROOT / "results/omni_finetune/multi_episode_128_task_baselines/windows.csv"
BASELINE_SUMMARY = ROOT / "results/omni_finetune/multi_episode_128_task_baselines/summary_report.json"
QWEN_V4_METRICS = (
ROOT
/ "results/omni_finetune/verified_public/"
/ "xperience10m_qwen3_omni_128ep_structured_json_v4_4epoch_full8gpu_lora_eval_test_full/eval/metrics.json"
)
QWEN_V4_PREDICTIONS = (
ROOT
/ "results/omni_finetune/verified_public/"
/ "xperience10m_qwen3_omni_128ep_structured_json_v4_4epoch_full8gpu_lora_eval_test_full/eval/predictions.jsonl"
)
COSMOS_FD_SUMMARY = (
ROOT
/ "results/omni_finetune/verified_public/"
/ "xperience10m_cosmos3_super_forward_dynamics_lora_128ep_train1epoch_256_attn_full8gpu_20260608_eval_test_full_fsdp/"
/ "verified_result_summary.json"
)
ACTION_FAMILY_PATTERNS = [
("locomotion", ["walk", "enter", "approach", "move through", "move towards", "arrive"]),
("reach_grasp_release", ["reach", "grasp", "pick up", "hold", "release"]),
("place_arrange_align", ["place", "arrange", "align", "position", "set"]),
("manipulate_adjust", ["manipulate", "adjust", "bend", "fold", "open", "close", "secure"]),
("tool_cut_mark_write", ["cut", "mark", "draw", "write", "scissor", "knife", "pen", "marker"]),
("sort_count_organize", ["sort", "count", "organize", "bundle", "gather"]),
("inspect_observe_use", ["inspect", "observe", "browse", "use", "operate", "check", "examine"]),
("clean_cook", ["clean", "wipe", "rinse", "wash", "stir", "pot", "stove"]),
]
TASK_PRIORITIES = {
"timeline_action": "highest",
"timeline_subtask": "highest",
"next_action": "highest",
"hand_trajectory_forecast": "high",
"cross_modal_retrieval": "high",
"modality_reconstruction": "high",
"misalignment_detection": "high",
"object_relevance": "medium",
"caption_grounding": "medium",
"temporal_order": "medium",
"contact_prediction": "medium",
"transition_detection": "medium",
}
def read_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def load_jsonl(path: Path) -> list[dict[str, Any]]:
rows = []
with path.open(encoding="utf-8") as handle:
for line in handle:
line = line.strip()
if line:
rows.append(json.loads(line))
return rows
def write_json(path: Path, payload: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
def load_windows(path: Path) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
with path.open(encoding="utf-8", newline="") as handle:
reader = csv.DictReader(handle)
for row in reader:
row["start_frame"] = int(row["start_frame"])
row["end_frame"] = int(row["end_frame"])
rows.append(row)
return rows
def action_family(label: str | None) -> str:
text = (label or "unknown").strip().lower()
if not text or text == "unknown":
return "unknown"
for family, needles in ACTION_FAMILY_PATTERNS:
if any(needle in text for needle in needles):
return family
return "other_fine_action"
def split_counts(rows: list[dict[str, Any]]) -> dict[str, int]:
return dict(sorted(Counter(row["split"] for row in rows).items()))
def episode_counts(rows: list[dict[str, Any]]) -> dict[str, int]:
by_split: dict[str, set[str]] = defaultdict(set)
for row in rows:
by_split[row["split"]].add(row["episode_id"])
return {split: len(episodes) for split, episodes in sorted(by_split.items())}
def dense_window_estimates(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
by_episode: dict[str, list[dict[str, Any]]] = defaultdict(list)
for row in rows:
by_episode[row["episode_id"]].append(row)
current_total = len(rows)
scenarios = [
{
"id": "current_export",
"window_frames": 20,
"stride_frames": "selected_sparse_windows",
"role": "current public 128-episode JSON-task export",
},
{
"id": "dense_20f_stride20",
"window_frames": 20,
"stride_frames": 20,
"role": "non-overlap dense coverage over each observed episode frame span",
},
{
"id": "dense_20f_stride10",
"window_frames": 20,
"stride_frames": 10,
"role": "2x overlap action/subtask densification",
},
{
"id": "dense_20f_stride5",
"window_frames": 20,
"stride_frames": 5,
"role": "high-overlap action boundary and transition stress setting",
},
{
"id": "medium_40f_stride20",
"window_frames": 40,
"stride_frames": 20,
"role": "subtask/procedure context window",
},
{
"id": "long_80f_stride40",
"window_frames": 80,
"stride_frames": 40,
"role": "procedure and world-model context window",
},
]
records: list[dict[str, Any]] = []
for scenario in scenarios:
if scenario["id"] == "current_export":
counts = Counter(row["split"] for row in rows)
total = current_total
else:
counts: Counter[str] = Counter()
for episode_rows in by_episode.values():
max_frame = max(row["end_frame"] for row in episode_rows) + 1
window = int(scenario["window_frames"])
stride = int(scenario["stride_frames"])
count = max(0, ((max_frame - window) // stride) + 1) if max_frame >= window else 0
counts[episode_rows[0]["split"]] += count
total = sum(counts.values())
records.append(
{
**scenario,
"estimated_windows": total,
"estimated_split_windows": dict(sorted(counts.items())),
"multiplier_vs_current_export": round(total / current_total, 2) if current_total else None,
"source_note": "Estimated from current public-safe window frame spans; the real exporter must still validate raw-stream availability and label coverage.",
}
)
multiscale = {
"id": "multiscale_20s10_40s20_80s40",
"role": "recommended no-new-episode v5 export: short action windows plus medium/long procedure context",
"components": ["dense_20f_stride10", "medium_40f_stride20", "long_80f_stride40"],
}
component_records = {record["id"]: record for record in records}
total = sum(component_records[item]["estimated_windows"] for item in multiscale["components"])
split_total: Counter[str] = Counter()
for item in multiscale["components"]:
split_total.update(component_records[item]["estimated_split_windows"])
records.append(
{
**multiscale,
"estimated_windows": total,
"estimated_split_windows": dict(sorted(split_total.items())),
"multiplier_vs_current_export": round(total / current_total, 2) if current_total else None,
"source_note": "Composite planning estimate; store as a new export run rather than replacing existing 128-episode packages.",
}
)
return records
def task_bottlenecks(summary: dict[str, Any]) -> list[dict[str, Any]]:
records = []
for task in summary.get("tasks", []):
simple = task.get("simple") or {}
neural = task.get("neural") or {}
simple_status = simple.get("status", "missing")
score = simple.get("primary_score")
unseen = simple.get("unseen_test_class_count")
classes = simple.get("num_classes")
if simple_status.startswith("unsupported"):
bottleneck = "missing raw 128-episode feature blocks"
next_action = "export compact raw-feature shards for this task before model comparison"
elif unseen:
bottleneck = "fine-grained label explosion and held-out unseen labels"
next_action = "add hierarchical action/subtask families plus label-normalized scoring"
elif score is not None and float(score) < 0.05:
bottleneck = "weak public-safe metadata/text baseline"
next_action = "add dense windows and stronger fusion baselines before interpreting model quality"
elif task.get("task") in {"contact_prediction", "transition_detection", "temporal_order"}:
bottleneck = "usable control task"
next_action = "keep as sanity/control metric for future dense-window and model runs"
else:
bottleneck = "moderate task signal, still needs robustness split"
next_action = "add session/task-family slices and bootstrap confidence intervals"
records.append(
{
"task": task.get("task"),
"display_name": summary.get("task_display_names", {}).get(task.get("task"), task.get("task")),
"priority": TASK_PRIORITIES.get(task.get("task"), "medium"),
"simple_status": simple_status,
"simple_primary_metric": simple.get("primary_metric"),
"simple_primary_score": score,
"neural_status": neural.get("status", "not_run") if neural else "not_run",
"neural_primary_score": neural.get("primary_score") if neural else None,
"num_classes": classes,
"unseen_test_class_count": unseen,
"bottleneck": bottleneck,
"next_action": next_action,
}
)
priority_order = {"highest": 0, "high": 1, "medium": 2, "low": 3}
return sorted(records, key=lambda item: (priority_order.get(item["priority"], 9), item["task"] or ""))
def qwen_error_summary(metrics: dict[str, Any], predictions: list[dict[str, Any]]) -> dict[str, Any]:
families: dict[str, dict[str, Any]] = defaultdict(lambda: {
"samples": 0,
"action_exact": 0,
"seen_samples": 0,
"unseen_samples": 0,
"contact_exact": 0,
"transition_exact": 0,
})
object_counts: Counter[str] = Counter()
for row in predictions:
true_json = row.get("true_json") or {}
pred_json = row.get("pred_json") or {}
family = action_family(true_json.get("action") or row.get("true_label"))
bucket = families[family]
bucket["samples"] += 1
bucket["action_exact"] += int((true_json.get("action") or row.get("true_label")) == pred_json.get("action"))
bucket["seen_samples"] += int(bool(row.get("true_label_seen_in_train")))
bucket["unseen_samples"] += int(not bool(row.get("true_label_seen_in_train")))
bucket["contact_exact"] += int(true_json.get("contact") == pred_json.get("contact"))
bucket["transition_exact"] += int(true_json.get("transition") == pred_json.get("transition"))
object_counts.update(str(item).lower() for item in true_json.get("objects") or [])
family_records = []
for family, bucket in families.items():
samples = bucket["samples"]
family_records.append(
{
"family": family,
"samples": samples,
"action_exact_rate": bucket["action_exact"] / samples if samples else 0,
"seen_share": bucket["seen_samples"] / samples if samples else 0,
"contact_exact_rate": bucket["contact_exact"] / samples if samples else 0,
"transition_exact_rate": bucket["transition_exact"] / samples if samples else 0,
}
)
family_records.sort(key=lambda item: (-item["samples"], item["family"]))
eval_label_counts = metrics.get("eval_label_counts") or {}
singleton_labels = sum(1 for value in eval_label_counts.values() if value == 1)
return {
"run_id": metrics.get("run_id"),
"samples": metrics.get("num_samples"),
"json_validity_rate": metrics.get("json_validity_rate"),
"action_macro_f1": metrics.get("action_macro_f1"),
"subtask_accuracy": metrics.get("subtask_accuracy"),
"next_action_accuracy": metrics.get("next_action_accuracy"),
"contact_accuracy": metrics.get("contact_accuracy"),
"transition_accuracy": metrics.get("transition_accuracy"),
"object_micro_f1": metrics.get("object_micro_f1"),
"num_unseen_label_samples": metrics.get("num_unseen_label_samples"),
"unseen_label_sample_share": (
metrics.get("num_unseen_label_samples") / metrics.get("num_samples")
if metrics.get("num_samples")
else None
),
"seen_label_accuracy": metrics.get("seen_label_accuracy"),
"unseen_label_accuracy": metrics.get("unseen_label_accuracy"),
"eval_unique_labels": len(eval_label_counts),
"eval_singleton_label_count": singleton_labels,
"eval_singleton_label_share": singleton_labels / len(eval_label_counts) if eval_label_counts else None,
"action_family_error_summary": family_records,
"top_true_objects": [{"object": item, "count": count} for item, count in object_counts.most_common(20)],
}
def hierarchical_contract() -> dict[str, Any]:
return {
"id": "xperience10m_128_hierarchical_action_targets_v1",
"status": "ready_for_export",
"purpose": "Reduce fine-grained label sparsity without changing the sealed 96/16/16 episode split.",
"target_fields": [
{
"field": "action_family",
"source": "normalized true action string",
"values": [family for family, _ in ACTION_FAMILY_PATTERNS] + ["other_fine_action", "unknown"],
"metric": "macro_f1",
},
{
"field": "action_verb",
"source": "first normalized verb phrase from action label",
"metric": "macro_f1 with train-seen and unseen slices",
},
{
"field": "fine_action",
"source": "existing action label",
"metric": "exact match and label-normalized semantic family match",
},
{
"field": "subtask_family",
"source": "normalized subtask phrase or main task fallback",
"metric": "accuracy and macro_f1",
},
{
"field": "contact_transition",
"source": "existing contact and transition fields",
"metric": "accuracy, balanced accuracy, calibration",
},
{
"field": "object_set",
"source": "existing objects list",
"metric": "micro_f1 and object-category recall",
},
],
"public_safety": [
"No raw MP4/HDF5/RRD files are written.",
"No full Qwen/Cosmos weights are mirrored.",
"Generated labels and aggregate metrics remain public-safe derived metadata.",
],
}
def experiment_backlog(public_result_dir: str) -> list[dict[str, Any]]:
return [
{
"id": "dense_window_export_v1",
"priority": 1,
"status": "ready_to_implement",
"goal": "Create a new dense-window export over the same 128 episodes without replacing existing JSONL packages.",
"expected_artifacts": [
"dataset_dense_20f_stride10.jsonl",
"dataset_dense_multiscale_manifest.json",
"label_family_distribution.json",
],
"gate": "episode ids and split assignment must exactly match the current 96/16/16 split",
},
{
"id": "hierarchical_qwen3_v5",
"priority": 2,
"status": "ready_after_dense_export",
"goal": "Train/evaluate Qwen3 with hierarchical action/subtask targets, constrained label options, and no-public-overwrite packaging.",
"suggested_setup": "high-rank LoRA or partial projector/last-layer unfreeze before full-parameter tuning",
"primary_comparison": "Qwen3 v4 action/subtask/next-action plus seen/unseen-label slices",
},
{
"id": "raw_feature_unblocker_128",
"priority": 3,
"status": "ready_to_implement_on_training_host",
"goal": "Export compact 128-episode raw feature shards for tasks currently marked unsupported_without_raw_128_feature_blocks.",
"target_tasks": [
"hand_trajectory_forecast",
"cross_modal_retrieval",
"modality_reconstruction",
"misalignment_detection",
],
},
{
"id": "cosmos3_fd_v2_multiscale",
"priority": 4,
"status": "ready_after_dense_export",
"goal": "Continue Cosmos3-Super forward-dynamics with multiscale horizons and temporal consistency metrics.",
"primary_comparison": "Cosmos3-Super Forward-Dynamics v1 validation/test MSE and rank-level loss records",
},
{
"id": "robustness_and_confidence_pack",
"priority": 5,
"status": "ready_from_existing_outputs",
"goal": "Add bootstrap confidence intervals, task-family slices, session slices, and random-time/random-label sanity checks.",
"public_output": f"{public_result_dir}/robustness_pack_v1.json",
},
]
def write_csv(path: Path, rows: list[dict[str, Any]], fieldnames: list[str]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames, lineterminator="\n")
writer.writeheader()
writer.writerows({field: row.get(field) for field in fieldnames} for row in rows)
def markdown(payload: dict[str, Any]) -> str:
dense = payload["dense_window_scenarios"]
bottlenecks = payload["task_bottlenecks"]
qwen = payload["qwen_v4_error_pressure"]
selected_counts = payload["current_128_split"]["selected_episode_counts"]
windowed_counts = payload["current_128_split"]["windowed_episode_counts"]
split_windows = payload["current_128_split"]["split_windows"]
selected_text = f"train {selected_counts['train']} / val {selected_counts['val']} / test {selected_counts['test']}"
windowed_text = f"train {windowed_counts['train']} / val {windowed_counts['val']} / test {windowed_counts['test']}"
windows_text = f"train {split_windows['train']} / val {split_windows['val']} / test {split_windows['test']}"
lines = [
"# 128-Episode Task Suite Enhancement Pack",
"",
f"Run id: `{payload['run_id']}`",
"",
"This non-overwriting enhancement pack records how to push the current 128-episode task suite harder without adding more raw episodes.",
"",
"## Current Evidence",
"",
f"- Current public export windows: `{payload['current_128_split']['total_windows']}`",
f"- Window split counts: `{windows_text}`",
f"- Selected episode split: `{selected_text}`",
f"- Windowed episode ids in baseline CSV: `{windowed_text}`",
f"- Qwen3 v4 JSON validity: `{qwen['json_validity_rate']:.4f}`",
f"- Qwen3 v4 action macro-F1: `{qwen['action_macro_f1']:.6f}`",
f"- Qwen3 v4 subtask accuracy: `{qwen['subtask_accuracy']:.6f}`",
f"- Qwen3 v4 unseen-label sample share: `{qwen['unseen_label_sample_share']:.4f}`",
"",
"## Dense-Window Scenarios",
"",
"| scenario | estimated windows | multiplier | role |",
"| --- | ---: | ---: | --- |",
]
for item in dense:
lines.append(
f"| `{item['id']}` | {item['estimated_windows']} | {item['multiplier_vs_current_export']} | {item['role']} |"
)
lines.extend([
"",
"## Highest-Priority Bottlenecks",
"",
"| task | priority | simple score | bottleneck | next action |",
"| --- | --- | ---: | --- | --- |",
])
for item in bottlenecks[:8]:
score = item["simple_primary_score"]
score_text = "" if score is None else f"{score:.6f}"
lines.append(
f"| {item['display_name']} | {item['priority']} | {score_text} | {item['bottleneck']} | {item['next_action']} |"
)
lines.extend([
"",
"## Recommended Next Run",
"",
"Use `multiscale_20s10_40s20_80s40` as the next export target, then train a Qwen3 v5 hierarchical-target LoRA/partial-unfreeze run against the unchanged 96/16/16 episode split.",
"",
"In parallel, export compact raw 128-episode feature shards for trajectory, retrieval, reconstruction, and synchronization tasks so the simple and neural baselines can be fully aligned beyond the JSON-supported labels.",
"",
"The current artifacts remain the baseline; future runs should write new run ids and publish separate verified packages.",
"",
])
return "\n".join(lines)
def build_payload(run_id: str, output_dir: Path) -> dict[str, Any]:
windows = load_windows(WINDOWS_CSV)
baseline = read_json(BASELINE_SUMMARY)
qwen_metrics = read_json(QWEN_V4_METRICS)
qwen_predictions = load_jsonl(QWEN_V4_PREDICTIONS)
cosmos_summary = read_json(COSMOS_FD_SUMMARY)
per_episode_counts = Counter(row["episode_id"] for row in windows)
now = datetime.now(timezone.utc).isoformat(timespec="seconds")
current_split = {
"total_windows": len(windows),
"split_windows": split_counts(windows),
"selected_episode_counts": baseline.get("episode_counts"),
"windowed_episode_counts": episode_counts(windows),
"unique_main_tasks": len({row["main_task"] for row in windows}),
"windows_per_episode": {
"min": min(per_episode_counts.values()),
"median": statistics.median(per_episode_counts.values()),
"max": max(per_episode_counts.values()),
},
}
public_result_dir = output_dir.relative_to(ROOT).as_posix()
return {
"title": "Ropedia Xperience-10M 128-Episode Task Suite Enhancement Pack",
"status": "pass",
"run_id": run_id,
"generated_at_utc": now,
"scope": "No-new-episode enhancement plan over the current selected 128-episode 96/16/16 split.",
"current_128_split": current_split,
"dense_window_scenarios": dense_window_estimates(windows),
"hierarchical_target_contract": hierarchical_contract(),
"task_bottlenecks": task_bottlenecks(baseline),
"qwen_v4_error_pressure": qwen_error_summary(qwen_metrics, qwen_predictions),
"cosmos3_super_forward_dynamics_reference": {
"status": cosmos_summary.get("status"),
"run_id": cosmos_summary.get("run_id"),
"train_rows": cosmos_summary.get("train_rows"),
"val_rows": cosmos_summary.get("val_rows"),
"test_rows": cosmos_summary.get("test_rows"),
"test_mse": cosmos_summary.get("test_mse"),
"adapter_parameter_numel": cosmos_summary.get("adapter_parameter_numel"),
},
"experiment_backlog": experiment_backlog(public_result_dir),
"public_artifacts": {
"result_dir": public_result_dir,
"public_json": "docs/data/task_suite_enhancement_128.json",
"public_markdown": "TASK_SUITE_ENHANCEMENT_128.md",
},
"non_overwrite_policy": {
"result_directory_created_once": True,
"stable_public_summaries_update_to_latest_enhancement_pack": True,
"prior_model_result_packages_overwritten": False,
},
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--run-id", default=DEFAULT_RUN_ID)
parser.add_argument("--output-root", type=Path, default=DEFAULT_OUTPUT_ROOT)
parser.add_argument("--force", action="store_true", help="allow rewriting the run directory")
return parser.parse_args()
def main() -> int:
args = parse_args()
output_dir = args.output_root / args.run_id
if output_dir.exists() and not args.force:
raise SystemExit(f"Refusing to overwrite existing enhancement run: {output_dir}")
output_dir.mkdir(parents=True, exist_ok=args.force)
payload = build_payload(args.run_id, output_dir)
report_md = markdown(payload)
write_json(output_dir / "enhancement_plan.json", payload)
write_json(output_dir / "hierarchical_target_contract.json", payload["hierarchical_target_contract"])
write_json(output_dir / "experiment_backlog.json", {"status": "pass", "experiments": payload["experiment_backlog"]})
write_json(PUBLIC_JSON, payload)
(output_dir / "ENHANCEMENT_REPORT.md").write_text(report_md, encoding="utf-8")
PUBLIC_MD.write_text(report_md, encoding="utf-8")
write_csv(
output_dir / "dense_window_scenarios.csv",
payload["dense_window_scenarios"],
["id", "estimated_windows", "multiplier_vs_current_export", "window_frames", "stride_frames", "role", "source_note"],
)
write_csv(
output_dir / "task_bottlenecks.csv",
payload["task_bottlenecks"],
[
"task",
"display_name",
"priority",
"simple_status",
"simple_primary_metric",
"simple_primary_score",
"neural_status",
"neural_primary_score",
"num_classes",
"unseen_test_class_count",
"bottleneck",
"next_action",
],
)
write_csv(
output_dir / "qwen_action_family_error_summary.csv",
payload["qwen_v4_error_pressure"]["action_family_error_summary"],
["family", "samples", "action_exact_rate", "seen_share", "contact_exact_rate", "transition_exact_rate"],
)
print(f"PASS: wrote {output_dir / 'enhancement_plan.json'}")
print(f"PASS: wrote {PUBLIC_JSON}")
print(f"PASS: wrote {PUBLIC_MD}")
return 0
if __name__ == "__main__":
raise SystemExit(main())