1
1
# SPDX-License-Identifier: Apache-2.0
2
2
3
- import pickle
4
3
import queue
5
4
import signal
6
5
import threading
7
6
import time
8
7
from multiprocessing .connection import Connection
9
- from typing import List , Tuple , Type
8
+ from typing import Any , List , Tuple , Type
10
9
11
10
import psutil
12
11
import zmq
19
18
from vllm .utils import get_exception_traceback , zmq_socket_ctx
20
19
from vllm .v1 .core .kv_cache_utils import get_kv_cache_config
21
20
from vllm .v1 .core .scheduler import Scheduler
22
- from vllm .v1 .engine import (EngineCoreOutputs , EngineCoreProfile ,
23
- EngineCoreRequest , EngineCoreRequestType ,
24
- EngineCoreRequestUnion , EngineCoreResetPrefixCache )
21
+ from vllm .v1 .engine import (EngineCoreOutputs , EngineCoreRequest ,
22
+ EngineCoreRequestType )
25
23
from vllm .v1 .engine .mm_input_mapper import MMInputMapperServer
26
24
from vllm .v1 .executor .abstract import Executor
27
25
from vllm .v1 .request import Request , RequestStatus
28
- from vllm .v1 .serial_utils import MsgpackEncoder , PickleEncoder
26
+ from vllm .v1 .serial_utils import MsgpackDecoder , MsgpackEncoder
29
27
from vllm .version import __version__ as VLLM_VERSION
30
28
31
29
logger = init_logger (__name__ )
@@ -161,7 +159,8 @@ def __init__(
161
159
# and to overlap some serialization/deserialization with the
162
160
# model forward pass.
163
161
# Threads handle Socket <-> Queues and core_busy_loop uses Queue.
164
- self .input_queue : queue .Queue [EngineCoreRequestUnion ] = queue .Queue ()
162
+ self .input_queue : queue .Queue [Tuple [EngineCoreRequestType ,
163
+ Any ]] = queue .Queue ()
165
164
self .output_queue : queue .Queue [EngineCoreOutputs ] = queue .Queue ()
166
165
threading .Thread (target = self .process_input_socket ,
167
166
args = (input_path , ),
@@ -223,7 +222,7 @@ def run_busy_loop(self):
223
222
while True :
224
223
try :
225
224
req = self .input_queue .get (timeout = POLLING_TIMEOUT_S )
226
- self ._handle_client_request (req )
225
+ self ._handle_client_request (* req )
227
226
break
228
227
except queue .Empty :
229
228
logger .debug ("EngineCore busy loop waiting." )
@@ -233,59 +232,51 @@ def run_busy_loop(self):
233
232
except BaseException :
234
233
raise
235
234
236
- # 2) Handle any new client requests (Abort or Add) .
235
+ # 2) Handle any new client requests.
237
236
while not self .input_queue .empty ():
238
237
req = self .input_queue .get_nowait ()
239
- self ._handle_client_request (req )
238
+ self ._handle_client_request (* req )
240
239
241
240
# 3) Step the engine core.
242
241
outputs = self .step ()
243
242
244
243
# 5) Put EngineCoreOutputs into the output queue.
245
244
self .output_queue .put_nowait (outputs )
246
245
247
- def _handle_client_request (self , request : EngineCoreRequestUnion ) -> None :
248
- """Handle EngineCoreRequest or EngineCoreABORT from Client."""
246
+ def _handle_client_request (self , request_type : EngineCoreRequestType ,
247
+ request : Any ) -> None :
248
+ """Dispatch request from client."""
249
249
250
- if isinstance ( request , EngineCoreRequest ) :
250
+ if request_type == EngineCoreRequestType . ADD :
251
251
self .add_request (request )
252
- elif isinstance (request , EngineCoreProfile ):
253
- self .model_executor .profile (request .is_start )
254
- elif isinstance (request , EngineCoreResetPrefixCache ):
255
- self .reset_prefix_cache ()
256
- else :
257
- # TODO: make an EngineCoreAbort wrapper
258
- assert isinstance (request , list )
252
+ elif request_type == EngineCoreRequestType .ABORT :
259
253
self .abort_requests (request )
254
+ elif request_type == EngineCoreRequestType .RESET_PREFIX_CACHE :
255
+ self .reset_prefix_cache ()
256
+ elif request_type == EngineCoreRequestType .PROFILE :
257
+ self .model_executor .profile (request )
260
258
261
259
def process_input_socket (self , input_path : str ):
262
260
"""Input socket IO thread."""
263
261
264
262
# Msgpack serialization decoding.
265
- decoder_add_req = PickleEncoder ( )
266
- decoder_abort_req = PickleEncoder ()
263
+ add_request_decoder = MsgpackDecoder ( EngineCoreRequest )
264
+ generic_decoder = MsgpackDecoder ()
267
265
268
266
with zmq_socket_ctx (input_path , zmq .constants .PULL ) as socket :
269
267
while True :
270
268
# (RequestType, RequestData)
271
269
type_frame , data_frame = socket .recv_multipart (copy = False )
272
- request_type = type_frame .buffer
273
- request_data = data_frame .buffer
270
+ request_type = EngineCoreRequestType (bytes (type_frame .buffer ))
274
271
275
272
# Deserialize the request data.
276
- if request_type == EngineCoreRequestType .ADD .value :
277
- request = decoder_add_req .decode (request_data )
278
- elif request_type == EngineCoreRequestType .ABORT .value :
279
- request = decoder_abort_req .decode (request_data )
280
- elif request_type in (
281
- EngineCoreRequestType .PROFILE .value ,
282
- EngineCoreRequestType .RESET_PREFIX_CACHE .value ):
283
- request = pickle .loads (request_data )
284
- else :
285
- raise ValueError (f"Unknown RequestType: { request_type } " )
273
+ decoder = add_request_decoder if (
274
+ request_type
275
+ == EngineCoreRequestType .ADD ) else generic_decoder
276
+ request = decoder .decode (data_frame .buffer )
286
277
287
278
# Push to input queue for core busy loop.
288
- self .input_queue .put_nowait (request )
279
+ self .input_queue .put_nowait (( request_type , request ) )
289
280
290
281
def process_output_socket (self , output_path : str ):
291
282
"""Output socket IO thread."""
0 commit comments