[Metrics] Add vLLM streaming support for metrics #329

Merged: 1 commit, Mar 14, 2025
109 changes: 96 additions & 13 deletions pkg/epp/handlers/response.go
@@ -20,9 +20,11 @@ import (
	"context"
	"encoding/json"
	"fmt"
	"strings"

	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
	"github.com/go-logr/logr"
	"sigs.k8s.io/controller-runtime/pkg/log"
	errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -67,11 +69,25 @@ func (s *Server) HandleResponseHeaders(
	// }
	// }
	var statusFound, typeFound bool
	for _, header := range h.ResponseHeaders.Headers.GetHeaders() {
		if header.Key == "status" {
			if string(header.RawValue) != "200" {
				reqCtx.ResponseStatusCode = errutil.ModelServerError
			}
			statusFound = true
		}
		if header.Key == "content-type" {
			// vLLM streams responses as server-sent events.
			reqCtx.Streaming = strings.Contains(string(header.RawValue), "text/event-stream")
			typeFound = true
		}

		if statusFound && typeFound {
			break
		}
	}
@@ -132,22 +148,19 @@ func (s *Server) HandleResponseBody(
) (*extProcPb.ProcessingResponse, error) {
	logger := log.FromContext(ctx)
	loggerVerbose := logger.V(logutil.VERBOSE)
	body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)

	if reqCtx.Streaming {
		logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")
		if err := s.HandleStreaming(ctx, reqCtx, body, loggerVerbose); err != nil {
			return nil, err
		}
	} else {
		loggerVerbose.Info("Processing HandleResponseBody")
		if err := s.HandleNonStreaming(ctx, reqCtx, body, loggerVerbose); err != nil {
			return nil, err
		}
	}

	resp := &extProcPb.ProcessingResponse{
		Response: &extProcPb.ProcessingResponse_ResponseBody{
@@ -159,6 +172,76 @@
	return resp, nil
}

func (s *Server) HandleNonStreaming(
	ctx context.Context,
	reqCtx *RequestContext,
	body *extProcPb.ProcessingRequest_ResponseBody,
	loggerVerbose logr.Logger,
) error {
	loggerVerbose.Info("Processing HandleResponseBody")

	res := Response{}
	if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
		return errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
	}
	reqCtx.Response = res
	reqCtx.ResponseSize = len(body.ResponseBody.Body)
	reqCtx.ResponseComplete = true
	loggerVerbose.Info("Response generated", "response", res)
	return nil
}

func (s *Server) HandleStreaming(
	ctx context.Context,
	reqCtx *RequestContext,
	body *extProcPb.ProcessingRequest_ResponseBody,
	loggerVerbose logr.Logger,
) error {
	respPrefix := "data: "
	responseText := string(body.ResponseBody.Body)
	// Example message if "stream_options": {"include_usage": true} is included in the request:
	// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
	// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
	//
	// data: [DONE]
	//
	// Note that vLLM returns both entries in one response.
	// We need to strip the `data: ` prefix and the trailing `data: [DONE]` from the message to
	// fetch the response data.
	//
	// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
	// indicates the end of streaming.
	if strings.Contains(responseText, "data: [DONE]") {
		response := Response{}

		lines := strings.Split(responseText, "\n")
		for _, line := range lines {
			if !strings.HasPrefix(line, respPrefix) {
				continue
			}
			content := strings.TrimPrefix(line, respPrefix)
			if content == "[DONE]" {
				continue
			}

			byteSlice := []byte(content)
			if err := json.Unmarshal(byteSlice, &response); err != nil {
				loggerVerbose.Error(err, "unmarshaling response body")
				continue
			}
		}
		reqCtx.Response = response
	}

	// Count every chunk toward the response size, including the final one.
	reqCtx.ResponseSize += len(body.ResponseBody.Body)
	if body.ResponseBody.EndOfStream {
		loggerVerbose.Info("Streaming is completed")
		reqCtx.ResponseComplete = true
	}

	return nil
}

type Response struct {
	Usage Usage `json:"usage"`
}
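Below is a minimal, self-contained sketch of the chunk-parsing approach used by `HandleStreaming` above. It is illustrative only, not part of the PR; the `Response` and `Usage` types and their JSON tags are assumed to mirror the handler's.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type Usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	TotalTokens      int `json:"total_tokens"`
	CompletionTokens int `json:"completion_tokens"`
}

type Response struct {
	Usage Usage `json:"usage"`
}

func main() {
	// A final vLLM SSE payload when include_usage is set: a usage-bearing
	// chunk followed by the [DONE] sentinel in the same response body.
	chunk := `data: {"id":"cmpl-1","object":"text_completion","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]
`
	var resp Response
	if strings.Contains(chunk, "data: [DONE]") {
		for _, line := range strings.Split(chunk, "\n") {
			if !strings.HasPrefix(line, "data: ") {
				continue
			}
			content := strings.TrimPrefix(line, "data: ")
			if content == "[DONE]" {
				continue
			}
			if err := json.Unmarshal([]byte(content), &resp); err != nil {
				continue // skip malformed chunks, as the handler does
			}
		}
	}
	fmt.Printf("%+v\n", resp.Usage) // {PromptTokens:7 TotalTokens:17 CompletionTokens:10}
}
```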
45 changes: 44 additions & 1 deletion pkg/epp/handlers/response_test.go
@@ -49,6 +49,13 @@ const (
	}
}
`

	streamingBodyWithoutUsage = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":null}
`

	streamingBodyWithUsage = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]
`
)

func TestHandleResponseBody(t *testing.T) {
@@ -57,6 +64,7 @@ func TestHandleResponseBody(t *testing.T) {
	tests := []struct {
		name    string
		req     *extProcPb.ProcessingRequest_ResponseBody
		reqCtx  *RequestContext
		want    Response
		wantErr bool
	}{
@@ -84,12 +92,47 @@
			},
			wantErr: true,
		},
		{
			name: "streaming request without usage",
			req: &extProcPb.ProcessingRequest_ResponseBody{
				ResponseBody: &extProcPb.HttpBody{
					Body: []byte(streamingBodyWithoutUsage),
				},
			},
			reqCtx: &RequestContext{
				Streaming: true,
			},
			wantErr: false,
			// In the middle of a streaming response, so the request context response is not set yet.
		},
		{
			name: "streaming request with usage",
			req: &extProcPb.ProcessingRequest_ResponseBody{
				ResponseBody: &extProcPb.HttpBody{
					Body: []byte(streamingBodyWithUsage),
				},
			},
			reqCtx: &RequestContext{
				Streaming: true,
			},
			wantErr: false,
			want: Response{
				Usage: Usage{
					PromptTokens:     7,
					TotalTokens:      17,
					CompletionTokens: 10,
				},
			},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			server := &Server{}
			reqCtx := test.reqCtx
			if reqCtx == nil {
				reqCtx = &RequestContext{}
			}
			_, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
			if err != nil {
				if !test.wantErr {
13 changes: 11 additions & 2 deletions pkg/epp/handlers/server.go
@@ -124,7 +124,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
				metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
				metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
			}
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
if reqCtx.Streaming {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this check is needed, consider removing

Copy link
Contributor Author

@JeffLuoo JeffLuoo Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to prevent logs being emitted too often. If the envoy proxy is set to STREAMED, each response chunk will print a line of log. So a response with 1000 chunks will have 1000 lines of such information with Info level.

logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
} else {
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
}
		default:
			logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
			return status.Error(codes.Unknown, "unknown request type")
@@ -138,7 +142,11 @@
			}
		}

loggerVerbose.Info("Response generated", "response", resp)
if !reqCtx.Streaming {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above, this is to prevent noisy logs.

loggerVerbose.Info("Response generated", "response", resp)
} else {
logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
}
		if err := srv.Send(resp); err != nil {
			logger.V(logutil.DEFAULT).Error(err, "Send failed")
			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -213,4 +221,5 @@ type RequestContext struct {
	ResponseSize       int
	ResponseComplete   bool
	ResponseStatusCode string
	Streaming          bool
}
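The verbosity gating discussed in the review threads above relies on logr V-levels: lines logged at a high V-level are discarded unless the sink's verbosity threshold is raised. A minimal sketch, assuming a threshold of 1 and an illustrative DEBUG-like level of 4 (the project's actual logutil constants are not shown in this diff):

```go
package main

import (
	"fmt"

	"github.com/go-logr/logr/funcr"
)

func main() {
	// Sink with verbosity threshold 1: V(0) and V(1) lines print, V(4) lines are dropped.
	logger := funcr.New(func(prefix, args string) {
		fmt.Println(args)
	}, funcr.Options{Verbosity: 1})

	const chunks = 1000
	for i := 0; i < chunks; i++ {
		// Per-chunk logging at a high V-level stays silent by default.
		logger.V(4).Info("Request context after HandleResponseBody", "chunk", i)
	}
	// A single completion line at V(1) is still emitted.
	logger.V(1).Info("Streaming is completed", "chunks", chunks)
}
```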
22 changes: 14 additions & 8 deletions site-src/guides/metrics.md
@@ -4,14 +4,7 @@ This guide describes the current state of exposed metrics and how to scrape them.

## Requirements

To have response metrics, set the body mode to `Buffered` or `Streamed`:
```
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyExtensionPolicy
@@ -32,6 +25,19 @@ spec:
          body: Buffered
```

To include usage metrics for vLLM model server streaming requests, send the request with `include_usage` set in `stream_options`:

```
curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
"model": "tweet-summary",
"prompt": "whats your fav movie?",
"max_tokens": 10,
"temperature": 0,
"stream": true,
"stream_options": {"include_usage": "true"}
}'
```
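With `include_usage` set, the token counts arrive in the final data chunk before the `[DONE]` sentinel, so the tail of the stream looks like this (example taken from the PR's test fixtures above):

```
data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
```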

## Exposed metrics

| **Metric name** | **Metric Type** | <div style="width:200px">**Description**</div> | <div style="width:250px">**Labels**</div> | **Status** |