Hello guys, can someone help me understand how to generate proper JSON output using vLLM with the Qwen3 30B model?
The output I would like to get is:
[
  {
    "qa_number": 1,
    "question": "Who issued this resolution?",
    "answer": "Director of Public Health Services¹"
  }
]
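My assumption is that guided decoding needs a JSON Schema describing this structure, so here is what I sketched from the example above (the property names and types are my own guesses):

{
  "type": "array",
  "items": {
    "type": "object",
    "properties": {
      "qa_number": {"type": "integer"},
      "question": {"type": "string"},
      "answer": {"type": "string"}
    },
    "required": ["qa_number", "question", "answer"]
  }
}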
I am using a config file:
models:
  - id: qwen3_30b
    hub_name: Qwen/Qwen3-30B-A3B   # Qwen/Qwen3-235B-A22B
    model_type: text
    gpu_ids: "0,1,2,3"             # "0,1,2,3,4,5,6,7"
    tensor_parallel_size: 4
    sampling:
      temperature: 0.7
      top_p: 0.9
      top_k: 40
      max_tokens: 4096
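Since the runner below pops a guided_decoding key out of the sampling dict, my plan was to extend the sampling block roughly like this (schemas/qa_schema.json is a placeholder path where I would save the schema above):

    sampling:
      temperature: 0.7
      top_p: 0.9
      top_k: 40
      max_tokens: 4096
      guided_decoding:
        json: schemas/qa_schema.json   # path to the JSON Schema file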
and the following model runner code:
# qa_generation/src/model_runner.py
"""
ModelRunner: keeps one vLLM engine alive in its own subprocess and can
execute *batches* of prompts via generate_many().
"""
from __future__ import annotations

import json
import os
import multiprocessing as mp
from pathlib import Path
from typing import List

from transformers import AutoTokenizer
from vllm import LLM, SamplingParams
from vllm.sampling_params import GuidedDecodingParams


def _worker(cfg: dict, pipe):
    # pin this subprocess to the desired GPUs
    os.environ["CUDA_VISIBLE_DEVICES"] = cfg["gpu_ids"]
    os.environ["VLLM_SKIP_MM_PROFILING"] = "1"

    # build the common engine kwargs
    llm_kwargs = {
        "model": cfg["hub_name"],  # str(model_path),
        "tensor_parallel_size": cfg["tensor_parallel_size"],
        "trust_remote_code": cfg.get("trust_remote_code", False),
    }
    if cfg.get("model_type") == "vlm":
        # VLM-specific engine args:
        llm_kwargs.update({
            "enable_prefix_caching": False,
            "enable_chunked_prefill": False,
            "max_model_len": cfg.get("max_model_len", 32768),
            "gpu_memory_utilization": cfg.get("gpu_memory_utilization", 0.9),
            "max_num_seqs": cfg.get("max_num_seqs", 1),
        })
    else:
        # text-only model defaults:
        llm_kwargs.update({
            "enable_prefix_caching": True,
            "enable_chunked_prefill": True,
            "enable_reasoning": cfg.get("enable_reasoning", False),
            # "enable_thinking": cfg.get("enable_thinking", False),
            # if you ever need reasoning parsers:
            **({"reasoning_parser": cfg["reasoning_parser"]}
               if cfg.get("enable_reasoning") else {}),
        })

    llm = LLM(**llm_kwargs)

    # warm up tokenizer cache
    _ = AutoTokenizer.from_pretrained(cfg["hub_name"], use_fast=True)

    while True:
        payload = pipe.recv()
        if payload == "__quit__":
            break
        # unpack the (prompts, sampling) tuple sent by generate_many()
        prompts, sampling = payload

        # ── Handle structured JSON decoding if configured ──────────────
        gd_conf = sampling.pop("guided_decoding", None)
        if gd_conf:
            # Load schema from file if a path was given
            if isinstance(gd_conf.get("json"), str):
                gd_conf["json"] = json.loads(Path(gd_conf["json"]).read_text())
            guided = GuidedDecodingParams(**gd_conf)
            sp = SamplingParams(**sampling, guided_decoding=guided)
        else:
            sp = SamplingParams(**sampling)

        outputs = llm.generate(prompts, sampling_params=sp)
        texts = [o.outputs[0].text for o in outputs]
        pipe.send(texts)

    pipe.close()


class ModelRunner:
    """Wraps one background vLLM process for batched inference."""

    def __init__(self, cfg: dict):
        parent, child = mp.Pipe()
        self._pipe = parent
        self._p = mp.Process(target=_worker, args=(cfg, child))
        self._p.start()

    def generate_many(self, prompts: List[str], sampling: dict) -> List[str]:
        """
        Send a list of prompts to the engine in one go. Returns a list of
        raw strings, one per prompt.
        """
        self._pipe.send((prompts, sampling))
        return self._pipe.recv()

    def close(self):
        """Shutdown the background process safely."""
        try:
            self._pipe.send("__quit__")
        except (BrokenPipeError, EOFError):
            pass  # process already closed
        self._p.join()
I am not very sure how to use guided decoding to generate structured LLM outputs.
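To put it all together, this is roughly how I imagined driving it (config.yaml, schemas/qa_schema.json and the prompt text are placeholders, and I am not sure GuidedDecodingParams(json=...) is the right mechanism for Qwen3 here):

import yaml
from model_runner import ModelRunner

# load the model entry from the YAML config shown above (placeholder path)
with open("config.yaml") as f:
    model_cfg = yaml.safe_load(f)["models"][0]

runner = ModelRunner(model_cfg)

# the sampling block (including guided_decoding) is passed per call;
# the worker pops "guided_decoding" and builds GuidedDecodingParams from it
sampling = dict(model_cfg["sampling"])

prompts = ["Generate question/answer pairs for the following document:\n..."]
texts = runner.generate_many(prompts, sampling)
print(texts[0])  # expecting JSON that conforms to the schema
runner.close()

Is this the right way to wire up guided decoding, or is there a better approach for Qwen3 on vLLM?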