| |
| """Select a metadata-balanced Xperience-10M pilot subset. |
| |
| The selector uses Hugging Face file metadata only. It does not download episode |
| data. Content-category balancing is deferred until annotations are staged, |
| because category text lives inside annotation.hdf5 files. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| import getpass |
| import hashlib |
| import json |
| import os |
| import re |
| from collections import Counter, defaultdict |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from statistics import median |
| from typing import Any |
|
|
| from huggingface_hub import HfApi |
|
|
|
|
| REQUIRED_FILES = [ |
| "annotation.hdf5", |
| "fisheye_cam0.mp4", |
| "fisheye_cam1.mp4", |
| "fisheye_cam2.mp4", |
| "fisheye_cam3.mp4", |
| "stereo_left.mp4", |
| "stereo_right.mp4", |
| ] |
| EXCLUDED_TRAINING_FILES = {"visualization.rrd"} |
| SIZE_BANDS = ["short", "lower_mid", "upper_mid", "long"] |
|
|
|
|
| def parse_args() -> argparse.Namespace: |
| parser = argparse.ArgumentParser(description=__doc__) |
| parser.add_argument("--repo-id", default="ropedia-ai/xperience-10m") |
| parser.add_argument("--target-episodes", type=int, default=128) |
| parser.add_argument("--seed", type=int, default=7) |
| parser.add_argument("--train-fraction", type=float, default=0.75) |
| parser.add_argument("--val-fraction", type=float, default=0.125) |
| parser.add_argument("--test-fraction", type=float, default=0.125) |
| parser.add_argument("--drop-bottom-annotation-percentile", type=float, default=0.05) |
| parser.add_argument("--drop-bottom-training-percentile", type=float, default=0.05) |
| parser.add_argument("--min-annotation-gib", type=float, default=0.5) |
| parser.add_argument("--windows-per-episode", type=int, default=256) |
| parser.add_argument("--output-json", type=Path, default=Path("results/omni_finetune/xperience10m_128_episode_selection.json")) |
| parser.add_argument("--output-csv", type=Path, default=Path("results/omni_finetune/xperience10m_128_episode_selection.csv")) |
| parser.add_argument("--download-list-output", type=Path, default=Path("results/omni_finetune/xperience10m_128_episode_download_files.txt")) |
| parser.add_argument("--report-output", type=Path, default=Path("results/omni_finetune/XPERIENCE10M_128_EPISODE_SELECTION.md")) |
| parser.add_argument("--token", default=os.environ.get("HF_TOKEN", "").strip()) |
| return parser.parse_args() |
|
|
|
|
| def file_size(sibling: Any) -> int: |
| value = getattr(sibling, "size", None) |
| if isinstance(value, int): |
| return value |
| lfs = getattr(sibling, "lfs", None) |
| if isinstance(lfs, dict) and isinstance(lfs.get("size"), int): |
| return int(lfs["size"]) |
| return 0 |
|
|
|
|
| def human_bytes(num: float | int) -> str: |
| value = float(num) |
| for unit in ["B", "KiB", "MiB", "GiB", "TiB"]: |
| if abs(value) < 1024.0 or unit == "TiB": |
| return f"{value:.2f} {unit}" |
| value /= 1024.0 |
| return f"{value:.2f} TiB" |
|
|
|
|
| def quantile(values: list[int], q: float) -> int: |
| if not values: |
| return 0 |
| ordered = sorted(values) |
| if len(ordered) == 1: |
| return ordered[0] |
| pos = min(max(q, 0.0), 1.0) * (len(ordered) - 1) |
| lo = int(pos) |
| hi = min(lo + 1, len(ordered) - 1) |
| frac = pos - lo |
| return int(round(ordered[lo] * (1.0 - frac) + ordered[hi] * frac)) |
|
|
|
|
| def summarize_sizes(values: list[int]) -> dict[str, Any]: |
| if not values: |
| return {"count": 0} |
| ordered = sorted(values) |
| return { |
| "count": len(ordered), |
| "min_bytes": ordered[0], |
| "p05_bytes": quantile(ordered, 0.05), |
| "p25_bytes": quantile(ordered, 0.25), |
| "median_bytes": int(median(ordered)), |
| "p75_bytes": quantile(ordered, 0.75), |
| "p95_bytes": quantile(ordered, 0.95), |
| "max_bytes": ordered[-1], |
| "mean_bytes": int(sum(ordered) / len(ordered)), |
| "min_human": human_bytes(ordered[0]), |
| "p05_human": human_bytes(quantile(ordered, 0.05)), |
| "p25_human": human_bytes(quantile(ordered, 0.25)), |
| "median_human": human_bytes(median(ordered)), |
| "p75_human": human_bytes(quantile(ordered, 0.75)), |
| "p95_human": human_bytes(quantile(ordered, 0.95)), |
| "max_human": human_bytes(ordered[-1]), |
| "mean_human": human_bytes(sum(ordered) / len(ordered)), |
| } |
|
|
|
|
| def stable_hash(seed: int, text: str) -> str: |
| return hashlib.sha256(f"{seed}:{text}".encode("utf-8")).hexdigest() |
|
|
|
|
| def stable_float(seed: int, text: str) -> float: |
| return int(stable_hash(seed, text)[:12], 16) / float(16**12) |
|
|
|
|
| def episode_number(episode_id: str) -> int | None: |
| match = re.fullmatch(r"ep(\d+)", episode_id) |
| return int(match.group(1)) if match else None |
|
|
|
|
| def size_band(annotation_bytes: int, q25: int, q50: int, q75: int) -> str: |
| if annotation_bytes <= q25: |
| return "short" |
| if annotation_bytes <= q50: |
| return "lower_mid" |
| if annotation_bytes <= q75: |
| return "upper_mid" |
| return "long" |
|
|
|
|
| def build_episode_records(siblings: list[Any]) -> list[dict[str, Any]]: |
| by_parent: dict[str, dict[str, Any]] = defaultdict(lambda: {"files": {}, "bytes": 0}) |
| for sibling in siblings: |
| path = str(getattr(sibling, "rfilename", "")) |
| if not path or path == ".gitattributes": |
| continue |
| name = Path(path).name |
| parent = Path(path).parent.as_posix() |
| if not parent: |
| continue |
| size = file_size(sibling) |
| bucket = by_parent[parent] |
| bucket["files"][name] = {"path": path, "bytes": size} |
| bucket["bytes"] += size |
|
|
| records = [] |
| for parent, bucket in by_parent.items(): |
| files = bucket["files"] |
| present = set(files) |
| if "annotation.hdf5" not in present: |
| continue |
| has_all_six_videos = all(name in present for name in REQUIRED_FILES[1:]) |
| training_bytes = sum( |
| meta["bytes"] |
| for name, meta in files.items() |
| if name not in EXCLUDED_TRAINING_FILES |
| ) |
| records.append( |
| { |
| "episode_path": parent, |
| "episode_id": Path(parent).name, |
| "episode_number": episode_number(Path(parent).name), |
| "top_level_session": parent.split("/", 1)[0], |
| "file_count": len(present), |
| "total_bytes": int(bucket["bytes"]), |
| "training_bytes_excluding_visualization_rrd": int(training_bytes), |
| "annotation_bytes": int(files["annotation.hdf5"]["bytes"]), |
| "video_bytes": int(sum(files[name]["bytes"] for name in REQUIRED_FILES[1:] if name in files)), |
| "has_annotation": True, |
| "has_all_six_videos": has_all_six_videos, |
| "has_visualization_rrd": "visualization.rrd" in present, |
| "missing_required_files": [name for name in REQUIRED_FILES if name not in present], |
| "download_files": [files[name]["path"] for name in REQUIRED_FILES if name in files], |
| } |
| ) |
| return records |
|
|
|
|
| def choose_target_counts(target: int) -> dict[str, int]: |
| base = target // len(SIZE_BANDS) |
| remainder = target % len(SIZE_BANDS) |
| return { |
| band: base + (1 if idx < remainder else 0) |
| for idx, band in enumerate(SIZE_BANDS) |
| } |
|
|
|
|
| def select_balanced(records: list[dict[str, Any]], target: int, seed: int) -> list[dict[str, Any]]: |
| counts = choose_target_counts(target) |
| by_band: dict[str, list[dict[str, Any]]] = {band: [] for band in SIZE_BANDS} |
| band_medians = { |
| band: median([record["annotation_bytes"] for record in records if record["size_band"] == band]) |
| for band in SIZE_BANDS |
| if any(record["size_band"] == band for record in records) |
| } |
|
|
| |
| |
| session_band_best: dict[tuple[str, str], dict[str, Any]] = {} |
| global_training_median = median([record["training_bytes_excluding_visualization_rrd"] for record in records]) |
| for record in records: |
| band = record["size_band"] |
| band_median = float(band_medians.get(band, record["annotation_bytes"]) or 1.0) |
| size_score = abs(record["annotation_bytes"] - band_median) / band_median |
| training_score = abs(record["training_bytes_excluding_visualization_rrd"] - global_training_median) / float(global_training_median or 1.0) |
| ep_num = record["episode_number"] |
| index_score = 0.0 if ep_num is None else min(ep_num / 64.0, 1.0) * 0.02 |
| tie = stable_float(seed, record["episode_path"]) * 0.001 |
| record["selection_score"] = round(float(size_score + 0.25 * training_score + index_score + tie), 8) |
| key = (record["top_level_session"], band) |
| current = session_band_best.get(key) |
| if current is None or record["selection_score"] < current["selection_score"]: |
| session_band_best[key] = record |
|
|
| for record in session_band_best.values(): |
| by_band[record["size_band"]].append(record) |
| for band in SIZE_BANDS: |
| by_band[band].sort(key=lambda item: (item["selection_score"], stable_hash(seed, item["episode_path"]))) |
|
|
| selected: list[dict[str, Any]] = [] |
| used_sessions: set[str] = set() |
| selected_by_band = Counter() |
| for band in SIZE_BANDS: |
| for record in by_band[band]: |
| if selected_by_band[band] >= counts[band]: |
| break |
| if record["top_level_session"] in used_sessions: |
| continue |
| selected.append(record) |
| used_sessions.add(record["top_level_session"]) |
| selected_by_band[band] += 1 |
|
|
| if len(selected) < target: |
| remaining = [ |
| record |
| for band in SIZE_BANDS |
| for record in by_band[band] |
| if record["top_level_session"] not in used_sessions |
| ] |
| remaining.sort(key=lambda item: (item["selection_score"], stable_hash(seed, item["episode_path"]))) |
| for record in remaining: |
| selected.append(record) |
| used_sessions.add(record["top_level_session"]) |
| selected_by_band[record["size_band"]] += 1 |
| if len(selected) >= target: |
| break |
|
|
| if len(selected) < target: |
| raise RuntimeError(f"Only selected {len(selected)} unique-session episodes; target is {target}.") |
| return selected[:target] |
|
|
|
|
| def assign_splits(selected: list[dict[str, Any]], seed: int, train_fraction: float, val_fraction: float, test_fraction: float) -> None: |
| total_fraction = train_fraction + val_fraction + test_fraction |
| if abs(total_fraction - 1.0) > 1e-6: |
| raise ValueError(f"Split fractions must sum to 1.0, got {total_fraction}") |
|
|
| for band in SIZE_BANDS: |
| band_records = [record for record in selected if record["size_band"] == band] |
| band_records.sort(key=lambda item: stable_hash(seed + 101, item["episode_path"])) |
| n = len(band_records) |
| val_n = int(round(n * val_fraction)) |
| test_n = int(round(n * test_fraction)) |
| train_n = n - val_n - test_n |
| for idx, record in enumerate(band_records): |
| if idx < train_n: |
| split = "train" |
| elif idx < train_n + val_n: |
| split = "val" |
| else: |
| split = "test" |
| record["split"] = split |
|
|
|
|
| def md_table(headers: list[str], rows: list[list[Any]]) -> list[str]: |
| lines = [ |
| "| " + " | ".join(headers) + " |", |
| "| " + " | ".join("---" for _ in headers) + " |", |
| ] |
| lines.extend("| " + " | ".join(str(cell) for cell in row) + " |" for row in rows) |
| return lines |
|
|
|
|
| def write_csv(path: Path, rows: list[dict[str, Any]]) -> None: |
| path.parent.mkdir(parents=True, exist_ok=True) |
| fields = [ |
| "selection_rank", |
| "split", |
| "size_band", |
| "episode_path", |
| "top_level_session", |
| "episode_id", |
| "annotation_human", |
| "training_human", |
| "annotation_bytes", |
| "training_bytes_excluding_visualization_rrd", |
| "has_visualization_rrd", |
| "selection_score", |
| ] |
| with path.open("w", newline="", encoding="utf-8") as handle: |
| writer = csv.DictWriter(handle, fieldnames=fields) |
| writer.writeheader() |
| for row in rows: |
| writer.writerow({field: row.get(field) for field in fields}) |
|
|
|
|
| def main() -> int: |
| args = parse_args() |
| token = args.token or getpass.getpass("HF token: ").strip() |
| if not token: |
| raise SystemExit("HF token is required for gated dataset metadata.") |
|
|
| api = HfApi(token=token) |
| info = api.repo_info(args.repo_id, repo_type="dataset", files_metadata=True, token=token) |
| records = build_episode_records(list(info.siblings or [])) |
| complete = [record for record in records if record["has_all_six_videos"]] |
|
|
| annotation_sizes = [record["annotation_bytes"] for record in complete] |
| training_sizes = [record["training_bytes_excluding_visualization_rrd"] for record in complete] |
| q25 = quantile(annotation_sizes, 0.25) |
| q50 = quantile(annotation_sizes, 0.50) |
| q75 = quantile(annotation_sizes, 0.75) |
| min_annotation = max( |
| int(args.min_annotation_gib * (1024**3)), |
| quantile(annotation_sizes, args.drop_bottom_annotation_percentile), |
| ) |
| min_training = quantile(training_sizes, args.drop_bottom_training_percentile) |
|
|
| candidates = [] |
| rejected = Counter() |
| for record in complete: |
| if record["annotation_bytes"] < min_annotation: |
| rejected["annotation_too_small"] += 1 |
| continue |
| if record["training_bytes_excluding_visualization_rrd"] < min_training: |
| rejected["training_too_small"] += 1 |
| continue |
| record = dict(record) |
| record["size_band"] = size_band(record["annotation_bytes"], q25, q50, q75) |
| record["annotation_human"] = human_bytes(record["annotation_bytes"]) |
| record["training_human"] = human_bytes(record["training_bytes_excluding_visualization_rrd"]) |
| candidates.append(record) |
|
|
| selected = select_balanced(candidates, args.target_episodes, args.seed) |
| selected.sort(key=lambda item: (SIZE_BANDS.index(item["size_band"]), item["selection_score"], item["episode_path"])) |
| assign_splits(selected, args.seed, args.train_fraction, args.val_fraction, args.test_fraction) |
| for idx, record in enumerate(selected, start=1): |
| record["selection_rank"] = idx |
|
|
| selected_download_files = [ |
| filename |
| for record in selected |
| for filename in record["download_files"] |
| ] |
| split_counts = Counter(record["split"] for record in selected) |
| band_counts = Counter(record["size_band"] for record in selected) |
| split_band_counts = Counter((record["split"], record["size_band"]) for record in selected) |
| selected_sessions = {record["top_level_session"] for record in selected} |
| train_sessions = {record["top_level_session"] for record in selected if record["split"] == "train"} |
| val_sessions = {record["top_level_session"] for record in selected if record["split"] == "val"} |
| test_sessions = {record["top_level_session"] for record in selected if record["split"] == "test"} |
|
|
| payload = { |
| "status": "pass", |
| "generated_at_utc": datetime.now(timezone.utc).isoformat(timespec="seconds"), |
| "repo_id": args.repo_id, |
| "repo_sha": getattr(info, "sha", None), |
| "selection_type": "metadata_balanced_first_pass", |
| "target_episodes": args.target_episodes, |
| "seed": args.seed, |
| "rules": { |
| "complete_episode_required_files": REQUIRED_FILES, |
| "excluded_training_files": sorted(EXCLUDED_TRAINING_FILES), |
| "one_episode_per_top_level_session": True, |
| "drop_bottom_annotation_percentile": args.drop_bottom_annotation_percentile, |
| "drop_bottom_training_percentile": args.drop_bottom_training_percentile, |
| "min_annotation_bytes": min_annotation, |
| "min_annotation_human": human_bytes(min_annotation), |
| "min_training_bytes": min_training, |
| "min_training_human": human_bytes(min_training), |
| "content_category_status": "not directly visible in HF metadata; refine after annotations are downloaded and captions are parsed", |
| }, |
| "available_complete_episodes": len(complete), |
| "candidate_episodes_after_filters": len(candidates), |
| "rejected_counts": dict(rejected), |
| "annotation_size_summary_complete": summarize_sizes(annotation_sizes), |
| "training_size_summary_complete": summarize_sizes(training_sizes), |
| "selected_summary": { |
| "episode_count": len(selected), |
| "unique_session_count": len(selected_sessions), |
| "split_counts": dict(split_counts), |
| "size_band_counts": dict(band_counts), |
| "split_band_counts": {f"{split}/{band}": count for (split, band), count in split_band_counts.items()}, |
| "estimated_download_bytes_excluding_visualization_rrd": sum(record["training_bytes_excluding_visualization_rrd"] for record in selected), |
| "estimated_download_human_excluding_visualization_rrd": human_bytes(sum(record["training_bytes_excluding_visualization_rrd"] for record in selected)), |
| "estimated_annotation_bytes": sum(record["annotation_bytes"] for record in selected), |
| "estimated_annotation_human": human_bytes(sum(record["annotation_bytes"] for record in selected)), |
| "estimated_windows_at_configured_limit": len(selected) * args.windows_per_episode, |
| "windows_per_episode": args.windows_per_episode, |
| "train_sessions_overlap_val": sorted(train_sessions & val_sessions), |
| "train_sessions_overlap_test": sorted(train_sessions & test_sessions), |
| "val_sessions_overlap_test": sorted(val_sessions & test_sessions), |
| }, |
| "selected_episodes": selected, |
| "download_files": selected_download_files, |
| } |
|
|
| args.output_json.parent.mkdir(parents=True, exist_ok=True) |
| args.output_json.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") |
| write_csv(args.output_csv, selected) |
| args.download_list_output.write_text("\n".join(selected_download_files) + "\n", encoding="utf-8") |
|
|
| summary = payload["selected_summary"] |
| report = [ |
| "# Xperience-10M 128-Episode Metadata-Balanced Selection", |
| "", |
| "This is a download plan, not a trained model result. It uses Hugging Face file metadata only and downloads no raw episode data.", |
| "", |
| "## Why This Selection", |
| "", |
| "- Use only complete episodes: `annotation.hdf5` plus six MP4 streams.", |
| "- Exclude `visualization.rrd` from the training download plan.", |
| "- Avoid tiny annotation outliers that are likely one-segment examples.", |
| "- Use one episode per top-level session to reduce leakage and overfitting to one capture session.", |
| "- Balance across four annotation-size bands as a proxy for duration/content richness before category labels are available.", |
| "- Split by session into train/val/test.", |
| "", |
| "## Selection Summary", |
| "", |
| *md_table( |
| ["Measure", "Value"], |
| [ |
| ["Selected episodes", summary["episode_count"]], |
| ["Unique sessions", summary["unique_session_count"]], |
| ["Split counts", json.dumps(summary["split_counts"], sort_keys=True)], |
| ["Size-band counts", json.dumps(summary["size_band_counts"], sort_keys=True)], |
| ["Estimated training download, no RRD", summary["estimated_download_human_excluding_visualization_rrd"]], |
| ["Estimated annotation bytes", summary["estimated_annotation_human"]], |
| ["Estimated windows at 256/episode", summary["estimated_windows_at_configured_limit"]], |
| ["Session leakage train/val", len(summary["train_sessions_overlap_val"])], |
| ["Session leakage train/test", len(summary["train_sessions_overlap_test"])], |
| ["Session leakage val/test", len(summary["val_sessions_overlap_test"])], |
| ], |
| ), |
| "", |
| "## Filters", |
| "", |
| *md_table( |
| ["Rule", "Value"], |
| [ |
| ["Available complete episodes", len(complete)], |
| ["Candidates after filters", len(candidates)], |
| ["Minimum annotation size", payload["rules"]["min_annotation_human"]], |
| ["Minimum training size", payload["rules"]["min_training_human"]], |
| ["Rejected counts", json.dumps(payload["rejected_counts"], sort_keys=True)], |
| ], |
| ), |
| "", |
| "## Split x Size Band", |
| "", |
| *md_table( |
| ["Split", *SIZE_BANDS], |
| [ |
| [split, *[split_band_counts.get((split, band), 0) for band in SIZE_BANDS]] |
| for split in ["train", "val", "test"] |
| ], |
| ), |
| "", |
| "## Important Limitation", |
| "", |
| "HF metadata does not expose semantic content categories. This selection is the best first-pass balance before downloading. After the selected annotations are staged, parse `Main Task`, `Sub Task`, `Current Action`, objects, and interaction text; then swap episodes if one content cluster dominates.", |
| "", |
| "## Output Files", |
| "", |
| f"- JSON: `{args.output_json}`", |
| f"- CSV: `{args.output_csv}`", |
| f"- Download file list: `{args.download_list_output}`", |
| ] |
| args.report_output.write_text("\n".join(report) + "\n", encoding="utf-8") |
|
|
| print(json.dumps(payload["selected_summary"], indent=2)) |
| print(f"PASS: wrote {args.output_json}") |
| print(f"PASS: wrote {args.output_csv}") |
| print(f"PASS: wrote {args.download_list_output}") |
| print(f"PASS: wrote {args.report_output}") |
| return 0 |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|