Skip to content

Commit d479c3d

Browse files
committed
[Metrics] Add streaming support for metrics
Address #178
1 parent 9079982 commit d479c3d

File tree

4 files changed

+165
-24
lines changed

4 files changed

+165
-24
lines changed

pkg/epp/handlers/response.go

+96-13
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ import (
2020
"context"
2121
"encoding/json"
2222
"fmt"
23+
"strings"
2324

2425
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2526
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
27+
"github.com/go-logr/logr"
2628
"sigs.k8s.io/controller-runtime/pkg/log"
2729
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
2830
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -67,11 +69,25 @@ func (s *Server) HandleResponseHeaders(
6769
// }
6870
// }
6971
for _, header := range h.ResponseHeaders.Headers.GetHeaders() {
72+
var statusFound, typeFound bool
7073
if header.Key == "status" {
7174
code := header.RawValue[0]
7275
if string(code) != "200" {
7376
reqCtx.ResponseStatusCode = errutil.ModelServerError
77+
statusFound = true
7478
}
79+
}
80+
if header.Key == "content-type" {
81+
contentType := header.RawValue
82+
if strings.Contains(string(contentType), "text/event-stream") {
83+
reqCtx.Streaming = true
84+
} else {
85+
reqCtx.Streaming = false
86+
}
87+
typeFound = true
88+
}
89+
90+
if statusFound && typeFound {
7591
break
7692
}
7793
}
@@ -132,22 +148,19 @@ func (s *Server) HandleResponseBody(
132148
) (*extProcPb.ProcessingResponse, error) {
133149
logger := log.FromContext(ctx)
134150
loggerVerbose := logger.V(logutil.VERBOSE)
135-
loggerVerbose.Info("Processing HandleResponseBody")
136151
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
137152

138-
res := Response{}
139-
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
140-
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
153+
if reqCtx.Streaming {
154+
logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")
155+
if err := s.HandleStreaming(ctx, reqCtx, body, loggerVerbose); err != nil {
156+
return nil, err
157+
}
158+
} else {
159+
loggerVerbose.Info("Processing HandleResponseBody")
160+
if err := s.HandleNonStreaming(ctx, reqCtx, body, loggerVerbose); err != nil {
161+
return nil, err
162+
}
141163
}
142-
reqCtx.Response = res
143-
reqCtx.ResponseSize = len(body.ResponseBody.Body)
144-
// ResponseComplete is to indicate the response is complete. In non-streaming
145-
// case, it will be set to be true once the response is processed; in
146-
// streaming case, it will be set to be true once the last chunk is processed.
147-
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
148-
// will add the processing for streaming case.
149-
reqCtx.ResponseComplete = true
150-
loggerVerbose.Info("Response generated", "response", res)
151164

152165
resp := &extProcPb.ProcessingResponse{
153166
Response: &extProcPb.ProcessingResponse_ResponseBody{
@@ -159,6 +172,76 @@ func (s *Server) HandleResponseBody(
159172
return resp, nil
160173
}
161174

175+
func (s *Server) HandleNonStreaming(
176+
ctx context.Context,
177+
reqCtx *RequestContext,
178+
body *extProcPb.ProcessingRequest_ResponseBody,
179+
loggerVerbose logr.Logger,
180+
) error {
181+
loggerVerbose.Info("Processing HandleResponseBody")
182+
183+
res := Response{}
184+
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
185+
return errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
186+
}
187+
reqCtx.Response = res
188+
reqCtx.ResponseSize = len(body.ResponseBody.Body)
189+
reqCtx.ResponseComplete = true
190+
loggerVerbose.Info("Response generated", "response", res)
191+
return nil
192+
}
193+
194+
func (s *Server) HandleStreaming(
195+
ctx context.Context,
196+
reqCtx *RequestContext,
197+
body *extProcPb.ProcessingRequest_ResponseBody,
198+
loggerVerbose logr.Logger,
199+
) error {
200+
respPrefix := "data: "
201+
responseText := string(body.ResponseBody.Body)
202+
// Example message if "stream_options": {"include_usage": "true"} is included in the request:
203+
// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
204+
// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
205+
//
206+
// data: [DONE]
207+
//
208+
// Noticed that vLLM returns two entries in one response.
209+
// We need to strip the `data:` prefix and next Data: [DONE] from the message to fetch response data.
210+
//
211+
// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
212+
// indicates end of streaming.
213+
if strings.Contains(responseText, "data: [DONE]") {
214+
response := Response{}
215+
216+
lines := strings.Split(responseText, "\n")
217+
for _, line := range lines {
218+
if !strings.HasPrefix(line, respPrefix) {
219+
continue
220+
}
221+
content := strings.TrimPrefix(line, respPrefix)
222+
if content == "[DONE]" {
223+
continue
224+
}
225+
226+
byteSlice := []byte(content)
227+
if err := json.Unmarshal(byteSlice, &response); err != nil {
228+
loggerVerbose.Error(err, "unmarshaling response body")
229+
continue
230+
}
231+
}
232+
reqCtx.Response = response
233+
}
234+
235+
if body.ResponseBody.EndOfStream {
236+
loggerVerbose.Info("Streaming is completed")
237+
reqCtx.ResponseComplete = true
238+
} else {
239+
reqCtx.ResponseSize += len(body.ResponseBody.Body)
240+
}
241+
242+
return nil
243+
}
244+
162245
type Response struct {
163246
Usage Usage `json:"usage"`
164247
}

pkg/epp/handlers/response_test.go

+44-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ const (
4949
}
5050
}
5151
`
52+
53+
streamingBodyWithoutUsage = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":null}
54+
`
55+
56+
streamingBodyWithUsage = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
57+
data: [DONE]
58+
`
5259
)
5360

5461
func TestHandleResponseBody(t *testing.T) {
@@ -57,6 +64,7 @@ func TestHandleResponseBody(t *testing.T) {
5764
tests := []struct {
5865
name string
5966
req *extProcPb.ProcessingRequest_ResponseBody
67+
reqCtx *RequestContext
6068
want Response
6169
wantErr bool
6270
}{
@@ -84,12 +92,47 @@ func TestHandleResponseBody(t *testing.T) {
8492
},
8593
wantErr: true,
8694
},
95+
{
96+
name: "streaming request without usage",
97+
req: &extProcPb.ProcessingRequest_ResponseBody{
98+
ResponseBody: &extProcPb.HttpBody{
99+
Body: []byte(streamingBodyWithoutUsage),
100+
},
101+
},
102+
reqCtx: &RequestContext{
103+
Streaming: true,
104+
},
105+
wantErr: false,
106+
// In the middle of streaming response, so request context response is not set yet.
107+
},
108+
{
109+
name: "streaming request with usage",
110+
req: &extProcPb.ProcessingRequest_ResponseBody{
111+
ResponseBody: &extProcPb.HttpBody{
112+
Body: []byte(streamingBodyWithUsage),
113+
},
114+
},
115+
reqCtx: &RequestContext{
116+
Streaming: true,
117+
},
118+
wantErr: false,
119+
want: Response{
120+
Usage: Usage{
121+
PromptTokens: 7,
122+
TotalTokens: 17,
123+
CompletionTokens: 10,
124+
},
125+
},
126+
},
87127
}
88128

89129
for _, test := range tests {
90130
t.Run(test.name, func(t *testing.T) {
91131
server := &Server{}
92-
reqCtx := &RequestContext{}
132+
reqCtx := test.reqCtx
133+
if reqCtx == nil {
134+
reqCtx = &RequestContext{}
135+
}
93136
_, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
94137
if err != nil {
95138
if !test.wantErr {

pkg/epp/handlers/server.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
124124
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
125125
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
126126
}
127-
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
127+
if reqCtx.Streaming {
128+
logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
129+
} else {
130+
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
131+
}
128132
default:
129133
logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
130134
return status.Error(codes.Unknown, "unknown request type")
@@ -138,7 +142,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
138142
}
139143
}
140144

141-
loggerVerbose.Info("Response generated", "response", resp)
145+
if !reqCtx.Streaming {
146+
loggerVerbose.Info("Response generated", "response", resp)
147+
} else {
148+
logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
149+
}
142150
if err := srv.Send(resp); err != nil {
143151
logger.V(logutil.DEFAULT).Error(err, "Send failed")
144152
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -213,4 +221,5 @@ type RequestContext struct {
213221
ResponseSize int
214222
ResponseComplete bool
215223
ResponseStatusCode string
224+
Streaming bool
216225
}

site-src/guides/metrics.md

+14-8
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,7 @@ This guide describes the current state of exposed metrics and how to scrape them
44

55
## Requirements
66

7-
Response metrics are only supported in non-streaming mode, with the follow up [issue](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) to address streaming mode.
8-
9-
Currently there are two options:
10-
- If requests don't use response streaming, then you can enable `Buffered` mode for response in `EnvoyExtensionPolicy`, this will buffer the response body at the proxy and forward it to the endpoint picker, which allows the endpoint picker to report response metrics.
11-
12-
- If requests use response streaming, then it is not recommended to enable `Buffered` mode, the response body processing mode should be left empty in the `EnvoyExtensionPolicy` (default). In this case response bodies will not be forwarded to the endpoint picker, and therefore response metrics will not be reported.
13-
14-
7+
To have response metrics, set the body mode to `Buffered` or `Streamed`:
158
```
169
apiVersion: gateway.envoyproxy.io/v1alpha1
1710
kind: EnvoyExtensionPolicy
@@ -32,6 +25,19 @@ spec:
3225
body: Buffered
3326
```
3427

28+
To include usage metrics in streaming responses from the vLLM model server, send the request with `stream_options.include_usage` enabled:
29+
30+
```
31+
curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
32+
"model": "tweet-summary",
33+
"prompt": "whats your fav movie?",
34+
"max_tokens": 10,
35+
"temperature": 0,
36+
"stream": true,
37+
"stream_options": {"include_usage": "true"}
38+
}'
39+
```
40+
3541
## Exposed metrics
3642

3743
| **Metric name** | **Metric Type** | <div style="width:200px">**Description**</div> | <div style="width:250px">**Labels**</div> | **Status** |

0 commit comments

Comments
 (0)