Commit 9bcbfe4

[Metrics] Handle vLLM streaming response in streaming server (#518)
- Update the streaming integration test: when the response includes usage, the DONE message is returned together with the last message, and the end-of-stream body is an empty message.
1 parent 03d8584 commit 9bcbfe4
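
For context, the stream tail this change handles (taken from the updated hermetic test below) looks like the following when usage reporting is requested via stream_options: the usage chunk and the [DONE] marker arrive in a single ext_proc response body, and the body that carries EndOfStream=true is empty. Without usage reporting, `data: [DONE]` arrives on its own instead.

Response body 1 (EndOfStream=false):
data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]

Response body 2 (EndOfStream=true):
(empty)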

File tree

3 files changed: +127 −53 lines changed

Diff for: pkg/epp/handlers/response.go

+46-32
@@ -30,6 +30,11 @@ import (
 	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
+const (
+	streamingRespPrefix = "data: "
+	streamingEndMsg     = "data: [DONE]"
+)
+
 // HandleResponseHeaders processes response headers from the backend model server.
 func (s *Server) HandleResponseHeaders(
 	ctx context.Context,
@@ -197,39 +202,10 @@ func (s *Server) HandleStreaming(
 	body *extProcPb.ProcessingRequest_ResponseBody,
 	loggerVerbose logr.Logger,
 ) error {
-	respPrefix := "data: "
 	responseText := string(body.ResponseBody.Body)
-	// Example message if "stream_options": {"include_usage": "true"} is included in the request:
-	// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
-	// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
-	//
-	// data: [DONE]
-	//
-	// Noticed that vLLM returns two entries in one response.
-	// We need to strip the `data:` prefix and next Data: [DONE] from the message to fetch response data.
-	//
-	// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
-	// indicates end of streaming.
-	if strings.Contains(responseText, "data: [DONE]") {
-		response := Response{}
-
-		lines := strings.Split(responseText, "\n")
-		for _, line := range lines {
-			if !strings.HasPrefix(line, respPrefix) {
-				continue
-			}
-			content := strings.TrimPrefix(line, respPrefix)
-			if content == "[DONE]" {
-				continue
-			}
-
-			byteSlice := []byte(content)
-			if err := json.Unmarshal(byteSlice, &response); err != nil {
-				loggerVerbose.Error(err, "unmarshaling response body")
-				continue
-			}
-		}
-		reqCtx.Response = response
+	if strings.Contains(responseText, streamingEndMsg) {
+		parsedResp := ParseRespForUsage(ctx, responseText, loggerVerbose)
+		reqCtx.Response = parsedResp
 	}
 
 	if body.ResponseBody.EndOfStream {
@@ -242,6 +218,44 @@ func (s *Server) HandleStreaming(
 	return nil
 }
 
+// Example message if "stream_options": {"include_usage": "true"} is included in the request:
+// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
+// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+//
+// data: [DONE]
+//
+// Note that vLLM returns both entries in one response.
+// We need to strip the `data: ` prefix and the trailing `data: [DONE]` from the message to fetch the response data.
+//
+// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
+// indicates the end of streaming.
+func ParseRespForUsage(
+	ctx context.Context,
+	responseText string,
+	loggerVerbose logr.Logger,
+) Response {
+	response := Response{}
+
+	lines := strings.Split(responseText, "\n")
+	for _, line := range lines {
+		if !strings.HasPrefix(line, streamingRespPrefix) {
+			continue
+		}
+		content := strings.TrimPrefix(line, streamingRespPrefix)
+		if content == "[DONE]" {
+			continue
+		}
+
+		byteSlice := []byte(content)
+		if err := json.Unmarshal(byteSlice, &response); err != nil {
+			loggerVerbose.Error(err, "unmarshaling response body")
+			continue
+		}
+	}
+
+	return response
+}
+
 type Response struct {
 	Usage Usage `json:"usage"`
 }
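
For reference, a minimal standalone sketch of the parsing technique ParseRespForUsage uses above: split the body on newlines, keep only the `data: `-prefixed lines, skip the [DONE] marker, and unmarshal the remaining JSON. The local response/usage structs and the shortened chunk are illustrative stand-ins, not the package's own Response/Usage types.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Local stand-ins for the handlers package's Response/Usage types.
type usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	TotalTokens      int `json:"total_tokens"`
	CompletionTokens int `json:"completion_tokens"`
}

type response struct {
	Usage usage `json:"usage"`
}

func main() {
	const prefix = "data: "
	// Combined chunk: the usage entry and the [DONE] marker arrive in one body.
	body := "data: {\"object\":\"text_completion\",\"choices\":[],\"usage\":{\"prompt_tokens\":7,\"total_tokens\":17,\"completion_tokens\":10}}\ndata: [DONE]"

	var resp response
	for _, line := range strings.Split(body, "\n") {
		if !strings.HasPrefix(line, prefix) {
			continue // not an SSE data line
		}
		content := strings.TrimPrefix(line, prefix)
		if content == "[DONE]" {
			continue // end-of-stream marker carries no JSON
		}
		if err := json.Unmarshal([]byte(content), &resp); err != nil {
			continue // skip malformed lines, as the handler does
		}
	}
	fmt.Printf("prompt_tokens=%d completion_tokens=%d\n", resp.Usage.PromptTokens, resp.Usage.CompletionTokens)
	// Output: prompt_tokens=7 completion_tokens=10
}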

Diff for: pkg/epp/handlers/streamingserver.go

+28
@@ -157,6 +157,17 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
 		case *extProcPb.ProcessingRequest_ResponseBody:
 			if reqCtx.modelServerStreaming {
 				// Currently we punt on response parsing if the modelServer is streaming, and we just passthrough.
+
+				responseText := string(v.ResponseBody.Body)
+				s.HandleResponseBodyModelStreaming(ctx, reqCtx, responseText)
+				if v.ResponseBody.EndOfStream {
+					loggerVerbose.Info("streaming is completed")
+
+					reqCtx.ResponseCompleteTimestamp = time.Now()
+					metrics.RecordRequestLatencies(ctx, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)
+					metrics.RecordResponseSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseSize)
+				}
+
 				reqCtx.respBodyResp = &extProcPb.ProcessingResponse{
 					Response: &extProcPb.ProcessingResponse_ResponseBody{
 						ResponseBody: &extProcPb.BodyResponse{
@@ -526,3 +537,20 @@ func (s *StreamingServer) HandleResponseBody(
 	}
 	return reqCtx, nil
 }
+
+// HandleResponseBodyModelStreaming handles the response body when the model server is streaming.
+func (s *StreamingServer) HandleResponseBodyModelStreaming(
+	ctx context.Context,
+	reqCtx *StreamingRequestContext,
+	responseText string,
+) {
+	logger := log.FromContext(ctx)
+	loggerVerbose := logger.V(logutil.VERBOSE)
+	loggerVerbose.Info("Processing HandleResponseBody")
+
+	if strings.Contains(responseText, streamingEndMsg) {
+		resp := ParseRespForUsage(ctx, responseText, loggerVerbose)
+		metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.PromptTokens)
+		metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.CompletionTokens)
+	}
+}
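
The hermetic test below asserts the full Prometheus text exposition of inference_model_input_tokens after one streamed request with prompt_tokens=7. As a reminder of how those cumulative bucket counts come about, here is a small standalone sketch against the upstream Prometheus client (bucket list truncated; this illustrates histogram semantics only, not the repository's metrics plumbing):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Truncated bucket list; the real inference_model_input_tokens metric extends further.
	h := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "inference_model_input_tokens",
		Buckets: []float64{1, 8, 16, 32, 64},
	})
	h.Observe(7) // prompt_tokens reported in the streamed usage chunk

	m := &dto.Metric{}
	if err := h.Write(m); err != nil {
		panic(err)
	}
	for _, b := range m.GetHistogram().GetBucket() {
		// Buckets are cumulative: one observation of 7 is counted in every bucket
		// whose upper bound is >= 8, matching the _bucket lines the test expects.
		fmt.Printf("le=%v count=%d\n", b.GetUpperBound(), b.GetCumulativeCount())
	}
	fmt.Printf("sum=%v count=%d\n", m.GetHistogram().GetSampleSum(), m.GetHistogram().GetSampleCount())
}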

Diff for: test/integration/epp/hermetic_test.go

+53-21
@@ -403,7 +403,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 		requests          []*extProcPb.ProcessingRequest
 		pods              map[backendmetrics.Pod]*backendmetrics.Metrics
 		wantResponses     []*extProcPb.ProcessingResponse
-		wantMetrics       string
+		wantMetrics       map[string]string
 		wantErr           bool
 		immediateResponse *extProcPb.ImmediateResponse
 	}{
@@ -426,11 +426,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					KVCacheUsagePercent: 0.2,
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 # TYPE inference_model_request_total counter
 inference_model_request_total{model_name="my-model",target_model_name="my-model-12345"} 1
-`,
+`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -507,11 +507,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 # TYPE inference_model_request_total counter
 inference_model_request_total{model_name="sql-lora",target_model_name="sql-lora-1fdg2"} 1
-`,
+`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -588,11 +588,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 # TYPE inference_model_request_total counter
 inference_model_request_total{model_name="sql-lora",target_model_name="sql-lora-1fdg2"} 1
-`,
+`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -671,7 +671,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 				},
 			},
 			wantErr: false,
-			wantMetrics: "",
+			wantMetrics: map[string]string{},
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
 					Response: &extProcPb.ProcessingResponse_ImmediateResponse{
@@ -715,11 +715,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 # TYPE inference_model_request_total counter
 inference_model_request_total{model_name="sql-lora-sheddable",target_model_name="sql-lora-1fdg3"} 1
-`,
+`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -823,11 +823,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 # TYPE inference_model_request_total counter
 inference_model_request_total{model_name="sql-lora-sheddable",target_model_name="sql-lora-1fdg3"} 1
-`,
+`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -931,11 +931,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 # TYPE inference_model_request_total counter
 inference_model_request_total{model_name="direct-model",target_model_name="direct-model"} 1
-`,
+`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -1233,19 +1233,47 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 				{
 					Request: &extProcPb.ProcessingRequest_ResponseBody{
 						ResponseBody: &extProcPb.HttpBody{
-							Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}`),
+							Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+data: [DONE]`,
+							),
 							EndOfStream: false},
 					},
 				},
 				{
 					Request: &extProcPb.ProcessingRequest_ResponseBody{
 						ResponseBody: &extProcPb.HttpBody{
-							Body: []byte("data: [DONE]"),
+							Body: []byte(""),
 							EndOfStream: true},
 					},
 				},
 			},
 			wantErr: false,
+			wantMetrics: map[string]string{`inference_model_input_tokens`: `
+# HELP inference_model_input_tokens [ALPHA] Inference model input token count distribution for requests in each model.
+# TYPE inference_model_input_tokens histogram
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1"} 0
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="8"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="16"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="32"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="64"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="128"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="256"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="512"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1024"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="2048"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="4096"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="8192"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="16384"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="32778"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="65536"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="131072"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="262144"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="524288"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1.048576e+06"} 1
+inference_model_input_tokens_bucket{model_name="",target_model_name="",le="+Inf"} 1
+inference_model_input_tokens_sum{model_name="",target_model_name=""} 7
+inference_model_input_tokens_count{model_name="",target_model_name=""} 1
+`},
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
 					Response: &extProcPb.ProcessingResponse_ResponseHeaders{
@@ -1352,7 +1380,9 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 						BodyMutation: &extProcPb.BodyMutation{
 							Mutation: &extProcPb.BodyMutation_StreamedResponse{
 								StreamedResponse: &extProcPb.StreamedBodyResponse{
-									Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}`),
+									Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+data: [DONE]`,
+									),
 									EndOfStream: false,
 								},
 							},
@@ -1368,7 +1398,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 						BodyMutation: &extProcPb.BodyMutation{
 							Mutation: &extProcPb.BodyMutation_StreamedResponse{
 								StreamedResponse: &extProcPb.StreamedBodyResponse{
-									Body: []byte("data: [DONE]"),
+									Body: []byte(""),
 									EndOfStream: true,
 								},
 							},
@@ -1394,9 +1424,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 				t.Errorf("Unexpected response, (-want +got): %v", diff)
 			}
 
-			if test.wantMetrics != "" {
-				if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(test.wantMetrics), "inference_model_request_total"); err != nil {
-					t.Error(err)
+			if len(test.wantMetrics) != 0 {
+				for metricName, value := range test.wantMetrics {
+					if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(value), metricName); err != nil {
+						t.Error(err)
+					}
 				}
 			}
 