Skip to content

Commit 4249311

Browse files
committed
[Metrics] Add streaming support for metrics
Address kubernetes-sigs#178
1 parent 2577f63 commit 4249311

File tree

5 files changed

+188
-22
lines changed

5 files changed

+188
-22
lines changed

pkg/epp/handlers/request.go

+29
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,35 @@ func (s *Server) HandleRequestBody(
6161
loggerVerbose.Info("Model requested", "model", model)
6262
modelName := model
6363

64+
// Resolve streaming options
65+
66+
streaming, ok := rb["stream"].(bool)
67+
if !ok {
68+
// streaming not set, no-op
69+
} else {
70+
reqCtx.Streaming = streaming
71+
}
72+
73+
type Usage struct {
74+
IncludeUsage string `json:"include_usage,omitempty"`
75+
}
76+
if streamOption, ok := rb["stream_options"]; ok {
77+
includeUsage := Usage{}
78+
optionJson, err := json.Marshal(streamOption)
79+
if err != nil {
80+
return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("marshalling stream_options: %v", err)}
81+
}
82+
if err := json.Unmarshal(optionJson, &includeUsage); err != nil {
83+
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling stream_options: %v", err)}
84+
}
85+
usageEnabled, err := strconv.ParseBool(includeUsage.IncludeUsage)
86+
if err != nil {
87+
return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("invalid include_usage: %v", includeUsage.IncludeUsage)}
88+
}
89+
90+
reqCtx.StreamingIncludeUsage = usageEnabled
91+
}
92+
6493
// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
6594
// This might be a security risk in the future where adapters not registered in the InferenceModel
6695
// are able to be requested by using their distinct name.

pkg/epp/handlers/response.go

+62-13
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"context"
2121
"encoding/json"
2222
"fmt"
23+
"regexp"
24+
"strings"
2325

2426
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2527
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -76,6 +78,10 @@ func (s *Server) HandleResponseHeaders(
7678
}
7779
}
7880

81+
if h.ResponseHeaders.EndOfStream {
82+
reqCtx.StreamingCompleted = true
83+
loggerVerbose.Info("Header indicates streaming complete")
84+
}
7985
resp := &extProcPb.ProcessingResponse{
8086
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
8187
ResponseHeaders: &extProcPb.HeadersResponse{
@@ -132,22 +138,65 @@ func (s *Server) HandleResponseBody(
132138
) (*extProcPb.ProcessingResponse, error) {
133139
logger := log.FromContext(ctx)
134140
loggerVerbose := logger.V(logutil.VERBOSE)
135-
loggerVerbose.Info("Processing HandleResponseBody")
136141
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
137142

138-
res := Response{}
139-
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
140-
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
143+
if reqCtx.Streaming {
144+
logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")
145+
146+
responseText := string(body.ResponseBody.Body)
147+
// Example message if "stream_options": {"include_usage": "true"} is included in the request:
148+
// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
149+
// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
150+
//
151+
// data: [DONE]
152+
// Note that vLLM returns two entries in one response.
153+
// We need to strip the `data:` prefix and the trailing `data: [DONE]` from the message to fetch the response data.
154+
//
155+
// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
156+
// indicates end of streaming.
157+
if strings.Contains(responseText, "data: [DONE]") {
158+
response := Response{}
159+
160+
if reqCtx.StreamingIncludeUsage {
161+
162+
re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`) // match for JSON object
163+
match := re.FindString(responseText)
164+
if match == "" {
165+
return nil, errutil.Error{Code: errutil.ModelServerError, Msg: fmt.Sprintf("model server returned invalid response: %v", responseText)}
166+
}
167+
byteSlice := []byte(match)
168+
if err := json.Unmarshal(byteSlice, &response); err != nil {
169+
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
170+
}
171+
} else {
172+
// ResponseBody.EndOfStream is only set if include_usage is set to true.
173+
reqCtx.ResponseComplete = true
174+
loggerVerbose.Info("Streaming is completed")
175+
}
176+
177+
reqCtx.Response = response
178+
}
179+
180+
if body.ResponseBody.EndOfStream {
181+
loggerVerbose.Info("Streaming is completed")
182+
reqCtx.ResponseComplete = true
183+
} else {
184+
reqCtx.ResponseSize += len(body.ResponseBody.Body)
185+
}
186+
187+
} else {
188+
loggerVerbose.Info("Processing HandleResponseBody")
189+
190+
res := Response{}
191+
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
192+
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
193+
}
194+
reqCtx.Response = res
195+
reqCtx.ResponseSize = len(body.ResponseBody.Body)
196+
reqCtx.ResponseComplete = true
197+
198+
loggerVerbose.Info("Response generated", "response", res)
141199
}
142-
reqCtx.Response = res
143-
reqCtx.ResponseSize = len(body.ResponseBody.Body)
144-
// ResponseComplete is to indicate the response is complete. In non-streaming
145-
// case, it will be set to be true once the response is processed; in
146-
// streaming case, it will be set to be true once the last chunk is processed.
147-
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
148-
// will add the processing for streaming case.
149-
reqCtx.ResponseComplete = true
150-
loggerVerbose.Info("Response generated", "response", res)
151200

152201
resp := &extProcPb.ProcessingResponse{
153202
Response: &extProcPb.ProcessingResponse_ResponseBody{

pkg/epp/handlers/response_test.go

+49-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ const (
4949
}
5050
}
5151
`
52+
53+
streamingBodyWithoutUsage = `
54+
data: data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":null}
55+
`
56+
57+
streamingBodyWithUsage = `
58+
data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
59+
60+
data: [DONE]
61+
`
5262
)
5363

5464
func TestHandleResponseBody(t *testing.T) {
@@ -57,6 +67,7 @@ func TestHandleResponseBody(t *testing.T) {
5767
tests := []struct {
5868
name string
5969
req *extProcPb.ProcessingRequest_ResponseBody
70+
reqCtx *RequestContext
6071
want Response
6172
wantErr bool
6273
}{
@@ -84,12 +95,49 @@ func TestHandleResponseBody(t *testing.T) {
8495
},
8596
wantErr: true,
8697
},
98+
{
99+
name: "streaming request without usage",
100+
req: &extProcPb.ProcessingRequest_ResponseBody{
101+
ResponseBody: &extProcPb.HttpBody{
102+
Body: []byte(streamingBodyWithoutUsage),
103+
},
104+
},
105+
reqCtx: &RequestContext{
106+
Streaming: true,
107+
StreamingIncludeUsage: false,
108+
},
109+
wantErr: false,
110+
// In the middle of streaming response, so request context response is not set yet.
111+
},
112+
{
113+
name: "streaming request with usage",
114+
req: &extProcPb.ProcessingRequest_ResponseBody{
115+
ResponseBody: &extProcPb.HttpBody{
116+
Body: []byte(streamingBodyWithUsage),
117+
},
118+
},
119+
reqCtx: &RequestContext{
120+
Streaming: true,
121+
StreamingIncludeUsage: true,
122+
},
123+
wantErr: false,
124+
want: Response{
125+
Usage: Usage{
126+
PromptTokens: 7,
127+
TotalTokens: 17,
128+
CompletionTokens: 10,
129+
},
130+
},
131+
},
87132
}
88133

89134
for _, test := range tests {
90135
t.Run(test.name, func(t *testing.T) {
91136
server := &Server{}
92-
reqCtx := &RequestContext{}
137+
reqCtx := test.reqCtx
138+
if reqCtx == nil {
139+
reqCtx = &RequestContext{}
140+
}
93141
_, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
94142
if err != nil {
95143
if !test.wantErr {

pkg/epp/handlers/server.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
121121
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
122122
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
123123
}
124-
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
124+
if reqCtx.Streaming {
125+
logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
126+
} else {
127+
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
128+
}
125129
default:
126130
logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
127131
return status.Error(codes.Unknown, "unknown request type")
@@ -179,7 +183,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
179183
}
180184
}
181185

182-
loggerVerbose.Info("Response generated", "response", resp)
186+
if !reqCtx.Streaming {
187+
loggerVerbose.Info("Response generated", "response", resp)
188+
} else {
189+
logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
190+
}
183191
if err := srv.Send(resp); err != nil {
184192
logger.V(logutil.DEFAULT).Error(err, "Send failed")
185193
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -200,4 +208,7 @@ type RequestContext struct {
200208
ResponseSize int
201209
ResponseComplete bool
202210
ResponseStatusCode string
211+
Streaming bool
212+
StreamingCompleted bool
213+
StreamingIncludeUsage bool
203214
}

pkg/epp/metrics/README.md

+35-6
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,29 @@ This documentation is the current state of exposed metrics.
88

99
## Requirements
1010

11-
Response metrics are only supported in non-streaming mode, with the follow up [issue](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) to address streaming mode.
11+
For non-streaming requests, enable `Buffered` for the response in `EnvoyExtensionPolicy`:
1212

13-
Currently there are two options:
14-
- If requests don't use response streaming, then you can enable `Buffered` mode for response in `EnvoyExtensionPolicy`, this will buffer the response body at the proxy and forward it to the endpoint picker, which allows the endpoint picker to report response metrics.
15-
16-
- If requests use response streaming, then it is not recommended to enable `Buffered` mode, the response body processing mode should be left empty in the `EnvoyExtensionPolicy` (default). In this case response bodies will not be forwarded to the endpoint picker, and therefore response metrics will not be reported.
13+
```
14+
apiVersion: gateway.envoyproxy.io/v1alpha1
15+
kind: EnvoyExtensionPolicy
16+
metadata:
17+
name: ext-proc-policy
18+
namespace: default
19+
spec:
20+
extProc:
21+
- backendRefs:
22+
- group: ""
23+
kind: Service
24+
name: inference-gateway-ext-proc
25+
port: 9002
26+
processingMode:
27+
request:
28+
body: Buffered
29+
response:
30+
body: Buffered
31+
```
1732

33+
For streaming requests, enable `Streamed` for the response in `EnvoyExtensionPolicy`:
1834

1935
```
2036
apiVersion: gateway.envoyproxy.io/v1alpha1
@@ -33,7 +49,20 @@ spec:
3349
request:
3450
body: Buffered
3551
response:
36-
body: Buffered
52+
body: Streamed
53+
```
54+
55+
If you want to include usage metrics for the vLLM model server, send the request with `include_usage` in `stream_options`:
56+
57+
```
58+
curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
59+
"model": "tweet-summary",
60+
"prompt": "whats your fav movie?",
61+
"max_tokens": 10,
62+
"temperature": 0,
63+
"stream": true,
64+
"stream_options": {"include_usage": "true"}
65+
}'
3766
```
3867

3968
## Exposed metrics

0 commit comments

Comments
 (0)