ropedia-xperience-10m-task-baselines / scripts /build_omni_model_comparison.py
cy0307's picture
Update final Qwen model scripts
627e5d7 verified
Raw
History Blame
14.1 kB
#!/usr/bin/env python3
"""Build a compact comparison of the current single-episode and 128-episode runs."""
from __future__ import annotations
import csv
import json
import math
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Base directory for every path below: the third ancestor of this file
# (``parents[2]`` of its resolved location).
ROOT = Path(__file__).resolve().parents[2]
# JSON report written by main().
OUTPUT_JSON = ROOT / "docs/data/omni_model_comparison.json"
# Markdown rendering of the same report, written by main().
OUTPUT_MD = ROOT / "results/omni_finetune/OMNI_MODEL_COMPARISON.md"
# Directory whose immediate subdirectories are scanned by verified_summaries()
# for ``verified_result_summary.json`` files.
VERIFIED_PUBLIC = ROOT / "results/omni_finetune/verified_public"
# Task id -> name of the metric that metric_from_task() looks up first in a
# task's metrics dict. Task ids missing from this table fall back to
# "primary_score" there.
PRIMARY_METRICS = {
    "timeline_action": "macro_f1",
    "timeline_subtask": "macro_f1",
    "transition_detection": "macro_f1",
    "next_action": "macro_f1",
    "hand_trajectory_forecast": "mpjpe",
    "contact_prediction": "macro_f1",
    "object_relevance": "micro_f1",
    "caption_grounding": "mrr",
    "cross_modal_retrieval": "mrr",
    "modality_reconstruction": "r2",
    "temporal_order": "accuracy",
    "misalignment_detection": "f1",
}
# Task id -> human-readable task name used in report rows. The sorted keys of
# this table also determine which tasks single_episode_summary() emits rows for.
TASK_DISPLAY_NAMES = {
    "timeline_action": "Action Recognition",
    "timeline_subtask": "Procedure Step Recognition",
    "transition_detection": "Action Boundary Detection",
    "next_action": "Next-Action Prediction",
    "hand_trajectory_forecast": "Hand Trajectory Forecasting",
    "contact_prediction": "Contact State Prediction",
    "object_relevance": "Object Relevance Prediction",
    "caption_grounding": "Language Grounding",
    "cross_modal_retrieval": "Cross-Modal Retrieval",
    "modality_reconstruction": "Cross-Modal Reconstruction",
    "temporal_order": "Temporal Order Verification",
    "misalignment_detection": "Multimodal Synchronization Detection",
}
def load_json(path: Path) -> dict[str, Any]:
    """Return the parsed JSON content of ``path``, or ``{}`` when the file is absent.

    The file is decoded as UTF-8. Malformed JSON is not handled here, so the
    decoding error raised by ``json.loads`` propagates to the caller.
    """
    if path.exists():
        raw_text = path.read_text(encoding="utf-8")
        return json.loads(raw_text)
    return {}
def rel(path: Path) -> str:
    """Express ``path`` relative to ``ROOT`` as a forward-slash string.

    ``Path.relative_to`` raises ``ValueError`` when ``path`` does not lie
    under ``ROOT``; that error is not caught here.
    """
    relative = path.relative_to(ROOT)
    return relative.as_posix()
def scalar(value: Any) -> float | int | str | None:
    """Pass scalar values through unchanged and replace anything else with ``None``.

    ``float``, ``int`` (which includes ``bool``), ``str`` and ``None`` are
    returned as given; containers and other objects become ``None``.
    """
    if value is not None and not isinstance(value, (float, int, str)):
        return None
    return value
def metric_from_task(task_id: str, metrics: dict[str, Any]) -> tuple[str, float | int | str | None]:
    """Pick the primary metric name and score for one task.

    Lookup order:
      1. The metric named for ``task_id`` in ``PRIMARY_METRICS`` (or
         ``"primary_score"`` for unknown tasks), when that key is in ``metrics``.
      2. Otherwise the ``"primary_metric"`` / ``"primary_score"`` pair, when
         ``metrics`` has a ``"primary_metric"`` key.
      3. Otherwise the expected metric name with a ``None`` score.

    Scores are filtered through ``scalar`` so non-scalar values become ``None``.
    """
    expected_name = PRIMARY_METRICS.get(task_id, "primary_score")
    if expected_name in metrics:
        return expected_name, scalar(metrics[expected_name])
    if "primary_metric" not in metrics:
        return expected_name, None
    reported_name = str(metrics["primary_metric"])
    return reported_name, scalar(metrics.get("primary_score"))
def single_episode_summary() -> dict[str, Any]:
    """Assemble the version entry for the single-episode public-sample task suite.

    Reads ``results/episode_task_suite/summary_report.json`` under ``ROOT``
    (a missing file yields an empty summary) and emits one row per task id in
    ``TASK_DISPLAY_NAMES``, in sorted order, pairing the simple and neural
    primary metrics for that task.

    Returns:
        A dict with identification fields, counts copied from the summary, and
        the per-task rows under ``"task_metrics"``.
    """
    path = ROOT / "results/episode_task_suite/summary_report.json"
    summary = load_json(path)

    # Anything other than a JSON object in either section is treated as empty.
    raw_tasks = summary.get("tasks")
    tasks = raw_tasks if isinstance(raw_tasks, dict) else {}
    raw_neural = summary.get("neural_tasks")
    neural = raw_neural if isinstance(raw_neural, dict) else {}

    def build_row(task_id: str) -> dict[str, Any]:
        # A task is reported as "pass" purely on presence in the section.
        simple_metric, simple_score = metric_from_task(task_id, tasks.get(task_id, {}))
        neural_metric, neural_score = metric_from_task(task_id, neural.get(task_id, {}))
        return {
            "task": task_id,
            "task_display_name": TASK_DISPLAY_NAMES[task_id],
            "simple_status": "pass" if task_id in tasks else "missing",
            "simple_primary_metric": simple_metric,
            "simple_primary_score": simple_score,
            "neural_status": "pass" if task_id in neural else "missing",
            "neural_primary_metric": neural_metric,
            "neural_primary_score": neural_score,
        }

    task_rows = [build_row(task_id) for task_id in sorted(TASK_DISPLAY_NAMES)]

    counts = {
        "episodes": 1,
        "windows": summary.get("num_windows"),
        "frames": summary.get("num_frames"),
        "feature_dim": summary.get("feature_dim"),
        "task_count": len(tasks),
        "neural_task_count": len(neural),
    }
    interpretation = (
        "This layer verifies the 12 task contracts and raw multimodal feature "
        "pipeline on the public sample. It is not a cross-episode benchmark."
    )
    return {
        "id": "v1_single_episode_public_sample",
        "title": "Single-Episode Public-Sample Task Suite",
        "status": "verified",
        "scope": "one public Xperience-10M sample episode",
        "source": rel(path),
        "split": "chronological 70/30 within one episode",
        "counts": counts,
        "models": ["minimal task heads", "compact neural MLP task heads"],
        "task_metrics": task_rows,
        "interpretation": interpretation,
    }
def _parse_score(raw: Any) -> float | None:
    """Convert one CSV score cell to a finite float, or ``None`` when it holds no value.

    Missing cells, empty or whitespace-only cells, and non-finite values
    ("nan", "inf", ...) all map to ``None``. Non-finite floats are dropped
    because ``json.dumps`` would otherwise emit them as ``NaN``/``Infinity``,
    which are not valid JSON.

    Raises:
        ValueError: If the cell contains text that is not a number.
    """
    if raw is None:
        return None
    text = str(raw).strip()
    if not text:
        return None
    value = float(text)
    return value if math.isfinite(value) else None


def read_baseline_csv(path: Path) -> list[dict[str, Any]]:
    """Load the per-task baseline metrics CSV as a list of row dicts.

    Each row keeps its CSV columns as strings, except that
    ``simple_primary_score`` and ``neural_primary_score`` are always present
    and hold either a finite ``float`` or ``None``. A ``task_display_name``
    entry is added from ``TASK_DISPLAY_NAMES``; unknown task ids get a
    title-cased version of the id with underscores replaced by spaces.

    Args:
        path: Location of the CSV file (read as UTF-8, first row is the header).

    Returns:
        One dict per data row, or an empty list when ``path`` does not exist.

    Raises:
        ValueError: If a score cell contains non-numeric text.
    """
    if not path.exists():
        return []
    rows: list[dict[str, Any]] = []
    # newline="" lets the csv module handle embedded newlines itself.
    with path.open("r", encoding="utf-8", newline="") as handle:
        for row in csv.DictReader(handle):
            item: dict[str, Any] = dict(row)
            for key in ("simple_primary_score", "neural_primary_score"):
                item[key] = _parse_score(item.get(key))
            task_id = str(item.get("task", ""))
            item["task_display_name"] = TASK_DISPLAY_NAMES.get(
                task_id, task_id.replace("_", " ").title()
            )
            rows.append(item)
    return rows
def aligned_baseline_summary() -> dict[str, Any]:
    """Assemble the version entry for the 128-episode aligned simple/neural baselines.

    Combines ``summary_report.json`` (status and counts) with
    ``task_metrics.csv`` (per-task rows) from the
    ``multi_episode_128_task_baselines`` results directory. The alignment
    report's path is recorded as the entry's source; the report file itself is
    not read.

    Returns:
        A dict with identification fields, counts, and the CSV rows under
        ``"task_metrics"``.
    """
    summary_path = ROOT / "results/omni_finetune/multi_episode_128_task_baselines/summary_report.json"
    csv_path = ROOT / "results/omni_finetune/multi_episode_128_task_baselines/task_metrics.csv"
    report_path = ROOT / "results/omni_finetune/multi_episode_128_task_baselines/BASELINE_ALIGNMENT_REPORT.md"

    summary = load_json(summary_path)
    task_rows = read_baseline_csv(csv_path)

    def passing(status_field: str) -> int:
        # Number of rows whose status column equals "pass".
        return sum(row.get(status_field) == "pass" for row in task_rows)

    counts = {
        "rows": summary.get("num_rows"),
        "split_counts": summary.get("split_counts"),
        "episode_counts": summary.get("episode_counts"),
        "task_count": len(task_rows),
        "simple_supported_task_count": passing("simple_status"),
        "neural_supported_task_count": passing("neural_status"),
    }
    interpretation = (
        "This layer aligns the previous simple and neural baseline framing to "
        "the same selected 96/16/16 split used by the model branches. It uses "
        "public-safe JSONL metadata/text features, so raw-feature-only tasks "
        "remain explicitly unsupported until 128-run sensor feature blocks exist."
    )
    return {
        "id": "v2_multi_episode_128_aligned_metadata_baselines",
        "title": "128-Episode Aligned Simple/NN Baselines",
        "status": summary.get("status", "unknown"),
        "scope": "selected 128-episode 96/16/16 split",
        "source": rel(report_path),
        "split": "train/val/test by selected episode/session",
        "counts": counts,
        "models": ["metadata/text simple baselines", "metadata/text neural MLP baselines"],
        "task_metrics": task_rows,
        "interpretation": interpretation,
    }
def verified_summaries() -> list[dict[str, Any]]:
    """Collect every non-empty ``verified_result_summary.json`` under ``VERIFIED_PUBLIC``.

    Files are looked up one directory level below ``VERIFIED_PUBLIC`` and
    visited in sorted path order. Each returned payload gains a
    ``"_summary_path"`` entry holding the file's path relative to ``ROOT``.
    Files that load as an empty value are skipped.
    """
    collected: list[dict[str, Any]] = []
    summary_files = sorted(VERIFIED_PUBLIC.glob("*/verified_result_summary.json"))
    for summary_file in summary_files:
        payload = load_json(summary_file)
        if payload:
            payload["_summary_path"] = rel(summary_file)
            collected.append(payload)
    return collected
def _dict_section(container: dict[str, Any], key: str) -> dict[str, Any]:
    """Return ``container[key]`` when it is a dict, otherwise an empty dict."""
    value = container.get(key)
    return value if isinstance(value, dict) else {}


def model_branch_entry(payload: dict[str, Any]) -> dict[str, Any]:
    """Flatten one verified-result payload into a model-branch report entry.

    The ``eval``, ``training`` and ``dataset`` sections of ``payload`` are
    optional. A section that is missing or is not a JSON object (for example
    ``null``) is treated as empty, so its derived counts come out as ``None``
    instead of raising ``AttributeError``.

    Args:
        payload: Parsed summary, optionally carrying the ``"_summary_path"``
            entry used as the entry's source.

    Returns:
        A dict with run identifiers, a ``"counts"`` sub-dict, the evaluation's
        ``"primary_metrics"`` (always a dict) and the training ``"history"``.
    """
    eval_payload = _dict_section(payload, "eval")
    training = _dict_section(payload, "training")
    dataset = _dict_section(payload, "dataset")
    return {
        "id": payload.get("eval_run_id"),
        # Fall back to the backbone id when the display name is absent or null.
        "title": payload.get("backbone_display_name") or payload.get("backbone"),
        "status": payload.get("status"),
        "backbone": payload.get("backbone"),
        "dataset_contract": payload.get("dataset_contract"),
        "training_objective": payload.get("training_objective"),
        "source": payload.get("_summary_path"),
        "dataset_run_id": payload.get("dataset_run_id"),
        "train_run_id": payload.get("train_run_id"),
        "eval_run_id": payload.get("eval_run_id"),
        "counts": {
            "dataset_samples": dataset.get("num_samples"),
            "dataset_episodes": dataset.get("num_episodes"),
            "split_counts": dataset.get("split_counts"),
            "train_samples": training.get("num_train_samples"),
            "val_samples": training.get("num_val_samples"),
            "eval_samples": eval_payload.get("num_samples"),
            "held_out_episode_count": eval_payload.get("held_out_episode_count"),
            "num_processes": training.get("num_processes"),
        },
        # Kept as a dict even when the payload stores null, because the
        # Markdown renderer iterates over its items.
        "primary_metrics": _dict_section(eval_payload, "primary_metrics"),
        "history": training.get("history", []),
    }
def model_branch_summary() -> dict[str, Any]:
    """Assemble the version entry covering the verified foundation-model branches.

    Every payload returned by ``verified_summaries`` is converted with
    ``model_branch_entry``. The counts report the total number of branches and
    how many carry the ``qwen3_omni_lora`` and ``cosmos_world_model`` backbone
    ids.
    """
    branches = [model_branch_entry(payload) for payload in verified_summaries()]

    def count_backbone(backbone_id: str) -> int:
        return sum(1 for branch in branches if branch.get("backbone") == backbone_id)

    counts = {
        "verified_branch_count": len(branches),
        "qwen3_verified_package_count": count_backbone("qwen3_omni_lora"),
        "cosmos3_verified_package_count": count_backbone("cosmos_world_model"),
    }
    interpretation = (
        "This layer contains the held-out foundation-model packages. Qwen3-Omni "
        "packages evaluate structured JSON task prediction; Cosmos3-Nano currently "
        "evaluates a future-window world-model compatibility adapter, not a full "
        "diffusion-weight fine-tune."
    )
    return {
        "id": "v3_multi_episode_foundation_model_branches",
        "title": "128-Episode Foundation-Model Branches",
        "status": "partial_verified",
        "scope": "selected 128-episode split and compatible derived windows",
        "source": "results/omni_finetune/verified_public/",
        "split": "episode/session held-out split; exact task target depends on backbone contract",
        "counts": counts,
        "models": ["Qwen3-Omni LoRA", "Cosmos3-Nano future-window compatibility branch"],
        "branches": branches,
        "interpretation": interpretation,
    }
def build_report() -> dict[str, Any]:
    """Build the complete comparison report.

    The report holds three version entries, in this order: the single-episode
    suite, the 128-episode aligned baselines, and the foundation-model
    branches. ``markdown`` relies on that order. The generation timestamp is
    the current UTC time at one-second resolution.
    """
    versions = [
        single_episode_summary(),
        aligned_baseline_summary(),
        model_branch_summary(),
    ]
    generated_at = datetime.now(timezone.utc).isoformat(timespec="seconds")
    comparison_rule = (
        "Compare only rows with the same scope and target. Single-episode raw-feature "
        "metrics, 128-episode metadata baselines, Qwen3 structured JSON metrics, and "
        "Cosmos3 future-window metrics answer different questions."
    )
    pending = [
        "Replace the latest Qwen3 branch entry after the in-progress two-epoch full run completes held-out eval and packaging.",
        "Promote Cosmos3 from compatibility adapter to full Cosmos3 fine-tuning only after a separate environment with matching Diffusers/Cosmos dependencies is prepared.",
    ]
    return {
        "title": "Ropedia Xperience-10M Current Result Versions",
        "generated_at_utc": generated_at,
        "status": "pass",
        "version_count": len(versions),
        "comparison_rule": comparison_rule,
        "versions": versions,
        "pending": pending,
    }
def fmt_score(value: Any) -> str:
    """Format a score for display in a table cell.

    Floats are shown with four decimal places, ``None`` becomes an empty
    string, and every other value is converted with ``str``.
    """
    if isinstance(value, float):
        return f"{value:.4f}"
    return "" if value is None else str(value)
def _md_cell(value: Any) -> str:
    """Render a table cell, showing ``None`` as an empty cell rather than the text "None"."""
    return "" if value is None else str(value)


def markdown(report: dict[str, Any]) -> str:
    """Render the comparison report as a Markdown document.

    Args:
        report: Report dict as produced by ``build_report``.
            ``generated_at_utc``, ``comparison_rule`` and ``versions`` are
            required keys; ``pending`` is optional. The second entry of
            ``versions`` supplies the task-baseline rows and the third
            supplies the model branches. When either entry is absent, its
            table is emitted with the header only.

    Returns:
        The Markdown text, terminated by a newline.
    """
    versions = report["versions"]

    def version_at(index: int) -> dict[str, Any]:
        # Tolerate reports with fewer versions than the builder normally emits.
        if index < len(versions) and isinstance(versions[index], dict):
            return versions[index]
        return {}

    lines = [
        "# Omni Model Comparison",
        "",
        f"Generated: `{report['generated_at_utc']}`",
        "",
        report["comparison_rule"],
        "",
        "## Current Result Versions",
        "",
        "| version | status | scope | source |",
        "| --- | --- | --- | --- |",
    ]
    for version in versions:
        lines.append(
            "| {title} | {status} | {scope} | `{source}` |".format(
                title=_md_cell(version.get("title")),
                status=_md_cell(version.get("status")),
                scope=_md_cell(version.get("scope")),
                source=_md_cell(version.get("source")),
            )
        )

    lines.extend(["", "## 128-Episode Task Baselines", "", "| task | simple | neural |", "| --- | ---: | ---: |"])
    for row in version_at(1).get("task_metrics") or []:
        simple = f"{row.get('simple_primary_metric') or ''} {fmt_score(row.get('simple_primary_score'))}".strip()
        neural = f"{row.get('neural_primary_metric') or ''} {fmt_score(row.get('neural_primary_score'))}".strip()
        lines.append(f"| {_md_cell(row.get('task_display_name'))} | {simple} | {neural} |")

    lines.extend(["", "## Verified Model Branches", "", "| branch | backbone | eval samples | held-out episodes | key metrics |", "| --- | --- | ---: | ---: | --- |"])
    # Only these metrics are surfaced in the summary table; built once, not per branch.
    key_metric_names = frozenset(
        {
            "json_validity_rate",
            "action_macro_f1",
            "future_retrieval_mrr",
            "temporal_consistency",
            "transition_accuracy",
            "contact_accuracy",
        }
    )
    for branch in version_at(2).get("branches") or []:
        metrics = branch.get("primary_metrics")
        if not isinstance(metrics, dict):
            metrics = {}
        key_metrics = ", ".join(
            f"{key}={fmt_score(value)}"
            for key, value in metrics.items()
            if key in key_metric_names
        )
        counts = branch.get("counts")
        if not isinstance(counts, dict):
            counts = {}
        # Branch entries always carry these keys, possibly with a None value,
        # so None has to be blanked explicitly instead of via a .get default.
        lines.append(
            "| {title} | `{backbone}` | {samples} | {episodes} | {metrics} |".format(
                title=_md_cell(branch.get("title")),
                backbone=_md_cell(branch.get("backbone")),
                samples=_md_cell(counts.get("eval_samples")),
                episodes=_md_cell(counts.get("held_out_episode_count")),
                metrics=key_metrics,
            )
        )

    lines.extend(["", "## Pending", ""])
    lines.extend(f"- {item}" for item in report.get("pending", []))
    lines.append("")
    return "\n".join(lines)
def main() -> int:
    """Build the report and write it to ``OUTPUT_JSON`` and ``OUTPUT_MD``.

    Parent directories are created as needed. The JSON file is written first
    (two-space indent, trailing newline), then the Markdown rendering. One
    confirmation line per file is printed afterwards.

    Returns:
        ``0``, intended for use as the process exit status.
    """
    report = build_report()
    for target in (OUTPUT_JSON, OUTPUT_MD):
        target.parent.mkdir(parents=True, exist_ok=True)
    json_text = json.dumps(report, indent=2) + "\n"
    OUTPUT_JSON.write_text(json_text, encoding="utf-8")
    OUTPUT_MD.write_text(markdown(report), encoding="utf-8")
    for written in (OUTPUT_JSON, OUTPUT_MD):
        print(f"PASS: wrote {written}")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())