import os
import sys
import types

# Spaces must be imported before any CUDA-touching package.
import spaces

# Install a flash_attn SDPA-fallback shim BEFORE `import models` (which does
# `from flash_attn import flash_attn_varlen_func`). The prebuilt flash-attn
# wheels do not run on Blackwell sm_120 / CUDA 13, so we replace the only two
# symbols the model imports (`flash_attn_varlen_func` and the bert_padding
# helpers) with torch-native equivalents.
import math as _math
import torch as _torch
import torch.nn.functional as _F


def _flash_attn_varlen_func(
    q,
    k,
    v,
    cu_seqlens_q,
    cu_seqlens_k,
    max_seqlen_q,
    max_seqlen_k,
    dropout_p=0.0,
    softmax_scale=None,
    causal=False,
    **_kwargs,
):
    """Torch-native stand-in for ``flash_attn.flash_attn_varlen_func``.

    Runs ``scaled_dot_product_attention`` once per packed sequence.

    Args:
        q, k, v: packed tensors of shape [total_tokens, n_heads, head_dim];
            k/v may carry fewer heads than q (GQA).
        cu_seqlens_q, cu_seqlens_k: cumulative sequence boundaries; entry
            ``b`` / ``b + 1`` delimit sequence ``b`` in the packed tensors.
        max_seqlen_q, max_seqlen_k: accepted for signature compatibility,
            not used by this fallback.
        dropout_p, softmax_scale, causal: forwarded to SDPA as
            ``dropout_p``, ``scale`` and ``is_causal``.
        **_kwargs: any further flash-attn keyword arguments are ignored.

    Returns:
        A tensor shaped like ``q``. Rows not covered by any
        ``cu_seqlens_q`` segment are left uninitialized (``empty_like``).
    """
    # q/k/v are [total_tokens, n_heads, head_dim], packed per batch row.
    # Lumina-Next uses GQA: q has n_heads, k/v have n_kv_heads < n_heads.
    # Repeat k/v along the head dim so SDPA sees matching shapes.
    _, n_heads, _ = q.shape  # unpacking also enforces a 3-D q
    n_kv_heads = k.shape[1]
    out = _torch.empty_like(q)
    # One host sync up front instead of one per sequence.
    cu_q = cu_seqlens_q.tolist()
    cu_k = cu_seqlens_k.tolist()
    repeat = n_heads // n_kv_heads if n_kv_heads > 0 else 1
    for b in range(len(cu_q) - 1):
        qs, qe = cu_q[b], cu_q[b + 1]
        ks, ke = cu_k[b], cu_k[b + 1]
        if qe == qs:
            # Empty query segment: nothing to write.
            continue
        qb = q[qs:qe].transpose(0, 1).unsqueeze(0)  # [1, n_heads, lq, d]
        kb = k[ks:ke].transpose(0, 1).unsqueeze(0)  # [1, n_kv, lk, d]
        vb = v[ks:ke].transpose(0, 1).unsqueeze(0)
        if repeat > 1:
            kb = kb.repeat_interleave(repeat, dim=1)
            vb = vb.repeat_interleave(repeat, dim=1)
        # NOTE(review): SDPA's `is_causal` mask alignment may differ from
        # flash-attn's when lq != lk -- confirm before relying on causal=True.
        ob = _F.scaled_dot_product_attention(
            qb,
            kb,
            vb,
            dropout_p=dropout_p,
            scale=softmax_scale,
            is_causal=causal,
        )
        out[qs:qe] = ob.squeeze(0).transpose(0, 1)
    return out


def _index_first_axis(input_, indices):
    """Select rows of ``input_`` along its first axis using ``indices``."""
    return input_[indices]


def _unpad_input(hidden_states, attention_mask):
    """Pack the unmasked tokens of a padded batch into one flat tensor.

    Returns:
        ``(packed, indices, cu_seqlens, max_seqlen_in_batch)`` where
        ``packed`` holds the rows selected by the mask, ``indices`` are their
        positions in the flattened ``bsz * seqlen`` axis, ``cu_seqlens`` is
        the int32 cumulative length vector with a leading 0, and
        ``max_seqlen_in_batch`` is a Python int (0 for an empty batch).
    """
    # hidden_states: [bsz, seqlen, ...]; attention_mask: [bsz, seqlen] (bool/int)
    bsz, seqlen = attention_mask.shape[:2]
    seqlens_in_batch = attention_mask.sum(dim=-1, dtype=_torch.int32)
    indices = _torch.nonzero(attention_mask.flatten(), as_tuple=False).flatten()
    max_seqlen_in_batch = int(seqlens_in_batch.max().item()) if bsz > 0 else 0
    cu_seqlens = _F.pad(_torch.cumsum(seqlens_in_batch, dim=0, dtype=_torch.int32), (1, 0))
    flat = hidden_states.reshape(bsz * seqlen, *hidden_states.shape[2:])
    return flat[indices], indices, cu_seqlens, max_seqlen_in_batch


def _pad_input(hidden_states, indices, batch, seqlen):
    """Scatter packed rows back into a zero-filled [batch, seqlen, ...] tensor."""
    output = _torch.zeros(
        batch * seqlen,
        *hidden_states.shape[1:],
        device=hidden_states.device,
        dtype=hidden_states.dtype,
    )
    output[indices] = hidden_states
    return output.reshape(batch, seqlen, *hidden_states.shape[1:])


import importlib.machinery as _machinery

# Register the fake `flash_attn` package (and its `bert_padding` submodule) so
# later `import flash_attn...` statements resolve to the functions above.
_flash_attn_mod = types.ModuleType("flash_attn")
_flash_attn_mod.__spec__ = _machinery.ModuleSpec("flash_attn", loader=None)
_flash_attn_mod.__version__ = "0.0.0-shim"
_flash_attn_mod.flash_attn_varlen_func = _flash_attn_varlen_func
_bert_padding_mod = types.ModuleType("flash_attn.bert_padding")
_bert_padding_mod.__spec__ = _machinery.ModuleSpec("flash_attn.bert_padding", loader=None)
_bert_padding_mod.index_first_axis = _index_first_axis
_bert_padding_mod.pad_input = _pad_input
_bert_padding_mod.unpad_input = _unpad_input
_flash_attn_mod.bert_padding = _bert_padding_mod
sys.modules["flash_attn"] = _flash_attn_mod
sys.modules["flash_attn.bert_padding"] = _bert_padding_mod

# torch.load weights_only=False shim — old checkpoints (model_args.pth) contain
# argparse.Namespace pickles that fail under torch>=2.6 default weights_only=True.
# SECURITY NOTE: weights_only=False lets torch.load unpickle arbitrary objects;
# only load checkpoints from sources you trust.
_orig_torch_load = _torch.load


def _torch_load_compat(*args, **kwargs):
    """Call the original ``torch.load`` defaulting ``weights_only`` to False."""
    kwargs.setdefault("weights_only", False)
    return _orig_torch_load(*args, **kwargs)


_torch.load = _torch_load_compat

# Modern transformers' is_flash_attn_*_available() consults
# PACKAGE_DISTRIBUTION_MAPPING["flash_attn"] when `flash_attn` is importable;
# our shim has no real distribution metadata, so the lookup throws KeyError
# and cascades through diffusers' lazy import machinery.
# Register `flash_attn` in that mapping AND force every flash-attn detection
# helper to report False so transformers/diffusers go down the SDPA path.
import transformers  # noqa: E402
from transformers.utils import import_utils as _tu  # noqa: E402

if hasattr(_tu, "PACKAGE_DISTRIBUTION_MAPPING"):
    _tu.PACKAGE_DISTRIBUTION_MAPPING.setdefault("flash_attn", ["flash-attn"])
for _name in (
    "is_flash_attn_2_available",
    "is_flash_attn_3_available",
    "is_flash_attn_4_available",
    "is_flash_attn_greater_or_equal_2_10",
    "is_flash_attn_greater_or_equal",
):
    if hasattr(_tu, _name):
        setattr(_tu, _name, lambda *a, **k: False)

os.makedirs("/home/user/app/checkpoints", exist_ok=True)

from huggingface_hub import snapshot_download

snapshot_download(
    repo_id="Alpha-VLLM/Lumina-Next-SFT", local_dir="/home/user/app/checkpoints"
)

hf_token = os.environ["HF_TOKEN"]

import argparse
import builtins
import json
import math
import multiprocessing as mp
import random
import socket
import traceback

from PIL import Image
import gradio as gr
import numpy as np
from safetensors.torch import load_file
import torch
from torchvision.transforms.functional import to_pil_image

import models
from transport import Sampler, create_transport


class ModelFailure:
    """Sentinel returned by ``infer_ode`` when generation raised an exception."""

    pass


# Adapted from pipelines.StableDiffusionXLPipeline.encode_prompt
def encode_prompt(prompt_batch, text_encoder, tokenizer, proportion_empty_prompts, is_train=True):
    """Tokenize a batch of captions and encode them with the text encoder.

    Args:
        prompt_batch: iterable of captions; each is a ``str`` or a
            list/ndarray of alternative captions.
        text_encoder: model called with ``input_ids`` / ``attention_mask``
            and ``output_hidden_states=True``.
        tokenizer: tokenizer producing ``input_ids`` and ``attention_mask``.
        proportion_empty_prompts: probability of replacing a caption by "".
        is_train: when a caption has alternatives, pick one at random if
            True, else take the first.

    Returns:
        ``(prompt_embeds, prompt_masks)``: the second-to-last hidden state of
        the encoder and the tokenizer's attention mask (not moved to GPU).

    Raises:
        TypeError: if a caption is neither a string nor a list/ndarray.
            Previously such entries were silently skipped, which left the
            returned batch shorter than ``prompt_batch``.
    """
    captions = []
    for caption in prompt_batch:
        if random.random() < proportion_empty_prompts:
            captions.append("")
        elif isinstance(caption, str):
            captions.append(caption)
        elif isinstance(caption, (list, np.ndarray)):
            # take a random caption if there are multiple
            captions.append(random.choice(caption) if is_train else caption[0])
        else:
            raise TypeError(
                f"Unsupported caption type {type(caption).__name__}; "
                "expected str, list or numpy.ndarray."
            )

    with torch.no_grad():
        text_inputs = tokenizer(
            captions,
            padding=True,
            pad_to_multiple_of=8,
            max_length=256,
            truncation=True,
            return_tensors="pt",
        )

        text_input_ids = text_inputs.input_ids
        prompt_masks = text_inputs.attention_mask

        prompt_embeds = text_encoder(
            input_ids=text_input_ids.cuda(),
            attention_mask=prompt_masks.cuda(),
            output_hidden_states=True,
        ).hidden_states[-2]

    return prompt_embeds, prompt_masks


@torch.no_grad()
def load_models(args, master_port, rank):
    """Load the text encoder, tokenizer, VAE and Next-DiT model.

    Args:
        args: parsed CLI namespace; reads ``ckpt``, ``precision``, ``ema``
            and ``num_gpus``.
        master_port: unused here; kept for interface compatibility.
        rank: shard index used in the checkpoint file name.

    Returns:
        ``(text_encoder, tokenizer, vae, model)``.

    Side effects:
        Replaces ``builtins.print`` process-wide with a flushing variant.
    """
    # import here to avoid huggingface Tokenizer parallelism warnings
    from diffusers.models import AutoencoderKL
    from transformers import AutoModel, AutoTokenizer

    # override the default print function since the delay can be large for child process
    original_print = builtins.print

    # Redefine the print function with flush=True by default
    def print(*args, **kwargs):
        kwargs.setdefault("flush", True)
        original_print(*args, **kwargs)

    # Override the built-in print with the new version
    builtins.print = print

    train_args = torch.load(os.path.join(args.ckpt, "model_args.pth"))
    dtype = {"bf16": torch.bfloat16, "fp16": torch.float16, "fp32": torch.float32}[args.precision]
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("Loaded model arguments:", json.dumps(train_args.__dict__, indent=2))

    print("Creating lm: Gemma-2B")
    text_encoder = AutoModel.from_pretrained(
        "google/gemma-2b", torch_dtype=dtype, device_map=device, token=hf_token
    ).eval()
    cap_feat_dim = text_encoder.config.hidden_size

    tokenizer = AutoTokenizer.from_pretrained("google/gemma-2b", token=hf_token, add_bos_token=True, add_eos_token=True)
    tokenizer.padding_side = "right"

    print(f"Creating vae: {train_args.vae}")
    vae = AutoencoderKL.from_pretrained(
        (f"stabilityai/sd-vae-ft-{train_args.vae}" if train_args.vae != "sdxl" else "stabilityai/sdxl-vae"),
        torch_dtype=torch.float32,
    ).cuda()

    print(f"Creating Next-DiT: {train_args.model}")
    # latent_size = train_args.image_size // 8
    model = models.__dict__[train_args.model](
        qk_norm=train_args.qk_norm,
        cap_feat_dim=cap_feat_dim,
    )
    model.eval().to(device, dtype=dtype)

    if args.ema:
        print("Loading ema model.")
    ckpt = load_file(
        os.path.join(
            args.ckpt,
            f"consolidated{'_ema' if args.ema else ''}.{rank:02d}-of-{args.num_gpus:02d}.safetensors",
        )
    )
    model.load_state_dict(ckpt, strict=True)

    return text_encoder, tokenizer, vae, model


@torch.no_grad()
def infer_ode(args, infer_args, text_encoder, tokenizer, vae, model):
    """Generate one image with the ODE sampler.

    Args:
        args: parsed CLI namespace (precision, checkpoint dir, transport and
            ODE options).
        infer_args: 11-tuple ``(cap, neg_cap, resolution, num_sampling_steps,
            cfg_scale, solver, t_shift, seed, scaling_method,
            scaling_watershed, proportional_attn)`` from the UI.
        text_encoder, tokenizer, vae, model: objects from ``load_models``.

    Returns:
        ``(PIL image, metadata dict)`` on success, or a ``ModelFailure``
        instance if anything inside the sampling section raised (the
        traceback is printed).
    """
    dtype = {"bf16": torch.bfloat16, "fp16": torch.float16, "fp32": torch.float32}[
        args.precision
    ]
    train_args = torch.load(os.path.join(args.ckpt, "model_args.pth"))

    torch.cuda.set_device(0)

    with torch.autocast("cuda", dtype):
        (
            cap,
            neg_cap,
            resolution,
            num_sampling_steps,
            cfg_scale,
            solver,
            t_shift,
            seed,
            scaling_method,
            scaling_watershed,
            proportional_attn,
        ) = infer_args

        metadata = dict(
            cap=cap,
            neg_cap=neg_cap,
            resolution=resolution,
            num_sampling_steps=num_sampling_steps,
            cfg_scale=cfg_scale,
            solver=solver,
            t_shift=t_shift,
            seed=seed,
            # scaling_method=scaling_method,
            # scaling_watershed=scaling_watershed,
            # proportional_attn=proportional_attn,
        )
        print("> params:", json.dumps(metadata, indent=2))

        try:
            # begin sampler
            transport = create_transport(
                args.path_type,
                args.prediction,
                args.loss_weight,
                args.train_eps,
                args.sample_eps,
            )
            sampler = Sampler(transport)
            sample_fn = sampler.sample_ode(
                sampling_method=solver,
                num_steps=num_sampling_steps,
                atol=args.atol,
                rtol=args.rtol,
                reverse=args.reverse,
                time_shifting_factor=t_shift,
            )
            # end sampler

            # Resolution strings look like "1024x1024" or
            # "(Extrapolation) 1536x1536"; the size is the last token.
            do_extrapolation = "Extrapolation" in resolution
            resolution = resolution.split(" ")[-1]
            w, h = resolution.split("x")
            w, h = int(w), int(h)
            latent_w, latent_h = w // 8, h // 8
            # Seed 0 means "random": leave the RNG state untouched.
            if int(seed) != 0:
                torch.random.manual_seed(int(seed))
            z = torch.randn([1, 4, latent_h, latent_w], device="cuda").to(dtype)
            # Same noise for the conditional and the negative/empty branch.
            z = z.repeat(2, 1, 1, 1)

            with torch.no_grad():
                # An empty negative caption is simply encoded as "".
                cap_feats, cap_mask = encode_prompt([cap, neg_cap], text_encoder, tokenizer, 0.0)

            cap_mask = cap_mask.to(cap_feats.device)

            model_kwargs = dict(
                cap_feats=cap_feats,
                cap_mask=cap_mask,
                cfg_scale=cfg_scale,
            )
            if proportional_attn:
                model_kwargs["proportional_attn"] = True
                model_kwargs["base_seqlen"] = (train_args.image_size // 16) ** 2
            else:
                model_kwargs["proportional_attn"] = False
                model_kwargs["base_seqlen"] = None

            if do_extrapolation and scaling_method == "Time-aware":
                model_kwargs["scale_factor"] = math.sqrt(w * h / train_args.image_size**2)
                model_kwargs["scale_watershed"] = scaling_watershed
            else:
                model_kwargs["scale_factor"] = 1.0
                model_kwargs["scale_watershed"] = 1.0

            print("> start sample")
            samples = sample_fn(z, model.forward_with_cfg, **model_kwargs)[-1]
            # Keep only the first of the two batch entries.
            samples = samples[:1]

            factor = 0.18215 if train_args.vae != "sdxl" else 0.13025
            print(f"> vae factor: {factor}")
            samples = vae.decode(samples / factor).sample
            # Map decoder output from [-1, 1] to [0, 1].
            samples = (samples + 1.0) / 2.0
            samples.clamp_(0.0, 1.0)

            img = to_pil_image(samples[0].float())
            print("> generated image, done.")

            return img, metadata
        except Exception:
            # Top-level boundary: log and signal failure to the UI handler.
            print(traceback.format_exc())
            return ModelFailure()


def none_or_str(value):
    """Argparse type: map the literal string "None" to ``None``."""
    if value == "None":
        return None
    return value


def _str2bool(value):
    """Argparse type: parse common true/false spellings into a ``bool``.

    ``type=bool`` treats every non-empty string (including "False") as True;
    this parser accepts true/false, yes/no, on/off, t/f, y/n and 1/0
    (case-insensitive) and rejects anything else.
    """
    if isinstance(value, bool):
        return value
    lowered = str(value).strip().lower()
    if lowered in {"true", "t", "yes", "y", "on", "1"}:
        return True
    if lowered in {"false", "f", "no", "n", "off", "0", ""}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def parse_transport_args(parser):
    """Add the transport-related command-line options to ``parser``."""
    group = parser.add_argument_group("Transport arguments")
    group.add_argument(
        "--path-type",
        type=str,
        default="Linear",
        choices=["Linear", "GVP", "VP"],
        help="the type of path for transport: 'Linear', 'GVP' (Geodesic Vector Pursuit), or 'VP' (Vector Pursuit).",
    )
    group.add_argument(
        "--prediction",
        type=str,
        default="velocity",
        choices=["velocity", "score", "noise"],
        help="the prediction model for the transport dynamics.",
    )
    group.add_argument(
        "--loss-weight",
        type=none_or_str,
        default=None,
        choices=[None, "velocity", "likelihood"],
        help="the weighting of different components in the loss function, can be 'velocity' for dynamic modeling, 'likelihood' for statistical consistency, or None for no weighting.",
    )
    group.add_argument("--sample-eps", type=float, help="sampling in the transport model.")
    group.add_argument("--train-eps", type=float, help="training to stabilize the learning process.")


def parse_ode_args(parser):
    """Add the ODE-solver command-line options to ``parser``."""
    group = parser.add_argument_group("ODE arguments")
    group.add_argument(
        "--atol",
        type=float,
        default=1e-6,
        help="Absolute tolerance for the ODE solver.",
    )
    group.add_argument(
        "--rtol",
        type=float,
        default=1e-3,
        help="Relative tolerance for the ODE solver.",
    )
    group.add_argument("--reverse", action="store_true", help="run the ODE solver in reverse.")
    group.add_argument(
        "--likelihood",
        action="store_true",
        help="Enable calculation of likelihood during the ODE solving process.",
    )


def find_free_port() -> int:
    """Return a TCP port number the OS reports as free at call time."""
    # The context manager closes the socket even if bind/getsockname raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))
        return sock.getsockname()[1]


def main():
    """Parse CLI options, load the models and launch the Gradio demo."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_gpus", type=int, default=1)
    parser.add_argument("--ckpt", type=str, default="/home/user/app/checkpoints")
    # _str2bool instead of type=bool so that `--ema False` really disables EMA.
    parser.add_argument("--ema", type=_str2bool, default=True)
    parser.add_argument("--precision", default="bf16", choices=["bf16", "fp32"])

    parse_transport_args(parser)
    parse_ode_args(parser)

    args = parser.parse_known_args()[0]
    args.sampler_mode = "ODE"

    if args.num_gpus != 1:
        raise NotImplementedError("Multi-GPU Inference is not yet supported")

    text_encoder, tokenizer, vae, model = load_models(args, 60001, 0)

    description = """
    # Lumina-Next-SFT

    Lumina-Next-SFT is a 2B Next-DiT model with Gemma-2B serving as the text encoder, enhanced through high-quality supervised fine-tuning (SFT).

    Demo current model: `Lumina-Next-SFT 1k Resolution`

    ### Lumina-Next-T2I enables zero-shot resolution extrapolation to 2k.

    ### Lumina-Next supports higher-order solvers ["euler", "midpoint"].

    ### It can generate images with merely 10 steps without any distillation for 1K resolution generation.

    ### Tip: For improved human portrait generation, please choose resolution at 1024x2048.

    ### To reduce waiting times, we are offering three parallel demos:

    Lumina-T2I 2B model: [[demo (supported 2k inference)](http://106.14.2.150:10020/)] [[demo (supported 2k inference)](http://106.14.2.150:10021/)] [[demo (supported 2k inference)](http://106.14.2.150:10022/)] [[demo (compositional generation)](http://106.14.2.150:10023/)]

    """
    with gr.Blocks() as demo:
        with gr.Row():
            gr.Markdown(description)
        with gr.Row():
            with gr.Column():
                cap = gr.Textbox(
                    lines=2,
                    label="Caption",
                    interactive=True,
                    value="Miss Mexico portrait of the most beautiful mexican woman, Exquisite detail, 30-megapixel, 4k, 85-mm-lens, sharp-focus, f:8, "
                    "ISO 100, shutter-speed 1:125, diffuse-back-lighting, award-winning photograph, small-catchlight, High-sharpness, facial-symmetry, 8k",
                    placeholder="Enter a caption.",
                )
                neg_cap = gr.Textbox(
                    lines=2,
                    label="Negative Caption",
                    interactive=True,
                    value="low resolution, low quality, blurry",
                    placeholder="Enter a negative caption.",
                )
                with gr.Row():
                    res_choices = [
                        "1024x1024",
                        "512x2048",
                        "2048x512",
                        "(Extrapolation) 1536x1536",
                        "(Extrapolation) 2048x1024",
                        "(Extrapolation) 1024x2048",
                    ]
                    resolution = gr.Dropdown(value=res_choices[0], choices=res_choices, label="Resolution")
                with gr.Row():
                    num_sampling_steps = gr.Slider(
                        minimum=1,
                        maximum=70,
                        value=30,
                        step=1,
                        interactive=True,
                        label="Sampling steps",
                    )
                    seed = gr.Slider(
                        minimum=0,
                        maximum=int(1e5),
                        value=25,
                        step=1,
                        interactive=True,
                        label="Seed (0 for random)",
                    )
                with gr.Row():
                    solver = gr.Dropdown(
                        value="midpoint",
                        choices=["euler", "midpoint"],
                        label="Solver",
                    )
                    t_shift = gr.Slider(
                        minimum=1,
                        maximum=20,
                        value=6,
                        step=1,
                        interactive=True,
                        label="Time shift",
                    )
                    cfg_scale = gr.Slider(
                        minimum=1.0,
                        maximum=20.0,
                        value=4.0,
                        interactive=True,
                        label="CFG scale",
                    )
                with gr.Accordion("Advanced Settings for Resolution Extrapolation", open=False, visible=False):
                    with gr.Row():
                        scaling_method = gr.Dropdown(
                            value="None",
                            choices=["None"],
                            label="RoPE scaling method",
                        )
                        scaling_watershed = gr.Slider(
                            minimum=0.0,
                            maximum=1.0,
                            value=0.3,
                            interactive=True,
                            label="Linear/NTK watershed",
                            visible=False,
                        )
                    with gr.Row():
                        proportional_attn = gr.Checkbox(
                            value=True,
                            interactive=True,
                            label="Proportional attention",
                        )
                with gr.Row():
                    submit_btn = gr.Button("Submit", variant="primary")
            with gr.Column():
                output_img = gr.Image(
                    label="Generated image",
                    interactive=False,
                    format="png"
                )
                with gr.Accordion(label="Generation Parameters", open=True):
                    gr_metadata = gr.JSON(label="metadata", show_label=False)

        with gr.Row():
            gr.Examples(
                [
                    ["An old sailor, weathered by years at sea, stands at the helm of his ship, eyes scanning the horizon for signs of land, his face lined with tales of adventure and hardship."],  # noqa
                    ["A regal swan glides gracefully across the surface of a tranquil lake, its snowy white feathers ruffled by the gentle breeze."],  # noqa
                    ["A cunning fox, agilely weaving through the forest, its eyes sharp and alert, always ready for prey."],  # noqa
                    ["Inka warrior with a war make up, medium shot, natural light, Award winning wildlife photography, hyperrealistic, 8k resolution."],  # noqa
                    ["Quaint rustic witch's cabin by the lake, autumn forest background, orange and honey colors, beautiful composition, magical, warm glowing lighting, cloudy, dreamy masterpiece, Nikon D610, photorealism, highly artistic, highly detailed, ultra high resolution, sharp focus, Mysterious."],  # noqa
                ],
                [cap],
                label="Examples",
                examples_per_page=80,
            )

        @spaces.GPU(duration=200)
        def on_submit(*infer_args, progress=gr.Progress(track_tqdm=True),):
            # Runs inside the GPU-allocated context; surfaces sampler
            # failures to the UI as an error.
            result = infer_ode(args, infer_args, text_encoder, tokenizer, vae, model)
            if isinstance(result, ModelFailure):
                raise RuntimeError("Model failed to generate the image.")
            return result

        submit_btn.click(
            on_submit,
            [
                cap,
                neg_cap,
                resolution,
                num_sampling_steps,
                cfg_scale,
                solver,
                t_shift,
                seed,
                scaling_method,
                scaling_watershed,
                proportional_attn,
            ],
            [output_img, gr_metadata],
        )

        def show_scaling_watershed(scaling_m):
            # The watershed slider is only shown for the "Time-aware" method.
            return gr.update(visible=scaling_m == "Time-aware")

        scaling_method.change(show_scaling_watershed, scaling_method, scaling_watershed)

    demo.queue().launch(server_name="0.0.0.0")


if __name__ == "__main__":
    main()