Robotics
PyTorch
Cosmos
xperience10m_task_baseline_suite
embodied-ai
multimodal
xperience-10m
baseline
evaluation
qwen3-omni
Instructions to use cy0307/ropedia-xperience-10m-task-baselines with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Cosmos
How to use cy0307/ropedia-xperience-10m-task-baselines with Cosmos:
# No code snippets available yet for this library. # To use this model, check the repository files and the library's documentation. # Want to help? PRs adding snippets are welcome at: # https://github.com/huggingface/huggingface.js
- Notebooks
- Google Colab
- Kaggle
| #!/usr/bin/env python3 | |
| """Run one extra data-backed probe for each Ropedia research direction. | |
| These tasks reuse the committed single-episode feature tensor generated by | |
| `episode_task_suite.py`. They are extension probes, not full implementations of the | |
| research directions, and they do not claim that those directions are solved. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import csv | |
| import html | |
| import json | |
| import math | |
| from collections import OrderedDict | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
# Second parent of this file's resolved path (the directory above the one holding the script).
ROOT = Path(__file__).resolve().parents[1]

# Folder read by this script (see load_inputs) and the sub-folder it writes to.
RESULTS = ROOT.joinpath("results", "episode_task_suite")
OUT_DIR = RESULTS.joinpath("research_direction_extensions")

# Documentation folders.
DOCS_DATA = ROOT.joinpath("docs", "data")
CHARTS = ROOT.joinpath("docs", "assets", "charts")

# Default locations of the shared single-episode artifacts.
WINDOWS_NPZ = RESULTS.joinpath("shared_windows.npz")
WINDOWS_CSV = RESULTS.joinpath("windows.csv")
FEATURE_MANIFEST = RESULTS.joinpath("feature_manifest.json")
# Human-readable specification of each extension probe, keyed by task id.
# Insertion order (A, B, C, D) is the order used when iterating TASK_SPECS.
# "metric_key" names the entry read from a task's metrics dict and
# "metric_direction" says whether a larger or smaller value is better.
TASK_SPECS: OrderedDict[str, dict[str, Any]] = OrderedDict()

TASK_SPECS["body_motion_intensity"] = dict(
    direction="A",
    direction_name="Human Modeling & Motion Understanding",
    name="Body and Hand Motion Intensity",
    family="classification",
    case_study="A window with a fast reach or pour should be classified as high motion; a steady holding window should be low motion.",
    input="Current non-mocap feature blocks: video, audio, depth, camera pose/rotation, IMU, SLAM, calibration, and language context.",
    middle_process="Compute the target from hand/body joint changes between neighboring windows, hide the mocap blocks from the input, then classify high versus low motion using the train-set median as the threshold.",
    output="Binary label: high_motion or low_motion.",
    minimal_baseline="Ridge classifier on standardized non-mocap features.",
    neural_baseline="One-hidden-layer MLP binary classifier on the same input features.",
    metric_name="macro-F1",
    metric_key="macro_f1",
    metric_direction="higher",
    current_limit="This is a motion-energy proxy, not a SMPL/MANO body model or a generative motion prior.",
)

TASK_SPECS["multi_view_consistency_retrieval"] = dict(
    direction="B",
    direction_name="3D/4D Reconstruction & Neural Rendering",
    name="Multi-View Consistency Retrieval",
    family="retrieval",
    case_study="Given the fisheye camera features for a pouring moment, retrieve the synchronized stereo-left view from the same time window.",
    input="Query side: fisheye_cam0 video feature block. Candidate side: stereo_left video feature block from held-out windows.",
    middle_process="Learn a projection from one camera-view feature space into another, then rank held-out candidate windows by cosine similarity.",
    output="Ranked candidate windows; the correct synchronized view should rank near the top.",
    minimal_baseline="Ridge projection followed by cosine nearest-neighbor retrieval.",
    neural_baseline="One-hidden-layer MLP projection followed by the same cosine retrieval evaluator.",
    metric_name="MRR",
    metric_key="mrr",
    metric_direction="higher",
    current_limit="This checks calibrated multi-view signal, but it is still feature retrieval, not NeRF, Gaussian Splatting, or novel-view synthesis.",
)

TASK_SPECS["action_phase_progress"] = dict(
    direction="C",
    direction_name="Egocentric Vision & Interaction",
    name="Action Phase Progress Estimation",
    family="regression",
    case_study="Inside a Pour coffee action segment, estimate whether the current window is near the beginning, middle, or end of that action.",
    input="Current non-caption multimodal feature vector, so the label text cannot be copied directly from the language block.",
    middle_process="Convert contiguous action-label runs into a normalized 0-to-1 progress target, train on earlier windows, and regress progress for later windows.",
    output="A scalar progress value between 0.0 and 1.0 for the current action segment.",
    minimal_baseline="Ridge regressor on standardized non-caption features.",
    neural_baseline="One-hidden-layer MLP regressor on the same input features.",
    metric_name="MAE",
    metric_key="mae",
    metric_direction="lower",
    current_limit="This is an action-structure probe inside one episode, not a general intent model across homes, people, or tasks.",
)

TASK_SPECS["ego_motion_forecast"] = dict(
    direction="D",
    direction_name="Scene Reconstruction & World Modeling",
    name="Short-Horizon Ego-Motion Forecasting",
    family="forecast",
    case_study="From the current sensors, predict how the camera translation will change over the next 20 frames while the wearer moves through the scene.",
    input="Current multimodal features excluding the camera-translation block and caption text.",
    middle_process="Build a future target from camera-translation difference at a four-window horizon, then regress that future ego-motion delta from current sensors.",
    output="A future camera-translation delta vector.",
    minimal_baseline="Ridge regressor with a 20-frame forecast horizon.",
    neural_baseline="One-hidden-layer MLP regressor with the same horizon and split.",
    metric_name="MAE",
    metric_key="mae",
    metric_direction="lower",
    current_limit="This is a compact world-model proxy; it does not build a persistent map, scene graph, or object permanence model.",
)
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the extension-probe runner.

    Returns:
        The namespace produced by ``argparse`` with the data locations, split
        fraction, ridge penalty, seed, forecast horizon, and MLP settings.
    """
    cli = argparse.ArgumentParser(description="Run four research-direction extension probes.")

    # (flag, value type, default) for every option that takes a value.
    valued_options = (
        ("--results-dir", Path, RESULTS),
        ("--output-dir", Path, OUT_DIR),
        ("--train-fraction", float, 0.70),
        ("--ridge-l2", float, 10.0),
        ("--seed", int, 7),
        ("--future-windows", int, 4),
        ("--neural-epochs", int, 25),
        ("--neural-hidden-dim", int, 128),
        ("--neural-batch-size", int, 128),
        ("--neural-learning-rate", float, 1e-3),
        ("--neural-weight-decay", float, 1e-4),
    )
    for flag, value_type, default in valued_options:
        cli.add_argument(flag, type=value_type, default=default)

    # Boolean switch: when set, the task functions skip the MLP baseline.
    cli.add_argument("--skip-neural", action="store_true")
    return cli.parse_args()
def write_json(path: Path, payload: dict[str, Any] | list[Any]) -> None:
    """Serialize *payload* as 2-space-indented JSON into *path* (UTF-8).

    Missing parent directories are created first.
    """
    destination_dir = path.parent
    destination_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2)
    path.write_text(serialized, encoding="utf-8")
def write_csv(path: Path, rows: list[dict[str, Any]], fieldnames: list[str]) -> None:
    """Write *rows* to *path* as a UTF-8 CSV with a header row.

    Args:
        path: Output file; missing parent directories are created.
        rows: One dict per CSV line.
        fieldnames: Column names, in output order.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as sink:
        # "\n" line endings keep the output identical across platforms.
        table = csv.DictWriter(sink, fieldnames=fieldnames, lineterminator="\n")
        table.writeheader()
        for record in rows:
            table.writerow(record)
def load_windows_csv(path: Path) -> list[dict[str, str]]:
    """Read the UTF-8 CSV at *path* and return one dict per data row, keyed by header."""
    with path.open("r", newline="", encoding="utf-8") as source:
        reader = csv.DictReader(source)
        return [record for record in reader]
def load_inputs(results_dir: Path) -> tuple[np.ndarray, np.ndarray, np.ndarray, list[dict[str, str]], list[dict[str, Any]]]:
    """Load the shared feature tensor, window table, and feature manifest.

    Args:
        results_dir: Folder containing ``shared_windows.npz``, ``windows.csv``
            and ``feature_manifest.json``.

    Returns:
        ``(X, starts, ends, rows, manifest)`` where ``X`` is float32 with
        non-finite values replaced by 0, ``starts``/``ends`` are int64 arrays
        read from the archive, ``rows`` are the CSV records, and ``manifest``
        is the decoded JSON.

    Raises:
        FileNotFoundError: If any of the three input files is missing.
        ValueError: If the CSV row count differs from the number of windows in ``X``.
    """
    npz_path = results_dir / "shared_windows.npz"
    windows_path = results_dir / "windows.csv"
    manifest_path = results_dir / "feature_manifest.json"

    # Fail early, with the same hint, for whichever artifact is absent.
    for required in (npz_path, windows_path, manifest_path):
        if not required.exists():
            raise FileNotFoundError(f"Missing {required}. Run scripts/episode_task_suite.py first.")

    # np.load returns an NpzFile that keeps the archive open; the context
    # manager closes it once the arrays have been materialized.
    with np.load(npz_path, allow_pickle=False) as archive:
        X = np.asarray(archive["X"], dtype=np.float32)
        starts = np.asarray(archive["starts"], dtype=np.int64)
        ends = np.asarray(archive["ends"], dtype=np.int64)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

    rows = load_windows_csv(windows_path)
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    if len(rows) != len(X):
        raise ValueError(f"windows.csv has {len(rows)} rows but shared_windows.npz has {len(X)} windows.")
    return X, starts, ends, rows, manifest
def block_indices(manifest: list[dict[str, Any]], include: list[str] | None = None, exclude: list[str] | None = None) -> np.ndarray:
    """Collect the feature-column indices of the manifest blocks selected by name prefix.

    Args:
        manifest: Block descriptors, each with ``name``, ``start`` and ``end`` (end exclusive).
        include: If non-empty, keep only blocks whose name starts with one of these prefixes.
        exclude: If non-empty, drop blocks whose name starts with one of these prefixes.

    Returns:
        An int64 array of column indices in manifest order (possibly empty).
    """
    keep_prefixes = tuple(include or ())
    drop_prefixes = tuple(exclude or ())

    columns: list[int] = []
    for block in manifest:
        block_name = str(block["name"])
        # A prefix match also covers the exact-name case.
        if keep_prefixes and not block_name.startswith(keep_prefixes):
            continue
        if drop_prefixes and block_name.startswith(drop_prefixes):
            continue
        columns.extend(range(int(block["start"]), int(block["end"])))
    return np.asarray(columns, dtype=np.int64)
def chronological_split(n: int, train_fraction: float) -> tuple[np.ndarray, np.ndarray]:
    """Split ``range(n)`` into an earlier train part and a later test part.

    The boundary is ``round(n * train_fraction)`` clamped so that both parts
    contain at least one index.

    Raises:
        ValueError: If ``n`` is smaller than 2.
    """
    if n < 2:
        raise ValueError("Need at least two examples.")
    boundary = int(round(n * train_fraction))
    boundary = min(n - 1, max(1, boundary))
    train_ids = np.arange(0, boundary, dtype=np.int64)
    test_ids = np.arange(boundary, n, dtype=np.int64)
    return train_ids, test_ids
def standardize_train_test(X_train: np.ndarray, X_test: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Z-score both splits using statistics of the training split only.

    Returns:
        ``(train_scaled, test_scaled, mean, std)``; ``mean`` and ``std`` are
        float32 per-column statistics accumulated in float64.
    """
    center = X_train.mean(axis=0, dtype=np.float64).astype(np.float32)
    spread = X_train.std(axis=0, dtype=np.float64).astype(np.float32)
    # Columns with (near-)zero spread are left unscaled to avoid dividing by ~0.
    near_constant = spread < 1e-6
    spread[near_constant] = 1.0
    train_scaled = (X_train - center) / spread
    test_scaled = (X_test - center) / spread
    return train_scaled, test_scaled, center, spread
def ridge_predict(
    X_train: np.ndarray,
    Y_train: np.ndarray,
    X_test: np.ndarray,
    *,
    l2: float,
    standardize_y: bool,
) -> np.ndarray:
    """Fit ridge regression in its dual (sample-by-sample Gram matrix) form and predict.

    Inputs are z-scored with training statistics. When ``standardize_y`` is
    true the targets are z-scored as well and predictions are mapped back to
    the original target scale.

    Args:
        X_train: Training inputs, one row per example.
        Y_train: Training targets; a 1-D array is treated as a single column.
        X_test: Inputs to predict for.
        l2: Value added to the diagonal of the training Gram matrix.
        standardize_y: Whether to z-score the targets before fitting.

    Returns:
        A 2-D float32 array with one row per test example and one column per target.
    """
    train_z, test_z, _, _ = standardize_train_test(X_train.astype(np.float32), X_test.astype(np.float32))

    targets = np.asarray(Y_train, dtype=np.float32)
    if targets.ndim == 1:
        targets = targets[:, None]

    if not standardize_y:
        target_shift = np.zeros(targets.shape[1], dtype=np.float32)
        target_scale = np.ones(targets.shape[1], dtype=np.float32)
        fit_targets = targets
    else:
        target_shift = targets.mean(axis=0, dtype=np.float64).astype(np.float32)
        target_scale = targets.std(axis=0, dtype=np.float64).astype(np.float32)
        target_scale[target_scale < 1e-6] = 1.0
        fit_targets = (targets - target_shift) / target_scale

    # Linear kernel between training rows; striding the flat view by n + 1
    # touches exactly the diagonal entries, which receive the ridge penalty.
    gram = train_z @ train_z.T
    diagonal_stride = gram.shape[0] + 1
    gram.flat[::diagonal_stride] += float(l2)

    # Solve in float64, then store the dual weights as float32.
    dual_weights = np.linalg.solve(gram.astype(np.float64), fit_targets.astype(np.float64)).astype(np.float32)
    cross_kernel = test_z @ train_z.T
    predictions = (cross_kernel @ dual_weights).astype(np.float32)
    return predictions * target_scale + target_shift
def ridge_classifier(X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray, l2: float) -> tuple[np.ndarray, np.ndarray]:
    """Classify by ridge-regressing one-hot labels and taking the highest-scoring class.

    Returns:
        ``(pred, scores)``: int64 predicted labels, and the raw score matrix
        with one column per distinct training label in ascending label order.
    """
    labels = np.asarray([int(v) for v in y_train], dtype=np.int64)
    classes = np.asarray(sorted({int(v) for v in labels}), dtype=np.int64)

    # One-hot targets: column j is 1 where the label equals classes[j].
    one_hot = (labels[:, None] == classes[None, :]).astype(np.float32)

    scores = ridge_predict(X_train, one_hot, X_test, l2=l2, standardize_y=False)
    winners = np.argmax(scores, axis=1)
    return classes[winners].astype(np.int64), scores
def binary_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float | int]:
    """Compute accuracy, macro-F1 and label rates for 0/1 labels.

    Macro-F1 is the unweighted mean of the F1 scores of class 0 and class 1;
    a class with no true or predicted members contributes an F1 of 0.
    """
    truth = y_true.astype(np.int64)
    guess = y_pred.astype(np.int64)

    def class_f1(cls: int) -> float:
        is_true = truth == cls
        is_pred = guess == cls
        hits = int(np.sum(is_true & is_pred))
        false_alarms = int(np.sum(~is_true & is_pred))
        misses = int(np.sum(is_true & ~is_pred))
        precision = hits / max(hits + false_alarms, 1)
        recall = hits / max(hits + misses, 1)
        return 2 * precision * recall / max(precision + recall, 1e-12)

    f1_scores = [class_f1(cls) for cls in (0, 1)]
    return {
        "accuracy": float(np.mean(truth == guess)),
        "macro_f1": float(np.mean(f1_scores)),
        "positive_rate_true": float(np.mean(truth)),
        "positive_rate_pred": float(np.mean(guess)),
        "num_test": int(len(truth)),
    }
def regression_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float | int]:
    """Compute MSE, MAE and R^2 over all elements of the (float32) inputs.

    R^2 uses the per-column mean of ``y_true`` as the reference predictor and
    guards the denominator against zero.
    """
    truth = np.asarray(y_true, dtype=np.float32)
    guess = np.asarray(y_pred, dtype=np.float32)
    residual = guess - truth
    squared = residual * residual

    centered = truth - truth.mean(axis=0, keepdims=True)
    total_ss = float(np.sum(centered ** 2))
    residual_ss = float(np.sum(squared))

    return {
        "mse": float(np.mean(squared)),
        "mae": float(np.mean(np.abs(residual))),
        "r2": 1.0 - residual_ss / max(total_ss, 1e-12),
        "num_test": int(len(truth)),
    }
def row_normalize(X: np.ndarray) -> np.ndarray:
    """Scale each row of *X* to unit Euclidean length; near-zero rows are returned unscaled."""
    row_lengths = np.linalg.norm(X, axis=1, keepdims=True)
    too_short = row_lengths < 1e-8
    row_lengths[too_short] = 1.0
    return X / row_lengths
def retrieval_metrics(query_pred: np.ndarray, target_test: np.ndarray) -> tuple[dict[str, float | int], list[dict[str, Any]]]:
    """Rank candidates by cosine similarity and score how highly the matching row ranks.

    Query ``i`` is considered correctly matched to candidate ``i``.

    Returns:
        ``(metrics, rows)``: aggregate MRR / top-k / median-rank numbers, and
        one diagnostic dict per query.
    """
    queries = row_normalize(np.asarray(query_pred, dtype=np.float32))
    candidates = row_normalize(np.asarray(target_test, dtype=np.float32))
    similarity = queries @ candidates.T

    per_query: list[dict[str, Any]] = []
    for position, sim_row in enumerate(similarity):
        # Candidates sorted by descending similarity.
        ranking = np.argsort(-sim_row)
        true_rank = int(np.flatnonzero(ranking == position)[0]) + 1
        best = ranking[0]
        per_query.append(
            {
                "test_position": position,
                "true_rank": true_rank,
                "top_candidate_position": int(best),
                "top_candidate_score": float(sim_row[best]),
                "true_score": float(sim_row[position]),
            }
        )

    rank_values = np.asarray([entry["true_rank"] for entry in per_query], dtype=np.float32)
    summary = {
        "mrr": float(np.mean(1.0 / rank_values)),
        "top1": float(np.mean(rank_values <= 1)),
        "top5": float(np.mean(rank_values <= 5)),
        "top10": float(np.mean(rank_values <= 10)),
        "median_rank": float(np.median(rank_values)),
        "num_test": int(len(per_query)),
    }
    return summary, per_query
def choose_score(task: str, metrics: dict[str, Any]) -> float:
    """Map a task's main metric onto a "larger is better" score.

    Metrics whose direction is ``"higher"`` are returned unchanged; any other
    direction yields ``1 - value`` floored at 0.
    """
    spec = TASK_SPECS[task]
    raw = float(metrics[spec["metric_key"]])
    higher_is_better = spec["metric_direction"] == "higher"
    return raw if higher_is_better else max(0.0, 1.0 - raw)
def train_neural(
    X_train: np.ndarray,
    Y_train: np.ndarray,
    X_test: np.ndarray,
    *,
    task_type: str,
    args: argparse.Namespace,
) -> tuple[np.ndarray, dict[str, Any]]:
    """Train a one-hidden-layer MLP on CPU and predict for ``X_test``.

    Inputs are z-scored with training statistics. For ``task_type ==
    "classification"`` the network is trained with a logistic loss on the raw
    targets and sigmoid probabilities are returned; for every other
    ``task_type`` the targets are z-scored, an MSE loss is used, and
    predictions are mapped back to the original target scale.

    Args:
        X_train: Training inputs, one row per example.
        Y_train: Training targets; a 1-D array is treated as a single column.
        X_test: Inputs to predict for.
        task_type: ``"classification"`` or any other string (treated as regression).
        args: Namespace providing ``seed``, ``neural_epochs``, ``neural_hidden_dim``,
            ``neural_batch_size``, ``neural_learning_rate`` and ``neural_weight_decay``.

    Returns:
        ``(pred, info)``. When torch cannot be imported, ``pred`` has shape
        ``(len(X_test), 0)`` and ``info["available"]`` is ``False``; otherwise
        ``info`` carries the epoch count, hidden size and per-epoch mean loss.
    """
    # torch is optional; any import-time failure downgrades to "unavailable"
    # so the caller can still report the minimal baseline.
    try:
        import torch
        from torch import nn
        from torch.utils.data import DataLoader, TensorDataset
    except Exception as exc:  # pragma: no cover - depends on optional torch install
        return np.empty((len(X_test), 0), dtype=np.float32), {"available": False, "reason": f"torch unavailable: {exc}"}

    rng = np.random.default_rng(args.seed)
    torch.manual_seed(args.seed)

    Xtr, Xte, _, _ = standardize_train_test(X_train.astype(np.float32), X_test.astype(np.float32))
    Y = np.asarray(Y_train, dtype=np.float32)
    if Y.ndim == 1:
        Y = Y[:, None]

    is_classification = task_type == "classification"
    if is_classification:
        Y_work = Y
        y_mean = np.zeros(Y.shape[1], dtype=np.float32)
        y_std = np.ones(Y.shape[1], dtype=np.float32)
    else:
        y_mean = Y.mean(axis=0, dtype=np.float64).astype(np.float32)
        y_std = Y.std(axis=0, dtype=np.float64).astype(np.float32)
        y_std[y_std < 1e-6] = 1.0
        Y_work = (Y - y_mean) / y_std
    # Cast once here instead of on every epoch.
    Y_work = Y_work.astype(np.float32)

    device = torch.device("cpu")
    model = nn.Sequential(
        nn.Linear(Xtr.shape[1], args.neural_hidden_dim),
        nn.GELU(),
        nn.Dropout(0.08),
        nn.Linear(args.neural_hidden_dim, Y_work.shape[1]),
    ).to(device)
    loss_fn = nn.BCEWithLogitsLoss() if is_classification else nn.MSELoss()
    opt = torch.optim.AdamW(model.parameters(), lr=args.neural_learning_rate, weight_decay=args.neural_weight_decay)

    order = np.arange(len(Xtr))
    history = []
    for _ in range(args.neural_epochs):
        # Shuffle with the seeded NumPy generator (cumulatively, in place) and
        # feed the permuted tensors through a non-shuffling loader.
        rng.shuffle(order)
        epoch_data = TensorDataset(torch.from_numpy(Xtr[order]), torch.from_numpy(Y_work[order]))
        loader = DataLoader(epoch_data, batch_size=args.neural_batch_size, shuffle=False)

        model.train()
        total_loss = 0.0
        total_seen = 0
        for xb, yb in loader:
            xb = xb.to(device)
            yb = yb.to(device)
            opt.zero_grad(set_to_none=True)
            loss = loss_fn(model(xb), yb)
            loss.backward()
            opt.step()
            # Weight the batch-mean loss by batch size to get a per-example epoch mean.
            total_loss += float(loss.item()) * len(xb)
            total_seen += len(xb)
        history.append(total_loss / max(total_seen, 1))

    model.eval()
    with torch.no_grad():
        raw = model(torch.from_numpy(Xte).to(device)).cpu().numpy().astype(np.float32)

    if is_classification:
        # exp() may overflow to inf for very negative logits; the quotient is
        # then exactly 0, so only the warning is suppressed.
        with np.errstate(over="ignore"):
            pred = 1.0 / (1.0 + np.exp(-raw))
    else:
        pred = raw * y_std + y_mean
    return pred, {"available": True, "epochs": args.neural_epochs, "hidden_dim": args.neural_hidden_dim, "loss_history": history}
def action_progress_targets(rows: list[dict[str, str]]) -> np.ndarray:
    """Return, for each row, its normalized position inside its run of equal action labels.

    Each maximal run of consecutive rows sharing the same ``action_label``
    (missing or empty labels count as ``""``) is mapped linearly onto
    ``[0, 1]``. Runs of length one get 0.
    """
    labels = [row.get("action_label", "") or "" for row in rows]
    total = len(labels)
    progress = np.zeros(total, dtype=np.float32)

    run_start = 0
    for position in range(1, total + 1):
        # A run ends at the end of the sequence or where the label changes.
        run_continues = position < total and labels[position] == labels[run_start]
        if run_continues:
            continue
        run_length = position - run_start
        if run_length > 1:
            progress[run_start:position] = np.linspace(0.0, 1.0, run_length, dtype=np.float32)
        run_start = position
    return progress
def _motion_prediction_row(rows: list[dict[str, str]], window_index: int, motion_energy: float, true_label: int, pred_label: int) -> dict[str, Any]:
    """Build the columns shared by the minimal and neural motion-intensity prediction CSVs."""
    return {
        "window_index": window_index,
        "center_frame": rows[window_index]["center_frame"],
        "motion_energy": float(motion_energy),
        "true_label": "high_motion" if true_label else "low_motion",
        "pred_label": "high_motion" if int(pred_label) else "low_motion",
    }


def task_body_motion_intensity(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe A: classify high versus low hand/body motion from non-mocap features.

    The target is the Euclidean norm of the change in the hand/body joint
    feature blocks between a window and its predecessor, thresholded at the
    median of the training portion. The joint blocks and ``body_contacts`` are
    removed from the classifier input.

    Args:
        X: Feature matrix, one row per window.
        rows: Window metadata; ``center_frame`` is copied into the prediction CSVs.
        manifest: Feature-block descriptors used to locate columns.
        args: Parsed options (``train_fraction``, ``ridge_l2``, ``skip_neural``,
            the ``neural_*`` settings and, when present, ``output_dir``).

    Returns:
        A summary dict with split sizes, the threshold, input size, and the
        metrics of the ridge and (if run) MLP baselines.

    Raises:
        ValueError: If the manifest has none of the joint blocks.
    """
    mocap_blocks = ["hand_left_joints", "hand_right_joints", "body_joints"]
    mocap_idx = block_indices(manifest, include=mocap_blocks)
    if len(mocap_idx) == 0:
        # Without joint columns every motion value would be 0 and the label degenerate.
        raise ValueError("Expected hand_left_joints, hand_right_joints or body_joints feature blocks.")
    input_idx = block_indices(manifest, exclude=[*mocap_blocks, "body_contacts"])

    # Prefer the parsed --output-dir when the namespace carries one.
    # NOTE(review): the entry point is not visible here; if it already rebinds
    # OUT_DIR from args, both names refer to the same folder.
    out_dir = getattr(args, "output_dir", None) or OUT_DIR

    # The first window has no predecessor, so examples start at index 1.
    valid = np.arange(1, len(X), dtype=np.int64)
    motion = np.linalg.norm(X[valid][:, mocap_idx] - X[valid - 1][:, mocap_idx], axis=1)
    train_local, test_local = chronological_split(len(valid), args.train_fraction)
    threshold = float(np.median(motion[train_local]))
    y = (motion >= threshold).astype(np.int64)
    Xv = X[valid][:, input_idx]

    y_pred_min, scores = ridge_classifier(Xv[train_local], y[train_local], Xv[test_local], args.ridge_l2)
    min_metrics = binary_metrics(y[test_local], y_pred_min)

    # ridge_classifier emits one score column per distinct training label in
    # ascending order; look columns up by label so a single-class training
    # split cannot mislabel "score_low" / "score_high".
    score_col = {cls: col for col, cls in enumerate(sorted({int(v) for v in y[train_local]}))}
    min_rows = []
    for local_pos, pred, score_row in zip(test_local, y_pred_min, scores):
        pos = int(local_pos)
        row = _motion_prediction_row(rows, int(valid[pos]), motion[pos], y[pos], pred)
        row["score_low"] = float(score_row[score_col[0]]) if 0 in score_col else ""
        row["score_high"] = float(score_row[score_col[1]]) if 1 in score_col else ""
        min_rows.append(row)

    neural = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    neural_rows: list[dict[str, Any]] = []
    if not args.skip_neural:
        prob, neural = train_neural(Xv[train_local], y[train_local].astype(np.float32), Xv[test_local], task_type="classification", args=args)
        if neural.get("available") and prob.size:
            prob_high = prob[:, 0]
            pred = (prob_high >= 0.5).astype(np.int64)
            neural_metrics = binary_metrics(y[test_local], pred)
            for local_pos, p, pr in zip(test_local, pred, prob_high):
                pos = int(local_pos)
                row = _motion_prediction_row(rows, int(valid[pos]), motion[pos], y[pos], p)
                row["prob_high"] = float(pr)
                neural_rows.append(row)

    write_csv(out_dir / "body_motion_intensity_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))
    if neural_rows:
        write_csv(out_dir / "body_motion_intensity_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))
    return {
        "train_windows": int(len(train_local)),
        "test_windows": int(len(test_local)),
        "target_threshold_train_median": threshold,
        "input_dim": int(len(input_idx)),
        "target_source": "hand/body joint delta between neighboring windows",
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }
def _attach_window_columns(rank_rows: list[dict[str, Any]], test: np.ndarray, rows: list[dict[str, str]]) -> None:
    """Add the absolute window index and its center frame to each retrieval rank row, in place."""
    for rank_row in rank_rows:
        idx = int(test[int(rank_row["test_position"])])
        rank_row["window_index"] = idx
        rank_row["center_frame"] = rows[idx]["center_frame"]


def task_multi_view_retrieval(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe B: retrieve the synchronized stereo-left window from fisheye features.

    A projection from the ``video_fisheye_cam0`` block to the
    ``video_stereo_left`` block is fitted on the earlier windows; each later
    window's projected query is then ranked against all later-window targets
    by cosine similarity.

    Args:
        X: Feature matrix, one row per window.
        rows: Window metadata; ``center_frame`` is copied into the rank CSVs.
        manifest: Feature-block descriptors used to locate columns.
        args: Parsed options (``train_fraction``, ``ridge_l2``, ``skip_neural``,
            the ``neural_*`` settings and, when present, ``output_dir``).

    Returns:
        A summary dict with split sizes, block names and sizes, and the
        retrieval metrics of the ridge and (if run) MLP projections.

    Raises:
        ValueError: If either camera feature block is missing from the manifest.
    """
    query_idx = block_indices(manifest, include=["video_fisheye_cam0"])
    target_idx = block_indices(manifest, include=["video_stereo_left"])
    if len(query_idx) == 0 or len(target_idx) == 0:
        raise ValueError("Expected video_fisheye_cam0 and video_stereo_left feature blocks.")

    # Prefer the parsed --output-dir when the namespace carries one.
    # NOTE(review): the entry point is not visible here; if it already rebinds
    # OUT_DIR from args, both names refer to the same folder.
    out_dir = getattr(args, "output_dir", None) or OUT_DIR

    train, test = chronological_split(len(X), args.train_fraction)
    Xq = X[:, query_idx]
    Yt = X[:, target_idx]

    pred_min = ridge_predict(Xq[train], Yt[train], Xq[test], l2=args.ridge_l2, standardize_y=True)
    min_metrics, min_rows = retrieval_metrics(pred_min, Yt[test])
    _attach_window_columns(min_rows, test, rows)
    write_csv(out_dir / "multi_view_consistency_minimal_ranks.csv", min_rows, list(min_rows[0].keys()))

    neural = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    if not args.skip_neural:
        # Any task_type other than "classification" selects the regression path.
        pred_neural, neural = train_neural(Xq[train], Yt[train], Xq[test], task_type="projection", args=args)
        if neural.get("available") and pred_neural.size:
            neural_metrics, neural_rows = retrieval_metrics(pred_neural, Yt[test])
            _attach_window_columns(neural_rows, test, rows)
            write_csv(out_dir / "multi_view_consistency_neural_ranks.csv", neural_rows, list(neural_rows[0].keys()))

    return {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "query_block": "video_fisheye_cam0",
        "target_block": "video_stereo_left",
        "query_dim": int(len(query_idx)),
        "target_dim": int(len(target_idx)),
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }
def _progress_prediction_rows(rows: list[dict[str, str]], window_ids: np.ndarray, target: np.ndarray, preds: np.ndarray) -> list[dict[str, Any]]:
    """Build one action-progress prediction CSV row per test window."""
    out: list[dict[str, Any]] = []
    for window_id, pred in zip(window_ids, preds):
        idx = int(window_id)
        out.append(
            {
                "window_index": idx,
                "center_frame": rows[idx]["center_frame"],
                "action_label": rows[idx]["action_label"],
                "true_progress": float(target[idx]),
                "pred_progress": float(pred),
                "absolute_error": float(abs(pred - target[idx])),
            }
        )
    return out


def task_action_phase_progress(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe C: regress how far the current window is through its action segment.

    The target is the 0-to-1 position of each window inside its run of equal
    ``action_label`` values. The ``caption_objects_interaction_text`` block is
    removed from the input, and predictions are clipped to ``[0, 1]``.

    Args:
        X: Feature matrix, one row per window.
        rows: Window metadata providing ``action_label`` and ``center_frame``.
        manifest: Feature-block descriptors used to locate columns.
        args: Parsed options (``train_fraction``, ``ridge_l2``, ``skip_neural``,
            the ``neural_*`` settings and, when present, ``output_dir``).

    Returns:
        A summary dict with split sizes, input size, and the regression
        metrics of the ridge and (if run) MLP baselines.
    """
    input_idx = block_indices(manifest, exclude=["caption_objects_interaction_text"])
    target = action_progress_targets(rows)
    train, test = chronological_split(len(X), args.train_fraction)

    # Prefer the parsed --output-dir when the namespace carries one.
    # NOTE(review): the entry point is not visible here; if it already rebinds
    # OUT_DIR from args, both names refer to the same folder.
    out_dir = getattr(args, "output_dir", None) or OUT_DIR

    # Slice the inputs once; both baselines use the same matrices.
    X_train_in = X[train][:, input_idx]
    X_test_in = X[test][:, input_idx]

    pred_min = ridge_predict(X_train_in, target[train], X_test_in, l2=args.ridge_l2, standardize_y=True)[:, 0]
    pred_min = np.clip(pred_min, 0.0, 1.0)
    min_metrics = regression_metrics(target[test], pred_min)
    min_rows = _progress_prediction_rows(rows, test, target, pred_min)
    write_csv(out_dir / "action_phase_progress_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))

    neural = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    if not args.skip_neural:
        pred_neural, neural = train_neural(X_train_in, target[train], X_test_in, task_type="regression", args=args)
        if neural.get("available") and pred_neural.size:
            values = np.clip(pred_neural[:, 0], 0.0, 1.0)
            neural_metrics = regression_metrics(target[test], values)
            neural_rows = _progress_prediction_rows(rows, test, target, values)
            write_csv(out_dir / "action_phase_progress_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))

    return {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "input_dim": int(len(input_idx)),
        "target_source": "normalized position inside contiguous action-label runs",
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }
def _forecast_prediction_rows(
    rows: list[dict[str, str]],
    valid: np.ndarray,
    test: np.ndarray,
    target: np.ndarray,
    preds: np.ndarray,
    horizon: int,
) -> list[dict[str, Any]]:
    """Build one ego-motion forecast CSV row per test example (L2 norms of true, predicted and error deltas)."""
    out: list[dict[str, Any]] = []
    for local_pos, pred in zip(test, preds):
        pos = int(local_pos)
        idx = int(valid[pos])
        true_delta = target[pos]
        out.append(
            {
                "window_index": idx,
                "center_frame": rows[idx]["center_frame"],
                "future_window_index": int(idx + horizon),
                "delta_l2_true": float(np.linalg.norm(true_delta)),
                "delta_l2_pred": float(np.linalg.norm(pred)),
                "delta_l2_error": float(np.linalg.norm(pred - true_delta)),
            }
        )
    return out


def task_ego_motion_forecast(X: np.ndarray, rows: list[dict[str, str]], manifest: list[dict[str, Any]], args: argparse.Namespace) -> dict[str, Any]:
    """Probe D: forecast the change in the camera-translation block ``future_windows`` ahead.

    The target for window ``i`` is ``camera_translation[i + horizon] -
    camera_translation[i]``. The ``camera_translation`` and
    ``caption_objects_interaction_text`` blocks are removed from the input.

    Args:
        X: Feature matrix, one row per window.
        rows: Window metadata; ``center_frame`` is copied into the prediction CSVs.
        manifest: Feature-block descriptors used to locate columns.
        args: Parsed options (``future_windows``, ``train_fraction``, ``ridge_l2``,
            ``skip_neural``, the ``neural_*`` settings and, when present, ``output_dir``).

    Returns:
        A summary dict with split sizes, horizon, input/target sizes, and the
        regression metrics of the ridge and (if run) MLP baselines.

    Raises:
        ValueError: If the horizon is not positive or the manifest has no
            ``camera_translation`` block.
    """
    horizon = int(args.future_windows)
    if horizon < 1:
        # A zero horizon makes every target 0; a negative one would wrap indices.
        raise ValueError("--future-windows must be at least 1.")

    input_idx = block_indices(manifest, exclude=["camera_translation", "caption_objects_interaction_text"])
    target_idx = block_indices(manifest, include=["camera_translation"])
    if len(target_idx) == 0:
        raise ValueError("Expected a camera_translation feature block.")

    # Prefer the parsed --output-dir when the namespace carries one.
    # NOTE(review): the entry point is not visible here; if it already rebinds
    # OUT_DIR from args, both names refer to the same folder.
    out_dir = getattr(args, "output_dir", None) or OUT_DIR

    # Only windows that still have a window `horizon` steps later are usable.
    valid = np.arange(0, len(X) - horizon, dtype=np.int64)
    target = X[valid + horizon][:, target_idx] - X[valid][:, target_idx]
    Xv = X[valid][:, input_idx]
    train, test = chronological_split(len(valid), args.train_fraction)

    pred_min = ridge_predict(Xv[train], target[train], Xv[test], l2=args.ridge_l2, standardize_y=True)
    min_metrics = regression_metrics(target[test], pred_min)
    min_rows = _forecast_prediction_rows(rows, valid, test, target, pred_min, horizon)
    write_csv(out_dir / "ego_motion_forecast_minimal_predictions.csv", min_rows, list(min_rows[0].keys()))

    neural = {"available": False, "reason": "skipped by flag"}
    neural_metrics = None
    if not args.skip_neural:
        pred_neural, neural = train_neural(Xv[train], target[train], Xv[test], task_type="regression", args=args)
        if neural.get("available") and pred_neural.size:
            neural_metrics = regression_metrics(target[test], pred_neural)
            neural_rows = _forecast_prediction_rows(rows, valid, test, target, pred_neural, horizon)
            write_csv(out_dir / "ego_motion_forecast_neural_predictions.csv", neural_rows, list(neural_rows[0].keys()))

    return {
        "train_windows": int(len(train)),
        "test_windows": int(len(test)),
        "forecast_horizon_windows": horizon,
        # NOTE(review): assumes consecutive windows are 5 frames apart, which
        # matches the "four-window" / "20-frame" wording in TASK_SPECS - confirm.
        "forecast_horizon_frames": int(horizon * 5),
        "input_dim": int(len(input_idx)),
        "target_dim": int(len(target_idx)),
        "target_source": "future minus current camera_translation feature block",
        "minimal": min_metrics,
        "neural_mlp": neural_metrics,
        "neural_training": neural,
    }
def fmt_metric(value: float | None, metric_key: str) -> str:
    """Format a metric value with four decimals, or ``"n/a"`` when it is missing.

    Args:
        value: The metric value, or ``None`` if the baseline produced none.
        metric_key: Accepted for interface compatibility; every metric uses
            the same format, so it does not affect the result.
    """
    if value is None:
        return "n/a"
    return f"{value:.4f}"
def task_main_metric(task: str, result: dict[str, Any], baseline: str) -> float | None:
    """Return the task's headline metric for one baseline, or ``None`` if unavailable.

    Args:
        task: Key into ``TASK_SPECS``; its ``metric_key`` selects the metric.
        result: A task result dict holding per-baseline metric dicts.
        baseline: Name of the baseline entry in ``result`` (e.g. ``"minimal"``).
    """
    baseline_metrics = result.get(baseline)
    if not baseline_metrics:
        return None
    raw = baseline_metrics.get(TASK_SPECS[task]["metric_key"])
    return None if raw is None else float(raw)
def write_markdown(payload: dict[str, Any]) -> None:
    """Write the Markdown summary of all extension probes.

    The report has a summary table (one row per task) followed by a detail
    section per task, and is written to
    ``OUT_DIR / "research_direction_extension_summary.md"``.

    Args:
        payload: Must contain ``payload["tasks"][task]`` for every key of
            ``TASK_SPECS``; each entry is a task result dict.
    """
    header = [
        "# Four-Direction Extension Task Baselines",
        "",
        "Generated by `scripts/research_direction_extension_tasks.py` from the committed single-episode feature tensor.",
        "These are data-backed extension probes that show how each research direction can be started from Xperience-10M modalities.",
        "Cross-episode generalization and full direction completion require later held-out experiments.",
        "",
        "## Summary",
        "",
        "| Direction | Extension task | Minimal | Neural MLP | Meaning |",
        "| --- | --- | ---: | ---: | --- |",
    ]

    summary_rows: list[str] = []
    detail_lines: list[str] = []
    for task, spec in TASK_SPECS.items():
        result = payload["tasks"][task]
        key = spec["metric_key"]
        # Format each baseline's headline metric once and reuse it in both sections.
        min_text = f"{fmt_metric(task_main_metric(task, result, 'minimal'), key)} {spec['metric_name']}"
        nn_text = f"{fmt_metric(task_main_metric(task, result, 'neural_mlp'), key)} {spec['metric_name']}"

        summary_rows.append(
            f"| {spec['direction']}. {spec['direction_name']} | {spec['name']} | {min_text} | {nn_text} | {spec['current_limit']} |"
        )
        detail_lines.extend(
            [
                f"### {spec['direction']}. {spec['name']}",
                "",
                f"- Case study: {spec['case_study']}",
                f"- Input: {spec['input']}",
                f"- Middle process modules: {spec['middle_process']}",
                f"- Output: {spec['output']}",
                f"- Minimal baseline: {spec['minimal_baseline']}",
                f"- Neural baseline: {spec['neural_baseline']}",
                f"- Minimal result: {min_text}",
                f"- Neural result: {nn_text}",
                f"- Limitation: {spec['current_limit']}",
                "",
            ]
        )

    lines = [*header, *summary_rows, "", "## Task Details", "", *detail_lines]

    out_path = OUT_DIR / "research_direction_extension_summary.md"
    # Create the folder first, as write_json and write_csv do for their targets.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text("\n".join(lines).rstrip() + "\n", encoding="utf-8")
def svg_text(x: int, y: int, text: str, size: int = 16, weight: int = 500, color: str = "#f4f8ef") -> str:
    """Return an SVG ``<text>`` element at ``(x, y)`` with HTML-escaped content.

    Only *text* is escaped; the attribute values are inserted as given.
    """
    attributes = f'x="{x}" y="{y}" font-size="{size}" font-weight="{weight}" fill="{color}"'
    content = html.escape(text)
    return f"<text {attributes}>{content}</text>"
def _bar_width(score: Any, full_width: int, min_width: int = 4) -> float:
    """Return a score bar's pixel width, clamped to ``[min_width, full_width]``.

    A missing, non-numeric, or NaN score yields ``min_width``. A plain
    ``max(min_width, min(full_width, full_width * score))`` clamp would turn a
    NaN score into a full-length bar, because comparisons against NaN are
    always false.
    """
    try:
        value = float(score)
    except (TypeError, ValueError):
        return float(min_width)
    if math.isnan(value):
        return float(min_width)
    return float(max(min_width, min(full_width, full_width * value)))


def write_svg(payload: dict[str, Any]) -> None:
    """Render the extension-task overview chart and write it under ``CHARTS``.

    One card is drawn per entry of ``TASK_SPECS`` in a two-column grid. Each
    card shows the task name, direction, the minimal and neural-MLP metrics,
    and two bars whose lengths come from ``choose_score``. A legend and an
    "Implementation boundary" note follow the cards.

    Args:
        payload: Results dictionary. ``payload["tasks"][task]`` must exist for
            every task in ``TASK_SPECS``. When
            ``payload["dataset_scope"]["num_windows"]`` is an integer it is
            shown in the subtitle; otherwise the window count is left out.

    Side effects:
        Creates ``CHARTS`` if needed and (over)writes
        ``research_direction_extension_tasks.svg`` inside it.
    """
    CHARTS.mkdir(parents=True, exist_ok=True)
    width = 1420
    height = 920
    margin = 28
    colors = {"A": "#ccffa0", "B": "#7ae5c3", "C": "#d8f4a5", "D": "#9bdfff"}

    # Take the window count from the payload so the subtitle always matches
    # the data that was actually evaluated instead of a fixed number.
    num_windows = (payload.get("dataset_scope") or {}).get("num_windows")
    window_label = f"{num_windows:,}-window " if isinstance(num_windows, int) else ""

    svg: list[str] = [
        f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}" viewBox="0 0 {width} {height}">',
        # Background and frame follow the canvas size so they cannot drift from it.
        f'<rect width="{width}" height="{height}" fill="#020502"/>',
        f'<rect x="{margin}" y="{margin}" width="{width - 2 * margin}" height="{height - 2 * margin}" rx="18" fill="#050905" stroke="#ccffa0" stroke-opacity="0.24"/>',
        svg_text(66, 88, "Ropedia Xperience-10M: four direction extension probes", 32, 760),
        svg_text(
            66,
            122,
            f"Data-backed from the same {window_label}public sample feature tensor; extension probes for later held-out studies.",
            17,
            500,
            "#a5afa2",
        ),
    ]

    # Card grid geometry: two columns, rows added as needed.
    x0 = 66
    y0 = 166
    card_w = 620
    card_h = 160
    gap_x = 44
    gap_y = 34
    bar_w = 440
    for i, (task, spec) in enumerate(TASK_SPECS.items()):
        result = payload["tasks"][task]
        row, col = divmod(i, 2)
        x = x0 + col * (card_w + gap_x)
        y = y0 + row * (card_h + gap_y)
        color = colors[spec["direction"]]
        min_v = task_main_metric(task, result, "minimal")
        nn_v = task_main_metric(task, result, "neural_mlp")
        metric = spec["metric_name"]
        svg.extend(
            [
                f'<rect x="{x}" y="{y}" width="{card_w}" height="{card_h}" rx="10" fill="#071207" stroke="#ccffa0" stroke-opacity="0.22"/>',
                f'<rect x="{x}" y="{y}" width="10" height="{card_h}" rx="5" fill="{color}"/>',
                f'<circle cx="{x + 42}" cy="{y + 40}" r="24" fill="{color}" opacity="0.14"/>',
                svg_text(x + 32, y + 48, spec["direction"], 21, 760, color),
                svg_text(x + 76, y + 35, spec["name"], 20, 760),
                svg_text(x + 76, y + 62, spec["direction_name"], 13, 650, "#a5afa2"),
                svg_text(x + 76, y + 94, f"Minimal: {fmt_metric(min_v, spec['metric_key'])} {metric}", 16, 700, "#f4f8ef"),
                svg_text(x + 300, y + 94, f"Neural MLP: {fmt_metric(nn_v, spec['metric_key'])} {metric}", 16, 700, "#f4f8ef"),
                svg_text(x + 76, y + 125, spec["output"], 13, 500, "#dce8d7"),
            ]
        )
        min_score = choose_score(task, result["minimal"])
        # A missing or empty neural result draws the shortest possible bar.
        nn_score = choose_score(task, result["neural_mlp"]) if result.get("neural_mlp") else 0.0
        bar_x = x + 76
        bar_y = y + 138
        # Each bar is a faint full-width track with the score drawn on top of it.
        svg.append(f'<rect x="{bar_x}" y="{bar_y}" width="{bar_w}" height="8" rx="4" fill="#ccffa0" opacity="0.14"/>')
        svg.append(f'<rect x="{bar_x}" y="{bar_y}" width="{_bar_width(min_score, bar_w):.1f}" height="8" rx="4" fill="{color}" opacity="0.72"/>')
        svg.append(f'<rect x="{bar_x}" y="{bar_y + 12}" width="{bar_w}" height="8" rx="4" fill="#ccffa0" opacity="0.14"/>')
        svg.append(f'<rect x="{bar_x}" y="{bar_y + 12}" width="{_bar_width(nn_score, bar_w):.1f}" height="8" rx="4" fill="#ffffff" opacity="0.78"/>')

    legend_y = 570
    svg.extend(
        [
            svg_text(66, legend_y, "How to read this", 24, 760),
            svg_text(66, legend_y + 34, "Each card adds one concrete task to a research direction using existing sample modalities.", 16, 500, "#dce8d7"),
            svg_text(66, legend_y + 62, "Colored bar: minimal baseline normalized score. White bar: neural MLP normalized score. Lower-is-better MAE is shown as 1 - MAE for bar length only.", 16, 500, "#dce8d7"),
            '<line x1="66" y1="675" x2="1354" y2="675" stroke="#ccffa0" stroke-opacity="0.18"/>',
            svg_text(66, 724, "Implementation boundary", 22, 760),
            svg_text(66, 758, "A: motion-energy proxy, not a full human body model. B: view-feature retrieval, not neural rendering.", 16, 500, "#dce8d7"),
            svg_text(66, 786, "C: phase-progress regression, not open-world intent. D: ego-motion forecast, not a persistent map.", 16, 500, "#dce8d7"),
            svg_text(66, 835, "All metrics are computed from held-out chronological windows of the same public sample episode.", 16, 700, "#f4f8ef"),
        ]
    )
    svg.append("</svg>")
    (CHARTS / "research_direction_extension_tasks.svg").write_text("\n".join(svg), encoding="utf-8")
def _path_for_report(path: Path) -> str:
    """Return *path* relative to ``ROOT`` when possible, else *path* as given.

    ``Path.relative_to`` raises ``ValueError`` for a relative path or one that
    lies outside ``ROOT``. The path is tried first as given, then resolved,
    and finally reported unchanged so a custom results directory cannot abort
    the run.
    """
    for candidate in (path, path.resolve()):
        try:
            return str(candidate.relative_to(ROOT))
        except ValueError:
            continue
    return str(path)


def build_payload(args: argparse.Namespace) -> dict[str, Any]:
    """Run the four extension tasks and assemble the results payload.

    Args:
        args: Parsed command-line arguments. Reads ``results_dir``,
            ``output_dir``, ``train_fraction``, ``ridge_l2``, ``seed``,
            ``future_windows``, ``neural_epochs``, ``neural_hidden_dim``,
            ``neural_batch_size`` and ``skip_neural``.

    Returns:
        A dictionary with the keys ``source``, ``dataset_scope``,
        ``baselines``, ``run_config``, ``task_specs`` and ``tasks``.

    Side effects:
        Rebinds the module-level ``OUT_DIR`` to ``args.output_dir`` and creates
        that directory.
    """
    global OUT_DIR
    X, starts, ends, rows, manifest = load_inputs(args.results_dir)
    # The module-level OUT_DIR is redirected to the requested output directory.
    OUT_DIR = args.output_dir
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    tasks = OrderedDict()
    tasks["body_motion_intensity"] = task_body_motion_intensity(X, rows, manifest, args)
    tasks["multi_view_consistency_retrieval"] = task_multi_view_retrieval(X, rows, manifest, args)
    tasks["action_phase_progress"] = task_action_phase_progress(X, rows, manifest, args)
    tasks["ego_motion_forecast"] = task_ego_motion_forecast(X, rows, manifest, args)

    payload = {
        "source": {
            # Repo-relative when the inputs live under ROOT; otherwise as given.
            "shared_windows": _path_for_report(args.results_dir / "shared_windows.npz"),
            "windows_csv": _path_for_report(args.results_dir / "windows.csv"),
            "feature_manifest": _path_for_report(args.results_dir / "feature_manifest.json"),
        },
        "dataset_scope": {
            "sample_episode_count": 1,
            "num_windows": int(len(X)),
            "feature_dim": int(X.shape[1]),
            "first_start_frame": int(starts[0]),
            "last_end_frame": int(ends[-1]),
            "warning": "Single public sample episode; these extension probes validate task design and pipeline mechanics, not cross-episode generalization.",
        },
        "baselines": {
            "minimal": "Ridge classifiers/regressors/projections plus cosine retrieval on the committed feature tensor.",
            "neural_mlp": "Small one-hidden-layer PyTorch MLP heads using the same inputs, targets, chronological split, and evaluator.",
        },
        "run_config": {
            "train_fraction": float(args.train_fraction),
            "ridge_l2": float(args.ridge_l2),
            "seed": int(args.seed),
            "future_windows": int(args.future_windows),
            "neural_epochs": int(args.neural_epochs),
            "neural_hidden_dim": int(args.neural_hidden_dim),
            "neural_batch_size": int(args.neural_batch_size),
            "skip_neural": bool(args.skip_neural),
        },
        "task_specs": TASK_SPECS,
        "tasks": tasks,
    }
    return payload
def main() -> int:
    """Run the extension probes and write the report artifacts.

    Parses the command line, builds the payload, writes it as JSON to both the
    output directory and ``DOCS_DATA``, then writes the markdown summary and
    the SVG chart.

    Returns:
        Always ``0``; the script entry point uses it as the exit status.
    """
    args = parse_args()
    payload = build_payload(args)

    json_targets = (
        args.output_dir / "research_direction_extension_results.json",
        DOCS_DATA / "research_direction_extensions.json",
    )
    for target in json_targets:
        write_json(target, payload)

    for render in (write_markdown, write_svg):
        render(payload)

    print(f"Wrote {args.output_dir / 'research_direction_extension_results.json'}")
    print(f"Wrote {CHARTS / 'research_direction_extension_tasks.svg'}")
    return 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())