| |
| """ |
| Minimal end-to-end action-recognition pipeline for an Xperience-10M episode. |
| |
| Input: |
| annotation.hdf5 |
| |
| Features: |
| hand joints, body joints, contacts, camera trajectory, IMU summary statistics. |
| |
| Target: |
| caption action_label by default. Use --target subtask for Sub Task labels. |
| |
| Model: |
| Numpy-only multinomial logistic regression. |
| |
| Outputs: |
| metrics.json, per_class_metrics.csv, confusion_matrix.csv, predictions.csv, |
| feature_dataset.npz, model.npz. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| import json |
| import math |
| import sys |
| from collections import Counter, OrderedDict |
| from pathlib import Path |
|
|
| import numpy as np |
|
|
|
|
| def parse_args() -> argparse.Namespace: |
| workspace_default = Path(__file__).resolve().parents[1] |
| data_default = workspace_default / "data/sample/xperience-10m-sample/annotation.hdf5" |
| out_default = workspace_default / "outputs/min_action_model" |
|
|
| parser = argparse.ArgumentParser(description="Train a minimal action classifier on Ropedia annotation.hdf5.") |
| parser.add_argument("--workspace", type=Path, default=workspace_default, help="Ropedia workspace root.") |
| parser.add_argument("--annotation", type=Path, default=data_default, help="Path to annotation.hdf5.") |
| parser.add_argument("--output-dir", type=Path, default=out_default, help="Output artifact directory.") |
| parser.add_argument("--target", choices=["action", "subtask"], default="action", help="Prediction target.") |
| parser.add_argument("--window-frames", type=int, default=20, help="Frames per training window.") |
| parser.add_argument("--stride-frames", type=int, default=5, help="Stride between windows.") |
| parser.add_argument("--min-label-fraction", type=float, default=0.6, help="Minimum majority-label fraction in a window.") |
| parser.add_argument("--test-fraction", type=float, default=0.25, help="Stratified test fraction.") |
| parser.add_argument("--epochs", type=int, default=800, help="Training epochs.") |
| parser.add_argument("--learning-rate", type=float, default=0.2, help="Softmax learning rate.") |
| parser.add_argument("--l2", type=float, default=1e-3, help="L2 weight decay.") |
| parser.add_argument("--seed", type=int, default=7, help="Random seed.") |
| parser.add_argument("--no-class-weights", action="store_true", help="Disable inverse-frequency class weighting.") |
| return parser.parse_args() |
|
|
|
|
| def add_toolkit_to_path(workspace: Path) -> None: |
| toolkit = workspace / "HOMIE-toolkit" |
| if not toolkit.exists(): |
| raise FileNotFoundError(f"HOMIE-toolkit not found: {toolkit}") |
| sys.path.insert(0, str(toolkit)) |
|
|
|
|
| def portable_path(path: Path, workspace: Path | None = None) -> str: |
| roots = [workspace, Path.cwd()] |
| for root in roots: |
| if root is None: |
| continue |
| try: |
| return path.resolve().relative_to(Path(root).resolve()).as_posix() |
| except (FileNotFoundError, ValueError): |
| continue |
| return path.name |
|
|
|
|
| def temporal_stats(arr: np.ndarray) -> np.ndarray: |
| """Return fixed statistics over time for an array shaped (T, ...).""" |
| arr = np.asarray(arr, dtype=np.float32) |
| if arr.ndim == 0: |
| arr = arr.reshape(1, 1) |
| elif arr.ndim == 1: |
| arr = arr[:, None] |
| flat = arr.reshape(arr.shape[0], -1) |
| flat = np.nan_to_num(flat, nan=0.0, posinf=0.0, neginf=0.0) |
| if flat.shape[0] == 0: |
| raise ValueError("temporal_stats received an empty time axis") |
|
|
| mean = flat.mean(axis=0) |
| std = flat.std(axis=0) |
| amin = flat.min(axis=0) |
| amax = flat.max(axis=0) |
| delta = flat[-1] - flat[0] |
| if flat.shape[0] > 1: |
| vel = np.diff(flat, axis=0) |
| vel_mean = vel.mean(axis=0) |
| vel_std = vel.std(axis=0) |
| else: |
| vel_mean = np.zeros(flat.shape[1], dtype=np.float32) |
| vel_std = np.zeros(flat.shape[1], dtype=np.float32) |
| return np.concatenate([mean, std, amin, amax, delta, vel_mean, vel_std]).astype(np.float32) |
|
|
|
|
| def safe_window(arr: np.ndarray | None, start: int, end: int) -> np.ndarray | None: |
| if arr is None: |
| return None |
| if start >= len(arr): |
| return None |
| return np.asarray(arr[start:min(end, len(arr))]) |
|
|
|
|
| def center_by_body_root(values: np.ndarray, body: np.ndarray | None) -> np.ndarray: |
| if body is None or len(body) != len(values) or body.ndim < 3 or body.shape[-1] != 3: |
| return values |
| root = body[:, :1, :] |
| return values - root |
|
|
|
|
| def extract_window_features(ann: dict, start: int, end: int) -> np.ndarray: |
| body = safe_window(ann.get("smplh_body_joints"), start, end) |
| left = safe_window(ann.get("hand_left_joints"), start, end) |
| right = safe_window(ann.get("hand_right_joints"), start, end) |
| contacts = safe_window(ann.get("contacts"), start, end) |
| cam_t = safe_window(ann.get("t_c2w_all"), start, end) |
|
|
| chunks: list[np.ndarray] = [] |
|
|
| if left is not None: |
| chunks.append(temporal_stats(center_by_body_root(left, body))) |
| if right is not None: |
| chunks.append(temporal_stats(center_by_body_root(right, body))) |
| if body is not None: |
| root = body[:, :1, :] if body.ndim == 3 else 0.0 |
| chunks.append(temporal_stats(body - root)) |
| if contacts is not None: |
| chunks.append(temporal_stats(contacts)) |
| if cam_t is not None: |
| cam_t = cam_t - cam_t[:1] |
| chunks.append(temporal_stats(cam_t)) |
|
|
| imu_accel = ann.get("imu_accel_xyz") |
| imu_gyro = ann.get("imu_gyro_xyz") |
| imu_keyframes = ann.get("imu_keyframe_indices") |
| if imu_accel is not None and imu_gyro is not None and imu_keyframes is not None and len(imu_keyframes) > end - 1: |
| imu_start = int(max(0, imu_keyframes[start])) |
| imu_end = int(min(len(imu_accel), max(imu_start + 1, imu_keyframes[end - 1] + 1))) |
| imu = np.concatenate([imu_accel[imu_start:imu_end], imu_gyro[imu_start:imu_end]], axis=1) |
| chunks.append(temporal_stats(imu)) |
|
|
| if not chunks: |
| raise ValueError("No usable numeric modalities found in annotation.") |
| return np.concatenate(chunks).astype(np.float32) |
|
|
|
|
| def frame_label(info: dict, target: str) -> str: |
| if target == "subtask": |
| label = info.get("theme", "") |
| else: |
| label = info.get("action_label", "") |
| label = str(label).strip() |
| if not label or label.upper() == "N/A": |
| return "" |
| return label |
|
|
|
|
| def majority_label(labels: list[str], min_fraction: float) -> tuple[str, float]: |
| labels = [x for x in labels if x] |
| if not labels: |
| return "", 0.0 |
| label, count = Counter(labels).most_common(1)[0] |
| frac = count / len(labels) |
| if frac < min_fraction: |
| return "", frac |
| return label, frac |
|
|
|
|
| def build_feature_dataset(ann: dict, target: str, window_frames: int, stride_frames: int, min_label_fraction: float): |
| frame_info = ann.get("caption_frame_info_map") |
| if frame_info is None: |
| raise ValueError("No caption_frame_info_map found in annotation.") |
|
|
| n_frames = len(ann["img_names"]) |
| X, y_labels, starts, ends, label_fracs = [], [], [], [], [] |
| for start in range(0, n_frames - window_frames + 1, stride_frames): |
| end = start + window_frames |
| labels = [frame_label(frame_info.get(i, {}), target) for i in range(start, end)] |
| label, frac = majority_label(labels, min_label_fraction) |
| if not label: |
| continue |
| X.append(extract_window_features(ann, start, end)) |
| y_labels.append(label) |
| starts.append(start) |
| ends.append(end - 1) |
| label_fracs.append(frac) |
|
|
| if not X: |
| raise ValueError("No labeled windows were created. Try lowering --min-label-fraction.") |
|
|
| return ( |
| np.stack(X).astype(np.float32), |
| np.asarray(y_labels, dtype=object), |
| np.asarray(starts, dtype=np.int64), |
| np.asarray(ends, dtype=np.int64), |
| np.asarray(label_fracs, dtype=np.float32), |
| ) |
|
|
|
|
| def encode_labels(y_labels: np.ndarray) -> tuple[np.ndarray, list[str]]: |
| seen = OrderedDict() |
| for label in y_labels: |
| if label not in seen: |
| seen[label] = len(seen) |
| class_names = list(seen.keys()) |
| y = np.asarray([seen[label] for label in y_labels], dtype=np.int64) |
| return y, class_names |
|
|
|
|
| def stratified_split(y: np.ndarray, test_fraction: float, seed: int) -> tuple[np.ndarray, np.ndarray]: |
| rng = np.random.default_rng(seed) |
| train_idx, test_idx = [], [] |
| for cls in np.unique(y): |
| idx = np.flatnonzero(y == cls) |
| rng.shuffle(idx) |
| if len(idx) < 2: |
| train_idx.extend(idx.tolist()) |
| continue |
| n_test = int(round(len(idx) * test_fraction)) |
| n_test = max(1, min(n_test, len(idx) - 1)) |
| test_idx.extend(idx[:n_test].tolist()) |
| train_idx.extend(idx[n_test:].tolist()) |
| rng.shuffle(train_idx) |
| rng.shuffle(test_idx) |
| return np.asarray(train_idx, dtype=np.int64), np.asarray(test_idx, dtype=np.int64) |
|
|
|
|
| def fit_scaler(X: np.ndarray) -> tuple[np.ndarray, np.ndarray]: |
| mean = X.mean(axis=0) |
| std = X.std(axis=0) |
| std = np.where(std < 1e-6, 1.0, std) |
| return mean.astype(np.float32), std.astype(np.float32) |
|
|
|
|
| def softmax(logits: np.ndarray) -> np.ndarray: |
| logits = logits - logits.max(axis=1, keepdims=True) |
| exp = np.exp(logits) |
| return exp / exp.sum(axis=1, keepdims=True) |
|
|
|
|
| def train_softmax_classifier( |
| X: np.ndarray, |
| y: np.ndarray, |
| n_classes: int, |
| epochs: int, |
| lr: float, |
| l2: float, |
| use_class_weights: bool, |
| seed: int, |
| ) -> tuple[np.ndarray, np.ndarray, list[dict]]: |
| rng = np.random.default_rng(seed) |
| n, d = X.shape |
| W = rng.normal(0.0, 0.01, size=(d, n_classes)).astype(np.float32) |
| b = np.zeros(n_classes, dtype=np.float32) |
| onehot = np.eye(n_classes, dtype=np.float32)[y] |
|
|
| if use_class_weights: |
| counts = np.bincount(y, minlength=n_classes).astype(np.float32) |
| weights_by_class = n / np.maximum(counts, 1.0) / n_classes |
| sample_weights = weights_by_class[y] |
| else: |
| sample_weights = np.ones(n, dtype=np.float32) |
| sample_weights = sample_weights / sample_weights.mean() |
|
|
| history = [] |
| report_every = max(1, epochs // 10) |
| for epoch in range(1, epochs + 1): |
| logits = X @ W + b |
| probs = softmax(logits) |
| weighted_diff = (probs - onehot) * sample_weights[:, None] / n |
| grad_W = X.T @ weighted_diff + l2 * W |
| grad_b = weighted_diff.sum(axis=0) |
| W -= lr * grad_W |
| b -= lr * grad_b |
|
|
| if epoch == 1 or epoch == epochs or epoch % report_every == 0: |
| p_true = np.clip(probs[np.arange(n), y], 1e-9, 1.0) |
| loss = float(-(sample_weights * np.log(p_true)).mean() + 0.5 * l2 * float(np.sum(W * W))) |
| acc = float(np.mean(np.argmax(probs, axis=1) == y)) |
| history.append({"epoch": epoch, "loss": loss, "train_accuracy": acc}) |
| return W.astype(np.float32), b.astype(np.float32), history |
|
|
|
|
| def predict(X: np.ndarray, W: np.ndarray, b: np.ndarray) -> tuple[np.ndarray, np.ndarray]: |
| probs = softmax(X @ W + b) |
| return np.argmax(probs, axis=1), probs |
|
|
|
|
| def compute_metrics(y_true: np.ndarray, y_pred: np.ndarray, class_names: list[str]) -> tuple[dict, list[dict], np.ndarray]: |
| n_classes = len(class_names) |
| cm = np.zeros((n_classes, n_classes), dtype=np.int64) |
| for t, p in zip(y_true, y_pred): |
| cm[int(t), int(p)] += 1 |
|
|
| rows = [] |
| recalls, f1s, weighted_f1_total = [], [], 0.0 |
| support_total = int(cm.sum()) |
| for i, name in enumerate(class_names): |
| tp = int(cm[i, i]) |
| support = int(cm[i, :].sum()) |
| pred_count = int(cm[:, i].sum()) |
| precision = tp / pred_count if pred_count else 0.0 |
| recall = tp / support if support else 0.0 |
| f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0 |
| if support: |
| recalls.append(recall) |
| f1s.append(f1) |
| weighted_f1_total += f1 * support |
| rows.append({ |
| "class_id": i, |
| "class_name": name, |
| "support": support, |
| "predicted": pred_count, |
| "precision": precision, |
| "recall": recall, |
| "f1": f1, |
| }) |
|
|
| accuracy = float(np.mean(y_true == y_pred)) if len(y_true) else 0.0 |
| macro_f1 = float(np.mean(f1s)) if f1s else 0.0 |
| balanced_accuracy = float(np.mean(recalls)) if recalls else 0.0 |
| weighted_f1 = float(weighted_f1_total / support_total) if support_total else 0.0 |
| metrics = { |
| "accuracy": accuracy, |
| "balanced_accuracy": balanced_accuracy, |
| "macro_f1": macro_f1, |
| "weighted_f1": weighted_f1, |
| "num_eval_windows": int(len(y_true)), |
| "num_classes": n_classes, |
| } |
| return metrics, rows, cm |
|
|
|
|
| def write_csv(path: Path, rows: list[dict], fieldnames: list[str]) -> None: |
| with path.open("w", newline="", encoding="utf-8") as fp: |
| writer = csv.DictWriter(fp, fieldnames=fieldnames, lineterminator="\n") |
| writer.writeheader() |
| writer.writerows(rows) |
|
|
|
|
| def save_artifacts( |
| output_dir: Path, |
| X: np.ndarray, |
| y: np.ndarray, |
| y_labels: np.ndarray, |
| starts: np.ndarray, |
| ends: np.ndarray, |
| label_fracs: np.ndarray, |
| train_idx: np.ndarray, |
| test_idx: np.ndarray, |
| class_names: list[str], |
| mean: np.ndarray, |
| std: np.ndarray, |
| W: np.ndarray, |
| b: np.ndarray, |
| history: list[dict], |
| metrics: dict, |
| per_class_rows: list[dict], |
| cm: np.ndarray, |
| y_pred: np.ndarray, |
| probs: np.ndarray, |
| args: argparse.Namespace, |
| ) -> None: |
| output_dir.mkdir(parents=True, exist_ok=True) |
|
|
| np.savez_compressed( |
| output_dir / "feature_dataset.npz", |
| X=X, |
| y=y, |
| labels=y_labels.astype(str), |
| start_frame=starts, |
| end_frame=ends, |
| label_fraction=label_fracs, |
| train_idx=train_idx, |
| test_idx=test_idx, |
| class_names=np.asarray(class_names, dtype=object), |
| ) |
| np.savez_compressed(output_dir / "model.npz", mean=mean, std=std, W=W, b=b, class_names=np.asarray(class_names, dtype=object)) |
|
|
| metadata = { |
| "annotation": portable_path(args.annotation, args.workspace), |
| "target": args.target, |
| "window_frames": args.window_frames, |
| "stride_frames": args.stride_frames, |
| "min_label_fraction": args.min_label_fraction, |
| "test_fraction": args.test_fraction, |
| "epochs": args.epochs, |
| "learning_rate": args.learning_rate, |
| "l2": args.l2, |
| "class_weights": not args.no_class_weights, |
| "num_windows": int(len(y)), |
| "num_features": int(X.shape[1]), |
| "num_train_windows": int(len(train_idx)), |
| "num_test_windows": int(len(test_idx)), |
| "classes": class_names, |
| "history": history, |
| } |
| (output_dir / "metadata.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8") |
| (output_dir / "metrics.json").write_text(json.dumps(metrics, indent=2), encoding="utf-8") |
|
|
| write_csv( |
| output_dir / "per_class_metrics.csv", |
| per_class_rows, |
| ["class_id", "class_name", "support", "predicted", "precision", "recall", "f1"], |
| ) |
|
|
| with (output_dir / "confusion_matrix.csv").open("w", newline="", encoding="utf-8") as fp: |
| writer = csv.writer(fp, lineterminator="\n") |
| writer.writerow(["true\\pred"] + class_names) |
| for i, name in enumerate(class_names): |
| writer.writerow([name] + [int(v) for v in cm[i]]) |
|
|
| pred_rows = [] |
| pred_lookup = {int(idx): k for k, idx in enumerate(test_idx)} |
| for idx in test_idx: |
| idx = int(idx) |
| k = pred_lookup[idx] |
| pred_id = int(y_pred[k]) |
| true_id = int(y[idx]) |
| pred_rows.append({ |
| "window_index": idx, |
| "start_frame": int(starts[idx]), |
| "end_frame": int(ends[idx]), |
| "true_label": class_names[true_id], |
| "predicted_label": class_names[pred_id], |
| "confidence": float(probs[k, pred_id]), |
| "correct": int(pred_id == true_id), |
| "label_fraction": float(label_fracs[idx]), |
| }) |
| write_csv( |
| output_dir / "predictions.csv", |
| pred_rows, |
| ["window_index", "start_frame", "end_frame", "true_label", "predicted_label", "confidence", "correct", "label_fraction"], |
| ) |
|
|
|
|
| def main() -> int: |
| args = parse_args() |
| add_toolkit_to_path(args.workspace) |
| from data_loader import load_from_annotation_hdf5 |
|
|
| if not args.annotation.exists(): |
| raise FileNotFoundError(f"annotation.hdf5 not found: {args.annotation}") |
|
|
| print(f"Loading annotation: {args.annotation}") |
| ann = load_from_annotation_hdf5(args.annotation, 0, None, load_slam_point_cloud=False) |
|
|
| print("Building windowed feature dataset") |
| X, y_labels, starts, ends, label_fracs = build_feature_dataset( |
| ann, |
| target=args.target, |
| window_frames=args.window_frames, |
| stride_frames=args.stride_frames, |
| min_label_fraction=args.min_label_fraction, |
| ) |
| y, class_names = encode_labels(y_labels) |
| train_idx, test_idx = stratified_split(y, args.test_fraction, args.seed) |
| if len(test_idx) == 0: |
| raise ValueError("No test windows available. Lower --test-fraction or use more data.") |
|
|
| mean, std = fit_scaler(X[train_idx]) |
| X_scaled = (X - mean) / std |
|
|
| print(f"Windows: {len(y)} total, {len(train_idx)} train, {len(test_idx)} test") |
| print(f"Features: {X.shape[1]}, classes: {len(class_names)}") |
| for name, count in Counter(y_labels).most_common(): |
| print(f" {count:4d} windows {name}") |
|
|
| print("Training softmax classifier") |
| W, b, history = train_softmax_classifier( |
| X_scaled[train_idx], |
| y[train_idx], |
| n_classes=len(class_names), |
| epochs=args.epochs, |
| lr=args.learning_rate, |
| l2=args.l2, |
| use_class_weights=not args.no_class_weights, |
| seed=args.seed, |
| ) |
|
|
| y_pred, probs = predict(X_scaled[test_idx], W, b) |
| metrics, per_class_rows, cm = compute_metrics(y[test_idx], y_pred, class_names) |
|
|
| majority_class = Counter(y[train_idx]).most_common(1)[0][0] |
| metrics["majority_baseline_accuracy"] = float(np.mean(y[test_idx] == majority_class)) |
| metrics["train_final_accuracy"] = history[-1]["train_accuracy"] if history else math.nan |
| metrics["train_final_loss"] = history[-1]["loss"] if history else math.nan |
|
|
| save_artifacts( |
| args.output_dir, |
| X, |
| y, |
| y_labels, |
| starts, |
| ends, |
| label_fracs, |
| train_idx, |
| test_idx, |
| class_names, |
| mean, |
| std, |
| W, |
| b, |
| history, |
| metrics, |
| per_class_rows, |
| cm, |
| y_pred, |
| probs, |
| args, |
| ) |
|
|
| print("\nEvaluation") |
| print(f" accuracy: {metrics['accuracy']:.4f}") |
| print(f" balanced_accuracy: {metrics['balanced_accuracy']:.4f}") |
| print(f" macro_f1: {metrics['macro_f1']:.4f}") |
| print(f" weighted_f1: {metrics['weighted_f1']:.4f}") |
| print(f" majority_baseline: {metrics['majority_baseline_accuracy']:.4f}") |
| print(f"\nArtifacts written to: {args.output_dir}") |
| return 0 |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|