Skip to content

Commit 8bcc759

Browse files
robertgshaw2-redhatzinccat
authored andcommitted
[V1] Simplify Shutdown (#11659)
Signed-off-by: ZincCat <[email protected]>
1 parent 33d9fbc commit 8bcc759

File tree

7 files changed

+42
-60
lines changed

7 files changed

+42
-60
lines changed

tests/v1/engine/test_engine_core_client.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,6 @@ def test_engine_core_client(monkeypatch, multiprocessing_mode: bool):
142142

143143
client.abort_requests([request.request_id])
144144

145-
# Shutdown the client.
146-
client.shutdown()
147-
148145

149146
@pytest.mark.asyncio
150147
async def test_engine_core_client_asyncio(monkeypatch):
@@ -200,6 +197,3 @@ async def test_engine_core_client_asyncio(monkeypatch):
200197
else:
201198
assert len(outputs[req_id]) == MAX_TOKENS, (
202199
f"{len(outputs[req_id])=}, {MAX_TOKENS=}")
203-
204-
# Shutdown the client.
205-
client.shutdown()

vllm/entrypoints/llm.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,6 @@ def __init__(
232232

233233
self.request_counter = Counter()
234234

235-
def __del__(self):
236-
if hasattr(self, 'llm_engine') and self.llm_engine and hasattr(
237-
self.llm_engine, "shutdown"):
238-
self.llm_engine.shutdown()
239-
240235
@staticmethod
241236
def get_engine_class() -> Type[LLMEngine]:
242237
if envs.VLLM_USE_V1:

vllm/v1/engine/async_llm.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,6 @@ def sigquit_handler(signum, frame):
103103

104104
self.output_handler: Optional[asyncio.Task] = None
105105

106-
def __del__(self):
107-
self.shutdown()
108-
109106
@classmethod
110107
def from_engine_args(
111108
cls,

vllm/v1/engine/core.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ def signal_handler(signum, frame):
203203
finally:
204204
if engine_core is not None:
205205
engine_core.shutdown()
206-
engine_core = None
207206

208207
def run_busy_loop(self):
209208
"""Core busy loop of the EngineCore."""

vllm/v1/engine/core_client.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from typing import List, Optional, Type
1+
import weakref
2+
from abc import ABC, abstractmethod
3+
from typing import List, Type
24

35
import msgspec
46
import zmq
@@ -18,7 +20,7 @@
1820
logger = init_logger(__name__)
1921

2022

21-
class EngineCoreClient:
23+
class EngineCoreClient(ABC):
2224
"""
2325
EngineCoreClient: subclasses handle different methods for pushing
2426
and pulling from the EngineCore for asyncio / multiprocessing.
@@ -52,8 +54,9 @@ def make_client(
5254

5355
return InprocClient(vllm_config, executor_class, log_stats)
5456

57+
@abstractmethod
5558
def shutdown(self):
56-
pass
59+
...
5760

5861
def get_output(self) -> List[EngineCoreOutput]:
5962
raise NotImplementedError
@@ -107,9 +110,6 @@ def abort_requests(self, request_ids: List[str]) -> None:
107110
def shutdown(self):
108111
self.engine_core.shutdown()
109112

110-
def __del__(self):
111-
self.shutdown()
112-
113113
def profile(self, is_start: bool = True) -> None:
114114
self.engine_core.profile(is_start)
115115

@@ -139,10 +139,14 @@ def __init__(
139139
self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs)
140140

141141
# ZMQ setup.
142-
if asyncio_mode:
143-
self.ctx = zmq.asyncio.Context()
144-
else:
145-
self.ctx = zmq.Context() # type: ignore[attr-defined]
142+
self.ctx = (
143+
zmq.asyncio.Context() # type: ignore[attr-defined]
144+
if asyncio_mode else zmq.Context()) # type: ignore[attr-defined]
145+
146+
# Note(rob): shutdown function cannot be a bound method,
147+
# else the gc cannot collect the object.
148+
self._finalizer = weakref.finalize(self, lambda x: x.destroy(linger=0),
149+
self.ctx)
146150

147151
# Paths and sockets for IPC.
148152
output_path = get_open_zmq_ipc_path()
@@ -153,7 +157,6 @@ def __init__(
153157
zmq.constants.PUSH)
154158

155159
# Start EngineCore in background process.
156-
self.proc_handle: Optional[BackgroundProcHandle]
157160
self.proc_handle = BackgroundProcHandle(
158161
input_path=input_path,
159162
output_path=output_path,
@@ -166,12 +169,11 @@ def __init__(
166169
})
167170

168171
def shutdown(self):
169-
# Shut down the zmq context.
170-
self.ctx.destroy(linger=0)
171-
172-
if hasattr(self, "proc_handle") and self.proc_handle:
172+
"""Clean up background resources."""
173+
if hasattr(self, "proc_handle"):
173174
self.proc_handle.shutdown()
174-
self.proc_handle = None
175+
176+
self._finalizer()
175177

176178

177179
class SyncMPClient(MPClient):

vllm/v1/engine/llm_engine.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -205,10 +205,3 @@ def get_tokenizer_group(
205205
f"found type: {type(tokenizer_group)}")
206206

207207
return tokenizer_group
208-
209-
def __del__(self):
210-
self.shutdown()
211-
212-
def shutdown(self):
213-
if engine_core := getattr(self, "engine_core", None):
214-
engine_core.shutdown()

vllm/v1/utils.py

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import multiprocessing
12
import os
23
import weakref
34
from collections.abc import Sequence
@@ -91,8 +92,6 @@ def __init__(
9192
target_fn: Callable,
9293
process_kwargs: Dict[Any, Any],
9394
):
94-
self._finalizer = weakref.finalize(self, self.shutdown)
95-
9695
context = get_mp_context()
9796
reader, writer = context.Pipe(duplex=False)
9897

@@ -102,33 +101,36 @@ def __init__(
102101
process_kwargs["ready_pipe"] = writer
103102
process_kwargs["input_path"] = input_path
104103
process_kwargs["output_path"] = output_path
105-
self.input_path = input_path
106-
self.output_path = output_path
107104

108-
# Run Detokenizer busy loop in background process.
105+
# Run busy loop in background process.
109106
self.proc = context.Process(target=target_fn, kwargs=process_kwargs)
107+
self._finalizer = weakref.finalize(self, shutdown, self.proc,
108+
input_path, output_path)
110109
self.proc.start()
111110

112111
# Wait for startup.
113112
if reader.recv()["status"] != "READY":
114113
raise RuntimeError(f"{process_name} initialization failed. "
115114
"See root cause above.")
116115

117-
def __del__(self):
118-
self.shutdown()
119-
120116
def shutdown(self):
121-
# Shutdown the process if needed.
122-
if hasattr(self, "proc") and self.proc.is_alive():
123-
self.proc.terminate()
124-
self.proc.join(5)
125-
126-
if self.proc.is_alive():
127-
kill_process_tree(self.proc.pid)
128-
129-
# Remove zmq ipc socket files
130-
ipc_sockets = [self.output_path, self.input_path]
131-
for ipc_socket in ipc_sockets:
132-
socket_file = ipc_socket.replace("ipc://", "")
133-
if os and os.path.exists(socket_file):
134-
os.remove(socket_file)
117+
self._finalizer()
118+
119+
120+
# Note(rob): shutdown function cannot be a bound method,
121+
# else the gc cannot collect the object.
122+
def shutdown(proc: multiprocessing.Process, input_path: str, output_path: str):
123+
# Shutdown the process.
124+
if proc.is_alive():
125+
proc.terminate()
126+
proc.join(5)
127+
128+
if proc.is_alive():
129+
kill_process_tree(proc.pid)
130+
131+
# Remove zmq ipc socket files.
132+
ipc_sockets = [output_path, input_path]
133+
for ipc_socket in ipc_sockets:
134+
socket_file = ipc_socket.replace("ipc://", "")
135+
if os and os.path.exists(socket_file):
136+
os.remove(socket_file)

0 commit comments

Comments
 (0)