Skip to content

Commit ab97c77

Browse files
njhill authored and yangw-dev committed
[V1][BugFix] Exit properly if engine core fails during startup (vllm-project#16137)
Signed-off-by: Nick Hill <[email protected]>
Signed-off-by: Yang Wang <[email protected]>
1 parent: f7e16cb; commit: ab97c77

File tree

5 files changed, with 67 additions and 14 deletions.

requirements/test.in

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ pytest-forked
55
pytest-asyncio
66
pytest-rerunfailures
77
pytest-shard
8+
pytest-timeout
89

910
# testing utils
1011
awscli

requirements/test.txt

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -444,6 +444,7 @@ pytest==8.3.3
444444
# pytest-mock
445445
# pytest-rerunfailures
446446
# pytest-shard
447+
# pytest-timeout
447448
pytest-asyncio==0.24.0
448449
# via -r requirements/test.in
449450
pytest-forked==1.6.0
@@ -454,6 +455,8 @@ pytest-rerunfailures==14.0
454455
# via -r requirements/test.in
455456
pytest-shard==0.1.2
456457
# via -r requirements/test.in
458+
pytest-timeout==2.3.1
459+
# via -r requirements/test.in
457460
python-dateutil==2.9.0.post0
458461
# via
459462
# botocore

tests/v1/engine/test_engine_core_client.py

Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3,8 +3,10 @@
33
import asyncio
44
import time
55
import uuid
6+
from threading import Thread
67
from typing import Optional
78

9+
import psutil
810
import pytest
911
from transformers import AutoTokenizer
1012

@@ -245,3 +247,42 @@ async def test_engine_core_client_asyncio(monkeypatch: pytest.MonkeyPatch):
245247
await core_client.call_utility_async("echo", None, "help!")
246248

247249
assert str(e_info.value) == "Call to echo method failed: help!"
250+
251+
252+
@pytest.mark.timeout(10)
253+
def test_startup_failure(monkeypatch: pytest.MonkeyPatch):
254+
255+
with monkeypatch.context() as m, pytest.raises(Exception) as e_info:
256+
m.setenv("VLLM_USE_V1", "1")
257+
258+
engine_args = EngineArgs(model=MODEL_NAME)
259+
vllm_config = engine_args.create_engine_config(
260+
usage_context=UsageContext.UNKNOWN_CONTEXT)
261+
executor_class = Executor.get_class(vllm_config)
262+
263+
# Start another thread to wait for engine core process to start
264+
# and kill it - simulate fatal uncaught process exit.
265+
this_proc = psutil.Process()
266+
children_before = set(this_proc.children())
267+
268+
def kill_first_child():
269+
while True:
270+
time.sleep(0.5)
271+
children = set(this_proc.children()) - children_before
272+
if children:
273+
child = children.pop()
274+
print("Killing child core process", child.pid)
275+
child.kill()
276+
break
277+
278+
Thread(target=kill_first_child, daemon=True).start()
279+
280+
_core_client = EngineCoreClient.make_client(
281+
multiprocess_mode=True,
282+
asyncio_mode=True,
283+
vllm_config=vllm_config,
284+
executor_class=executor_class,
285+
log_stats=True,
286+
)
287+
288+
assert "Engine core initialization failed" in str(e_info.value)

vllm/v1/engine/core_client.py

Lines changed: 14 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -411,10 +411,21 @@ def _wait_for_engine_startup(self):
411411

412412
# Wait for engine core process(es) to send ready messages.
413413
identities = set(eng.index for eng in self.resources.core_engines)
414+
poller = zmq.Poller()
415+
poller.register(sync_input_socket, zmq.POLLIN)
416+
for eng in self.resources.core_engines:
417+
poller.register(eng.proc_handle, zmq.POLLIN)
414418
while identities:
415-
while not sync_input_socket.poll(timeout=STARTUP_POLL_PERIOD_MS):
416-
logger.info("Waiting for %d core engine proc(s) to start: %s",
417-
len(identities), identities)
419+
events = poller.poll(STARTUP_POLL_PERIOD_MS)
420+
if not events:
421+
logger.debug("Waiting for %d core engine proc(s) to start: %s",
422+
len(identities), identities)
423+
continue
424+
if len(events) > 1 or events[0][0] != sync_input_socket:
425+
# One of the core processes exited.
426+
raise RuntimeError("Engine core initialization failed. "
427+
"See root cause above.")
428+
418429
eng_id_bytes, msg = sync_input_socket.recv_multipart()
419430
eng_id = int.from_bytes(eng_id_bytes, byteorder="little")
420431
if eng_id not in identities:
@@ -424,12 +435,6 @@ def _wait_for_engine_startup(self):
424435
logger.info("Core engine process %d ready.", eng_id)
425436
identities.discard(eng_id)
426437

427-
# Double check that the process are running.
428-
for engine in self.resources.core_engines:
429-
proc = engine.proc_handle.proc
430-
if proc.exitcode is not None:
431-
raise RuntimeError(f"Engine proc {proc.name} not running")
432-
433438
def _init_core_engines(
434439
self,
435440
vllm_config: VllmConfig,

vllm/v1/utils.py

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,10 @@
11
# SPDX-License-Identifier: Apache-2.0
22

3-
import multiprocessing
43
import os
54
import weakref
65
from collections import defaultdict
76
from collections.abc import Sequence
7+
from multiprocessing import Process
88
from typing import (TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar,
99
Union, overload)
1010

@@ -112,20 +112,23 @@ def __init__(
112112
process_kwargs["output_path"] = output_path
113113

114114
# Run busy loop in background process.
115-
self.proc = context.Process(target=target_fn,
116-
kwargs=process_kwargs,
117-
name=process_name)
115+
self.proc: Process = context.Process(target=target_fn,
116+
kwargs=process_kwargs,
117+
name=process_name)
118118
self._finalizer = weakref.finalize(self, shutdown, self.proc,
119119
input_path, output_path)
120120
self.proc.start()
121121

122+
def fileno(self):
123+
return self.proc.sentinel
124+
122125
def shutdown(self):
123126
self._finalizer()
124127

125128

126129
# Note(rob): shutdown function cannot be a bound method,
127130
# else the gc cannot collect the object.
128-
def shutdown(proc: multiprocessing.Process, input_path: str, output_path: str):
131+
def shutdown(proc: Process, input_path: str, output_path: str):
129132
# Shutdown the process.
130133
if proc.is_alive():
131134
proc.terminate()

0 commit comments

Comments
 (0)