ropedia-xperience-10m-task-baselines / scripts /omni /build_selection_episode_manifest.py
cy0307's picture
Update final Qwen model scripts
627e5d7 verified
Raw
History Blame
7.53 kB
#!/usr/bin/env python3
"""Build a manifest from a fixed selected-episode list.
This is used for progressive train/validation runs while the remaining held-out
test episodes are still staging. Splits come from the selection file, not from a
fresh random split, so the final test set stays sealed.
"""
from __future__ import annotations
import argparse
import json
from collections import Counter
from pathlib import Path
from build_episode_manifest import add_toolkit_to_path, inspect_episode
from qwen3_omni_dataset_utils import VIDEO_NAMES
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the selected-episode manifest builder.

    The default workspace is the directory two levels above the folder that
    contains this script. ``--data-root`` and ``--include-split`` may be
    repeated; each occurrence is appended to a list.
    """
    script_path = Path(__file__).resolve()
    cli = argparse.ArgumentParser(description="Build a manifest from selected Xperience-10M episodes.")
    add = cli.add_argument

    # Input and output locations.
    add("--workspace", type=Path, default=script_path.parents[2])
    add("--data-root", type=Path, action="append", required=True)
    add("--selection-json", type=Path, required=True)
    add("--output", type=Path, required=True)
    add("--report-output", type=Path)

    # Which episodes end up in the manifest.
    add("--include-split", choices=["train", "val", "test"], action="append")
    add("--require-all-videos", action=argparse.BooleanOptionalAction, default=True)

    # Plain numeric options, registered in the same order as before so the
    # generated --help text is unchanged.
    numeric_options = (
        ("--min-train-episodes", int, 1),
        ("--min-val-episodes", int, 1),
        ("--window-frames", int, 20),
        ("--stride-frames", int, 20),
        ("--min-label-fraction", float, 0.6),
    )
    for flag, caster, fallback in numeric_options:
        add(flag, type=caster, default=fallback)

    return cli.parse_args()
def is_complete_episode(path: Path, require_all_videos: bool) -> tuple[bool, list[str]]:
    """Report whether an episode directory contains the files the manifest needs.

    ``annotation.hdf5`` is always required. With ``require_all_videos`` every
    name in ``VIDEO_NAMES`` must be present as a file; otherwise one of them is
    enough, and the placeholder ``"any_mp4"`` is reported when none exists.

    Returns:
        ``(complete, missing)`` where ``missing`` lists the absent file names
        and ``complete`` is true exactly when that list is empty.
    """

    def present(filename: str) -> bool:
        return (path / filename).is_file()

    absent = [] if present("annotation.hdf5") else ["annotation.hdf5"]
    if require_all_videos:
        absent.extend(name for name in VIDEO_NAMES if not present(name))
    elif not any(map(present, VIDEO_NAMES)):
        absent.append("any_mp4")
    return not absent, absent
def selected_episode_path(data_roots: list[Path], episode_path: str) -> Path | None:
for root in data_roots:
candidate = root / episode_path
if candidate.exists():
return candidate.resolve()
return None
def write_report(path: Path, summary: dict) -> None:
    """Write a short Markdown report describing the manifest that was built.

    Args:
        path: Destination file; missing parent directories are created.
        summary: Manifest summary. The keys read here are
            ``selected_episode_count``, ``included_splits``,
            ``included_episode_count``, ``included_split_counts``,
            ``complete_selected_split_counts``,
            ``sealed_test_episodes_available`` and ``require_all_videos``.

    Raises:
        KeyError: If ``summary`` lacks one of the keys listed above.
    """
    selected_count = summary["selected_episode_count"]
    lines = [
        "# Progressive Train/Validation Manifest",
        "",
        f"- Selected episodes: `{selected_count}`",
        f"- Included splits: `{', '.join(summary['included_splits'])}`",
        f"- Included complete episodes: `{summary['included_episode_count']}`",
        f"- Included split counts: `{summary['included_split_counts']}`",
        f"- Complete selected episodes by split: `{summary['complete_selected_split_counts']}`",
        f"- Available test episodes kept sealed: `{summary['sealed_test_episodes_available']}`",
        f"- Require all six videos: `{summary['require_all_videos']}`",
        "",
        # Quote the real selection size rather than a hard-coded "128" so the
        # sentence stays true for a selection file of any length.
        f"The manifest uses the split labels from the fixed {selected_count}-episode selection file.",
        "Episodes assigned to the held-out test split are reported but not included unless explicitly requested.",
    ]
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
    """Build the manifest for the selected episodes and write it to disk.

    Reads the selection file, locates each selected episode under the data
    roots, records episodes that are missing or incomplete, inspects the
    complete episodes whose split is included, and writes a JSON payload with
    the keys ``summary``, ``episodes`` and ``missing_or_incomplete``. The
    summary is also printed, and an optional Markdown report is written when
    ``--report-output`` is given.

    Returns:
        ``0`` on success.

    Raises:
        ValueError: If the selection file has no ``selected_episodes``.
        SystemExit: If an included train or val split has fewer complete
            episodes than the configured minimum.
    """
    args = parse_args()
    args.workspace = args.workspace.expanduser().resolve()
    data_roots = [path.expanduser().resolve() for path in args.data_root]
    include_splits = args.include_split or ["train", "val"]
    # Expand "~" for the manifest path too; every other path option already
    # goes through expanduser().
    output_path = args.output.expanduser()
    add_toolkit_to_path(args.workspace)

    selection = json.loads(args.selection_json.expanduser().read_text(encoding="utf-8"))
    selected = selection.get("selected_episodes", [])
    if not selected:
        raise ValueError(f"No selected_episodes found in {args.selection_json}")

    episodes = []
    missing_or_incomplete = []
    # Complete episodes per split, counted whether or not the split is included.
    complete_counts = Counter()
    selected_counts = Counter(str(item.get("split", "unspecified")) for item in selected)

    for item in selected:
        split = str(item.get("split", "unspecified"))
        rel_path = str(item.get("episode_path", ""))
        episode_dir = selected_episode_path(data_roots, rel_path)
        if episode_dir is None:
            missing_or_incomplete.append({"episode_path": rel_path, "split": split, "missing": ["episode_dir"]})
            continue
        complete, missing = is_complete_episode(episode_dir, args.require_all_videos)
        if not complete:
            missing_or_incomplete.append({"episode_path": rel_path, "split": split, "missing": missing})
            continue
        complete_counts[split] += 1
        # Splits that are not included are only counted, never inspected.
        if split not in include_splits:
            continue
        episode = inspect_episode(episode_dir / "annotation.hdf5", args)
        # Qualify the id with the full relative path so equally named episode
        # folders from different sessions do not collide.
        unique_id = rel_path.replace("/", "__")
        episode.update({
            "episode_id": unique_id,
            "source_episode_id": episode_dir.name,
            "episode_path": rel_path,
            "split": split,
            "selection_rank": item.get("selection_rank"),
            "selection_score": item.get("selection_score"),
            "size_band": item.get("size_band"),
            "top_level_session": item.get("top_level_session"),
        })
        episodes.append(episode)

    included_counts = Counter(ep["split"] for ep in episodes)
    # Enforce a minimum only for splits that were actually requested. The
    # train check used to run unconditionally, so a val-only or test-only
    # manifest always aborted with zero train episodes.
    if "train" in include_splits and included_counts.get("train", 0) < args.min_train_episodes:
        raise SystemExit(f"Only {included_counts.get('train', 0)} train episodes available; need {args.min_train_episodes}.")
    if "val" in include_splits and included_counts.get("val", 0) < args.min_val_episodes:
        raise SystemExit(f"Only {included_counts.get('val', 0)} val episodes available; need {args.min_val_episodes}.")

    summary = {
        "selection_json": str(args.selection_json),
        "selected_episode_count": len(selected),
        "selected_split_counts": dict(selected_counts),
        "complete_selected_split_counts": dict(complete_counts),
        "included_splits": include_splits,
        "included_episode_count": len(episodes),
        "included_split_counts": dict(included_counts),
        # Test episodes count as sealed only while the test split is excluded.
        "sealed_test_episodes_available": complete_counts.get("test", 0) if "test" not in include_splits else 0,
        "require_all_videos": args.require_all_videos,
        "train_minimal_bytes": sum(ep["train_minimal_bytes"] for ep in episodes),
        "total_bytes": sum(ep["total_bytes"] for ep in episodes),
        "windowing": {
            "window_frames": args.window_frames,
            "stride_frames": args.stride_frames,
            "min_label_fraction": args.min_label_fraction,
        },
        "notes": [
            "Splits are inherited from the fixed selected-episode file.",
            "Held-out test episodes are excluded by default for progressive train/validation runs.",
            "Episode ids are session-qualified to avoid collisions between repeated ep1/ep2 folder names.",
        ],
    }
    payload = {"summary": summary, "episodes": episodes, "missing_or_incomplete": missing_or_incomplete}

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    if args.report_output:
        write_report(args.report_output.expanduser(), summary)
    print(json.dumps(summary, indent=2))
    print(f"Wrote {output_path}")
    return 0
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    exit_status = main()
    raise SystemExit(exit_status)