|
9 | 9 | import torch
|
10 | 10 | import torch.distributed as dist
|
11 | 11 | from torch.distributed import ProcessGroup
|
| 12 | +from zmq import IPV6 # type: ignore |
12 | 13 | from zmq import SUB, SUBSCRIBE, XPUB, XPUB_VERBOSE, Context # type: ignore
|
13 | 14 |
|
14 | 15 | import vllm.envs as envs
|
15 | 16 | from vllm.logger import init_logger
|
16 |
| -from vllm.utils import get_ip, get_open_port |
| 17 | +from vllm.utils import get_ip, get_open_port, is_valid_ipv6_address |
17 | 18 |
|
18 | 19 | VLLM_RINGBUFFER_WARNING_INTERVAL = envs.VLLM_RINGBUFFER_WARNING_INTERVAL
|
19 | 20 |
|
@@ -214,6 +215,8 @@ def __init__(
|
214 | 215 | self.remote_socket = context.socket(XPUB)
|
215 | 216 | self.remote_socket.setsockopt(XPUB_VERBOSE, True)
|
216 | 217 | remote_subscribe_port = get_open_port()
|
| 218 | + if is_valid_ipv6_address(connect_ip): |
| 219 | + self.remote_socket.setsockopt(IPV6, 1) |
217 | 220 | socket_addr = f"tcp://*:{remote_subscribe_port}"
|
218 | 221 | self.remote_socket.bind(socket_addr)
|
219 | 222 |
|
@@ -274,6 +277,8 @@ def create_from_handle(handle: Handle, rank) -> "MessageQueue":
|
274 | 277 |
|
275 | 278 | self.remote_socket = context.socket(SUB)
|
276 | 279 | self.remote_socket.setsockopt_string(SUBSCRIBE, "")
|
| 280 | + if is_valid_ipv6_address(handle.connect_ip): |
| 281 | + self.remote_socket.setsockopt(IPV6, 1) |
277 | 282 | socket_addr = f"tcp://{handle.connect_ip}:{handle.remote_subscribe_port}"
|
278 | 283 | logger.debug("Connecting to %s", socket_addr)
|
279 | 284 | self.remote_socket.connect(socket_addr)
|
|
0 commit comments