#!/usr/bin/env python3 """Audio contribution variants for the Xperience-10M task suite. This script is artifact-driven where possible. It consumes the committed single-episode task-suite windows and feature manifest, derives an alternate audio representation from the local public sample MP4, and writes measured task deltas. """ from __future__ import annotations import argparse import csv import json import math import shutil import subprocess import sys from collections import OrderedDict from pathlib import Path from typing import Iterable import numpy as np from single_episode_diagnostics import ( TASKS, TASK_DISPLAY, block_indices, chronological_split, classification_metrics, encode_labels, frame_centers, labels_from_windows, load_inputs, multilabel_metrics, onehot, read_csv, regression_metrics, retrieval_metrics, ridge_predict, standardize, transition_labels_from_boundaries, write_csv, write_json, ) VARIANTS = [ "all_handcrafted_audio", "all_except_audio", "handcrafted_audio_only", "raw_logmel_audio_only", "replace_handcrafted_with_raw", "all_plus_raw_logmel", ] VARIANT_DISPLAY = { "all_handcrafted_audio": "All Current Features", "all_except_audio": "All Except Audio", "handcrafted_audio_only": "Audio Only", "raw_logmel_audio_only": "Alternate Audio Only", "replace_handcrafted_with_raw": "Audio Representation Replacement", "all_plus_raw_logmel": "All Current Features + Alternate Audio", } PRIMARY_METRIC_HIGHER_IS_BETTER = { "timeline_action": True, "timeline_subtask": True, "transition_detection": True, "next_action": True, "hand_trajectory_forecast": False, "contact_prediction": True, "object_relevance": True, "caption_grounding": True, "cross_modal_retrieval": True, "modality_reconstruction": False, "temporal_order": True, "misalignment_detection": True, } def parse_args() -> argparse.Namespace: root = Path(__file__).resolve().parents[1] parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--workspace", type=Path, default=root) parser.add_argument("--suite-dir", type=Path, default=root / "results/episode_task_suite") parser.add_argument("--output-dir", type=Path, default=root / "results/audio_ablation") parser.add_argument("--raw-sample-dir", type=Path, default=None) parser.add_argument("--annotation", type=Path, default=None) parser.add_argument("--homie-toolkit", type=Path, default=None) parser.add_argument("--audio-source", default="fisheye_cam0.mp4") parser.add_argument("--sample-rate", type=int, default=16000) parser.add_argument("--mel-bands", type=int, default=64) parser.add_argument("--fft-size", type=int, default=512) parser.add_argument("--hop-length", type=int, default=160) parser.add_argument("--ridge-l2", type=float, default=10.0) parser.add_argument("--test-fraction", type=float, default=0.30) parser.add_argument("--future-offset-windows", type=int, default=4) parser.add_argument("--forecast-frames", type=int, default=10) parser.add_argument("--misalignment-shift-windows", type=int, default=8) parser.add_argument("--force", action="store_true") return parser.parse_args() def infer_raw_sample_dir(workspace: Path, explicit: Path | None) -> Path | None: if explicit is not None: return explicit.expanduser().resolve() candidates = [ workspace / "data/sample/xperience-10m-sample", workspace.parent / "data/sample/xperience-10m-sample", Path.home() / "Library/CloudStorage/Dropbox/Ropedia/data/sample/xperience-10m-sample", ] for candidate in candidates: if (candidate / "fisheye_cam0.mp4").exists(): return candidate.resolve() return None def infer_homie_toolkit(raw_sample_dir: Path | None, explicit: Path | None) -> Path | None: if explicit is not None: return explicit.expanduser().resolve() candidates = [] if raw_sample_dir is not None: for parent in raw_sample_dir.parents: candidates.append(parent / "HOMIE-toolkit") candidates.append(Path.home() / "Library/CloudStorage/Dropbox/Ropedia/HOMIE-toolkit") for candidate in candidates: if candidate.exists(): return candidate.resolve() return None def public_raw_sample_ref(path: Path | None) -> str: if path is None: return "not_available" if path.name == "fisheye_cam0.mp4": return "local_public_sample/fisheye_cam0.mp4" if path.name == "annotation.hdf5": return "local_public_sample/annotation.hdf5" return f"local_public_sample/{path.name}" def decode_audio_mono(path: Path, sample_rate: int) -> np.ndarray: if not path.exists() or shutil.which("ffmpeg") is None: return np.zeros(0, dtype=np.float32) cmd = [ "ffmpeg", "-v", "error", "-i", str(path), "-vn", "-ac", "1", "-ar", str(sample_rate), "-f", "f32le", "-", ] try: proc = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except (subprocess.CalledProcessError, FileNotFoundError): return np.zeros(0, dtype=np.float32) audio = np.frombuffer(proc.stdout, dtype=np.float32) return np.nan_to_num(audio, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32) def video_fps(path: Path) -> float | None: if not path.exists() or shutil.which("ffprobe") is None: return None cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=avg_frame_rate,r_frame_rate", "-of", "json", str(path), ] try: proc = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) payload = json.loads(proc.stdout) except (subprocess.CalledProcessError, FileNotFoundError, json.JSONDecodeError): return None streams = payload.get("streams") or [] for stream in streams: for key in ("avg_frame_rate", "r_frame_rate"): value = str(stream.get(key) or "") if "/" in value: num, den = value.split("/", 1) try: fps = float(num) / max(float(den), 1e-12) except ValueError: continue else: try: fps = float(value) except ValueError: continue if np.isfinite(fps) and fps > 0: return fps return None def hz_to_mel(hz: np.ndarray) -> np.ndarray: return 2595.0 * np.log10(1.0 + hz / 700.0) def mel_to_hz(mel: np.ndarray) -> np.ndarray: return 700.0 * (10.0 ** (mel / 2595.0) - 1.0) def mel_filterbank(sample_rate: int, fft_size: int, n_mels: int, f_min: float = 40.0) -> np.ndarray: n_freqs = fft_size // 2 + 1 f_max = sample_rate / 2.0 mel_points = np.linspace(hz_to_mel(np.asarray([f_min]))[0], hz_to_mel(np.asarray([f_max]))[0], n_mels + 2) hz_points = mel_to_hz(mel_points) bins = np.floor((fft_size + 1) * hz_points / sample_rate).astype(int) bins = np.clip(bins, 0, n_freqs - 1) fb = np.zeros((n_mels, n_freqs), dtype=np.float32) for i in range(n_mels): left, center, right = int(bins[i]), int(bins[i + 1]), int(bins[i + 2]) if center <= left: center = min(left + 1, n_freqs - 1) if right <= center: right = min(center + 1, n_freqs) if center > left: fb[i, left:center] = (np.arange(left, center) - left) / max(center - left, 1) if right > center: fb[i, center:right] = (right - np.arange(center, right)) / max(right - center, 1) denom = fb.sum(axis=1, keepdims=True) denom[denom < 1e-8] = 1.0 return fb / denom def stft_power(segment: np.ndarray, fft_size: int, hop_length: int) -> np.ndarray: segment = np.asarray(segment, dtype=np.float32).reshape(-1) if segment.size == 0: return np.zeros((1, fft_size // 2 + 1), dtype=np.float32) if segment.size < fft_size: segment = np.pad(segment, (0, fft_size - segment.size)) n_frames = 1 + max(0, (segment.size - fft_size) // hop_length) if n_frames <= 0: n_frames = 1 window = np.hanning(fft_size).astype(np.float32) frames = np.zeros((n_frames, fft_size), dtype=np.float32) for i in range(n_frames): start = i * hop_length chunk = segment[start : start + fft_size] if chunk.size < fft_size: chunk = np.pad(chunk, (0, fft_size - chunk.size)) frames[i] = chunk * window spec = np.fft.rfft(frames, n=fft_size, axis=1) return (np.abs(spec) ** 2).astype(np.float32) def raw_audio_segment_embedding(segment: np.ndarray, sample_rate: int, mel_fb: np.ndarray, fft_size: int, hop_length: int) -> np.ndarray: segment = np.asarray(segment, dtype=np.float32).reshape(-1) if segment.size == 0: return np.zeros(mel_fb.shape[0] * 9 + 12, dtype=np.float32) segment = np.nan_to_num(segment, nan=0.0, posinf=0.0, neginf=0.0) power = stft_power(segment, fft_size, hop_length) mel = np.log1p(power @ mel_fb.T) delta = np.diff(mel, axis=0) if mel.shape[0] > 1 else np.zeros_like(mel) stats = [ mel.mean(axis=0), mel.std(axis=0), mel.min(axis=0), mel.max(axis=0), np.percentile(mel, 10, axis=0), np.percentile(mel, 50, axis=0), np.percentile(mel, 90, axis=0), delta.mean(axis=0), delta.std(axis=0), ] abs_seg = np.abs(segment) rms = float(np.sqrt(np.mean(segment * segment))) zcr = float(np.mean(segment[1:] * segment[:-1] < 0.0)) if segment.size > 1 else 0.0 energy = abs_seg.reshape(-1) thirds = np.array_split(energy, 3) third_means = [float(x.mean()) if len(x) else 0.0 for x in thirds] waveform = np.asarray( [ rms, float(abs_seg.mean()), float(abs_seg.std()), float(abs_seg.max(initial=0.0)), zcr, float(np.log1p(np.mean(segment * segment))), *third_means, float(third_means[-1] - third_means[0]), float(segment.size / max(sample_rate, 1)), float(mel.shape[0]), ], dtype=np.float32, ) return np.concatenate([*stats, waveform]).astype(np.float32) def extract_raw_audio_window_features( audio_path: Path, windows: list[dict], n_frames: int, output_dir: Path, sample_rate: int, mel_bands: int, fft_size: int, hop_length: int, force: bool, ) -> tuple[np.ndarray, dict]: output_dir.mkdir(parents=True, exist_ok=True) cache_path = output_dir / f"raw_logmel_{audio_path.stem}_sr{sample_rate}_mels{mel_bands}_fft{fft_size}_hop{hop_length}.npz" if cache_path.exists() and not force: data = np.load(cache_path, allow_pickle=True) return data["features"].astype(np.float32), json.loads(str(data["metadata"].item())) audio = decode_audio_mono(audio_path, sample_rate) fps = video_fps(audio_path) has_audio = bool(audio.size > 0) if has_audio and fps is None: fps = n_frames / max(audio.size / float(sample_rate), 1e-6) mel_fb = mel_filterbank(sample_rate, fft_size, mel_bands) feature_dim = mel_bands * 9 + 12 features = np.zeros((len(windows), feature_dim), dtype=np.float32) if has_audio and fps is not None: for i, row in enumerate(windows): start_frame = int(row["start_frame"]) end_frame = int(row["end_frame"]) + 1 start_sample = int(round((start_frame / fps) * sample_rate)) end_sample = int(round((end_frame / fps) * sample_rate)) start_sample = max(0, min(start_sample, audio.size)) end_sample = max(start_sample + 1, min(end_sample, audio.size)) features[i] = raw_audio_segment_embedding(audio[start_sample:end_sample], sample_rate, mel_fb, fft_size, hop_length) if i and i % 250 == 0: print(f" raw log-mel audio windows: {i}/{len(windows)}") metadata = { "source": public_raw_sample_ref(audio_path), "exists": bool(audio_path.exists()), "has_audio": has_audio, "sample_rate": int(sample_rate), "fps": float(fps) if fps is not None else None, "num_samples": int(audio.size), "num_windows": int(len(windows)), "feature_dim": int(features.shape[1]), "mel_bands": int(mel_bands), "fft_size": int(fft_size), "hop_length": int(hop_length), "feature_description": "Per-window raw waveform STFT log-mel statistics plus delta and waveform envelope statistics.", } np.savez_compressed(cache_path, features=features, metadata=json.dumps(metadata, sort_keys=True)) return features, metadata def load_annotation(annotation: Path | None, toolkit: Path | None) -> dict | None: if annotation is None or not annotation.exists() or toolkit is None or not toolkit.exists(): return None sys.path.insert(0, str(toolkit)) from data_loader import load_from_annotation_hdf5 return load_from_annotation_hdf5(annotation, 0, None, load_slam_point_cloud=False) def object_targets_from_annotation(ann: dict | None, windows: list[dict]) -> dict | None: if ann is None: return None frame_info = ann.get("caption_frame_info_map") if frame_info is None: return None vocab: OrderedDict[str, int] = OrderedDict() labels: list[list[str]] = [] for row in windows: objects: OrderedDict[str, None] = OrderedDict() for frame in range(int(row["start_frame"]), int(row["end_frame"]) + 1): info = frame_info.get(frame, {}) raw_objects = info.get("objects") if isinstance(raw_objects, list): for obj in raw_objects: text = str(obj).strip() if text: objects.setdefault(text, None) elif raw_objects: text = str(raw_objects).strip() if text: objects.setdefault(text, None) obj_list = list(objects.keys()) for obj in obj_list: if obj not in vocab: vocab[obj] = len(vocab) labels.append(obj_list) if not vocab: return None Y = np.zeros((len(windows), len(vocab)), dtype=np.float32) for i, obj_list in enumerate(labels): for obj in obj_list: Y[i, vocab[obj]] = 1.0 return {"Y": Y, "vocab": list(vocab.keys())} def exact_hand_targets_from_annotation(ann: dict | None, windows: list[dict], forecast_frames: int) -> tuple[np.ndarray, np.ndarray] | None: if ann is None: return None left = ann.get("hand_left_joints") right = ann.get("hand_right_joints") body = ann.get("smplh_body_joints") if left is None or right is None: return None valid, targets = [], [] n_frames = len(left) for i, row in enumerate(windows): future_start = int(row["end_frame"]) + 1 future_end = future_start + forecast_frames if future_end > n_frames: continue hand = np.concatenate([left[future_start:future_end], right[future_start:future_end]], axis=1) if body is not None and future_end <= len(body): root = body[future_start:future_end, :1, :] hand = hand - root valid.append(i) targets.append(hand.reshape(-1)) if not targets: return None return np.asarray(valid, dtype=np.int64), np.stack(targets).astype(np.float32) def exact_contact_labels_from_annotation(ann: dict | None, windows: list[dict]) -> np.ndarray | None: if ann is None or ann.get("contacts") is None: return None contacts = ann["contacts"] labels = [] for row in windows: c = contacts[int(row["start_frame"]) : int(row["end_frame"]) + 1] labels.append("contact" if np.any(c > 0) else "no_contact") return np.asarray(labels, dtype=object) def exact_next_action_labels_from_annotation(ann: dict | None, windows: list[dict], future_frames: int = 20) -> np.ndarray | None: if ann is None or ann.get("caption_frame_info_map") is None: return None frame_info = ann["caption_frame_info_map"] n_frames = len(ann["img_names"]) labels = [] for row in windows: future_frame = min(n_frames - 1, int(row["end_frame"]) + future_frames) info = frame_info.get(future_frame, {}) label = info.get("action_label") or info.get("action") or "" labels.append(str(label)) return np.asarray(labels, dtype=object) def setdiff_idx(a: np.ndarray, b: np.ndarray) -> np.ndarray: return np.setdiff1d(np.asarray(a, dtype=np.int64), np.asarray(b, dtype=np.int64), assume_unique=False) def task_base_indices(task: str, manifest: list[dict]) -> np.ndarray: audio = block_indices(manifest, ["audio_"]) caption = block_indices(manifest, ["caption_objects_interaction_text"]) contact = block_indices(manifest, ["body_contacts"]) all_idx = block_indices(manifest) sensor = setdiff_idx(all_idx, caption) if task in {"caption_grounding"}: return sensor if task in {"cross_modal_retrieval", "modality_reconstruction"}: return block_indices(manifest, ["hand_", "body_joints", "body_contacts", "camera_", "imu_", "audio_"]) if task == "contact_prediction": return setdiff_idx(sensor, contact) if task == "object_relevance": return sensor return all_idx def feature_matrix_for_variant(task: str, variant: str, X: np.ndarray, raw_audio: np.ndarray, manifest: list[dict]) -> tuple[np.ndarray, str]: base = task_base_indices(task, manifest) audio = block_indices(manifest, ["audio_"]) base_no_audio = setdiff_idx(base, audio) if variant == "all_handcrafted_audio": return X[:, base], f"task contract feature blocks with audio where applicable ({len(base)} dims)" if variant == "all_except_audio": return X[:, base_no_audio], f"same task contract with audio columns removed ({len(base_no_audio)} dims)" if variant == "handcrafted_audio_only": return X[:, audio], f"audio feature block only ({len(audio)} dims)" if variant == "raw_logmel_audio_only": return raw_audio, f"raw waveform log-mel embedding only ({raw_audio.shape[1]} dims)" if variant == "replace_handcrafted_with_raw": return np.concatenate([X[:, base_no_audio], raw_audio], axis=1), ( f"task contract with baseline audio removed and alternate audio representation added ({len(base_no_audio) + raw_audio.shape[1]} dims)" ) if variant == "all_plus_raw_logmel": return np.concatenate([X[:, base], raw_audio], axis=1), ( f"task contract with existing handcrafted AAC plus raw log-mel ({len(base) + raw_audio.shape[1]} dims)" ) raise KeyError(variant) def one_dim_target_standardize(train: np.ndarray, test: np.ndarray) -> tuple[np.ndarray, np.ndarray]: return standardize(train, test) def fit_classification_matrix(Xv: np.ndarray, labels: np.ndarray, train_idx: np.ndarray, test_idx: np.ndarray, l2: float) -> dict: y, class_names = encode_labels(labels) train_classes = set(int(x) for x in y[train_idx]) test_classes = set(int(x) for x in y[test_idx]) unseen = [class_names[i] for i in sorted(test_classes - train_classes)] X_train, X_test = standardize(Xv[train_idx], Xv[test_idx]) scores = ridge_predict(X_train, onehot(y[train_idx], len(class_names)), X_test, l2) pred = scores.argmax(axis=1) metrics = classification_metrics(y[test_idx], pred) metrics.update({ "num_classes": int(len(class_names)), "num_train": int(len(train_idx)), "num_test": int(len(test_idx)), "unseen_test_classes": unseen, "unseen_test_class_count": int(len(unseen)), }) return metrics def fit_multilabel_matrix(Xv: np.ndarray, Y: np.ndarray, train_idx: np.ndarray, test_idx: np.ndarray, l2: float) -> dict: X_train, X_test = standardize(Xv[train_idx], Xv[test_idx]) scores = ridge_predict(X_train, Y[train_idx], X_test, l2) pred = (scores >= 0.5).astype(np.float32) empty = np.where(pred.sum(axis=1) == 0)[0] if len(empty): pred[empty, np.argmax(scores[empty], axis=1)] = 1.0 metrics = multilabel_metrics(Y[test_idx], pred) metrics.update({"num_objects": int(Y.shape[1]), "num_train": int(len(train_idx)), "num_test": int(len(test_idx))}) return metrics def fit_regression_matrix(Xv: np.ndarray, Y: np.ndarray, train_idx: np.ndarray, test_idx: np.ndarray, l2: float) -> dict: X_train, X_test = standardize(Xv[train_idx], Xv[test_idx]) Y_train, Y_test = standardize(Y[train_idx], Y[test_idx]) pred = ridge_predict(X_train, Y_train, X_test, l2) metrics = regression_metrics(Y_test, pred) metrics.update({"num_train": int(len(train_idx)), "num_test": int(len(test_idx)), "target_dim": int(Y.shape[1])}) return metrics def fit_retrieval_matrix(Xv: np.ndarray, Y: np.ndarray, train_idx: np.ndarray, test_idx: np.ndarray, l2: float) -> dict: X_train, X_test = standardize(Xv[train_idx], Xv[test_idx]) Y_train, Y_test = standardize(Y[train_idx], Y[test_idx]) pred = ridge_predict(X_train, Y_train, X_test, l2) metrics = retrieval_metrics(pred, Y_test) metrics.update({"num_train": int(len(train_idx)), "num_test": int(len(test_idx)), "target_dim": int(Y.shape[1])}) return metrics def pair_features_generic(F: np.ndarray, pairs: np.ndarray) -> np.ndarray: left = F[pairs[:, 0]] right = F[pairs[:, 1]] return np.concatenate([left, right, right - left], axis=1).astype(np.float32) def misalignment_features( variant: str, X: np.ndarray, raw_audio: np.ndarray, manifest: list[dict], pairs: np.ndarray, ) -> tuple[np.ndarray, str]: motion = block_indices(manifest, ["hand_", "body_joints", "body_contacts", "camera_", "imu_"]) visual_audio = block_indices(manifest, ["depth_confidence", "video_", "audio_"]) audio = block_indices(manifest, ["audio_"]) visual_no_audio = setdiff_idx(visual_audio, audio) if variant == "all_handcrafted_audio": left = X[pairs[:, 0]][:, motion] right = X[pairs[:, 1]][:, visual_audio] return np.concatenate([left, right], axis=1).astype(np.float32), "motion/current visual+handcrafted audio pair" if variant == "all_except_audio": left = X[pairs[:, 0]][:, motion] right = X[pairs[:, 1]][:, visual_no_audio] return np.concatenate([left, right], axis=1).astype(np.float32), "motion/current visual pair with audio removed" if variant == "handcrafted_audio_only": return pair_features_generic(X[:, audio], pairs), "audio self-alignment pair" if variant == "raw_logmel_audio_only": return pair_features_generic(raw_audio, pairs), "raw log-mel audio self-alignment pair" if variant == "replace_handcrafted_with_raw": left = X[pairs[:, 0]][:, motion] right = np.concatenate([X[pairs[:, 1]][:, visual_no_audio], raw_audio[pairs[:, 1]]], axis=1) return np.concatenate([left, right], axis=1).astype(np.float32), "motion/current visual pair with raw log-mel replacing handcrafted audio" if variant == "all_plus_raw_logmel": left = X[pairs[:, 0]][:, motion] right = np.concatenate([X[pairs[:, 1]][:, visual_audio], raw_audio[pairs[:, 1]]], axis=1) return np.concatenate([left, right], axis=1).astype(np.float32), "motion/current visual+handcrafted audio pair plus raw log-mel" raise KeyError(variant) def task_target( task: str, X: np.ndarray, windows: list[dict], manifest: list[dict], suite_dir: Path, args: argparse.Namespace, raw_targets: dict, ) -> dict: n = len(windows) all_rows = np.arange(n, dtype=np.int64) if task == "timeline_action": return {"kind": "classification", "labels": labels_from_windows(windows, "action_label"), "rows": all_rows, "metric": "macro_f1"} if task == "timeline_subtask": return {"kind": "classification", "labels": labels_from_windows(windows, "subtask_label"), "rows": all_rows, "metric": "macro_f1"} if task == "transition_detection": labels = transition_labels_from_boundaries(suite_dir, frame_centers(windows)) return {"kind": "classification", "labels": labels, "rows": all_rows, "metric": "macro_f1"} if task == "next_action": labels = raw_targets.get("next_action_labels") if labels is not None: return {"kind": "classification", "labels": labels, "rows": all_rows, "metric": "macro_f1", "target_variant": "future action from annotation frame labels"} rows = np.arange(0, n - args.future_offset_windows, dtype=np.int64) labels = labels_from_windows(windows, "action_label")[rows + args.future_offset_windows] return {"kind": "classification", "labels": labels, "rows": rows, "metric": "macro_f1", "target_variant": "future action from windows.csv"} if task == "contact_prediction": labels = raw_targets.get("contact_labels") if labels is None: contacts = block_indices(manifest, ["body_contacts"]) labels = np.where(np.abs(X[:, contacts]).sum(axis=1) > 1e-8, "contact", "no_contact") return {"kind": "classification", "labels": labels, "rows": all_rows, "metric": "macro_f1"} if task == "object_relevance": obj = raw_targets.get("object_targets") if obj is None: return {"kind": "not_available", "reason": "object labels require local annotation.hdf5"} return {"kind": "multilabel", "target": obj["Y"], "rows": all_rows, "metric": "micro_f1", "num_objects": len(obj["vocab"])} if task == "hand_trajectory_forecast": exact = raw_targets.get("hand_targets") if exact is not None: rows, target = exact return {"kind": "regression", "target": target, "rows": rows, "metric": "mae", "target_variant": "future hand joints from annotation.hdf5"} rows = np.arange(0, n - args.future_offset_windows, dtype=np.int64) hand = block_indices(manifest, ["hand_left_joints", "hand_right_joints"]) return {"kind": "regression", "target": X[rows + args.future_offset_windows][:, hand], "rows": rows, "metric": "mae", "target_variant": "future hand feature block"} if task == "caption_grounding": text = block_indices(manifest, ["caption_objects_interaction_text"]) return {"kind": "retrieval", "target": X[:, text], "rows": all_rows, "metric": "mrr"} if task in {"cross_modal_retrieval", "modality_reconstruction"}: visual = block_indices(manifest, ["depth_confidence", "video_"]) return {"kind": "retrieval" if task == "cross_modal_retrieval" else "regression", "target": X[:, visual], "rows": all_rows, "metric": "mrr" if task == "cross_modal_retrieval" else "mae"} if task == "temporal_order": pairs, labels = [], [] for i in range(n - 1): pairs.append((i, i + 1)) labels.append("forward") pairs.append((i + 1, i)) labels.append("reversed") return {"kind": "pair_classification", "pairs": np.asarray(pairs, dtype=np.int64), "labels": np.asarray(labels, dtype=object), "metric": "macro_f1"} if task == "misalignment_detection": pairs, labels = [], [] shift = args.misalignment_shift_windows for i in range(n - shift): pairs.append((i, i)) labels.append("aligned") pairs.append((i, i + shift)) labels.append("shifted") return {"kind": "misalignment", "pairs": np.asarray(pairs, dtype=np.int64), "labels": np.asarray(labels, dtype=object), "metric": "macro_f1"} raise KeyError(task) def evaluate_task_variant( task: str, variant: str, X: np.ndarray, raw_audio: np.ndarray, windows: list[dict], manifest: list[dict], suite_dir: Path, args: argparse.Namespace, raw_targets: dict, ) -> dict: info = task_target(task, X, windows, manifest, suite_dir, args, raw_targets) row = { "task": task, "task_display": TASK_DISPLAY.get(task, task), "variant": variant, "variant_display": VARIANT_DISPLAY[variant], "status": "computed", "primary_metric": info.get("metric", ""), "primary_value": "", "higher_is_better": str(PRIMARY_METRIC_HIGHER_IS_BETTER[task]).lower(), "feature_dim": "", "num_train": "", "num_test": "", "input_contract": "", "target_variant": info.get("target_variant", ""), "reason": "", } if info["kind"] == "not_available": row.update({"status": "not_computed", "reason": info["reason"]}) return row try: if info["kind"] == "misalignment": feats, desc = misalignment_features(variant, X, raw_audio, manifest, np.asarray(info["pairs"], dtype=np.int64)) labels = np.asarray(info["labels"], dtype=object) train_idx, test_idx = chronological_split(len(labels), args.test_fraction) metrics = fit_classification_matrix(feats, labels, train_idx, test_idx, args.ridge_l2) row["input_contract"] = desc elif info["kind"] == "pair_classification": F, desc = feature_matrix_for_variant(task, variant, X, raw_audio, manifest) feats = pair_features_generic(F, np.asarray(info["pairs"], dtype=np.int64)) labels = np.asarray(info["labels"], dtype=object) train_idx, test_idx = chronological_split(len(labels), args.test_fraction) metrics = fit_classification_matrix(feats, labels, train_idx, test_idx, args.ridge_l2) row["input_contract"] = desc else: F, desc = feature_matrix_for_variant(task, variant, X, raw_audio, manifest) data_rows = np.asarray(info["rows"], dtype=np.int64) train_idx, test_idx = chronological_split(len(data_rows), args.test_fraction) if info["kind"] == "classification": metrics = fit_classification_matrix(F[data_rows], np.asarray(info["labels"], dtype=object), train_idx, test_idx, args.ridge_l2) elif info["kind"] == "multilabel": metrics = fit_multilabel_matrix(F[data_rows], np.asarray(info["target"], dtype=np.float32), train_idx, test_idx, args.ridge_l2) elif info["kind"] == "regression": metrics = fit_regression_matrix(F[data_rows], np.asarray(info["target"], dtype=np.float32), train_idx, test_idx, args.ridge_l2) elif info["kind"] == "retrieval": metrics = fit_retrieval_matrix(F[data_rows], np.asarray(info["target"], dtype=np.float32), train_idx, test_idx, args.ridge_l2) else: raise KeyError(info["kind"]) row["input_contract"] = desc row["feature_dim"] = int(F.shape[1]) row.update(metrics) row["primary_value"] = float(metrics[info["metric"]]) row["num_train"] = int(metrics.get("num_train", row.get("num_train") or 0)) row["num_test"] = int(metrics.get("num_test", row.get("num_test") or 0)) if row["feature_dim"] == "": row["feature_dim"] = int(feats.shape[1]) except Exception as exc: row.update({"status": "not_computed", "reason": f"{type(exc).__name__}: {exc}"}) return row def delta(base: float, compare: float, higher_is_better: bool) -> float: return compare - base if higher_is_better else base - compare def build_summary(rows: list[dict], raw_meta: dict) -> dict: by_task: dict[str, dict[str, dict]] = {} for row in rows: if row.get("status") != "computed": continue by_task.setdefault(row["task"], {})[row["variant"]] = row task_summaries = [] for task in TASKS: variants = by_task.get(task, {}) base = variants.get("all_handcrafted_audio") no_audio = variants.get("all_except_audio") raw_only = variants.get("raw_logmel_audio_only") replace = variants.get("replace_handcrafted_with_raw") plus = variants.get("all_plus_raw_logmel") if not base: continue higher = PRIMARY_METRIC_HIGHER_IS_BETTER[task] item = { "task": task, "task_display": TASK_DISPLAY.get(task, task), "primary_metric": base["primary_metric"], "higher_is_better": higher, "all_handcrafted_audio": float(base["primary_value"]), } if no_audio: item["all_except_audio"] = float(no_audio["primary_value"]) item["handcrafted_audio_delta"] = delta(float(no_audio["primary_value"]), float(base["primary_value"]), higher) if raw_only: item["raw_logmel_audio_only"] = float(raw_only["primary_value"]) if replace and no_audio: item["replace_handcrafted_with_raw"] = float(replace["primary_value"]) item["raw_replacement_delta_vs_no_audio"] = delta(float(no_audio["primary_value"]), float(replace["primary_value"]), higher) item["raw_replacement_delta_vs_handcrafted"] = delta(float(base["primary_value"]), float(replace["primary_value"]), higher) if plus: item["all_plus_raw_logmel"] = float(plus["primary_value"]) item["all_plus_raw_delta_vs_handcrafted"] = delta(float(base["primary_value"]), float(plus["primary_value"]), higher) task_summaries.append(item) handcrafted_deltas = [x["handcrafted_audio_delta"] for x in task_summaries if "handcrafted_audio_delta" in x] raw_replace_deltas = [x["raw_replacement_delta_vs_handcrafted"] for x in task_summaries if "raw_replacement_delta_vs_handcrafted" in x] return { "description": "Measured audio ablation and raw log-mel audio upgrade over the single public Xperience-10M sample episode.", "scope": "single public sample episode; chronological split; ridge heads over fixed feature contracts", "raw_audio_metadata": raw_meta, "num_tasks": len(task_summaries), "variants": VARIANT_DISPLAY, "task_summaries": task_summaries, "aggregate": { "mean_handcrafted_audio_delta": float(np.mean(handcrafted_deltas)) if handcrafted_deltas else None, "tasks_where_handcrafted_audio_improves": int(sum(1 for x in handcrafted_deltas if x > 0)), "mean_raw_replacement_delta_vs_handcrafted": float(np.mean(raw_replace_deltas)) if raw_replace_deltas else None, "tasks_where_raw_replacement_improves_over_handcrafted": int(sum(1 for x in raw_replace_deltas if x > 0)), }, } def write_summary_markdown(path: Path, summary: dict) -> None: lines = [ "# Audio Ablation and Raw-Audio Upgrade", "", "This report is generated from committed task-suite artifacts plus the local public-sample MP4 audio stream.", "It measures whether audio changes each single-episode task under the same chronological split.", "", "## Raw Audio Feature", "", ] meta = summary["raw_audio_metadata"] lines.extend([ f"- Source: `{meta.get('source')}`", f"- Has audio: `{meta.get('has_audio')}`", f"- Sample rate: `{meta.get('sample_rate')}`", f"- Window feature dim: `{meta.get('feature_dim')}`", f"- Feature: {meta.get('feature_description')}", "", "## Task Deltas", "", "| Task | Metric | Current audio | No audio | Current audio delta | Raw replaces audio | Raw replacement delta |", "| --- | --- | ---: | ---: | ---: | ---: | ---: |", ]) for item in summary["task_summaries"]: lines.append( "| {task} | {metric} | {cur:.4f} | {no:.4f} | {d1:.4f} | {raw:.4f} | {d2:.4f} |".format( task=item["task_display"], metric=item["primary_metric"], cur=item.get("all_handcrafted_audio", float("nan")), no=item.get("all_except_audio", float("nan")), d1=item.get("handcrafted_audio_delta", float("nan")), raw=item.get("replace_handcrafted_with_raw", float("nan")), d2=item.get("raw_replacement_delta_vs_handcrafted", float("nan")), ) ) agg = summary["aggregate"] lines.extend([ "", "## Aggregate", "", f"- Mean current-audio delta: `{agg['mean_handcrafted_audio_delta']}`", f"- Tasks where current handcrafted audio improves the primary metric: `{agg['tasks_where_handcrafted_audio_improves']}`", f"- Mean raw-replacement delta vs current handcrafted audio: `{agg['mean_raw_replacement_delta_vs_handcrafted']}`", f"- Tasks where raw log-mel replacement improves over current handcrafted audio: `{agg['tasks_where_raw_replacement_improves_over_handcrafted']}`", "", "Positive deltas always mean better according to each task's primary metric. For MAE tasks, lower MAE is converted into a positive improvement.", "", ]) path.parent.mkdir(parents=True, exist_ok=True) path.write_text("\n".join(lines), encoding="utf-8") def write_delta_chart(path: Path, summary: dict) -> None: items = summary["task_summaries"] width = 1320 row_h = 42 height = 120 + row_h * len(items) max_abs = max([abs(x.get("handcrafted_audio_delta", 0.0)) for x in items] + [1e-6]) left = 410 mid = 680 scale = 240 / max_abs lines = [ f'', '', 'Measured Audio Delta Across Original Xperience-10M Task Contracts', 'Positive means audio improved the task primary metric on the single public sample split.', f'', ] for i, item in enumerate(items): y = 112 + i * row_h task = item["task_display"].replace("&", "&") value = float(item.get("handcrafted_audio_delta", 0.0)) bar_w = abs(value) * scale x = mid if value >= 0 else mid - bar_w color = "#7ae5c3" if value >= 0 else "#ff8a6a" lines.extend([ f'{task}', f'', f'{value:+.4f} {item["primary_metric"]}', ]) lines.append("") path.parent.mkdir(parents=True, exist_ok=True) path.write_text("\n".join(lines), encoding="utf-8") def main() -> int: args = parse_args() args.output_dir.mkdir(parents=True, exist_ok=True) raw_sample_dir = infer_raw_sample_dir(args.workspace, args.raw_sample_dir) audio_path = raw_sample_dir / args.audio_source if raw_sample_dir is not None else Path(args.audio_source) annotation = args.annotation or (raw_sample_dir / "annotation.hdf5" if raw_sample_dir is not None else None) toolkit = infer_homie_toolkit(raw_sample_dir, args.homie_toolkit) if raw_sample_dir is None or not audio_path.exists(): raise FileNotFoundError("Local public sample MP4 is required for alternate audio extraction. Pass --raw-sample-dir.") if shutil.which("ffmpeg") is None: raise RuntimeError("ffmpeg is required to decode the MP4 audio stream.") X, _starts, _ends, windows, manifest, _summary = load_inputs(args.suite_dir) raw_audio, raw_meta = extract_raw_audio_window_features( audio_path, windows, X.shape[0], args.output_dir, args.sample_rate, args.mel_bands, args.fft_size, args.hop_length, args.force, ) ann = load_annotation(annotation, toolkit) raw_targets = { "object_targets": object_targets_from_annotation(ann, windows), "hand_targets": exact_hand_targets_from_annotation(ann, windows, args.forecast_frames), "contact_labels": exact_contact_labels_from_annotation(ann, windows), "next_action_labels": exact_next_action_labels_from_annotation(ann, windows), } rows: list[dict] = [] for task in TASKS: print(f"Audio ablation task: {task}") for variant in VARIANTS: rows.append(evaluate_task_variant(task, variant, X, raw_audio, windows, manifest, args.suite_dir, args, raw_targets)) write_csv(args.output_dir / "audio_ablation_metrics.csv", rows) summary = build_summary(rows, raw_meta) summary["provenance"] = { "suite_dir": "results/episode_task_suite", "shared_windows": "results/episode_task_suite/shared_windows.npz", "feature_manifest": "results/episode_task_suite/feature_manifest.json", "audio_source": public_raw_sample_ref(audio_path), "annotation_source": public_raw_sample_ref(annotation) if annotation is not None and annotation.exists() else "not_available", "homie_toolkit_available": bool(toolkit is not None and toolkit.exists()), } write_json(args.output_dir / "audio_ablation_summary.json", summary) write_summary_markdown(args.output_dir / "AUDIO_ABLATION_SUMMARY.md", summary) write_delta_chart(args.workspace / "docs/assets/charts/audio_ablation_delta.svg", summary) write_json(args.workspace / "docs/data/audio_ablation_summary.json", summary) compact_rows = [] for item in summary["task_summaries"]: compact_rows.append({ "task": item["task"], "task_display": item["task_display"], "metric": item["primary_metric"], "current_audio": item.get("all_handcrafted_audio", ""), "no_audio": item.get("all_except_audio", ""), "current_audio_delta": item.get("handcrafted_audio_delta", ""), "raw_audio_only": item.get("raw_logmel_audio_only", ""), "replace_with_raw": item.get("replace_handcrafted_with_raw", ""), "raw_replacement_delta_vs_current": item.get("raw_replacement_delta_vs_handcrafted", ""), "all_plus_raw": item.get("all_plus_raw_logmel", ""), "all_plus_raw_delta_vs_current": item.get("all_plus_raw_delta_vs_handcrafted", ""), }) write_csv(args.output_dir / "audio_delta_summary.csv", compact_rows) print(f"Wrote {args.output_dir}") return 0 if __name__ == "__main__": raise SystemExit(main())