Robotics
PyTorch
Cosmos
xperience10m_task_baseline_suite
embodied-ai
multimodal
xperience-10m
baseline
evaluation
qwen3-omni
Instructions to use cy0307/ropedia-xperience-10m-task-baselines with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Cosmos
How to use cy0307/ropedia-xperience-10m-task-baselines with Cosmos:
# No code snippets available yet for this library. # To use this model, check the repository files and the library's documentation. # Want to help? PRs adding snippets are welcome at: # https://github.com/huggingface/huggingface.js
- Notebooks
- Google Colab
- Kaggle
| #!/usr/bin/env python3 | |
| """ | |
| Minimal end-to-end action-recognition pipeline for an Xperience-10M episode. | |
| Input: | |
| annotation.hdf5 | |
| Features: | |
| hand joints, body joints, contacts, camera trajectory, IMU summary statistics. | |
| Target: | |
| caption action_label by default. Use --target subtask for Sub Task labels. | |
| Model: | |
| Numpy-only multinomial logistic regression. | |
| Outputs: | |
| metrics.json, metadata.json, per_class_metrics.csv, confusion_matrix.csv, | |
| predictions.csv, feature_dataset.npz, model.npz. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import csv | |
| import json | |
| import math | |
| import sys | |
| from collections import Counter, OrderedDict | |
| from pathlib import Path | |
| import numpy as np | |
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``.

    Default paths are anchored at the directory one level above the folder
    that contains this script.

    Returns:
        The parsed argument namespace.
    """
    root = Path(__file__).resolve().parents[1]
    cli = argparse.ArgumentParser(description="Train a minimal action classifier on Ropedia annotation.hdf5.")

    # Filesystem locations.
    cli.add_argument("--workspace", type=Path, default=root, help="Ropedia workspace root.")
    cli.add_argument(
        "--annotation",
        type=Path,
        default=root / "data/sample/xperience-10m-sample/annotation.hdf5",
        help="Path to annotation.hdf5.",
    )
    cli.add_argument(
        "--output-dir",
        type=Path,
        default=root / "outputs/min_action_model",
        help="Output artifact directory.",
    )

    cli.add_argument("--target", choices=["action", "subtask"], default="action", help="Prediction target.")

    # Numeric knobs share the same shape, so register them from a table.
    # Order is kept so that --help output is unchanged.
    numeric_options = (
        ("--window-frames", int, 20, "Frames per training window."),
        ("--stride-frames", int, 5, "Stride between windows."),
        ("--min-label-fraction", float, 0.6, "Minimum majority-label fraction in a window."),
        ("--test-fraction", float, 0.25, "Stratified test fraction."),
        ("--epochs", int, 800, "Training epochs."),
        ("--learning-rate", float, 0.2, "Softmax learning rate."),
        ("--l2", float, 1e-3, "L2 weight decay."),
        ("--seed", int, 7, "Random seed."),
    )
    for flag, caster, fallback, description in numeric_options:
        cli.add_argument(flag, type=caster, default=fallback, help=description)

    cli.add_argument("--no-class-weights", action="store_true", help="Disable inverse-frequency class weighting.")
    return cli.parse_args()
def add_toolkit_to_path(workspace: Path) -> None:
    """Put ``<workspace>/HOMIE-toolkit`` at the front of ``sys.path``.

    Raises:
        FileNotFoundError: if the toolkit directory does not exist.
    """
    toolkit_dir = workspace / "HOMIE-toolkit"
    if toolkit_dir.exists():
        # Front of the path so the toolkit's modules win over same-named ones.
        sys.path.insert(0, str(toolkit_dir))
        return
    raise FileNotFoundError(f"HOMIE-toolkit not found: {toolkit_dir}")
def portable_path(path: Path, workspace: Path | None = None) -> str:
    """Express *path* relative to a known root, as a POSIX-style string.

    The workspace (when given) is tried first, then the current working
    directory. If *path* lies under neither, only its file name is returned.
    """
    for base in (workspace, Path.cwd()):
        if base is not None:
            try:
                relative = path.resolve().relative_to(Path(base).resolve())
            except (FileNotFoundError, ValueError):
                # Not under this root (or it cannot be resolved); try the next.
                pass
            else:
                return relative.as_posix()
    return path.name
def temporal_stats(arr: np.ndarray) -> np.ndarray:
    """Return fixed statistics over time for an array shaped (T, ...).

    The input is cast to float32, all non-time axes are flattened into one
    feature axis of length F, and NaN/+inf/-inf are replaced by 0. The result
    is the concatenation, in this order, of seven length-F vectors:
    mean, std, min, max, last-minus-first, mean of first differences and
    std of first differences (the last two are zeros when T == 1).

    A 0-d input is treated as a single time step with a single feature; a 1-d
    input is treated as T steps of one feature.

    Raises:
        ValueError: if the time axis is empty.
    """
    arr = np.asarray(arr, dtype=np.float32)
    if arr.ndim == 0:
        arr = arr.reshape(1, 1)
    elif arr.ndim == 1:
        arr = arr[:, None]

    n_steps = arr.shape[0]
    # Check before reshaping: reshape(0, -1) cannot infer the second dimension
    # and would raise a less helpful error before this guard was reached.
    if n_steps == 0:
        raise ValueError("temporal_stats received an empty time axis")

    flat = arr.reshape(n_steps, -1)
    flat = np.nan_to_num(flat, nan=0.0, posinf=0.0, neginf=0.0)

    mean = flat.mean(axis=0)
    std = flat.std(axis=0)
    amin = flat.min(axis=0)
    amax = flat.max(axis=0)
    delta = flat[-1] - flat[0]
    if n_steps > 1:
        vel = np.diff(flat, axis=0)
        vel_mean = vel.mean(axis=0)
        vel_std = vel.std(axis=0)
    else:
        # A single frame has no frame-to-frame differences.
        vel_mean = np.zeros(flat.shape[1], dtype=np.float32)
        vel_std = np.zeros(flat.shape[1], dtype=np.float32)
    return np.concatenate([mean, std, amin, amax, delta, vel_mean, vel_std]).astype(np.float32)
def safe_window(arr: np.ndarray | None, start: int, end: int) -> np.ndarray | None:
    """Slice ``arr[start:end]`` with the end clipped to the array length.

    Returns ``None`` when *arr* is ``None`` or when *start* is at or beyond
    the end of *arr*; otherwise the slice converted with ``np.asarray``.
    """
    if arr is None or start >= len(arr):
        return None
    stop = min(end, len(arr))
    return np.asarray(arr[start:stop])
def center_by_body_root(values: np.ndarray, body: np.ndarray | None) -> np.ndarray:
    """Subtract the first body joint of each frame from *values*.

    ``body[:, :1, :]`` is used as the per-frame root and broadcast over the
    second axis of *values*.

    *values* is returned unchanged when centering cannot be applied safely:
    *body* is missing, has a different number of frames, has fewer than three
    dimensions, does not end in an axis of size 3, or has a different rank
    from *values*. The rank check matters because a lower-rank *values*
    (e.g. ``(T, 3)``) would otherwise broadcast against the ``(T, 1, 3)``
    root into a ``(T, T, 3)`` array.
    """
    if body is None or len(body) != len(values) or body.ndim < 3 or body.shape[-1] != 3:
        return values
    if np.ndim(values) != body.ndim:
        return values
    root = body[:, :1, :]
    return values - root
def extract_window_features(ann: dict, start: int, end: int) -> np.ndarray:
    """Build one float32 feature vector for frames ``[start, end)``.

    Each modality present in *ann* contributes one ``temporal_stats`` chunk,
    concatenated in this order: left hand, right hand (both centered on the
    body root when possible), body joints relative to their first joint,
    contacts, camera translation relative to the window's first frame, and
    IMU accel+gyro samples between the keyframe indices of the first and last
    frame of the window.

    Raises:
        ValueError: if no modality yields any features.
    """
    body, left, right, contacts, cam_t = (
        safe_window(ann.get(key), start, end)
        for key in ("smplh_body_joints", "hand_left_joints", "hand_right_joints", "contacts", "t_c2w_all")
    )

    pieces: list[np.ndarray] = [
        temporal_stats(center_by_body_root(hand, body))
        for hand in (left, right)
        if hand is not None
    ]

    if body is not None:
        # Only a 3-d body array has a joint axis to take the root from.
        origin = body[:, :1, :] if body.ndim == 3 else 0.0
        pieces.append(temporal_stats(body - origin))

    if contacts is not None:
        pieces.append(temporal_stats(contacts))

    if cam_t is not None:
        pieces.append(temporal_stats(cam_t - cam_t[:1]))

    accel = ann.get("imu_accel_xyz")
    gyro = ann.get("imu_gyro_xyz")
    keyframes = ann.get("imu_keyframe_indices")
    imu_available = accel is not None and gyro is not None and keyframes is not None
    if imu_available and len(keyframes) > end - 1:
        # Map the frame window onto IMU sample indices; keep at least one sample.
        first = int(max(0, keyframes[start]))
        last = int(min(len(accel), max(first + 1, keyframes[end - 1] + 1)))
        stacked = np.concatenate([accel[first:last], gyro[first:last]], axis=1)
        pieces.append(temporal_stats(stacked))

    if not pieces:
        raise ValueError("No usable numeric modalities found in annotation.")
    return np.concatenate(pieces).astype(np.float32)
def frame_label(info: dict, target: str) -> str:
    """Return the cleaned label of one frame, or ``""`` if it has none.

    Reads ``info["theme"]`` when *target* is ``"subtask"`` and
    ``info["action_label"]`` otherwise. The value is stringified and
    stripped; blank values and ``"N/A"`` (any case) count as unlabeled.
    """
    field = "theme" if target == "subtask" else "action_label"
    text = str(info.get(field, "")).strip()
    is_blank = not text or text.upper() == "N/A"
    return "" if is_blank else text
def majority_label(labels: list[str], min_fraction: float) -> tuple[str, float]:
    """Pick the most common non-blank label and report its share.

    Blank labels are discarded first, so the share is measured against the
    labeled frames only, not the whole window.

    Returns:
        ``(label, fraction)``. The label is ``""`` when nothing is labeled
        (fraction ``0.0``) or when the winner's share is below *min_fraction*.
    """
    present = [name for name in labels if name]
    if not present:
        return "", 0.0
    ((winner, votes),) = Counter(present).most_common(1)
    share = votes / len(present)
    return ("" if share < min_fraction else winner), share
def build_feature_dataset(
    ann: dict,
    target: str,
    window_frames: int,
    stride_frames: int,
    min_label_fraction: float,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Slide a fixed-size window over the episode and featurize labeled windows.

    Windows start every *stride_frames* frames and span *window_frames*
    frames; only windows that fit entirely within ``len(ann["img_names"])``
    are considered. A window is kept when ``majority_label`` returns a
    non-empty label for its per-frame labels.

    Returns:
        ``(X, y_labels, starts, ends, label_fracs)`` where ``X`` is the
        stacked float32 feature matrix, ``y_labels`` the label strings
        (object array), ``starts``/``ends`` the first and last (inclusive)
        frame index of each window, and ``label_fracs`` the majority share
        reported for each window.

    Raises:
        ValueError: if *window_frames* or *stride_frames* is not positive, if
            ``caption_frame_info_map`` is missing, or if no window received a
            label.
    """
    # Fail early with a precise message. Otherwise a bad window/stride only
    # surfaces as "No labeled windows" or as a bare range() error.
    if window_frames < 1:
        raise ValueError(f"window_frames must be >= 1, got {window_frames}")
    if stride_frames < 1:
        raise ValueError(f"stride_frames must be >= 1, got {stride_frames}")

    frame_info = ann.get("caption_frame_info_map")
    if frame_info is None:
        raise ValueError("No caption_frame_info_map found in annotation.")

    n_frames = len(ann["img_names"])
    X, y_labels, starts, ends, label_fracs = [], [], [], [], []
    for start in range(0, n_frames - window_frames + 1, stride_frames):
        end = start + window_frames
        labels = [frame_label(frame_info.get(i, {}), target) for i in range(start, end)]
        label, frac = majority_label(labels, min_label_fraction)
        if not label:
            continue
        X.append(extract_window_features(ann, start, end))
        y_labels.append(label)
        starts.append(start)
        ends.append(end - 1)  # stored as an inclusive last-frame index
        label_fracs.append(frac)

    if not X:
        raise ValueError("No labeled windows were created. Try lowering --min-label-fraction.")
    return (
        np.stack(X).astype(np.float32),
        np.asarray(y_labels, dtype=object),
        np.asarray(starts, dtype=np.int64),
        np.asarray(ends, dtype=np.int64),
        np.asarray(label_fracs, dtype=np.float32),
    )
def encode_labels(y_labels: np.ndarray) -> tuple[np.ndarray, list[str]]:
    """Map label strings to integer ids in order of first appearance.

    Returns:
        ``(y, class_names)`` where ``y`` is an int64 array of ids and
        ``class_names[i]`` is the label assigned id ``i``.
    """
    index_of: dict = {}
    # setdefault evaluates len() before inserting, so a new label gets the next id.
    codes = [index_of.setdefault(name, len(index_of)) for name in y_labels]
    return np.asarray(codes, dtype=np.int64), list(index_of)
def stratified_split(y: np.ndarray, test_fraction: float, seed: int) -> tuple[np.ndarray, np.ndarray]:
    """Split sample indices into train/test parts class by class.

    For each class the indices are shuffled; a class with a single sample
    goes entirely to train. Otherwise ``round(count * test_fraction)``
    samples, clamped to ``[1, count - 1]``, go to test. Both index lists are
    shuffled again before being returned as int64 arrays.
    """
    generator = np.random.default_rng(seed)
    train_part: list[int] = []
    test_part: list[int] = []

    for label in np.unique(y):
        members = np.flatnonzero(y == label)
        generator.shuffle(members)
        size = len(members)
        if size >= 2:
            # Keep at least one sample on each side of the split.
            held_out = max(1, min(int(round(size * test_fraction)), size - 1))
            test_part.extend(members[:held_out].tolist())
            train_part.extend(members[held_out:].tolist())
        else:
            train_part.extend(members.tolist())

    generator.shuffle(train_part)
    generator.shuffle(test_part)
    return np.asarray(train_part, dtype=np.int64), np.asarray(test_part, dtype=np.int64)
def fit_scaler(X: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Compute per-column mean and standard deviation as float32.

    Columns whose standard deviation is below 1e-6 get a scale of 1.0 so that
    dividing by the returned std never blows up on constant features.
    """
    center = X.mean(axis=0)
    spread = X.std(axis=0)
    nearly_constant = spread < 1e-6
    spread = np.where(nearly_constant, 1.0, spread)
    return center.astype(np.float32), spread.astype(np.float32)
def softmax(logits: np.ndarray) -> np.ndarray:
    """Row-wise softmax of a 2-d array of logits.

    Each row's maximum is subtracted before exponentiating so that large
    logits do not overflow.
    """
    shifted = logits - logits.max(axis=1, keepdims=True)
    unnormalised = np.exp(shifted)
    row_totals = unnormalised.sum(axis=1, keepdims=True)
    return unnormalised / row_totals
def train_softmax_classifier(
    X: np.ndarray,
    y: np.ndarray,
    n_classes: int,
    epochs: int,
    lr: float,
    l2: float,
    use_class_weights: bool,
    seed: int,
) -> tuple[np.ndarray, np.ndarray, list[dict]]:
    """Fit multinomial logistic regression with full-batch gradient descent.

    Weights start from N(0, 0.01) noise drawn with *seed*; the bias starts at
    zero. With *use_class_weights* each sample is weighted by
    ``n / count(class) / n_classes``; weights are then rescaled to mean 1.
    The L2 penalty is applied to the weight matrix only.

    Returns:
        ``(W, b, history)``. ``history`` holds ``epoch``, ``loss`` and
        ``train_accuracy`` for the first epoch, the last epoch, and every
        ``max(1, epochs // 10)``-th epoch. Loss and accuracy use the
        probabilities computed before that epoch's parameter update.
    """
    rng = np.random.default_rng(seed)
    n_samples, n_features = X.shape
    weights = rng.normal(0.0, 0.01, size=(n_features, n_classes)).astype(np.float32)
    bias = np.zeros(n_classes, dtype=np.float32)
    targets = np.eye(n_classes, dtype=np.float32)[y]

    if use_class_weights:
        class_counts = np.bincount(y, minlength=n_classes).astype(np.float32)
        class_scale = n_samples / np.maximum(class_counts, 1.0) / n_classes
        per_sample = class_scale[y]
    else:
        per_sample = np.ones(n_samples, dtype=np.float32)
    per_sample = per_sample / per_sample.mean()

    history = []
    log_interval = max(1, epochs // 10)
    row_ids = np.arange(n_samples)
    for epoch in range(1, epochs + 1):
        probs = softmax(X @ weights + bias)

        # Gradient of the weighted mean cross-entropy w.r.t. the logits.
        residual = (probs - targets) * per_sample[:, None] / n_samples
        weight_grad = X.T @ residual + l2 * weights
        bias_grad = residual.sum(axis=0)
        weights -= lr * weight_grad
        bias -= lr * bias_grad

        should_log = epoch in (1, epochs) or epoch % log_interval == 0
        if should_log:
            p_true = np.clip(probs[row_ids, y], 1e-9, 1.0)
            loss = float(-(per_sample * np.log(p_true)).mean() + 0.5 * l2 * float(np.sum(weights * weights)))
            acc = float(np.mean(np.argmax(probs, axis=1) == y))
            history.append({"epoch": epoch, "loss": loss, "train_accuracy": acc})

    return weights.astype(np.float32), bias.astype(np.float32), history
def predict(X: np.ndarray, W: np.ndarray, b: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Return ``(class_ids, probabilities)`` for the linear softmax model."""
    probabilities = softmax(X @ W + b)
    class_ids = np.argmax(probabilities, axis=1)
    return class_ids, probabilities
def compute_metrics(y_true: np.ndarray, y_pred: np.ndarray, class_names: list[str]) -> tuple[dict, list[dict], np.ndarray]:
    """Score predictions against ground truth.

    Returns:
        ``(metrics, rows, cm)`` where ``metrics`` holds accuracy, balanced
        accuracy, macro F1, weighted F1 and counts; ``rows`` has one dict per
        class (support, predicted count, precision, recall, F1); and ``cm`` is
        the confusion matrix indexed ``[true, predicted]``. Classes with zero
        support are listed in ``rows`` but left out of the averaged metrics.
    """
    n_classes = len(class_names)
    cm = np.zeros((n_classes, n_classes), dtype=np.int64)
    for truth, guess in zip(y_true, y_pred):
        cm[int(truth), int(guess)] += 1

    def ratio(numerator: int, denominator: int) -> float:
        # Undefined ratios (empty class / never predicted) are reported as 0.
        return numerator / denominator if denominator else 0.0

    hits = cm.diagonal()
    row_totals = cm.sum(axis=1)
    col_totals = cm.sum(axis=0)
    support_total = int(cm.sum())

    rows = []
    recalls, f1s = [], []
    weighted_f1_total = 0.0
    for class_id, class_name in enumerate(class_names):
        tp = int(hits[class_id])
        support = int(row_totals[class_id])
        pred_count = int(col_totals[class_id])
        precision = ratio(tp, pred_count)
        recall = ratio(tp, support)
        pr_sum = precision + recall
        f1 = 2 * precision * recall / pr_sum if pr_sum else 0.0
        if support:
            recalls.append(recall)
            f1s.append(f1)
            weighted_f1_total += f1 * support
        rows.append({
            "class_id": class_id,
            "class_name": class_name,
            "support": support,
            "predicted": pred_count,
            "precision": precision,
            "recall": recall,
            "f1": f1,
        })

    metrics = {
        "accuracy": float(np.mean(y_true == y_pred)) if len(y_true) else 0.0,
        "balanced_accuracy": float(np.mean(recalls)) if recalls else 0.0,
        "macro_f1": float(np.mean(f1s)) if f1s else 0.0,
        "weighted_f1": float(weighted_f1_total / support_total) if support_total else 0.0,
        "num_eval_windows": int(len(y_true)),
        "num_classes": n_classes,
    }
    return metrics, rows, cm
def write_csv(path: Path, rows: list[dict], fieldnames: list[str]) -> None:
    """Write *rows* to *path* as UTF-8 CSV with a header row and LF line endings."""
    with path.open("w", newline="", encoding="utf-8") as handle:
        table = csv.DictWriter(handle, fieldnames=fieldnames, lineterminator="\n")
        table.writeheader()
        for record in rows:
            table.writerow(record)
def save_artifacts(
    output_dir: Path,
    X: np.ndarray,
    y: np.ndarray,
    y_labels: np.ndarray,
    starts: np.ndarray,
    ends: np.ndarray,
    label_fracs: np.ndarray,
    train_idx: np.ndarray,
    test_idx: np.ndarray,
    class_names: list[str],
    mean: np.ndarray,
    std: np.ndarray,
    W: np.ndarray,
    b: np.ndarray,
    history: list[dict],
    metrics: dict,
    per_class_rows: list[dict],
    cm: np.ndarray,
    y_pred: np.ndarray,
    probs: np.ndarray,
    args: argparse.Namespace,
) -> None:
    """Write every run artifact into *output_dir* (created if missing).

    Files written:
        feature_dataset.npz   features, labels, window bounds and split indices
        model.npz             scaler statistics, weights, bias and class names
        metadata.json         run configuration, dataset sizes and history
        metrics.json          the *metrics* dict
        per_class_metrics.csv one row per class from *per_class_rows*
        confusion_matrix.csv  *cm* with class names as row/column headers
        predictions.csv       one row per test window

    ``y_pred`` and ``probs`` are aligned with ``test_idx``: row ``k`` of each
    belongs to window ``test_idx[k]``.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Store class names as a unicode array (like ``labels``) rather than an
    # object array: object arrays are pickled inside the .npz and can only be
    # read back with ``np.load(..., allow_pickle=True)``.
    class_name_array = np.asarray(class_names, dtype=str)

    np.savez_compressed(
        output_dir / "feature_dataset.npz",
        X=X,
        y=y,
        labels=y_labels.astype(str),
        start_frame=starts,
        end_frame=ends,
        label_fraction=label_fracs,
        train_idx=train_idx,
        test_idx=test_idx,
        class_names=class_name_array,
    )
    np.savez_compressed(output_dir / "model.npz", mean=mean, std=std, W=W, b=b, class_names=class_name_array)

    metadata = {
        "annotation": portable_path(args.annotation, args.workspace),
        "target": args.target,
        "window_frames": args.window_frames,
        "stride_frames": args.stride_frames,
        "min_label_fraction": args.min_label_fraction,
        "test_fraction": args.test_fraction,
        "epochs": args.epochs,
        "learning_rate": args.learning_rate,
        "l2": args.l2,
        "class_weights": not args.no_class_weights,
        "num_windows": int(len(y)),
        "num_features": int(X.shape[1]),
        "num_train_windows": int(len(train_idx)),
        "num_test_windows": int(len(test_idx)),
        "classes": class_names,
        "history": history,
    }
    (output_dir / "metadata.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    (output_dir / "metrics.json").write_text(json.dumps(metrics, indent=2), encoding="utf-8")

    write_csv(
        output_dir / "per_class_metrics.csv",
        per_class_rows,
        ["class_id", "class_name", "support", "predicted", "precision", "recall", "f1"],
    )

    with (output_dir / "confusion_matrix.csv").open("w", newline="", encoding="utf-8") as fp:
        writer = csv.writer(fp, lineterminator="\n")
        writer.writerow(["true\\pred"] + class_names)
        for i, name in enumerate(class_names):
            writer.writerow([name] + [int(v) for v in cm[i]])

    # Position k in y_pred/probs corresponds to window test_idx[k], so
    # enumerate gives the pairing directly.
    pred_rows = []
    for k, idx in enumerate(test_idx):
        idx = int(idx)
        pred_id = int(y_pred[k])
        true_id = int(y[idx])
        pred_rows.append({
            "window_index": idx,
            "start_frame": int(starts[idx]),
            "end_frame": int(ends[idx]),
            "true_label": class_names[true_id],
            "predicted_label": class_names[pred_id],
            "confidence": float(probs[k, pred_id]),
            "correct": int(pred_id == true_id),
            "label_fraction": float(label_fracs[idx]),
        })
    write_csv(
        output_dir / "predictions.csv",
        pred_rows,
        ["window_index", "start_frame", "end_frame", "true_label", "predicted_label", "confidence", "correct", "label_fraction"],
    )
def main() -> int:
    """Run the full pipeline: load, featurize, split, train, evaluate, save.

    Returns:
        ``0`` on success; failures are raised as exceptions.
    """
    args = parse_args()
    add_toolkit_to_path(args.workspace)
    # Importable only once the toolkit directory has been put on sys.path.
    from data_loader import load_from_annotation_hdf5

    annotation_path = args.annotation
    if not annotation_path.exists():
        raise FileNotFoundError(f"annotation.hdf5 not found: {annotation_path}")

    print(f"Loading annotation: {annotation_path}")
    ann = load_from_annotation_hdf5(annotation_path, 0, None, load_slam_point_cloud=False)

    print("Building windowed feature dataset")
    X, y_labels, starts, ends, label_fracs = build_feature_dataset(
        ann,
        target=args.target,
        window_frames=args.window_frames,
        stride_frames=args.stride_frames,
        min_label_fraction=args.min_label_fraction,
    )
    y, class_names = encode_labels(y_labels)
    n_classes = len(class_names)

    train_idx, test_idx = stratified_split(y, args.test_fraction, args.seed)
    if len(test_idx) == 0:
        raise ValueError("No test windows available. Lower --test-fraction or use more data.")

    # Scaler statistics come from the training windows only.
    mean, std = fit_scaler(X[train_idx])
    X_scaled = (X - mean) / std
    train_X, train_y = X_scaled[train_idx], y[train_idx]
    test_X, test_y = X_scaled[test_idx], y[test_idx]

    print(f"Windows: {len(y)} total, {len(train_idx)} train, {len(test_idx)} test")
    print(f"Features: {X.shape[1]}, classes: {n_classes}")
    for name, count in Counter(y_labels).most_common():
        print(f" {count:4d} windows {name}")

    print("Training softmax classifier")
    W, b, history = train_softmax_classifier(
        train_X,
        train_y,
        n_classes=n_classes,
        epochs=args.epochs,
        lr=args.learning_rate,
        l2=args.l2,
        use_class_weights=not args.no_class_weights,
        seed=args.seed,
    )

    y_pred, probs = predict(test_X, W, b)
    metrics, per_class_rows, cm = compute_metrics(test_y, y_pred, class_names)

    # Reference score: always predict the most frequent training class.
    majority_class = Counter(train_y).most_common(1)[0][0]
    metrics["majority_baseline_accuracy"] = float(np.mean(test_y == majority_class))
    last_entry = history[-1] if history else None
    metrics["train_final_accuracy"] = last_entry["train_accuracy"] if last_entry else math.nan
    metrics["train_final_loss"] = last_entry["loss"] if last_entry else math.nan

    save_artifacts(
        output_dir=args.output_dir,
        X=X,
        y=y,
        y_labels=y_labels,
        starts=starts,
        ends=ends,
        label_fracs=label_fracs,
        train_idx=train_idx,
        test_idx=test_idx,
        class_names=class_names,
        mean=mean,
        std=std,
        W=W,
        b=b,
        history=history,
        metrics=metrics,
        per_class_rows=per_class_rows,
        cm=cm,
        y_pred=y_pred,
        probs=probs,
        args=args,
    )

    print("\nEvaluation")
    summary_lines = (
        ("accuracy", "accuracy"),
        ("balanced_accuracy", "balanced_accuracy"),
        ("macro_f1", "macro_f1"),
        ("weighted_f1", "weighted_f1"),
        ("majority_baseline", "majority_baseline_accuracy"),
    )
    for title, key in summary_lines:
        print(f" {title}: {metrics[key]:.4f}")
    print(f"\nArtifacts written to: {args.output_dir}")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())