#!/usr/bin/env python3 """Export Xperience-10M windows as Qwen3-Omni JSON-QA fine-tuning records.""" from __future__ import annotations import argparse import json import shutil import subprocess from collections import Counter, defaultdict from pathlib import Path import cv2 import numpy as np from qwen3_omni_dataset_utils import ( build_messages, episode_dirs_from_sources, existing_videos, label_counts, primary_video_path, split_for_episode, write_jsonl, ) def parse_args() -> argparse.Namespace: workspace_default = Path(__file__).resolve().parents[2] parser = argparse.ArgumentParser(description="Export Qwen3-Omni JSON-QA SFT windows.") parser.add_argument("--workspace", type=Path, default=workspace_default) parser.add_argument("--episode-root", type=Path, action="append") parser.add_argument("--manifest", type=Path) parser.add_argument("--split", choices=["all", "train", "val", "test"], default="all") parser.add_argument("--run-id", default="xperience10m_omni_dataset") parser.add_argument("--output-dir", type=Path) parser.add_argument("--cache-dir", type=Path, default=workspace_default / "outputs/omni_exploration/feature_cache") parser.add_argument("--window-frames", type=int, default=20) parser.add_argument("--stride-frames", type=int, default=20) parser.add_argument("--qwen-context-frames", type=int, default=120) parser.add_argument("--max-video-frames", type=int, default=32) parser.add_argument("--min-label-fraction", type=float, default=0.6) parser.add_argument("--max-windows-per-episode", type=int, default=256) parser.add_argument("--video-image-size", type=int, default=32) parser.add_argument("--video-grid-size", type=int, default=8) parser.add_argument("--video-hist-bins", type=int, default=8) parser.add_argument("--depth-grid-size", type=int, default=8) parser.add_argument("--text-hash-dim", type=int, default=128) parser.add_argument("--audio-source", default="fisheye_cam0") parser.add_argument("--audio-sample-rate", type=int, default=16000) parser.add_argument("--audio-band-count", type=int, default=16) parser.add_argument("--mosaic-tile-width", type=int, default=320) parser.add_argument("--mosaic-tile-height", type=int, default=240) parser.add_argument("--mosaic-fps", type=float, default=8.0) parser.add_argument("--force-rebuild-cache", action="store_true") parser.add_argument( "--allow-empty", action="store_true", help="Write an empty dataset manifest instead of failing when every selected episode is skipped.", ) parser.add_argument( "--with-handcrafted-video-features", action="store_true", help="Also decode MP4s into handcrafted sensor features. The default skips this because Qwen consumes rendered mosaic video directly.", ) parser.add_argument("--render-media", action=argparse.BooleanOptionalAction, default=True) return parser.parse_args() def add_repo_imports(args: argparse.Namespace) -> None: from qwen3_omni_adapter_smoke import add_repo_imports, add_toolkit_to_path add_repo_imports(args.workspace) add_toolkit_to_path(args.workspace) def load_episode_dataset(args: argparse.Namespace, episode_dir: Path, target: str): from qwen3_omni_adapter_smoke import load_episode local_args = argparse.Namespace(**vars(args)) local_args.target = target local_args.include_label_text = False local_args.max_windows_per_episode = 0 local_args.skip_video_features = not args.with_handcrafted_video_features local_args.cache_dir = args.cache_dir / target local_args.output_dir = args.output_dir local_args.base_model_id = "Qwen/Qwen3-Omni-30B-A3B-Instruct" return load_episode(local_args, episode_dir) def load_annotation(args: argparse.Namespace, episode_dir: Path) -> dict: from data_loader import load_from_annotation_hdf5 return load_from_annotation_hdf5(episode_dir / "annotation.hdf5", 0, None, load_slam_point_cloud=True) def frame_label(info: dict, target: str) -> str: key = "theme" if target == "subtask" else "action_label" label = str(info.get(key, "")).strip() return "" if not label or label.upper() == "N/A" else label def majority_label(labels: list[str], min_fraction: float) -> str: labels = [label for label in labels if label] if not labels: return "unknown" label, count = Counter(labels).most_common(1)[0] return label if count / len(labels) >= min_fraction else "unknown" def is_skippable_episode_error(exc: ValueError) -> bool: message = str(exc) skippable_markers = ( "No labeled windows were created", "No caption_frame_info_map found in annotation", ) return any(marker in message for marker in skippable_markers) def collect_objects(frame_info: dict, start: int, end: int) -> list[str]: counts = Counter() for idx in range(start, end): objects = frame_info.get(idx, {}).get("objects", []) if isinstance(objects, str): objects = [objects] for obj in objects or []: value = str(obj).strip() if value: counts[value] += 1 return [name for name, _count in counts.most_common()] def contact_label(ann: dict, start: int, end: int) -> str: contacts = ann.get("contacts") if contacts is None or start >= len(contacts): return "unknown" window = np.asarray(contacts[start:min(end, len(contacts))]) return "yes" if np.any(window > 0) else "no" def transition_label(frame_info: dict, start: int, end: int) -> str: labels = [frame_label(frame_info.get(idx, {}), "action") for idx in range(start, end)] labels = [label for label in labels if label] return "yes" if len(set(labels)) > 1 else "no" def context_span(start: int, end: int, n_frames: int, context_frames: int) -> tuple[int, int]: center = (start + end) // 2 half = context_frames // 2 ctx_start = max(0, center - half) ctx_end = min(n_frames - 1, ctx_start + context_frames - 1) ctx_start = max(0, ctx_end - context_frames + 1) return ctx_start, ctx_end def video_fps(path: Path | None) -> float: if path is None or not path.exists(): return 30.0 cap = cv2.VideoCapture(str(path)) fps = cap.get(cv2.CAP_PROP_FPS) if cap.isOpened() else 0.0 cap.release() return fps if fps and fps > 1e-3 else 30.0 def render_mosaic(video_paths: list[dict], output_path: Path, ctx_start: int, ctx_end: int, args: argparse.Namespace) -> bool: output_path.parent.mkdir(parents=True, exist_ok=True) captures = [] for item in video_paths: cap = cv2.VideoCapture(item["path"]) captures.append(cap if cap.isOpened() else None) if not any(captures): for cap in captures: if cap is not None: cap.release() return False tile_w, tile_h = args.mosaic_tile_width, args.mosaic_tile_height writer = cv2.VideoWriter( str(output_path), cv2.VideoWriter_fourcc(*"mp4v"), args.mosaic_fps, (tile_w * 3, tile_h * 2), ) frame_indices = np.linspace(ctx_start, ctx_end, min(args.max_video_frames, ctx_end - ctx_start + 1), dtype=np.int64) black = np.zeros((tile_h, tile_w, 3), dtype=np.uint8) for frame_idx in frame_indices: tiles = [] for cap in captures: if cap is None: tiles.append(black.copy()) continue cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx)) ok, frame = cap.read() if not ok: tiles.append(black.copy()) else: tiles.append(cv2.resize(frame, (tile_w, tile_h), interpolation=cv2.INTER_AREA)) while len(tiles) < 6: tiles.append(black.copy()) mosaic = np.vstack([np.hstack(tiles[:3]), np.hstack(tiles[3:6])]) writer.write(mosaic) writer.release() for cap in captures: if cap is not None: cap.release() return output_path.exists() def extract_audio(video_path: Path | None, output_path: Path, start_sec: float, duration_sec: float) -> bool: if video_path is None or not video_path.exists(): return False ffmpeg = shutil.which("ffmpeg") if ffmpeg is None: try: import imageio_ffmpeg ffmpeg = imageio_ffmpeg.get_ffmpeg_exe() except Exception: return False output_path.parent.mkdir(parents=True, exist_ok=True) cmd = [ ffmpeg, "-y", "-v", "error", "-ss", f"{start_sec:.3f}", "-t", f"{duration_sec:.3f}", "-i", str(video_path), "-vn", "-ac", "1", "-ar", "16000", str(output_path), ] try: subprocess.run(cmd, check=True) except (FileNotFoundError, subprocess.CalledProcessError): return False return output_path.exists() def stratified_cap(indices: list[int], labels: list[str], max_count: int) -> list[int]: if max_count <= 0 or len(indices) <= max_count: return indices by_label = defaultdict(list) for idx in indices: by_label[labels[idx]].append(idx) selected = [] while len(selected) < max_count and any(by_label.values()): for label in sorted(by_label): if by_label[label] and len(selected) < max_count: selected.append(by_label[label].pop(0)) return sorted(selected) def build_answer(ann: dict, start: int, end: int, min_fraction: float) -> dict: frame_info = ann.get("caption_frame_info_map") or {} action = majority_label([frame_label(frame_info.get(idx, {}), "action") for idx in range(start, end)], min_fraction) subtask = majority_label([frame_label(frame_info.get(idx, {}), "subtask") for idx in range(start, end)], min_fraction) next_start = end next_end = min(end + (end - start), len(ann["img_names"])) next_action = "unknown" if next_start < next_end: next_action = majority_label([frame_label(frame_info.get(idx, {}), "action") for idx in range(next_start, next_end)], min_fraction) return { "action": action, "subtask": subtask, "objects": collect_objects(frame_info, start, end), "contact": contact_label(ann, start, end), "transition": transition_label(frame_info, start, end), "next_action": next_action, "evidence_window": {"start_frame": int(start), "end_frame": int(end - 1)}, } def export_episode(args: argparse.Namespace, episode_dir: Path, records: list[dict], summaries: dict) -> None: ann = load_annotation(args, episode_dir) action_ep = load_episode_dataset(args, episode_dir, "action") episode_key = f"{episode_dir.parent.name}__{episode_dir.name}" answers = [ build_answer(ann, int(start), int(end) + 1, args.min_label_fraction) for start, end in zip(action_ep.starts, action_ep.ends) ] videos = existing_videos(episode_dir) primary = primary_video_path(videos) primary_path = Path(primary) if primary else None fps = video_fps(primary_path) feature_dir = args.output_dir / "sensor_features" media_dir = args.output_dir / "media" / episode_key feature_path = feature_dir / f"{episode_key}_sensor_features.npz" feature_dir.mkdir(parents=True, exist_ok=True) np.savez_compressed( feature_path, features=action_ep.X.astype(np.float32), action_labels=np.asarray([answer["action"] for answer in answers], dtype=object), subtask_labels=np.asarray([answer["subtask"] for answer in answers], dtype=object), starts=action_ep.starts, ends=action_ep.ends, ) labels = [str(answer["action"]) for answer in answers] keep = stratified_cap(list(range(len(action_ep.labels))), labels, args.max_windows_per_episode) split = split_for_episode(episode_key, args.manifest, episode_dir) n_frames = len(ann["img_names"]) for idx in keep: start = int(action_ep.starts[idx]) end_inclusive = int(action_ep.ends[idx]) end_exclusive = end_inclusive + 1 ctx_start, ctx_end = context_span(start, end_inclusive, n_frames, args.qwen_context_frames) media = { "video_paths": videos, "context_start_frame": ctx_start, "context_end_frame": ctx_end, "max_video_frames": args.max_video_frames, "mosaic_video_path": None, "audio_path": None, } if args.render_media: stem = f"{episode_key}_w{idx:05d}_ctx{ctx_start}_{ctx_end}" mosaic_path = media_dir / f"{stem}_mosaic.mp4" audio_path = media_dir / f"{stem}_audio.wav" if render_mosaic(videos, mosaic_path, ctx_start, ctx_end, args): media["mosaic_video_path"] = str(mosaic_path) start_sec = ctx_start / fps duration_sec = max((ctx_end - ctx_start + 1) / fps, 0.01) if extract_audio(primary_path, audio_path, start_sec, duration_sec): media["audio_path"] = str(audio_path) answer = answers[idx] record = { "id": f"{episode_key}:qa:{idx}", "episode_id": episode_key, "source_episode_id": action_ep.episode_id, "episode_path": str(episode_dir), "split": split, "target": "episode_qa", "prompt_type": "json_episode_understanding", "center_window": {"start_frame": start, "end_frame": end_inclusive, "num_frames": args.window_frames}, "media": media, "sensor_feature_path": str(feature_path), "sensor_feature_index": int(idx), "sensor_feature_dim": int(action_ep.X.shape[1]), "question": "Given the synchronized egocentric video/audio context and sensor window, identify the current embodied episode state.", "answer_json": answer, "label": answer["action"], } records.append(record) summaries["feature_manifest"] = action_ep.feature_manifest summaries["available_modalities"].append({"episode_id": episode_key, "modalities": action_ep.available_modalities}) def main() -> int: args = parse_args() args.workspace = args.workspace.expanduser().resolve() if args.output_dir is None: args.output_dir = args.workspace / "results" / "omni_finetune" / args.run_id args.output_dir.mkdir(parents=True, exist_ok=True) add_repo_imports(args) episode_dirs = episode_dirs_from_sources(args.episode_root, args.manifest, args.split) if not episode_dirs: default_episode = args.workspace / "data/sample/xperience-10m-sample" if default_episode.exists(): episode_dirs = [default_episode.resolve()] else: raise ValueError("No episode directories found. Pass --episode-root or --manifest.") records: list[dict] = [] summaries = {"feature_manifest": [], "available_modalities": [], "skipped_episodes": []} for episode_dir in episode_dirs: try: export_episode(args, episode_dir, records, summaries) except ValueError as exc: if not is_skippable_episode_error(exc): raise summaries["skipped_episodes"].append({ "episode_path": str(episode_dir), "reason": str(exc), }) if not records and not args.allow_empty: raise ValueError("No dataset records were exported from the selected episodes.") action_options = sorted({record["answer_json"]["action"] for record in records if record["answer_json"]["action"] != "unknown"}) subtask_options = sorted({record["answer_json"]["subtask"] for record in records if record["answer_json"]["subtask"] != "unknown"}) for record in records: record["action_options"] = action_options record["subtask_options"] = subtask_options record["label_options"] = action_options record["messages"] = build_messages(record, action_options, include_answer=True) dataset_path = args.output_dir / "dataset.jsonl" write_jsonl(dataset_path, records) dataset_manifest = { "run_id": args.run_id, "dataset_path": str(dataset_path), "num_samples": len(records), "num_episodes": len({record["episode_id"] for record in records}), "split_counts": dict(Counter(record["split"] for record in records)), "label_counts": label_counts(records), "action_options": action_options, "subtask_options": subtask_options, "clip_policy": { "label_window_frames": args.window_frames, "qwen_context_frames": args.qwen_context_frames, "max_video_frames": args.max_video_frames, "audio_span": "same_as_video_context", "mosaic": "2x3 multi-camera grid", "allow_empty": args.allow_empty, }, "feature_manifest": summaries["feature_manifest"], "available_modalities": summaries["available_modalities"], "skipped_episodes": summaries["skipped_episodes"], "notes": [ "Assistant answers are strict JSON for episode understanding, not robot-control policies.", "Sensor features are stored as NPZ pointers; raw annotation.hdf5 is not copied into the dataset records.", "Episodes with no labeled windows under the configured label rule are skipped and reported.", ], } (args.output_dir / "dataset_manifest.json").write_text(json.dumps(dataset_manifest, indent=2), encoding="utf-8") (args.output_dir / "config.yaml").write_text( "\n".join([ f"run_id: {args.run_id}", "objective: episode_understanding_json_qa", "backbone: Qwen/Qwen3-Omni-30B-A3B-Instruct", f"max_windows_per_episode: {args.max_windows_per_episode}", f"qwen_context_frames: {args.qwen_context_frames}", f"max_video_frames: {args.max_video_frames}", f"render_media: {str(args.render_media).lower()}", f"with_handcrafted_video_features: {str(args.with_handcrafted_video_features).lower()}", ]) + "\n", encoding="utf-8", ) print(json.dumps(dataset_manifest, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())