
Commit ed0c231

[Metrics] Add streaming support for metrics
Address #178
1 parent: 406ffee

5 files changed (+186, -22 lines)


pkg/epp/handlers/request.go (+32)

@@ -61,6 +61,38 @@ func (s *Server) HandleRequestBody(
 	loggerVerbose.Info("Model requested", "model", model)
 	modelName := model

+	// Resolve streaming options
+
+	streaming, ok := rb["stream"].(bool)
+	if !ok {
+		// streaming not set, no-op
+	} else {
+		reqCtx.Streaming = streaming
+	}
+
+	if reqCtx.Streaming {
+		type Usage struct {
+			IncludeUsage string `json:"include_usage,omitempty"`
+		}
+		if streamOption, ok := rb["stream_options"]; ok {
+			includeUsage := Usage{}
+			optionJson, err := json.Marshal(streamOption)
+			if err != nil {
+				return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("marshalling stream_options: %v", err)}
+			}
+			if err := json.Unmarshal(optionJson, &includeUsage); err != nil {
+				return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling stream_options: %v", err)}
+			}
+			usageEnabled, err := strconv.ParseBool(includeUsage.IncludeUsage)
+			if err != nil {
+				return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("invalid include_usage: %v", includeUsage.IncludeUsage)}
+			}
+
+			reqCtx.StreamingIncludeUsage = usageEnabled
+		}
+
+	}
+
 	// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
 	// This might be a security risk in the future where adapters not registered in the InferenceModel
 	// are able to be requested by using their distinct name.
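For orientation, here is a minimal standalone sketch of the same stream_options resolution, assuming the request body `rb` has already been decoded into a `map[string]interface{}` (which the type assertions above suggest); the sample payload and the `main` wrapper are illustrative only, not part of the commit:

```
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

func main() {
	// Hypothetical request body, matching the curl example in site-src/guides/metrics.md.
	raw := []byte(`{"model": "tweet-summary", "stream": true, "stream_options": {"include_usage": "true"}}`)

	var rb map[string]interface{}
	if err := json.Unmarshal(raw, &rb); err != nil {
		panic(err)
	}

	// Same resolution order as the handler: "stream" first, then "stream_options.include_usage".
	streaming, _ := rb["stream"].(bool)
	includeUsage := false
	if streaming {
		type usage struct {
			IncludeUsage string `json:"include_usage,omitempty"`
		}
		if opt, ok := rb["stream_options"]; ok {
			optJSON, _ := json.Marshal(opt) // re-marshal the generic value ...
			var u usage
			_ = json.Unmarshal(optJSON, &u) // ... so it can decode into a typed struct
			includeUsage, _ = strconv.ParseBool(u.IncludeUsage)
		}
	}
	fmt.Println("streaming:", streaming, "include_usage:", includeUsage)
}
```

The documented curl request passes `include_usage` as the string "true", which is why the handler round-trips `stream_options` through JSON into a typed struct and then `strconv.ParseBool`, rather than asserting a bool directly.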

pkg/epp/handlers/response.go (+58, -13)

@@ -20,6 +20,8 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"regexp"
+	"strings"

 	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"

@@ -132,22 +134,65 @@ func (s *Server) HandleResponseBody(
 ) (*extProcPb.ProcessingResponse, error) {
 	logger := log.FromContext(ctx)
 	loggerVerbose := logger.V(logutil.VERBOSE)
-	loggerVerbose.Info("Processing HandleResponseBody")
 	body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)

-	res := Response{}
-	if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
-		return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+	if reqCtx.Streaming {
+		logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")
+
+		responseText := string(body.ResponseBody.Body)
+		// Example message if "stream_options": {"include_usage": "true"} is included in the request:
+		// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
+		// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+		//
+		// data: [DONE]
+		// Noticed that vLLM returns two entries in one response.
+		// We need to strip the `data:` prefix and next Data: [DONE] from the message to fetch response data.
+		//
+		// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
+		// indicates end of streaming.
+		if strings.Contains(responseText, "data: [DONE]") {
+			response := Response{}
+
+			if reqCtx.StreamingIncludeUsage {
+
+				re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`) // match for JSON object
+				match := re.FindString(responseText)
+				if match == "" {
+					return nil, errutil.Error{Code: errutil.ModelServerError, Msg: fmt.Sprintf("model server returned invalid response: %v", responseText)}
+				}
+				byteSlice := []byte(match)
+				if err := json.Unmarshal(byteSlice, &response); err != nil {
+					return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+				}
+			} else {
+				// ResponseBody.EndOfStream is only set if include_usage is set to true.
+				reqCtx.ResponseComplete = true
+				loggerVerbose.Info("Streaming is completed")
+			}
+
+			reqCtx.Response = response
+		}
+
+		if body.ResponseBody.EndOfStream {
+			loggerVerbose.Info("Streaming is completed")
+			reqCtx.ResponseComplete = true
+		} else {
+			reqCtx.ResponseSize += len(body.ResponseBody.Body)
+		}
+
+	} else {
+		loggerVerbose.Info("Processing HandleResponseBody")
+
+		res := Response{}
+		if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
+			return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+		}
+		reqCtx.Response = res
+		reqCtx.ResponseSize = len(body.ResponseBody.Body)
+		reqCtx.ResponseComplete = true
+
+		loggerVerbose.Info("Response generated", "response", res)
 	}
-	reqCtx.Response = res
-	reqCtx.ResponseSize = len(body.ResponseBody.Body)
-	// ResponseComplete is to indicate the response is complete. In non-streaming
-	// case, it will be set to be true once the response is processed; in
-	// streaming case, it will be set to be true once the last chunk is processed.
-	// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
-	// will add the processing for streaming case.
-	reqCtx.ResponseComplete = true
-	loggerVerbose.Info("Response generated", "response", res)

 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_ResponseBody{
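As a standalone illustration of the new streaming branch, the sketch below extracts the usage object from a final chunk with the same regular expression; the `Response` and `Usage` shapes here are assumptions based on the test fixtures in this commit, not imports from the package:

```
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
)

// Shapes assumed from the streaming test fixtures in this commit.
type Usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}

type Response struct {
	Usage Usage `json:"usage"`
}

func main() {
	// Final chunk shape described in the HandleResponseBody comments when include_usage is requested.
	chunk := `data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
`
	// Same pattern HandleResponseBody uses to pull the JSON object out of the SSE payload.
	re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`)
	match := re.FindString(chunk)

	var resp Response
	if err := json.Unmarshal([]byte(match), &resp); err != nil {
		panic(err)
	}
	fmt.Printf("prompt=%d completion=%d total=%d\n",
		resp.Usage.PromptTokens, resp.Usage.CompletionTokens, resp.Usage.TotalTokens)
}
```

Because Go's `.` does not match newlines by default, the pattern only matches the JSON object on the `data:` line and leaves the trailing `data: [DONE]` sentinel alone.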

pkg/epp/handlers/response_test.go (+49, -1)

@@ -49,6 +49,16 @@ const (
 	}
 }
 `
+
+	streamingBodyWithoutUsage = `
+data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":null}
+`
+
+	streamingBodyWithUsage = `
+data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+
+data: [DONE]
+`
 )

 func TestHandleResponseBody(t *testing.T) {

@@ -57,6 +67,7 @@ func TestHandleResponseBody(t *testing.T) {
 	tests := []struct {
 		name string
 		req *extProcPb.ProcessingRequest_ResponseBody
+		reqCtx *RequestContext
 		want Response
 		wantErr bool
 	}{

@@ -84,12 +95,49 @@ func TestHandleResponseBody(t *testing.T) {
 			},
 			wantErr: true,
 		},
+		{
+			name: "streaming request without usage",
+			req: &extProcPb.ProcessingRequest_ResponseBody{
+				ResponseBody: &extProcPb.HttpBody{
+					Body: []byte(streamingBodyWithoutUsage),
+				},
+			},
+			reqCtx: &RequestContext{
+				Streaming: true,
+				StreamingIncludeUsage: false,
+			},
+			wantErr: false,
+			// In the middle of streaming response, so request context response is not set yet.
+		},
+		{
+			name: "streaming request with usage",
+			req: &extProcPb.ProcessingRequest_ResponseBody{
+				ResponseBody: &extProcPb.HttpBody{
+					Body: []byte(streamingBodyWithUsage),
+				},
+			},
+			reqCtx: &RequestContext{
+				Streaming: true,
+				StreamingIncludeUsage: true,
+			},
+			wantErr: false,
+			want: Response{
+				Usage: Usage{
+					PromptTokens: 7,
+					TotalTokens: 17,
+					CompletionTokens: 10,
+				},
+			},
+		},
 	}

 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			server := &Server{}
-			reqCtx := &RequestContext{}
+			reqCtx := test.reqCtx
+			if reqCtx == nil {
+				reqCtx = &RequestContext{}
+			}
 			_, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
 			if err != nil {
 				if !test.wantErr {
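Assuming the module layout implied by the file paths above, these table cases can be run in isolation with the standard Go tooling:

```
go test ./pkg/epp/handlers/ -run TestHandleResponseBody -v
```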

pkg/epp/handlers/server.go (+12, -2)

@@ -124,7 +124,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 				metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
 				metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
 			}
-			loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
+			if reqCtx.Streaming {
+				logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
+			} else {
+				loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
+			}
 		default:
 			logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
 			return status.Error(codes.Unknown, "unknown request type")

@@ -182,7 +186,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			}
 		}

-		loggerVerbose.Info("Response generated", "response", resp)
+		if !reqCtx.Streaming {
+			loggerVerbose.Info("Response generated", "response", resp)
+		} else {
+			logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
+		}
 		if err := srv.Send(resp); err != nil {
 			logger.V(logutil.DEFAULT).Error(err, "Send failed")
 			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)

@@ -203,4 +211,6 @@ type RequestContext struct {
 	ResponseSize int
 	ResponseComplete bool
 	ResponseStatusCode string
+	Streaming bool
+	StreamingIncludeUsage bool
 }

site-src/guides/metrics.md (+35, -6)

@@ -4,13 +4,29 @@ This guide describes the current state of exposed metrics and how to scrape them
 
 ## Requirements
 
-Response metrics are only supported in non-streaming mode, with the follow up [issue](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) to address streaming mode.
+For non-streaming requests, enable `Buffered` mode for the response in the `EnvoyExtensionPolicy`:
 
-Currently there are two options:
-- If requests don't use response streaming, then you can enable `Buffered` mode for response in `EnvoyExtensionPolicy`, this will buffer the response body at the proxy and forward it to the endpoint picker, which allows the endpoint picker to report response metrics.
-
-- If requests use response streaming, then it is not recommended to enable `Buffered` mode, the response body processing mode should be left empty in the `EnvoyExtensionPolicy` (default). In this case response bodies will not be forwarded to the endpoint picker, and therefore response metrics will not be reported.
+```
+apiVersion: gateway.envoyproxy.io/v1alpha1
+kind: EnvoyExtensionPolicy
+metadata:
+  name: ext-proc-policy
+  namespace: default
+spec:
+  extProc:
+    - backendRefs:
+      - group: ""
+        kind: Service
+        name: inference-gateway-ext-proc
+        port: 9002
+      processingMode:
+        request:
+          body: Buffered
+        response:
+          body: Buffered
+```
 
+For streaming requests, enable `Streamed` mode for the response in the `EnvoyExtensionPolicy`:
 
 ```
 apiVersion: gateway.envoyproxy.io/v1alpha1

@@ -29,7 +45,20 @@ spec:
        request:
          body: Buffered
        response:
-         body: Buffered
+         body: Streamed
+```
+
+To include usage metrics from the vLLM model server, send the request with `include_usage`:
+
+```
+curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
+"model": "tweet-summary",
+"prompt": "whats your fav movie?",
+"max_tokens": 10,
+"temperature": 0,
+"stream": true,
+"stream_options": {"include_usage": "true"}
+}'
 ```
 
 ## Exposed metrics
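For reference, when `include_usage` is requested as in the curl example above, the final streamed chunk from vLLM carries the usage object followed by the `[DONE]` sentinel; this is the shape the new HandleResponseBody streaming branch parses (values here match the example in the code comments):

```
data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
```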
