
Commit a90bd84

[Metrics] Add streaming support for metrics
Address #178
1 parent 2577f63 commit a90bd84


5 files changed: +183 −22 lines changed


pkg/epp/handlers/request.go

+29 −0

@@ -61,6 +61,35 @@ func (s *Server) HandleRequestBody(
 	loggerVerbose.Info("Model requested", "model", model)
 	modelName := model

+	// Resolve streaming options
+
+	streaming, ok := rb["stream"].(bool)
+	if !ok {
+		// streaming not set, no-op
+	} else {
+		reqCtx.Streaming = streaming
+	}
+
+	type Usage struct {
+		IncludeUsage string `json:"include_usage,omitempty"`
+	}
+	if streamOption, ok := rb["stream_options"]; ok {
+		includeUsage := Usage{}
+		optionJson, err := json.Marshal(streamOption)
+		if err != nil {
+			return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("marshalling stream_options: %v", err)}
+		}
+		if err := json.Unmarshal(optionJson, &includeUsage); err != nil {
+			return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling stream_options: %v", err)}
+		}
+		usageEnabled, err := strconv.ParseBool(includeUsage.IncludeUsage)
+		if err != nil {
+			return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("invalid include_usage: %v", includeUsage.IncludeUsage)}
+		}
+
+		reqCtx.StreamingIncludeUsage = usageEnabled
+	}
+
 	// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
 	// This might be a security risk in the future where adapters not registered in the InferenceModel
 	// are able to be requested by using their distinct name.
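For intuition, here is a minimal runnable sketch (an editor's illustration, not part of the commit) of how these two request fields decode; the sample payload mirrors the curl example in the README change below, where `include_usage` arrives as the string "true":

```
// Sketch only: shows why the handler type-asserts "stream" to bool but
// round-trips "stream_options" through JSON before strconv.ParseBool.
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

func main() {
	raw := []byte(`{"model":"tweet-summary","stream":true,"stream_options":{"include_usage":"true"}}`)

	var rb map[string]interface{}
	if err := json.Unmarshal(raw, &rb); err != nil {
		panic(err)
	}

	// "stream" is a plain JSON bool; a missing key simply leaves streaming false.
	streaming, _ := rb["stream"].(bool)

	// "include_usage" is a JSON string in this API shape, so it is pulled out
	// of the nested object and parsed with strconv.ParseBool.
	usageEnabled := false
	if opts, ok := rb["stream_options"].(map[string]interface{}); ok {
		if s, ok := opts["include_usage"].(string); ok {
			usageEnabled, _ = strconv.ParseBool(s)
		}
	}

	fmt.Println(streaming, usageEnabled) // prints: true true
}
```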

pkg/epp/handlers/response.go

+58 −13

@@ -20,6 +20,8 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"regexp"
+	"strings"

 	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"

@@ -132,22 +134,65 @@ func (s *Server) HandleResponseBody(
 ) (*extProcPb.ProcessingResponse, error) {
 	logger := log.FromContext(ctx)
 	loggerVerbose := logger.V(logutil.VERBOSE)
-	loggerVerbose.Info("Processing HandleResponseBody")
 	body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)

-	res := Response{}
-	if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
-		return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+	if reqCtx.Streaming {
+		logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")
+
+		responseText := string(body.ResponseBody.Body)
+		// Example message if "stream_options": {"include_usage": "true"} is included in the request:
+		// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
+		// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+		//
+		// data: [DONE]
+		//
+		// Note that vLLM returns both entries in one response.
+		// We need to strip the `data:` prefix and the trailing `data: [DONE]` from the message to fetch the response data.
+		//
+		// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
+		// indicates the end of streaming.
+		if strings.Contains(responseText, "data: [DONE]") {
+			response := Response{}
+
+			if reqCtx.StreamingIncludeUsage {
+
+				re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`) // match for JSON object
+				match := re.FindString(responseText)
+				if match == "" {
+					return nil, errutil.Error{Code: errutil.ModelServerError, Msg: fmt.Sprintf("model server returned invalid response: %v", responseText)}
+				}
+				byteSlice := []byte(match)
+				if err := json.Unmarshal(byteSlice, &response); err != nil {
+					return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+				}
+			} else {
+				// ResponseBody.EndOfStream is only set if include_usage is set to true.
+				reqCtx.ResponseComplete = true
+				loggerVerbose.Info("Streaming is completed")
+			}
+
+			reqCtx.Response = response
+		}
+
+		if body.ResponseBody.EndOfStream {
+			loggerVerbose.Info("Streaming is completed")
+			reqCtx.ResponseComplete = true
+		} else {
+			reqCtx.ResponseSize += len(body.ResponseBody.Body)
+		}
+
+	} else {
+		loggerVerbose.Info("Processing HandleResponseBody")
+
+		res := Response{}
+		if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
+			return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+		}
+		reqCtx.Response = res
+		reqCtx.ResponseSize = len(body.ResponseBody.Body)
+		reqCtx.ResponseComplete = true
+
+		loggerVerbose.Info("Response generated", "response", res)
 	}
-	reqCtx.Response = res
-	reqCtx.ResponseSize = len(body.ResponseBody.Body)
-	// ResponseComplete is to indicate the response is complete. In non-streaming
-	// case, it will be set to be true once the response is processed; in
-	// streaming case, it will be set to be true once the last chunk is processed.
-	// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
-	// will add the processing for streaming case.
-	reqCtx.ResponseComplete = true
-	loggerVerbose.Info("Response generated", "response", res)

 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_ResponseBody{
pkg/epp/handlers/response_test.go

+49 −1

@@ -49,6 +49,16 @@ const (
 	}
 }
`
+
+	streamingBodyWithoutUsage = `
+data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":null}
+`
+
+	streamingBodyWithUsage = `
+data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+
+data: [DONE]
+`
 )

 func TestHandleResponseBody(t *testing.T) {

@@ -57,6 +67,7 @@ func TestHandleResponseBody(t *testing.T) {
 	tests := []struct {
 		name    string
 		req     *extProcPb.ProcessingRequest_ResponseBody
+		reqCtx  *RequestContext
 		want    Response
 		wantErr bool
 	}{

@@ -84,12 +95,49 @@ func TestHandleResponseBody(t *testing.T) {
 			},
 			wantErr: true,
 		},
+		{
+			name: "streaming request without usage",
+			req: &extProcPb.ProcessingRequest_ResponseBody{
+				ResponseBody: &extProcPb.HttpBody{
+					Body: []byte(streamingBodyWithoutUsage),
+				},
+			},
+			reqCtx: &RequestContext{
+				Streaming:             true,
+				StreamingIncludeUsage: false,
+			},
+			wantErr: false,
+			// In the middle of a streaming response, so the request context response is not set yet.
+		},
+		{
+			name: "streaming request with usage",
+			req: &extProcPb.ProcessingRequest_ResponseBody{
+				ResponseBody: &extProcPb.HttpBody{
+					Body: []byte(streamingBodyWithUsage),
+				},
+			},
+			reqCtx: &RequestContext{
+				Streaming:             true,
+				StreamingIncludeUsage: true,
+			},
+			wantErr: false,
+			want: Response{
+				Usage: Usage{
+					PromptTokens:     7,
+					TotalTokens:      17,
+					CompletionTokens: 10,
+				},
+			},
+		},
 	}

 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			server := &Server{}
-			reqCtx := &RequestContext{}
+			reqCtx := test.reqCtx
+			if reqCtx == nil {
+				reqCtx = &RequestContext{}
+			}
 			_, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
 			if err != nil {
 				if !test.wantErr {
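The new cases thread a pre-populated `RequestContext` through the existing table. They should be runnable with a plain `go test` (hypothetical invocation from the repository root, given the package path above):

```
go test ./pkg/epp/handlers/ -run TestHandleResponseBody -v
```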

pkg/epp/handlers/server.go

+12 −2

@@ -121,7 +121,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
 			metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
 		}
-		loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
+		if reqCtx.Streaming {
+			logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
+		} else {
+			loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
+		}
 	default:
 		logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
 		return status.Error(codes.Unknown, "unknown request type")

@@ -179,7 +183,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			}
 		}

-		loggerVerbose.Info("Response generated", "response", resp)
+		if !reqCtx.Streaming {
+			loggerVerbose.Info("Response generated", "response", resp)
+		} else {
+			logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
+		}
 		if err := srv.Send(resp); err != nil {
 			logger.V(logutil.DEFAULT).Error(err, "Send failed")
 			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)

@@ -200,4 +208,6 @@ type RequestContext struct {
 	ResponseSize          int
 	ResponseComplete      bool
 	ResponseStatusCode    string
+	Streaming             bool
+	StreamingIncludeUsage bool
 }

pkg/epp/metrics/README.md

+35 −6

@@ -8,13 +8,29 @@ This documentation is the current state of exposed metrics.

 ## Requirements

-Response metrics are only supported in non-streaming mode, with the follow up [issue](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) to address streaming mode.
+For non-streaming requests, enable `Buffered` mode for the response body in the `EnvoyExtensionPolicy`:

-Currently there are two options:
-- If requests don't use response streaming, then you can enable `Buffered` mode for response in `EnvoyExtensionPolicy`, this will buffer the response body at the proxy and forward it to the endpoint picker, which allows the endpoint picker to report response metrics.
-
-- If requests use response streaming, then it is not recommended to enable `Buffered` mode, the response body processing mode should be left empty in the `EnvoyExtensionPolicy` (default). In this case response bodies will not be forwarded to the endpoint picker, and therefore response metrics will not be reported.
+```
+apiVersion: gateway.envoyproxy.io/v1alpha1
+kind: EnvoyExtensionPolicy
+metadata:
+  name: ext-proc-policy
+  namespace: default
+spec:
+  extProc:
+    - backendRefs:
+      - group: ""
+        kind: Service
+        name: inference-gateway-ext-proc
+        port: 9002
+      processingMode:
+        request:
+          body: Buffered
+        response:
+          body: Buffered
+```

+For streaming requests, enable `Streamed` mode for the response body in the `EnvoyExtensionPolicy`:

 ```
 apiVersion: gateway.envoyproxy.io/v1alpha1

@@ -33,7 +49,20 @@ spec:
         request:
           body: Buffered
         response:
-          body: Buffered
+          body: Streamed
+```
+
+If you want usage metrics from the vLLM model server, send the request with `include_usage` enabled:
+
+```
+curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
+"model": "tweet-summary",
+"prompt": "whats your fav movie?",
+"max_tokens": 10,
+"temperature": 0,
+"stream": true,
+"stream_options": {"include_usage": "true"}
+}'
 ```

 ## Exposed metrics
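For reference, the final chunk of a completion streamed with the curl command above carries the usage counts that the endpoint picker records. An illustrative transcript (shape taken from the example quoted in `HandleResponseBody`; the token values are the sample ones from that comment):

```
data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
```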
