#!/usr/bin/env python3
"""Run one extra data-backed probe for each Ropedia research direction.

These tasks reuse the committed single-episode feature tensor generated by
`episode_task_suite.py`. They are extension probes, not full implementations
of the research directions.
"""

from __future__ import annotations

import argparse
import csv
import html
import json
import math
from collections import OrderedDict
from pathlib import Path
from typing import Any

import numpy as np

ROOT = Path(__file__).resolve().parents[1]
RESULTS = ROOT / "results" / "episode_task_suite"
OUT_DIR = RESULTS / "research_direction_extensions"
DOCS_DATA = ROOT / "docs" / "data"
CHARTS = ROOT / "docs" / "assets" / "charts"
WINDOWS_NPZ = RESULTS / "shared_windows.npz"
WINDOWS_CSV = RESULTS / "windows.csv"
FEATURE_MANIFEST = RESULTS / "feature_manifest.json"

# Frame stride between consecutive windows. The value matches the task spec
# below (a four-window horizon described as 20 frames); it is not read from the
# data. NOTE(review): confirm against the windowing in episode_task_suite.py.
FRAMES_PER_WINDOW_STEP = 5

# Static description of every probe: one entry per research direction (A-D).
# `metric_key` selects the headline metric from the task's metrics dict and
# `metric_direction` says whether higher or lower is better.
TASK_SPECS: OrderedDict[str, dict[str, Any]] = OrderedDict(
    [
        (
            "body_motion_intensity",
            {
                "direction": "A",
                "direction_name": "Human Modeling & Motion Understanding",
                "name": "Body and Hand Motion Intensity",
                "family": "classification",
                "case_study": "A window with a fast reach or pour should be classified as high motion; a steady holding window should be low motion.",
                "input": "Current non-mocap feature blocks: video, audio, depth, camera pose/rotation, IMU, SLAM, calibration, and language context.",
                "middle_process": "Compute the target from hand/body joint changes between neighboring windows, hide the mocap blocks from the input, then classify high versus low motion using the train-set median as the threshold.",
                "output": "Binary label: high_motion or low_motion.",
                "minimal_baseline": "Ridge classifier on standardized non-mocap features.",
                "neural_baseline": "One-hidden-layer MLP binary classifier on the same input features.",
                "metric_name": "macro-F1",
                "metric_key": "macro_f1",
                "metric_direction": "higher",
                "current_limit": "This is a motion-energy proxy, not a SMPL/MANO body model or a generative motion prior.",
            },
        ),
        (
            "multi_view_consistency_retrieval",
            {
                "direction": "B",
                "direction_name": "3D/4D Reconstruction & Neural Rendering",
                "name": "Multi-View Consistency Retrieval",
                "family": "retrieval",
                "case_study": "Given the fisheye camera features for a pouring moment, retrieve the synchronized stereo-left view from the same time window.",
                "input": "Query side: fisheye_cam0 video feature block. Candidate side: stereo_left video feature block from held-out windows.",
                "middle_process": "Learn a projection from one camera-view feature space into another, then rank held-out candidate windows by cosine similarity.",
                "output": "Ranked candidate windows; the correct synchronized view should rank near the top.",
                "minimal_baseline": "Ridge projection followed by cosine nearest-neighbor retrieval.",
                "neural_baseline": "One-hidden-layer MLP projection followed by the same cosine retrieval evaluator.",
                "metric_name": "MRR",
                "metric_key": "mrr",
                "metric_direction": "higher",
                "current_limit": "This checks calibrated multi-view signal, but it is still feature retrieval, not NeRF, Gaussian Splatting, or novel-view synthesis.",
            },
        ),
        (
            "action_phase_progress",
            {
                "direction": "C",
                "direction_name": "Egocentric Vision & Interaction",
                "name": "Action Phase Progress Estimation",
                "family": "regression",
                "case_study": "Inside a Pour coffee action segment, estimate whether the current window is near the beginning, middle, or end of that action.",
                "input": "Current non-caption multimodal feature vector, so the label text cannot be copied directly from the language block.",
                "middle_process": "Convert contiguous action-label runs into a normalized 0-to-1 progress target, train on earlier windows, and regress progress for later windows.",
                "output": "A scalar progress value between 0.0 and 1.0 for the current action segment.",
                "minimal_baseline": "Ridge regressor on standardized non-caption features.",
                "neural_baseline": "One-hidden-layer MLP regressor on the same input features.",
                "metric_name": "MAE",
                "metric_key": "mae",
                "metric_direction": "lower",
                "current_limit": "This is an action-structure probe inside one episode, not a general intent model across homes, people, or tasks.",
            },
        ),
        (
            "ego_motion_forecast",
            {
                "direction": "D",
                "direction_name": "Scene Reconstruction & World Modeling",
                "name": "Short-Horizon Ego-Motion Forecasting",
                "family": "forecast",
                "case_study": "From the current sensors, predict how the camera translation will change over the next 20 frames while the wearer moves through the scene.",
                "input": "Current multimodal features excluding the camera-translation block and caption text.",
                "middle_process": "Build a future target from camera-translation difference at a four-window horizon, then regress that future ego-motion delta from current sensors.",
                "output": "A future camera-translation delta vector.",
                "minimal_baseline": "Ridge regressor with a 20-frame forecast horizon.",
                "neural_baseline": "One-hidden-layer MLP regressor with the same horizon and split.",
                "metric_name": "MAE",
                "metric_key": "mae",
                "metric_direction": "lower",
                "current_limit": "This is a compact world-model proxy; it does not build a persistent map, scene graph, or object permanence model.",
            },
        ),
    ]
)


def parse_args() -> argparse.Namespace:
    """Parse the command-line options that configure the probes."""
    parser = argparse.ArgumentParser(description="Run four research-direction extension probes.")
    parser.add_argument("--results-dir", type=Path, default=RESULTS)
    parser.add_argument("--output-dir", type=Path, default=OUT_DIR)
    parser.add_argument("--train-fraction", type=float, default=0.70)
    parser.add_argument("--ridge-l2", type=float, default=10.0)
    parser.add_argument("--seed", type=int, default=7)
    parser.add_argument("--future-windows", type=int, default=4)
    parser.add_argument("--neural-epochs", type=int, default=25)
    parser.add_argument("--neural-hidden-dim", type=int, default=128)
    parser.add_argument("--neural-batch-size", type=int, default=128)
    parser.add_argument("--neural-learning-rate", type=float, default=1e-3)
    parser.add_argument("--neural-weight-decay", type=float, default=1e-4)
    parser.add_argument("--skip-neural", action="store_true")
    return parser.parse_args()


def write_json(path: Path, payload: dict[str, Any] | list[Any]) -> None:
    """Write *payload* as indented UTF-8 JSON, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")


def write_csv(path: Path, rows: list[dict[str, Any]], fieldnames: list[str]) -> None:
    """Write *rows* to *path* as CSV with a header row and ``\\n`` line endings."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames, lineterminator="\n")
        writer.writeheader()
        writer.writerows(rows)


def load_windows_csv(path: Path) -> list[dict[str, str]]:
    """Read a CSV file into a list of per-row dicts keyed by column name."""
    with path.open("r", newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle))


def load_inputs(results_dir: Path) -> tuple[np.ndarray, np.ndarray, np.ndarray, list[dict[str, str]], list[dict[str, Any]]]:
    """Load the feature tensor, window metadata and feature manifest.

    Returns:
        ``(X, starts, ends, rows, manifest)`` where ``X`` is float32 with
        non-finite values replaced by 0, ``starts``/``ends`` are int64 arrays
        from the archive, ``rows`` are the rows of ``windows.csv`` and
        ``manifest`` is the parsed ``feature_manifest.json``.

    Raises:
        FileNotFoundError: if any of the three input files is missing.
        ValueError: if ``windows.csv`` and ``X`` disagree on the window count.
    """
    npz_path = results_dir / "shared_windows.npz"
    windows_path = results_dir / "windows.csv"
    manifest_path = results_dir / "feature_manifest.json"
    for required in (npz_path, windows_path, manifest_path):
        if not required.exists():
            raise FileNotFoundError(f"Missing {required}. Run scripts/episode_task_suite.py first.")

    # NpzFile keeps the archive open until closed, so copy the arrays out
    # inside a context manager.
    with np.load(npz_path, allow_pickle=False) as archive:
        X = np.asarray(archive["X"], dtype=np.float32)
        starts = np.asarray(archive["starts"], dtype=np.int64)
        ends = np.asarray(archive["ends"], dtype=np.int64)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

    rows = load_windows_csv(windows_path)
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    if len(rows) != len(X):
        raise ValueError(f"windows.csv has {len(rows)} rows but shared_windows.npz has {len(X)} windows.")
    return X, starts, ends, rows, manifest


def block_indices(manifest: list[dict[str, Any]], include: list[str] | None = None, exclude: list[str] | None = None) -> np.ndarray:
    """Return the feature-column indices of the manifest blocks that match.

    A block matches a pattern when its name equals or starts with it. With a
    non-empty *include* only matching blocks are kept; blocks matching
    *exclude* are always dropped. Each kept block contributes the half-open
    column range ``[start, end)``.
    """
    include_prefixes = tuple(include or ())
    exclude_prefixes = tuple(exclude or ())
    idxs: list[int] = []
    for block in manifest:
        name = str(block["name"])
        if include_prefixes and not name.startswith(include_prefixes):
            continue
        if exclude_prefixes and name.startswith(exclude_prefixes):
            continue
        idxs.extend(range(int(block["start"]), int(block["end"])))
    return np.asarray(idxs, dtype=np.int64)


def chronological_split(n: int, train_fraction: float) -> tuple[np.ndarray, np.ndarray]:
    """Split ``range(n)`` into a leading train part and a trailing test part.

    The split point is clamped so that both parts hold at least one index.

    Raises:
        ValueError: if ``n < 2``.
    """
    if n < 2:
        raise ValueError("Need at least two examples.")
    split = int(round(n * train_fraction))
    split = max(1, min(split, n - 1))
    return np.arange(split, dtype=np.int64), np.arange(split, n, dtype=np.int64)


def standardize_train_test(X_train: np.ndarray, X_test: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Z-score both matrices with train-set statistics.

    Returns ``(X_train_std, X_test_std, mean, std)``. Columns whose train
    standard deviation is below 1e-6 are left unscaled (std forced to 1).
    """
    mean = X_train.mean(axis=0, dtype=np.float64).astype(np.float32)
    std = X_train.std(axis=0, dtype=np.float64).astype(np.float32)
    std[std < 1e-6] = 1.0
    return (X_train - mean) / std, (X_test - mean) / std, mean, std


def ridge_predict(
    X_train: np.ndarray,
    Y_train: np.ndarray,
    X_test: np.ndarray,
    *,
    l2: float,
    standardize_y: bool,
) -> np.ndarray:
    """Fit ridge regression in its dual form and predict for *X_test*.

    Inputs are standardized with train statistics. The system solved is
    ``(X X^T + l2 * I) alpha = Y`` over the train Gram matrix, so the cost
    depends on the number of train rows rather than the feature count.

    Returns:
        Predictions of shape ``(len(X_test), n_targets)``; a 1-D *Y_train*
        is treated as a single target column.
    """
    Xtr, Xte, _, _ = standardize_train_test(X_train.astype(np.float32), X_test.astype(np.float32))
    Y = np.asarray(Y_train, dtype=np.float32)
    if Y.ndim == 1:
        Y = Y[:, None]
    if standardize_y:
        y_mean = Y.mean(axis=0, dtype=np.float64).astype(np.float32)
        y_std = Y.std(axis=0, dtype=np.float64).astype(np.float32)
        y_std[y_std < 1e-6] = 1.0
        Y_work = (Y - y_mean) / y_std
    else:
        y_mean = np.zeros(Y.shape[1], dtype=np.float32)
        y_std = np.ones(Y.shape[1], dtype=np.float32)
        Y_work = Y
    K = Xtr @ Xtr.T
    # Stepping the flat view by n + 1 visits exactly the diagonal entries.
    K.flat[:: K.shape[0] + 1] += float(l2)
    alpha = np.linalg.solve(K.astype(np.float64), Y_work.astype(np.float64)).astype(np.float32)
    pred = (Xte @ Xtr.T @ alpha).astype(np.float32)
    return pred * y_std + y_mean


def ridge_classifier(X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray, l2: float) -> tuple[np.ndarray, np.ndarray]:
    """Classify by ridge regression onto one-hot targets.

    Returns:
        ``(pred, scores)``: the predicted class labels (int64) and the raw
        per-class regression scores, with columns ordered by sorted class
        label as seen in *y_train*.
    """
    classes = np.asarray(sorted(set(int(v) for v in y_train)), dtype=np.int64)
    class_to_col = {int(cls): i for i, cls in enumerate(classes)}
    Y = np.zeros((len(y_train), len(classes)), dtype=np.float32)
    for row, label in enumerate(y_train):
        Y[row, class_to_col[int(label)]] = 1.0
    scores = ridge_predict(X_train, Y, X_test, l2=l2, standardize_y=False)
    pred = classes[np.argmax(scores, axis=1)]
    return pred.astype(np.int64), scores


def binary_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float | int]:
    """Compute accuracy, macro-F1 and positive rates for 0/1 labels.

    A class with no true and no predicted members contributes an F1 of 0.
    """
    y_true = y_true.astype(np.int64)
    y_pred = y_pred.astype(np.int64)
    accuracy = float(np.mean(y_true == y_pred))
    per_class_f1 = []
    for cls in (0, 1):
        tp = int(np.sum((y_true == cls) & (y_pred == cls)))
        fp = int(np.sum((y_true != cls) & (y_pred == cls)))
        fn = int(np.sum((y_true == cls) & (y_pred != cls)))
        precision = tp / max(tp + fp, 1)
        recall = tp / max(tp + fn, 1)
        f1 = 2 * precision * recall / max(precision + recall, 1e-12)
        per_class_f1.append(f1)
    return {
        "accuracy": accuracy,
        "macro_f1": float(np.mean(per_class_f1)),
        "positive_rate_true": float(np.mean(y_true)),
        "positive_rate_pred": float(np.mean(y_pred)),
        "num_test": int(len(y_true)),
    }


def regression_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float | int]:
    """Compute MSE, MAE and R^2 pooled over all elements of the targets."""
    y_true = np.asarray(y_true, dtype=np.float32)
    y_pred = np.asarray(y_pred, dtype=np.float32)
    err = y_pred - y_true
    mse = float(np.mean(err * err))
    mae = float(np.mean(np.abs(err)))
    denom = float(np.sum((y_true - y_true.mean(axis=0, keepdims=True)) ** 2))
    numer = float(np.sum(err * err))
    # The floor on the denominator avoids division by zero for constant targets.
    r2 = 1.0 - numer / max(denom, 1e-12)
    return {"mse": mse, "mae": mae, "r2": r2, "num_test": int(len(y_true))}


def row_normalize(X: np.ndarray) -> np.ndarray:
    """Scale each row to unit L2 norm; near-zero rows are left unchanged."""
    denom = np.linalg.norm(X, axis=1, keepdims=True)
    denom[denom < 1e-8] = 1.0
    return X / denom


def retrieval_metrics(query_pred: np.ndarray, target_test: np.ndarray) -> tuple[dict[str, float | int], list[dict[str, Any]]]:
    """Rank candidates by cosine similarity and score the retrieval.

    Query ``i`` is considered correct when it retrieves candidate ``i``.

    Returns:
        ``(metrics, rows)``: aggregate MRR / top-k / median rank, and one
        row per query with its 1-based true rank and the top candidate.
    """
    Q = row_normalize(np.asarray(query_pred, dtype=np.float32))
    T = row_normalize(np.asarray(target_test, dtype=np.float32))
    sims = Q @ T.T
    ranks = []
    rows = []
    for i in range(sims.shape[0]):
        order = np.argsort(-sims[i])
        rank = int(np.flatnonzero(order == i)[0]) + 1
        ranks.append(rank)
        rows.append(
            {
                "test_position": i,
                "true_rank": rank,
                "top_candidate_position": int(order[0]),
                "top_candidate_score": float(sims[i, order[0]]),
                "true_score": float(sims[i, i]),
            }
        )
    ranks_array = np.asarray(ranks, dtype=np.float32)
    metrics = {
        "mrr": float(np.mean(1.0 / ranks_array)),
        "top1": float(np.mean(ranks_array <= 1)),
        "top5": float(np.mean(ranks_array <= 5)),
        "top10": float(np.mean(ranks_array <= 10)),
        "median_rank": float(np.median(ranks_array)),
        "num_test": int(len(ranks)),
    }
    return metrics, rows


def choose_score(task: str, metrics: dict[str, Any]) -> float:
    """Map a task's headline metric to a higher-is-better score.

    Higher-is-better metrics are returned unchanged; lower-is-better
    metrics become ``max(0, 1 - value)``.
    """
    spec = TASK_SPECS[task]
    value = float(metrics[spec["metric_key"]])
    if spec["metric_direction"] == "higher":
        return value
    return max(0.0, 1.0 - value)


def train_neural(
    X_train: np.ndarray,
    Y_train: np.ndarray,
    X_test: np.ndarray,
    *,
    task_type: str,
    args: argparse.Namespace,
) -> tuple[np.ndarray, dict[str, Any]]:
    """Train a one-hidden-layer MLP on CPU and predict for *X_test*.

    With ``task_type == "classification"`` the targets are used as-is with
    a BCE-with-logits loss and the returned values are sigmoid
    probabilities. Any other task type standardizes the targets, trains
    with MSE and returns predictions mapped back to the original scale.

    Returns:
        ``(pred, info)``. When torch cannot be imported, ``pred`` is an
        empty ``(len(X_test), 0)`` array and ``info["available"]`` is False.
    """
    try:
        import torch
        from torch import nn
        from torch.utils.data import DataLoader, TensorDataset
    except Exception as exc:  # pragma: no cover - depends on optional torch install
        return np.empty((len(X_test), 0), dtype=np.float32), {"available": False, "reason": f"torch unavailable: {exc}"}

    rng = np.random.default_rng(args.seed)
    torch.manual_seed(args.seed)
    Xtr, Xte, _, _ = standardize_train_test(X_train.astype(np.float32), X_test.astype(np.float32))
    Y = np.asarray(Y_train, dtype=np.float32)
    if Y.ndim == 1:
        Y = Y[:, None]
    is_classification = task_type == "classification"
    if is_classification:
        Y_work = Y
        y_mean = np.zeros(Y.shape[1], dtype=np.float32)
        y_std = np.ones(Y.shape[1], dtype=np.float32)
    else:
        y_mean = Y.mean(axis=0, dtype=np.float64).astype(np.float32)
        y_std = Y.std(axis=0, dtype=np.float64).astype(np.float32)
        y_std[y_std < 1e-6] = 1.0
        Y_work = (Y - y_mean) / y_std
    Y_work = Y_work.astype(np.float32)

    device = torch.device("cpu")
    model = nn.Sequential(
        nn.Linear(Xtr.shape[1], args.neural_hidden_dim),
        nn.GELU(),
        nn.Dropout(0.08),
        nn.Linear(args.neural_hidden_dim, Y_work.shape[1]),
    ).to(device)
    loss_fn = nn.BCEWithLogitsLoss() if is_classification else nn.MSELoss()
    opt = torch.optim.AdamW(model.parameters(), lr=args.neural_learning_rate, weight_decay=args.neural_weight_decay)

    order = np.arange(len(Xtr))
    history = []
    for _ in range(args.neural_epochs):
        # Shuffle with the seeded NumPy generator (cumulatively, in place) and
        # feed the permuted arrays through an unshuffled loader, so the sample
        # order is controlled by ``args.seed`` alone.
        rng.shuffle(order)
        epoch_data = TensorDataset(torch.from_numpy(Xtr[order]), torch.from_numpy(Y_work[order]))
        loader = DataLoader(epoch_data, batch_size=args.neural_batch_size, shuffle=False)
        model.train()
        total_loss = 0.0
        total_seen = 0
        for xb, yb in loader:
            xb = xb.to(device)
            yb = yb.to(device)
            opt.zero_grad(set_to_none=True)
            loss = loss_fn(model(xb), yb)
            loss.backward()
            opt.step()
            total_loss += float(loss.item()) * len(xb)
            total_seen += len(xb)
        history.append(total_loss / max(total_seen, 1))

    model.eval()
    with torch.no_grad():
        raw = model(torch.from_numpy(Xte).to(device)).cpu().numpy().astype(np.float32)
    if is_classification:
        pred = 1.0 / (1.0 + np.exp(-raw))
    else:
        pred = raw * y_std + y_mean
    return pred, {"available": True, "epochs": args.neural_epochs, "hidden_dim": args.neural_hidden_dim, "loss_history": history}


def action_progress_targets(rows: list[dict[str, str]]) -> np.ndarray:
    """Return a 0-to-1 progress value per window within its action run.

    Consecutive rows with the same ``action_label`` form one run whose
    windows are spaced linearly from 0.0 to 1.0. Runs of length one keep
    the value 0.0. A missing or empty label is treated as ``""``.
    """
    labels = [row.get("action_label", "") or "" for row in rows]
    progress = np.zeros(len(labels), dtype=np.float32)
    start = 0
    while start < len(labels):
        end = start + 1
        while end < len(labels) and labels[end] == labels[start]:
            end += 1
        length = end - start
        if length > 1:
            progress[start:end] = np.linspace(0.0, 1.0, length, dtype=np.float32)
        start = end
    return progress


def _motion_label(flag: Any) -> str:
    """Name the binary motion class for a 0/1 value."""
    return "high_motion" if int(flag) else "low_motion"


def _motion_row(local_pos: int, pred: Any, valid: np.ndarray, motion: np.ndarray, y: np.ndarray, rows: list[dict[str, str]]) -> dict[str, Any]:
    """Build the columns shared by minimal and neural motion prediction rows."""
    idx = int(valid[local_pos])
    return {
        "window_index": idx,
        "center_frame": rows[idx]["center_frame"],
        "motion_energy": float(motion[local_pos]),
        "true_label": _motion_label(y[local_pos]),
        "pred_label": _motion_label(pred),
    }


def task_body_motion_intensity(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe A: classify high versus low hand/body motion without mocap input.

    The target is the L2 norm of the mocap feature difference between a
    window and its predecessor, thresholded at the train-set median.
    Prediction CSVs are written to ``OUT_DIR``.

    Raises:
        ValueError: if the manifest has none of the mocap joint blocks.
    """
    mocap_blocks = ["hand_left_joints", "hand_right_joints", "body_joints"]
    mocap_idx = block_indices(manifest, include=mocap_blocks)
    if len(mocap_idx) == 0:
        raise ValueError("Expected hand_left_joints, hand_right_joints, or body_joints feature blocks.")
    input_idx = block_indices(manifest, exclude=[*mocap_blocks, "body_contacts"])

    # The first window has no predecessor, so targets start at window 1.
    valid = np.arange(1, len(X), dtype=np.int64)
    motion = np.linalg.norm(X[valid][:, mocap_idx] - X[valid - 1][:, mocap_idx], axis=1)
    train_local, test_local = chronological_split(len(valid), args.train_fraction)
    threshold = float(np.median(motion[train_local]))
    y = (motion >= threshold).astype(np.int64)
    Xv = X[valid][:, input_idx]

    y_pred_min, scores = ridge_classifier(Xv[train_local], y[train_local], Xv[test_local], args.ridge_l2)
    min_metrics = binary_metrics(y[test_local], y_pred_min)
    min_rows = []
    for local_pos, pred, score_pair in zip(test_local.tolist(), y_pred_min, scores):
        row = _motion_row(local_pos, pred, valid, motion, y, rows)
        # A score column is absent when the train split held a single class.
        row["score_low"] = float(score_pair[0]) if len(score_pair) > 0 else ""
        row["score_high"] = float(score_pair[1]) if len(score_pair) > 1 else ""
        min_rows.append(row)

    neural: dict[str, Any] = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    neural_rows: list[dict[str, Any]] = []
    if not args.skip_neural:
        prob, neural = train_neural(Xv[train_local], y[train_local].astype(np.float32), Xv[test_local], task_type="classification", args=args)
        if neural.get("available") and prob.size:
            pred_neural = (prob[:, 0] >= 0.5).astype(np.int64)
            neural_metrics = binary_metrics(y[test_local], pred_neural)
            for local_pos, pred, prob_high in zip(test_local.tolist(), pred_neural, prob[:, 0]):
                row = _motion_row(local_pos, pred, valid, motion, y, rows)
                row["prob_high"] = float(prob_high)
                neural_rows.append(row)

    write_csv(OUT_DIR / "body_motion_intensity_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))
    if neural_rows:
        write_csv(OUT_DIR / "body_motion_intensity_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))
    return {
        "train_windows": int(len(train_local)),
        "test_windows": int(len(test_local)),
        "target_threshold_train_median": threshold,
        "input_dim": int(len(input_idx)),
        "target_source": "hand/body joint delta between neighboring windows",
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }


def _attach_window_info(rank_rows: list[dict[str, Any]], test: np.ndarray, rows: list[dict[str, str]]) -> None:
    """Add the global window index and center frame to retrieval rows in place."""
    for row in rank_rows:
        idx = int(test[int(row["test_position"])])
        row["window_index"] = idx
        row["center_frame"] = rows[idx]["center_frame"]


def task_multi_view_retrieval(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe B: retrieve the synchronized stereo-left window from fisheye features.

    A projection from the query block to the target block is fitted on the
    train split; held-out candidates are ranked by cosine similarity. Rank
    CSVs are written to ``OUT_DIR``.

    Raises:
        ValueError: if either camera feature block is missing.
    """
    query_idx = block_indices(manifest, include=["video_fisheye_cam0"])
    target_idx = block_indices(manifest, include=["video_stereo_left"])
    if len(query_idx) == 0 or len(target_idx) == 0:
        raise ValueError("Expected video_fisheye_cam0 and video_stereo_left feature blocks.")
    train, test = chronological_split(len(X), args.train_fraction)
    Xq = X[:, query_idx]
    Yt = X[:, target_idx]

    pred_min = ridge_predict(Xq[train], Yt[train], Xq[test], l2=args.ridge_l2, standardize_y=True)
    min_metrics, min_rows = retrieval_metrics(pred_min, Yt[test])
    _attach_window_info(min_rows, test, rows)
    write_csv(OUT_DIR / "multi_view_consistency_minimal_ranks.csv", min_rows, list(min_rows[0].keys()))

    neural: dict[str, Any] = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    if not args.skip_neural:
        pred_neural, neural = train_neural(Xq[train], Yt[train], Xq[test], task_type="projection", args=args)
        if neural.get("available") and pred_neural.size:
            neural_metrics, neural_rows = retrieval_metrics(pred_neural, Yt[test])
            _attach_window_info(neural_rows, test, rows)
            write_csv(OUT_DIR / "multi_view_consistency_neural_ranks.csv", neural_rows, list(neural_rows[0].keys()))
    return {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "query_block": "video_fisheye_cam0",
        "target_block": "video_stereo_left",
        "query_dim": int(len(query_idx)),
        "target_dim": int(len(target_idx)),
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }


def _progress_rows(test: np.ndarray, preds: np.ndarray, target: np.ndarray, rows: list[dict[str, str]]) -> list[dict[str, Any]]:
    """Build one prediction row per test window for the progress probe."""
    out = []
    for idx, pred in zip(test.tolist(), preds):
        out.append(
            {
                "window_index": idx,
                "center_frame": rows[idx]["center_frame"],
                # Same tolerant lookup as action_progress_targets.
                "action_label": rows[idx].get("action_label", "") or "",
                "true_progress": float(target[idx]),
                "pred_progress": float(pred),
                "absolute_error": float(abs(pred - target[idx])),
            }
        )
    return out


def task_action_phase_progress(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe C: regress the 0-to-1 position of a window inside its action run.

    The caption text block is excluded from the input. Predictions are
    clipped to ``[0, 1]`` before scoring. Prediction CSVs are written to
    ``OUT_DIR``.
    """
    input_idx = block_indices(manifest, exclude=["caption_objects_interaction_text"])
    target = action_progress_targets(rows)
    train, test = chronological_split(len(X), args.train_fraction)
    X_train = X[train][:, input_idx]
    X_test = X[test][:, input_idx]

    pred_min = ridge_predict(X_train, target[train], X_test, l2=args.ridge_l2, standardize_y=True)[:, 0]
    pred_min = np.clip(pred_min, 0.0, 1.0)
    min_metrics = regression_metrics(target[test], pred_min)
    min_rows = _progress_rows(test, pred_min, target, rows)
    write_csv(OUT_DIR / "action_phase_progress_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))

    neural: dict[str, Any] = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    if not args.skip_neural:
        pred_neural, neural = train_neural(X_train, target[train], X_test, task_type="regression", args=args)
        if neural.get("available") and pred_neural.size:
            values = np.clip(pred_neural[:, 0], 0.0, 1.0)
            neural_metrics = regression_metrics(target[test], values)
            neural_rows = _progress_rows(test, values, target, rows)
            write_csv(OUT_DIR / "action_phase_progress_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))
    return {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "input_dim": int(len(input_idx)),
        "target_source": "normalized position inside contiguous action-label runs",
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }


def _forecast_rows(
    test: np.ndarray,
    preds: np.ndarray,
    valid: np.ndarray,
    target: np.ndarray,
    rows: list[dict[str, str]],
    horizon: int,
) -> list[dict[str, Any]]:
    """Build one prediction row per test window for the ego-motion probe."""
    out = []
    for local_pos, pred in zip(test.tolist(), preds):
        idx = int(valid[local_pos])
        true_delta = target[local_pos]
        out.append(
            {
                "window_index": idx,
                "center_frame": rows[idx]["center_frame"],
                "future_window_index": int(idx + horizon),
                "delta_l2_true": float(np.linalg.norm(true_delta)),
                "delta_l2_pred": float(np.linalg.norm(pred)),
                "delta_l2_error": float(np.linalg.norm(pred - true_delta)),
            }
        )
    return out


def task_ego_motion_forecast(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe D: forecast the camera-translation change ``future_windows`` ahead.

    The target is the ``camera_translation`` block at ``t + horizon`` minus
    the same block at ``t``; that block and the caption text are excluded
    from the input. Prediction CSVs are written to ``OUT_DIR``.

    Raises:
        ValueError: if the horizon is below 1, the ``camera_translation``
            block is missing, or fewer than two windows remain.
    """
    horizon = int(args.future_windows)
    if horizon < 1:
        raise ValueError("--future-windows must be at least 1.")
    input_idx = block_indices(manifest, exclude=["camera_translation", "caption_objects_interaction_text"])
    target_idx = block_indices(manifest, include=["camera_translation"])
    if len(target_idx) == 0:
        raise ValueError("Expected a camera_translation feature block.")

    # Only windows that still have a window `horizon` steps ahead get a target.
    valid = np.arange(0, len(X) - horizon, dtype=np.int64)
    target = X[valid + horizon][:, target_idx] - X[valid][:, target_idx]
    Xv = X[valid][:, input_idx]
    train, test = chronological_split(len(valid), args.train_fraction)

    pred_min = ridge_predict(Xv[train], target[train], Xv[test], l2=args.ridge_l2, standardize_y=True)
    min_metrics = regression_metrics(target[test], pred_min)
    min_rows = _forecast_rows(test, pred_min, valid, target, rows, horizon)
    write_csv(OUT_DIR / "ego_motion_forecast_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))

    neural: dict[str, Any] = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    if not args.skip_neural:
        pred_neural, neural = train_neural(Xv[train], target[train], Xv[test], task_type="regression", args=args)
        if neural.get("available") and pred_neural.size:
            neural_metrics = regression_metrics(target[test], pred_neural)
            neural_rows = _forecast_rows(test, pred_neural, valid, target, rows, horizon)
            write_csv(OUT_DIR / "ego_motion_forecast_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))
    return {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "forecast_horizon_windows": horizon,
        "forecast_horizon_frames": int(horizon * FRAMES_PER_WINDOW_STEP),
        "input_dim": int(len(input_idx)),
        "target_dim": int(len(target_idx)),
        "target_source": "future minus current camera_translation feature block",
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }


def fmt_metric(value: float | None, metric_key: str) -> str:
    """Format a metric with four decimals, or ``"n/a"`` when it is missing.

    *metric_key* is accepted for call-site compatibility; every metric
    currently uses the same format.
    """
    if value is None:
        return "n/a"
    return f"{value:.4f}"


def task_main_metric(task: str, result: dict[str, Any], baseline: str) -> float | None:
    """Return the headline metric of *baseline* for *task*, or None if absent."""
    metrics = result.get(baseline)
    if not metrics:
        return None
    key = TASK_SPECS[task]["metric_key"]
    value = metrics.get(key)
    return float(value) if value is not None else None


def write_markdown(payload: dict[str, Any]) -> None:
    """Write the Markdown summary of all probes into ``OUT_DIR``."""
    lines = [
        "# Four-Direction Extension Task Baselines",
        "",
        "Generated by `scripts/research_direction_extension_tasks.py` from the committed single-episode feature tensor.",
        "These are data-backed extension probes that show how each research direction can be started from Xperience-10M modalities.",
        "Cross-episode generalization and full direction completion require later held-out experiments.",
        "",
        "## Summary",
        "",
        "| Direction | Extension task | Minimal | Neural MLP | Meaning |",
        "| --- | --- | ---: | ---: | --- |",
    ]
    for task, spec in TASK_SPECS.items():
        result = payload["tasks"][task]
        key = spec["metric_key"]
        min_value = task_main_metric(task, result, "minimal")
        nn_value = task_main_metric(task, result, "neural_mlp")
        lines.append(
            f"| {spec['direction']}. {spec['direction_name']} | {spec['name']} | {fmt_metric(min_value, key)} {spec['metric_name']} | {fmt_metric(nn_value, key)} {spec['metric_name']} | {spec['current_limit']} |"
        )
    lines.extend(["", "## Task Details", ""])
    for task, spec in TASK_SPECS.items():
        result = payload["tasks"][task]
        key = spec["metric_key"]
        lines.extend(
            [
                f"### {spec['direction']}. {spec['name']}",
                "",
                f"- Case study: {spec['case_study']}",
                f"- Input: {spec['input']}",
                f"- Middle process modules: {spec['middle_process']}",
                f"- Output: {spec['output']}",
                f"- Minimal baseline: {spec['minimal_baseline']}",
                f"- Neural baseline: {spec['neural_baseline']}",
                f"- Minimal result: {fmt_metric(task_main_metric(task, result, 'minimal'), key)} {spec['metric_name']}",
                f"- Neural result: {fmt_metric(task_main_metric(task, result, 'neural_mlp'), key)} {spec['metric_name']}",
                f"- Limitation: {spec['current_limit']}",
                "",
            ]
        )
    (OUT_DIR / "research_direction_extension_summary.md").write_text("\n".join(lines).rstrip() + "\n", encoding="utf-8")


def svg_text(x: int, y: int, text: str, size: int = 16, weight: int = 500, color: str = "#f4f8ef") -> str:
    """Return an SVG ``<text>`` element at ``(x, y)`` with escaped content."""
    return (
        f'<text x="{x}" y="{y}" font-family="Inter, Arial, sans-serif" font-size="{size}" '
        f'font-weight="{weight}" fill="{color}">{html.escape(text)}</text>'
    )


def _bar_width(full_width: int, score: float) -> str:
    """Return the pixel width of a score bar, clamping the score to [0, 1]."""
    return f"{full_width * min(1.0, max(0.0, score)):.1f}"


def write_svg(payload: dict[str, Any]) -> None:
    """Render the four-card summary chart into the ``CHARTS`` directory.

    Each card shows one probe with its headline metrics and two bars: the
    minimal baseline in the direction color and the neural MLP in white,
    both drawn from ``choose_score``.
    """
    CHARTS.mkdir(parents=True, exist_ok=True)
    width = 1420
    height = 920
    colors = {"A": "#ccffa0", "B": "#7ae5c3", "C": "#d8f4a5", "D": "#9bdfff"}
    track_color = "#263226"
    num_windows = int(payload["dataset_scope"]["num_windows"])
    svg: list[str] = [
        f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}" viewBox="0 0 {width} {height}" role="img">',
        '<rect width="100%" height="100%" fill="#0b100c"/>',
        '<rect x="34" y="34" width="1352" height="852" rx="28" fill="#101711" stroke="#263226"/>',
        svg_text(66, 88, "Ropedia Xperience-10M: four direction extension probes", 32, 760),
        svg_text(66, 122, f"Data-backed from the same {num_windows:,}-window public sample feature tensor; extension probes for later held-out studies.", 17, 500, "#a5afa2"),
    ]
    x0 = 66
    y0 = 166
    card_w = 620
    card_h = 160
    gap_x = 44
    gap_y = 34
    for i, (task, spec) in enumerate(TASK_SPECS.items()):
        result = payload["tasks"][task]
        # Cards are laid out in a two-column grid, row by row.
        row, col = divmod(i, 2)
        x = x0 + col * (card_w + gap_x)
        y = y0 + row * (card_h + gap_y)
        color = colors[spec["direction"]]
        min_v = task_main_metric(task, result, "minimal")
        nn_v = task_main_metric(task, result, "neural_mlp")
        metric = spec["metric_name"]
        svg.extend(
            [
                f'<rect x="{x}" y="{y}" width="{card_w}" height="{card_h}" rx="18" fill="#151d16" stroke="#2b382b"/>',
                f'<rect x="{x}" y="{y}" width="6" height="{card_h}" rx="3" fill="{color}"/>',
                f'<circle cx="{x + 39}" cy="{y + 41}" r="20" fill="none" stroke="{color}" stroke-width="2"/>',
                svg_text(x + 32, y + 48, spec["direction"], 21, 760, color),
                svg_text(x + 76, y + 35, spec["name"], 20, 760),
                svg_text(x + 76, y + 62, spec["direction_name"], 13, 650, "#a5afa2"),
                svg_text(x + 76, y + 94, f"Minimal: {fmt_metric(min_v, spec['metric_key'])} {metric}", 16, 700, "#f4f8ef"),
                svg_text(x + 300, y + 94, f"Neural MLP: {fmt_metric(nn_v, spec['metric_key'])} {metric}", 16, 700, "#f4f8ef"),
                svg_text(x + 76, y + 125, spec["output"], 13, 500, "#dce8d7"),
            ]
        )
        min_score = choose_score(task, result["minimal"])
        nn_score = choose_score(task, result["neural_mlp"]) if result.get("neural_mlp") else 0.0
        bar_x = x + 76
        bar_y = y + 138
        bar_w = 440
        svg.append(f'<rect x="{bar_x}" y="{bar_y}" width="{bar_w}" height="5" rx="2.5" fill="{track_color}"/>')
        svg.append(f'<rect x="{bar_x}" y="{bar_y}" width="{_bar_width(bar_w, min_score)}" height="5" rx="2.5" fill="{color}"/>')
        svg.append(f'<rect x="{bar_x}" y="{bar_y + 8}" width="{bar_w}" height="5" rx="2.5" fill="{track_color}"/>')
        svg.append(f'<rect x="{bar_x}" y="{bar_y + 8}" width="{_bar_width(bar_w, nn_score)}" height="5" rx="2.5" fill="#f4f8ef"/>')
    legend_y = 570
    svg.extend(
        [
            svg_text(66, legend_y, "How to read this", 24, 760),
            svg_text(66, legend_y + 34, "Each card adds one concrete task to a research direction using existing sample modalities.", 16, 500, "#dce8d7"),
            svg_text(66, legend_y + 62, "Colored bar: minimal baseline normalized score. White bar: neural MLP normalized score. Lower-is-better MAE is shown as 1 - MAE for bar length only.", 16, 500, "#dce8d7"),
            f'<line x1="66" y1="680" x2="1354" y2="680" stroke="{track_color}" stroke-width="1"/>',
            svg_text(66, 724, "Implementation boundary", 22, 760),
            svg_text(66, 758, "A: motion-energy proxy, not a full human body model. B: view-feature retrieval, not neural rendering.", 16, 500, "#dce8d7"),
            svg_text(66, 786, "C: phase-progress regression, not open-world intent. D: ego-motion forecast, not a persistent map.", 16, 500, "#dce8d7"),
            svg_text(66, 835, "All metrics are computed from held-out chronological windows of the same public sample episode.", 16, 700, "#f4f8ef"),
        ]
    )
    svg.append("</svg>")
    (CHARTS / "research_direction_extension_tasks.svg").write_text("\n".join(svg), encoding="utf-8")


def _display_path(path: Path) -> str:
    """Return *path* relative to ``ROOT`` when possible, else as given.

    ``Path.relative_to`` raises ValueError for paths outside ``ROOT`` (and for
    relative paths), which would otherwise abort the run after all tasks have
    finished.
    """
    try:
        return str(path.resolve().relative_to(ROOT))
    except ValueError:
        return str(path)


def build_payload(args: argparse.Namespace) -> dict[str, Any]:
    """Run all four probes and assemble the JSON-serializable result payload.

    Side effect: rebinds the module-level ``OUT_DIR`` to ``args.output_dir``
    so the task functions and ``write_markdown`` write there.
    """
    global OUT_DIR
    X, starts, ends, rows, manifest = load_inputs(args.results_dir)
    OUT_DIR = args.output_dir
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    tasks: OrderedDict[str, dict[str, Any]] = OrderedDict()
    tasks["body_motion_intensity"] = task_body_motion_intensity(X, rows, manifest, args)
    tasks["multi_view_consistency_retrieval"] = task_multi_view_retrieval(X, rows, manifest, args)
    tasks["action_phase_progress"] = task_action_phase_progress(X, rows, manifest, args)
    tasks["ego_motion_forecast"] = task_ego_motion_forecast(X, rows, manifest, args)

    payload = {
        "source": {
            "shared_windows": _display_path(args.results_dir / "shared_windows.npz"),
            "windows_csv": _display_path(args.results_dir / "windows.csv"),
            "feature_manifest": _display_path(args.results_dir / "feature_manifest.json"),
        },
        "dataset_scope": {
            "sample_episode_count": 1,
            "num_windows": int(len(X)),
            "feature_dim": int(X.shape[1]),
            "first_start_frame": int(starts[0]),
            "last_end_frame": int(ends[-1]),
            "warning": "Single public sample episode; these extension probes validate task design and pipeline mechanics, not cross-episode generalization.",
        },
        "baselines": {
            "minimal": "Ridge classifiers/regressors/projections plus cosine retrieval on the committed feature tensor.",
            "neural_mlp": "Small one-hidden-layer PyTorch MLP heads using the same inputs, targets, chronological split, and evaluator.",
        },
        "run_config": {
            "train_fraction": float(args.train_fraction),
            "ridge_l2": float(args.ridge_l2),
            "seed": int(args.seed),
            "future_windows": int(args.future_windows),
            "neural_epochs": int(args.neural_epochs),
            "neural_hidden_dim": int(args.neural_hidden_dim),
            "neural_batch_size": int(args.neural_batch_size),
            "neural_learning_rate": float(args.neural_learning_rate),
            "neural_weight_decay": float(args.neural_weight_decay),
            "skip_neural": bool(args.skip_neural),
        },
        "task_specs": TASK_SPECS,
        "tasks": tasks,
    }
    return payload


def main() -> int:
    """Run the probes and write the JSON, Markdown and SVG outputs."""
    args = parse_args()
    payload = build_payload(args)
    write_json(args.output_dir / "research_direction_extension_results.json", payload)
    write_json(DOCS_DATA / "research_direction_extensions.json", payload)
    write_markdown(payload)
    write_svg(payload)
    print(f"Wrote {args.output_dir / 'research_direction_extension_results.json'}")
    print(f"Wrote {CHARTS / 'research_direction_extension_tasks.svg'}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())