Qwen3.5 Full Multimodal SFT Example (Arm64 GH200)

Community Article Published March 7, 2026

This code is not cleaned up or polished; it simply provides a starting point for developers looking to run SFT on the new Qwen3.5 models :) If you have questions, I am happy to help. If you have suggestions, I am happy to learn.

Note

This example uses a vLLM Docker container for evaluation; for further details see: https://huggingface.co/blog/AlioLeuchtmann/sft-with-vllm-eval. Here, a custom vLLM OpenAI Docker image built on a GH200 is used: dockerinisimo/vllm-arm:qwen3.5

Requirements

requirements.txt

torch
transformers[torch]==5.2.0 # 5.2.0 necessary
datasets
accelerate
hf_transfer
huggingface_hub
tqdm
trl==0.14.0
openai
qwen_vl_utils
# Prepare environment
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# This step may need updating to match your CUDA version, but so far it still works well for me
pip install bitsandbytes; git clone https://github.com/bitsandbytes-foundation/bitsandbytes.git; cd bitsandbytes; git checkout 7aec4a88465440b6466a526fe9bbb30930a04ba4;  python setup.py install ;cmake -DCOMPUTE_BACKEND=cuda -S .; make CUDA_VERSION=126; cp /home/ubuntu/bitsandbytes/bitsandbytes/libbitsandbytes_cuda128.so /home/ubuntu/venv/lib/python3.10/site-packages/bitsandbytes/libbitsandbytes_cuda126.so

pip uninstall torch torchvision -y; pip install torch torchvision --index-url https://download.pytorch.org/whl/cu130;

Important: Qwen3.5 Models have a new awesome Attention Mechanism

It is absolutely awesome because it makes the cost scale linearly with context length instead of quadratically, which drastically reduces memory requirements for longer contexts. However, it requires additional preparation for efficient training. (If this is skipped, expect a roughly 10x slowdown.)

# we need flash-linear attention
pip install -U git+https://github.com/fla-org/flash-linear-attention

# and causal-conv1d; the single command below works well on GH200, whereas a plain `pip install causal-conv1d` might cause issues
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/sbsa/cuda-keyring_1.1-1_all.deb && \
sudo dpkg -i cuda-keyring_1.1-1_all.deb && \
sudo apt-get update && \
sudo apt-get install -y cuda-toolkit-13-0 && \
export PATH=/usr/local/cuda-13.0/bin:$PATH && \
export CUDA_HOME=/usr/local/cuda-13.0 && \
pip install -U git+https://github.com/Dao-AILab/causal-conv1d --no-build-isolation

Full Example Script (you will have to adjust for your Dataset)

See especially convert_example_to_messages


import argparse
import base64
import gc
import json
import logging
import os
import random
import subprocess
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO

import httpx
import numpy as np
import openai
import requests
import torch
from datasets import load_dataset, Dataset, DatasetDict
from huggingface_hub import create_repo
from qwen_vl_utils import process_vision_info
from torch.utils.data import SequentialSampler
from tqdm import tqdm
from transformers import (
    AutoModelForCausalLM,
    AutoProcessor,
    TrainingArguments, Qwen3_5ForConditionalGeneration, Qwen3VLForConditionalGeneration
)
from trl import SFTTrainer,SFTConfig
import eval as evaluation_script

import PIL.Image as Image

# Process-wide environment configuration, applied at import time.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# NOTE(review): CUDA_LAUNCH_BLOCKING=1 is normally a debugging aid that makes
# kernel launches synchronous -- confirm it is intended for real training runs.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
# Placeholder value: replace it with a real token. This assignment overwrites
# any HF_TOKEN that is already exported in the environment.
os.environ["HF_TOKEN"] = "xxx"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# Configure logging
logging.basicConfig(level=logging.INFO)


# Not referenced anywhere else in the visible part of this script.
repo_owner = 'ItsMe'
# Passed to SFTConfig as max_seq_length; the vLLM server is started with
# max_seq_length + 3000 as its --max-model-len.
max_seq_length = 5000

def pil_to_data_url(img: Image.Image, fmt="png") -> str:
    """Serialize an image and wrap the bytes in a base64 ``data:`` URL.

    Args:
        img: Image to encode; it is written with ``img.save``.
        fmt: Format name handed to ``img.save``. It is also inserted verbatim
            as the ``image/<fmt>`` MIME subtype of the URL.

    Returns:
        A string of the form ``data:image/<fmt>;base64,<payload>``.
    """
    with BytesIO() as stream:
        img.save(stream, format=fmt)
        raw = stream.getvalue()
    payload = base64.b64encode(raw).decode()
    return f"data:image/{fmt};base64,{payload}"

def _image_content(img: Image.Image, for_openai: bool):
    """Build a single image entry for a chat message's content list.

    Args:
        img: The image to embed.
        for_openai: When true, produce an ``image_url`` part carrying the
            image as a base64 data URL; otherwise produce an ``image`` part
            that holds the image object itself.

    Returns:
        A dict describing the image in the requested message format.
    """
    if not for_openai:
        return {"type": "image", "image": img}
    data_url = pil_to_data_url(img)
    return {"type": "image_url", "image_url": {"url": data_url}}

def convert_example_to_messages(example, completion=True, for_openai=False):
    """Turn one dataset row into a chat-style message list.

    The row must provide ``type``, ``prompt`` and ``prev_image_bytes``. Rows
    whose ``type`` does not contain ``'sprinkle_FineReason'`` must also provide
    ``curr_image_bytes`` and are rendered as a two-image comparison prompt.

    Args:
        example: Mapping holding the fields described above.
        completion: When true, append an assistant turn carrying
            ``example["completion"]`` so that the sequence contains a target
            to train on. Pass ``False`` to build a prompt-only conversation
            (as done for inference).
        for_openai: When true, images are embedded as base64 ``image_url``
            parts; otherwise the image objects are placed in the message.

    Returns:
        A list with one user message, followed by one assistant message when
        ``completion`` is true.

    Raises:
        KeyError: If a required field (including ``completion`` when it is
            requested) is missing from ``example``.
    """
    prev_img = Image.open(BytesIO(example["prev_image_bytes"])).convert("RGB")
    user_content = [{"type": "text", "text": example["prompt"]}]

    if 'sprinkle_FineReason' in example["type"]:
        # Single-image task: prompt followed by the one image.
        user_content.append(_image_content(prev_img, for_openai))
    else:
        curr_img = Image.open(BytesIO(example["curr_image_bytes"])).convert("RGB")
        user_content.extend([
            {"type": "text", "text": "Image A (Previous Page):"},
            _image_content(prev_img, for_openai),
            {"type": "text", "text": "Image B (Current Page):"},
            _image_content(curr_img, for_openai),
            {"type": "text", "text": "Start the analysis."},
        ])

    messages = [{"role": "user", "content": user_content}]

    if completion:
        # Without this turn the conversation ends after the user message and
        # there is no assistant response for the loss to be computed on.
        # NOTE(review): assumes example["completion"] is a string -- confirm
        # against the dataset schema.
        messages.append({
            "role": "assistant",
            "content": [{"type": "text", "text": example["completion"]}],
        })

    return messages



# Custom SFTTrainer with VLLM evaluation
class SFTTrainerWithVLLMEval(SFTTrainer):
    """SFTTrainer whose evaluation generates text with a vLLM Docker container.

    On ``evaluate`` the model and optimizer state are moved to the CPU, the
    current weights are saved to a temporary checkpoint directory, a vLLM
    container serving that directory is started, predictions are generated
    for the evaluation set through its OpenAI-compatible endpoint, and the
    result is scored with ``score_func``.

    Args:
        score_func: Callable invoked as ``score_func(predictions, references)``
            that returns a mapping of metric name to value. Required.
        first_eval_step: If set, evaluations requested before this global
            step are skipped and a placeholder metric dict is returned.
    """

    def __init__(self, *args, score_func=None,first_eval_step=None, **kwargs):
        # Validate before the (potentially expensive) parent initialisation.
        if score_func is None:
            raise ValueError("score_func must be provided for evaluation.")

        super().__init__(*args, **kwargs)
        self.score_func = score_func
        self.first_eval_step = first_eval_step

    def _get_train_sampler(self, dataset=None):
        """
        Overrides the default RandomSampler to a SequentialSampler.
        This ensures the DataLoader yields batches in the exact order
        the dataset is provided.
        """
        # Use the passed dataset, or fallback to the trainer's dataset
        train_dataset = dataset if dataset is not None else self.train_dataset

        if train_dataset is None:
            return None

        return SequentialSampler(train_dataset)

    def _move_optimizer_state(self, device):
        """Move every tensor held in the optimizer state to ``device``."""
        optimizer = getattr(self, "optimizer", None)
        if optimizer is None:
            return
        for state in optimizer.state.values():
            for key, value in state.items():
                if isinstance(value, torch.Tensor):
                    state[key] = value.to(device)

    def start_vllm_server(self, checkpoint_dir):
        """Start the VLLM server using Docker with the checkpoint path mounted.

        Args:
            checkpoint_dir: Directory containing the saved model; it is
                mounted into the container at ``/model``.

        Returns:
            The id of the started container, once the server answers on
            ``http://localhost:8000/version``.

        Raises:
            RuntimeError: If ``docker run`` fails, or if the server does not
                become reachable within the polling budget. In the latter
                case the container is stopped before the error is raised.
        """
        checkpoint_path = os.path.abspath(checkpoint_dir).replace("\\", "/")
        container_model_path = "/model"

        # Dynamically calculate GPU memory utilization
        total_mem = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3
        reserved_mem = torch.cuda.memory_reserved(0) / 1024 ** 3
        free_mem = total_mem - reserved_mem
        # Give vLLM 70% of the share not reserved by this process, but never
        # less than 10% of the device.
        util = max(0.1, (free_mem / total_mem) * 0.70)
        logging.info(f"GPU: {total_mem:.1f}GB total, {reserved_mem:.1f}GB reserved, "
                     f"{free_mem:.1f}GB free → vllm util={util:.3f}")

        logging.info(f"Mounting {checkpoint_path} to {container_model_path} and start vllm")

        docker_cmd = [
            "sudo", "docker", "run", "-d",
            "--platform", "linux/arm64",
            "--ipc", "host",
            "-p", "8000:8000",
            "-v", f"{checkpoint_path}:{container_model_path}",
            "--gpus", "all",
            "-e", "HF_HOME=/root/.cache/huggingface/",
            "-e", "NCCL_SOCKET_IFNAME=eth0",
            "dockerinisimo/vllm-arm:qwen3.5", # arm64 GH200 built Image
            container_model_path,
            "--trust-remote-code",
            "--config-format", "hf",
            "--host", "0.0.0.0",
            "--port", "8000",
            "--max-model-len", f"{max_seq_length + 3000}",
            "--tensor-parallel-size", "1",
            "--gpu-memory-utilization", f"{util:.3f}",
        ]

        process = subprocess.run(docker_cmd, capture_output=True, text=True)
        container_id = process.stdout.strip()
        if process.returncode != 0 or not container_id:
            # Fail immediately instead of polling a server that never started.
            raise RuntimeError(
                f"Failed to start VLLM container (exit code {process.returncode}): "
                f"{process.stderr.strip()}"
            )

        time.sleep(5)
        logging.info(f"CONTAINER STARTED WITH ID: {container_id}")

        max_attempts = 360
        last_error = None
        try:
            for _ in range(max_attempts):
                try:
                    response = requests.get("http://localhost:8000/version", timeout=120)
                    response.raise_for_status()
                except requests.RequestException as e:
                    # Server not reachable yet; remember why and retry.
                    last_error = e
                    time.sleep(2)
                else:
                    print("VLLM server started successfully.")
                    return container_id
            raise RuntimeError("VLLM server failed to start within timeout.") from last_error
        except BaseException:
            # Do not leave a half-started container holding the GPU and port.
            self.stop_vllm_server(container_id)
            raise


    def stop_vllm_server(self, container_id):
        """Stop and remove the VLLM Docker container.

        Failures of the docker commands are reported on stdout and not
        raised. An empty ``container_id`` is ignored.
        """
        if not container_id:
            return
        try:
            for action in ("stop", "rm"):
                subprocess.run(["sudo", "docker", action, container_id], check=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
            print("VLLM server stopped and container removed.")
        except subprocess.CalledProcessError as e:
            print(f"Failed to stop VLLM server: {e}")



    def generate_with_vllm(self, max_workers=128, eval_dataset=None):
        """Generate predictions using the VLLM server in parallel, handling 2 images per prompt.

        Args:
            max_workers: Number of threads issuing requests concurrently.
            eval_dataset: Dataset to run on; defaults to ``self.eval_dataset``.
                Each row must provide ``completion`` plus the fields read by
                ``convert_example_to_messages``.

        Returns:
            A tuple ``(predictions, references, info)``. ``predictions`` holds
            one generated string per row (``""`` when the request failed),
            ``references`` the matching ``completion`` values, and ``info``
            one dict per row combining both with the row's ``file_name``.
        """
        logging.getLogger("httpx").setLevel(logging.WARNING)

        mapped_ds = eval_dataset if eval_dataset is not None else self.eval_dataset

        items = []
        for batch in tqdm(mapped_ds, desc="Preparing VLLM prompts"):
            if isinstance(batch, list):
                items.extend(batch)
            else:
                items.append(batch)
        references = [item["completion"] for item in items]

        http_client = httpx.Client(
            limits=httpx.Limits(
                max_connections=1000,
                max_keepalive_connections=1000,
            ),
            timeout=httpx.Timeout(500.0),
        )
        client = openai.OpenAI(
            base_url="http://localhost:8000/v1",
            api_key="XXX",
            max_retries=3,
            http_client=http_client,
        )

        def _call_one(item) -> str:
            """Request one completion; return an empty string on any failure."""
            try:
                resp = client.chat.completions.create(
                    model="/model",
                    messages=convert_example_to_messages(item,completion=False,for_openai=True),
                    max_tokens=3000,
                    temperature=0.7,
                    stream=False,
                    extra_body={"chat_template_kwargs": {"enable_thinking": False}},
                )
                # The content may be None; keep the declared str contract.
                return resp.choices[0].message.content or ""
            except Exception as e:
                logging.info(f"Error generating for one prompt: {e}")
                traceback.print_exc()
                return ""

        try:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                predictions = list(
                    tqdm(executor.map(_call_one, items), total=len(items), desc="VLLM parallel inference")
                )
        finally:
            # Release the connection pool created for this evaluation round.
            http_client.close()

        # Build the report from the rows already collected instead of
        # indexing the dataset again.
        info = [
            {
                "prediction": prediction,
                "references": reference,
                "path": item.get('file_name', 'unknown'),
            }
            for prediction, reference, item in zip(predictions, references, items)
        ]

        return predictions, references, info

    def evaluate(self, eval_dataset=None, ignore_keys=None, metric_key_prefix="eval"):
        """Evaluate the current weights through a temporary vLLM server.

        Args:
            eval_dataset: Optional dataset overriding ``self.eval_dataset``.
            ignore_keys: Accepted for interface compatibility; not used.
            metric_key_prefix: Prefix put in front of every metric name.

        Returns:
            A dict mapping ``"<prefix>_<metric>"`` to the values returned by
            ``score_func``. When the evaluation is skipped because
            ``first_eval_step`` has not been reached, a placeholder dict with
            ``"<prefix>_pss_accuracy"`` set to 0 is returned instead.
        """
        current_step = self.state.global_step

        print(f"eval at {current_step}: is < than {self.first_eval_step} ?? ")
        if self.first_eval_step and current_step < self.first_eval_step:
            return {"dummy": "dummy", f"{metric_key_prefix}_pss_accuracy": 0}

        # 1. Clear gradients to free immediate memory
        optimizer = getattr(self, "optimizer", None)
        if optimizer is not None:
            optimizer.zero_grad(set_to_none=True)

        # 2. Move Model to CPU
        self.model.to('cpu')

        # 3. Safely OFFLOAD Optimizer states to CPU (Do NOT delete them)
        self._move_optimizer_state('cpu')

        if hasattr(self, 'accelerator'):
            self.accelerator.free_memory()

        gc.collect()
        torch.cuda.empty_cache()

        allocated = torch.cuda.memory_allocated() / 1024 ** 3
        reserved = torch.cuda.memory_reserved() / 1024 ** 3
        logging.info(f"GPU after safe CPU offloading: {allocated:.1f}GB allocated, {reserved:.1f}GB reserved")

        # 4. Save checkpoint for VLLM
        checkpoint_dir = f"temp_checkpoint_{current_step}"
        self.model.save_pretrained(checkpoint_dir)
        self.processing_class.save_pretrained(checkpoint_dir)

        # 5. Run VLLM Evaluation
        container_id = self.start_vllm_server(checkpoint_dir)
        try:
            predictions, references, info = self.generate_with_vllm(eval_dataset=eval_dataset)
        finally:
            self.stop_vllm_server(container_id)

        time.sleep(10)

        # 6. Bring the Model BACK to the GPU
        self.model.to('cuda')

        # 7. Bring the Optimizer states BACK to the GPU to resume training
        self._move_optimizer_state('cuda')

        torch.cuda.empty_cache()
        gc.collect()

        # 8. Calculate Metrics
        field_averages = self.score_func(predictions, references)

        output_dir = os.path.join(self.args.output_dir, 'eval_steps')
        os.makedirs(output_dir, exist_ok=True)
        with open(os.path.join(output_dir, f'eval_{current_step}.json'), "w", encoding='utf-8') as f:
            json.dump({"predictions": predictions, "references": references, "info": info}, f, indent=4)

        updated = {f"{metric_key_prefix}_{key}": value for key, value in field_averages.items()}
        self.log(updated)
        logging.info(f"Eval results: {updated}")

        return updated


def adjust_steps_ez(train_dataset_size, effective_batch_size, percentage):
    """Convert a percentage of one epoch into a step count.

    Args:
        train_dataset_size: Number of training examples.
        effective_batch_size: Examples consumed per optimizer step.
        percentage: Share of an epoch, in percent.

    Returns:
        The floor of ``percentage`` percent of the full steps per epoch, with
        a lower bound of 1.
    """
    scaled = (train_dataset_size // effective_batch_size) * percentage
    candidate = scaled // 100
    return candidate if candidate > 1 else 1

def get_dataset(dataset_path,clean=False):
    """Load a dataset and return it with ``train`` and ``test`` splits.

    ``dataset_path`` is treated as a local JSON file when it exists on disk
    and as a Hugging Face Hub repository id otherwise.

    * Local JSON containing both ``train`` and ``test`` keys is used as is;
      any other local JSON is flattened to a list, shuffled with a fixed seed
      and split 90/10.
    * A hub dataset without a ``test`` split uses ``validation`` as the test
      split when both ``train`` and ``validation`` exist, and otherwise
      splits ``train`` 90/10.

    Args:
        dataset_path: Local JSON file path or hub repository id.
        clean: Currently unused; accepted for interface compatibility.

    Returns:
        A tuple ``(dataset, train_set, test_set)`` where ``train_set`` is the
        shuffled ``train`` split and ``test_set`` the ``test`` split.

    Raises:
        ValueError: If no ``train``/``test`` pair can be derived.
    """
    seed = 3123
    # Set seed for reproducibility (Numpy)
    np.random.seed(seed)

    # 1. Check if path exists locally
    if os.path.exists(dataset_path):
        print(f"Loading local file: {dataset_path}")

        with open(dataset_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        if isinstance(data, dict) and "train" in data and "test" in data:
            # Already split in the JSON file.
            train_data, test_data = data["train"], data["test"]
        else:
            # Flatten and shuffle
            full_pool = data if isinstance(data, list) else list(data.values())
            np.random.shuffle(full_pool)

            # Calculate split index (0.9 for train)
            split_idx = int(len(full_pool) * 0.9)
            train_data, test_data = full_pool[:split_idx], full_pool[split_idx:]

        dataset = DatasetDict({
            "train": Dataset.from_list(train_data),
            "test": Dataset.from_list(test_data),
        })

    # 2. If not local, assume Hugging Face Repo ID
    else:
        print(f"Loading from Hugging Face Hub: {dataset_path}")

        # Load dataset (downloads config if necessary)
        raw_dataset = load_dataset(dataset_path) # token from env

        if not isinstance(raw_dataset, DatasetDict):
            # A single Dataset object (no splits yet)
            dataset = raw_dataset.train_test_split(test_size=0.1, seed=seed)
        elif "test" in raw_dataset:
            dataset = raw_dataset
        elif "train" in raw_dataset and "validation" in raw_dataset:
            # No test split: use validation in its place.
            dataset = DatasetDict({
                "train": raw_dataset["train"],
                "test": raw_dataset["validation"],
            })
        elif "train" in raw_dataset:
            # Split train 90/10 to match the local-file logic
            dataset = raw_dataset["train"].train_test_split(test_size=0.1, seed=seed)
        else:
            # Unknown layout; reported by the check below.
            dataset = raw_dataset

    # Fail with a clear message instead of an opaque KeyError further down.
    missing = [split for split in ("train", "test") if split not in dataset]
    if missing:
        raise ValueError(
            f"Dataset {dataset_path!r} provides splits {list(dataset.keys())}; "
            f"cannot derive required split(s) {missing}."
        )

    # Final Access
    train_set = dataset["train"].shuffle(seed=seed)
    test_set = dataset["test"]

    return dataset,train_set,test_set



def train_and_evaluate(model_name, dataset_path, score_func,clean=False):
    """Fine-tune ``model_name`` on ``dataset_path`` with vLLM-based evaluation.

    Loads the processor and model, builds a collator that tokenizes the chat
    messages and masks labels, configures the trainer and runs training.

    Args:
        model_name: Model identifier passed to ``from_pretrained``.
        dataset_path: Dataset location forwarded to ``get_dataset``.
        score_func: Scoring callable handed to ``SFTTrainerWithVLLMEval``.
        clean: Forwarded to ``get_dataset``.
    """
    # The output directory name combines the model name and the dataset path
    # (with the "AlioLeuchtmann/" prefix removed).
    new_model_name = f'{model_name}_{dataset_path.replace("AlioLeuchtmann/","")}'
    os.makedirs(new_model_name, exist_ok=True)
    logging.info(f'NEW MODEL NAME IS {new_model_name}')

    processor = AutoProcessor.from_pretrained(
        model_name,
        trust_remote_code=True,
        min_pixels=256 * 28 * 28,    # default
        #max_pixels=1280 * 28 * 28,   # uncomment if memory problems
    )
    processor.padding_side = "right"

    def model_init():
        """Load the model and enable non-reentrant gradient checkpointing."""
        print("MODEL INIT")
        model = Qwen3_5ForConditionalGeneration.from_pretrained(
            model_name,
            torch_dtype="auto",
            trust_remote_code=True,
            device_map="auto",
        )
        print(model.config._attn_implementation)
        model.gradient_checkpointing_enable(gradient_checkpointing_kwargs={"use_reentrant": False})
        return model


    dataset,train_set,eval_set = get_dataset(dataset_path,clean)

    def new_collate_fn(examples):
        """Tokenize a list of dataset rows and attach masked ``labels``.

        Labels start as a copy of ``input_ids``; padding, vision tokens and
        everything up to the computed assistant content start are set to -100.
        """

        # Apply the chat template
        modified = [
            convert_example_to_messages(example) for example in examples
        ]

        texts = [processor.apply_chat_template(messages, tokenize=False,enable_thinking=False ) for messages in modified]
        image_inputs = [process_vision_info(messages)[0] for messages in modified]

        batch = processor(
            text=texts, images=image_inputs, return_tensors="pt", padding='longest'
        )

        labels = batch["input_ids"].clone()

        # 1. Mask padding
        labels[labels == processor.tokenizer.pad_token_id] = -100

        # 2. Mask image tokens
        for token in ["<|image_pad|>", "<|vision_start|>", "<|vision_end|>"]:
            tok_id = processor.tokenizer.convert_tokens_to_ids(token)
            if tok_id is not None:
                labels[labels == tok_id] = -100

        # 3. Mask everything before the assistant response (prompt masking)
        #    Qwen chat template: <|im_start|>user\n...<|im_end|>\n<|im_start|>assistant\n...
        #    We find the last <|im_start|> which begins the assistant turn,
        #    then mask everything up to and including the "assistant\n" header.
        # NOTE(review): this relies on convert_example_to_messages emitting a
        # trailing assistant message; if the last turn is the user turn, the
        # last <|im_start|> found here is the user header instead -- verify.
        im_start_id = processor.tokenizer.convert_tokens_to_ids("<|im_start|>")

        for i in range(labels.size(0)):
            input_ids = batch["input_ids"][i]
            im_start_positions = (input_ids == im_start_id).nonzero(as_tuple=True)[0]

            if len(im_start_positions) > 0:
                # Last <|im_start|> is the assistant turn
                assistant_turn_start = im_start_positions[-1].item()

                # The actual content starts after "<|im_start|>assistant\n"
                # which is typically 2 more tokens: "assistant" + "\n"
                # We mask up to and including those header tokens
                # NOTE(review): the fixed offset of 3 assumes the role name
                # and the newline are one token each -- confirm for the
                # tokenizer in use.
                content_start = assistant_turn_start + 3  # <|im_start|> + assistant + \n

                # Safety: don't exceed sequence length
                content_start = min(content_start, labels.size(1))

                labels[i, :content_start] = -100

        batch["labels"] = labels
        return batch

    # Per-device batch size, gradient accumulation steps, epoch count, and the
    # share of an epoch (in percent) between evaluations/checkpoints.
    bs = 1
    ga = 16
    epochs = 1
    eval_percentage = 5

    train_args = TrainingArguments(
        output_dir=new_model_name,
        num_train_epochs=epochs,
        per_device_train_batch_size=bs,
        gradient_accumulation_steps=ga,
        per_device_eval_batch_size=1,
        # Evaluation and checkpointing use the same interval.
        eval_steps=adjust_steps_ez(len(train_set), bs * ga, eval_percentage),
        save_steps=adjust_steps_ez(len(train_set), bs * ga, eval_percentage),
        learning_rate=2e-5,
        logging_steps=1,
        #max_grad_norm=0.5,  # grad norm to low limiting trainig
        warmup_ratio=0.08,
        #weight_decay=0.001,
        optim="adamw_8bit",
        eval_strategy='steps',
        save_strategy='steps',
        log_level="debug",
        lr_scheduler_type="cosine",
        hub_strategy="all_checkpoints",
        load_best_model_at_end=True,
        # Must match a key returned by SFTTrainerWithVLLMEval.evaluate.
        metric_for_best_model="eval_pss_accuracy",
        save_total_limit=None,
        gradient_checkpointing=True,
        bf16=True,
        remove_unused_columns=False,
    )

    # None means no evaluation is skipped at the start of training.
    first_step = None # adjust_steps_ez(len(train_set), bs * ga, 12)


    # The SFTConfig is built from the TrainingArguments above plus the
    # SFT-specific options.
    # NOTE(review): skip_prepare_dataset presumably leaves the raw rows
    # untouched so that new_collate_fn does all processing -- confirm against
    # the installed TRL version.
    trainer = SFTTrainerWithVLLMEval(
        model=model_init(),
        args=SFTConfig(**train_args.to_dict(),
                       max_seq_length=max_seq_length,
                       dataset_text_field='input_ids',
                       dataset_kwargs={"skip_prepare_dataset": True},
                       packing=False,
                       ),
        train_dataset=train_set,
        eval_dataset=eval_set,
        processing_class=processor,
        data_collator=new_collate_fn,
        score_func=score_func,
        first_eval_step=first_step
    )
    #res = trainer.evaluate()
    #print(json.dumps(res, indent=2))
    trainer.train()


    # Drop the local references and release cached GPU memory.
    del trainer
    del dataset
    del train_set
    del eval_set
    torch.cuda.empty_cache()
    gc.collect()


def main():
    """Command-line entry point: check for CUDA, parse arguments, train.

    Exits with status 1 when no CUDA device is available.

    Command-line arguments:
        --model: Model identifier (required).
        --dataset: Local JSON file or hub repository id (required).
        --clean: Optional flag forwarded to ``train_and_evaluate``.
    """
    print("PyTorch version:", torch.__version__)
    print("CUDA available:", torch.cuda.is_available())
    if not torch.cuda.is_available():
        print("NO CUDA EXIT!")
        # Signal the failure to the caller with a non-zero exit status.
        raise SystemExit(1)

    parser = argparse.ArgumentParser(description="Train a Qwen3 Model on the Markdown set")
    # The argument is required, so no default is declared: a default would
    # never take effect.
    parser.add_argument("--model", type=str, required=True, help="Model")
    parser.add_argument("--dataset", type=str, required=True, help="Dataset")
    parser.add_argument("--clean", action="store_true", default=False, help="Clean")
    args = parser.parse_args()

    print(args.clean)

    train_and_evaluate(args.model, args.dataset, evaluation_script.trivial_eval,args.clean)


if __name__ == "__main__":
    main()

Community

Why use a specific Docker image for GH200? What are the differences from the stock vllm-openai:vX-aarch64 image? Thanks

·
Article author

The official image probably works as well. In the past I always used the Lambda Labs vLLM OpenAI ARM image, from when the GH200 first became available; since that image is no longer maintained, I chose a custom image built from the latest vLLM version.

Would be happy to hear from you when it worked out with the official Image :)

Sign up or log in to comment