Skip to content

Commit 1d66114

Browse files
committed
WIP: [Metrics] Add streaming support for metrics
1 parent 242b73e commit 1d66114

File tree

4 files changed

+74
-15
lines changed

4 files changed

+74
-15
lines changed

pkg/ext-proc/handlers/request.go

+9
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
3838
klog.V(logutil.VERBOSE).Infof("Model requested: %v", model)
3939
modelName := model
4040

41+
// Resolve streaming options
42+
43+
streaming, ok := rb["stream"].(bool)
44+
if !ok {
45+
// streaming not set, no-op
46+
} else {
47+
reqCtx.Streaming = streaming
48+
}
49+
4150
// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
4251
// This might be a security risk in the future where adapters not registered in the InferenceModel
4352
// are able to be requested by using their distinct name.

pkg/ext-proc/handlers/response.go

+54-13
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package handlers
33
import (
44
"encoding/json"
55
"fmt"
6+
"regexp"
7+
"strings"
68

79
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
810
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -16,6 +18,10 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
1618
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
1719
klog.V(logutil.VERBOSE).Infof("Headers before: %+v\n", h)
1820

21+
if h.ResponseHeaders.EndOfStream {
22+
reqCtx.StreamingCompleted = true
23+
klog.V(logutil.VERBOSE).Info("Response is completed")
24+
}
1925
resp := &extProcPb.ProcessingResponse{
2026
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
2127
ResponseHeaders: &extProcPb.HeadersResponse{
@@ -66,22 +72,57 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
6672
}
6773
}*/
6874
func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
69-
klog.V(logutil.VERBOSE).Info("Processing HandleResponseBody")
7075
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
7176

72-
res := Response{}
73-
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
74-
return nil, fmt.Errorf("unmarshaling response body: %v", err)
77+
if reqCtx.Streaming {
78+
responseText := string(reqCtx.prevResponse)
79+
if strings.Contains(responseText, "[DONE]") {
80+
lastResponse := Response{}
81+
82+
// Example message:
83+
// data: {"id":"cmpl-d6392493-b56c-4d81-9f11-995a0dc93c5d","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
84+
//
85+
// data: [DONE]
86+
// We need to strip the `data:` prefix and the trailing `data: [DONE]` message.
87+
88+
msgInStr := string(reqCtx.prevResponse)
89+
// msgInStr = msgInStr[6:]
90+
re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`) // match for JSON object
91+
match := re.FindString(msgInStr)
92+
93+
byteSlice := []byte(match)
94+
if err := json.Unmarshal(byteSlice, &lastResponse); err != nil {
95+
return nil, fmt.Errorf("unmarshaling response body: %v", err)
96+
}
97+
klog.V(logutil.VERBOSE).Infof("[DONE] previous response is: %+v", lastResponse)
98+
99+
reqCtx.Response = lastResponse
100+
}
101+
102+
// This should be placed before checking [DONE] message because [DONE] message is produced
103+
// after usage context.
104+
reqCtx.prevResponse = body.ResponseBody.Body
105+
106+
if reqCtx.StreamingCompleted || body.ResponseBody.EndOfStream {
107+
klog.V(logutil.VERBOSE).Info("Streaming is completed")
108+
reqCtx.ResponseComplete = true
109+
} else {
110+
reqCtx.ResponseSize += len(body.ResponseBody.Body)
111+
}
112+
113+
} else {
114+
klog.V(logutil.VERBOSE).Info("Processing HandleResponseBody")
115+
116+
res := Response{}
117+
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
118+
return nil, fmt.Errorf("unmarshaling response body: %v", err)
119+
}
120+
reqCtx.Response = res
121+
reqCtx.ResponseSize = len(body.ResponseBody.Body)
122+
reqCtx.ResponseComplete = true
123+
124+
klog.V(logutil.VERBOSE).Infof("Response: %+v", res)
75125
}
76-
reqCtx.Response = res
77-
reqCtx.ResponseSize = len(body.ResponseBody.Body)
78-
// ResponseComplete is to indicate the response is complete. In non-streaming
79-
// case, it will be set to be true once the response is processed; in
80-
// streaming case, it will be set to be true once the last chunk is processed.
81-
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
82-
// will add the processing for streaming case.
83-
reqCtx.ResponseComplete = true
84-
klog.V(logutil.VERBOSE).Infof("Response: %+v", res)
85126

86127
resp := &extProcPb.ProcessingResponse{
87128
Response: &extProcPb.ProcessingResponse_ResponseBody{

pkg/ext-proc/handlers/server.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,13 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
9595
resp, err = s.HandleResponseBody(reqCtx, req)
9696
if err == nil && reqCtx.ResponseComplete {
9797
reqCtx.ResponseCompleteTimestamp = time.Now()
98+
9899
metrics.RecordRequestLatencies(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)
99100
metrics.RecordResponseSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseSize)
100101
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
101102
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
102103
}
103-
klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseBody: %+v", reqCtx)
104+
klog.V(logutil.DEBUG).Infof("Request context after HandleResponseBody: %+v", reqCtx)
104105
default:
105106
klog.Errorf("Unknown Request type %+v", v)
106107
return status.Error(codes.Unknown, "unknown request type")
@@ -125,7 +126,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
125126
}
126127
}
127128

128-
klog.V(logutil.VERBOSE).Infof("response: %v", resp)
129+
if !reqCtx.Streaming {
130+
klog.V(logutil.VERBOSE).Infof("response: %v", resp)
131+
} else {
132+
klog.V(logutil.DEBUG).Infof("response: %v", resp)
133+
}
129134
if err := srv.Send(resp); err != nil {
130135
klog.Errorf("send error %v", err)
131136
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -144,4 +149,7 @@ type RequestContext struct {
144149
Response Response
145150
ResponseSize int
146151
ResponseComplete bool
152+
Streaming bool
153+
StreamingCompleted bool
154+
prevResponse []byte // in streaming mode, keep the previous response chunk so it can be parsed once it is found to contain the [DONE] message
147155
}

pkg/manifests/gateway/extension_policy.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ spec:
1414
request:
1515
body: Buffered
1616
response:
17+
body: Streamed
1718
# The timeouts are likely not needed here. We can experiment with removing/tuning them slowly.
1819
# The connection limits are more important and will cause the opaque: ext_proc_gRPC_error_14 error in Envoy GW if not configured correctly.
1920
messageTimeout: 1000s

0 commit comments

Comments
 (0)