 from copy import copy
 from typing import Any, Callable, Optional, Union

-from typing_extensions import TypeVar
-
 import vllm.envs as envs
+
+from typing_extensions import TypeVar
 from vllm.config import ParallelConfig, VllmConfig
 from vllm.distributed import stateless_destroy_torch_distributed_process_group
 from vllm.engine.arg_utils import EngineArgs
 from vllm.prompt_adapter.request import PromptAdapterRequest
 from vllm.sampling_params import SamplingParams
 from vllm.transformers_utils.tokenizer_group import (
-    BaseTokenizerGroup, init_tokenizer_from_configs)
+    BaseTokenizerGroup,
+    init_tokenizer_from_configs,
+)
 from vllm.usage.usage_lib import UsageContext
 from vllm.utils import Device
 from vllm.v1.engine.core_client import EngineCoreClient
 from vllm.v1.engine.processor import Processor
 from vllm.v1.executor.abstract import Executor
 from vllm.v1.metrics.loggers import StatLoggerFactory
+from vllm.v1.utils import report_usage_stats

 logger = init_logger(__name__)

@@ -54,12 +57,14 @@ def __init__(
                 "Using V1 LLMEngine, but envs.VLLM_USE_V1=False. "
                 "This should not happen. As a workaround, try using "
                 "LLMEngine.from_vllm_config(...) or explicitly set "
-                "VLLM_USE_V1=0 or 1 and report this issue on Github.")
+                "VLLM_USE_V1=0 or 1 and report this issue on Github."
+            )

         if stat_loggers is not None:
             raise NotImplementedError(
                 "Passing StatLoggers to LLMEngine in V1 is not yet supported. "
-                "Set VLLM_USE_V1=0 and file and issue on Github.")
+                "Set VLLM_USE_V1=0 and file and issue on Github."
+            )

         self.vllm_config = vllm_config
         self.model_config = vllm_config.model_config
@@ -79,17 +84,17 @@ def __init__(
             model_config=vllm_config.model_config,
             scheduler_config=vllm_config.scheduler_config,
             parallel_config=vllm_config.parallel_config,
-            lora_config=vllm_config.lora_config)
+            lora_config=vllm_config.lora_config,
+        )
         self.tokenizer.ping()

         # Processor (convert Inputs --> EngineCoreRequests)
-        self.processor = Processor(vllm_config=vllm_config,
-                                   tokenizer=self.tokenizer,
-                                   mm_registry=mm_registry)
+        self.processor = Processor(
+            vllm_config=vllm_config, tokenizer=self.tokenizer, mm_registry=mm_registry
+        )

         # OutputProcessor (convert EngineCoreOutputs --> RequestOutput).
-        self.output_processor = OutputProcessor(self.tokenizer,
-                                                log_stats=False)
+        self.output_processor = OutputProcessor(self.tokenizer, log_stats=False)

         # EngineCore (gets EngineCoreRequests and gives EngineCoreOutputs)
         self.engine_core = EngineCoreClient.make_client(
@@ -104,6 +109,9 @@ def __init__(
             # for v0 compatibility
             self.model_executor = self.engine_core.engine_core.model_executor  # type: ignore

+        # If usage stat is enabled, collect relevant info.
+        report_usage_stats(vllm_config, usage_context)
+
     @classmethod
     def from_vllm_config(
         cls,
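
Note on the hunk above: the new report_usage_stats(vllm_config, usage_context) call respects vLLM's global usage-stats opt-out, so it records nothing when collection is disabled. A minimal sketch of opting out before constructing the engine, assuming the VLLM_NO_USAGE_STATS environment variable documented in vLLM's usage-stats guide (verify against vllm.usage.usage_lib for your version):

```python
# Minimal sketch (not part of this diff): disable usage-stats collection before
# the engine is built, so the report_usage_stats() call added in __init__ is a
# no-op. VLLM_NO_USAGE_STATS is assumed from vLLM's usage-stats documentation.
import os

os.environ["VLLM_NO_USAGE_STATS"] = "1"

# ... construct the LLMEngine afterwards as usual ...
```
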
@@ -112,12 +120,14 @@ def from_vllm_config(
         stat_loggers: Optional[list[StatLoggerFactory]] = None,
         disable_log_stats: bool = False,
     ) -> "LLMEngine":
-        return cls(vllm_config=vllm_config,
-                   executor_class=Executor.get_class(vllm_config),
-                   log_stats=(not disable_log_stats),
-                   usage_context=usage_context,
-                   stat_loggers=stat_loggers,
-                   multiprocess_mode=envs.VLLM_ENABLE_V1_MULTIPROCESSING)
+        return cls(
+            vllm_config=vllm_config,
+            executor_class=Executor.get_class(vllm_config),
+            log_stats=(not disable_log_stats),
+            usage_context=usage_context,
+            stat_loggers=stat_loggers,
+            multiprocess_mode=envs.VLLM_ENABLE_V1_MULTIPROCESSING,
+        )

     @classmethod
     def from_engine_args(
@@ -138,12 +148,14 @@ def from_engine_args(
             enable_multiprocessing = True

         # Create the LLMEngine.
-        return cls(vllm_config=vllm_config,
-                   executor_class=executor_class,
-                   log_stats=not engine_args.disable_log_stats,
-                   usage_context=usage_context,
-                   stat_loggers=stat_loggers,
-                   multiprocess_mode=enable_multiprocessing)
+        return cls(
+            vllm_config=vllm_config,
+            executor_class=executor_class,
+            log_stats=not engine_args.disable_log_stats,
+            usage_context=usage_context,
+            stat_loggers=stat_loggers,
+            multiprocess_mode=enable_multiprocessing,
+        )

     def get_num_unfinished_requests(self) -> int:
         return self.output_processor.get_num_unfinished_requests()
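
For context on the two reformatted constructors above, here is a minimal sketch of the typical call path, from_engine_args() -> from_vllm_config()/__init__() -> add_request()/step(). The model name and sampling settings are illustrative only, not taken from this diff:

```python
# Minimal usage sketch (illustrative; model name and parameters are examples).
from vllm.engine.arg_utils import EngineArgs
from vllm.sampling_params import SamplingParams
from vllm.v1.engine.llm_engine import LLMEngine

engine = LLMEngine.from_engine_args(EngineArgs(model="facebook/opt-125m"))

engine.add_request(
    request_id="request-0",
    prompt="Hello, my name is",
    params=SamplingParams(max_tokens=16),
)

# step() turns EngineCoreOutputs into RequestOutputs until nothing is pending.
while engine.has_unfinished_requests():
    for request_output in engine.step():
        if request_output.finished:
            print(request_output.outputs[0].text)
```
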
@@ -156,7 +168,8 @@ def has_unfinished_requests(self) -> bool:

     def has_unfinished_requests_dp(self, has_unfinished: bool) -> bool:
         aggregated_has_unfinished = ParallelConfig.has_unfinished_dp(
-            self.dp_group, has_unfinished)
+            self.dp_group, has_unfinished
+        )
         if not has_unfinished and aggregated_has_unfinished:
             self.should_execute_dummy_batch = True
         return aggregated_has_unfinished
@@ -183,11 +196,16 @@ def add_request(
         priority: int = 0,
     ) -> None:
         # Process raw inputs into the request.
-        request = self.processor.process_inputs(request_id, prompt, params,
-                                                arrival_time, lora_request,
-                                                trace_headers,
-                                                prompt_adapter_request,
-                                                priority)
+        request = self.processor.process_inputs(
+            request_id,
+            prompt,
+            params,
+            arrival_time,
+            lora_request,
+            trace_headers,
+            prompt_adapter_request,
+            priority,
+        )

         n = params.n if isinstance(params, SamplingParams) else 1

@@ -222,8 +240,7 @@ def step(self) -> list[RequestOutput]:
         outputs = self.engine_core.get_output()

         # 2) Process EngineCoreOutputs.
-        processed_outputs = self.output_processor.process_outputs(
-            outputs.outputs)
+        processed_outputs = self.output_processor.process_outputs(outputs.outputs)

         # 3) Abort any reqs that finished due to stop strings.
         self.engine_core.abort_requests(processed_outputs.reqs_to_abort)
@@ -261,12 +278,15 @@ def get_tokenizer_group(
         tokenizer_group = self.tokenizer

         if tokenizer_group is None:
-            raise ValueError("Unable to get tokenizer because "
-                             "skip_tokenizer_init is True")
+            raise ValueError(
+                "Unable to get tokenizer because " "skip_tokenizer_init is True"
+            )
         if not isinstance(tokenizer_group, group_type):
-            raise TypeError("Invalid type of tokenizer group. "
-                            f"Expected type: {group_type}, but "
-                            f"found type: {type(tokenizer_group)}")
+            raise TypeError(
+                "Invalid type of tokenizer group. "
+                f"Expected type: {group_type}, but "
+                f"found type: {type(tokenizer_group)}"
+            )

         return tokenizer_group

@@ -286,11 +306,13 @@ def pin_lora(self, lora_id: int) -> bool:
         """Prevent an adapter from being evicted."""
         return self.engine_core.pin_lora(lora_id)

-    def collective_rpc(self,
-                       method: Union[str, Callable[..., _R]],
-                       timeout: Optional[float] = None,
-                       args: tuple = (),
-                       kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
+    def collective_rpc(
+        self,
+        method: Union[str, Callable[..., _R]],
+        timeout: Optional[float] = None,
+        args: tuple = (),
+        kwargs: Optional[dict[str, Any]] = None,
+    ) -> list[_R]:
         return self.engine_core.collective_rpc(method, timeout, args, kwargs)

     def __del__(self):
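
A short sketch of the reformatted collective_rpc() signature above in use. It assumes the callable form runs on each worker with the worker object bound as the first argument (the pattern shown in vLLM's collective_rpc examples) and that workers expose a rank attribute; both are assumptions to verify for your version:

```python
# Illustrative sketch (not part of this diff): broadcast a callable to every
# worker and collect one result per worker, matching the list[_R] return type.
from vllm.engine.arg_utils import EngineArgs
from vllm.v1.engine.llm_engine import LLMEngine


def echo_rank(self) -> int:
    # Runs on each worker; `self` is assumed to be the worker object.
    return self.rank


engine = LLMEngine.from_engine_args(EngineArgs(model="facebook/opt-125m"))
print(engine.collective_rpc(echo_rank))  # e.g. [0] for a single-GPU setup
```
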