#!/usr/bin/env python3
"""Discover available Xperience-10M episodes and generate a readiness gate report."""

from __future__ import annotations

import argparse
import json
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from typing import Callable

# File names that identify an episode directory. The first entry is the
# annotation container; every following entry is one camera view.
VIDEO_FILES = [
    "annotation.hdf5",
    "fisheye_cam0.mp4",
    "fisheye_cam1.mp4",
    "fisheye_cam2.mp4",
    "fisheye_cam3.mp4",
    "stereo_left.mp4",
    "stereo_right.mp4",
]

# Camera views only (everything in VIDEO_FILES except the annotation file).
_VIEW_FILES = VIDEO_FILES[1:]

# Repo ids probed on both providers when none are given on the command line.
_DEFAULT_REPO_IDS = (
    "ropedia-ai/xperience-10m",
    "ropedia-ai/xperience-10m-sample",
)


@dataclass
class EpisodeRecord:
    """Availability of the expected files for one episode directory."""

    source: str
    episode_id: str
    episode_path: str
    has_annotation: bool
    has_fisheye_cam0: bool
    has_all_videos: bool
    has_any_video: bool
    missing_views: list[str]

    @property
    def is_degraded_valid(self) -> bool:
        """True when both the annotation file and fisheye_cam0 are present."""
        return self.has_annotation and self.has_fisheye_cam0

    @property
    def is_complete(self) -> bool:
        """True when the episode is degraded-valid and no view is missing."""
        return self.is_degraded_valid and self.has_all_videos

    def as_dict(self) -> dict:
        """Return a JSON-serializable dict, including the derived flags."""
        return {
            "source": self.source,
            "episode_id": self.episode_id,
            "episode_path": self.episode_path,
            "has_annotation": self.has_annotation,
            "has_fisheye_cam0": self.has_fisheye_cam0,
            "has_all_videos": self.has_all_videos,
            "has_any_video": self.has_any_video,
            "missing_views": self.missing_views,
            "is_degraded_valid": self.is_degraded_valid,
            "is_complete": self.is_complete,
        }


def _default_workspace() -> Path:
    """Return the ancestor two levels above this script's directory.

    Falls back to the shallowest available ancestor when the script lives
    closer to the filesystem root, instead of raising ``IndexError``.
    """
    parents = Path(__file__).resolve().parents
    return parents[min(2, len(parents) - 1)]


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The parsed namespace. ``modelscope_repo_id`` and ``hf_repo_id`` are
        always non-empty lists without duplicates: the values given on the
        command line, or the built-in defaults when the option is absent.
    """
    parser = argparse.ArgumentParser(description="Discover Xperience-10M episode availability.")
    parser.add_argument("--workspace", type=Path, default=_default_workspace())
    parser.add_argument("--data-root", type=Path, default=Path("modelscope_data"))
    parser.add_argument("--output", type=Path, default=Path("results/omni_finetune/source_discovery.json"))
    parser.add_argument(
        "--report-output",
        type=Path,
        default=Path("results/omni_finetune/DATA_BLOCKER_REPORT.md"),
    )
    parser.add_argument("--target-episodes", type=int, default=32)
    # default=None on purpose: with action="append", argparse appends user
    # values to a non-empty default list instead of replacing it.
    parser.add_argument(
        "--modelscope-repo-id",
        action="append",
        default=None,
        help="ModelScope dataset repo ids to probe.",
    )
    parser.add_argument(
        "--hf-repo-id",
        action="append",
        default=None,
        help="Hugging Face dataset repo ids to probe.",
    )
    parser.add_argument("--skip-modelscope", action="store_true")
    parser.add_argument("--skip-huggingface", action="store_true")
    args = parser.parse_args(argv)

    # dict.fromkeys keeps first-seen order while dropping repeated repo ids,
    # so one repo is never probed (and counted) twice.
    args.modelscope_repo_id = list(dict.fromkeys(args.modelscope_repo_id or _DEFAULT_REPO_IDS))
    args.hf_repo_id = list(dict.fromkeys(args.hf_repo_id or _DEFAULT_REPO_IDS))
    return args


def _coerce_files(payload: object) -> list[str]:
    """Flatten a provider file-listing payload into a list of path strings.

    Accepts ``None``, a single item, a list, or a dict that wraps the list
    under one of a few known keys. List items may be plain strings or dicts
    carrying the path under one of several known keys; anything else is
    dropped.
    """
    if payload is None:
        return []

    if isinstance(payload, dict):
        # Unwrap the first known key that holds a list.
        for key in ("data", "files", "FilePaths", "File"):
            candidate = payload.get(key)
            if isinstance(candidate, list):
                payload = candidate
                break

    if not isinstance(payload, list):
        payload = [payload]

    output: list[str] = []
    for item in payload:
        if isinstance(item, str):
            output.append(item)
        elif isinstance(item, dict):
            for key in ("path", "rfilename", "name", "Path", "uri"):
                value = item.get(key)
                if isinstance(value, str) and value:
                    output.append(value)
                    break
    return [entry for entry in output if entry]


def _call_provider_api(callers: list[Callable[[], object]]) -> tuple[list[str], list[str]]:
    """Try each callable in order and return the first non-empty file list.

    Returns:
        ``(files, [])`` as soon as one call yields files, otherwise
        ``([], errors)`` with one message per call that raised.
    """
    errors: list[str] = []
    for idx, fn in enumerate(callers):
        # Broad catch is deliberate: each entry is a best-effort guess at the
        # provider's API, and a mismatch must not abort the remaining attempts.
        try:
            files = _coerce_files(fn())
        except Exception as exc:
            errors.append(f"call {idx} failed: {exc}")
            continue
        if files:
            return files, []
    return [], errors


def scan_local_episodes(data_root: Path) -> list[EpisodeRecord]:
    """Find local episodes by searching ``data_root`` for ``annotation.hdf5``.

    Each directory containing an annotation file becomes one record.

    Returns:
        Records sorted by episode id (the directory name); empty when
        ``data_root`` is not an existing directory.
    """
    if not data_root.is_dir():
        return []

    found: dict[str, EpisodeRecord] = {}
    for annotation in sorted(data_root.rglob("annotation.hdf5")):
        episode_dir = annotation.parent
        present = {name: (episode_dir / name).exists() for name in VIDEO_FILES}
        found[str(episode_dir)] = EpisodeRecord(
            source="local",
            episode_id=episode_dir.name,
            episode_path=str(episode_dir),
            has_annotation=present["annotation.hdf5"],
            has_fisheye_cam0=present["fisheye_cam0.mp4"],
            has_all_videos=all(present[name] for name in _VIEW_FILES),
            has_any_video=any(present[name] for name in _VIEW_FILES),
            missing_views=[name for name in _VIEW_FILES if not present[name]],
        )
    return sorted(found.values(), key=lambda ep: ep.episode_id)


def collect_remote_records(source: str, repo_id: str, files: list[str]) -> list[EpisodeRecord]:
    """Group a remote repo's file listing into per-episode records.

    Files are grouped by parent directory; only names listed in
    ``VIDEO_FILES`` are considered. Files at the repo root form one episode
    named after the last component of ``repo_id``.

    Args:
        source: Provider label stored on each record and used as the prefix
            of ``episode_path``.
        repo_id: Repository identifier the listing came from.
        files: Repo-relative paths; backslashes are treated as separators.

    Returns:
        Records sorted by episode id.
    """
    identities: dict[str, tuple[str, str]] = {}  # parent -> (episode_id, episode_path)
    present: dict[str, set[str]] = {}  # parent -> expected file names seen

    for raw_path in files:
        norm = str(raw_path).replace("\\", "/").strip("/")
        if not norm:
            continue
        path = PurePosixPath(norm)
        if path.name not in VIDEO_FILES:
            continue

        parent = path.parent.as_posix()
        # pathlib reports the parent of a bare file name as ".", never "".
        if parent in ("", "."):
            parent = "."
            identity = (PurePosixPath(repo_id).name, f"{source}:{repo_id}")
        else:
            identity = (PurePosixPath(parent).name, f"{source}:{repo_id}/{parent}")

        identities.setdefault(parent, identity)
        present.setdefault(parent, set()).add(path.name)

    episodes = []
    for parent, (episode_id, episode_path) in identities.items():
        seen = present[parent]
        missing = [name for name in _VIEW_FILES if name not in seen]
        episodes.append(
            EpisodeRecord(
                source=source,
                episode_id=episode_id,
                episode_path=episode_path,
                has_annotation="annotation.hdf5" in seen,
                has_fisheye_cam0="fisheye_cam0.mp4" in seen,
                has_all_videos=not missing,
                has_any_video=len(missing) < len(_VIEW_FILES),
                missing_views=missing,
            )
        )
    return sorted(episodes, key=lambda ep: ep.episode_id)


def summarize_episodes(episodes: list[EpisodeRecord], errors: list[str], name: str) -> dict:
    """Build the JSON summary (counts, errors, per-episode dicts) for one source."""
    return {
        "source": name,
        "num_episodes": len(episodes),
        "num_degraded_valid_episodes": sum(ep.is_degraded_valid for ep in episodes),
        "num_complete_episodes": sum(ep.is_complete for ep in episodes),
        "errors": errors,
        "episodes": [ep.as_dict() for ep in episodes],
    }


def build_modelscope_records(repo_id: str) -> tuple[list[EpisodeRecord], list[str]]:
    """List a ModelScope dataset repo and convert it into episode records.

    Returns:
        ``(records, [])`` when a listing was obtained, otherwise
        ``([], errors)`` describing the import, init, or call failures.
    """
    # Broad catches are deliberate: an unavailable or broken provider SDK is
    # reported as an error entry rather than aborting the whole discovery run.
    try:
        from modelscope.hub.api import HubApi
    except Exception as exc:
        return [], [f"modelscope import failed: {exc}"]

    try:
        api = HubApi()
    except Exception as exc:
        return [], [f"modelscope HubApi init failed: {exc}"]

    # Several call shapes are attempted in order; the first one that returns
    # a non-empty listing wins (see _call_provider_api).
    callers = [
        lambda: api.get_dataset_files(repo_id),
        lambda: api.get_dataset_files(repo_id=repo_id),
        lambda: api.get_dataset_files(repo_id=repo_id, revision="master"),
        lambda: api.list_repo_files(repo_id=repo_id, repo_type="dataset"),
        lambda: api.get_repo_files(repo_id, repo_type="dataset"),
    ]
    files, errs = _call_provider_api(callers)
    if not files:
        return [], errs or ["modelscope returned no files"]
    return collect_remote_records("modelscope", repo_id, files), []


def build_huggingface_records(repo_id: str) -> tuple[list[EpisodeRecord], list[str]]:
    """List a Hugging Face dataset repo and convert it into episode records.

    Returns:
        ``(records, [])`` when a listing was obtained, otherwise
        ``([], errors)`` describing the import, init, or call failure.
    """
    try:
        from huggingface_hub import HfApi
    except Exception as exc:
        return [], [f"huggingface_hub import failed: {exc}"]

    # Client construction is guarded the same way as the ModelScope client so
    # a failure becomes an error entry instead of an uncaught exception.
    try:
        api = HfApi()
    except Exception as exc:
        return [], [f"huggingface HfApi init failed: {exc}"]

    try:
        files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
    except Exception as exc:
        return [], [f"huggingface list_repo_files failed: {exc}"]

    records = _coerce_files(files)
    if not records:
        return [], ["huggingface returned no files"]
    return collect_remote_records("huggingface", repo_id, records), []


def pick_source(local: dict, modelscope: dict, huggingface: dict, target: int) -> tuple[str, list[str]]:
    """Choose the first source with at least ``target`` degraded-valid episodes.

    Sources are checked in the order local, modelscope, huggingface.

    Args:
        local, modelscope, huggingface: Summaries as produced by
            ``summarize_episodes``.
        target: Required number of degraded-valid episodes.

    Returns:
        ``(source_name, [])`` when a source qualifies, otherwise
        ``("none", status_items)`` with human-readable explanations.
    """
    candidates = (("local", local), ("modelscope", modelscope), ("huggingface", huggingface))
    for name, summary in candidates:
        if summary["num_degraded_valid_episodes"] >= target:
            return name, []

    data_status_items = [
        f"Not enough degraded-valid episodes for a {target}-episode pilot. "
        f"Need {target}, local has {local['num_degraded_valid_episodes']}.",
        "Current local data supports one-episode training-stack validation only.",
    ]
    if local["num_episodes"] == 0:
        data_status_items.append(
            f"No local annotation.hdf5 found under {local.get('data_root', 'configured data root')}"
        )
    if not modelscope["episodes"]:
        data_status_items.append("ModelScope probe unavailable or reported no matching episode files.")
    if not huggingface["episodes"]:
        data_status_items.append("Hugging Face probe unavailable or reported no matching episode files.")
    return "none", data_status_items


def write_blocker_report(payload: dict, path: Path) -> None:
    """Write the Markdown readiness report for ``payload`` to ``path``.

    The parent directory of ``path`` is created when missing. ``payload`` is
    the dict assembled in ``main``.
    """
    target = payload["target_episodes"]

    def count_line(name: str) -> str:
        summary = payload[name]
        return f"- {name} (degraded-valid): {summary['num_degraded_valid_episodes']} / {summary['num_episodes']}"

    lines = [
        "# Xperience-10M Fine-Tune Readiness",
        "",
        f"Target episodes: {target}",
        # The payload key keeps its historical "32" name; only the text
        # shown to the reader follows the configured target.
        f"Ready for {target}-episode pilot: {payload['ready_for_32_episode_pilot']}",
        f"Selected source: {payload['selected_source']}",
        "",
        "## Source counts",
        count_line("local"),
        count_line("modelscope"),
        count_line("huggingface"),
        "",
        "## Current data status",
    ]
    if payload["data_status_items"]:
        lines.extend(f"- {item}" for item in payload["data_status_items"])
    else:
        lines.append("- none")
    lines.extend(
        [
            "",
            "## Interpretation",
            "- Degraded-valid means: annotation.hdf5 and fisheye_cam0.mp4 both exist.",
            "- Complete means all six MP4 views are present with annotation.",
            f"- A {target}-episode pilot moves to full execution only after this script "
            f"selects a source with {target}+ degraded-valid episodes.",
        ]
    )
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def _probe_repos(
    repo_ids: list[str],
    builder: Callable[[str], tuple[list[EpisodeRecord], list[str]]],
) -> tuple[list[EpisodeRecord], list[str]]:
    """Run ``builder`` on every repo id, pooling records and repo-prefixed errors."""
    episodes: list[EpisodeRecord] = []
    errors: list[str] = []
    for repo in repo_ids:
        found, errs = builder(repo)
        episodes.extend(found)
        errors.extend(f"{repo}: {err}" for err in errs)
    return episodes, errors


def main(argv: list[str] | None = None) -> int:
    """Run discovery, write the JSON payload and Markdown report, print the payload.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit code (always 0).
    """
    args = parse_args(argv)
    workspace = args.workspace.expanduser().resolve()
    # NOTE(review): relative --data-root / --output / --report-output resolve
    # against the current working directory, not against --workspace, which
    # is only recorded in the payload. Confirm whether that is intended.
    data_root = args.data_root.expanduser().resolve()

    local_summary = summarize_episodes(scan_local_episodes(data_root), [], "local")
    local_summary["data_root"] = str(data_root)

    modelscope_episodes: list[EpisodeRecord] = []
    modelscope_errors: list[str] = []
    if not args.skip_modelscope:
        modelscope_episodes, modelscope_errors = _probe_repos(
            args.modelscope_repo_id, build_modelscope_records
        )
    modelscope_summary = summarize_episodes(modelscope_episodes, modelscope_errors, "modelscope")

    hf_episodes: list[EpisodeRecord] = []
    hf_errors: list[str] = []
    if not args.skip_huggingface:
        hf_episodes, hf_errors = _probe_repos(args.hf_repo_id, build_huggingface_records)
    huggingface_summary = summarize_episodes(hf_episodes, hf_errors, "huggingface")

    selected, data_status_items = pick_source(
        local_summary, modelscope_summary, huggingface_summary, args.target_episodes
    )
    payload = {
        "target_episodes": args.target_episodes,
        "workspace": str(workspace),
        "data_root": str(data_root),
        "ready_for_32_episode_pilot": selected != "none",
        "selected_source": selected,
        "local": local_summary,
        "modelscope": modelscope_summary,
        "huggingface": huggingface_summary,
        "data_status_items": data_status_items,
    }
    rendered = json.dumps(payload, indent=2)

    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(rendered, encoding="utf-8")
    write_blocker_report(payload, args.report_output)
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())