
Commit 619ae26

[V1] [2/n] Logging and Metrics - OutputProcessor Abstraction (#11973)
Signed-off-by: [email protected] <[email protected]>
1 parent d14e98d commit 619ae26

8 files changed, +450 -210 lines

tests/v1/engine/test_async_llm.py

Lines changed: 58 additions & 11 deletions
@@ -1,5 +1,5 @@
 import asyncio
-from typing import Tuple
+from typing import List, Tuple
 
 import pytest
 
@@ -13,6 +13,7 @@
                 allow_module_level=True)
 
 ENGINE_ARGS = AsyncEngineArgs(model="meta-llama/Llama-3.2-1B",
+                              enforce_eager=True,
                               disable_log_requests=True)
 
 
@@ -53,17 +54,63 @@ async def test_load(monkeypatch):
                     generate(engine, request_id, NUM_EXPECTED_TOKENS)))
 
         # Confirm that we got all the EXPECTED tokens from the requests.
-        failed_request_id = None
-        tokens = None
         for task in tasks:
             num_generated_tokens, request_id = await task
-            if (num_generated_tokens != NUM_EXPECTED_TOKENS
-                    and failed_request_id is None):
-                failed_request_id = request_id
-                tokens = num_generated_tokens
-
-        assert failed_request_id is None, (
-            f"{failed_request_id} generated {tokens} but "
-            f"expected {NUM_EXPECTED_TOKENS}")
+            assert num_generated_tokens == NUM_EXPECTED_TOKENS, (
+                f"{request_id} generated {num_generated_tokens} but "
+                f"expected {NUM_EXPECTED_TOKENS}")
+
+        assert not engine.output_processor.has_unfinished_requests()
+        engine.shutdown()
+
+
+@pytest.mark.asyncio
+async def test_abort(monkeypatch):
+
+    with monkeypatch.context() as m:
+        m.setenv("VLLM_USE_V1", "1")
+
+        engine = AsyncLLM.from_engine_args(ENGINE_ARGS)
+
+        NUM_REQUESTS = 100
+        NUM_EXPECTED_TOKENS = 100
+        REQUEST_IDS_TO_ABORT = range(1, 100, 10)
+
+        request_ids = [f"request-{i}" for i in range(NUM_REQUESTS)]
+
+        # Create concurrent requests.
+        tasks: List[asyncio.Task] = []
+        for request_id in request_ids:
+            tasks.append(
+                asyncio.create_task(
+                    generate(engine, request_id, NUM_EXPECTED_TOKENS)))
+
+        # API server cancels requests when they disconnect.
+        for idx in REQUEST_IDS_TO_ABORT:
+            tasks[idx].cancel()
+            await asyncio.sleep(0.1)
+
+        # Confirm the other requests are okay.
+        for idx, task in enumerate(tasks):
+            # Confirm that it was actually canceled.
+            if idx in REQUEST_IDS_TO_ABORT:
+                with pytest.raises(asyncio.CancelledError):
+                    await task
+            else:
+                # Otherwise, make sure the request was not impacted.
+                num_generated_tokens, request_id = await task
+                assert num_generated_tokens == NUM_EXPECTED_TOKENS, (
+                    f"{request_id} generated {num_generated_tokens} but "
+                    f"expected {NUM_EXPECTED_TOKENS}")
+
+        assert not engine.output_processor.has_unfinished_requests()
+
+        # Confirm we can do another generation.
+        request_id = f"request-{REQUEST_IDS_TO_ABORT[0]}"
+        task = asyncio.create_task(
+            generate(engine, request_id, NUM_EXPECTED_TOKENS))
+        num_generated_tokens, request_id = await task
+        assert num_generated_tokens == NUM_EXPECTED_TOKENS
+        assert not engine.output_processor.has_unfinished_requests()
 
     engine.shutdown()
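
Both tests above await a generate helper that is defined earlier in this test file and does not appear in the diff. For orientation, here is a minimal sketch of what such a helper could look like; the prompt string, the keyword arguments to AsyncLLM.generate, and the zero-temperature sampling are illustrative assumptions rather than details taken from this commit:

from typing import Tuple

from vllm import SamplingParams
from vllm.v1.engine.async_llm import AsyncLLM


async def generate(engine: AsyncLLM, request_id: str,
                   max_tokens: int) -> Tuple[int, str]:
    # Drive one request to completion and report how many tokens it produced,
    # so callers can compare the count against NUM_EXPECTED_TOKENS.
    count = 0
    async for out in engine.generate(
            request_id=request_id,
            prompt="Hello my name is Robert and",  # illustrative prompt
            sampling_params=SamplingParams(max_tokens=max_tokens,
                                           temperature=0)):
        # Each yielded RequestOutput carries the tokens generated so far.
        count = len(out.outputs[0].token_ids)
    return count, request_id

Because each request runs as its own asyncio.Task, cancelling the task (as test_abort does) propagates asyncio.CancelledError out of this coroutine, which matches the test's note that the API server cancels requests when clients disconnect.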

tests/v1/engine/test_detokenizer.py renamed to tests/v1/engine/test_output_processor.py

Lines changed: 88 additions & 11 deletions
@@ -3,11 +3,18 @@
 import pytest
 from transformers import AutoTokenizer
 
+from vllm.engine.arg_utils import EngineArgs
 from vllm.sampling_params import RequestOutputKind, SamplingParams
+from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
 from vllm.v1.engine import EngineCoreOutput, EngineCoreRequest
-from vllm.v1.engine.detokenizer import Detokenizer
+from vllm.v1.engine.output_processor import OutputProcessor
 
 TOKENIZER_NAME = "mistralai/Mistral-7B-Instruct-v0.3"
+VLLM_CONFIG = EngineArgs(model=TOKENIZER_NAME).create_engine_config()
+TOKENIZER_GROUP = init_tokenizer_from_configs(VLLM_CONFIG.model_config,
+                                              VLLM_CONFIG.scheduler_config,
+                                              VLLM_CONFIG.parallel_config,
+                                              VLLM_CONFIG.lora_config)
 tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_NAME)
 
 FULL_STRINGS = [
@@ -66,7 +73,7 @@ def get_outputs(self) -> List[EngineCoreOutput]:
     "request_output_kind",
     [RequestOutputKind.DELTA, RequestOutputKind.FINAL_ONLY])
 def test_incremental_detokenization(request_output_kind: RequestOutputKind):
-    detokenizer = Detokenizer(TOKENIZER_NAME)
+    output_processor = OutputProcessor(TOKENIZER_GROUP, log_stats=False)
     engine_core = MockEngineCore(GENERATION_TOKENS)
 
     # Make N requests.
@@ -93,7 +100,7 @@ def test_incremental_detokenization(request_output_kind: RequestOutputKind):
 
     # Add requests to the detokenizer.
     for request in requests:
-        detokenizer.add_request(request)
+        output_processor.add_request(request)
 
     gen_strings = {}
     gen_tokens = {}
@@ -104,7 +111,9 @@ def test_incremental_detokenization(request_output_kind: RequestOutputKind):
             break
 
         # Step the Detokenizer.
-        request_outputs, requests_to_abort = detokenizer.step(outputs)
+        processed_outputs = output_processor.process_outputs(outputs, )
+        request_outputs = processed_outputs.request_outputs
+        requests_to_abort = processed_outputs.reqs_to_abort
         assert len(requests_to_abort) == 0
 
         # Update tracking.
@@ -128,13 +137,13 @@ def test_incremental_detokenization(request_output_kind: RequestOutputKind):
         assert gen_str == ref_gen_str, f"{gen_str=}, {ref_gen_str=}"
         assert gen_toks == ref_gen_toks, f"{gen_toks=}, {ref_gen_toks=}"
 
-    assert detokenizer.get_num_unfinished_requests() == 0
-    assert not detokenizer.has_unfinished_requests()
+    assert output_processor.get_num_unfinished_requests() == 0
+    assert not output_processor.has_unfinished_requests()
 
 
 @pytest.mark.parametrize("include_stop_str_in_output", [True, False])
 def test_stop_string(include_stop_str_in_output: bool):
-    detokenizer = Detokenizer(TOKENIZER_NAME)
+    output_processor = OutputProcessor(TOKENIZER_GROUP, log_stats=False)
     engine_core = MockEngineCore(GENERATION_TOKENS)
 
     # Make N requests.
@@ -162,7 +171,7 @@ def test_stop_string(include_stop_str_in_output: bool):
 
     # Add requests to the detokenizer.
     for request in requests:
-        detokenizer.add_request(request)
+        output_processor.add_request(request)
 
     gen_strings = {}
     aborted = []
@@ -173,7 +182,9 @@ def test_stop_string(include_stop_str_in_output: bool):
             break
 
         # Step the Detokenizer.
-        request_outputs, requests_to_abort = detokenizer.step(outputs)
+        processed_outputs = output_processor.process_outputs(outputs)
+        request_outputs = processed_outputs.request_outputs
+        requests_to_abort = processed_outputs.reqs_to_abort
         for request_output in request_outputs:
             # If aborted, we should not get a request output.
             assert request_output.request_id not in aborted
@@ -214,5 +225,71 @@ def test_stop_string(include_stop_str_in_output: bool):
             assert gen_str == ref_str_exc_stop, (
                 f"{gen_str=}, {ref_str_exc_stop=}")
 
-    assert detokenizer.get_num_unfinished_requests() == 0
-    assert not detokenizer.has_unfinished_requests()
+    assert output_processor.get_num_unfinished_requests() == 0
+    assert not output_processor.has_unfinished_requests()
+
+
+def test_iteration_stats():
+    output_processor = OutputProcessor(TOKENIZER_GROUP, log_stats=True)
+    engine_core = MockEngineCore(GENERATION_TOKENS)
+
+    # Make N requests.
+    requests = [
+        EngineCoreRequest(
+            request_id=f"request-{idx}",
+            prompt=prompt,
+            prompt_token_ids=prompt_tokens,
+            arrival_time=0,
+            mm_inputs=None,
+            mm_hashes=None,
+            mm_placeholders=None,
+            eos_token_id=None,
+            lora_request=None,
+            sampling_params=SamplingParams(),
+        ) for idx, (
+            prompt,
+            prompt_tokens) in enumerate(zip(PROMPT_STRINGS, PROMPT_TOKENS))
+    ]
+
+    # Add all requests except one to the OutputProcessor.
+    num_active = len(GENERATION_TOKENS) - 1
+    for request in requests[:num_active]:
+        output_processor.add_request(request)
+    inactive_request = requests[num_active]
+
+    # First iteration has 2 prefills.
+    outputs = engine_core.get_outputs()[:num_active]
+    processed_outputs = output_processor.process_outputs(outputs)
+    iteration_stats = processed_outputs.iteration_stats
+    total_prompt_tokens = sum(
+        [len(prompt_tokens) for prompt_tokens in PROMPT_TOKENS[:num_active]])
+
+    assert iteration_stats.num_prompt_tokens == total_prompt_tokens
+    assert iteration_stats.num_generation_tokens == num_active
+
+    # Just decodes in this step.
+    outputs = engine_core.get_outputs()[:num_active]
+    processed_outputs = output_processor.process_outputs(outputs)
+    iteration_stats = processed_outputs.iteration_stats
+
+    assert iteration_stats.num_prompt_tokens == 0
+    assert iteration_stats.num_generation_tokens == num_active
+
+    # Add a new request - prefill and 2 decodes in this step.
+    output_processor.add_request(inactive_request)
+    num_active += 1
+    outputs = engine_core.get_outputs()[:num_active]
+    processed_outputs = output_processor.process_outputs(outputs)
+    iteration_stats = processed_outputs.iteration_stats
+    total_prompt_tokens = len(PROMPT_TOKENS[num_active - 1])
+
+    assert iteration_stats.num_prompt_tokens == total_prompt_tokens
+    assert iteration_stats.num_generation_tokens == num_active
+
+    # Just decodes in this step.
+    outputs = engine_core.get_outputs()[:num_active]
+    processed_outputs = output_processor.process_outputs(outputs)
+    iteration_stats = processed_outputs.iteration_stats
+
+    assert iteration_stats.num_prompt_tokens == 0
+    assert iteration_stats.num_generation_tokens == num_active
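
Taken together, the renamed tests exercise the new OutputProcessor surface: it is constructed from a tokenizer group plus a log_stats flag, requests are registered with add_request(), and each batch of EngineCoreOutputs is fed through process_outputs(), which bundles request_outputs, reqs_to_abort, and (when stats are enabled) iteration_stats in place of the old Detokenizer.step() tuple. Below is a minimal consumer sketch of that call pattern, mirroring the setup at the top of the test module; the per-step portion is schematic commentary, not the engine's actual loop:

from vllm.engine.arg_utils import EngineArgs
from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
from vllm.v1.engine.output_processor import OutputProcessor

# Build a tokenizer group the same way the test module does.
vllm_config = EngineArgs(
    model="mistralai/Mistral-7B-Instruct-v0.3").create_engine_config()
tokenizer_group = init_tokenizer_from_configs(vllm_config.model_config,
                                              vllm_config.scheduler_config,
                                              vllm_config.parallel_config,
                                              vllm_config.lora_config)
output_processor = OutputProcessor(tokenizer_group, log_stats=True)

# Schematic per-step usage (the EngineCoreRequest / EngineCoreOutput inputs are
# produced elsewhere, e.g. by the engine core or a mock such as MockEngineCore):
#   output_processor.add_request(engine_core_request)
#   processed = output_processor.process_outputs(engine_core_outputs)
#   processed.request_outputs    # detokenized RequestOutputs for clients
#   processed.reqs_to_abort      # request ids the engine core should stop
#   processed.iteration_stats    # num_prompt_tokens, num_generation_tokens, ...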
