File size: 7,534 Bytes
627e5d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/usr/bin/env python3
"""Build a manifest from a fixed selected-episode list.

This is used for progressive train/validation runs while the remaining held-out
test episodes are still staging. Splits come from the selection file, not from a
fresh random split, so the final test set stays sealed.
"""

from __future__ import annotations

import argparse
import json
from collections import Counter
from pathlib import Path

from build_episode_manifest import add_toolkit_to_path, inspect_episode
from qwen3_omni_dataset_utils import VIDEO_NAMES


def parse_args() -> argparse.Namespace:
    workspace_default = Path(__file__).resolve().parents[2]
    parser = argparse.ArgumentParser(description="Build a manifest from selected Xperience-10M episodes.")
    parser.add_argument("--workspace", type=Path, default=workspace_default)
    parser.add_argument("--data-root", type=Path, action="append", required=True)
    parser.add_argument("--selection-json", type=Path, required=True)
    parser.add_argument("--output", type=Path, required=True)
    parser.add_argument("--report-output", type=Path)
    parser.add_argument("--include-split", choices=["train", "val", "test"], action="append")
    parser.add_argument("--require-all-videos", action=argparse.BooleanOptionalAction, default=True)
    parser.add_argument("--min-train-episodes", type=int, default=1)
    parser.add_argument("--min-val-episodes", type=int, default=1)
    parser.add_argument("--window-frames", type=int, default=20)
    parser.add_argument("--stride-frames", type=int, default=20)
    parser.add_argument("--min-label-fraction", type=float, default=0.6)
    return parser.parse_args()


def is_complete_episode(path: Path, require_all_videos: bool) -> tuple[bool, list[str]]:
    missing = []
    if not (path / "annotation.hdf5").is_file():
        missing.append("annotation.hdf5")
    if require_all_videos:
        for name in VIDEO_NAMES:
            if not (path / name).is_file():
                missing.append(name)
    elif not any((path / name).is_file() for name in VIDEO_NAMES):
        missing.append("any_mp4")
    return not missing, missing


def selected_episode_path(data_roots: list[Path], episode_path: str) -> Path | None:
    for root in data_roots:
        candidate = root / episode_path
        if candidate.exists():
            return candidate.resolve()
    return None


def write_report(path: Path, summary: dict) -> None:
    lines = [
        "# Progressive Train/Validation Manifest",
        "",
        f"- Selected episodes: `{summary['selected_episode_count']}`",
        f"- Included splits: `{', '.join(summary['included_splits'])}`",
        f"- Included complete episodes: `{summary['included_episode_count']}`",
        f"- Included split counts: `{summary['included_split_counts']}`",
        f"- Complete selected episodes by split: `{summary['complete_selected_split_counts']}`",
        f"- Available test episodes kept sealed: `{summary['sealed_test_episodes_available']}`",
        f"- Require all six videos: `{summary['require_all_videos']}`",
        "",
        "The manifest uses the split labels from the fixed 128-episode selection file.",
        "Episodes assigned to the held-out test split are reported but not included unless explicitly requested.",
    ]
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def main() -> int:
    args = parse_args()
    args.workspace = args.workspace.expanduser().resolve()
    data_roots = [path.expanduser().resolve() for path in args.data_root]
    include_splits = args.include_split or ["train", "val"]
    add_toolkit_to_path(args.workspace)

    selection = json.loads(args.selection_json.expanduser().read_text(encoding="utf-8"))
    selected = selection.get("selected_episodes", [])
    if not selected:
        raise ValueError(f"No selected_episodes found in {args.selection_json}")

    episodes = []
    missing_or_incomplete = []
    complete_counts = Counter()
    selected_counts = Counter(str(item.get("split", "unspecified")) for item in selected)

    for item in selected:
        split = str(item.get("split", "unspecified"))
        rel_path = str(item.get("episode_path", ""))
        episode_dir = selected_episode_path(data_roots, rel_path)
        if episode_dir is None:
            missing_or_incomplete.append({"episode_path": rel_path, "split": split, "missing": ["episode_dir"]})
            continue
        complete, missing = is_complete_episode(episode_dir, args.require_all_videos)
        if not complete:
            missing_or_incomplete.append({"episode_path": rel_path, "split": split, "missing": missing})
            continue
        complete_counts[split] += 1
        if split not in include_splits:
            continue

        episode = inspect_episode(episode_dir / "annotation.hdf5", args)
        unique_id = rel_path.replace("/", "__")
        episode.update({
            "episode_id": unique_id,
            "source_episode_id": episode_dir.name,
            "episode_path": rel_path,
            "split": split,
            "selection_rank": item.get("selection_rank"),
            "selection_score": item.get("selection_score"),
            "size_band": item.get("size_band"),
            "top_level_session": item.get("top_level_session"),
        })
        episodes.append(episode)

    included_counts = Counter(ep["split"] for ep in episodes)
    if included_counts.get("train", 0) < args.min_train_episodes:
        raise SystemExit(f"Only {included_counts.get('train', 0)} train episodes available; need {args.min_train_episodes}.")
    if "val" in include_splits and included_counts.get("val", 0) < args.min_val_episodes:
        raise SystemExit(f"Only {included_counts.get('val', 0)} val episodes available; need {args.min_val_episodes}.")

    summary = {
        "selection_json": str(args.selection_json),
        "selected_episode_count": len(selected),
        "selected_split_counts": dict(selected_counts),
        "complete_selected_split_counts": dict(complete_counts),
        "included_splits": include_splits,
        "included_episode_count": len(episodes),
        "included_split_counts": dict(included_counts),
        "sealed_test_episodes_available": complete_counts.get("test", 0) if "test" not in include_splits else 0,
        "require_all_videos": args.require_all_videos,
        "train_minimal_bytes": sum(ep["train_minimal_bytes"] for ep in episodes),
        "total_bytes": sum(ep["total_bytes"] for ep in episodes),
        "windowing": {
            "window_frames": args.window_frames,
            "stride_frames": args.stride_frames,
            "min_label_fraction": args.min_label_fraction,
        },
        "notes": [
            "Splits are inherited from the fixed selected-episode file.",
            "Held-out test episodes are excluded by default for progressive train/validation runs.",
            "Episode ids are session-qualified to avoid collisions between repeated ep1/ep2 folder names.",
        ],
    }
    payload = {"summary": summary, "episodes": episodes, "missing_or_incomplete": missing_or_incomplete}
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    if args.report_output:
        write_report(args.report_output.expanduser(), summary)
    print(json.dumps(summary, indent=2))
    print(f"Wrote {args.output}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())