#!/usr/bin/env python3
"""Run one extra data-backed probe for each Ropedia research direction.
These tasks reuse the committed single-episode feature tensor generated by
`episode_task_suite.py`. They are extension probes, not full implementations of
the research directions, and they do not claim that those directions are solved.
"""
from __future__ import annotations
import argparse
import csv
import html
import json
import math
from collections import OrderedDict
from pathlib import Path
from typing import Any
import numpy as np
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Default input directory (override with --results-dir).
RESULTS = ROOT / "results" / "episode_task_suite"
# Default output directory; build_payload() rebinds this global to --output-dir.
OUT_DIR = RESULTS / "research_direction_extensions"
# Destination of the second JSON copy written by main().
DOCS_DATA = ROOT / "docs" / "data"
# Destination directory of the SVG chart written by write_svg().
CHARTS = ROOT / "docs" / "assets" / "charts"
# Default locations of the three input files. load_inputs() derives the same
# file names from its own results_dir argument rather than reading these.
WINDOWS_NPZ = RESULTS / "shared_windows.npz"
WINDOWS_CSV = RESULTS / "windows.csv"
FEATURE_MANIFEST = RESULTS / "feature_manifest.json"
# Static description of the four extension probes, keyed by task id and kept in
# direction order (A-D). Each spec carries human-readable text used in the
# Markdown report plus two machine-read fields:
#   - "metric_key": name of the headline metric inside a baseline's metrics dict
#     (read by task_main_metric and choose_score);
#   - "metric_direction": "higher" or "lower" (read by choose_score).
TASK_SPECS: OrderedDict[str, dict[str, Any]] = OrderedDict(
    [
        (
            # Direction A: binary high/low motion classification.
            "body_motion_intensity",
            {
                "direction": "A",
                "direction_name": "Human Modeling & Motion Understanding",
                "name": "Body and Hand Motion Intensity",
                "family": "classification",
                "case_study": "A window with a fast reach or pour should be classified as high motion; a steady holding window should be low motion.",
                "input": "Current non-mocap feature blocks: video, audio, depth, camera pose/rotation, IMU, SLAM, calibration, and language context.",
                "middle_process": "Compute the target from hand/body joint changes between neighboring windows, hide the mocap blocks from the input, then classify high versus low motion using the train-set median as the threshold.",
                "output": "Binary label: high_motion or low_motion.",
                "minimal_baseline": "Ridge classifier on standardized non-mocap features.",
                "neural_baseline": "One-hidden-layer MLP binary classifier on the same input features.",
                "metric_name": "macro-F1",
                "metric_key": "macro_f1",
                "metric_direction": "higher",
                "current_limit": "This is a motion-energy proxy, not a SMPL/MANO body model or a generative motion prior.",
            },
        ),
        (
            # Direction B: cross-view feature retrieval.
            "multi_view_consistency_retrieval",
            {
                "direction": "B",
                "direction_name": "3D/4D Reconstruction & Neural Rendering",
                "name": "Multi-View Consistency Retrieval",
                "family": "retrieval",
                "case_study": "Given the fisheye camera features for a pouring moment, retrieve the synchronized stereo-left view from the same time window.",
                "input": "Query side: fisheye_cam0 video feature block. Candidate side: stereo_left video feature block from held-out windows.",
                "middle_process": "Learn a projection from one camera-view feature space into another, then rank held-out candidate windows by cosine similarity.",
                "output": "Ranked candidate windows; the correct synchronized view should rank near the top.",
                "minimal_baseline": "Ridge projection followed by cosine nearest-neighbor retrieval.",
                "neural_baseline": "One-hidden-layer MLP projection followed by the same cosine retrieval evaluator.",
                "metric_name": "MRR",
                "metric_key": "mrr",
                "metric_direction": "higher",
                "current_limit": "This checks calibrated multi-view signal, but it is still feature retrieval, not NeRF, Gaussian Splatting, or novel-view synthesis.",
            },
        ),
        (
            # Direction C: regression of progress inside an action segment.
            "action_phase_progress",
            {
                "direction": "C",
                "direction_name": "Egocentric Vision & Interaction",
                "name": "Action Phase Progress Estimation",
                "family": "regression",
                "case_study": "Inside a Pour coffee action segment, estimate whether the current window is near the beginning, middle, or end of that action.",
                "input": "Current non-caption multimodal feature vector, so the label text cannot be copied directly from the language block.",
                "middle_process": "Convert contiguous action-label runs into a normalized 0-to-1 progress target, train on earlier windows, and regress progress for later windows.",
                "output": "A scalar progress value between 0.0 and 1.0 for the current action segment.",
                "minimal_baseline": "Ridge regressor on standardized non-caption features.",
                "neural_baseline": "One-hidden-layer MLP regressor on the same input features.",
                "metric_name": "MAE",
                "metric_key": "mae",
                "metric_direction": "lower",
                "current_limit": "This is an action-structure probe inside one episode, not a general intent model across homes, people, or tasks.",
            },
        ),
        (
            # Direction D: forecasting the camera-translation delta.
            "ego_motion_forecast",
            {
                "direction": "D",
                "direction_name": "Scene Reconstruction & World Modeling",
                "name": "Short-Horizon Ego-Motion Forecasting",
                "family": "forecast",
                "case_study": "From the current sensors, predict how the camera translation will change over the next 20 frames while the wearer moves through the scene.",
                "input": "Current multimodal features excluding the camera-translation block and caption text.",
                "middle_process": "Build a future target from camera-translation difference at a four-window horizon, then regress that future ego-motion delta from current sensors.",
                "output": "A future camera-translation delta vector.",
                "minimal_baseline": "Ridge regressor with a 20-frame forecast horizon.",
                "neural_baseline": "One-hidden-layer MLP regressor with the same horizon and split.",
                "metric_name": "MAE",
                "metric_key": "mae",
                "metric_direction": "lower",
                "current_limit": "This is a compact world-model proxy; it does not build a persistent map, scene graph, or object permanence model.",
            },
        ),
    ]
)
def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the extension-probe run.

    Returns:
        The populated namespace; every option has a default, so the script
        can be invoked without arguments.
    """
    cli = argparse.ArgumentParser(description="Run four research-direction extension probes.")
    # (flag, value type, default) for every option that takes a value, in the
    # order they should appear in --help.
    valued_options = (
        ("--results-dir", Path, RESULTS),
        ("--output-dir", Path, OUT_DIR),
        ("--train-fraction", float, 0.70),
        ("--ridge-l2", float, 10.0),
        ("--seed", int, 7),
        ("--future-windows", int, 4),
        ("--neural-epochs", int, 25),
        ("--neural-hidden-dim", int, 128),
        ("--neural-batch-size", int, 128),
        ("--neural-learning-rate", float, 1e-3),
        ("--neural-weight-decay", float, 1e-4),
    )
    for flag, caster, fallback in valued_options:
        cli.add_argument(flag, type=caster, default=fallback)
    cli.add_argument("--skip-neural", action="store_true")
    return cli.parse_args()
def write_json(path: Path, payload: dict[str, Any] | list[Any]) -> None:
    """Serialize *payload* as 2-space-indented JSON into *path* (UTF-8).

    Missing parent directories are created first.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2)
    path.write_text(serialized, encoding="utf-8")
def write_csv(path: Path, rows: list[dict[str, Any]], fieldnames: list[str]) -> None:
    """Write *rows* to *path* as a CSV with a header row and "\\n" line endings.

    Args:
        path: Output file; missing parent directories are created.
        rows: One dict per CSV row.
        fieldnames: Column names, in output order.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as sink:
        table = csv.DictWriter(sink, fieldnames=fieldnames, lineterminator="\n")
        table.writeheader()
        for record in rows:
            table.writerow(record)
def load_windows_csv(path: Path) -> list[dict[str, str]]:
    """Read the CSV at *path* and return one dict per data row, keyed by header."""
    with path.open("r", newline="", encoding="utf-8") as source:
        reader = csv.DictReader(source)
        return [record for record in reader]
def load_inputs(results_dir: Path) -> tuple[np.ndarray, np.ndarray, np.ndarray, list[dict[str, str]], list[dict[str, Any]]]:
    """Load the feature tensor, window metadata, and feature manifest.

    Args:
        results_dir: Directory holding ``shared_windows.npz``, ``windows.csv``
            and ``feature_manifest.json``.

    Returns:
        ``(X, starts, ends, rows, manifest)`` where ``X`` is float32 with
        non-finite values replaced by 0, ``starts``/``ends`` are int64 arrays
        read from the archive, ``rows`` is the parsed ``windows.csv`` and
        ``manifest`` is the decoded manifest JSON.

    Raises:
        FileNotFoundError: If any of the three input files is missing.
        ValueError: If ``windows.csv`` and ``X`` disagree on the number of windows.
    """
    npz_path = results_dir / "shared_windows.npz"
    windows_path = results_dir / "windows.csv"
    manifest_path = results_dir / "feature_manifest.json"
    # Check every input up front so the user gets the same actionable hint
    # whichever file is missing.
    for required in (npz_path, windows_path, manifest_path):
        if not required.exists():
            raise FileNotFoundError(f"Missing {required}. Run scripts/episode_task_suite.py first.")
    # NpzFile keeps the archive open; the context manager closes it once the
    # arrays have been copied out.
    with np.load(npz_path, allow_pickle=False) as z:
        X = np.asarray(z["X"], dtype=np.float32)
        starts = np.asarray(z["starts"], dtype=np.int64)
        ends = np.asarray(z["ends"], dtype=np.int64)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    rows = load_windows_csv(windows_path)
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    if len(rows) != len(X):
        raise ValueError(f"windows.csv has {len(rows)} rows but shared_windows.npz has {len(X)} windows.")
    return X, starts, ends, rows, manifest
def block_indices(manifest: list[dict[str, Any]], include: list[str] | None = None, exclude: list[str] | None = None) -> np.ndarray:
    """Collect the feature-column indices of the manifest blocks that pass the filters.

    A block name matches a pattern when it starts with that pattern (which
    includes exact equality).

    Args:
        manifest: Block descriptors with ``"name"``, ``"start"`` and ``"end"``
            entries; ``end`` is exclusive.
        include: If non-empty, keep only blocks matching one of these patterns.
        exclude: If non-empty, drop blocks matching one of these patterns.

    Returns:
        An int64 array of column indices in manifest order (possibly empty).
    """
    wanted = tuple(include or ())
    banned = tuple(exclude or ())
    columns: list[int] = []
    for block in manifest:
        block_name = str(block["name"])
        selected = not wanted or block_name.startswith(wanted)
        rejected = bool(banned) and block_name.startswith(banned)
        if selected and not rejected:
            columns.extend(range(int(block["start"]), int(block["end"])))
    return np.asarray(columns, dtype=np.int64)
def chronological_split(n: int, train_fraction: float) -> tuple[np.ndarray, np.ndarray]:
    """Split ``0..n-1`` into an earlier train range and a later test range.

    The boundary is ``round(n * train_fraction)``, clamped so that both sides
    contain at least one index.

    Raises:
        ValueError: If ``n`` is smaller than 2.
    """
    if n < 2:
        raise ValueError("Need at least two examples.")
    boundary = int(round(n * train_fraction))
    if boundary < 1:
        boundary = 1
    elif boundary > n - 1:
        boundary = n - 1
    train_part = np.arange(0, boundary, dtype=np.int64)
    test_part = np.arange(boundary, n, dtype=np.int64)
    return train_part, test_part
def standardize_train_test(X_train: np.ndarray, X_test: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Z-score both splits using statistics computed on the training split only.

    Returns:
        ``(train_scaled, test_scaled, mean, std)``. ``mean`` and ``std`` are
        float32 per-column vectors; columns whose std is below 1e-6 use a
        divisor of 1 so near-constant features do not blow up.
    """
    # Accumulate in float64 for accuracy, then store as float32.
    center = X_train.mean(axis=0, dtype=np.float64).astype(np.float32)
    spread = X_train.std(axis=0, dtype=np.float64).astype(np.float32)
    np.putmask(spread, spread < 1e-6, 1.0)
    train_scaled = (X_train - center) / spread
    test_scaled = (X_test - center) / spread
    return train_scaled, test_scaled, center, spread
def ridge_predict(
    X_train: np.ndarray,
    Y_train: np.ndarray,
    X_test: np.ndarray,
    *,
    l2: float,
    standardize_y: bool,
) -> np.ndarray:
    """Fit ridge regression in its dual (Gram-matrix) form and predict for *X_test*.

    Features are standardized with training statistics. The system solved is
    ``(X Xᵀ + l2·I) α = Y`` over the training rows, so the solve scales with
    the number of training examples rather than the feature dimension.

    Args:
        X_train: Training features, one row per example.
        Y_train: Training targets; a 1-D array is treated as a single column.
        X_test: Features to predict for.
        l2: Ridge penalty added to the Gram-matrix diagonal.
        standardize_y: If true, targets are z-scored before fitting and the
            predictions are mapped back to the original scale.

    Returns:
        Predictions with one row per test example and one column per target.
    """
    train_feats, test_feats, _, _ = standardize_train_test(X_train.astype(np.float32), X_test.astype(np.float32))
    targets = np.asarray(Y_train, dtype=np.float32)
    if targets.ndim == 1:
        targets = targets[:, None]

    if not standardize_y:
        offset = np.zeros(targets.shape[1], dtype=np.float32)
        scale = np.ones(targets.shape[1], dtype=np.float32)
        fit_targets = targets
    else:
        offset = targets.mean(axis=0, dtype=np.float64).astype(np.float32)
        scale = targets.std(axis=0, dtype=np.float64).astype(np.float32)
        scale[scale < 1e-6] = 1.0
        fit_targets = (targets - offset) / scale

    gram = train_feats @ train_feats.T
    # Add the ridge penalty to the diagonal in place.
    gram[np.diag_indices_from(gram)] += float(l2)
    dual_weights = np.linalg.solve(gram.astype(np.float64), fit_targets.astype(np.float64)).astype(np.float32)
    projected = (test_feats @ train_feats.T @ dual_weights).astype(np.float32)
    return projected * scale + offset
def ridge_classifier(X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray, l2: float) -> tuple[np.ndarray, np.ndarray]:
    """Classify by ridge-regressing one-hot labels and taking the arg-max column.

    Returns:
        ``(pred, scores)``: int64 predicted class labels, and the raw
        regression scores with one column per class seen in *y_train*
        (columns ordered by ascending class value).
    """
    labels = [int(value) for value in y_train]
    classes = np.asarray(sorted(set(labels)), dtype=np.int64)
    column_of = {int(cls): col for col, cls in enumerate(classes)}
    one_hot = np.zeros((len(labels), len(classes)), dtype=np.float32)
    hot_columns = np.asarray([column_of[label] for label in labels], dtype=np.int64)
    one_hot[np.arange(len(labels), dtype=np.int64), hot_columns] = 1.0
    scores = ridge_predict(X_train, one_hot, X_test, l2=l2, standardize_y=False)
    winners = np.argmax(scores, axis=1)
    return classes[winners].astype(np.int64), scores
def binary_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float | int]:
    """Compute accuracy, macro-F1 and label rates for 0/1 labels.

    Macro-F1 is the unweighted mean of the F1 scores of class 0 and class 1;
    a class with no true or predicted members contributes an F1 of 0.
    """
    truth = y_true.astype(np.int64)
    guess = y_pred.astype(np.int64)

    def class_f1(cls: int) -> float:
        is_true = truth == cls
        is_pred = guess == cls
        hits = int(np.sum(is_true & is_pred))
        false_alarms = int(np.sum(~is_true & is_pred))
        misses = int(np.sum(is_true & ~is_pred))
        # Denominators are floored so empty classes give 0 instead of dividing by zero.
        precision = hits / max(hits + false_alarms, 1)
        recall = hits / max(hits + misses, 1)
        return 2 * precision * recall / max(precision + recall, 1e-12)

    f1_scores = [class_f1(cls) for cls in (0, 1)]
    return {
        "accuracy": float(np.mean(truth == guess)),
        "macro_f1": float(np.mean(f1_scores)),
        "positive_rate_true": float(np.mean(truth)),
        "positive_rate_pred": float(np.mean(guess)),
        "num_test": int(len(truth)),
    }
def regression_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float | int]:
    """Compute MSE, MAE and R² over all elements of the prediction array.

    R² uses the per-column mean of *y_true* as the reference predictor; the
    total sum of squares is floored at 1e-12 to avoid division by zero.
    """
    truth = np.asarray(y_true, dtype=np.float32)
    guess = np.asarray(y_pred, dtype=np.float32)
    residual = guess - truth
    squared = residual * residual
    centered = truth - truth.mean(axis=0, keepdims=True)
    total_ss = float(np.sum(centered ** 2))
    residual_ss = float(np.sum(squared))
    return {
        "mse": float(np.mean(squared)),
        "mae": float(np.mean(np.abs(residual))),
        "r2": 1.0 - residual_ss / max(total_ss, 1e-12),
        "num_test": int(len(truth)),
    }
def row_normalize(X: np.ndarray) -> np.ndarray:
    """Scale each row of *X* to unit L2 norm; rows with norm below 1e-8 are left unscaled."""
    lengths = np.linalg.norm(X, axis=1, keepdims=True)
    # Divide near-zero rows by 1 instead of by their (tiny) norm.
    np.putmask(lengths, lengths < 1e-8, 1.0)
    return X / lengths
def retrieval_metrics(query_pred: np.ndarray, target_test: np.ndarray) -> tuple[dict[str, float | int], list[dict[str, Any]]]:
    """Rank candidates by cosine similarity and score how high the true match lands.

    Query row ``i`` is considered correctly matched by target row ``i``.

    Returns:
        ``(metrics, rows)``: aggregate MRR / top-k / median-rank figures, and
        one dict per query with its 1-based true rank and the best candidate.
    """
    queries = row_normalize(np.asarray(query_pred, dtype=np.float32))
    gallery = row_normalize(np.asarray(target_test, dtype=np.float32))
    similarity = queries @ gallery.T

    per_query: list[dict[str, Any]] = []
    for position in range(similarity.shape[0]):
        scores = similarity[position]
        ranking = np.argsort(-scores)  # best candidate first
        best = ranking[0]
        per_query.append(
            {
                "test_position": position,
                "true_rank": int(np.flatnonzero(ranking == position)[0]) + 1,
                "top_candidate_position": int(best),
                "top_candidate_score": float(scores[best]),
                "true_score": float(scores[position]),
            }
        )

    rank_values = np.asarray([entry["true_rank"] for entry in per_query], dtype=np.float32)
    summary = {
        "mrr": float(np.mean(1.0 / rank_values)),
        "top1": float(np.mean(rank_values <= 1)),
        "top5": float(np.mean(rank_values <= 5)),
        "top10": float(np.mean(rank_values <= 10)),
        "median_rank": float(np.median(rank_values)),
        "num_test": int(len(per_query)),
    }
    return summary, per_query
def choose_score(task: str, metrics: dict[str, Any]) -> float:
    """Map a task's headline metric onto a higher-is-better score.

    Metrics declared ``"higher"`` are returned unchanged; all others are
    converted to ``1 - value`` and floored at 0.
    """
    spec = TASK_SPECS[task]
    headline = float(metrics[spec["metric_key"]])
    higher_is_better = spec["metric_direction"] == "higher"
    return headline if higher_is_better else max(0.0, 1.0 - headline)
def train_neural(
    X_train: np.ndarray,
    Y_train: np.ndarray,
    X_test: np.ndarray,
    *,
    task_type: str,
    args: argparse.Namespace,
) -> tuple[np.ndarray, dict[str, Any]]:
    """Train a one-hidden-layer MLP on CPU and predict for *X_test*.

    Args:
        X_train: Training features (standardized here with training statistics).
        Y_train: Training targets; a 1-D array is treated as a single column.
        X_test: Features to predict for.
        task_type: ``"classification"`` trains with a sigmoid/BCE loss on the
            raw targets and returns probabilities; any other value trains with
            MSE on z-scored targets and returns de-standardized predictions.
        args: Run options; reads ``seed``, ``neural_hidden_dim``,
            ``neural_batch_size``, ``neural_epochs``, ``neural_learning_rate``
            and ``neural_weight_decay``.

    Returns:
        ``(pred, info)``. When torch cannot be imported, ``pred`` has zero
        columns and ``info["available"]`` is false with a ``"reason"``.
        Otherwise ``info`` records the epoch count, hidden size and the mean
        training loss per epoch.
    """
    try:
        import torch
        from torch import nn
        from torch.utils.data import DataLoader, TensorDataset
    except Exception as exc:  # pragma: no cover - depends on optional torch install
        # Deliberately broad: torch is optional and the probe degrades to the
        # ridge baseline when it cannot be loaded for any reason.
        return np.empty((len(X_test), 0), dtype=np.float32), {"available": False, "reason": f"torch unavailable: {exc}"}

    rng = np.random.default_rng(args.seed)
    torch.manual_seed(args.seed)
    Xtr, Xte, _, _ = standardize_train_test(X_train.astype(np.float32), X_test.astype(np.float32))
    Y = np.asarray(Y_train, dtype=np.float32)
    if Y.ndim == 1:
        Y = Y[:, None]

    is_classification = task_type == "classification"
    if is_classification:
        Y_work = Y
        y_mean = np.zeros(Y.shape[1], dtype=np.float32)
        y_std = np.ones(Y.shape[1], dtype=np.float32)
    else:
        y_mean = Y.mean(axis=0, dtype=np.float64).astype(np.float32)
        y_std = Y.std(axis=0, dtype=np.float64).astype(np.float32)
        y_std[y_std < 1e-6] = 1.0
        Y_work = (Y - y_mean) / y_std
    # Convert once; the training loop only re-indexes this array.
    Y_work = Y_work.astype(np.float32)

    device = torch.device("cpu")
    model = nn.Sequential(
        nn.Linear(Xtr.shape[1], args.neural_hidden_dim),
        nn.GELU(),
        nn.Dropout(0.08),
        nn.Linear(args.neural_hidden_dim, Y_work.shape[1]),
    ).to(device)
    loss_fn = nn.BCEWithLogitsLoss() if is_classification else nn.MSELoss()
    opt = torch.optim.AdamW(model.parameters(), lr=args.neural_learning_rate, weight_decay=args.neural_weight_decay)

    order = np.arange(len(Xtr))
    history = []
    for _ in range(args.neural_epochs):
        # Shuffle with the NumPy generator and hand the loader pre-permuted
        # tensors, so the example order depends only on args.seed.
        rng.shuffle(order)
        epoch_data = TensorDataset(torch.from_numpy(Xtr[order]), torch.from_numpy(Y_work[order]))
        loader = DataLoader(epoch_data, batch_size=args.neural_batch_size, shuffle=False)
        model.train()
        total_loss = 0.0
        total_seen = 0
        for xb, yb in loader:
            xb = xb.to(device)
            yb = yb.to(device)
            opt.zero_grad(set_to_none=True)
            loss = loss_fn(model(xb), yb)
            loss.backward()
            opt.step()
            # Weight by batch size so the epoch figure is a per-example mean.
            total_loss += float(loss.item()) * len(xb)
            total_seen += len(xb)
        history.append(total_loss / max(total_seen, 1))

    model.eval()
    with torch.no_grad():
        raw = model(torch.from_numpy(Xte).to(device)).cpu().numpy().astype(np.float32)
    if is_classification:
        pred = 1.0 / (1.0 + np.exp(-raw))  # logits -> probabilities
    else:
        pred = raw * y_std + y_mean
    return pred, {"available": True, "epochs": args.neural_epochs, "hidden_dim": args.neural_hidden_dim, "loss_history": history}
def action_progress_targets(rows: list[dict[str, str]]) -> np.ndarray:
    """Turn contiguous runs of equal ``action_label`` into 0-to-1 progress ramps.

    Each run of two or more windows gets evenly spaced values from 0.0 (first
    window) to 1.0 (last window). Runs of length one keep progress 0.0.
    Missing or empty labels are treated as the empty string.
    """
    labels = [row.get("action_label", "") or "" for row in rows]
    total = len(labels)
    progress = np.zeros(total, dtype=np.float32)
    run_start = 0
    # Walk one step past the end so the final run is closed like any other.
    for cursor in range(1, total + 1):
        if cursor < total and labels[cursor] == labels[run_start]:
            continue
        run_length = cursor - run_start
        if run_length > 1:
            progress[run_start:cursor] = np.linspace(0.0, 1.0, run_length, dtype=np.float32)
        run_start = cursor
    return progress
def task_body_motion_intensity(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe A: classify each window as high or low body/hand motion.

    The target is the L2 norm of the change in the hand/body joint blocks
    between a window and its predecessor, thresholded at the training-split
    median. The joint blocks and ``body_contacts`` are withheld from the input.
    Per-window predictions are written as CSV files under ``OUT_DIR``.

    Args:
        X: Feature tensor, one row per window.
        rows: Window metadata; ``center_frame`` is read for the CSV output.
        manifest: Feature-block descriptors used to locate columns.
        args: Run options (``train_fraction``, ``ridge_l2``, ``skip_neural``
            and the neural settings consumed by ``train_neural``).

    Returns:
        Summary dict with split sizes, the threshold and both baselines' metrics.

    Raises:
        ValueError: If the manifest has no joint blocks or no input blocks.
    """
    mocap_blocks = ["hand_left_joints", "hand_right_joints", "body_joints"]
    mocap_idx = block_indices(manifest, include=mocap_blocks)
    input_idx = block_indices(manifest, exclude=[*mocap_blocks, "body_contacts"])
    # Without joint columns the motion target is identically zero and every
    # window would silently be labelled high_motion.
    if len(mocap_idx) == 0:
        raise ValueError("Expected hand_left_joints, hand_right_joints, or body_joints feature blocks.")
    if len(input_idx) == 0:
        raise ValueError("Expected at least one non-mocap feature block as input.")

    # Window 0 has no predecessor, so targets start at window 1.
    valid = np.arange(1, len(X), dtype=np.int64)
    motion = np.linalg.norm(X[valid][:, mocap_idx] - X[valid - 1][:, mocap_idx], axis=1)
    train_local, test_local = chronological_split(len(valid), args.train_fraction)
    # The threshold comes from the training split only.
    threshold = float(np.median(motion[train_local]))
    y = (motion >= threshold).astype(np.int64)
    Xv = X[valid][:, input_idx]

    def label_name(flag: Any) -> str:
        return "high_motion" if flag else "low_motion"

    def base_row(local_pos: Any, predicted: Any) -> dict[str, Any]:
        """Columns shared by the minimal and neural prediction CSVs."""
        local = int(local_pos)
        idx = int(valid[local])
        return {
            "window_index": idx,
            "center_frame": rows[idx]["center_frame"],
            "motion_energy": float(motion[local]),
            "true_label": label_name(y[local]),
            "pred_label": label_name(int(predicted)),
        }

    y_pred_min, scores = ridge_classifier(Xv[train_local], y[train_local], Xv[test_local], args.ridge_l2)
    min_metrics = binary_metrics(y[test_local], y_pred_min)
    min_rows = []
    for local_pos, pred, score_pair in zip(test_local, y_pred_min, scores):
        row = base_row(local_pos, pred)
        # A score column is blank when the training split contained one class only.
        row["score_low"] = float(score_pair[0]) if len(score_pair) > 0 else ""
        row["score_high"] = float(score_pair[1]) if len(score_pair) > 1 else ""
        min_rows.append(row)

    neural = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    neural_rows: list[dict[str, Any]] = []
    if not args.skip_neural:
        prob, neural = train_neural(Xv[train_local], y[train_local].astype(np.float32), Xv[test_local], task_type="classification", args=args)
        if neural.get("available") and prob.size:
            pred = (prob[:, 0] >= 0.5).astype(np.int64)
            neural_metrics = binary_metrics(y[test_local], pred)
            for local_pos, p, pr in zip(test_local, pred, prob[:, 0]):
                row = base_row(local_pos, p)
                row["prob_high"] = float(pr)
                neural_rows.append(row)

    write_csv(OUT_DIR / "body_motion_intensity_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))
    if neural_rows:
        write_csv(OUT_DIR / "body_motion_intensity_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))
    return {
        "train_windows": int(len(train_local)),
        "test_windows": int(len(test_local)),
        "target_threshold_train_median": threshold,
        "input_dim": int(len(input_idx)),
        "target_source": "hand/body joint delta between neighboring windows",
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }
def task_multi_view_retrieval(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe B: retrieve the synchronized stereo-left window from fisheye features.

    A projection from the ``video_fisheye_cam0`` block to the
    ``video_stereo_left`` block is fitted on the earlier windows; on the later
    windows each projected query is ranked against all held-out targets by
    cosine similarity. Rank tables are written as CSV files under ``OUT_DIR``.

    Returns:
        Summary dict with split sizes, block dimensions and both baselines' metrics.

    Raises:
        ValueError: If either feature block is missing from the manifest.
    """
    query_idx = block_indices(manifest, include=["video_fisheye_cam0"])
    target_idx = block_indices(manifest, include=["video_stereo_left"])
    if not (len(query_idx) and len(target_idx)):
        raise ValueError("Expected video_fisheye_cam0 and video_stereo_left feature blocks.")

    train, test = chronological_split(len(X), args.train_fraction)
    query_feats = X[:, query_idx]
    target_feats = X[:, target_idx]

    def annotate(rank_rows: list[dict[str, Any]]) -> None:
        """Add the global window index and center frame to each rank row."""
        for entry in rank_rows:
            window = int(test[int(entry["test_position"])])
            entry["window_index"] = window
            entry["center_frame"] = rows[window]["center_frame"]

    projected = ridge_predict(query_feats[train], target_feats[train], query_feats[test], l2=args.ridge_l2, standardize_y=True)
    min_metrics, min_rows = retrieval_metrics(projected, target_feats[test])
    annotate(min_rows)
    write_csv(OUT_DIR / "multi_view_consistency_minimal_ranks.csv", min_rows, list(min_rows[0].keys()))

    neural = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    neural_rows: list[dict[str, Any]] = []
    if not args.skip_neural:
        projected_nn, neural = train_neural(query_feats[train], target_feats[train], query_feats[test], task_type="projection", args=args)
        if neural.get("available") and projected_nn.size:
            neural_metrics, neural_rows = retrieval_metrics(projected_nn, target_feats[test])
            annotate(neural_rows)
            write_csv(OUT_DIR / "multi_view_consistency_neural_ranks.csv", neural_rows, list(neural_rows[0].keys()))

    summary: dict[str, Any] = {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "query_block": "video_fisheye_cam0",
        "target_block": "video_stereo_left",
        "query_dim": int(len(query_idx)),
        "target_dim": int(len(target_idx)),
    }
    summary["minimal"] = min_metrics
    summary["neural_mlp"] = neural_metrics
    summary["neural_training"] = neural
    return summary
def task_action_phase_progress(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe C: regress how far the current window is through its action segment.

    Targets come from ``action_progress_targets``; the
    ``caption_objects_interaction_text`` block is withheld from the input.
    Predictions are clipped to [0, 1] before scoring, and per-window results
    are written as CSV files under ``OUT_DIR``.

    Returns:
        Summary dict with split sizes, input dimension and both baselines' metrics.
    """
    input_idx = block_indices(manifest, exclude=["caption_objects_interaction_text"])
    target = action_progress_targets(rows)
    train, test = chronological_split(len(X), args.train_fraction)
    train_inputs = X[train][:, input_idx]
    test_inputs = X[test][:, input_idx]

    def prediction_rows(predicted: np.ndarray) -> list[dict[str, Any]]:
        """One CSV row per test window for the given clipped predictions."""
        table = []
        for window, value in zip(test, predicted):
            idx = int(window)
            table.append(
                {
                    "window_index": idx,
                    "center_frame": rows[idx]["center_frame"],
                    "action_label": rows[idx]["action_label"],
                    "true_progress": float(target[idx]),
                    "pred_progress": float(value),
                    "absolute_error": float(abs(value - target[idx])),
                }
            )
        return table

    raw_min = ridge_predict(train_inputs, target[train], test_inputs, l2=args.ridge_l2, standardize_y=True)[:, 0]
    pred_min = np.clip(raw_min, 0.0, 1.0)
    min_metrics = regression_metrics(target[test], pred_min)
    min_rows = prediction_rows(pred_min)
    write_csv(OUT_DIR / "action_phase_progress_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))

    neural = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    neural_rows: list[dict[str, Any]] = []
    if not args.skip_neural:
        pred_neural, neural = train_neural(train_inputs, target[train], test_inputs, task_type="regression", args=args)
        if neural.get("available") and pred_neural.size:
            clipped = np.clip(pred_neural[:, 0], 0.0, 1.0)
            neural_metrics = regression_metrics(target[test], clipped)
            neural_rows = prediction_rows(clipped)
            write_csv(OUT_DIR / "action_phase_progress_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))

    return {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "input_dim": int(len(input_idx)),
        "target_source": "normalized position inside contiguous action-label runs",
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }
def task_ego_motion_forecast(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe D: forecast the change in camera translation a few windows ahead.

    The target is the ``camera_translation`` block at ``t + horizon`` minus the
    block at ``t``. The input excludes that block and the caption text.
    Per-window results are written as CSV files under ``OUT_DIR``.

    Args:
        X: Feature tensor, one row per window.
        rows: Window metadata; ``center_frame`` is read for the CSV output.
        manifest: Feature-block descriptors used to locate columns.
        args: Run options; ``future_windows`` is the forecast horizon.

    Returns:
        Summary dict with split sizes, horizon, dimensions and both baselines' metrics.

    Raises:
        ValueError: If the horizon is below 1, the ``camera_translation``
            block is missing, or fewer than two windows remain after the
            horizon is applied (raised by ``chronological_split``).
    """
    horizon = int(args.future_windows)
    # A zero horizon makes every target exactly zero and a negative one
    # indexes past the end of X, so reject both explicitly.
    if horizon < 1:
        raise ValueError("--future-windows must be at least 1.")
    input_idx = block_indices(manifest, exclude=["camera_translation", "caption_objects_interaction_text"])
    target_idx = block_indices(manifest, include=["camera_translation"])
    if len(target_idx) == 0:
        raise ValueError("Expected a camera_translation feature block.")

    # Only windows that still have a partner `horizon` steps ahead are usable.
    valid = np.arange(0, len(X) - horizon, dtype=np.int64)
    target = X[valid + horizon][:, target_idx] - X[valid][:, target_idx]
    Xv = X[valid][:, input_idx]
    train, test = chronological_split(len(valid), args.train_fraction)

    def prediction_rows(predicted: np.ndarray) -> list[dict[str, Any]]:
        """One CSV row per test window, summarising vectors by their L2 norms."""
        table = []
        for local_pos, pred in zip(test, predicted):
            local = int(local_pos)
            idx = int(valid[local])
            true_delta = target[local]
            table.append(
                {
                    "window_index": idx,
                    "center_frame": rows[idx]["center_frame"],
                    "future_window_index": int(idx + horizon),
                    "delta_l2_true": float(np.linalg.norm(true_delta)),
                    "delta_l2_pred": float(np.linalg.norm(pred)),
                    "delta_l2_error": float(np.linalg.norm(pred - true_delta)),
                }
            )
        return table

    pred_min = ridge_predict(Xv[train], target[train], Xv[test], l2=args.ridge_l2, standardize_y=True)
    min_metrics = regression_metrics(target[test], pred_min)
    min_rows = prediction_rows(pred_min)
    write_csv(OUT_DIR / "ego_motion_forecast_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))

    neural = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    neural_rows: list[dict[str, Any]] = []
    if not args.skip_neural:
        pred_neural, neural = train_neural(Xv[train], target[train], Xv[test], task_type="regression", args=args)
        if neural.get("available") and pred_neural.size:
            neural_metrics = regression_metrics(target[test], pred_neural)
            neural_rows = prediction_rows(pred_neural)
            write_csv(OUT_DIR / "ego_motion_forecast_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))

    return {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "forecast_horizon_windows": horizon,
        # NOTE(review): assumes consecutive windows are 5 frames apart — confirm
        # against the window stride used by episode_task_suite.py.
        "forecast_horizon_frames": int(horizon * 5),
        "input_dim": int(len(input_idx)),
        "target_dim": int(len(target_idx)),
        "target_source": "future minus current camera_translation feature block",
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }
def fmt_metric(value: float | None, metric_key: str) -> str:
    """Format a metric value for the report tables.

    Args:
        value: The metric, or ``None`` when the baseline did not run.
        metric_key: Metric identifier. Every metric currently uses the same
            four-decimal format, so it does not affect the result; it is kept
            so callers can stay unchanged if per-metric formats are added.

    Returns:
        ``"n/a"`` for ``None``, otherwise the value with four decimals.
    """
    if value is None:
        return "n/a"
    return f"{value:.4f}"
def task_main_metric(task: str, result: dict[str, Any], baseline: str) -> float | None:
    """Return the headline metric of *baseline* for *task*, or ``None`` if unavailable.

    ``None`` is returned when the baseline has no (or empty) metrics in
    *result*, or when its metrics lack the task's ``metric_key``.
    """
    baseline_metrics = result.get(baseline)
    if baseline_metrics:
        raw = baseline_metrics.get(TASK_SPECS[task]["metric_key"])
        if raw is not None:
            return float(raw)
    return None
def write_markdown(payload: dict[str, Any]) -> None:
    """Write the Markdown report (summary table plus per-task details) into ``OUT_DIR``.

    Reads ``payload["tasks"]`` for the results and ``TASK_SPECS`` for the
    descriptive text; the file is ``research_direction_extension_summary.md``.
    """
    header = [
        "# Four-Direction Extension Task Baselines",
        "",
        "Generated by `scripts/research_direction_extension_tasks.py` from the committed single-episode feature tensor.",
        "These are data-backed extension probes that show how each research direction can be started from Xperience-10M modalities.",
        "Cross-episode generalization and full direction completion require later held-out experiments.",
        "",
        "## Summary",
        "",
        "| Direction | Extension task | Minimal | Neural MLP | Meaning |",
        "| --- | --- | ---: | ---: | --- |",
    ]
    table_rows: list[str] = []
    detail_lines: list[str] = []
    for task, spec in TASK_SPECS.items():
        result = payload["tasks"][task]
        metric_key = spec["metric_key"]
        # "<value> <metric name>" cells, shared by the table and the details.
        minimal_cell = f"{fmt_metric(task_main_metric(task, result, 'minimal'), metric_key)} {spec['metric_name']}"
        neural_cell = f"{fmt_metric(task_main_metric(task, result, 'neural_mlp'), metric_key)} {spec['metric_name']}"
        table_rows.append(
            f"| {spec['direction']}. {spec['direction_name']} | {spec['name']} | {minimal_cell} | {neural_cell} | {spec['current_limit']} |"
        )
        detail_lines += [
            f"### {spec['direction']}. {spec['name']}",
            "",
            f"- Case study: {spec['case_study']}",
            f"- Input: {spec['input']}",
            f"- Middle process modules: {spec['middle_process']}",
            f"- Output: {spec['output']}",
            f"- Minimal baseline: {spec['minimal_baseline']}",
            f"- Neural baseline: {spec['neural_baseline']}",
            f"- Minimal result: {minimal_cell}",
            f"- Neural result: {neural_cell}",
            f"- Limitation: {spec['current_limit']}",
            "",
        ]
    document = header + table_rows + ["", "## Task Details", ""] + detail_lines
    report_path = OUT_DIR / "research_direction_extension_summary.md"
    report_path.write_text("\n".join(document).rstrip() + "\n", encoding="utf-8")
def svg_text(x: int, y: int, text: str, size: int = 16, weight: int = 500, color: str = "#f4f8ef") -> str:
    """Return an SVG ``<text>`` element with *text* HTML-escaped.

    Args:
        x, y: Anchor position of the text.
        text: Content; ``&``, ``<``, ``>`` and quotes are escaped.
        size: Font size.
        weight: Font weight.
        color: Fill colour.
    """
    # NOTE(review): the element markup was missing from the reviewed source
    # (only the escaped text remained); this attribute set is a reconstruction.
    return (
        f'<text x="{x}" y="{y}" font-size="{size}" font-weight="{weight}" fill="{color}">'
        f'{html.escape(text)}</text>'
    )


def write_svg(payload: dict[str, Any]) -> None:
    """Render a one-card-per-task summary chart into ``CHARTS``.

    Each card shows the direction, task name, both baselines' headline metric
    and the stated limitation. The file is ``research_direction_extension_tasks.svg``.
    """
    # NOTE(review): the SVG markup in this function was missing from the
    # reviewed source, which left it syntactically invalid. Canvas size and
    # direction colours are original; the layout is a reconstruction — confirm
    # against the intended chart design.
    CHARTS.mkdir(parents=True, exist_ok=True)
    width = 1420
    height = 920
    colors = {"A": "#ccffa0", "B": "#7ae5c3", "C": "#d8f4a5", "D": "#9bdfff"}
    margin = 60
    card_top = 140
    card_height = 180
    card_gap = 14
    svg: list[str] = [
        f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}" '
        f'viewBox="0 0 {width} {height}" font-family="sans-serif">',
        f'<rect x="0" y="0" width="{width}" height="{height}" fill="#0d1512"/>',
        svg_text(margin, 70, "Four-Direction Extension Task Baselines", size=34, weight=700),
        svg_text(margin, 104, str(payload.get("dataset_scope", {}).get("warning", "")), size=15, color="#b8c7b0"),
    ]
    for position, (task, spec) in enumerate(TASK_SPECS.items()):
        result = payload["tasks"][task]
        metric_key = spec["metric_key"]
        accent = colors.get(spec["direction"], "#f4f8ef")
        top = card_top + position * (card_height + card_gap)
        minimal_text = f"Minimal: {fmt_metric(task_main_metric(task, result, 'minimal'), metric_key)} {spec['metric_name']}"
        neural_text = f"Neural MLP: {fmt_metric(task_main_metric(task, result, 'neural_mlp'), metric_key)} {spec['metric_name']}"
        svg.extend(
            [
                f'<rect x="{margin}" y="{top}" width="{width - 2 * margin}" height="{card_height}" '
                f'rx="14" fill="#14201a" stroke="{accent}" stroke-width="2"/>',
                svg_text(margin + 24, top + 38, f"{spec['direction']}. {spec['direction_name']}", size=16, weight=600, color=accent),
                svg_text(margin + 24, top + 74, spec["name"], size=24, weight=700),
                svg_text(margin + 24, top + 110, minimal_text, size=18),
                svg_text(margin + 440, top + 110, neural_text, size=18),
                svg_text(margin + 24, top + 148, spec["current_limit"], size=14, weight=400, color="#b8c7b0"),
            ]
        )
    svg.append("</svg>")
    (CHARTS / "research_direction_extension_tasks.svg").write_text("\n".join(svg), encoding="utf-8")
def build_payload(args: argparse.Namespace) -> dict[str, Any]:
    """Run all four probes and assemble the JSON-serializable results payload.

    Side effects: rebinds the module-level ``OUT_DIR`` to ``args.output_dir``
    (the task functions and ``write_markdown`` write there), creates that
    directory, and lets each task write its prediction CSVs.

    Args:
        args: Parsed command-line options from ``parse_args``.

    Returns:
        Dict with the input file locations, dataset scope, baseline
        descriptions, run configuration, task specs and per-task results.
    """
    X, starts, ends, rows, manifest = load_inputs(args.results_dir)
    global OUT_DIR
    OUT_DIR = args.output_dir
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    def display_path(filename: str) -> str:
        """Repo-relative path when possible, otherwise the path as given."""
        location = args.results_dir / filename
        try:
            return str(location.relative_to(ROOT))
        except ValueError:
            # --results-dir may point outside the repository (or be relative);
            # relative_to() rejects such paths, so report them unchanged.
            return str(location)

    tasks = OrderedDict()
    tasks["body_motion_intensity"] = task_body_motion_intensity(X, rows, manifest, args)
    tasks["multi_view_consistency_retrieval"] = task_multi_view_retrieval(X, rows, manifest, args)
    tasks["action_phase_progress"] = task_action_phase_progress(X, rows, manifest, args)
    tasks["ego_motion_forecast"] = task_ego_motion_forecast(X, rows, manifest, args)
    payload = {
        "source": {
            "shared_windows": display_path("shared_windows.npz"),
            "windows_csv": display_path("windows.csv"),
            "feature_manifest": display_path("feature_manifest.json"),
        },
        "dataset_scope": {
            "sample_episode_count": 1,
            "num_windows": int(len(X)),
            "feature_dim": int(X.shape[1]),
            "first_start_frame": int(starts[0]),
            "last_end_frame": int(ends[-1]),
            "warning": "Single public sample episode; these extension probes validate task design and pipeline mechanics, not cross-episode generalization.",
        },
        "baselines": {
            "minimal": "Ridge classifiers/regressors/projections plus cosine retrieval on the committed feature tensor.",
            "neural_mlp": "Small one-hidden-layer PyTorch MLP heads using the same inputs, targets, chronological split, and evaluator.",
        },
        "run_config": {
            "train_fraction": float(args.train_fraction),
            "ridge_l2": float(args.ridge_l2),
            "seed": int(args.seed),
            "future_windows": int(args.future_windows),
            "neural_epochs": int(args.neural_epochs),
            "neural_hidden_dim": int(args.neural_hidden_dim),
            "neural_batch_size": int(args.neural_batch_size),
            # Recorded so a run can be reproduced from the JSON alone.
            "neural_learning_rate": float(args.neural_learning_rate),
            "neural_weight_decay": float(args.neural_weight_decay),
            "skip_neural": bool(args.skip_neural),
        },
        "task_specs": TASK_SPECS,
        "tasks": tasks,
    }
    return payload
def main() -> int:
    """Run the probes and write the JSON, Markdown and SVG outputs.

    Returns:
        Process exit code 0 on success.
    """
    args = parse_args()
    payload = build_payload(args)
    results_json = args.output_dir / "research_direction_extension_results.json"
    # The same payload is written next to the results and into the docs data folder.
    for destination in (results_json, DOCS_DATA / "research_direction_extensions.json"):
        write_json(destination, payload)
    write_markdown(payload)
    write_svg(payload)
    print(f"Wrote {results_json}")
    print(f"Wrote {CHARTS / 'research_direction_extension_tasks.svg'}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())