Commit 357d7c8

[Metrics] Add streaming support for metrics
Address #178
1 parent 6130ee0 commit 357d7c8

6 files changed, +205 -22 lines changed


config/manifests/gateway/extension_policy.yaml (+1)

@@ -14,6 +14,7 @@ spec:
         request:
           body: Buffered
         response:
+          body: Streamed
       # The timeouts are likely not needed here. We can experiment with removing/tuning them slowly.
       # The connection limits are more important and will cause the opaque: ext_proc_gRPC_error_14 error in Envoy GW if not configured correctly.
       messageTimeout: 1000s
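The README change at the end of this commit documents this processing mode in full. Assuming you deploy straight from these manifests, rolling the updated policy out is a single apply:

```
kubectl apply -f config/manifests/gateway/extension_policy.yaml
```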

pkg/ext-proc/handlers/request.go (+29)

@@ -61,6 +61,35 @@ func (s *Server) HandleRequestBody(
     loggerVerbose.Info("Model requested", "model", model)
     modelName := model
 
+    // Resolve streaming options
+
+    streaming, ok := rb["stream"].(bool)
+    if !ok {
+        // streaming not set, no-op
+    } else {
+        reqCtx.Streaming = streaming
+    }
+
+    type Usage struct {
+        IncludeUsage string `json:"include_usage,omitempty"`
+    }
+    if streamOption, ok := rb["stream_options"]; ok {
+        includeUsage := Usage{}
+        optionJson, err := json.Marshal(streamOption)
+        if err != nil {
+            return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("marshalling stream_options: %v", err)}
+        }
+        if err := json.Unmarshal(optionJson, &includeUsage); err != nil {
+            return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling stream_options: %v", err)}
+        }
+        usageEnabled, err := strconv.ParseBool(includeUsage.IncludeUsage)
+        if err != nil {
+            return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("invalid include_usage: %v", includeUsage.IncludeUsage)}
+        }
+
+        reqCtx.StreamingIncludeUsage = usageEnabled
+    }
+
     // NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
     // This might be a security risk in the future where adapters not registered in the InferenceModel
     // are able to be requested by using their distinct name.
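A minimal standalone sketch of the parsing above, runnable outside the handler. The request body `raw` is hypothetical and `rb` stands in for the already-unmarshaled body map the handler works with; note that `include_usage` is parsed as a JSON *string* and fed through `strconv.ParseBool`, matching the curl example in the metrics README:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

func main() {
	// Hypothetical request body; mirrors what HandleRequestBody sees in rb.
	raw := `{"model":"tweet-summary","stream":true,"stream_options":{"include_usage":"true"}}`

	var rb map[string]interface{}
	if err := json.Unmarshal([]byte(raw), &rb); err != nil {
		panic(err)
	}

	// "stream" is a JSON boolean, so the type assertion succeeds directly.
	streaming, _ := rb["stream"].(bool)

	// "stream_options" decodes to a map; round-tripping it through
	// Marshal/Unmarshal into a typed struct is the same trick the handler uses.
	type usage struct {
		IncludeUsage string `json:"include_usage,omitempty"`
	}
	var u usage
	optionJSON, err := json.Marshal(rb["stream_options"])
	if err != nil {
		panic(err)
	}
	if err := json.Unmarshal(optionJSON, &u); err != nil {
		panic(err)
	}
	usageEnabled, err := strconv.ParseBool(u.IncludeUsage)
	if err != nil {
		panic(err)
	}

	fmt.Println(streaming, usageEnabled) // true true
}
```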

pkg/ext-proc/handlers/response.go (+62 -13)

@@ -20,6 +20,8 @@ import (
     "context"
     "encoding/json"
     "fmt"
+    "regexp"
+    "strings"
 
     configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
     extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -76,6 +78,10 @@ func (s *Server) HandleResponseHeaders(
         }
     }
 
+    if h.ResponseHeaders.EndOfStream {
+        reqCtx.StreamingCompleted = true
+        loggerVerbose.Info("Header indicates streaming complete")
+    }
     resp := &extProcPb.ProcessingResponse{
         Response: &extProcPb.ProcessingResponse_ResponseHeaders{
             ResponseHeaders: &extProcPb.HeadersResponse{
@@ -132,22 +138,65 @@ func (s *Server) HandleResponseBody(
 ) (*extProcPb.ProcessingResponse, error) {
     logger := log.FromContext(ctx)
     loggerVerbose := logger.V(logutil.VERBOSE)
-    loggerVerbose.Info("Processing HandleResponseBody")
     body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
 
-    res := Response{}
-    if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
-        return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+    if reqCtx.Streaming {
+        logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")
+
+        responseText := string(body.ResponseBody.Body)
+        // Example message if "stream_options": {"include_usage": "true"} is included in the request:
+        // data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
+        // "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+        //
+        // data: [DONE]
+        //
+        // Note that vLLM returns both entries in a single response body. We need to strip the
+        // `data:` prefix and the trailing `data: [DONE]` from the message to fetch the response data.
+        //
+        // If include_usage is not included in the request, `data: [DONE]` is returned separately,
+        // which indicates the end of streaming.
+        if strings.Contains(responseText, "data: [DONE]") {
+            response := Response{}
+
+            if reqCtx.StreamingIncludeUsage {
+                re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`) // match a JSON object
+                match := re.FindString(responseText)
+                if match == "" {
+                    return nil, errutil.Error{Code: errutil.ModelServerError, Msg: fmt.Sprintf("model server returned invalid response: %v", responseText)}
+                }
+                byteSlice := []byte(match)
+                if err := json.Unmarshal(byteSlice, &response); err != nil {
+                    return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+                }
+            } else {
+                // ResponseBody.EndOfStream is only set if include_usage is set to true.
+                reqCtx.ResponseComplete = true
+                loggerVerbose.Info("Streaming is completed")
+            }
+
+            reqCtx.Response = response
+        }
+
+        if body.ResponseBody.EndOfStream {
+            loggerVerbose.Info("Streaming is completed")
+            reqCtx.ResponseComplete = true
+        } else {
+            reqCtx.ResponseSize += len(body.ResponseBody.Body)
+        }
+    } else {
+        loggerVerbose.Info("Processing HandleResponseBody")
+
+        res := Response{}
+        if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
+            return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+        }
+        reqCtx.Response = res
+        reqCtx.ResponseSize = len(body.ResponseBody.Body)
+        reqCtx.ResponseComplete = true
+
+        loggerVerbose.Info("Response generated", "response", res)
     }
-    reqCtx.Response = res
-    reqCtx.ResponseSize = len(body.ResponseBody.Body)
-    // ResponseComplete is to indicate the response is complete. In non-streaming
-    // case, it will be set to be true once the response is processed; in
-    // streaming case, it will be set to be true once the last chunk is processed.
-    // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
-    // will add the processing for streaming case.
-    reqCtx.ResponseComplete = true
-    loggerVerbose.Info("Response generated", "response", res)
 
     resp := &extProcPb.ProcessingResponse{
         Response: &extProcPb.ProcessingResponse_ResponseBody{
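A runnable sketch of the usage extraction above, isolated from the ext_proc plumbing. The chunk literal follows the shape described in the handler comments (IDs and token counts illustrative); the regex is the same one the handler compiles, and it matches the full outer JSON object even though the usage object is nested one level deep:

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// Final body vLLM sends when stream_options.include_usage is set: the usage
	// entry and the [DONE] sentinel arrive together (values illustrative).
	chunk := `data: {"id":"cmpl-1","object":"text_completion","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
`

	if !strings.Contains(chunk, "data: [DONE]") {
		return // mid-stream chunk; nothing to extract yet
	}

	// Same pattern the handler uses to peel the JSON object out of the SSE framing.
	re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`)
	match := re.FindString(chunk)
	if match == "" {
		panic("no JSON object found in chunk")
	}

	var resp struct {
		Usage struct {
			PromptTokens     int `json:"prompt_tokens"`
			CompletionTokens int `json:"completion_tokens"`
			TotalTokens      int `json:"total_tokens"`
		} `json:"usage"`
	}
	if err := json.Unmarshal([]byte(match), &resp); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", resp.Usage) // {PromptTokens:7 CompletionTokens:10 TotalTokens:17}
}
```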

pkg/ext-proc/handlers/response_test.go (+65 -1)

@@ -49,6 +49,32 @@ const (
     }
 }
 `
+
+    streamingBodyWithoutUsage = `
+data: {
+    "id": "cmpl-573498d260f2423f9e42817bbba3743a",
+    "object": "text_completion",
+    "created": 1732563765,
+    "model": "meta-llama/Llama-2-7b-hf",
+    "choices": [
+        {
+            "index": 0,
+            "text": " world",
+            "logprobs": null,
+            "finish_reason": "length",
+            "stop_reason": null,
+            "prompt_logprobs": null
+        }
+    ],
+    "usage": null
+}
+`
+
+    streamingBodyWithUsage = `
+data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+
+data: [DONE]
+`
 )
 
 func TestHandleResponseBody(t *testing.T) {
@@ -57,6 +83,7 @@ func TestHandleResponseBody(t *testing.T) {
     tests := []struct {
         name    string
         req     *extProcPb.ProcessingRequest_ResponseBody
+        reqCtx  *RequestContext
         want    Response
         wantErr bool
     }{
@@ -84,12 +111,49 @@ func TestHandleResponseBody(t *testing.T) {
             },
             wantErr: true,
         },
+        {
+            name: "streaming request without usage",
+            req: &extProcPb.ProcessingRequest_ResponseBody{
+                ResponseBody: &extProcPb.HttpBody{
+                    Body: []byte(streamingBodyWithoutUsage),
+                },
+            },
+            reqCtx: &RequestContext{
+                Streaming:             true,
+                StreamingIncludeUsage: false,
+            },
+            wantErr: false,
+            // In the middle of a streaming response, so the request context response is not set yet.
+        },
+        {
+            name: "streaming request with usage",
+            req: &extProcPb.ProcessingRequest_ResponseBody{
+                ResponseBody: &extProcPb.HttpBody{
+                    Body: []byte(streamingBodyWithUsage),
+                },
+            },
+            reqCtx: &RequestContext{
+                Streaming:             true,
+                StreamingIncludeUsage: true,
+            },
+            wantErr: false,
+            want: Response{
+                Usage: Usage{
+                    PromptTokens:     7,
+                    TotalTokens:      17,
+                    CompletionTokens: 10,
+                },
+            },
+        },
     }
 
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
             server := &Server{}
-            reqCtx := &RequestContext{}
+            reqCtx := test.reqCtx
+            if reqCtx == nil {
+                reqCtx = &RequestContext{}
+            }
             _, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
             if err != nil {
                 if !test.wantErr {
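These cases exercise both streaming paths through the same table. Assuming you are at the repo root, they can be run in isolation with:

```
go test ./pkg/ext-proc/handlers/ -run TestHandleResponseBody -v
```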

pkg/ext-proc/handlers/server.go (+13 -2)

@@ -121,7 +121,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
                 metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
                 metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
             }
-            loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
+            if reqCtx.Streaming {
+                logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
+            } else {
+                loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
+            }
         default:
             logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
             return status.Error(codes.Unknown, "unknown request type")
@@ -179,7 +183,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
             }
         }
 
-        loggerVerbose.Info("Response generated", "response", resp)
+        if !reqCtx.Streaming {
+            loggerVerbose.Info("Response generated", "response", resp)
+        } else {
+            logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
+        }
         if err := srv.Send(resp); err != nil {
             logger.V(logutil.DEFAULT).Error(err, "Send failed")
             return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -201,4 +209,7 @@ type RequestContext struct {
     ResponseSize       int
     ResponseComplete   bool
     ResponseStatusCode string
+    Streaming             bool
+    StreamingCompleted    bool
+    StreamingIncludeUsage bool
 }
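A sketch of how the new `RequestContext` fields evolve over a streamed response, using a trimmed-down struct and a hypothetical chunk sequence (in `Streamed` mode Envoy delivers the response body in pieces, flagging the last one with `EndOfStream`):

```go
package main

import "fmt"

// Trimmed-down copy of the fields this commit adds to RequestContext; the
// real struct lives in pkg/ext-proc/handlers/server.go.
type requestContext struct {
	Streaming             bool
	StreamingIncludeUsage bool
	ResponseSize          int
	ResponseComplete      bool
}

func main() {
	// Hypothetical streamed chunks as HandleResponseBody would see them.
	chunks := []struct {
		body        string
		endOfStream bool
	}{
		{"data: {\"choices\":[{\"text\":\"Hello\"}]}\n\n", false},
		{"data: {\"choices\":[{\"text\":\" world\"}]}\n\n", false},
		{"data: {\"choices\":[],\"usage\":{}}\n\ndata: [DONE]\n", true},
	}

	// Set by HandleRequestBody when the request carries stream/stream_options.
	reqCtx := &requestContext{Streaming: true, StreamingIncludeUsage: true}

	for _, c := range chunks {
		if c.endOfStream {
			// Last chunk observed; HandleResponseBody marks completion here.
			reqCtx.ResponseComplete = true
		} else {
			// Mid-stream: only the running response size is updated.
			reqCtx.ResponseSize += len(c.body)
		}
	}
	fmt.Printf("%+v\n", *reqCtx)
}
```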

pkg/ext-proc/metrics/README.md (+35 -6)

@@ -8,13 +8,29 @@ This documentation is the current state of exposed metrics.
 
 ## Requirements
 
-Response metrics are only supported in non-streaming mode, with the follow up [issue](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) to address streaming mode.
+For non-streaming requests, enable `Buffered` mode for the response in the `EnvoyExtensionPolicy`:
 
-Currently there are two options:
-- If requests don't use response streaming, then you can enable `Buffered` mode for response in `EnvoyExtensionPolicy`, this will buffer the response body at the proxy and forward it to the endpoint picker, which allows the endpoint picker to report response metrics.
-
-- If requests use response streaming, then it is not recommended to enable `Buffered` mode, the response body processing mode should be left empty in the `EnvoyExtensionPolicy` (default). In this case response bodies will not be forwarded to the endpoint picker, and therefore response metrics will not be reported.
+```
+apiVersion: gateway.envoyproxy.io/v1alpha1
+kind: EnvoyExtensionPolicy
+metadata:
+  name: ext-proc-policy
+  namespace: default
+spec:
+  extProc:
+    - backendRefs:
+      - group: ""
+        kind: Service
+        name: inference-gateway-ext-proc
+        port: 9002
+      processingMode:
+        request:
+          body: Buffered
+        response:
+          body: Buffered
+```
 
+For streaming requests, enable `Streamed` mode for the response in the `EnvoyExtensionPolicy`:
 
 ```
 apiVersion: gateway.envoyproxy.io/v1alpha1
@@ -33,7 +49,20 @@ spec:
         request:
           body: Buffered
         response:
-          body: Buffered
+          body: Streamed
+```
+
+To include usage metrics from the vLLM model server, send the request with `include_usage`:
+
+```
+curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
+  "model": "tweet-summary",
+  "prompt": "whats your fav movie?",
+  "max_tokens": 10,
+  "temperature": 0,
+  "stream": true,
+  "stream_options": {"include_usage": "true"}
+}'
 ```
 
 ## Exposed metrics