Skip to content

Commit 4f90f79

Browse files
committed
[Metrics] Add request error metrics
This change defines a set of general error types; the list may grow in the future if finer-grained error types are needed.
1 parent 731597d commit 4f90f79

File tree

11 files changed

+224
-19
lines changed

11 files changed

+224
-19
lines changed

go.mod

+1-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ require (
2929
k8s.io/utils v0.0.0-20241210054802-24370beab758
3030
sigs.k8s.io/controller-runtime v0.20.1
3131
sigs.k8s.io/structured-merge-diff/v4 v4.5.0
32+
sigs.k8s.io/yaml v1.4.0
3233
)
3334

3435
require (
@@ -104,7 +105,6 @@ require (
104105
github.com/spf13/cobra v1.8.1 // indirect
105106
github.com/spf13/pflag v1.0.5 // indirect
106107
github.com/stoewer/go-strcase v1.3.0 // indirect
107-
github.com/stretchr/objx v0.5.2 // indirect
108108
github.com/x448/float16 v0.8.4 // indirect
109109
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
110110
go.opentelemetry.io/otel v1.32.0 // indirect
@@ -140,5 +140,4 @@ require (
140140
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 // indirect
141141
sigs.k8s.io/controller-tools v0.14.0 // indirect
142142
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
143-
sigs.k8s.io/yaml v1.4.0 // indirect
144143
)

go.sum

-2
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,6 @@ github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8w
215215
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
216216
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
217217
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
218-
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
219-
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
220218
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
221219
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
222220
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=

pkg/ext-proc/handlers/request.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package handlers
22

33
import (
44
"encoding/json"
5-
"errors"
65
"fmt"
76
"strconv"
87

98
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
109
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1110
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1211
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
12+
infextprocerror "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
1313
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1414
klog "k8s.io/klog/v2"
1515
)
@@ -25,14 +25,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
2525
var rb map[string]interface{}
2626
if err := json.Unmarshal(v.RequestBody.Body, &rb); err != nil {
2727
klog.Errorf("Error unmarshaling request body: %v", err)
28-
return nil, fmt.Errorf("error unmarshaling request body: %v", err)
28+
return nil, infextprocerror.Error{Code: infextprocerror.InvalidRequest, Msg: fmt.Sprintf("error unmarshaling request body: %v", err)}
2929
}
3030
klog.V(logutil.VERBOSE).Infof("Request body: %v", rb)
3131

3232
// Resolve target models.
3333
model, ok := rb["model"].(string)
3434
if !ok {
35-
return nil, errors.New("model not found in request")
35+
return nil, infextprocerror.Error{Code: infextprocerror.InvalidRequest, Msg: "model not found in request"}
3636
}
3737
klog.V(logutil.VERBOSE).Infof("Model requested: %v", model)
3838
modelName := model
@@ -42,12 +42,12 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
4242
// are able to be requested by using their distinct name.
4343
modelObj := s.datastore.FetchModelData(model)
4444
if modelObj == nil {
45-
return nil, fmt.Errorf("error finding a model object in InferenceModel for input %v", model)
45+
// modelObj is nil on this path, so report the requested model name instead of
// dereferencing modelObj.Name (which would panic).
return nil, infextprocerror.Error{Code: infextprocerror.Internal, Msg: fmt.Sprintf("error finding a model object in InferenceModel for input %v", model)}
4646
}
4747
if len(modelObj.Spec.TargetModels) > 0 {
4848
modelName = backend.RandomWeightedDraw(modelObj, 0)
4949
if modelName == "" {
50-
return nil, fmt.Errorf("error getting target model name for model %v", modelObj.Name)
50+
return nil, infextprocerror.Error{Code: infextprocerror.Internal, Msg: fmt.Sprintf("error getting target model name for model %v", modelObj.Name)}
5151
}
5252
}
5353
llmReq := &scheduling.LLMRequest{
@@ -65,14 +65,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
6565
requestBody, err = json.Marshal(rb)
6666
if err != nil {
6767
klog.Errorf("Error marshaling request body: %v", err)
68-
return nil, fmt.Errorf("error marshaling request body: %v", err)
68+
return nil, infextprocerror.Error{Code: infextprocerror.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
6969
}
7070
klog.V(logutil.VERBOSE).Infof("Updated body: %v", string(requestBody))
7171
}
7272

7373
targetPod, err := s.scheduler.Schedule(llmReq)
7474
if err != nil {
75-
return nil, fmt.Errorf("failed to find target pod: %w", err)
75+
return nil, infextprocerror.Error{Code: infextprocerror.Internal, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
7676
}
7777
klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
7878

pkg/ext-proc/handlers/response.go

+39-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
88
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
9+
infextprocerror "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
910
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1011
klog "k8s.io/klog/v2"
1112
)
@@ -16,6 +17,43 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
1617
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
1718
klog.V(logutil.VERBOSE).Infof("Headers before: %+v\n", h)
1819

20+
// Example header
21+
// {
22+
// "ResponseHeaders": {
23+
// "headers": [
24+
// {
25+
// "key": ":status",
26+
// "raw_value": "200"
27+
// },
28+
// {
29+
// "key": "date",
30+
// "raw_value": "Thu, 30 Jan 2025 18:50:48 GMT"
31+
// },
32+
// {
33+
// "key": "server",
34+
// "raw_value": "uvicorn"
35+
// },
36+
// {
37+
// "key": "content-type",
38+
// "raw_value": "text/event-stream; charset=utf-8"
39+
// },
40+
// {
41+
// "key": "transfer-encoding",
42+
// "raw_value": "chunked"
43+
// }
44+
// ]
45+
// }
46+
// }
47+
// Flag non-200 upstream responses so the caller can record a ModelServerError.
// The status arrives under the ":status" pseudo-header key (see the example
// header dump above), and RawValue carries the whole value as bytes, so the
// full value is compared rather than its first byte. Converting the slice also
// avoids indexing into an empty RawValue.
for _, header := range h.ResponseHeaders.Headers.GetHeaders() {
	if header.Key != ":status" {
		continue
	}
	if string(header.RawValue) != "200" {
		reqCtx.ResponseStatusCode = infextprocerror.ModelServerError
	}
	// Only one status header is expected; stop scanning once it is handled.
	break
}
56+
1957
resp := &extProcPb.ProcessingResponse{
2058
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
2159
ResponseHeaders: &extProcPb.HeadersResponse{
@@ -71,7 +109,7 @@ func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.Proce
71109

72110
res := Response{}
73111
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
74-
return nil, fmt.Errorf("unmarshaling response body: %v", err)
112+
return nil, infextprocerror.Error{Code: infextprocerror.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
75113
}
76114
reqCtx.Response = res
77115
reqCtx.ResponseSize = len(body.ResponseBody.Body)

pkg/ext-proc/handlers/server.go

+33-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1313
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
1414
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
15+
infextprocerror "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
1516
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1617
klog "k8s.io/klog/v2"
1718
)
@@ -105,12 +106,19 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
105106
klog.Errorf("Unknown Request type %+v", v)
106107
return status.Error(codes.Unknown, "unknown request type")
107108
}
109+
110+
// This indicates error from the underlying model server.
111+
if reqCtx.ResponseStatusCode == infextprocerror.ModelServerError {
112+
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, infextprocerror.ModelServerError)
113+
}
114+
108115
if err != nil {
109116
klog.Errorf("failed to process request: %v", err)
110-
switch status.Code(err) {
117+
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, infextprocerror.CanonicalCode(err))
118+
switch infextprocerror.CanonicalCode(err) {
111119
// This code can be returned by scheduler when there is no capacity for sheddable
112120
// requests.
113-
case codes.ResourceExhausted:
121+
case infextprocerror.ResourceExhausted:
114122
resp = &extProcPb.ProcessingResponse{
115123
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
116124
ImmediateResponse: &extProcPb.ImmediateResponse{
@@ -120,6 +128,28 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
120128
},
121129
},
122130
}
131+
// This code can be returned when EPP processes the request and runs into server-side errors.
132+
case infextprocerror.Internal:
133+
resp = &extProcPb.ProcessingResponse{
134+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
135+
ImmediateResponse: &extProcPb.ImmediateResponse{
136+
Status: &envoyTypePb.HttpStatus{
137+
Code: envoyTypePb.StatusCode_InternalServerError,
138+
},
139+
},
140+
},
141+
}
142+
// This code can be returned when the user's request is invalid, e.g. malformed JSON or a missing model.
143+
case infextprocerror.InvalidRequest:
144+
resp = &extProcPb.ProcessingResponse{
145+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
146+
ImmediateResponse: &extProcPb.ImmediateResponse{
147+
Status: &envoyTypePb.HttpStatus{
148+
Code: envoyTypePb.StatusCode_BadRequest,
149+
},
150+
},
151+
},
152+
}
123153
default:
124154
return status.Errorf(status.Code(err), "failed to handle request: %v", err)
125155
}
@@ -144,4 +174,5 @@ type RequestContext struct {
144174
Response Response
145175
ResponseSize int
146176
ResponseComplete bool
177+
ResponseStatusCode infextprocerror.ErrorCode
147178
}

pkg/ext-proc/metrics/metrics.go

+17
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"sync"
55
"time"
66

7+
infextprocerror "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
78
compbasemetrics "k8s.io/component-base/metrics"
89
"k8s.io/component-base/metrics/legacyregistry"
910
klog "k8s.io/klog/v2"
@@ -24,6 +25,16 @@ var (
2425
[]string{"model_name", "target_model_name"},
2526
)
2627

28+
requestErrCounter = compbasemetrics.NewCounterVec(
29+
&compbasemetrics.CounterOpts{
30+
Subsystem: InferenceModelComponent,
31+
Name: "request_error_total",
32+
Help: "Counter of inference model requests errors broken out for each model and target model.",
33+
StabilityLevel: compbasemetrics.ALPHA,
34+
},
35+
[]string{"model_name", "target_model_name", "type"},
36+
)
37+
2738
requestLatencies = compbasemetrics.NewHistogramVec(
2839
&compbasemetrics.HistogramOpts{
2940
Subsystem: InferenceModelComponent,
@@ -96,6 +107,7 @@ var registerMetrics sync.Once
96107
func Register() {
97108
registerMetrics.Do(func() {
98109
legacyregistry.MustRegister(requestCounter)
110+
legacyregistry.MustRegister(requestErrCounter)
99111
legacyregistry.MustRegister(requestLatencies)
100112
legacyregistry.MustRegister(requestSizes)
101113
legacyregistry.MustRegister(responseSizes)
@@ -109,6 +121,11 @@ func RecordRequestCounter(modelName, targetModelName string) {
109121
requestCounter.WithLabelValues(modelName, targetModelName).Inc()
110122
}
111123

124+
// RecordRequestErrCounter records the number of error requests.
125+
func RecordRequestErrCounter(modelName, targetModelName string, code infextprocerror.ErrorCode) {
126+
requestErrCounter.WithLabelValues(modelName, targetModelName, code.String()).Inc()
127+
}
128+
112129
// RecordRequestSizes records the request sizes.
113130
func RecordRequestSizes(modelName, targetModelName string, reqSize int) {
114131
requestSizes.WithLabelValues(modelName, targetModelName).Observe(float64(reqSize))

pkg/ext-proc/metrics/metrics_test.go

+61
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import (
55
"testing"
66
"time"
77

8+
infextprocerror "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
89
"k8s.io/component-base/metrics/legacyregistry"
910
"k8s.io/component-base/metrics/testutil"
1011
)
1112

1213
const RequestTotalMetric = InferenceModelComponent + "_request_total"
14+
const RequestErrorTotalMetric = InferenceModelComponent + "_request_error_total"
1315
const RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds"
1416
const RequestSizesMetric = InferenceModelComponent + "_request_sizes"
1517
const ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
@@ -86,6 +88,65 @@ func TestRecordRequestCounterandSizes(t *testing.T) {
8688
}
8789
}
8890

91+
func TestRecordRequestErrorCounter(t *testing.T) {
92+
type requests struct {
93+
modelName string
94+
targetModelName string
95+
error infextprocerror.ErrorCode
96+
}
97+
scenarios := []struct {
98+
name string
99+
reqs []requests
100+
invalid bool
101+
}{{
102+
name: "multiple requests",
103+
reqs: []requests{
104+
{
105+
modelName: "m10",
106+
targetModelName: "t10",
107+
error: infextprocerror.Internal,
108+
},
109+
{
110+
modelName: "m10",
111+
targetModelName: "t10",
112+
error: infextprocerror.Internal,
113+
},
114+
{
115+
modelName: "m10",
116+
targetModelName: "t11",
117+
error: infextprocerror.ModelServerError,
118+
},
119+
{
120+
modelName: "m20",
121+
targetModelName: "t20",
122+
error: infextprocerror.ResourceExhausted,
123+
},
124+
},
125+
},
126+
}
127+
Register()
128+
for _, scenario := range scenarios {
129+
t.Run(scenario.name, func(t *testing.T) {
130+
for _, req := range scenario.reqs {
131+
RecordRequestErrCounter(req.modelName, req.targetModelName, req.error)
132+
}
133+
134+
wantRequestErrorCounter, err := os.Open("testdata/request_error_total_metric")
135+
defer func() {
136+
if err := wantRequestErrorCounter.Close(); err != nil {
137+
t.Error(err)
138+
}
139+
}()
140+
if err != nil {
141+
t.Fatal(err)
142+
}
143+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestErrorCounter, RequestErrorTotalMetric); err != nil {
144+
t.Error(err)
145+
}
146+
})
147+
}
148+
}
149+
89150
func TestRecordRequestLatencies(t *testing.T) {
90151
timeBaseline := time.Now()
91152
type requests struct {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# HELP inference_model_request_error_total [ALPHA] Counter of inference model requests errors broken out for each model and target model.
2+
# TYPE inference_model_request_error_total counter
3+
inference_model_request_error_total{model_name="m10",target_model_name="t10",type="Internal"} 2
4+
inference_model_request_error_total{model_name="m10",target_model_name="t11",type="ModelServer"} 1
5+
inference_model_request_error_total{model_name="m20",target_model_name="t20",type="ResourceExhausted"} 1

pkg/ext-proc/scheduling/scheduler.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ import (
55
"fmt"
66
"math/rand"
77

8-
"google.golang.org/grpc/codes"
9-
"google.golang.org/grpc/status"
108
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
9+
infextprocerror "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
1110
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1211
klog "k8s.io/klog/v2"
1312
)
@@ -84,8 +83,8 @@ var (
8483
name: "drop request",
8584
filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
8685
klog.Infof("Dropping request %v", req)
87-
return []*backend.PodMetrics{}, status.Errorf(
88-
codes.ResourceExhausted, "dropping request due to limited backend resources")
86+
return []*backend.PodMetrics{}, infextprocerror.Error{
87+
Code: infextprocerror.ResourceExhausted, Msg: "dropping request due to limited backend resources"}
8988
},
9089
},
9190
}

0 commit comments

Comments
 (0)