#!/usr/bin/env python3
"""Organize the 20 Xperience-10M tasks into the four Ropedia research tracks.

The script is intentionally deterministic: it reads the committed task metrics,
adds a manually curated taxonomy, and writes machine-readable artifacts used by
the README, website, and Hugging Face pages.
"""

from __future__ import annotations

import csv
import html
import json
from collections import OrderedDict
from pathlib import Path
from typing import Any

from task_display import task_display_name

# All paths are resolved relative to the parent of the directory holding this script.
ROOT = Path(__file__).resolve().parents[1]
RESULTS = ROOT / "results" / "episode_task_suite"
OUT_DIR = RESULTS / "research_directions"
DOCS_DATA = ROOT / "docs" / "data"
CHARTS = ROOT / "docs" / "assets" / "charts"
SUMMARY_REPORT = RESULTS / "summary_report.json"
TASK_SUITE_20 = DOCS_DATA / "task_suite_20.json"

# Manually curated description of the four research directions, keyed by the
# single-letter code that TASK_TAXONOMY refers to in "primary_direction" and
# "direction_roles".
DIRECTIONS: OrderedDict[str, dict[str, Any]] = OrderedDict(
    [
        (
            "A",
            {
                "id": "human_motion",
                "name": "Human Modeling & Motion Understanding",
                "focus": "Human/hand/body motion, deformation priors, human-object interaction, affordance modeling.",
                "preferred_background": "Human pose/shape estimation, SMPL-style models, motion capture, or motion generation.",
                "current_status": "partially implemented",
                "current_readout": "The sample supports hand trajectory forecasting and contact/object probes, but it does not yet include a full body/shape model or multi-person priors.",
                "next_steps": [
                    "Add SMPL/SMPL-X or MANO-style body/hand parameter targets where available.",
                    "Train sequence models over multi-episode motion trajectories instead of isolated windows.",
                    "Evaluate affordance prediction on held-out objects and held-out episodes.",
                ],
            },
        ),
        (
            "B",
            {
                "id": "reconstruction_rendering",
                "name": "3D/4D Reconstruction & Neural Rendering",
                "focus": "Multi-view dynamic scene reconstruction, NeRF/Gaussian Splatting, novel-view synthesis.",
                "preferred_background": "3D reconstruction, neural rendering, camera calibration, and bundle adjustment.",
                "current_status": "proxy tasks only",
                "current_readout": "The current suite checks cross-modal alignment and depth/video reconstruction proxies; it does not yet train a renderer or reconstruct geometry.",
                "next_steps": [
                    "Use calibrated multi-view video plus SLAM pose to build per-episode camera trajectories.",
                    "Add depth-supervised point clouds, TSDF, Gaussian Splatting, or NeRF baselines.",
                    "Evaluate novel-view synthesis and temporal consistency across held-out views/time.",
                ],
            },
        ),
        (
            "C",
            {
                "id": "egocentric_interaction",
                "name": "Egocentric Vision & Interaction",
                "focus": "Egocentric action and intention understanding, hand-object interaction, gaze/attention modeling, task structure modeling.",
                "preferred_background": "Video understanding, action recognition, or egocentric vision.",
                "current_status": "strongest implemented track",
                "current_readout": "The unified 20-task suite directly targets egocentric action, task state, interaction, grounding, forecasting, and alignment.",
                "next_steps": [
                    "Move from single-episode chronological splits to held-out-episode splits.",
                    "Use audio together with stronger multimodal backbones for action, intent, and grounding.",
                    "Evaluate long-horizon task success prediction and action-conditioned generation.",
                ],
            },
        ),
        (
            "D",
            {
                "id": "world_modeling",
                "name": "Scene Reconstruction & World Modeling",
                "focus": "Long-term consistent 3D/4D scene mapping, scene graphs, object- and space-centric representations, spatial reasoning.",
                "preferred_background": "Large-scale mapping, semantic reconstruction, or agent world models.",
                "current_status": "early proxy tasks",
                "current_readout": "The current tasks probe temporal structure, object relevance, cross-modal retrieval, and modality prediction, but they do not yet build persistent maps or scene graphs.",
                "next_steps": [
                    "Convert windows into persistent object/scene-state nodes with timestamps and camera poses.",
                    "Add map consistency, object permanence, and spatial relation prediction tasks.",
                    "Train held-out-episode world models that predict future observations and task state.",
                ],
            },
        ),
    ]
)

# Manually curated per-task taxonomy. "direction_roles" maps a direction code
# from DIRECTIONS to one of the roles counted in build_taxonomy():
# "direct", "proxy" or "diagnostic".
TASK_TAXONOMY: OrderedDict[str, dict[str, Any]] = OrderedDict(
    [
        (
            "timeline_action",
            {
                "name": "Timeline action recognition",
                "family": "supervised",
                "input": "all featurized modalities",
                "output": "current action label",
                "primary_direction": "C",
                "direction_roles": {"C": "direct", "A": "proxy"},
                "why": "Reads egocentric sensor state as the current human action; also provides a weak human-motion readout.",
                "current_limit": "Chronological single-episode split creates unseen future action classes.",
            },
        ),
        (
            "timeline_subtask",
            {
                "name": "Timeline subtask recognition",
                "family": "supervised",
                "input": "all featurized modalities",
                "output": "current subtask label",
                "primary_direction": "C",
                "direction_roles": {"C": "direct", "D": "proxy"},
                "why": "Segments egocentric task state and provides a first proxy for symbolic world/task state.",
                "current_limit": "Single-episode ordering makes future subtasks hard to generalize.",
            },
        ),
        (
            "transition_detection",
            {
                "name": "Action transition detection",
                "family": "diagnostic",
                "input": "all featurized modalities",
                "output": "boundary vs steady state",
                "primary_direction": "C",
                "direction_roles": {"C": "direct", "D": "diagnostic"},
                "why": "Localizes egocentric task boundaries and diagnoses temporal state changes.",
                "current_limit": "Boundary class is sparse, so accuracy alone is misleading.",
            },
        ),
        (
            "next_action",
            {
                "name": "Short-horizon next action",
                "family": "supervised",
                "input": "current multimodal window",
                "output": "action 20 frames later",
                "primary_direction": "C",
                "direction_roles": {"C": "direct", "D": "proxy"},
                "why": "Tests action intention/task-flow prediction from egocentric context.",
                "current_limit": "Unseen future labels dominate the single-episode chronological test.",
            },
        ),
        (
            "hand_trajectory_forecast",
            {
                "name": "Hand trajectory forecasting",
                "family": "forecast",
                "input": "current multimodal window",
                "output": "future left/right hand 3D joints",
                "primary_direction": "A",
                "direction_roles": {"A": "direct", "C": "proxy"},
                "why": "Directly predicts human hand motion and supports hand-object interaction modeling.",
                "current_limit": "Forecasting is window-level and not yet a full sequence or policy model.",
            },
        ),
        (
            "contact_prediction",
            {
                "name": "Body/object contact prediction",
                "family": "supervised",
                "input": "non-contact/non-caption features",
                "output": "binary contact label",
                "primary_direction": "A",
                "direction_roles": {"A": "direct", "C": "proxy"},
                "why": "Targets physical interaction state, a core affordance and manipulation signal.",
                "current_limit": "The public sample is degenerate for this target because one class dominates.",
            },
        ),
        (
            "object_relevance",
            {
                "name": "Relevant object set prediction",
                "family": "supervised",
                "input": "non-caption feature blocks",
                "output": "multi-label object set",
                "primary_direction": "C",
                "direction_roles": {"C": "direct", "A": "proxy", "D": "proxy"},
                "why": "Connects egocentric activity to manipulated objects and early object-centric state.",
                "current_limit": "Object labels are language-derived and sparse in one episode.",
            },
        ),
        (
            "caption_grounding",
            {
                "name": "Caption-to-window grounding",
                "family": "retrieval",
                "input": "caption objects/interaction query and candidate sensor windows",
                "output": "matching time window",
                "primary_direction": "C",
                "direction_roles": {"C": "direct", "D": "proxy"},
                "why": "Grounds language annotation into egocentric sensor time and task state.",
                "current_limit": "Bag-of-objects language features are too weak for rich grounding.",
            },
        ),
        (
            "cross_modal_retrieval",
            {
                "name": "Cross-modal retrieval",
                "family": "retrieval",
                "input": "motion/IMU/camera query",
                "output": "matching depth/video window",
                "primary_direction": "C",
                "direction_roles": {"C": "diagnostic", "B": "proxy", "D": "proxy"},
                "why": "Tests whether synchronized modalities identify the same 4D moment, a prerequisite for reconstruction and world modeling.",
                "current_limit": "Retrieval shows an alignment signal, not geometric reconstruction.",
            },
        ),
        (
            "modality_reconstruction",
            {
                "name": "Modality reconstruction",
                "family": "forecast",
                "input": "motion/IMU/camera",
                "output": "depth/video feature vector",
                "primary_direction": "B",
                "direction_roles": {"B": "proxy", "D": "proxy"},
                "why": "Predicts visual/depth state from non-target sensors as a weak reconstruction/world-model objective.",
                "current_limit": "Feature-vector reconstruction is not pixel, depth-map, mesh, NeRF, or Gaussian reconstruction.",
            },
        ),
        (
            "temporal_order",
            {
                "name": "Temporal order verification",
                "family": "diagnostic",
                "input": "two adjacent windows",
                "output": "correct vs reversed order",
                "primary_direction": "C",
                "direction_roles": {"C": "diagnostic", "D": "diagnostic"},
                "why": "Checks whether features encode local time direction and task progression.",
                "current_limit": "Only local adjacent ordering, not long-horizon causal modeling.",
            },
        ),
        (
            "misalignment_detection",
            {
                "name": "Cross-modal misalignment detection",
                "family": "diagnostic",
                "input": "motion plus visual/depth pair",
                "output": "aligned vs shifted",
                "primary_direction": "C",
                "direction_roles": {"C": "diagnostic", "B": "diagnostic", "D": "diagnostic"},
                "why": "Detects temporal desynchronization, a key data-quality gate for multimodal reconstruction and world models.",
                "current_limit": "Synthetic shifts diagnose alignment but do not solve calibration or mapping.",
            },
        ),
        (
            "long_horizon_next_action",
            {
                "name": "Long-horizon next-action forecasting",
                "family": "classification",
                "input": "current and historical windows",
                "output": "future action label",
                "primary_direction": "C",
                "direction_roles": {"C": "direct", "D": "proxy"},
                "why": "Extends short-horizon intention prediction into longer activity futures, a key egocentric and world-model signal.",
                "current_limit": "Evaluated from sample-supported future labels, not full open-world action generation.",
            },
        ),
        (
            "next_subtask_forecast",
            {
                "name": "Long-horizon next-subtask forecasting",
                "family": "classification",
                "input": "current and historical windows",
                "output": "future procedure-step label",
                "primary_direction": "C",
                "direction_roles": {"C": "direct", "D": "proxy"},
                "why": "Measures whether the model can anticipate the next procedural phase rather than only the current frame state.",
                "current_limit": "Subtask labels are constrained to the available annotation vocabulary.",
            },
        ),
        (
            "interaction_text_prediction",
            {
                "name": "Interaction text prediction",
                "family": "classification",
                "input": "window features without target text leakage",
                "output": "natural-language interaction class",
                "primary_direction": "C",
                "direction_roles": {"C": "direct", "A": "proxy"},
                "why": "Connects egocentric observations to the natural-language interaction semantics carried by the annotation.",
                "current_limit": "Public derived features retain hashed text targets; raw full text requires the official annotation source.",
            },
        ),
        (
            "action_object_relation",
            {
                "name": "Action-object relation prediction",
                "family": "classification",
                "input": "window features with target-side relation leakage excluded",
                "output": "action-object relation class",
                "primary_direction": "C",
                "direction_roles": {"C": "direct", "D": "proxy"},
                "why": "Tests whether action recognition and object state are connected as a relational interaction representation.",
                "current_limit": "Relation labels are derived from the public-sample annotation scope.",
            },
        ),
        (
            "object_set_forecast",
            {
                "name": "Future object-set forecasting",
                "family": "multi-label",
                "input": "current and historical windows",
                "output": "future object set",
                "primary_direction": "D",
                "direction_roles": {"D": "direct", "C": "proxy"},
                "why": "Asks whether the current scene state supports predicting which objects will matter later.",
                "current_limit": "This is a set-level proxy, not a persistent 3D scene graph.",
            },
        ),
        (
            "imu_to_hand_pose",
            {
                "name": "IMU-to-hand pose reconstruction",
                "family": "regression",
                "input": "IMU and motion context",
                "output": "hand pose target",
                "primary_direction": "A",
                "direction_roles": {"A": "direct", "B": "proxy"},
                "why": "Measures human-motion reconstruction from wearable and motion cues.",
                "current_limit": "Pose reconstruction is window-level and does not yet fit a full parametric hand/body model.",
            },
        ),
        (
            "camera_view_sync_retrieval",
            {
                "name": "Camera-view synchronization retrieval",
                "family": "retrieval",
                "input": "one camera-view/window query",
                "output": "matching synchronized view",
                "primary_direction": "B",
                "direction_roles": {"B": "direct", "D": "proxy"},
                "why": "Tests whether synchronized multi-view structure is recoverable across camera streams.",
                "current_limit": "Retrieval checks view consistency but does not reconstruct geometry by itself.",
            },
        ),
        (
            "time_to_transition",
            {
                "name": "Time-to-next-transition regression",
                "family": "regression",
                "input": "current temporal window state",
                "output": "frames/time until the next transition",
                "primary_direction": "C",
                "direction_roles": {"C": "diagnostic", "D": "diagnostic"},
                "why": "Measures temporal boundary awareness as a continuous timing target.",
                "current_limit": "Regression is local to the annotated public sample timeline.",
            },
        ),
    ]
)

# Per-task primary metric: (key looked up in the metrics dict, display name,
# "higher" or "lower" depending on which direction is better).
METRIC_SPECS = {
    "timeline_action": ("macro_f1", "macro-F1", "higher"),
    "timeline_subtask": ("macro_f1", "macro-F1", "higher"),
    "transition_detection": ("macro_f1", "macro-F1", "higher"),
    "next_action": ("macro_f1", "macro-F1", "higher"),
    "hand_trajectory_forecast": ("mpjpe", "MPJPE", "lower"),
    "contact_prediction": ("macro_f1", "macro-F1", "higher"),
    "object_relevance": ("micro_f1", "micro-F1", "higher"),
    "caption_grounding": ("mrr", "MRR", "higher"),
    "cross_modal_retrieval": ("mrr", "MRR", "higher"),
    "modality_reconstruction": ("r2", "R2", "higher"),
    "temporal_order": ("f1", "F1", "higher"),
    "misalignment_detection": ("f1", "F1", "higher"),
    "long_horizon_next_action": ("macro_f1", "macro-F1", "higher"),
    "next_subtask_forecast": ("macro_f1", "macro-F1", "higher"),
    "interaction_text_prediction": ("macro_f1", "macro-F1", "higher"),
    "action_object_relation": ("macro_f1", "macro-F1", "higher"),
    "object_set_forecast": ("micro_f1", "micro-F1", "higher"),
    "imu_to_hand_pose": ("mae", "MAE", "lower"),
    "camera_view_sync_retrieval": ("mrr", "MRR", "higher"),
    "time_to_transition": ("mae", "MAE", "lower"),
}


def load_summary() -> dict[str, Any]:
    """Return the parsed contents of ``SUMMARY_REPORT``.

    Raises whatever ``Path.read_text`` / ``json.loads`` raise when the file is
    missing or is not valid JSON.
    """
    return json.loads(SUMMARY_REPORT.read_text(encoding="utf-8"))


def load_unified_tasks() -> dict[str, dict[str, Any]]:
    """Return the entries of ``TASK_SUITE_20`` indexed by their ``task_id``.

    Returns an empty dict when the file does not exist. Entries that are not
    dicts, or that have no truthy ``task_id``, are skipped.
    """
    if not TASK_SUITE_20.exists():
        return {}
    payload = json.loads(TASK_SUITE_20.read_text(encoding="utf-8"))
    return {
        task["task_id"]: task
        for task in payload.get("tasks", [])
        if isinstance(task, dict) and task.get("task_id")
    }


def metric_value(metrics: dict[str, Any] | None, task: str) -> float | None:
    """Return the primary metric of *task* from *metrics* as a float.

    The key is taken from ``METRIC_SPECS[task]``. Returns ``None`` when
    *metrics* is empty/``None`` or does not carry a value for that key.
    """
    if not metrics:
        return None
    key = METRIC_SPECS[task][0]
    value = metrics.get(key)
    return float(value) if value is not None else None


def choose_better(task: str, minimal: float | None, neural: float | None) -> str:
    """Return which baseline wins on *task*.

    The result is ``"unavailable"`` when either value is ``None``, ``"tie"``
    when the values differ by less than 1e-9, and otherwise ``"neural_mlp"`` or
    ``"minimal"`` according to the better direction in ``METRIC_SPECS[task]``.
    """
    if minimal is None or neural is None:
        return "unavailable"
    _, _, direction = METRIC_SPECS[task]
    delta = neural - minimal
    if abs(delta) < 1e-9:
        return "tie"
    if direction == "lower":
        return "neural_mlp" if delta < 0 else "minimal"
    return "neural_mlp" if delta > 0 else "minimal"


def fmt_metric(value: float | None) -> str:
    """Format a metric for the Markdown table.

    ``None`` becomes ``"n/a"``; magnitudes of 10 or more get three decimals,
    everything else four.
    """
    if value is None:
        return "n/a"
    if abs(value) >= 10:
        return f"{value:.3f}"
    return f"{value:.4f}"


def baseline_readout(label: str) -> str:
    """Translate a ``choose_better`` label into a human-readable sentence stem."""
    if label == "tie":
        return "Both baselines are tied"
    if label == "minimal":
        return "Minimal baseline is stronger"
    if label == "neural_mlp":
        return "Neural MLP is stronger"
    return "Baseline comparison is unavailable"


def build_taxonomy(summary: dict[str, Any]) -> dict[str, Any]:
    """Combine the curated taxonomy with measured metrics into one payload.

    Args:
        summary: Parsed summary report. ``summary["tasks"]`` is required;
            ``neural_tasks``, ``num_frames``, ``num_windows`` and
            ``feature_dim`` are read with ``.get`` and may be absent.

    Returns:
        A dict with ``source``, ``dataset_scope``, ``baselines``,
        ``task_count``, ``directions`` and ``tasks`` entries.

    Raises:
        KeyError: if ``summary`` has no ``"tasks"`` entry, or a task in
            ``TASK_TAXONOMY`` is missing from ``METRIC_SPECS``.
    """
    minimal_tasks = summary["tasks"]
    neural_tasks = summary.get("neural_tasks", {})
    unified_tasks = load_unified_tasks()

    task_records: OrderedDict[str, dict[str, Any]] = OrderedDict()
    direction_counts = {
        code: {"direct": 0, "proxy": 0, "diagnostic": 0, "total_links": 0}
        for code in DIRECTIONS
    }

    for task, spec in TASK_TAXONOMY.items():
        unified = unified_tasks.get(task, {})
        metric_key, metric_name, metric_direction = METRIC_SPECS[task]
        # Values from the unified index, when present and truthy, override the
        # defaults that are *reported* for this task.
        metric_key = unified.get("metric_key") or metric_key
        metric_name = unified.get("metric_name") or metric_name
        metric_direction = unified.get("metric_direction") or metric_direction

        if task in minimal_tasks:
            minimal_metric = metric_value(minimal_tasks.get(task), task)
            neural_metric = metric_value(neural_tasks.get(task), task)
        else:
            # Tasks absent from the summary fall back to the unified index.
            minimal = unified.get("minimal_primary_metric")
            neural = unified.get("neural_primary_metric")
            minimal_metric = float(minimal) if minimal is not None else None
            neural_metric = float(neural) if neural is not None else None

        # NOTE(review): metric_value() and choose_better() always use the key
        # and direction from METRIC_SPECS, even when the unified index
        # overrides the reported ones above - confirm the two sources agree.
        better = choose_better(task, minimal_metric, neural_metric)

        roles = spec["direction_roles"]
        for direction_code, role in roles.items():
            direction_counts[direction_code][role] += 1
            direction_counts[direction_code]["total_links"] += 1

        task_records[task] = {
            **spec,
            "display_name": unified.get("task_display_name") or task_display_name(task),
            "artifact_id": task,
            "metric": {
                "key": metric_key,
                "name": metric_name,
                "direction": metric_direction,
                "minimal": minimal_metric,
                "neural_mlp": neural_metric,
                "better_baseline": better,
            },
        }

    direction_records = OrderedDict()
    for code, info in DIRECTIONS.items():
        linked_tasks = [
            task for task, spec in task_records.items() if code in spec["direction_roles"]
        ]
        direction_records[code] = {
            **info,
            "tasks": linked_tasks,
            "task_display_names": [task_records[task]["display_name"] for task in linked_tasks],
            "counts": direction_counts[code],
        }

    # The ``:,`` format spec cannot be applied to None, and the key is treated
    # as optional elsewhere in this function, so the dimension prefix is only
    # emitted when the summary actually provides it.
    feature_dim = summary.get("feature_dim")
    feature_dim_text = f"{feature_dim:,}-d " if feature_dim is not None else ""

    return {
        "source": "docs/data/task_suite_20.json plus results/episode_task_suite/summary_report.json",
        "dataset_scope": {
            "sample_episode_count": 1,
            "num_frames": summary.get("num_frames"),
            "num_windows": summary.get("num_windows"),
            "feature_dim": feature_dim,
            "warning": "Single public sample episode; this supports pipeline/task evidence, while cross-episode generalization requires held-out episodes.",
        },
        "baselines": {
            "minimal": f"Interpretable softmax, logistic, ridge, and retrieval heads over the {feature_dim_text}window feature vector.",
            "neural_mlp": "Small PyTorch MLP classifiers/regressors using the same features, splits, and task contracts.",
        },
        "task_count": len(task_records),
        "directions": direction_records,
        "tasks": task_records,
    }


def write_csv(taxonomy: dict[str, Any]) -> None:
    """Write one CSV row per (task, linked direction) pair.

    The file is ``research_direction_task_map.csv`` under ``OUT_DIR``; the
    directory must already exist. Missing metrics are written as empty cells.
    """
    path = OUT_DIR / "research_direction_task_map.csv"
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, lineterminator="\n")
        writer.writerow(
            [
                "direction",
                "direction_name",
                "task",
                "task_display_name",
                "task_name",
                "family",
                "relationship",
                "primary_direction",
                "metric_name",
                "minimal_metric",
                "neural_mlp_metric",
                "better_baseline",
                "why",
                "current_limit",
            ]
        )
        for task, spec in taxonomy["tasks"].items():
            metric = spec["metric"]
            for direction_code, relationship in spec["direction_roles"].items():
                writer.writerow(
                    [
                        direction_code,
                        taxonomy["directions"][direction_code]["name"],
                        task,
                        spec["display_name"],
                        spec["name"],
                        spec["family"],
                        relationship,
                        spec["primary_direction"],
                        metric["name"],
                        "" if metric["minimal"] is None else f"{metric['minimal']:.12g}",
                        "" if metric["neural_mlp"] is None else f"{metric['neural_mlp']:.12g}",
                        metric["better_baseline"],
                        spec["why"],
                        spec["current_limit"],
                    ]
                )


def write_markdown(taxonomy: dict[str, Any]) -> None:
    """Render the taxonomy as ``research_direction_summary.md`` under ``OUT_DIR``.

    The document contains a baseline table, a per-direction coverage table, a
    per-task mapping table and the next steps of every direction. Trailing
    whitespace is stripped and the file ends with exactly one newline.
    """
    lines = [
        "# Four-Direction Task Taxonomy",
        "",
        "This file is generated by `scripts/research_direction_taxonomy.py` from the unified 20-task index and committed metrics.",
        "It maps the current Xperience-10M sample tasks to the four Ropedia research directions and marks which parts require multi-episode evidence.",
        "",
        "## Baseline Families",
        "",
        "| Baseline | Meaning |",
        "| --- | --- |",
        f"| Minimal | {taxonomy['baselines']['minimal']} |",
        f"| Neural MLP | {taxonomy['baselines']['neural_mlp']} |",
        "",
        "## Direction Coverage",
        "",
        "| Direction | Current status | Direct | Proxy | Diagnostic | Current readout |",
        "| --- | --- | ---: | ---: | ---: | --- |",
    ]
    for code, info in taxonomy["directions"].items():
        counts = info["counts"]
        lines.append(
            f"| {code}. {info['name']} | {info['current_status']} | {counts['direct']} | {counts['proxy']} | {counts['diagnostic']} | {info['current_readout']} |"
        )

    lines.extend(
        [
            "",
            "## Task Mapping With Two Baselines",
            "",
            "| Task | Artifact id | Primary direction | Related directions | Minimal | Neural MLP | Readout |",
            "| --- | --- | --- | --- | ---: | ---: | --- |",
        ]
    )
    for task, spec in taxonomy["tasks"].items():
        metric = spec["metric"]
        related = ", ".join(
            f"{code}:{role}" for code, role in spec["direction_roles"].items()
        )
        minimal = f"{fmt_metric(metric['minimal'])} {metric['name']}"
        neural = f"{fmt_metric(metric['neural_mlp'])} {metric['name']}"
        readout = f"{baseline_readout(metric['better_baseline'])}. {spec['current_limit']}"
        lines.append(
            f"| {spec['display_name']} | `{task}` | {spec['primary_direction']} | {related} | {minimal} | {neural} | {readout} |"
        )

    lines.extend(["", "## Next-Step Interpretation", ""])
    for code, info in taxonomy["directions"].items():
        lines.append(f"### {code}. {info['name']}")
        lines.append("")
        lines.append(info["current_readout"])
        lines.append("")
        for step in info["next_steps"]:
            lines.append(f"- {step}")
        lines.append("")

    (OUT_DIR / "research_direction_summary.md").write_text(
        "\n".join(lines).rstrip() + "\n", encoding="utf-8"
    )


def svg_text(x: int, y: int, text: str, size: int = 16, weight: int = 500, color: str = "#f4f8ef") -> str:
    """Return *text* with HTML special characters escaped.

    NOTE(review): as written, the literal below contains only the escaped text
    and ``x``, ``y``, ``size``, ``weight`` and ``color`` are unused. The
    surrounding markup was presumably lost from this literal - confirm against
    the generated chart before relying on the positional arguments.
    """
    return (
        f'{html.escape(text)}'
    )


def write_svg(taxonomy: dict[str, Any]) -> None:
    """Write ``research_direction_coverage.svg`` under ``CHARTS``.

    One card is laid out per direction on a two-column grid, followed by a
    legend and a footer line. ``CHARTS`` must already exist.

    NOTE(review): several literals below are empty and some layout values
    (``width``, ``height``, ``bar_y``, ``cursor``) are computed but not
    referenced by any visible literal; the element markup was presumably
    stripped - confirm. They are kept untouched.
    """
    width = 1180
    height = 700
    margin = 58
    card_w = 515
    card_h = 220
    colors = {"direct": "#ccffa0", "proxy": "#7ae5c3", "diagnostic": "#d8f4a5"}

    cards = []
    for idx, (code, info) in enumerate(taxonomy["directions"].items()):
        # Two cards per row.
        row = idx // 2
        col = idx % 2
        x = margin + col * (card_w + 34)
        y = 130 + row * (card_h + 34)
        counts = info["counts"]
        # max(1, ...) avoids dividing by zero for a direction with no links.
        total = max(1, counts["direct"] + counts["proxy"] + counts["diagnostic"])
        bar_x = x + 24
        bar_y = y + 132
        bar_w = card_w - 48
        cursor = bar_x
        segments = []
        for key in ("direct", "proxy", "diagnostic"):
            seg_w = round(bar_w * counts[key] / total)
            if counts[key] > 0:
                segments.append(
                    f''
                )
            cursor += seg_w
        # Show at most five task names, then a "+N" overflow marker.
        task_labels = ", ".join(info["task_display_names"][:5])
        if len(info["task_display_names"]) > 5:
            task_labels += f", +{len(info['task_display_names']) - 5}"
        cards.append(
            "\n".join(
                [
                    f'',
                    svg_text(x + 24, y + 42, f"{code}. {info['name']}", 21, 700),
                    svg_text(x + 24, y + 75, info["current_status"], 15, 700, "#ccffa0"),
                    svg_text(x + 24, y + 108, f"Tasks: {task_labels}", 14, 500, "#dce8d7"),
                    *segments,
                    svg_text(x + 24, y + 174, f"Direct {counts['direct']}", 14, 700, colors["direct"]),
                    svg_text(x + 150, y + 174, f"Proxy {counts['proxy']}", 14, 700, colors["proxy"]),
                    svg_text(x + 270, y + 174, f"Diagnostic {counts['diagnostic']}", 14, 700, colors["diagnostic"]),
                ]
            )
        )

    legend = []
    lx = margin
    for key, label in (
        ("direct", "Direct task"),
        ("proxy", "Proxy / prerequisite"),
        ("diagnostic", "Diagnostic probe"),
    ):
        legend.extend(
            [
                f'',
                svg_text(lx + 24, 636, label, 14, 600, "#dce8d7"),
            ]
        )
        lx += 200

    # These pieces are built outside the final f-string because string
    # literals containing '#' inside a replacement field are only accepted
    # from Python 3.12 on; the assembled text is unchanged.
    title_markup = svg_text(margin, 64, "Xperience-10M 20-Task Suite: Four Research Directions", 30, 800)
    subtitle_markup = svg_text(margin, 96, "One public sample episode, two baseline families, Qwen3-Omni/Cosmos3 diagnostics, and explicit direct/proxy coverage.", 16, 500, "#a5afa2")
    cards_markup = "".join(cards)
    legend_markup = "".join(legend)
    footer_markup = svg_text(margin, 670, "Generated from docs/data/task_suite_20.json, committed metrics, and scripts/research_direction_taxonomy.py", 13, 500, "#a5afa2")
    svg = f""" {title_markup} {subtitle_markup} {cards_markup} {legend_markup} {footer_markup} """
    (CHARTS / "research_direction_coverage.svg").write_text(svg, encoding="utf-8")


def main() -> None:
    """Build the taxonomy and write the JSON, CSV, Markdown and SVG artifacts.

    Output directories are created when missing. The JSON payload is written
    twice with identical content: under ``OUT_DIR`` and under ``DOCS_DATA``.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    DOCS_DATA.mkdir(parents=True, exist_ok=True)
    CHARTS.mkdir(parents=True, exist_ok=True)

    taxonomy = build_taxonomy(load_summary())
    json_text = json.dumps(taxonomy, indent=2, ensure_ascii=False)
    (OUT_DIR / "research_direction_taxonomy.json").write_text(json_text + "\n", encoding="utf-8")
    (DOCS_DATA / "research_directions.json").write_text(json_text + "\n", encoding="utf-8")
    write_csv(taxonomy)
    write_markdown(taxonomy)
    write_svg(taxonomy)

    print(f"Wrote {OUT_DIR / 'research_direction_taxonomy.json'}")
    print(f"Wrote {OUT_DIR / 'research_direction_task_map.csv'}")
    print(f"Wrote {OUT_DIR / 'research_direction_summary.md'}")
    print(f"Wrote {DOCS_DATA / 'research_directions.json'}")
    print(f"Wrote {CHARTS / 'research_direction_coverage.svg'}")


if __name__ == "__main__":
    main()