ropedia-xperience-10m-task-baselines / scripts /omni /build_episode_manifest.py
cy0307's picture
Publish Ropedia Xperience-10M task baseline cards
cfd29be verified
Raw
History Blame
10.6 kB
#!/usr/bin/env python3
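"""Build a lightweight manifest for local Ropedia/Xperience episode folders.

The manifest is intentionally metadata-only: it records per-episode file
sizes, which HDF5 modalities are present, label statistics, and a
train/val/test split. It lets us decide how many episodes fit on target
storage before downloading or copying large media.
"""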
from __future__ import annotations
import argparse
import json
import random
import sys
from collections import Counter
from pathlib import Path
import h5py
# Video files looked up inside every episode folder, next to annotation.hdf5:
# the four fisheye streams first, then the stereo pair.
VIDEO_NAMES = [
    *(f"fisheye_cam{cam}.mp4" for cam in range(4)),
    "stereo_left.mp4",
    "stereo_right.mp4",
]
def parse_args() -> argparse.Namespace:
    """Parse and validate the command-line options of the manifest builder.

    Returns:
        The parsed namespace. ``data_root`` is a list of paths (the option may
        be repeated); the remaining options control the episode cap, label
        windowing and the train/val/test split.

    Exits (via ``parser.error``, status 2) when ``--window-frames`` or
    ``--stride-frames`` is not a positive integer: a zero stride would crash
    later inside ``range()``, and non-positive values silently produce no
    label windows at all.
    """
    parser = argparse.ArgumentParser(description="Scan Ropedia/Xperience episode folders.")
    # Default workspace is the directory two levels above this script's folder.
    # Fall back to the script's own folder when the path is too shallow, so a
    # missing default can never prevent an explicit --workspace from being used.
    script_path = Path(__file__).resolve()
    ancestors = script_path.parents
    workspace_default = ancestors[2] if len(ancestors) > 2 else script_path.parent
    parser.add_argument("--workspace", type=Path, default=workspace_default)
    parser.add_argument(
        "--data-root",
        type=Path,
        action="append",
        required=True,
        help="Root to scan. May be passed multiple times.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("outputs/omni_exploration/episode_manifest.json"),
    )
    parser.add_argument("--max-episodes", type=int, default=0, help="0 means no cap.")
    parser.add_argument("--window-frames", type=int, default=20)
    parser.add_argument("--stride-frames", type=int, default=20)
    parser.add_argument("--min-label-fraction", type=float, default=0.6)
    parser.add_argument("--train-fraction", type=float, default=0.80)
    parser.add_argument("--val-fraction", type=float, default=0.10)
    parser.add_argument("--test-fraction", type=float, default=0.10)
    parser.add_argument("--split-seed", type=int, default=7)
    args = parser.parse_args()

    # Reject windowing values that cannot describe a sliding window.
    if args.window_frames < 1:
        parser.error("--window-frames must be a positive integer.")
    if args.stride_frames < 1:
        parser.error("--stride-frames must be a positive integer.")
    return args
def add_toolkit_to_path(workspace: Path) -> None:
    """Make ``<workspace>/HOMIE-toolkit`` importable.

    The toolkit directory is inserted at the front of ``sys.path`` unless it is
    already listed there.

    Raises:
        FileNotFoundError: if the toolkit directory does not exist.
    """
    toolkit_dir = workspace / "HOMIE-toolkit"
    if not toolkit_dir.exists():
        raise FileNotFoundError(f"HOMIE-toolkit not found: {toolkit_dir}")

    entry = str(toolkit_dir)
    if entry in sys.path:
        return
    sys.path.insert(0, entry)
def size_or_zero(path: Path) -> int:
    """Return the size of ``path`` in bytes, or 0 when the file does not exist."""
    try:
        stat_result = path.stat()
    except FileNotFoundError:
        # A missing file simply contributes nothing to the byte totals.
        return 0
    return stat_result.st_size
def decode_frame_name(value) -> str:
    """Convert one stored frame-name entry to a plain string.

    Args:
        value: A text string, a bytes-like object, or any object exposing
            ``tobytes()`` (for example a NumPy scalar or array).

    Returns:
        The decoded name. Byte content is decoded as UTF-8 (undecodable bytes
        are replaced) and NUL padding is stripped from both ends. Anything
        else is passed through ``str()``.
    """
    # Text must be handled before the ``tobytes`` probe: a str subclass such as
    # numpy.str_ also exposes tobytes(), but its buffer is not UTF-8, so
    # decoding it would leave NUL characters inside the name.
    if isinstance(value, str):
        return str(value)
    raw = value.tobytes() if hasattr(value, "tobytes") else value
    if isinstance(raw, (bytes, bytearray)):
        return raw.decode("utf-8", errors="replace").strip("\x00")
    return str(raw)
def infer_frame_names(annotation: Path) -> list[str]:
    """Return the frame names of an episode, read from its annotation file.

    Args:
        annotation: Path to the episode's HDF5 annotation file.

    Returns:
        The decoded entries of ``slam/frame_names`` when that dataset exists.
        Otherwise synthetic names ``frame_000000.jpg``, ``frame_000001.jpg``, ...
        are generated, one per entry along the first axis of the first dataset
        found among the fallback keys. An empty list is returned when none of
        the datasets is present.
    """
    with h5py.File(annotation, "r") as f:
        if "slam/frame_names" in f:
            # Read the whole dataset in one call; indexing it entry by entry
            # issues a separate HDF5 read for every frame.
            stored_names = f["slam/frame_names"][:]
            return [decode_frame_name(entry) for entry in stored_names]
        for key in ("hand_mocap/left_joints_3d", "depth/depth", "full_body_mocap/keypoints"):
            if key in f:
                return [f"frame_{idx:06d}.jpg" for idx in range(f[key].shape[0])]
    return []
def hdf5_presence(annotation: Path) -> dict:
    """Report which known modalities are stored in an annotation file.

    Args:
        annotation: Path to the episode's HDF5 annotation file.

    Returns:
        A dict mapping each modality name to ``True``/``False`` depending on
        whether its HDF5 path is contained in the file.
    """
    # (manifest key, HDF5 path whose presence signals that modality)
    modality_paths = (
        ("calibration", "calibration"),
        ("slam_pose", "slam/quat_wxyz"),
        ("slam_point_cloud", "slam/point_cloud"),
        ("depth", "depth/depth"),
        ("depth_confidence", "depth/confidence"),
        ("hand_mocap", "hand_mocap/left_joints_3d"),
        ("body_mocap", "full_body_mocap/keypoints"),
        ("contacts", "full_body_mocap/contacts"),
        ("imu", "imu/accel_xyz"),
        ("caption", "caption"),
        ("captions", "captions"),
    )
    presence = {}
    with h5py.File(annotation, "r") as handle:
        for modality, hdf5_path in modality_paths:
            presence[modality] = hdf5_path in handle
    return presence
def frame_label(info: dict, target: str) -> str:
    """Extract the label of one frame for the given target.

    Args:
        info: Per-frame caption record. The ``"theme"`` key is read for the
            ``"subtask"`` target and ``"action_label"`` for any other target.
        target: ``"subtask"`` or another target name (``"action"`` in this file).

    Returns:
        The stripped label, or ``""`` when the value is missing, ``None``,
        blank, or "N/A" in any letter case.
    """
    key = "theme" if target == "subtask" else "action_label"
    value = info.get(key)
    if value is None:
        # A stored None means "no label"; it must not become the text "None".
        return ""
    label = str(value).strip()
    if not label or label.upper() == "N/A":
        return ""
    return label
def majority_label(labels: list[str], min_fraction: float) -> tuple[str, float]:
    """Pick the dominant label of a window.

    Empty labels are discarded first, so the returned fraction is the share of
    the winning label among the *non-empty* labels only.

    Args:
        labels: Per-frame labels; ``""`` marks an unlabeled frame.
        min_fraction: Minimum share the winning label must reach.

    Returns:
        ``(label, fraction)``. ``label`` is ``""`` when no frame is labeled
        (fraction ``0.0``) or when the winner's share is below ``min_fraction``.
    """
    non_empty = [candidate for candidate in labels if candidate]
    if not non_empty:
        return "", 0.0

    [(winner, votes)] = Counter(non_empty).most_common(1)
    share = votes / len(non_empty)
    return ("" if share < min_fraction else winner), share
def label_metadata(annotation: Path, frame_names: list[str], args: argparse.Namespace) -> dict:
    """Summarize the action/subtask labels of one episode.

    Args:
        annotation: Path to the episode's HDF5 annotation file.
        frame_names: Frame names of the episode, in order.
        args: Parsed options; ``window_frames``, ``stride_frames`` and
            ``min_label_fraction`` are read.

    Returns:
        A dict with ``main_task``, ``segments``, per-target ``frame_labels`` and
        ``window_labels`` (label -> count, most frequent first) and
        ``num_labeled_windows``. All counts are empty/zero when the loader
        returns no per-frame information.
    """
    # Function-scope import: the module is resolved through sys.path at call
    # time (presumably it ships with HOMIE-toolkit -- confirm).
    from utils.caption_utils import load_caption_data_from_annotation_hdf5

    targets = ("action", "subtask")
    main_task, frame_info, segment_boundaries, _task_to_id = load_caption_data_from_annotation_hdf5(
        annotation,
        str(annotation.parent),
        frame_names,
    )
    if frame_info is None:
        return {
            "main_task": "",
            "segments": 0,
            "frame_labels": {target: {} for target in targets},
            "window_labels": {target: {} for target in targets},
            "num_labeled_windows": {target: 0 for target in targets},
        }

    num_frames = len(frame_names)
    last_start = num_frames - args.window_frames
    frame_counts = {}
    window_counts = {}
    for target in targets:
        # Label of every frame for this target, computed once and shared by
        # the per-frame tally and the sliding windows below.
        per_frame = [frame_label(frame_info.get(idx, {}), target) for idx in range(num_frames)]
        frame_counts[target] = Counter(label for label in per_frame if label)

        tally = Counter()
        for start in range(0, last_start + 1, args.stride_frames):
            window = [per_frame[i] for i in range(start, start + args.window_frames)]
            winner, _share = majority_label(window, args.min_label_fraction)
            if winner:
                tally[winner] += 1
        window_counts[target] = tally

    return {
        "main_task": main_task,
        "segments": len(segment_boundaries),
        "frame_labels": {target: dict(frame_counts[target].most_common()) for target in targets},
        "window_labels": {target: dict(window_counts[target].most_common()) for target in targets},
        "num_labeled_windows": {target: int(sum(window_counts[target].values())) for target in targets},
    }
def assign_splits(episodes: list[dict], args: argparse.Namespace) -> None:
    """Assign a reproducible ``"split"`` (train/val/test) to every episode, in place.

    Episodes are shuffled with ``random.Random(args.split_seed)``. The first
    block of the shuffled order becomes train, the next val, and the rest test.

    Sizing rules:
      * Fractions are normalized by their sum, so they need not add up to 1.
      * A split whose fraction is 0 never receives an episode.
      * Every split with a positive fraction receives at least one episode when
        there are enough episodes. The last such split takes the remainder.
      * With fewer episodes than enabled splits, episodes go to train first,
        then test, then val.

    Args:
        episodes: Episode dicts; each gets a ``"split"`` key.
        args: Parsed options; ``train_fraction``, ``val_fraction``,
            ``test_fraction`` and ``split_seed`` are read.

    Raises:
        ValueError: if the fractions do not sum to a positive value, or if any
            fraction is negative.
    """
    if not episodes:
        return
    total = args.train_fraction + args.val_fraction + args.test_fraction
    if total <= 0:
        raise ValueError("Split fractions must sum to a positive value.")
    fractions = {
        "train": args.train_fraction,
        "val": args.val_fraction,
        "test": args.test_fraction,
    }
    if any(value < 0 for value in fractions.values()):
        raise ValueError("Split fractions must be non-negative.")

    order = list(range(len(episodes)))
    random.Random(args.split_seed).shuffle(order)
    n = len(order)

    split_names = ("train", "val", "test")
    enabled = [name for name in split_names if fractions[name] > 0]
    counts = dict.fromkeys(split_names, 0)
    if n >= len(enabled):
        remaining = n
        for position, name in enumerate(enabled):
            # Hold back one episode for each enabled split still to be sized.
            reserved = len(enabled) - position - 1
            if reserved == 0:
                counts[name] = remaining
            else:
                wanted = int(round(n * (fractions[name] / total)))
                counts[name] = max(1, min(wanted, remaining - reserved))
            remaining -= counts[name]
    else:
        # Too few episodes to populate every enabled split.
        preferred = [name for name in ("train", "test", "val") if fractions[name] > 0]
        for name in preferred[:n]:
            counts[name] = 1

    # Expand the counts into one label per shuffled position.
    labels = [name for name in split_names for _ in range(counts[name])]
    for idx, split in zip(order, labels):
        episodes[idx]["split"] = split
def _file_record(name: str, path: Path) -> dict:
    """Return the manifest entry (name, size in bytes, existence) for one file."""
    return {"name": name, "bytes": size_or_zero(path), "exists": path.exists()}


def inspect_episode(annotation: Path, args: argparse.Namespace) -> dict:
    """Collect the manifest entry for the episode that owns ``annotation``.

    Every file is examined exactly once. The ``files``/``videos`` lists, the
    byte totals and the ``has_*`` flags are all derived from that single scan,
    so they cannot disagree with each other.

    Args:
        annotation: Path to the episode's annotation file; its parent directory
            is treated as the episode folder.
        args: Parsed options, forwarded to ``label_metadata``.

    Returns:
        A dict with the episode id (folder name), paths, frame count, main
        task, per-file and per-video records, HDF5 modality flags, label
        statistics, byte totals and availability flags.
    """
    episode_dir = annotation.parent

    annotation_record = _file_record("annotation.hdf5", annotation)
    video_records = [_file_record(name, episode_dir / name) for name in VIDEO_NAMES]
    rrd_record = _file_record("visualization.rrd", episode_dir / "visualization.rrd")
    files = [annotation_record, *video_records, rrd_record]

    total_bytes = sum(item["bytes"] for item in files)
    # Everything except the visualization file counts toward the training footprint.
    train_bytes = total_bytes - rrd_record["bytes"]

    frame_names = infer_frame_names(annotation)
    hdf5_modalities = hdf5_presence(annotation)
    labels = label_metadata(annotation, frame_names, args)

    videos = [
        {
            "name": record["name"],
            "path": str(episode_dir / record["name"]),
            "bytes": record["bytes"],
            "exists": record["exists"],
        }
        for record in video_records
    ]
    return {
        "episode_id": episode_dir.name,
        "path": str(episode_dir),
        "annotation": str(annotation),
        "frame_count": len(frame_names),
        "main_task": labels["main_task"],
        "files": files,
        "videos": videos,
        "hdf5_modalities": hdf5_modalities,
        "label_stats": labels,
        "total_bytes": total_bytes,
        "train_minimal_bytes": train_bytes,
        "has_annotation": annotation_record["exists"],
        "has_any_video": any(record["exists"] for record in video_records),
        "has_all_videos": all(record["exists"] for record in video_records),
        "has_rrd": rrd_record["exists"],
    }
def main() -> int:
    """Scan the data roots, build the manifest, write it as JSON and print a summary.

    Each ``--data-root`` is searched recursively for ``annotation.hdf5``. An
    annotation reachable through several roots (a root passed twice, or one
    root nested inside another) is listed only once, so episodes and byte
    totals are not double-counted.

    Returns:
        0, used as the process exit status.
    """
    args = parse_args()
    args.workspace = args.workspace.expanduser().resolve()
    add_toolkit_to_path(args.workspace)

    annotations: list[Path] = []
    seen: set[Path] = set()
    for root in args.data_root:
        # Roots are resolved first, so the same file yields the same path
        # whichever root it was discovered through.
        for path in sorted(root.expanduser().resolve().rglob("annotation.hdf5")):
            if path not in seen:
                seen.add(path)
                annotations.append(path)
    if args.max_episodes > 0:
        annotations = annotations[: args.max_episodes]

    episodes = [inspect_episode(path, args) for path in annotations]
    assign_splits(episodes, args)
    split_counts = Counter(ep["split"] for ep in episodes)
    summary = {
        "num_episodes": len(episodes),
        "total_bytes": sum(ep["total_bytes"] for ep in episodes),
        "train_minimal_bytes": sum(ep["train_minimal_bytes"] for ep in episodes),
        "split_counts": dict(split_counts),
        "split_fractions": {
            "train": args.train_fraction,
            "val": args.val_fraction,
            "test": args.test_fraction,
            "seed": args.split_seed,
        },
        "windowing": {
            "window_frames": args.window_frames,
            "stride_frames": args.stride_frames,
            "min_label_fraction": args.min_label_fraction,
        },
        "notes": [
            "train_minimal_bytes excludes visualization.rrd because model training does not need it.",
            "This file is metadata-only; it does not copy or download raw data.",
            "Splits are assigned by whole episode to avoid window leakage.",
        ],
    }
    payload = {"summary": summary, "episodes": episodes}
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    print(json.dumps(summary, indent=2))
    print(f"Wrote {args.output}")
    return 0
# Run the builder only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())