Skip to content

Commit b82103b

Browse files
committed
[Metrics] Add streaming support for metrics
Address #178
1 parent 9079982 commit b82103b

File tree

5 files changed

+219
-22
lines changed

5 files changed

+219
-22
lines changed

pkg/epp/handlers/request.go

+32
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,38 @@ func (s *Server) HandleRequestBody(
6161
loggerVerbose.Info("Model requested", "model", model)
6262
modelName := model
6363

64+
// Resolve streaming options
65+
66+
streaming, ok := rb["stream"].(bool)
67+
if !ok {
68+
// streaming not set, no-op
69+
} else {
70+
reqCtx.Streaming = streaming
71+
}
72+
73+
if reqCtx.Streaming {
74+
type Usage struct {
75+
IncludeUsage string `json:"include_usage,omitempty"`
76+
}
77+
if streamOption, ok := rb["stream_options"]; ok {
78+
includeUsage := Usage{}
79+
80+
// Parsing `stream_options` won't reject the request.
81+
optionJson, err := json.Marshal(streamOption)
82+
if err != nil {
83+
logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling stream_options")
84+
}
85+
if err := json.Unmarshal(optionJson, &includeUsage); err != nil {
86+
logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling stream_options")
87+
}
88+
if usageEnabled, err := strconv.ParseBool(includeUsage.IncludeUsage); err != nil {
89+
logger.V(logutil.DEFAULT).Error(err, "Error fetching include_usage")
90+
} else {
91+
reqCtx.StreamingIncludeUsage = usageEnabled
92+
}
93+
}
94+
}
95+
6496
// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
6597
// This might be a security risk in the future where adapters not registered in the InferenceModel
6698
// are able to be requested by using their distinct name.

pkg/epp/handlers/response.go

+94-13
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ import (
2020
"context"
2121
"encoding/json"
2222
"fmt"
23+
"strings"
2324

2425
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2526
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
27+
"github.com/go-logr/logr"
2628
"sigs.k8s.io/controller-runtime/pkg/log"
2729
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
2830
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -67,11 +69,25 @@ func (s *Server) HandleResponseHeaders(
6769
// }
6870
// }
6971
for _, header := range h.ResponseHeaders.Headers.GetHeaders() {
72+
var statusFound, typeFound bool
7073
if header.Key == "status" {
7174
code := header.RawValue[0]
7275
if string(code) != "200" {
7376
reqCtx.ResponseStatusCode = errutil.ModelServerError
77+
statusFound = true
7478
}
79+
}
80+
if header.Key == "content-type" {
81+
contentType := header.RawValue
82+
if strings.Contains(string(contentType), "text/event-stream") {
83+
reqCtx.Streaming = true
84+
} else {
85+
reqCtx.Streaming = false
86+
}
87+
typeFound = true
88+
}
89+
90+
if statusFound && typeFound {
7591
break
7692
}
7793
}
@@ -132,22 +148,19 @@ func (s *Server) HandleResponseBody(
132148
) (*extProcPb.ProcessingResponse, error) {
133149
logger := log.FromContext(ctx)
134150
loggerVerbose := logger.V(logutil.VERBOSE)
135-
loggerVerbose.Info("Processing HandleResponseBody")
136151
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
137152

138-
res := Response{}
139-
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
140-
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
153+
if reqCtx.Streaming {
154+
logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")
155+
if err := s.HandleStreaming(ctx, reqCtx, body, loggerVerbose); err != nil {
156+
return nil, err
157+
}
158+
} else {
159+
loggerVerbose.Info("Processing HandleResponseBody")
160+
if err := s.HandleNonStreaming(ctx, reqCtx, body, loggerVerbose); err != nil {
161+
return nil, err
162+
}
141163
}
142-
reqCtx.Response = res
143-
reqCtx.ResponseSize = len(body.ResponseBody.Body)
144-
// ResponseComplete is to indicate the response is complete. In non-streaming
145-
// case, it will be set to be true once the response is processed; in
146-
// streaming case, it will be set to be true once the last chunk is processed.
147-
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
148-
// will add the processing for streaming case.
149-
reqCtx.ResponseComplete = true
150-
loggerVerbose.Info("Response generated", "response", res)
151164

152165
resp := &extProcPb.ProcessingResponse{
153166
Response: &extProcPb.ProcessingResponse_ResponseBody{
@@ -159,6 +172,74 @@ func (s *Server) HandleResponseBody(
159172
return resp, nil
160173
}
161174

175+
func (s *Server) HandleNonStreaming(
176+
ctx context.Context,
177+
reqCtx *RequestContext,
178+
body *extProcPb.ProcessingRequest_ResponseBody,
179+
loggerVerbose logr.Logger,
180+
) error {
181+
loggerVerbose.Info("Processing HandleResponseBody")
182+
183+
res := Response{}
184+
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
185+
return errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
186+
}
187+
reqCtx.Response = res
188+
reqCtx.ResponseSize = len(body.ResponseBody.Body)
189+
reqCtx.ResponseComplete = true
190+
loggerVerbose.Info("Response generated", "response", res)
191+
return nil
192+
}
193+
194+
func (s *Server) HandleStreaming(
195+
ctx context.Context,
196+
reqCtx *RequestContext,
197+
body *extProcPb.ProcessingRequest_ResponseBody,
198+
loggerVerbose logr.Logger,
199+
) error {
200+
responseText := string(body.ResponseBody.Body)
201+
// Example message if "stream_options": {"include_usage": "true"} is included in the request:
202+
// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
203+
// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
204+
//
205+
// data: [DONE]
206+
//
207+
// Noticed that vLLM returns two entries in one response.
208+
// We need to strip the `data:` prefix and next Data: [DONE] from the message to fetch response data.
209+
//
210+
// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
211+
// indicates end of streaming.
212+
if strings.Contains(responseText, "data: [DONE]") {
213+
response := Response{}
214+
215+
if reqCtx.StreamingIncludeUsage {
216+
lines := strings.Split(responseText, "\n")
217+
if len(lines) < 2 {
218+
loggerVerbose.Info("model server returns invalid response", "message", responseText)
219+
return nil
220+
}
221+
222+
content := strings.TrimPrefix(lines[0], "data: ")
223+
byteSlice := []byte(content)
224+
if err := json.Unmarshal(byteSlice, &response); err != nil {
225+
loggerVerbose.Error(err, "unmarshaling response body")
226+
return nil
227+
}
228+
}
229+
230+
reqCtx.Response = response
231+
}
232+
233+
if body.ResponseBody.EndOfStream {
234+
loggerVerbose.Info("Streaming is completed")
235+
reqCtx.ResponseComplete = true
236+
} else {
237+
reqCtx.ResponseSize += len(body.ResponseBody.Body)
238+
}
239+
240+
return nil
241+
}
242+
162243
type Response struct {
163244
Usage Usage `json:"usage"`
164245
}

pkg/epp/handlers/response_test.go

+46-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ const (
4949
}
5050
}
5151
`
52+
53+
streamingBodyWithoutUsage = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":null}
54+
`
55+
56+
streamingBodyWithUsage = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
57+
data: [DONE]
58+
`
5259
)
5360

5461
func TestHandleResponseBody(t *testing.T) {
@@ -57,6 +64,7 @@ func TestHandleResponseBody(t *testing.T) {
5764
tests := []struct {
5865
name string
5966
req *extProcPb.ProcessingRequest_ResponseBody
67+
reqCtx *RequestContext
6068
want Response
6169
wantErr bool
6270
}{
@@ -84,12 +92,49 @@ func TestHandleResponseBody(t *testing.T) {
8492
},
8593
wantErr: true,
8694
},
95+
{
96+
name: "streaming request without usage",
97+
req: &extProcPb.ProcessingRequest_ResponseBody{
98+
ResponseBody: &extProcPb.HttpBody{
99+
Body: []byte(streamingBodyWithoutUsage),
100+
},
101+
},
102+
reqCtx: &RequestContext{
103+
Streaming: true,
104+
StreamingIncludeUsage: false,
105+
},
106+
wantErr: false,
107+
// In the middle of streaming response, so request context response is not set yet.
108+
},
109+
{
110+
name: "streaming request with usage",
111+
req: &extProcPb.ProcessingRequest_ResponseBody{
112+
ResponseBody: &extProcPb.HttpBody{
113+
Body: []byte(streamingBodyWithUsage),
114+
},
115+
},
116+
reqCtx: &RequestContext{
117+
Streaming: true,
118+
StreamingIncludeUsage: true,
119+
},
120+
wantErr: false,
121+
want: Response{
122+
Usage: Usage{
123+
PromptTokens: 7,
124+
TotalTokens: 17,
125+
CompletionTokens: 10,
126+
},
127+
},
128+
},
87129
}
88130

89131
for _, test := range tests {
90132
t.Run(test.name, func(t *testing.T) {
91133
server := &Server{}
92-
reqCtx := &RequestContext{}
134+
reqCtx := test.reqCtx
135+
if reqCtx == nil {
136+
reqCtx = &RequestContext{}
137+
}
93138
_, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
94139
if err != nil {
95140
if !test.wantErr {

pkg/epp/handlers/server.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
124124
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
125125
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
126126
}
127-
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
127+
if reqCtx.Streaming {
128+
logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
129+
} else {
130+
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
131+
}
128132
default:
129133
logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
130134
return status.Error(codes.Unknown, "unknown request type")
@@ -138,7 +142,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
138142
}
139143
}
140144

141-
loggerVerbose.Info("Response generated", "response", resp)
145+
if !reqCtx.Streaming {
146+
loggerVerbose.Info("Response generated", "response", resp)
147+
} else {
148+
logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
149+
}
142150
if err := srv.Send(resp); err != nil {
143151
logger.V(logutil.DEFAULT).Error(err, "Send failed")
144152
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -213,4 +221,6 @@ type RequestContext struct {
213221
ResponseSize int
214222
ResponseComplete bool
215223
ResponseStatusCode string
224+
Streaming bool
225+
StreamingIncludeUsage bool
216226
}

site-src/guides/metrics.md

+35-6
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,29 @@ This guide describes the current state of exposed metrics and how to scrape them
44

55
## Requirements
66

7-
Response metrics are only supported in non-streaming mode, with the follow up [issue](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) to address streaming mode.
7+
For non-streaming requests, enable `Buffered` mode for the response body in the `EnvoyExtensionPolicy`:
88

9-
Currently there are two options:
10-
- If requests don't use response streaming, then you can enable `Buffered` mode for response in `EnvoyExtensionPolicy`, this will buffer the response body at the proxy and forward it to the endpoint picker, which allows the endpoint picker to report response metrics.
11-
12-
- If requests use response streaming, then it is not recommended to enable `Buffered` mode, the response body processing mode should be left empty in the `EnvoyExtensionPolicy` (default). In this case response bodies will not be forwarded to the endpoint picker, and therefore response metrics will not be reported.
9+
```
10+
apiVersion: gateway.envoyproxy.io/v1alpha1
11+
kind: EnvoyExtensionPolicy
12+
metadata:
13+
name: ext-proc-policy
14+
namespace: default
15+
spec:
16+
extProc:
17+
- backendRefs:
18+
- group: ""
19+
kind: Service
20+
name: inference-gateway-ext-proc
21+
port: 9002
22+
processingMode:
23+
request:
24+
body: Buffered
25+
response:
26+
body: Buffered
27+
```
1328

29+
For streaming requests, enable `Streamed` mode for the response body in the `EnvoyExtensionPolicy`:
1430

1531
```
1632
apiVersion: gateway.envoyproxy.io/v1alpha1
@@ -29,7 +45,20 @@ spec:
2945
request:
3046
body: Buffered
3147
response:
32-
body: Buffered
48+
body: Streamed
49+
```
50+
51+
If you want usage metrics reported for the vLLM model server, send the streaming request with `stream_options.include_usage` enabled:
52+
53+
```
54+
curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
55+
"model": "tweet-summary",
56+
"prompt": "whats your fav movie?",
57+
"max_tokens": 10,
58+
"temperature": 0,
59+
"stream": true,
60+
"stream_options": {"include_usage": "true"}
61+
}'
3362
```
3463

3564
## Exposed metrics

0 commit comments

Comments
 (0)