"""Gradio API replacing kimodo_demo's Viser entrypoint. Exposes a single endpoint at `/gradio_api/call/kimodo_motion` that accepts: (prompt, num_frames, seed, cfg, num_steps, constraints_json) and returns a JSON envelope: { "status": "ok", "numFrames": int, "fps": 30, "rootTranslation": [[x,y,z], ...], # [N, 3] "jointRotMats": [[[[...]]]], # [N, 30, 3, 3] "footContacts": [[lh, lt, lte, rh, rt, rte]], # [N, 6] (optional; # SOMA-77 layout — toe-end copies # toe-base contact, see # kimodo.skeleton.definitions # .output_to_SOMASkeleton77) "summary": str } The webapp's src/lib/services/kimodo.ts polls `/gradio_api/call/kimodo_motion/` for the SSE event stream. """ from __future__ import annotations import json import os import sys import traceback import gradio as gr import numpy as np import torch from constraints_schema import parse_constraints # Lazy imports of kimodo so import-time failures (e.g. missing CUDA on the # Space build container) don't kill `python server.py --help`. _model = None _skeleton = None _device = None def _load_model(): global _model, _skeleton, _device if _model is not None: return _model, _skeleton, _device print("[server] loading Kimodo-SOMA-RP-v1.1 ...", file=sys.stderr, flush=True) from kimodo import load_model # Must be a string (kimodo passes this through Hydra/OmegaConf which # rejects non-primitive types like torch.device). _device = "cuda:0" if torch.cuda.is_available() else "cpu" print(f"[server] device = {_device}", file=sys.stderr, flush=True) model, resolved = load_model( "Kimodo-SOMA-RP-v1.1", device=_device, default_family="Kimodo", return_resolved_name=True, ) print(f"[server] resolved model = {resolved}", file=sys.stderr, flush=True) _model = model _skeleton = model.skeleton return _model, _skeleton, _device def kimodo_motion( prompt: str, num_frames: int, seed: int, cfg: float, num_steps: int, constraints_json: str, progress: gr.Progress = gr.Progress(), # noqa: B008 — Gradio convention ) -> dict: """Generate one SOMA motion sample. Heavy work runs on the GPU; constraint parsing on the CPU. Returns the JSON envelope the webapp expects.""" try: if not prompt or not prompt.strip(): return {"status": "error", "error": "prompt is empty"} n = int(num_frames) if n < 1 or n > 300: return { "status": "error", "error": f"num_frames must be in [1, 300]; got {n}", } # Validate the constraints payload up front so a bad request doesn't # waste GPU time. We accept the same JSON the kimodo CLI accepts — # extra cross-list validation in constraints_schema bounds-checks frame # indices against num_frames. try: raw = json.loads(constraints_json) if constraints_json else [] parse_constraints(raw, n) # validates shape + bounds except (ValueError, json.JSONDecodeError) as e: return {"status": "error", "error": f"constraint validation: {e}"} progress(0.02, desc="Loading model...") model, skeleton, device = _load_model() # Convert the JSON list of dicts into kimodo constraint objects via # the official loader — accepts a list-of-dicts directly. from kimodo.constraints import load_constraints_lst constraint_lst = load_constraints_lst(raw, skeleton, device=device) if seed is not None and int(seed) >= 0: from kimodo.tools import seed_everything seed_everything(int(seed)) progress(0.10, desc=f"Diffusion ({int(num_steps)} steps)...") cfg_kwargs = {"cfg_type": "regular", "cfg_weight": float(cfg)} # Single sample, single prompt. If you want multi-prompt later, this is # where you'd thread it through. output = model( [prompt.strip()], [n], constraint_lst=constraint_lst, num_denoising_steps=int(num_steps), num_samples=1, multi_prompt=True, num_transition_frames=20, return_numpy=True, **cfg_kwargs, ) progress(0.92, desc="Serializing...") # Kimodo's SOMA-RP model trains on the 30-joint SOMA skeleton but emits # output at 77 joints (the somaskel77 representation, with relaxed # hand poses added). We need to: # 1. Get/compute 77-joint local rotation matrices. # 2. Convert back to the 30-joint subset via from_SOMASkeleton77. # 3. Root position from posed_joints[:, 0, :] (joint 0 is Hips in both). if "posed_joints" not in output or "global_rot_mats" not in output: return { "status": "error", "error": f"unexpected model output keys: {list(output.keys())}", } posed_joints = output["posed_joints"] global_rot_mats = output["global_rot_mats"] if posed_joints.ndim != 4 or global_rot_mats.ndim != 5: return { "status": "error", "error": ( f"unexpected shapes: posed_joints={posed_joints.shape}, " f"global_rot_mats={global_rot_mats.shape}" ), } # Step 1: 77-joint local rotation matrices. joints_pos_t = torch.from_numpy(posed_joints[0]).to(device) if "local_rot_mats" in output: local_rot_mats_77 = torch.from_numpy(output["local_rot_mats"][0]).to(device) else: from kimodo.skeleton import global_rots_to_local_rots joints_rot_t = torch.from_numpy(global_rot_mats[0]).to(device) # Use the somaskel77 kintree (joints_rot was emitted at 77 joints). local_rot_mats_77 = global_rots_to_local_rots(joints_rot_t, skeleton.somaskel77) # Step 2: 77 → 30 via the official slicing helper. local_rot_mats_30 = skeleton.from_SOMASkeleton77(local_rot_mats_77) # `@ensure_batched` may have added a leading batch dim; drop it if so. if local_rot_mats_30.ndim == 5 and local_rot_mats_30.shape[0] == 1: local_rot_mats_30 = local_rot_mats_30[0] local_rot_mats = local_rot_mats_30.detach().cpu().numpy().astype(np.float32) # Step 3: root translation = Hips (joint 0) in posed_joints. root_translation = ( joints_pos_t[:, 0, :].detach().cpu().numpy().astype(np.float32) ) # Spot-check the SOMA shape: 30 joints expected for SOMA-RP-v1.1. T, J = local_rot_mats.shape[0], local_rot_mats.shape[1] if (T, J) != (n, 30): return { "status": "error", "error": ( f"expected ({n}, 30, 3, 3) for local_rot_mats, got " f"{local_rot_mats.shape}" ), } # Optional foot_contacts if the model emitted them. foot_contacts_out = None if "foot_contacts" in output: fc = output["foot_contacts"] # Drop the leading sample dim if present -> [T, 4] if fc.ndim == 3: fc = fc[0] fc = np.asarray(fc, dtype=np.float32) # 4 -> 6 channel expand for SOMA-77 (mirrors # kimodo.skeleton.definitions.output_to_SOMASkeleton77): # [L_heel, L_toe, L_toe_end(=L_toe), R_heel, R_toe, R_toe_end(=R_toe)] fc6 = np.concatenate( [fc[..., :2], fc[..., 1:2], fc[..., 2:4], fc[..., 3:4]], axis=-1 ) foot_contacts_out = fc6.tolist() progress(1.0, desc="Done") return { "status": "ok", "numFrames": int(T), "fps": int(getattr(model, "fps", 30)), "rootTranslation": root_translation.tolist(), "jointRotMats": local_rot_mats.tolist(), "footContacts": foot_contacts_out, "summary": prompt.strip(), } except Exception as e: traceback.print_exc() return {"status": "error", "error": f"{type(e).__name__}: {e}"} def _historical_probe_g1(progress: gr.Progress = gr.Progress()) -> dict: # noqa: B008 """Historical probe — confirmed Kimodo-G1-RP-v1 model loads cleanly and g1skel34 ships with per-link STL meshes (~30 MB total) at /usr/local/lib/python3.10/dist-packages/kimodo/assets/skeletons/g1skel34/. G1 has 34 DOF (pelvis, hips, knees, ankles+toes, waist 3-axis, shoulders 3-axis, elbows, wrists 3-axis, hand-roll). G1 motion needs a different renderer (rigid links transformed by joint rotations vs SOMA's LBS skin). Kept as documentation only; not registered as a Gradio endpoint. """ import importlib import os try: out: dict = {"status": "ok"} import kimodo as kpkg root = os.path.dirname(kpkg.__file__) # 1) Look for any g1 / G1 assets on disk. candidates: list[str] = [] for dirpath, _dn, filenames in os.walk(root): for fn in filenames: low = fn.lower() if "g1" in low or "g1" in dirpath.lower(): full = os.path.join(dirpath, fn) try: sz = os.path.getsize(full) except OSError: sz = -1 candidates.append(f"{full}\t{sz}") out["g1_files"] = candidates[:200] # 2) Try importing G1-related modules. for mn in ("kimodo.viz.g1_skin", "kimodo.skeleton", "kimodo.skeleton.g1", "kimodo.assets.skeletons.g1"): try: mod = importlib.import_module(mn) out[f"{mn}_attrs"] = [a for a in dir(mod) if not a.startswith("_")][:60] except Exception as e: out[f"{mn}_err"] = f"{type(e).__name__}: {e}" # 3) Try loading a G1 model. progress(0.5, desc="Trying to load G1 model ...") try: from kimodo import load_model g1_model, g1_resolved = load_model("Kimodo-G1-RP-v1", device="cpu", default_family="Kimodo", return_resolved_name=True) out["g1_model_resolved"] = g1_resolved out["g1_model_attrs"] = [a for a in dir(g1_model) if not a.startswith("_")][:50] sk = getattr(g1_model, "skeleton", None) if sk is not None: out["g1_skeleton_type"] = type(sk).__name__ out["g1_skeleton_attrs"] = [a for a in dir(sk) if not a.startswith("_")][:80] # Try the standard "joint count" attr names. for k in ("bone_order_names", "joint_names", "names"): v = getattr(sk, k, None) if v is not None: out[f"g1_skeleton_{k}"] = list(v) break except Exception as e: out["g1_load_err"] = f"{type(e).__name__}: {e}" return out except Exception as e: traceback.print_exc() return {"status": "error", "error": f"{type(e).__name__}: {e}"} def kimodo_motion_seq( prompts_json: str, frames_json: str, seed: int, cfg: float, num_steps: int, constraints_json: str, transition_frames: int = 20, progress: gr.Progress = gr.Progress(), # noqa: B008 ) -> dict: """Multi-prompt sequence variant of kimodo_motion. Generates a single motion that transitions through each prompt segment in order. prompts_json: JSON list of strings, e.g. '["walk forward", "wave hello"]' frames_json: JSON list of ints (per-segment frame counts), same length. transition_frames: how many frames the model uses to blend between segments. Returns the same envelope as kimodo_motion. The total numFrames is sum(frames). If a single segment is provided this is equivalent to kimodo_motion. """ try: prompts = json.loads(prompts_json) if prompts_json else [] if not isinstance(prompts, list) or not all(isinstance(p, str) and p.strip() for p in prompts): return {"status": "error", "error": "prompts_json must be a JSON list of non-empty strings"} frames = json.loads(frames_json) if frames_json else [] if not isinstance(frames, list) or len(frames) != len(prompts) or not all(isinstance(n, int) and 1 <= n <= 300 for n in frames): return {"status": "error", "error": "frames_json must be a JSON list of ints (1..300) matching prompts length"} total_n = sum(frames) if total_n > 600: return {"status": "error", "error": f"total frames {total_n} exceeds 600 cap"} try: raw = json.loads(constraints_json) if constraints_json else [] parse_constraints(raw, total_n) except (ValueError, json.JSONDecodeError) as e: return {"status": "error", "error": f"constraint validation: {e}"} progress(0.02, desc="Loading model...") model, skeleton, device = _load_model() from kimodo.constraints import load_constraints_lst constraint_lst = load_constraints_lst(raw, skeleton, device=device) if seed is not None and int(seed) >= 0: from kimodo.tools import seed_everything seed_everything(int(seed)) progress(0.10, desc=f"Diffusion ({len(prompts)} segments × {int(num_steps)} steps)...") cfg_kwargs = {"cfg_type": "regular", "cfg_weight": float(cfg)} output = model( [p.strip() for p in prompts], list(frames), constraint_lst=constraint_lst, num_denoising_steps=int(num_steps), num_samples=1, multi_prompt=True, num_transition_frames=int(transition_frames), return_numpy=True, **cfg_kwargs, ) progress(0.92, desc="Serializing...") if "posed_joints" not in output or "global_rot_mats" not in output: return {"status": "error", "error": f"unexpected model output keys: {list(output.keys())}"} posed_joints = output["posed_joints"] global_rot_mats = output["global_rot_mats"] joints_pos_t = torch.from_numpy(posed_joints[0]).to(device) if "local_rot_mats" in output: local_rot_mats_77 = torch.from_numpy(output["local_rot_mats"][0]).to(device) else: from kimodo.skeleton import global_rots_to_local_rots joints_rot_t = torch.from_numpy(global_rot_mats[0]).to(device) local_rot_mats_77 = global_rots_to_local_rots(joints_rot_t, skeleton.somaskel77) local_rot_mats_30 = skeleton.from_SOMASkeleton77(local_rot_mats_77) if local_rot_mats_30.ndim == 5 and local_rot_mats_30.shape[0] == 1: local_rot_mats_30 = local_rot_mats_30[0] local_rot_mats = local_rot_mats_30.detach().cpu().numpy().astype(np.float32) root_translation = joints_pos_t[:, 0, :].detach().cpu().numpy().astype(np.float32) T, J = local_rot_mats.shape[0], local_rot_mats.shape[1] # Note: the model may return slightly more or fewer frames than total_n # depending on transition handling; report whatever it gave us. foot_contacts_out = None if "foot_contacts" in output: fc = output["foot_contacts"] # Drop the leading sample dim if present -> [T, 4] if fc.ndim == 3: fc = fc[0] fc = np.asarray(fc, dtype=np.float32) # 4 -> 6 channel expand for SOMA-77 (mirrors # kimodo.skeleton.definitions.output_to_SOMASkeleton77): # [L_heel, L_toe, L_toe_end(=L_toe), R_heel, R_toe, R_toe_end(=R_toe)] fc6 = np.concatenate( [fc[..., :2], fc[..., 1:2], fc[..., 2:4], fc[..., 3:4]], axis=-1 ) foot_contacts_out = fc6.tolist() progress(1.0, desc="Done") return { "status": "ok", "numFrames": int(T), "fps": int(getattr(model, "fps", 30)), "rootTranslation": root_translation.tolist(), "jointRotMats": local_rot_mats.tolist(), "footContacts": foot_contacts_out, "summary": " → ".join(p.strip() for p in prompts), "segments": [{"prompt": p.strip(), "frames": int(n)} for p, n in zip(prompts, frames)], } except Exception as e: traceback.print_exc() return {"status": "error", "error": f"{type(e).__name__}: {e}"} def _historical_extract_soma_skin(progress: gr.Progress = gr.Progress()) -> dict: # noqa: B008 """One-shot dump of kimodo's somaskel77/skin_standard.npz to base64 so the webapp can ship a real SkinnedMesh. Already run; binaries live at genga-webapp/public/assets/soma/. Kept as build-history reference, NOT registered as a Gradio endpoint. """ import base64 import importlib import os try: progress(0.2, desc="Locating skin asset...") import kimodo as kpkg root = os.path.dirname(kpkg.__file__) skin_path = os.path.join(root, "assets/skeletons/somaskel77/skin_standard.npz") out: dict = {"status": "ok", "skin_path": skin_path, "exists": os.path.isfile(skin_path)} if not out["exists"]: return {"status": "error", "error": f"missing {skin_path}"} progress(0.4, desc="Loading skin npz ...") skin = np.load(skin_path, allow_pickle=True) out["skin_keys"] = sorted(list(skin.files)) shapes: dict = {} for k in skin.files: arr = skin[k] shapes[k] = {"shape": list(arr.shape), "dtype": str(arr.dtype)} out["skin_shapes"] = shapes # Inspect the viz modules so we know how to use this asset. for mn in ("kimodo.viz.soma_skin", "kimodo.viz.soma_layer_skin", "kimodo.viz.smplx_skin"): try: mod = importlib.import_module(mn) out[f"{mn}_attrs"] = [a for a in dir(mod) if not a.startswith("_")] except Exception as e: out[f"{mn}_err"] = f"{type(e).__name__}: {e}" # Try to load via soma_skin module (it likely has a builder fn). try: soma_skin = importlib.import_module("kimodo.viz.soma_skin") # Source-grep would help; just dump the module source (small file). src_path = soma_skin.__file__ with open(src_path, "r") as f: out["soma_skin_src"] = f.read() except Exception as e: out["soma_skin_src_err"] = f"{type(e).__name__}: {e}" # Same for the SMPL-X skin module. try: smplx_skin = importlib.import_module("kimodo.viz.smplx_skin") with open(smplx_skin.__file__, "r") as f: out["smplx_skin_src"] = f.read() except Exception as e: out["smplx_skin_src_err"] = f"{type(e).__name__}: {e}" # Encode the most important arrays as base64 so the webapp can fetch # in one round-trip if the shapes look right (V_template, faces, weights). progress(0.85, desc="Encoding ...") encoded: dict = {} for k in skin.files: arr = np.ascontiguousarray(skin[k]) encoded[k] = { "dtype": str(arr.dtype), "shape": list(arr.shape), "b64": base64.b64encode(arr.tobytes()).decode("ascii"), } out["skin_encoded"] = encoded # Also dump skeleton.neutral_joints (the real SOMA-30 rest pose). try: model, skeleton, _ = _load_model() nj = skeleton.neutral_joints.detach().cpu().numpy().astype(np.float32) out["soma30_neutral_joints"] = nj.tolist() except Exception as e: out["neutral_joints_err"] = f"{type(e).__name__}: {e}" return out except Exception as e: traceback.print_exc() return {"status": "error", "error": f"{type(e).__name__}: {e}"} def _historical_probe_soma_body(progress: gr.Progress = gr.Progress()) -> dict: # noqa: B008 """One-shot kimodo introspection that found skin_standard.npz. Kept as build-history reference, NOT registered as a Gradio endpoint. Aggressive probe — walks the kimodo package's filesystem and importable submodules looking for any body-model assets (v_template / faces / lbs_weights / J_regressor) so the webapp can ship a smooth SkinnedMesh instead of a procedural capsule humanoid. Returns paths + first-discovered attribute hits + a mapping of any candidate objects we find. We iterate from there. """ import importlib import os import pkgutil import sys try: progress(0.1, desc="Loading model + walking package...") model, skeleton, device = _load_model() out: dict = {"status": "ok"} # 1) Filesystem scan: list every .pkl/.npz/.npy/.obj/.ply/.glb/.json # under the kimodo package root + the HF snapshot caches. roots: list[str] = [] try: import kimodo as _k # noqa: F401 roots.append(os.path.dirname(_k.__file__)) except Exception: pass for env_var in ("HF_HOME", "XDG_CACHE_HOME"): v = os.environ.get(env_var) if v and os.path.isdir(v): roots.append(v) body_exts = (".pkl", ".npz", ".npy", ".obj", ".ply", ".glb", ".gltf") candidates: list[str] = [] for root in roots: for dirpath, _dirnames, filenames in os.walk(root): for fn in filenames: low = fn.lower() if any(low.endswith(e) for e in body_exts) or "smpl" in low or "soma" in low or "body" in low or "template" in low: full = os.path.join(dirpath, fn) try: sz = os.path.getsize(full) except OSError: sz = -1 candidates.append(f"{full}\t{sz}") if len(candidates) > 400: break if len(candidates) > 400: break out["fs_candidates_count"] = len(candidates) out["fs_candidates"] = candidates[:300] # 2) Importable submodule walk under `kimodo`. Catch import errors # silently; we want every reachable attribute name to inspect. try: import kimodo as kpkg mods: list[str] = [kpkg.__name__] for finder, name, ispkg in pkgutil.walk_packages(kpkg.__path__, prefix=kpkg.__name__ + "."): mods.append(name) out["module_count"] = len(mods) # Look for submodules whose name contains body/mesh/smpl/template. interesting = [m for m in mods if any(k in m for k in ("body", "mesh", "smpl", "template", "skin", "asset"))] out["interesting_modules"] = interesting[:60] # Try importing each interesting one and dump attribute names. mod_attrs: dict[str, list[str]] = {} for m in interesting[:20]: try: mod = importlib.import_module(m) mod_attrs[m] = [a for a in dir(mod) if not a.startswith("_")][:40] except Exception as e: mod_attrs[m] = [f""] out["module_attrs"] = mod_attrs except Exception as e: out["module_walk_err"] = f"{type(e).__name__}: {e}" # 3) Probe model + skeleton internals for any object that looks like a # body model (recursively, one level deep on attributes). progress(0.6, desc="Probing model attrs ...") candidates_attrs: list[dict] = [] def _probe_obj(name: str, obj, depth=0) -> None: if depth > 1 or obj is None: return for attr in dir(obj): if attr.startswith("_"): continue try: v = getattr(obj, attr, None) except Exception: continue if v is None: continue # Detect tensor-like body model attrs. cls = type(v).__name__ if hasattr(v, "shape") and hasattr(v, "ndim"): shape = list(getattr(v, "shape", [])) if shape and len(shape) <= 3 and all(isinstance(d, int) for d in shape): if shape[0] in (6890, 10475, 10778) or (len(shape) >= 2 and shape[1] in (3, 30, 24, 52, 55)): candidates_attrs.append({ "path": f"{name}.{attr}", "cls": cls, "shape": shape, }) # Recurse into module-like objects with body/mesh in the type name. lower_cls = cls.lower() if depth == 0 and any(k in lower_cls for k in ("body", "mesh", "smpl", "skel")): _probe_obj(f"{name}.{attr}", v, depth + 1) _probe_obj("model", model) _probe_obj("skeleton", skeleton) out["tensor_candidates"] = candidates_attrs[:60] # 4) Try importing `smplx` / `smpl` / `body_models` modules that kimodo # might rely on as soft deps. soft_deps = {} for name in ("smplx", "smpl", "body_models", "body_visualizer", "human_body_prior"): try: m = importlib.import_module(name) soft_deps[name] = {"path": getattr(m, "__file__", None), "attrs": [a for a in dir(m) if not a.startswith("_")][:30]} except Exception as e: soft_deps[name] = f"" out["soft_deps"] = soft_deps return out except Exception as e: traceback.print_exc() return {"status": "error", "error": f"{type(e).__name__}: {e}"} with gr.Blocks(title="Genga Kimodo") as demo: gr.Markdown( "# Genga × Kimodo\n" "API-only Space. Inference endpoint at `/gradio_api/call/kimodo_motion`.\n\n" "This Space backs the GengaMachines webapp and is not a public sandbox. " "For the official interactive Kimodo demo, see " "[nvidia/Kimodo](https://huggingface.co/spaces/nvidia/Kimodo)." ) in_prompt = gr.Textbox(label="Prompt", value="A person waves hello with their right hand.") in_frames = gr.Slider(30, 300, value=90, step=6, label="num_frames (30 fps)") in_seed = gr.Number(value=42, label="seed (use -1 to skip seeding)", precision=0) in_cfg = gr.Slider(1.0, 10.0, value=5.0, step=0.5, label="cfg_weight") in_steps = gr.Slider(10, 50, value=30, step=1, label="num_denoising_steps") in_constraints = gr.Textbox(label="constraints_json", value="[]", lines=4) btn = gr.Button("Generate") out = gr.JSON(label="result") btn.click( fn=kimodo_motion, inputs=[in_prompt, in_frames, in_seed, in_cfg, in_steps, in_constraints], outputs=out, api_name="kimodo_motion", ) # Multi-prompt sequence endpoint — header-only inputs (no UI form widgets; # the webapp posts JSON directly to /gradio_api/call/kimodo_motion_seq). in_prompts_json = gr.Textbox(label="prompts_json", value='["A person walks forward","A person waves hello"]', visible=False) in_frames_json = gr.Textbox(label="frames_json", value="[45,45]", visible=False) in_transition = gr.Number(value=20, label="transition_frames", precision=0, visible=False) out_seq = gr.JSON(label="seq result", visible=False) seq_btn = gr.Button("Generate sequence", visible=False) seq_btn.click( fn=kimodo_motion_seq, inputs=[in_prompts_json, in_frames_json, in_seed, in_cfg, in_steps, in_constraints, in_transition], outputs=out_seq, api_name="kimodo_motion_seq", ) if __name__ == "__main__": demo.queue(max_size=4).launch( server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)), )