Qwen3.5 Full Multimodal SFT Example (Arm64 GH200)
Note
Uses a vLLM Docker container for evaluation; for further details see: https://huggingface.co/blog/AlioLeuchtmann/sft-with-vllm-eval. This example uses a custom vLLM OpenAI-compatible Docker image built on GH200: dockerinisimo/vllm-arm:qwen3.5
Requirements
requirements.txt
torch
transformers[torch]==5.2.0 # 5.2.0 necessary
datasets
accelerate
hf_transfer
huggingface_hub
tqdm
trl==0.14.0
openai
qwen_vl_utils
# Prepare environment
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# This may need updating for your CUDA version (the library file names below are CUDA-version specific), but it still works for me
pip install bitsandbytes; git clone https://github.com/bitsandbytes-foundation/bitsandbytes.git; cd bitsandbytes; git checkout 7aec4a88465440b6466a526fe9bbb30930a04ba4; python setup.py install ;cmake -DCOMPUTE_BACKEND=cuda -S .; make CUDA_VERSION=126; cp /home/ubuntu/bitsandbytes/bitsandbytes/libbitsandbytes_cuda128.so /home/ubuntu/venv/lib/python3.10/site-packages/bitsandbytes/libbitsandbytes_cuda126.so
pip uninstall torch torchvision -y; pip install torch torchvision --index-url https://download.pytorch.org/whl/cu130;
Important: Qwen3.5 models use a new attention mechanism
It reduces how memory and compute scale with context length from quadratic to linear, which drastically lowers memory requirements for long contexts. Efficient training, however, needs the additional setup below (without it, training is roughly 10x slower).
# we need flash-linear attention
pip install -U git+https://github.com/fla-org/flash-linear-attention
# Install causal-conv1d: the single command below works well on GH200, whereas a plain `pip install causal-conv1d` may cause issues
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/sbsa/cuda-keyring_1.1-1_all.deb && \
sudo dpkg -i cuda-keyring_1.1-1_all.deb && \
sudo apt-get update && \
sudo apt-get install -y cuda-toolkit-13-0 && \
export PATH=/usr/local/cuda-13.0/bin:$PATH && \
export CUDA_HOME=/usr/local/cuda-13.0 && \
pip install -U git+https://github.com/Dao-AILab/causal-conv1d --no-build-isolation
Full example script (you will need to adapt it to your dataset)
See especially convert_example_to_messages
import argparse
import base64
import gc
import json
import logging
import os
import random
import subprocess
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
import httpx
import numpy as np
import openai
import requests
import torch
from datasets import load_dataset, Dataset, DatasetDict
from huggingface_hub import create_repo
from qwen_vl_utils import process_vision_info
from torch.utils.data import SequentialSampler
from tqdm import tqdm
from transformers import (
AutoModelForCausalLM,
AutoProcessor,
TrainingArguments, Qwen3_5ForConditionalGeneration, Qwen3VLForConditionalGeneration
)
from trl import SFTTrainer,SFTConfig
import eval as evaluation_script
import PIL.Image as Image
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Debug aid: makes every CUDA kernel launch synchronous so errors surface at
# the faulting call. It serializes GPU work and slows training noticeably;
# remove it once you are done debugging.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
# Hugging Face Hub token. setdefault keeps a token that is already exported in
# the shell instead of overwriting it with the "xxx" placeholder. Replace the
# placeholder (or export HF_TOKEN) before running; never commit a real token.
os.environ.setdefault("HF_TOKEN", "xxx")
# Expandable segments help the CUDA caching allocator avoid fragmentation when
# tensor sizes vary between batches.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
# Configure logging
logging.basicConfig(level=logging.INFO)
# Hub namespace; not referenced elsewhere in this script as shown.
repo_owner = 'ItsMe'
# Maximum training sequence length in tokens. The vLLM server is started with
# max_seq_length + 3000 so the eval prompts plus up to 3000 generated tokens fit.
max_seq_length = 5000
def pil_to_data_url(img: Image.Image, fmt="png") -> str:
    """Encode a PIL image as a base64 ``data:image/<fmt>`` URL.

    Args:
        img: Image to encode.
        fmt: Format name passed to ``Image.save``; also used verbatim as the
            MIME subtype of the URL.

    Returns:
        A ``data:`` URL string, as used for OpenAI ``image_url`` content parts.
    """
    with BytesIO() as buffer:
        img.save(buffer, format=fmt)
        encoded = base64.b64encode(buffer.getvalue()).decode()
    return "data:image/" + fmt + ";base64," + encoded
def _image_content(img: Image.Image, for_openai: bool):
    """Build a single image content part for a chat message.

    With ``for_openai`` the image is inlined as a base64 data URL inside an
    OpenAI-style ``image_url`` part. Otherwise the PIL image itself is wrapped
    in a ``{"type": "image"}`` part, which is the form the local training
    collator feeds to ``process_vision_info``.
    """
    if not for_openai:
        return {"type": "image", "image": img}
    url = pil_to_data_url(img)
    return {"type": "image_url", "image_url": {"url": url}}
def convert_example_to_messages(example, completion=True, for_openai=False):
    """Convert one dataset row into a chat-message list.

    Rows whose ``type`` contains ``'sprinkle_FineReason'`` get the prompt
    followed by the single ``prev_image_bytes`` image. All other rows get the
    prompt followed by labelled "previous" (``prev_image_bytes``) and "current"
    (``curr_image_bytes``) page images and a closing instruction.

    Args:
        example: Dataset row with ``type``, ``prompt``, the image byte fields
            and, when ``completion`` is true, the target text in ``completion``.
        completion: If true (training), append ``example["completion"]`` as the
            assistant turn. Pass False for inference so the reference answer
            is not part of the request.
        for_openai: Emit OpenAI ``image_url`` parts (base64 data URLs) instead
            of PIL image parts.

    Returns:
        A list of ``{"role": ..., "content": [...]}`` message dicts.

    Raises:
        KeyError: If a required field (including ``completion`` when requested)
            is missing from ``example``.
    """
    # Both layouts start from the previous page image.
    prev_img = Image.open(BytesIO(example["prev_image_bytes"])).convert("RGB")
    if 'sprinkle_FineReason' in example["type"]:
        user_content = [
            {"type": "text", "text": example["prompt"]},
            _image_content(prev_img, for_openai),
        ]
    else:
        curr_img = Image.open(BytesIO(example["curr_image_bytes"])).convert("RGB")
        user_content = [
            {"type": "text", "text": example["prompt"]},
            {"type": "text", "text": "Image A (Previous Page):"},
            _image_content(prev_img, for_openai),
            {"type": "text", "text": "Image B (Current Page):"},
            _image_content(curr_img, for_openai),
            {"type": "text", "text": "Start the analysis."},
        ]
    messages = [{"role": "user", "content": user_content}]
    if completion:
        # The training collator unmasks labels only after the last
        # <|im_start|> header; without an assistant turn that header would be
        # the user's and the model would be trained on the prompt instead of
        # the target answer.
        messages.append({
            "role": "assistant",
            "content": [{"type": "text", "text": example["completion"]}],
        })
    return messages
# Custom SFTTrainer with VLLM evaluation
class SFTTrainerWithVLLMEval(SFTTrainer):
    """``SFTTrainer`` whose evaluation serves the current weights with vLLM.

    ``evaluate`` does the following, in order:

    1. Offloads the model and optimizer state to CPU.
    2. Saves the weights to ``temp_checkpoint_<step>``.
    3. Starts a vLLM OpenAI-compatible server in Docker on that checkpoint and
       generates predictions for the eval set.
    4. Stops the server and moves everything back to the GPU.
    5. Scores the predictions with ``score_func``.

    Training batches are drawn in dataset order (see ``_get_train_sampler``).
    """

    def __init__(self, *args, score_func=None, first_eval_step=None, **kwargs):
        """Create the trainer.

        Args:
            *args, **kwargs: Forwarded unchanged to ``SFTTrainer``.
            score_func: Callable ``(predictions, references) -> dict`` whose
                entries become the evaluation metrics. Required.
            first_eval_step: If truthy, evaluations before this global step
                are skipped and return a placeholder metric dict.

        Raises:
            ValueError: If ``score_func`` is None.
        """
        # Fail fast, before the parent constructor does its setup work.
        if score_func is None:
            raise ValueError("score_func must be provided for evaluation.")
        super().__init__(*args, **kwargs)
        self.score_func = score_func
        self.first_eval_step = first_eval_step

    def _get_train_sampler(self, dataset=None):
        """Return a SequentialSampler instead of the default RandomSampler.

        This makes the DataLoader yield batches in exactly the order of the
        dataset (``get_dataset`` already shuffles the train split with a fixed
        seed). Returns None when there is no training dataset.
        """
        # Use the passed dataset, or fall back to the trainer's dataset
        train_dataset = dataset if dataset is not None else self.train_dataset
        if train_dataset is None:
            return None
        return SequentialSampler(train_dataset)

    def start_vllm_server(self, checkpoint_dir):
        """Start a vLLM OpenAI server in Docker serving ``checkpoint_dir``.

        The checkpoint is mounted at ``/model`` inside the container and the
        server listens on port 8000. vLLM's GPU memory share is derived from
        the memory this process has not reserved.

        Returns:
            The Docker container ID.

        Raises:
            RuntimeError: If ``docker run`` fails, or the server does not answer
                ``/version`` within the polling budget. In the latter case the
                container is stopped first.
        """
        checkpoint_path = os.path.abspath(checkpoint_dir).replace("\\", "/")
        container_model_path = "/model"
        # Dynamically calculate GPU memory utilization
        total_mem = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3
        reserved_mem = torch.cuda.memory_reserved(0) / 1024 ** 3
        free_mem = total_mem - reserved_mem
        # Give vLLM 70% of the currently free fraction (30% safety margin),
        # but never less than 10% of the device.
        util = max(0.1, (free_mem / total_mem) * 0.70)
        logging.info(f"GPU: {total_mem:.1f}GB total, {reserved_mem:.1f}GB reserved, "
                     f"{free_mem:.1f}GB free → vllm util={util:.3f}")
        logging.info(f"Mounting {checkpoint_path} to {container_model_path} and start vllm")
        docker_cmd = [
            "sudo", "docker", "run", "-d",
            "--platform", "linux/arm64",
            "--ipc", "host",
            "-p", "8000:8000",
            "-v", f"{checkpoint_path}:{container_model_path}",
            "--gpus", "all",
            "-e", "HF_HOME=/root/.cache/huggingface/",
            "-e", "NCCL_SOCKET_IFNAME=eth0",
            "dockerinisimo/vllm-arm:qwen3.5",  # arm64 GH200 built Image
            container_model_path,
            "--trust-remote-code",
            "--config-format", "hf",
            "--host", "0.0.0.0",
            "--port", "8000",
            "--max-model-len", f"{max_seq_length + 3000}",
            "--tensor-parallel-size", "1",
            "--gpu-memory-utilization", f"{util:.3f}",
        ]
        process = subprocess.run(docker_cmd, capture_output=True, text=True)
        if process.returncode != 0:
            # Without this check an empty container id would be polled for
            # the full timeout before failing.
            raise RuntimeError(
                f"docker run failed (exit {process.returncode}): {process.stderr.strip()}"
            )
        time.sleep(5)
        container_id = process.stdout.strip()
        logging.info(f"CONTAINER STARTED WITH ID: {container_id}")
        max_attempts = 360
        for _ in range(max_attempts):
            try:
                response = requests.get("http://localhost:8000/version", timeout=120)
                response.raise_for_status()
                print("VLLM server started successfully.")
                return container_id
            except requests.RequestException:
                # Server not up yet (connection refused / not ready); retry.
                time.sleep(2)
        # Don't leave a half-started server holding the GPU and port 8000.
        self.stop_vllm_server(container_id)
        raise RuntimeError("VLLM server failed to start within timeout.")

    def stop_vllm_server(self, container_id):
        """Stop and remove the vLLM Docker container.

        This is best effort: a failing ``docker`` command is printed, not
        raised, so evaluation cleanup never masks the original error.
        """
        try:
            subprocess.run(["sudo", "docker", "stop", container_id], check=True,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # The container is not started with --rm, so remove it explicitly
            # to avoid accumulating one stopped container per evaluation.
            subprocess.run(["sudo", "docker", "rm", container_id], check=True,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            print("VLLM server stopped and container removed.")
        except subprocess.CalledProcessError as e:
            print(f"Failed to stop VLLM server: {e}")

    def generate_with_vllm(self, max_workers=128, dataset=None):
        """Generate predictions with the running vLLM server, in parallel.

        Each example is converted with
        ``convert_example_to_messages(item, completion=False, for_openai=True)``,
        so the reference answer is not sent. Requests go to the
        OpenAI-compatible endpoint at ``localhost:8000``. A request that fails
        yields an empty prediction.

        Args:
            max_workers: Number of concurrent request threads.
            dataset: Dataset to evaluate; defaults to ``self.eval_dataset``.
                Each element is an example dict or a list of example dicts.

        Returns:
            ``(predictions, references, info)``: three lists aligned by
            example. ``info`` holds the prediction, the reference and the
            example's ``file_name`` (or ``'unknown'``).
        """
        logging.getLogger("httpx").setLevel(logging.WARNING)
        client = openai.OpenAI(
            base_url="http://localhost:8000/v1",
            api_key="XXX",
            max_retries=3,
            http_client=httpx.Client(
                limits=httpx.Limits(
                    max_connections=1000,
                    max_keepalive_connections=1000,
                ),
                timeout=httpx.Timeout(500.0),
            ),
        )
        source = self.eval_dataset if dataset is None else dataset
        items = []
        for batch in tqdm(source, desc="Preparing VLLM prompts"):
            # Elements may be single examples or lists of examples; flatten.
            items.extend(batch if isinstance(batch, list) else [batch])
        references = [item["completion"] for item in items]

        def _call_one(item) -> str:
            """Request one completion; return "" if the request fails."""
            try:
                resp = client.chat.completions.create(
                    model="/model",
                    messages=convert_example_to_messages(item, completion=False, for_openai=True),
                    max_tokens=3000,
                    temperature=0.7,
                    stream=False,
                    extra_body={"chat_template_kwargs": {"enable_thinking": False}},
                )
                # message.content is optional in the API; keep the str contract.
                return resp.choices[0].message.content or ""
            except Exception as e:
                # Best effort: one bad request should not abort the whole eval.
                logging.info(f"Error generating for one prompt: {e}")
                traceback.print_exc()
                return ""

        try:
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                predictions = list(tqdm(pool.map(_call_one, items), total=len(items),
                                        desc="VLLM parallel inference"))
        finally:
            # Release the pooled HTTP connections opened for this evaluation.
            client.close()

        # Zip the flattened lists so info stays aligned with predictions even
        # when dataset elements were lists of examples.
        info = [
            {
                "prediction": prediction,
                "references": reference,
                "path": item.get('file_name', 'unknown'),
            }
            for item, prediction, reference in zip(items, predictions, references)
        ]
        return predictions, references, info

    def _move_optimizer_state(self, device):
        """Move every tensor in the optimizer state to ``device``, in place.

        This is a no-op when the trainer has no optimizer yet.
        """
        if getattr(self, "optimizer", None) is None:
            return
        for state in self.optimizer.state.values():
            for key, value in state.items():
                if isinstance(value, torch.Tensor):
                    state[key] = value.to(device)

    def evaluate(self, eval_dataset=None, ignore_keys=None, metric_key_prefix="eval"):
        """Score the current weights by generating with a temporary vLLM server.

        Args:
            eval_dataset: Dataset to evaluate; defaults to ``self.eval_dataset``.
            ignore_keys: Unused; kept for ``Trainer.evaluate`` compatibility.
            metric_key_prefix: Metric names are ``"<prefix>_<metric>"``.

        Returns:
            The ``score_func`` results with prefixed keys. Before
            ``first_eval_step``, a placeholder dict whose
            ``<prefix>_pss_accuracy`` is 0 is returned instead.

        Side effects:
            Writes ``temp_checkpoint_<step>`` (not deleted afterwards) and
            ``<output_dir>/eval_steps/eval_<step>.json``. The model and
            optimizer state are returned to the GPU even if evaluation fails.
        """
        current_step = self.state.global_step
        print(f"eval at {current_step}: is < than {self.first_eval_step} ?? ")
        if self.first_eval_step and current_step < self.first_eval_step:
            return {"dummy": "dummy", f"{metric_key_prefix}_pss_accuracy": 0}

        # 1. Clear gradients to free immediate memory
        if getattr(self, "optimizer", None) is not None:
            self.optimizer.zero_grad(set_to_none=True)
        # 2./3. Offload model and optimizer state (kept, not deleted) to system RAM
        self.model.to('cpu')
        self._move_optimizer_state('cpu')
        if hasattr(self, 'accelerator'):
            self.accelerator.free_memory()
        gc.collect()
        torch.cuda.empty_cache()
        allocated = torch.cuda.memory_allocated() / 1024 ** 3
        reserved = torch.cuda.memory_reserved() / 1024 ** 3
        logging.info(f"GPU after safe CPU offloading: {allocated:.1f}GB allocated, {reserved:.1f}GB reserved")

        try:
            # 4. Save checkpoint for VLLM
            checkpoint_dir = f"temp_checkpoint_{current_step}"
            self.model.save_pretrained(checkpoint_dir)
            self.processing_class.save_pretrained(checkpoint_dir)
            # 5. Run VLLM Evaluation
            container_id = self.start_vllm_server(checkpoint_dir)
            try:
                predictions, references, info = self.generate_with_vllm(dataset=eval_dataset)
            finally:
                self.stop_vllm_server(container_id)
                # Presumably gives the container time to release GPU memory.
                time.sleep(10)
        finally:
            # 6./7. Always bring model and optimizer state back to the GPU
            self.model.to('cuda')
            self._move_optimizer_state('cuda')
            torch.cuda.empty_cache()
            gc.collect()

        # 8. Calculate Metrics
        field_averages = self.score_func(predictions, references)
        output_dir = os.path.join(self.args.output_dir, 'eval_steps')
        os.makedirs(output_dir, exist_ok=True)
        with open(os.path.join(output_dir, f'eval_{current_step}.json'), "w", encoding='utf-8') as f:
            json.dump({"predictions": predictions, "references": references, "info": info}, f, indent=4)
        updated = {f"{metric_key_prefix}_{key}": value for key, value in field_averages.items()}
        self.log(updated)
        logging.info(f"Eval results: {updated}")
        return updated
def adjust_steps_ez(train_dataset_size, effective_batch_size, percentage):
    """Return how many optimizer steps make up ``percentage`` percent of an epoch.

    An epoch is counted as ``train_dataset_size // effective_batch_size`` full
    batches; any trailing partial batch is ignored. The result is rounded down
    and is never smaller than 1.
    """
    full_batches = train_dataset_size // effective_batch_size
    return max(1, (full_batches * percentage) // 100)
def get_dataset(dataset_path, clean=False):
    """Load a dataset and return it together with its train and test splits.

    If ``dataset_path`` exists locally, it is read as a JSON file:

    * a dict with ``"train"`` and ``"test"`` lists is used as-is;
    * otherwise the records (a list, or the values of a dict) are shuffled with
      NumPy's seeded global RNG and split 90/10.

    Otherwise ``dataset_path`` is treated as a Hugging Face Hub dataset ID (the
    token is taken from the environment). A ``validation`` split stands in for
    a missing ``test`` split, and a lone ``train`` split (or a dataset without
    splits) is split 90/10.

    Args:
        dataset_path: Local JSON path or Hub dataset ID.
        clean: Currently unused; kept for interface compatibility.

    Returns:
        ``(dataset, train_set, test_set)``: the split container, the ``train``
        split shuffled with a fixed seed, and the unshuffled ``test`` split.

    Raises:
        ValueError: If no ``train``/``test`` pair can be built from the data.
    """
    seed = 3123
    # Seeds NumPy's global RNG, which drives the shuffle of local records.
    np.random.seed(seed)

    if os.path.exists(dataset_path):
        print(f"Loading local file: {dataset_path}")
        with open(dataset_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, dict) and "train" in data and "test" in data:
            # Pre-split in the JSON file: use the given splits directly.
            dataset = DatasetDict({
                "train": Dataset.from_list(data["train"]),
                "test": Dataset.from_list(data["test"]),
            })
        else:
            # Flatten, shuffle, and split 90/10.
            full_pool = data if isinstance(data, list) else list(data.values())
            np.random.shuffle(full_pool)
            split_idx = int(len(full_pool) * 0.9)
            dataset = DatasetDict({
                "train": Dataset.from_list(full_pool[:split_idx]),
                "test": Dataset.from_list(full_pool[split_idx:]),
            })
    else:
        print(f"Loading from Hugging Face Hub: {dataset_path}")
        raw_dataset = load_dataset(dataset_path)  # token from env
        if not isinstance(raw_dataset, DatasetDict):
            # A single Dataset without splits.
            dataset = raw_dataset.train_test_split(test_size=0.1, seed=seed)
        elif "test" in raw_dataset:
            dataset = raw_dataset
        elif "train" in raw_dataset and "validation" in raw_dataset:
            dataset = DatasetDict({
                "train": raw_dataset["train"],
                "test": raw_dataset["validation"],
            })
        elif "train" in raw_dataset:
            dataset = raw_dataset["train"].train_test_split(test_size=0.1, seed=seed)
        else:
            # Unknown layout; rejected by the split check below.
            dataset = raw_dataset

    missing = [split for split in ("train", "test") if split not in dataset]
    if missing:
        raise ValueError(
            f"Dataset {dataset_path!r} does not provide the required splits "
            f"{missing}; available splits: {list(dataset.keys())}"
        )

    # Final Access
    train_set = dataset["train"].shuffle(seed=seed)
    test_set = dataset["test"]
    return dataset, train_set, test_set
def train_and_evaluate(model_name, dataset_path, score_func, clean=False):
    """Fine-tune a Qwen3.5 vision-language model with SFT, evaluating via vLLM.

    Args:
        model_name: Hub ID or local path of the base model.
        dataset_path: Local JSON file or Hub dataset ID, passed to ``get_dataset``.
        score_func: ``(predictions, references) -> dict`` used by the trainer's
            vLLM evaluation. Its keys are logged with an ``eval_`` prefix.
            ``eval_pss_accuracy`` is the model-selection metric, so the dict
            must contain ``pss_accuracy``.
        clean: Forwarded to ``get_dataset``.

    Checkpoints and eval outputs are written to a directory named
    ``<model_name>_<dataset_path>``, with any ``AlioLeuchtmann/`` removed.
    """
    new_model_name = f'{model_name}_{dataset_path.replace("AlioLeuchtmann/", "")}'
    os.makedirs(new_model_name, exist_ok=True)
    logging.info(f'NEW MODEL NAME IS {new_model_name}')

    processor = AutoProcessor.from_pretrained(
        model_name,
        trust_remote_code=True,
        min_pixels=256 * 28 * 28,  # default
        # max_pixels=1280 * 28 * 28,  # uncomment if memory problems
    )
    # Pad on the right for training. Also set it on the tokenizer, which does
    # the actual padding; the processor attribute alone may not reach it.
    processor.padding_side = "right"
    processor.tokenizer.padding_side = "right"

    def model_init():
        """Load the base model and enable non-reentrant gradient checkpointing."""
        print("MODEL INIT")
        model = Qwen3_5ForConditionalGeneration.from_pretrained(
            model_name,
            torch_dtype="auto",
            trust_remote_code=True,
            device_map="auto",
        )
        print(model.config._attn_implementation)
        model.gradient_checkpointing_enable(gradient_checkpointing_kwargs={"use_reentrant": False})
        return model

    dataset, train_set, eval_set = get_dataset(dataset_path, clean)

    # Token ids are fixed for the whole run, so look them up once instead of
    # once per batch.
    tokenizer = processor.tokenizer
    # Vision tokens never serve as training targets. convert_tokens_to_ids
    # gives None (or the unk id, for tokenizers that have one) for tokens
    # that are not in the vocabulary; skip those.
    vision_token_ids = []
    for token in ["<|image_pad|>", "<|vision_start|>", "<|vision_end|>"]:
        tok_id = tokenizer.convert_tokens_to_ids(token)
        if tok_id is not None and tok_id != tokenizer.unk_token_id:
            vision_token_ids.append(tok_id)
    im_start_id = tokenizer.convert_tokens_to_ids("<|im_start|>")

    def new_collate_fn(examples):
        """Tokenize a batch of rows and build labels for answer-only loss.

        Labels equal ``input_ids``, with -100 at three kinds of position:
        padding, vision tokens, and every position up to 3 tokens past the
        last ``<|im_start|>``. Those 3 tokens are assumed to be the
        ``<|im_start|>assistant\\n`` header.
        """
        # Apply the chat template
        modified = [convert_example_to_messages(example) for example in examples]
        texts = [
            processor.apply_chat_template(messages, tokenize=False, enable_thinking=False)
            for messages in modified
        ]
        image_inputs = [process_vision_info(messages)[0] for messages in modified]
        batch = processor(
            text=texts, images=image_inputs, return_tensors="pt", padding='longest'
        )
        labels = batch["input_ids"].clone()
        # 1. Mask padding
        labels[labels == tokenizer.pad_token_id] = -100
        # 2. Mask image tokens
        for tok_id in vision_token_ids:
            labels[labels == tok_id] = -100
        # 3. Mask everything before the assistant response (prompt masking).
        # Qwen chat template: <|im_start|>user\n...<|im_end|>\n<|im_start|>assistant\n...
        for i in range(labels.size(0)):
            im_start_positions = (batch["input_ids"][i] == im_start_id).nonzero(as_tuple=True)[0]
            if len(im_start_positions) > 0:
                # Last <|im_start|> begins the assistant turn; +3 skips
                # "<|im_start|>", "assistant" and "\n" (assumed single tokens),
                # clamped to the sequence length.
                content_start = min(im_start_positions[-1].item() + 3, labels.size(1))
                labels[i, :content_start] = -100
        batch["labels"] = labels
        return batch

    bs = 1
    ga = 16
    epochs = 1
    eval_percentage = 5
    # One cadence for eval and save: load_best_model_at_end requires
    # save_steps to be a multiple of eval_steps.
    eval_save_steps = adjust_steps_ez(len(train_set), bs * ga, eval_percentage)
    # Build SFTConfig directly. Round-tripping a TrainingArguments through
    # to_dict() would rewrite every *_token field into a placeholder string
    # (e.g. "<HUB_TOKEN>"), which would then be passed back in as a value.
    train_args = SFTConfig(
        output_dir=new_model_name,
        num_train_epochs=epochs,
        per_device_train_batch_size=bs,
        gradient_accumulation_steps=ga,
        per_device_eval_batch_size=1,
        eval_steps=eval_save_steps,
        save_steps=eval_save_steps,
        learning_rate=2e-5,
        logging_steps=1,
        # max_grad_norm=0.5,  # grad norm too low, limiting training
        warmup_ratio=0.08,
        # weight_decay=0.001,
        optim="adamw_8bit",
        eval_strategy='steps',
        save_strategy='steps',
        log_level="debug",
        lr_scheduler_type="cosine",
        hub_strategy="all_checkpoints",
        load_best_model_at_end=True,
        metric_for_best_model="eval_pss_accuracy",
        save_total_limit=None,
        gradient_checkpointing=True,
        bf16=True,
        remove_unused_columns=False,
        max_seq_length=max_seq_length,
        dataset_text_field='input_ids',
        # Rows are tokenized by new_collate_fn, not by SFTTrainer.
        dataset_kwargs={"skip_prepare_dataset": True},
        packing=False,
    )
    first_step = None  # adjust_steps_ez(len(train_set), bs * ga, 12)
    trainer = SFTTrainerWithVLLMEval(
        model=model_init(),
        args=train_args,
        train_dataset=train_set,
        eval_dataset=eval_set,
        processing_class=processor,
        data_collator=new_collate_fn,
        score_func=score_func,
        first_eval_step=first_step,
    )
    # res = trainer.evaluate()
    # print(json.dumps(res, indent=2))
    trainer.train()
    del trainer, dataset, train_set, eval_set
    # Collect first so that tensors freed by the deletions above (including
    # those held in reference cycles) can actually be returned by empty_cache.
    gc.collect()
    torch.cuda.empty_cache()
def main():
    """Parse command-line arguments and run training.

    Uses ``evaluation_script.trivial_eval`` as the scoring function. Exits with
    status 1 when no CUDA device is available.
    """
    print("PyTorch version:", torch.__version__)
    print("CUDA available:", torch.cuda.is_available())
    if not torch.cuda.is_available():
        print("NO CUDA EXIT!")
        # Non-zero status so wrappers/schedulers see the failure; unlike the
        # site-provided exit(), SystemExit is always available.
        raise SystemExit(1)
    parser = argparse.ArgumentParser(description="Train a Qwen3 Model on the Markdown set")
    # Required, so no default: the previous default ("Qwen/Qwen3-VL-2B-Instruct")
    # was never used and is not a Qwen3.5 checkpoint anyway.
    parser.add_argument("--model", type=str, required=True, help="Model")
    parser.add_argument("--dataset", type=str, required=True, help="Dataset")
    parser.add_argument("--clean", action="store_true", default=False, help="Clean")
    args = parser.parse_args()
    print(args.clean)
    train_and_evaluate(args.model, args.dataset, evaluation_script.trivial_eval, args.clean)


if __name__ == "__main__":
    main()