| |
| """Run one extra data-backed probe for each Ropedia research direction. |
| |
| These tasks reuse the committed single-episode feature tensor generated by |
| `episode_task_suite.py`. They are extension probes, not full implementations of the |
| research directions. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| import html |
| import json |
| import math |
| from collections import OrderedDict |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
|
|
|
|
# Directory two levels above this file (the parent of the directory holding the script).
ROOT = Path(__file__).resolve().parents[1]

# Inputs produced by the episode task suite, and the folder for this script's outputs.
RESULTS = ROOT.joinpath("results", "episode_task_suite")
OUT_DIR = RESULTS.joinpath("research_direction_extensions")

# Documentation-side locations for data files and rendered charts.
DOCS_DATA = ROOT.joinpath("docs", "data")
CHARTS = ROOT.joinpath("docs", "assets", "charts")

# Default locations of the three shared input files inside RESULTS.
WINDOWS_NPZ = RESULTS.joinpath("shared_windows.npz")
WINDOWS_CSV = RESULTS.joinpath("windows.csv")
FEATURE_MANIFEST = RESULTS.joinpath("feature_manifest.json")
|
|
|
|
# Static description of each extension probe, keyed by task id. Insertion order
# (A, B, C, D) is the order used when rendering the summary table and chart cards.
# "metric_key" names the entry of the metrics dict that is reported, and
# "metric_direction" says whether higher or lower values of that metric are better.
TASK_SPECS: OrderedDict[str, dict[str, Any]] = OrderedDict()

TASK_SPECS["body_motion_intensity"] = {
    "direction": "A",
    "direction_name": "Human Modeling & Motion Understanding",
    "name": "Body and Hand Motion Intensity",
    "family": "classification",
    "case_study": "A window with a fast reach or pour should be classified as high motion; a steady holding window should be low motion.",
    "input": "Current non-mocap feature blocks: video, audio, depth, camera pose/rotation, IMU, SLAM, calibration, and language context.",
    "middle_process": "Compute the target from hand/body joint changes between neighboring windows, hide the mocap blocks from the input, then classify high versus low motion using the train-set median as the threshold.",
    "output": "Binary label: high_motion or low_motion.",
    "minimal_baseline": "Ridge classifier on standardized non-mocap features.",
    "neural_baseline": "One-hidden-layer MLP binary classifier on the same input features.",
    "metric_name": "macro-F1",
    "metric_key": "macro_f1",
    "metric_direction": "higher",
    "current_limit": "This is a motion-energy proxy, not a SMPL/MANO body model or a generative motion prior.",
}

TASK_SPECS["multi_view_consistency_retrieval"] = {
    "direction": "B",
    "direction_name": "3D/4D Reconstruction & Neural Rendering",
    "name": "Multi-View Consistency Retrieval",
    "family": "retrieval",
    "case_study": "Given the fisheye camera features for a pouring moment, retrieve the synchronized stereo-left view from the same time window.",
    "input": "Query side: fisheye_cam0 video feature block. Candidate side: stereo_left video feature block from held-out windows.",
    "middle_process": "Learn a projection from one camera-view feature space into another, then rank held-out candidate windows by cosine similarity.",
    "output": "Ranked candidate windows; the correct synchronized view should rank near the top.",
    "minimal_baseline": "Ridge projection followed by cosine nearest-neighbor retrieval.",
    "neural_baseline": "One-hidden-layer MLP projection followed by the same cosine retrieval evaluator.",
    "metric_name": "MRR",
    "metric_key": "mrr",
    "metric_direction": "higher",
    "current_limit": "This checks calibrated multi-view signal, but it is still feature retrieval, not NeRF, Gaussian Splatting, or novel-view synthesis.",
}

TASK_SPECS["action_phase_progress"] = {
    "direction": "C",
    "direction_name": "Egocentric Vision & Interaction",
    "name": "Action Phase Progress Estimation",
    "family": "regression",
    "case_study": "Inside a Pour coffee action segment, estimate whether the current window is near the beginning, middle, or end of that action.",
    "input": "Current non-caption multimodal feature vector, so the label text cannot be copied directly from the language block.",
    "middle_process": "Convert contiguous action-label runs into a normalized 0-to-1 progress target, train on earlier windows, and regress progress for later windows.",
    "output": "A scalar progress value between 0.0 and 1.0 for the current action segment.",
    "minimal_baseline": "Ridge regressor on standardized non-caption features.",
    "neural_baseline": "One-hidden-layer MLP regressor on the same input features.",
    "metric_name": "MAE",
    "metric_key": "mae",
    "metric_direction": "lower",
    "current_limit": "This is an action-structure probe inside one episode, not a general intent model across homes, people, or tasks.",
}

# NOTE(review): the "20 frames" / "four-window" wording below matches the default
# --future-windows value of 4; the text is static and is not regenerated when a
# different horizon is passed on the command line.
TASK_SPECS["ego_motion_forecast"] = {
    "direction": "D",
    "direction_name": "Scene Reconstruction & World Modeling",
    "name": "Short-Horizon Ego-Motion Forecasting",
    "family": "forecast",
    "case_study": "From the current sensors, predict how the camera translation will change over the next 20 frames while the wearer moves through the scene.",
    "input": "Current multimodal features excluding the camera-translation block and caption text.",
    "middle_process": "Build a future target from camera-translation difference at a four-window horizon, then regress that future ego-motion delta from current sensors.",
    "output": "A future camera-translation delta vector.",
    "minimal_baseline": "Ridge regressor with a 20-frame forecast horizon.",
    "neural_baseline": "One-hidden-layer MLP regressor with the same horizon and split.",
    "metric_name": "MAE",
    "metric_key": "mae",
    "metric_direction": "lower",
    "current_limit": "This is a compact world-model proxy; it does not build a persistent map, scene graph, or object permanence model.",
}
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line options that control the probes.

    Every option has a default, so the script can run without arguments.
    """
    cli = argparse.ArgumentParser(description="Run four research-direction extension probes.")
    # (flag, value type, default) for every option that takes a value.
    valued_options = (
        ("--results-dir", Path, RESULTS),
        ("--output-dir", Path, OUT_DIR),
        ("--train-fraction", float, 0.70),
        ("--ridge-l2", float, 10.0),
        ("--seed", int, 7),
        ("--future-windows", int, 4),
        ("--neural-epochs", int, 25),
        ("--neural-hidden-dim", int, 128),
        ("--neural-batch-size", int, 128),
        ("--neural-learning-rate", float, 1e-3),
        ("--neural-weight-decay", float, 1e-4),
    )
    for flag, value_type, default in valued_options:
        cli.add_argument(flag, type=value_type, default=default)
    cli.add_argument("--skip-neural", action="store_true")
    return cli.parse_args()
|
|
|
|
def write_json(path: Path, payload: dict[str, Any] | list[Any]) -> None:
    """Write *payload* to *path* as UTF-8 JSON indented by two spaces.

    Missing parent directories are created first.
    """
    folder = path.parent
    folder.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2)
    path.write_text(serialized, encoding="utf-8")
|
|
|
|
def write_csv(path: Path, rows: list[dict[str, Any]], fieldnames: list[str]) -> None:
    """Write *rows* to *path* as a UTF-8 CSV with a header row and ``\\n`` line endings.

    Columns appear in the order given by *fieldnames*; missing parent directories
    are created first.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", newline="", encoding="utf-8") as sink:
        out = csv.DictWriter(sink, fieldnames=fieldnames, lineterminator="\n")
        out.writeheader()
        for record in rows:
            out.writerow(record)
|
|
|
|
def load_windows_csv(path: Path) -> list[dict[str, str]]:
    """Read the UTF-8 CSV at *path* and return one dict per data row, keyed by the header."""
    with open(path, "r", newline="", encoding="utf-8") as source:
        reader = csv.DictReader(source)
        records = list(reader)
    return records
|
|
|
|
def load_inputs(results_dir: Path) -> tuple[np.ndarray, np.ndarray, np.ndarray, list[dict[str, str]], list[dict[str, Any]]]:
    """Load the shared window tensor, its per-window CSV rows, and the feature manifest.

    Reads ``shared_windows.npz`` (arrays ``X``, ``starts``, ``ends``), ``windows.csv``
    and ``feature_manifest.json`` from *results_dir*.

    Returns:
        ``(X, starts, ends, rows, manifest)`` where ``X`` is float32 with NaN and
        infinite entries replaced by 0.0, ``starts``/``ends`` are int64, ``rows`` are
        the CSV records, and ``manifest`` is the parsed JSON.

    Raises:
        FileNotFoundError: if any of the three input files is missing.
        ValueError: if the CSV row count differs from the number of windows in ``X``.
    """
    npz_path = results_dir / "shared_windows.npz"
    windows_path = results_dir / "windows.csv"
    manifest_path = results_dir / "feature_manifest.json"
    # Fail early with the same actionable hint for whichever input is absent.
    for required in (npz_path, windows_path, manifest_path):
        if not required.exists():
            raise FileNotFoundError(f"Missing {required}. Run scripts/episode_task_suite.py first.")

    # np.load returns an NpzFile that keeps the archive open; the context manager
    # closes it as soon as the arrays have been read into memory.
    with np.load(npz_path, allow_pickle=False) as archive:
        X = np.asarray(archive["X"], dtype=np.float32)
        starts = np.asarray(archive["starts"], dtype=np.int64)
        ends = np.asarray(archive["ends"], dtype=np.int64)
    # Non-finite feature values are zeroed so the downstream linear algebra stays finite.
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

    rows = load_windows_csv(windows_path)
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    if len(rows) != len(X):
        raise ValueError(f"windows.csv has {len(rows)} rows but shared_windows.npz has {len(X)} windows.")
    return X, starts, ends, rows, manifest
|
|
|
|
def block_indices(manifest: list[dict[str, Any]], include: list[str] | None = None, exclude: list[str] | None = None) -> np.ndarray:
    """Collect the feature-column indices of the manifest blocks selected by name prefix.

    A block is kept when *include* is empty or its name starts with one of the
    *include* entries, and its name does not start with any *exclude* entry. Each
    kept block contributes ``range(block["start"], block["end"])``.

    Returns:
        An int64 array of column indices, in manifest order (possibly empty).
    """
    wanted = tuple(include or ())
    banned = tuple(exclude or ())
    columns: list[int] = []
    for block in manifest:
        block_name = str(block["name"])
        # str.startswith accepts a tuple of prefixes; an exact match is also a prefix match.
        if wanted and not block_name.startswith(wanted):
            continue
        if banned and block_name.startswith(banned):
            continue
        columns.extend(range(int(block["start"]), int(block["end"])))
    return np.asarray(columns, dtype=np.int64)
|
|
|
|
def chronological_split(n: int, train_fraction: float) -> tuple[np.ndarray, np.ndarray]:
    """Split positions ``0..n-1`` into an earlier training part and a later test part.

    The boundary is ``round(n * train_fraction)``, clamped so that both parts hold
    at least one position.

    Raises:
        ValueError: if *n* is smaller than 2.
    """
    if n < 2:
        raise ValueError("Need at least two examples.")
    boundary = int(round(n * train_fraction))
    if boundary > n - 1:
        boundary = n - 1
    if boundary < 1:
        boundary = 1
    train_positions = np.arange(boundary, dtype=np.int64)
    test_positions = np.arange(boundary, n, dtype=np.int64)
    return train_positions, test_positions
|
|
|
|
def standardize_train_test(X_train: np.ndarray, X_test: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Z-score both splits using statistics computed on the training split only.

    Column means and standard deviations are accumulated in float64 and stored as
    float32. Columns whose standard deviation is below 1e-6 are divided by 1.0
    instead, so (near-)constant features do not blow up.

    Returns:
        ``(train_standardized, test_standardized, mean, std)``.
    """
    center = np.mean(X_train, axis=0, dtype=np.float64).astype(np.float32)
    spread = np.std(X_train, axis=0, dtype=np.float64).astype(np.float32)
    scale = np.where(spread < 1e-6, np.float32(1.0), spread)
    train_z = (X_train - center) / scale
    test_z = (X_test - center) / scale
    return train_z, test_z, center, scale
|
|
|
|
def ridge_predict(
    X_train: np.ndarray,
    Y_train: np.ndarray,
    X_test: np.ndarray,
    *,
    l2: float,
    standardize_y: bool,
) -> np.ndarray:
    """Fit ridge regression on standardized features and predict for *X_test*.

    The system is solved in sample space: with ``K = Xtr @ Xtr.T + l2 * I``, the
    coefficients are ``alpha = K^-1 Y`` and predictions are ``Xte @ Xtr.T @ alpha``.
    When *standardize_y* is true the targets are z-scored for fitting and the
    predictions are mapped back to the original target scale.

    Returns:
        A float32 array with one row per test example and one column per target
        (a 1-D *Y_train* is treated as a single column).
    """
    train_z, test_z, _, _ = standardize_train_test(X_train.astype(np.float32), X_test.astype(np.float32))
    targets = np.asarray(Y_train, dtype=np.float32)
    if targets.ndim == 1:
        targets = targets.reshape(-1, 1)

    n_outputs = targets.shape[1]
    if not standardize_y:
        offset = np.zeros(n_outputs, dtype=np.float32)
        scale = np.ones(n_outputs, dtype=np.float32)
        fit_targets = targets
    else:
        offset = targets.mean(axis=0, dtype=np.float64).astype(np.float32)
        scale = targets.std(axis=0, dtype=np.float64).astype(np.float32)
        scale[scale < 1e-6] = 1.0  # constant targets keep unit scale
        fit_targets = (targets - offset) / scale

    gram = train_z @ train_z.T
    gram[np.diag_indices_from(gram)] += float(l2)  # ridge penalty on the diagonal
    # Solve in float64 for numerical stability, then return to float32.
    dual = np.linalg.solve(gram.astype(np.float64), fit_targets.astype(np.float64)).astype(np.float32)
    cross = test_z @ train_z.T
    predicted = (cross @ dual).astype(np.float32)
    return predicted * scale + offset
|
|
|
|
def ridge_classifier(X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray, l2: float) -> tuple[np.ndarray, np.ndarray]:
    """Classify by ridge-regressing one-hot labels and taking the highest-scoring class.

    Returns:
        ``(pred, scores)``: int64 predicted labels and the per-class score matrix,
        whose columns follow the sorted distinct labels seen in *y_train*.
    """
    labels = np.asarray([int(v) for v in y_train], dtype=np.int64)
    classes = np.asarray(sorted({int(v) for v in y_train}), dtype=np.int64)
    # Row r has a 1.0 in the column of its class and 0.0 elsewhere.
    one_hot = (labels[:, None] == classes[None, :]).astype(np.float32)
    scores = ridge_predict(X_train, one_hot, X_test, l2=l2, standardize_y=False)
    winners = np.argmax(scores, axis=1)
    return classes[winners].astype(np.int64), scores
|
|
|
|
def binary_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float | int]:
    """Compute accuracy and macro-F1 for labels in ``{0, 1}``.

    F1 is computed separately for class 0 and class 1 and averaged. A class with no
    true positives gets an F1 of 0.0 (the denominators are floored to avoid division
    by zero).

    Returns:
        A dict with ``accuracy``, ``macro_f1``, the true and predicted positive
        rates, and ``num_test``.
    """
    truth = y_true.astype(np.int64)
    guess = y_pred.astype(np.int64)

    f1_by_class = []
    for cls in (0, 1):
        is_cls = truth == cls
        said_cls = guess == cls
        hits = int(np.sum(is_cls & said_cls))
        false_alarms = int(np.sum(~is_cls & said_cls))
        misses = int(np.sum(is_cls & ~said_cls))
        precision = hits / max(hits + false_alarms, 1)
        recall = hits / max(hits + misses, 1)
        f1_by_class.append(2 * precision * recall / max(precision + recall, 1e-12))

    return {
        "accuracy": float(np.mean(truth == guess)),
        "macro_f1": float(np.mean(f1_by_class)),
        "positive_rate_true": float(np.mean(truth)),
        "positive_rate_pred": float(np.mean(guess)),
        "num_test": int(len(truth)),
    }
|
|
|
|
def regression_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float | int]:
    """Compute MSE, MAE and R^2 over all elements of the (float32-cast) inputs.

    R^2 compares the summed squared error with the summed squared deviation of
    *y_true* from its per-column mean; the denominator is floored at 1e-12.
    """
    truth = np.asarray(y_true, dtype=np.float32)
    guess = np.asarray(y_pred, dtype=np.float32)
    residual = guess - truth
    squared = residual * residual
    centered = truth - truth.mean(axis=0, keepdims=True)
    total_variation = float(np.sum(centered ** 2))
    residual_variation = float(np.sum(squared))
    return {
        "mse": float(np.mean(squared)),
        "mae": float(np.mean(np.abs(residual))),
        "r2": 1.0 - residual_variation / max(total_variation, 1e-12),
        "num_test": int(len(truth)),
    }
|
|
|
|
def row_normalize(X: np.ndarray) -> np.ndarray:
    """Scale each row of *X* to unit L2 norm; rows with norm below 1e-8 are left unscaled."""
    norms = np.linalg.norm(X, axis=1, keepdims=True)
    divisors = np.where(norms < 1e-8, norms.dtype.type(1.0), norms)
    return X / divisors
|
|
|
|
def retrieval_metrics(query_pred: np.ndarray, target_test: np.ndarray) -> tuple[dict[str, float | int], list[dict[str, Any]]]:
    """Rank candidates by cosine similarity and score how high the matching one lands.

    Query ``i`` is considered matched to candidate ``i``. Both matrices are row
    normalized, so the dot product is the cosine similarity.

    Returns:
        ``(metrics, rows)``: aggregate MRR / top-k / median rank, and one record
        per query with its true rank and best-scoring candidate.
    """
    queries = row_normalize(np.asarray(query_pred, dtype=np.float32))
    candidates = row_normalize(np.asarray(target_test, dtype=np.float32))
    similarity = queries @ candidates.T

    true_ranks: list[int] = []
    records: list[dict[str, Any]] = []
    for position, sims_for_query in enumerate(similarity):
        best_first = np.argsort(-sims_for_query)
        # 1-based position of the matching candidate in the descending ranking.
        true_rank = int(np.flatnonzero(best_first == position)[0]) + 1
        top = best_first[0]
        true_ranks.append(true_rank)
        records.append(
            {
                "test_position": position,
                "true_rank": true_rank,
                "top_candidate_position": int(top),
                "top_candidate_score": float(similarity[position, top]),
                "true_score": float(similarity[position, position]),
            }
        )

    rank_values = np.asarray(true_ranks, dtype=np.float32)
    summary: dict[str, float | int] = {"mrr": float(np.mean(1.0 / rank_values))}
    for cutoff in (1, 5, 10):
        summary[f"top{cutoff}"] = float(np.mean(rank_values <= cutoff))
    summary["median_rank"] = float(np.median(rank_values))
    summary["num_test"] = int(len(true_ranks))
    return summary, records
|
|
|
|
def choose_score(task: str, metrics: dict[str, Any]) -> float:
    """Return the task's main metric as a higher-is-better score.

    Higher-is-better metrics are returned unchanged; lower-is-better metrics are
    mapped to ``1 - value`` and floored at 0.0.
    """
    spec = TASK_SPECS[task]
    raw = float(metrics[spec["metric_key"]])
    return raw if spec["metric_direction"] == "higher" else max(0.0, 1.0 - raw)
|
|
|
|
def train_neural(
    X_train: np.ndarray,
    Y_train: np.ndarray,
    X_test: np.ndarray,
    *,
    task_type: str,
    args: argparse.Namespace,
) -> tuple[np.ndarray, dict[str, Any]]:
    """Train a one-hidden-layer MLP on CPU and predict for *X_test*.

    With ``task_type == "classification"`` the network is trained with
    BCE-with-logits on the raw targets and sigmoid probabilities are returned. Any
    other *task_type* trains with MSE on z-scored targets and returns predictions
    mapped back to the original target scale. Features are standardized with
    training statistics in both cases.

    Returns:
        ``(pred, info)``. If importing torch fails, ``pred`` is an empty
        ``(len(X_test), 0)`` array and ``info`` is ``{"available": False, ...}``;
        otherwise ``info`` carries the epoch count, hidden size and the mean
        training loss of every epoch.
    """
    try:
        import torch
        from torch import nn
        from torch.utils.data import DataLoader, TensorDataset
    except Exception as exc:
        # Best effort: report that torch is unusable instead of failing the whole run.
        no_predictions = np.empty((len(X_test), 0), dtype=np.float32)
        return no_predictions, {"available": False, "reason": f"torch unavailable: {exc}"}

    shuffler = np.random.default_rng(args.seed)
    torch.manual_seed(args.seed)
    train_z, test_z, _, _ = standardize_train_test(X_train.astype(np.float32), X_test.astype(np.float32))
    targets = np.asarray(Y_train, dtype=np.float32)
    if targets.ndim == 1:
        targets = targets[:, None]

    is_classifier = task_type == "classification"
    if is_classifier:
        fit_targets = targets
        offset = np.zeros(targets.shape[1], dtype=np.float32)
        scale = np.ones(targets.shape[1], dtype=np.float32)
    else:
        offset = targets.mean(axis=0, dtype=np.float64).astype(np.float32)
        scale = targets.std(axis=0, dtype=np.float64).astype(np.float32)
        scale[scale < 1e-6] = 1.0
        fit_targets = (targets - offset) / scale
    fit_targets = fit_targets.astype(np.float32)

    device = torch.device("cpu")
    hidden = args.neural_hidden_dim
    layers = [
        nn.Linear(train_z.shape[1], hidden),
        nn.GELU(),
        nn.Dropout(0.08),
        nn.Linear(hidden, fit_targets.shape[1]),
    ]
    model = nn.Sequential(*layers).to(device)
    loss_fn = nn.BCEWithLogitsLoss() if is_classifier else nn.MSELoss()
    opt = torch.optim.AdamW(model.parameters(), lr=args.neural_learning_rate, weight_decay=args.neural_weight_decay)

    order = np.arange(len(train_z))
    # Building the dataset up front rejects mismatched feature/target row counts before training.
    TensorDataset(torch.from_numpy(train_z), torch.from_numpy(fit_targets))

    history = []
    for _ in range(args.neural_epochs):
        # The permutation is shuffled in place, so each epoch reshuffles the previous order
        # using the seeded NumPy generator; the DataLoader itself only batches sequentially.
        shuffler.shuffle(order)
        epoch_data = TensorDataset(torch.from_numpy(train_z[order]), torch.from_numpy(fit_targets[order]))
        loader = DataLoader(epoch_data, batch_size=args.neural_batch_size, shuffle=False)
        model.train()
        loss_sum = 0.0
        seen = 0
        for features, wanted in loader:
            features = features.to(device)
            wanted = wanted.to(device)
            opt.zero_grad(set_to_none=True)
            loss = loss_fn(model(features), wanted)
            loss.backward()
            opt.step()
            batch_len = len(features)
            loss_sum += float(loss.item()) * batch_len
            seen += batch_len
        history.append(loss_sum / max(seen, 1))

    model.eval()
    with torch.no_grad():
        raw = model(torch.from_numpy(test_z).to(device)).cpu().numpy().astype(np.float32)
    if is_classifier:
        pred = 1.0 / (1.0 + np.exp(-raw))  # logits -> probabilities
    else:
        pred = raw * scale + offset
    info = {"available": True, "epochs": args.neural_epochs, "hidden_dim": args.neural_hidden_dim, "loss_history": history}
    return pred, info
|
|
|
|
def action_progress_targets(rows: list[dict[str, str]]) -> np.ndarray:
    """Turn each contiguous run of equal ``action_label`` values into a 0-to-1 ramp.

    Missing or empty labels are treated as the empty string and form runs like any
    other label. A run of length one keeps progress 0.0.

    Returns:
        A float32 array with one progress value per row.
    """
    labels = [row.get("action_label", "") or "" for row in rows]
    total = len(labels)
    progress = np.zeros(total, dtype=np.float32)
    # Positions where the label differs from the previous one, closed with an end sentinel.
    run_starts = [i for i in range(total) if i == 0 or labels[i] != labels[i - 1]]
    run_starts.append(total)
    for begin, stop in zip(run_starts, run_starts[1:]):
        span = stop - begin
        if span > 1:
            progress[begin:stop] = np.linspace(0.0, 1.0, span, dtype=np.float32)
    return progress
|
|
|
|
def task_body_motion_intensity(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe A: classify high versus low hand/body motion from non-mocap features.

    The motion target is the L2 norm of the change in the hand/body joint blocks
    between each window and the one before it, so the first window is dropped. The
    label threshold is the median motion of the training part only. Per-window
    predictions are written as CSV files under ``OUT_DIR``.

    Returns:
        A summary dict with split sizes, the threshold, the input dimension, and
        the metrics of the ridge baseline and (unless skipped) the MLP baseline.
    """
    mocap_blocks = ["hand_left_joints", "hand_right_joints", "body_joints"]
    mocap_idx = block_indices(manifest, include=mocap_blocks)
    # The body_contacts block is hidden from the input along with the joint blocks.
    input_idx = block_indices(manifest, exclude=[*mocap_blocks, "body_contacts"])

    valid = np.arange(1, len(X), dtype=np.int64)
    joint_change = X[valid][:, mocap_idx] - X[valid - 1][:, mocap_idx]
    motion = np.linalg.norm(joint_change, axis=1)
    train_local, test_local = chronological_split(len(valid), args.train_fraction)
    threshold = float(np.median(motion[train_local]))
    y = (motion >= threshold).astype(np.int64)
    features = X[valid][:, input_idx]
    fit_x, eval_x = features[train_local], features[test_local]

    def shared_columns(local_pos: Any, predicted: Any) -> dict[str, Any]:
        # Columns common to both prediction files, in their output order.
        pos = int(local_pos)
        window = int(valid[pos])
        return {
            "window_index": window,
            "center_frame": rows[window]["center_frame"],
            "motion_energy": float(motion[pos]),
            "true_label": "high_motion" if y[pos] else "low_motion",
            "pred_label": "high_motion" if int(predicted) else "low_motion",
        }

    y_pred_min, scores = ridge_classifier(fit_x, y[train_local], eval_x, args.ridge_l2)
    min_metrics = binary_metrics(y[test_local], y_pred_min)
    min_rows = []
    for local_pos, predicted, score_pair in zip(test_local, y_pred_min, scores):
        record = shared_columns(local_pos, predicted)
        # The score matrix has fewer than two columns when training saw a single class.
        record["score_low"] = float(score_pair[0]) if len(score_pair) > 0 else ""
        record["score_high"] = float(score_pair[1]) if len(score_pair) > 1 else ""
        min_rows.append(record)

    neural: dict[str, Any] = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    neural_rows: list[dict[str, Any]] = []
    if not args.skip_neural:
        prob, neural = train_neural(fit_x, y[train_local].astype(np.float32), eval_x, task_type="classification", args=args)
        if neural.get("available") and prob.size:
            prob_high = prob[:, 0]
            hard_labels = (prob_high >= 0.5).astype(np.int64)
            neural_metrics = binary_metrics(y[test_local], hard_labels)
            for local_pos, predicted, probability in zip(test_local, hard_labels, prob_high):
                record = shared_columns(local_pos, predicted)
                record["prob_high"] = float(probability)
                neural_rows.append(record)

    write_csv(OUT_DIR / "body_motion_intensity_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))
    if neural_rows:
        write_csv(OUT_DIR / "body_motion_intensity_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))

    summary: dict[str, Any] = {
        "train_windows": int(len(train_local)),
        "test_windows": int(len(test_local)),
        "target_threshold_train_median": threshold,
        "input_dim": int(len(input_idx)),
        "target_source": "hand/body joint delta between neighboring windows",
    }
    summary["minimal"] = min_metrics
    summary["neural_mlp"] = neural_metrics
    summary["neural_training"] = neural
    return summary
|
|
|
|
def task_multi_view_retrieval(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe B: retrieve the synchronized stereo-left window from fisheye features.

    A projection from the ``video_fisheye_cam0`` block to the ``video_stereo_left``
    block is fitted on the earlier windows; held-out queries are then ranked against
    all held-out candidates. Rank tables are written as CSV files under ``OUT_DIR``.

    Raises:
        ValueError: if either feature block is absent from the manifest.
    """
    query_idx = block_indices(manifest, include=["video_fisheye_cam0"])
    target_idx = block_indices(manifest, include=["video_stereo_left"])
    if 0 in (len(query_idx), len(target_idx)):
        raise ValueError("Expected video_fisheye_cam0 and video_stereo_left feature blocks.")

    train, test = chronological_split(len(X), args.train_fraction)
    queries = X[:, query_idx]
    targets = X[:, target_idx]

    def attach_window_info(rank_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
        # Map each position within the test split back to its window and center frame.
        for record in rank_rows:
            window = int(test[int(record["test_position"])])
            record["window_index"] = window
            record["center_frame"] = rows[window]["center_frame"]
        return rank_rows

    projected_min = ridge_predict(queries[train], targets[train], queries[test], l2=args.ridge_l2, standardize_y=True)
    min_metrics, min_rows = retrieval_metrics(projected_min, targets[test])
    attach_window_info(min_rows)
    write_csv(OUT_DIR / "multi_view_consistency_minimal_ranks.csv", min_rows, list(min_rows[0].keys()))

    neural: dict[str, Any] = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    neural_rows: list[dict[str, Any]] = []
    if not args.skip_neural:
        projected_nn, neural = train_neural(queries[train], targets[train], queries[test], task_type="projection", args=args)
        if neural.get("available") and projected_nn.size:
            neural_metrics, neural_rows = retrieval_metrics(projected_nn, targets[test])
            attach_window_info(neural_rows)
            write_csv(OUT_DIR / "multi_view_consistency_neural_ranks.csv", neural_rows, list(neural_rows[0].keys()))

    summary: dict[str, Any] = {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "query_block": "video_fisheye_cam0",
        "target_block": "video_stereo_left",
        "query_dim": int(len(query_idx)),
        "target_dim": int(len(target_idx)),
    }
    summary["minimal"] = min_metrics
    summary["neural_mlp"] = neural_metrics
    summary["neural_training"] = neural
    return summary
|
|
|
|
def task_action_phase_progress(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe C: regress the normalized position of a window inside its action segment.

    The caption text block is excluded from the input. Models are fitted on the
    earlier windows and evaluated on the later ones, with predictions clipped to
    ``[0, 1]``. Per-window predictions are written as CSV files under ``OUT_DIR``.
    """
    input_idx = block_indices(manifest, exclude=["caption_objects_interaction_text"])
    target = action_progress_targets(rows)
    train, test = chronological_split(len(X), args.train_fraction)
    fit_x = X[train][:, input_idx]
    eval_x = X[test][:, input_idx]

    def prediction_rows(estimates: np.ndarray) -> list[dict[str, Any]]:
        # One record per held-out window, pairing the estimate with its true progress.
        records = []
        for window, estimate in zip(test, estimates):
            idx = int(window)
            records.append(
                {
                    "window_index": idx,
                    "center_frame": rows[idx]["center_frame"],
                    "action_label": rows[idx]["action_label"],
                    "true_progress": float(target[idx]),
                    "pred_progress": float(estimate),
                    "absolute_error": float(abs(estimate - target[idx])),
                }
            )
        return records

    raw_min = ridge_predict(fit_x, target[train], eval_x, l2=args.ridge_l2, standardize_y=True)[:, 0]
    pred_min = np.clip(raw_min, 0.0, 1.0)
    min_metrics = regression_metrics(target[test], pred_min)
    min_rows = prediction_rows(pred_min)
    write_csv(OUT_DIR / "action_phase_progress_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))

    neural: dict[str, Any] = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    neural_rows: list[dict[str, Any]] = []
    if not args.skip_neural:
        pred_neural, neural = train_neural(fit_x, target[train], eval_x, task_type="regression", args=args)
        if neural.get("available") and pred_neural.size:
            clipped = np.clip(pred_neural[:, 0], 0.0, 1.0)
            neural_metrics = regression_metrics(target[test], clipped)
            neural_rows = prediction_rows(clipped)
            write_csv(OUT_DIR / "action_phase_progress_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))

    summary: dict[str, Any] = {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "input_dim": int(len(input_idx)),
        "target_source": "normalized position inside contiguous action-label runs",
    }
    summary["minimal"] = min_metrics
    summary["neural_mlp"] = neural_metrics
    summary["neural_training"] = neural
    return summary
|
|
|
|
def task_ego_motion_forecast(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe D: forecast the change in camera translation ``args.future_windows`` ahead.

    The target is the ``camera_translation`` block at the future window minus the
    same block at the current window; the input excludes that block and the caption
    text. Windows without a future partner are dropped. Per-window predictions are
    written as CSV files under ``OUT_DIR``.
    """
    input_idx = block_indices(manifest, exclude=["camera_translation", "caption_objects_interaction_text"])
    target_idx = block_indices(manifest, include=["camera_translation"])
    horizon = int(args.future_windows)

    valid = np.arange(0, len(X) - horizon, dtype=np.int64)
    future_pose = X[valid + horizon][:, target_idx]
    current_pose = X[valid][:, target_idx]
    target = future_pose - current_pose
    features = X[valid][:, input_idx]
    train, test = chronological_split(len(valid), args.train_fraction)
    fit_x, eval_x = features[train], features[test]

    def prediction_rows(predicted_deltas: np.ndarray) -> list[dict[str, Any]]:
        # One record per held-out window, summarizing true and predicted deltas by L2 norm.
        records = []
        for local_pos, predicted in zip(test, predicted_deltas):
            pos = int(local_pos)
            window = int(valid[pos])
            actual = target[pos]
            records.append(
                {
                    "window_index": window,
                    "center_frame": rows[window]["center_frame"],
                    "future_window_index": int(window + horizon),
                    "delta_l2_true": float(np.linalg.norm(actual)),
                    "delta_l2_pred": float(np.linalg.norm(predicted)),
                    "delta_l2_error": float(np.linalg.norm(predicted - actual)),
                }
            )
        return records

    pred_min = ridge_predict(fit_x, target[train], eval_x, l2=args.ridge_l2, standardize_y=True)
    min_metrics = regression_metrics(target[test], pred_min)
    min_rows = prediction_rows(pred_min)
    write_csv(OUT_DIR / "ego_motion_forecast_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))

    neural: dict[str, Any] = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    neural_rows: list[dict[str, Any]] = []
    if not args.skip_neural:
        pred_neural, neural = train_neural(fit_x, target[train], eval_x, task_type="regression", args=args)
        if neural.get("available") and pred_neural.size:
            neural_metrics = regression_metrics(target[test], pred_neural)
            neural_rows = prediction_rows(pred_neural)
            write_csv(OUT_DIR / "ego_motion_forecast_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))

    summary: dict[str, Any] = {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "forecast_horizon_windows": horizon,
        # NOTE(review): the factor 5 looks like frames per window step — confirm against the window generator.
        "forecast_horizon_frames": int(horizon * 5),
        "input_dim": int(len(input_idx)),
        "target_dim": int(len(target_idx)),
        "target_source": "future minus current camera_translation feature block",
    }
    summary["minimal"] = min_metrics
    summary["neural_mlp"] = neural_metrics
    summary["neural_training"] = neural
    return summary
|
|
|
|
def fmt_metric(value: float | None, metric_key: str) -> str:
    """Format a metric with four decimals, or ``"n/a"`` when it is missing.

    *metric_key* is accepted for interface compatibility; every metric currently
    uses the same four-decimal format.
    """
    return "n/a" if value is None else f"{value:.4f}"
|
|
|
|
def task_main_metric(task: str, result: dict[str, Any], baseline: str) -> float | None:
    """Look up the task's headline metric for one baseline (``"minimal"`` or ``"neural_mlp"``).

    Returns ``None`` when the baseline has no metrics or the metric is absent.
    """
    metrics = result.get(baseline)
    if metrics:
        headline = metrics.get(TASK_SPECS[task]["metric_key"])
        if headline is not None:
            return float(headline)
    return None
|
|
|
|
def write_markdown(payload: dict[str, Any]) -> None:
    """Write the Markdown summary of all probes to ``OUT_DIR``.

    The report holds a one-row-per-task summary table followed by a detail section
    per task. ``payload["tasks"]`` must contain a result dict for every key of
    ``TASK_SPECS``. The output directory is created if it does not exist yet, in
    line with the other writers in this file.
    """
    lines = [
        "# Four-Direction Extension Task Baselines",
        "",
        "Generated by `scripts/research_direction_extension_tasks.py` from the committed single-episode feature tensor.",
        "These are data-backed extension probes that show how each research direction can be started from Xperience-10M modalities.",
        "Cross-episode generalization and full direction completion require later held-out experiments.",
        "",
        "## Summary",
        "",
        "| Direction | Extension task | Minimal | Neural MLP | Meaning |",
        "| --- | --- | ---: | ---: | --- |",
    ]
    # Headline metric of both baselines, formatted once per task and reused in both sections.
    formatted: dict[str, tuple[str, str]] = {}
    for task, spec in TASK_SPECS.items():
        result = payload["tasks"][task]
        key = spec["metric_key"]
        metric = spec["metric_name"]
        min_cell = f"{fmt_metric(task_main_metric(task, result, 'minimal'), key)} {metric}"
        nn_cell = f"{fmt_metric(task_main_metric(task, result, 'neural_mlp'), key)} {metric}"
        formatted[task] = (min_cell, nn_cell)
        lines.append(
            f"| {spec['direction']}. {spec['direction_name']} | {spec['name']} | {min_cell} | {nn_cell} | {spec['current_limit']} |"
        )

    lines.extend(["", "## Task Details", ""])
    for task, spec in TASK_SPECS.items():
        min_cell, nn_cell = formatted[task]
        lines.extend(
            [
                f"### {spec['direction']}. {spec['name']}",
                "",
                f"- Case study: {spec['case_study']}",
                f"- Input: {spec['input']}",
                f"- Middle process modules: {spec['middle_process']}",
                f"- Output: {spec['output']}",
                f"- Minimal baseline: {spec['minimal_baseline']}",
                f"- Neural baseline: {spec['neural_baseline']}",
                f"- Minimal result: {min_cell}",
                f"- Neural result: {nn_cell}",
                f"- Limitation: {spec['current_limit']}",
                "",
            ]
        )

    # Do not rely on an earlier writer having created the directory.
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    report = "\n".join(lines).rstrip() + "\n"
    (OUT_DIR / "research_direction_extension_summary.md").write_text(report, encoding="utf-8")
|
|
|
|
def svg_text(x: int, y: int, text: str, size: int = 16, weight: int = 500, color: str = "#f4f8ef") -> str:
    """Return an SVG ``<text>`` element at ``(x, y)`` with *text* HTML-escaped."""
    escaped = html.escape(text)
    attributes = f'x="{x}" y="{y}" font-size="{size}" font-weight="{weight}" fill="{color}"'
    return f"<text {attributes}>{escaped}</text>"
|
|
|
|
def write_svg(payload: dict[str, Any]) -> None:
    """Render the four probe cards and the legend as an SVG chart under ``CHARTS``.

    Cards are laid out two per row in ``TASK_SPECS`` order. Each card shows the
    headline metric of both baselines plus two bars whose lengths come from
    ``choose_score`` (a missing neural result is drawn with score 0.0).
    ``payload["tasks"]`` must contain a result dict for every key of ``TASK_SPECS``.
    """
    CHARTS.mkdir(parents=True, exist_ok=True)
    width, height = 1420, 920
    accent_by_direction = {"A": "#ccffa0", "B": "#7ae5c3", "C": "#d8f4a5", "D": "#9bdfff"}
    muted, soft, bright = "#a5afa2", "#dce8d7", "#f4f8ef"

    parts: list[str] = []
    parts.append(f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}" viewBox="0 0 {width} {height}">')
    parts.append('<rect width="1420" height="920" fill="#020502"/>')
    parts.append('<rect x="28" y="28" width="1364" height="864" rx="18" fill="#050905" stroke="#ccffa0" stroke-opacity="0.24"/>')
    parts.append(svg_text(66, 88, "Ropedia Xperience-10M: four direction extension probes", 32, 760))
    parts.append(svg_text(66, 122, "Data-backed from the same 1,161-window public sample feature tensor; extension probes for later held-out studies.", 17, 500, muted))

    left, top = 66, 166
    card_w, card_h = 620, 160
    gap_x, gap_y = 44, 34
    bar_w = 440

    def score_bar(bar_x: int, bar_y: int, score: float, fill: str, opacity: str) -> list[str]:
        # Faint full-width track plus a filled bar clamped to between 4 px and the track width.
        filled = max(4, min(bar_w, bar_w * score))
        return [
            f'<rect x="{bar_x}" y="{bar_y}" width="{bar_w}" height="8" rx="4" fill="#ccffa0" opacity="0.14"/>',
            f'<rect x="{bar_x}" y="{bar_y}" width="{filled:.1f}" height="8" rx="4" fill="{fill}" opacity="{opacity}"/>',
        ]

    for position, (task, spec) in enumerate(TASK_SPECS.items()):
        result = payload["tasks"][task]
        grid_row, grid_col = divmod(position, 2)
        x = left + grid_col * (card_w + gap_x)
        y = top + grid_row * (card_h + gap_y)
        accent = accent_by_direction[spec["direction"]]
        metric = spec["metric_name"]
        key = spec["metric_key"]
        min_label = f"Minimal: {fmt_metric(task_main_metric(task, result, 'minimal'), key)} {metric}"
        nn_label = f"Neural MLP: {fmt_metric(task_main_metric(task, result, 'neural_mlp'), key)} {metric}"
        parts += [
            f'<rect x="{x}" y="{y}" width="{card_w}" height="{card_h}" rx="10" fill="#071207" stroke="#ccffa0" stroke-opacity="0.22"/>',
            f'<rect x="{x}" y="{y}" width="10" height="{card_h}" rx="5" fill="{accent}"/>',
            f'<circle cx="{x + 42}" cy="{y + 40}" r="24" fill="{accent}" opacity="0.14"/>',
            svg_text(x + 32, y + 48, spec["direction"], 21, 760, accent),
            svg_text(x + 76, y + 35, spec["name"], 20, 760),
            svg_text(x + 76, y + 62, spec["direction_name"], 13, 650, muted),
            svg_text(x + 76, y + 94, min_label, 16, 700, bright),
            svg_text(x + 300, y + 94, nn_label, 16, 700, bright),
            svg_text(x + 76, y + 125, spec["output"], 13, 500, soft),
        ]
        min_score = choose_score(task, result["minimal"])
        nn_score = choose_score(task, result["neural_mlp"]) if result.get("neural_mlp") else 0.0
        parts += score_bar(x + 76, y + 138, min_score, accent, "0.72")
        parts += score_bar(x + 76, y + 150, nn_score, "#ffffff", "0.78")

    legend_y = 570
    parts += [
        svg_text(66, legend_y, "How to read this", 24, 760),
        svg_text(66, legend_y + 34, "Each card adds one concrete task to a research direction using existing sample modalities.", 16, 500, soft),
        svg_text(66, legend_y + 62, "Colored bar: minimal baseline normalized score. White bar: neural MLP normalized score. Lower-is-better MAE is shown as 1 - MAE for bar length only.", 16, 500, soft),
        '<line x1="66" y1="675" x2="1354" y2="675" stroke="#ccffa0" stroke-opacity="0.18"/>',
        svg_text(66, 724, "Implementation boundary", 22, 760),
        svg_text(66, 758, "A: motion-energy proxy, not a full human body model. B: view-feature retrieval, not neural rendering.", 16, 500, soft),
        svg_text(66, 786, "C: phase-progress regression, not open-world intent. D: ego-motion forecast, not a persistent map.", 16, 500, soft),
        svg_text(66, 835, "All metrics are computed from held-out chronological windows of the same public sample episode.", 16, 700, bright),
    ]
    parts.append("</svg>")
    (CHARTS / "research_direction_extension_tasks.svg").write_text("\n".join(parts), encoding="utf-8")
|
|
|
|
def _path_relative_to_root(path: Path) -> str:
    """Return *path* relative to ROOT when possible, otherwise as given."""
    try:
        return str(path.relative_to(ROOT))
    except ValueError:
        pass
    # A relative or symlinked path may still live under ROOT once resolved.
    try:
        return str(path.resolve().relative_to(ROOT))
    except ValueError:
        # Outside the repository: record the path as supplied instead of
        # crashing after all tasks have already been computed.
        return str(path)


def build_payload(args: argparse.Namespace) -> dict[str, Any]:
    """Run every extension task and assemble the result payload.

    Side effects: rebinds the module-level OUT_DIR to ``args.output_dir`` and
    creates that directory.

    Args:
        args: Parsed command-line options. Reads ``results_dir``,
            ``output_dir``, ``train_fraction``, ``ridge_l2``, ``seed``,
            ``future_windows``, ``neural_epochs``, ``neural_hidden_dim``,
            ``neural_batch_size`` and ``skip_neural``.

    Returns:
        A dict with the keys ``source``, ``dataset_scope``, ``baselines``,
        ``run_config``, ``task_specs`` and ``tasks``.
    """
    global OUT_DIR

    X, starts, ends, rows, manifest = load_inputs(args.results_dir)
    OUT_DIR = args.output_dir
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    tasks = OrderedDict()
    tasks["body_motion_intensity"] = task_body_motion_intensity(X, rows, manifest, args)
    tasks["multi_view_consistency_retrieval"] = task_multi_view_retrieval(X, rows, manifest, args)
    tasks["action_phase_progress"] = task_action_phase_progress(X, rows, manifest, args)
    tasks["ego_motion_forecast"] = task_ego_motion_forecast(X, rows, manifest, args)

    # Provenance of the input files, recorded relative to the repo root when
    # the results directory lives inside it.
    source_files = {
        "shared_windows": "shared_windows.npz",
        "windows_csv": "windows.csv",
        "feature_manifest": "feature_manifest.json",
    }

    payload = {
        "source": {
            key: _path_relative_to_root(args.results_dir / filename)
            for key, filename in source_files.items()
        },
        "dataset_scope": {
            "sample_episode_count": 1,
            "num_windows": int(len(X)),
            "feature_dim": int(X.shape[1]),
            "first_start_frame": int(starts[0]),
            "last_end_frame": int(ends[-1]),
            "warning": "Single public sample episode; these extension probes validate task design and pipeline mechanics, not cross-episode generalization.",
        },
        "baselines": {
            "minimal": "Ridge classifiers/regressors/projections plus cosine retrieval on the committed feature tensor.",
            "neural_mlp": "Small one-hidden-layer PyTorch MLP heads using the same inputs, targets, chronological split, and evaluator.",
        },
        "run_config": {
            "train_fraction": float(args.train_fraction),
            "ridge_l2": float(args.ridge_l2),
            "seed": int(args.seed),
            "future_windows": int(args.future_windows),
            "neural_epochs": int(args.neural_epochs),
            "neural_hidden_dim": int(args.neural_hidden_dim),
            "neural_batch_size": int(args.neural_batch_size),
            "skip_neural": bool(args.skip_neural),
        },
        "task_specs": TASK_SPECS,
        "tasks": tasks,
    }
    return payload
|
|
|
|
def main() -> int:
    """Run the extension probes, write every output artifact, and return 0."""
    args = parse_args()
    payload = build_payload(args)

    results_json = args.output_dir / "research_direction_extension_results.json"
    docs_json = DOCS_DATA / "research_direction_extensions.json"
    # The same payload goes to the results directory and to the docs data copy.
    for destination in (results_json, docs_json):
        write_json(destination, payload)

    write_markdown(payload)
    write_svg(payload)

    chart_svg = CHARTS / "research_direction_extension_tasks.svg"
    for written in (results_json, chart_svg):
        print(f"Wrote {written}")
    return 0
|
|
|
|
# Script entry point: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|
|