Commit 2d05b45

Updated request metrics to be handled in server processing loop
Signed-off-by: Jie WU <[email protected]>
1 parent 913981e commit 2d05b45

4 files changed (+99 -23 lines)


pkg/ext-proc/handlers/request.go (+2 -4)

@@ -5,12 +5,10 @@ import (
 	"errors"
 	"fmt"
 	"strconv"
-	"time"
 
 	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
-	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
 	klog "k8s.io/klog/v2"
 )
@@ -20,7 +18,6 @@ import (
 // Envoy sends the request body to ext proc before sending the request to the backend server.
 func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
 	klog.V(3).Infof("Handling request body")
-	requestReceivedTimestamp := time.Now()
 
 	// Unmarshal request body (must be JSON).
 	v := req.Request.(*extProcPb.ProcessingRequest_RequestBody)
@@ -79,6 +76,8 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 	klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
 
 	reqCtx.Model = llmReq.Model
+	reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
+	reqCtx.RequestSize = len(v.RequestBody.Body)
 	reqCtx.TargetPod = targetPod
 
 	// Insert "target-pod" to instruct Envoy to route requests to the specified target pod.
@@ -119,7 +118,6 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 			},
 		},
 	}
-	metrics.MonitorRequest(llmReq.Model, llmReq.ResolvedTargetModel, len(v.RequestBody.Body), time.Since(requestReceivedTimestamp))
 	return resp, nil
 }
 

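With the timing code removed from HandleRequestBody, the handler now only stores the request metadata (Model, ResolvedTargetModel, RequestSize) on the RequestContext; per the commit message, the metrics themselves are emitted from the server's processing loop once the request/response cycle completes. That call site is not part of this diff, so the following is only a rough sketch of how the RequestContext fields set above might feed the new helpers added in metrics.go below. The function name and the received/completed timestamps are assumptions for illustration, not the commit's actual code.

// Illustrative sketch only. Assumes it lives in the handlers package and imports
// "time" and the pkg/ext-proc/metrics package; the processing loop is assumed to
// capture `received` when the request arrives and `completed` once the response
// has been handled.
func recordRequestMetrics(reqCtx *RequestContext, received, completed time.Time) {
	metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel)
	metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize)
	metrics.RecordRequestLatencies(reqCtx.Model, reqCtx.ResolvedTargetModel, received, completed)
}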
pkg/ext-proc/metrics/metrics.go (+18 -4)

@@ -6,6 +6,7 @@ import (
 
 	compbasemetrics "k8s.io/component-base/metrics"
 	"k8s.io/component-base/metrics/legacyregistry"
+	klog "k8s.io/klog/v2"
 )
 
 const (
@@ -63,10 +64,23 @@ func Register() {
 	})
 }
 
-// MonitorRequest handles monitoring requests.
-func MonitorRequest(modelName, targetModelName string, reqSize int, elapsed time.Duration) {
-	elapsedSeconds := elapsed.Seconds()
+// RecordRequestCounter records the number of requests.
+func RecordRequestCounter(modelName, targetModelName string) {
 	requestCounter.WithLabelValues(modelName, targetModelName).Inc()
-	requestLatencies.WithLabelValues(modelName, targetModelName).Observe(elapsedSeconds)
+}
+
+// RecordRequestSizes records the request sizes.
+func RecordRequestSizes(modelName, targetModelName string, reqSize int) {
 	requestSizes.WithLabelValues(modelName, targetModelName).Observe(float64(reqSize))
 }
+
+// RecordRequestLatencies records the duration of a request.
+func RecordRequestLatencies(modelName, targetModelName string, received time.Time, complete time.Time) bool {
+	if !complete.After(received) {
+		klog.Errorf("request latency value error for model name %v, target model name %v: complete time %v is before received time %v", modelName, targetModelName, complete, received)
+		return false
+	}
+	elapsedSeconds := complete.Sub(received).Seconds()
+	requestLatencies.WithLabelValues(modelName, targetModelName).Observe(elapsedSeconds)
+	return true
+}

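Splitting MonitorRequest into three helpers lets each signal be recorded at the point in the processing loop where its data is actually available, and RecordRequestLatencies now validates its timestamps, logging an error and returning false instead of observing a bogus sample. A minimal usage sketch follows; the model/target names, size, and timestamps are made up for illustration, and the klog import is assumed.

	received := time.Now()
	// ... request is scheduled, proxied, and the response comes back ...
	completed := time.Now()

	metrics.RecordRequestCounter("my-model", "my-model-v1")
	metrics.RecordRequestSizes("my-model", "my-model-v1", 1024)
	if ok := metrics.RecordRequestLatencies("my-model", "my-model-v1", received, completed); !ok {
		// completed was not after received; the helper already logged an error and
		// skipped the histogram observation, so only note it at low verbosity here.
		klog.V(2).Infof("dropped latency sample for my-model")
	}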
pkg/ext-proc/metrics/metrics_test.go (+78 -15)

@@ -13,12 +13,11 @@ const RequestTotalMetric = InferenceModelComponent + "_request_total"
 const RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds"
 const RequestSizesMetric = InferenceModelComponent + "_request_sizes"
 
-func TestMonitorRequest(t *testing.T) {
+func TestRecordRequestCounterandSizes(t *testing.T) {
 	type requests struct {
 		modelName string
 		targetModelName string
 		reqSize int
-		elapsed time.Duration
 	}
 	scenarios := []struct {
 		name string
@@ -30,33 +29,30 @@ func TestMonitorRequest(t *testing.T) {
 			modelName: "m10",
 			targetModelName: "t10",
 			reqSize: 1200,
-			elapsed: time.Millisecond * 10,
 		},
 		{
 			modelName: "m10",
 			targetModelName: "t10",
 			reqSize: 500,
-			elapsed: time.Millisecond * 1600,
 		},
 		{
 			modelName: "m10",
 			targetModelName: "t11",
 			reqSize: 2480,
-			elapsed: time.Millisecond * 60,
 		},
 		{
 			modelName: "m20",
 			targetModelName: "t20",
 			reqSize: 80,
-			elapsed: time.Millisecond * 120,
 		},
 	},
 	}}
 	Register()
 	for _, scenario := range scenarios {
 		t.Run(scenario.name, func(t *testing.T) {
 			for _, req := range scenario.reqs {
-				MonitorRequest(req.modelName, req.targetModelName, req.reqSize, req.elapsed)
+				RecordRequestCounter(req.modelName, req.targetModelName)
+				RecordRequestSizes(req.modelName, req.targetModelName, req.reqSize)
 			}
 			wantRequestTotal, err := os.Open("testdata/request_total_metric")
 			defer func() {
@@ -70,31 +66,98 @@ func TestMonitorRequest(t *testing.T) {
 			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestTotal, RequestTotalMetric); err != nil {
 				t.Error(err)
 			}
-			wantRequestLatencies, err := os.Open("testdata/request_duration_seconds_metric")
+			wantRequestSizes, err := os.Open("testdata/request_sizes_metric")
 			defer func() {
-				if err := wantRequestLatencies.Close(); err != nil {
+				if err := wantRequestSizes.Close(); err != nil {
 					t.Error(err)
 				}
 			}()
 			if err != nil {
 				t.Fatal(err)
 			}
-			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestLatencies, RequestLatenciesMetric); err != nil {
+			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestSizes, RequestSizesMetric); err != nil {
 				t.Error(err)
 			}
-			wantRequestSizes, err := os.Open("testdata/request_sizes_metric")
+
+		})
+	}
+}
+
+
+func TestRecordRequestLatencies(t *testing.T) {
+	timeBaseline := time.Now()
+	type requests struct {
+		modelName string
+		targetModelName string
+		receivedTime time.Time
+		completeTime time.Time
+	}
+	scenarios := []struct {
+		name string
+		reqs []requests
+		invalid bool
+	}{{
+		name: "multiple requests",
+		reqs: []requests{
+			{
+				modelName: "m10",
+				targetModelName: "t10",
+				receivedTime: timeBaseline,
+				completeTime: timeBaseline.Add(time.Millisecond * 10),
+			},
+			{
+				modelName: "m10",
+				targetModelName: "t10",
+				receivedTime: timeBaseline,
+				completeTime: timeBaseline.Add(time.Millisecond * 1600),
+			},
+			{
+				modelName: "m10",
+				targetModelName: "t11",
+				receivedTime: timeBaseline,
+				completeTime: timeBaseline.Add(time.Millisecond * 60),
+			},
+			{
+				modelName: "m20",
+				targetModelName: "t20",
+				receivedTime: timeBaseline,
+				completeTime: timeBaseline.Add(time.Millisecond * 120),
+			},
+		},
+	},
+	{
+		name: "invalid elapsed time",
+		reqs: []requests{
+			{
+				modelName: "m10",
+				targetModelName: "t10",
+				receivedTime: timeBaseline.Add(time.Millisecond * 10),
+				completeTime: timeBaseline,
+			}},
+		invalid: true,
+	}}
+	Register()
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			for _, req := range scenario.reqs {
+				success := RecordRequestLatencies(req.modelName, req.targetModelName, req.receivedTime, req.completeTime)
+				if success == scenario.invalid {
+					t.Errorf("got record success(%v), but the request expects invalid(%v)", success, scenario.invalid)
+				}
+			}
+
+			wantRequestLatencies, err := os.Open("testdata/request_duration_seconds_metric")
 			defer func() {
-				if err := wantRequestSizes.Close(); err != nil {
+				if err := wantRequestLatencies.Close(); err != nil {
 					t.Error(err)
 				}
 			}()
 			if err != nil {
 				t.Fatal(err)
 			}
-			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestSizes, RequestSizesMetric); err != nil {
+			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestLatencies, RequestLatenciesMetric); err != nil {
 				t.Error(err)
 			}
-
 		})
 	}
-}
+}

pkg/manifests/ext_proc.yaml (+1)

@@ -95,6 +95,7 @@ spec:
       request:
        body: Buffered
      response:
+       body: Buffered
      # The timeouts are likely not needed here. We can experiment with removing/tuning them slowly.
      # The connection limits are more important and will cause the opaque: ext_proc_gRPC_error_14 error in Envoy GW if not configured correctly.
      messageTimeout: 1000s

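This one-line manifest change enables Buffered body processing for responses as well as requests, presumably so the ext-proc server also receives the response body and can finish a request's metric bookkeeping in its processing loop. Reconstructed from the hunk context, the relevant block likely ends up as sketched below; the parent field name is an assumption, since it sits outside the lines shown in the diff.

processingMode:
  request:
    body: Buffered
  response:
    body: Buffered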