@torch.inference_mode()
def execute_model(
    self,
    scheduler_output: "SchedulerOutput",
) -> Optional[ModelRunnerOutput]:
    intermediate_tensors = None
    if not get_pp_group().is_first_rank:
        intermediate_tensors = IntermediateTensors(
            get_pp_group().recv_tensor_dict(
                all_gather_group=get_tp_group()))
    if envs.VLLM_ZERO_OVERHEAD:
        use_stream = zero_overhead_stream(self.device)
        with torch.cuda.stream(use_stream):
            output = self.model_runner.execute_model(scheduler_output,
                                                     intermediate_tensors)
    else:
        output = self.model_runner.execute_model(scheduler_output,
                                                 intermediate_tensors)
    parallel_config = self.vllm_config.parallel_config
    if parallel_config.distributed_executor_backend != "external_launcher" \
            and not get_pp_group().is_last_rank:
        assert isinstance(output, IntermediateTensors)
        get_pp_group().send_tensor_dict(output.tensors,
                                        all_gather_group=get_tp_group())
        return None
    assert isinstance(output, ModelRunnerOutput)
    return output if self.is_driver_worker else None
The code you provided is not present verbatim in the official vLLM sources. Its logic and structure closely match the documented execute_model method in vLLM's gpu_worker.py and gpu_model_runner.py, but with differences in parameter names, environment-variable checks, and output handling. In particular, envs.VLLM_ZERO_OVERHEAD, zero_overhead_stream, and the self.is_driver_worker return logic do not appear in the official vLLM code or documentation, so this snippet is only partially supported by the data sources.
Would you like a detailed step-by-step comparison of your code with the official vLLM implementation?
@RunLLM Can recv_tensor_dict and send_tensor_dict be changed to execute asynchronously?
if forward_pass and not get_pp_group().is_first_rank:
    tensor_dict = get_pp_group().recv_tensor_dict(
        all_gather_group=get_tp_group(),
        all_gather_tensors=all_gather_tensors,
    )
    assert tensor_dict is not None
    intermediate_tensors = IntermediateTensors(tensor_dict)

with self.annotate_profile(scheduler_output):
    output = self.model_runner.execute_model(
        scheduler_output, intermediate_tensors
    )

if isinstance(output, (ModelRunnerOutput, NoneType)):
    return output

assert isinstance(output, IntermediateTensors)
parallel_config = self.vllm_config.parallel_config
assert (
    parallel_config.distributed_executor_backend != "external_launcher"
    and not get_pp_group().is_last_rank
)
get_pp_group().send_tensor_dict(
    output.tensors,
    all_gather_group=get_tp_group(),
    all_gather_tensors=all_gather_tensors,
)
Currently, recv_tensor_dict and send_tensor_dict in vLLM are synchronous and blocking; there is no official support for executing them asynchronously. Both functions are built on PyTorch's distributed communication primitives, which do support asynchronous operation (e.g., async_op=True for broadcast), but the vLLM API does not expose an async interface for these methods. Making them fully asynchronous would require significant refactoring and is not documented in the official API or codebase (send_tensor_dict docs, recv_tensor_dict docs).
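For illustration only, here is a minimal sketch of how a non-blocking send could be prototyped on top of PyTorch's point-to-point primitives. async_send_tensor_dict is a hypothetical helper, not part of vLLM, and it ignores the metadata exchange (keys, shapes, dtypes) that the real send_tensor_dict/recv_tensor_dict perform:

# Hypothetical prototype, not vLLM API: launch non-blocking sends,
# overlap them with other work, then wait before reusing the tensors.
import torch
import torch.distributed as dist

def async_send_tensor_dict(tensor_dict: dict[str, torch.Tensor],
                           dst: int) -> list:
    """Start one isend per tensor and return the work handles."""
    handles = []
    for tensor in tensor_dict.values():
        # dist.isend returns immediately; the transfer runs in the background.
        handles.append(dist.isend(tensor.contiguous(), dst=dst))
    return handles

# Usage sketch in a pipeline-parallel stage:
#   handles = async_send_tensor_dict(output.tensors, dst=next_rank)
#   ...do independent work (e.g., prepare the next microbatch)...
#   for h in handles:
#       h.wait()  # tensors must stay alive until the sends complete

In practice the receiver would also need the dict's metadata (keys, shapes, dtypes) ahead of time, and with NCCL the handle only decouples the CPU from the communication; you must still wait on it before the buffers are reused or freed.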
Would you like more detail on how you might implement or prototype async versions, or the implications for pipeline parallelism?