
Commit 4cf3720

robertgshaw2-redhat authored and mzusman committed
[V1] [4/N] API Server: ZMQ/MP Utilities (vllm-project#11541)
1 parent e237527 commit 4cf3720

12 files changed: +247 −215 lines

docs/requirements-docs.txt

Lines changed: 1 addition & 0 deletions
@@ -19,3 +19,4 @@ openai # Required by docs/source/serving/openai_compatible_server.md's vllm.entr
 fastapi # Required by docs/source/serving/openai_compatible_server.md's vllm.entrypoints.openai.cli_args
 partial-json-parser # Required by docs/source/serving/openai_compatible_server.md's vllm.entrypoints.openai.cli_args
 requests
+zmq

tests/v1/engine/test_engine_core.py

Lines changed: 4 additions & 9 deletions
@@ -7,7 +7,6 @@
 from vllm import SamplingParams
 from vllm.engine.arg_utils import EngineArgs
 from vllm.platforms import current_platform
-from vllm.usage.usage_lib import UsageContext
 from vllm.v1.engine import EngineCoreRequest
 from vllm.v1.engine.async_llm import AsyncLLM
 from vllm.v1.engine.core import EngineCore
@@ -43,13 +42,11 @@ def test_engine_core(monkeypatch):
         m.setenv("VLLM_USE_V1", "1")
         """Setup the EngineCore."""
         engine_args = EngineArgs(model=MODEL_NAME)
-        vllm_config = engine_args.create_engine_config(
-            usage_context=UsageContext.UNKNOWN_CONTEXT)
+        vllm_config = engine_args.create_engine_config()
         executor_class = AsyncLLM._get_executor_cls(vllm_config)
 
         engine_core = EngineCore(vllm_config=vllm_config,
-                                 executor_class=executor_class,
-                                 usage_context=UsageContext.UNKNOWN_CONTEXT)
+                                 executor_class=executor_class)
         """Test basic request lifecycle."""
 
         # First request.
@@ -151,13 +148,11 @@ def test_engine_core_advanced_sampling(monkeypatch):
         m.setenv("VLLM_USE_V1", "1")
         """Setup the EngineCore."""
         engine_args = EngineArgs(model=MODEL_NAME)
-        vllm_config = engine_args.create_engine_config(
-            usage_context=UsageContext.UNKNOWN_CONTEXT)
+        vllm_config = engine_args.create_engine_config()
         executor_class = AsyncLLM._get_executor_cls(vllm_config)
 
         engine_core = EngineCore(vllm_config=vllm_config,
-                                 executor_class=executor_class,
-                                 usage_context=UsageContext.UNKNOWN_CONTEXT)
+                                 executor_class=executor_class)
         """Test basic request lifecycle."""
         # First request.
         request: EngineCoreRequest = make_request()

tests/v1/engine/test_engine_core_client.py

Lines changed: 4 additions & 6 deletions
@@ -86,11 +86,10 @@ def test_engine_core_client(monkeypatch, multiprocessing_mode: bool):
             UsageContext.UNKNOWN_CONTEXT)
         executor_class = AsyncLLM._get_executor_cls(vllm_config)
         client = EngineCoreClient.make_client(
-            vllm_config,
-            executor_class,
-            UsageContext.UNKNOWN_CONTEXT,
             multiprocess_mode=multiprocessing_mode,
             asyncio_mode=False,
+            vllm_config=vllm_config,
+            executor_class=executor_class,
         )
 
         MAX_TOKENS = 20
@@ -158,11 +157,10 @@ async def test_engine_core_client_asyncio(monkeypatch):
             usage_context=UsageContext.UNKNOWN_CONTEXT)
         executor_class = AsyncLLM._get_executor_cls(vllm_config)
         client = EngineCoreClient.make_client(
-            vllm_config,
-            executor_class,
-            UsageContext.UNKNOWN_CONTEXT,
             multiprocess_mode=True,
             asyncio_mode=True,
+            vllm_config=vllm_config,
+            executor_class=executor_class,
         )
 
         MAX_TOKENS = 20

vllm/entrypoints/openai/api_server.py

Lines changed: 10 additions & 1 deletion
@@ -68,7 +68,7 @@
 from vllm.logger import init_logger
 from vllm.usage.usage_lib import UsageContext
 from vllm.utils import (FlexibleArgumentParser, get_open_zmq_ipc_path,
-                        is_valid_ipv6_address, set_ulimit)
+                        is_valid_ipv6_address, kill_process_tree, set_ulimit)
 from vllm.version import __version__ as VLLM_VERSION
 
 TIMEOUT_KEEP_ALIVE = 5  # seconds
@@ -737,6 +737,15 @@ def signal_handler(*_) -> None:
 
     signal.signal(signal.SIGTERM, signal_handler)
 
+    # The child processes will send SIGQUIT to this process when
+    # any error happens. This process then cleans up the whole tree.
+    # TODO(rob): move this into AsyncLLM.__init__ once we remove
+    # the context manager below.
+    def sigquit_handler(signum, frame):
+        kill_process_tree(os.getpid())
+
+    signal.signal(signal.SIGQUIT, sigquit_handler)
+
     async with build_async_engine_client(args) as engine_client:
         app = build_app(args)
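
The new handler defines a simple crash protocol: when a background engine process hits an unrecoverable error, it sends SIGQUIT to the API-server process, which tears down its entire process tree via kill_process_tree (imported above from vllm.utils; its body is not shown in this diff). A minimal sketch of both sides of the protocol, using a hypothetical psutil-based stand-in for the helper:

import os
import signal

import psutil


def kill_process_tree(pid: int) -> None:
    # Hypothetical stand-in for the helper imported from vllm.utils;
    # the real body is not part of this diff. Kill all descendants
    # first, then the root process itself.
    try:
        root = psutil.Process(pid)
    except psutil.NoSuchProcess:
        return
    for child in root.children(recursive=True):
        try:
            child.kill()
        except psutil.NoSuchProcess:
            pass
    root.kill()


def report_fatal_error() -> None:
    # How a child process might trip the handler above after an
    # unrecoverable error (illustrative; the child-side call site is
    # not part of this diff).
    os.kill(os.getppid(), signal.SIGQUIT)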

vllm/executor/multiproc_worker_utils.py

Lines changed: 1 addition & 21 deletions
@@ -1,5 +1,4 @@
 import asyncio
-import multiprocessing
 import os
 import sys
 import threading
@@ -13,10 +12,9 @@
 
 import torch
 
-import vllm.envs as envs
 from vllm.logger import init_logger
 from vllm.triton_utils.importing import HAS_TRITON
-from vllm.utils import cuda_is_initialized
+from vllm.utils import _check_multiproc_method, get_mp_context
 
 if HAS_TRITON:
     from vllm.triton_utils import maybe_set_triton_cache_manager
@@ -274,24 +272,6 @@ def write_with_prefix(s: str):
     file.write = write_with_prefix  # type: ignore[method-assign]
 
 
-def _check_multiproc_method():
-    if (cuda_is_initialized()
-            and os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") != "spawn"):
-        logger.warning("CUDA was previously initialized. We must use "
-                       "the `spawn` multiprocessing start method. Setting "
-                       "VLLM_WORKER_MULTIPROC_METHOD to 'spawn'. "
-                       "See https://docs.vllm.ai/en/latest/getting_started/"
-                       "debugging.html#python-multiprocessing "
-                       "for more information.")
-        os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
-
-
-def get_mp_context():
-    _check_multiproc_method()
-    mp_method = envs.VLLM_WORKER_MULTIPROC_METHOD
-    return multiprocessing.get_context(mp_method)
-
-
 def set_multiprocessing_worker_envs(parallel_config):
     """ Set up environment variables that should be used when there are workers
     in a multiprocessing environment. This should be called by the parent
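
_check_multiproc_method and get_mp_context move, unchanged, into vllm/utils.py (next file) so that code outside the executor layer, such as the new V1 multiprocessing client, can reuse them. A minimal usage sketch, assuming only the behavior visible in this diff:

from vllm.utils import get_mp_context


def _worker(rank: int) -> None:
    # Targets must be importable at module top level, since the
    # returned context may use the "spawn" start method.
    print(f"worker {rank} ready")


if __name__ == "__main__":
    # get_mp_context() forces "spawn" (with a warning) if CUDA is
    # already initialized, then returns the matching context.
    ctx = get_mp_context()
    proc = ctx.Process(target=_worker, args=(0,), daemon=True)
    proc.start()
    proc.join()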

vllm/utils.py

Lines changed: 87 additions & 3 deletions
@@ -10,6 +10,7 @@
 import importlib.util
 import inspect
 import ipaddress
+import multiprocessing
 import os
 import re
 import resource
@@ -20,6 +21,7 @@
 import tempfile
 import threading
 import time
+import traceback
 import uuid
 import warnings
 import weakref
@@ -29,8 +31,9 @@
 from dataclasses import dataclass, field
 from functools import lru_cache, partial, wraps
 from typing import (TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable,
-                    Dict, Generator, Generic, List, Literal, NamedTuple,
-                    Optional, Tuple, Type, TypeVar, Union, overload)
+                    Dict, Generator, Generic, Iterator, List, Literal,
+                    NamedTuple, Optional, Tuple, Type, TypeVar, Union,
+                    overload)
 from uuid import uuid4
 
 import numpy as np
@@ -39,6 +42,8 @@
 import torch
 import torch.types
 import yaml
+import zmq
+import zmq.asyncio
 from packaging.version import Version
 from torch.library import Library
 from typing_extensions import ParamSpec, TypeIs, assert_never
@@ -1844,7 +1849,7 @@ def memory_profiling(
     result.non_kv_cache_memory_in_bytes = result.non_torch_increase_in_bytes + result.torch_peak_increase_in_bytes + result.weights_memory_in_bytes  # noqa
 
 
-# Adapted from: https://github.com/sgl-project/sglang/blob/f46f394f4d4dbe4aae85403dec006199b34d2840/python/sglang/srt/utils.py#L630 # noqa: E501
+# Adapted from: https://github.com/sgl-project/sglang/blob/v0.4.1/python/sglang/srt/utils.py#L630 # noqa: E501
 def set_ulimit(target_soft_limit=65535):
     resource_type = resource.RLIMIT_NOFILE
     current_soft, current_hard = resource.getrlimit(resource_type)
@@ -1859,3 +1864,82 @@ def set_ulimit(target_soft_limit=65535):
                    "with error %s. This can cause fd limit errors like"
                    "`OSError: [Errno 24] Too many open files`. Consider "
                    "increasing with ulimit -n", current_soft, e)
+
+
+# Adapted from: https://github.com/sgl-project/sglang/blob/v0.4.1/python/sglang/utils.py#L28 # noqa: E501
+def get_exception_traceback():
+    etype, value, tb = sys.exc_info()
+    err_str = "".join(traceback.format_exception(etype, value, tb))
+    return err_str
+
+
+# Adapted from: https://github.com/sgl-project/sglang/blob/v0.4.1/python/sglang/srt/utils.py#L783 # noqa: E501
+def make_zmq_socket(
+    ctx: Union[zmq.asyncio.Context, zmq.Context],  # type: ignore[name-defined]
+    path: str,
+    type: Any,
+) -> Union[zmq.Socket, zmq.asyncio.Socket]:  # type: ignore[name-defined]
+    """Make a ZMQ socket with the proper bind/connect semantics."""
+
+    mem = psutil.virtual_memory()
+    socket = ctx.socket(type)
+
+    # Calculate buffer size based on system memory
+    total_mem = mem.total / 1024**3
+    available_mem = mem.available / 1024**3
+    # For systems with substantial memory (>32GB total, >16GB available):
+    # - Set a large 0.5GB buffer to improve throughput
+    # For systems with less memory:
+    # - Use system default (-1) to avoid excessive memory consumption
+    if total_mem > 32 and available_mem > 16:
+        buf_size = int(0.5 * 1024**3)  # 0.5GB in bytes
+    else:
+        buf_size = -1  # Use system default buffer size
+
+    if type == zmq.constants.PULL:
+        socket.setsockopt(zmq.constants.RCVHWM, 0)
+        socket.setsockopt(zmq.constants.RCVBUF, buf_size)
+        socket.connect(path)
+    elif type == zmq.constants.PUSH:
+        socket.setsockopt(zmq.constants.SNDHWM, 0)
+        socket.setsockopt(zmq.constants.SNDBUF, buf_size)
+        socket.bind(path)
+    else:
+        raise ValueError(f"Unknown Socket Type: {type}")
+
+    return socket
+
+
+@contextlib.contextmanager
+def zmq_socket_ctx(
+        path: str,
+        type: Any) -> Iterator[zmq.Socket]:  # type: ignore[name-defined]
+    """Context manager for a ZMQ socket"""
+
+    ctx = zmq.Context(io_threads=2)  # type: ignore[attr-defined]
+    try:
+        yield make_zmq_socket(ctx, path, type)
+
+    except KeyboardInterrupt:
+        logger.debug("Got Keyboard Interrupt.")
+
+    finally:
+        ctx.destroy(linger=0)
+
+
+def _check_multiproc_method():
+    if (cuda_is_initialized()
+            and os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") != "spawn"):
+        logger.warning("CUDA was previously initialized. We must use "
+                       "the `spawn` multiprocessing start method. Setting "
+                       "VLLM_WORKER_MULTIPROC_METHOD to 'spawn'. "
+                       "See https://docs.vllm.ai/en/latest/getting_started/"
+                       "debugging.html#python-multiprocessing "
+                       "for more information.")
+        os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
+
+
+def get_mp_context():
+    _check_multiproc_method()
+    mp_method = envs.VLLM_WORKER_MULTIPROC_METHOD
+    return multiprocessing.get_context(mp_method)
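
make_zmq_socket fixes one convention for these IPC links: PUSH sockets bind and PULL sockets connect, both with unbounded high-water marks (HWM 0), plus 0.5 GiB kernel buffers on hosts with more than 32 GiB total and 16 GiB available memory. A minimal sketch, assuming the helpers land as shown above, that wires both ends of a pair over a fresh IPC path (get_open_zmq_ipc_path already exists in vllm.utils; api_server.py imports it above):

import zmq

from vllm.utils import get_open_zmq_ipc_path, zmq_socket_ctx

path = get_open_zmq_ipc_path()

# The PUSH side binds, so open it first; the PULL side then connects.
with zmq_socket_ctx(path, zmq.constants.PUSH) as push_sock:
    with zmq_socket_ctx(path, zmq.constants.PULL) as pull_sock:
        push_sock.send(b"hello from the engine core")
        assert pull_sock.recv() == b"hello from the engine core"

Each zmq_socket_ctx call owns its own zmq.Context and destroys it with linger=0, so teardown never blocks on undelivered messages.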

vllm/v1/engine/async_llm.py

Lines changed: 3 additions & 3 deletions
@@ -75,11 +75,11 @@ def __init__(
 
         # EngineCore (starts the engine in background process).
         self.engine_core = EngineCoreClient.make_client(
-            vllm_config=vllm_config,
-            executor_class=executor_class,
-            usage_context=usage_context,
             multiprocess_mode=True,
             asyncio_mode=True,
+            vllm_config=vllm_config,
+            executor_class=executor_class,
+            log_stats=self.log_stats,
         )
 
         self.output_handler: Optional[asyncio.Task] = None
