Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit dace5b4

Browse files
committedFeb 18, 2025·
[Metrics] Add request error metrics
This change defines some general errors; the list might grow in the future if finer-grained error types are needed.
1 parent bc5eac6 commit dace5b4

File tree

9 files changed

+218
-25
lines changed

9 files changed

+218
-25
lines changed
 

‎pkg/ext-proc/handlers/request.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package handlers
33
import (
44
"context"
55
"encoding/json"
6-
"errors"
76
"fmt"
87
"strconv"
98

@@ -13,6 +12,7 @@ import (
1312
"sigs.k8s.io/controller-runtime/pkg/log"
1413
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1514
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
15+
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
1616
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1717
)
1818

@@ -33,14 +33,14 @@ func (s *Server) HandleRequestBody(
3333
var rb map[string]interface{}
3434
if err := json.Unmarshal(v.RequestBody.Body, &rb); err != nil {
3535
logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body")
36-
return nil, fmt.Errorf("error unmarshaling request body: %v", err)
36+
return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("error unmarshaling request body: %v", err)}
3737
}
3838
loggerVerbose.Info("Request body unmarshalled", "body", rb)
3939

4040
// Resolve target models.
4141
model, ok := rb["model"].(string)
4242
if !ok {
43-
return nil, errors.New("model not found in request")
43+
return nil, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"}
4444
}
4545
loggerVerbose.Info("Model requested", "model", model)
4646
modelName := model
@@ -50,12 +50,12 @@ func (s *Server) HandleRequestBody(
5050
// are able to be requested by using their distinct name.
5151
modelObj := s.datastore.FetchModelData(model)
5252
if modelObj == nil {
53-
return nil, fmt.Errorf("error finding a model object in InferenceModel for input %v", model)
53+
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error finding a model object in InferenceModel for input %v", model)}
5454
}
5555
if len(modelObj.Spec.TargetModels) > 0 {
5656
modelName = backend.RandomWeightedDraw(logger, modelObj, 0)
5757
if modelName == "" {
58-
return nil, fmt.Errorf("error getting target model name for model %v", modelObj.Name)
58+
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error getting target model name for model %v", modelObj.Name)}
5959
}
6060
}
6161
llmReq := &scheduling.LLMRequest{
@@ -73,14 +73,14 @@ func (s *Server) HandleRequestBody(
7373
requestBody, err = json.Marshal(rb)
7474
if err != nil {
7575
logger.V(logutil.DEFAULT).Error(err, "Error marshaling request body")
76-
return nil, fmt.Errorf("error marshaling request body: %v", err)
76+
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
7777
}
7878
loggerVerbose.Info("Updated request body marshalled", "body", string(requestBody))
7979
}
8080

8181
targetPod, err := s.scheduler.Schedule(ctx, llmReq)
8282
if err != nil {
83-
return nil, fmt.Errorf("failed to find target pod: %w", err)
83+
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
8484
}
8585
logger.V(logutil.DEFAULT).Info("Request handled",
8686
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod)

‎pkg/ext-proc/handlers/response.go

+39-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
99
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1010
"sigs.k8s.io/controller-runtime/pkg/log"
11+
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
1112
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1213
)
1314

@@ -22,6 +23,43 @@ func (s *Server) HandleResponseHeaders(
2223
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
2324
loggerVerbose.Info("Headers before", "headers", h)
2425

26+
// Example header
27+
// {
28+
// "ResponseHeaders": {
29+
// "headers": [
30+
// {
31+
// "key": ":status",
32+
// "raw_value": "200"
33+
// },
34+
// {
35+
// "key": "date",
36+
// "raw_value": "Thu, 30 Jan 2025 18:50:48 GMT"
37+
// },
38+
// {
39+
// "key": "server",
40+
// "raw_value": "uvicorn"
41+
// },
42+
// {
43+
// "key": "content-type",
44+
// "raw_value": "text/event-stream; charset=utf-8"
45+
// },
46+
// {
47+
// "key": "transfer-encoding",
48+
// "raw_value": "chunked"
49+
// }
50+
// ]
51+
// }
52+
// }
53+
for _, header := range h.ResponseHeaders.Headers.GetHeaders() {
54+
if header.Key == "status" {
55+
code := header.RawValue[0]
56+
if string(code) != "200" {
57+
reqCtx.ResponseStatusCode = errutil.ModelServerError
58+
}
59+
break
60+
}
61+
}
62+
2563
resp := &extProcPb.ProcessingResponse{
2664
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
2765
ResponseHeaders: &extProcPb.HeadersResponse{
@@ -83,7 +121,7 @@ func (s *Server) HandleResponseBody(
83121

84122
res := Response{}
85123
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
86-
return nil, fmt.Errorf("unmarshaling response body: %v", err)
124+
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
87125
}
88126
reqCtx.Response = res
89127
reqCtx.ResponseSize = len(body.ResponseBody.Body)

‎pkg/ext-proc/handlers/server.go

+43-5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1616
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
1717
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
18+
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
1819
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1920
)
2021

@@ -62,18 +63,30 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
6263
// See https://github.com/envoyproxy/envoy/issues/17540.
6364
reqCtx := &RequestContext{}
6465

66+
// Create variable for error handling as each request should only report once for
67+
// error metric. This doesn't cover the error "Cannot receive stream request" because
68+
// such error might happen even after the response is processed.
69+
var err error
70+
defer func(error) {
71+
if reqCtx.ResponseStatusCode != "" {
72+
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseStatusCode)
73+
} else if err != nil {
74+
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, errutil.CanonicalCode(err))
75+
}
76+
}(err)
77+
6578
for {
6679
select {
6780
case <-ctx.Done():
6881
return ctx.Err()
6982
default:
7083
}
7184

72-
req, err := srv.Recv()
73-
if err == io.EOF || errors.Is(err, context.Canceled) {
85+
req, recvErr := srv.Recv()
86+
if recvErr == io.EOF || errors.Is(recvErr, context.Canceled) {
7487
return nil
7588
}
76-
if err != nil {
89+
if recvErr != nil {
7790
// This error occurs very frequently, though it doesn't seem to have any impact.
7891
// TODO Figure out if we can remove this noise.
7992
loggerVerbose.Error(err, "Cannot receive stream request")
@@ -110,12 +123,13 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
110123
logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
111124
return status.Error(codes.Unknown, "unknown request type")
112125
}
126+
113127
if err != nil {
114128
logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req)
115-
switch status.Code(err) {
129+
switch errutil.CanonicalCode(err) {
116130
// This code can be returned by scheduler when there is no capacity for sheddable
117131
// requests.
118-
case codes.ResourceExhausted:
132+
case errutil.InferencePoolResourceExhausted:
119133
resp = &extProcPb.ProcessingResponse{
120134
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
121135
ImmediateResponse: &extProcPb.ImmediateResponse{
@@ -125,6 +139,28 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
125139
},
126140
},
127141
}
142+
// This code can be returned when the EPP processes the request and runs into server-side errors.
143+
case errutil.Internal:
144+
resp = &extProcPb.ProcessingResponse{
145+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
146+
ImmediateResponse: &extProcPb.ImmediateResponse{
147+
Status: &envoyTypePb.HttpStatus{
148+
Code: envoyTypePb.StatusCode_InternalServerError,
149+
},
150+
},
151+
},
152+
}
153+
// This code can be returned when users provide invalid json request.
154+
case errutil.BadRequest:
155+
resp = &extProcPb.ProcessingResponse{
156+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
157+
ImmediateResponse: &extProcPb.ImmediateResponse{
158+
Status: &envoyTypePb.HttpStatus{
159+
Code: envoyTypePb.StatusCode_BadRequest,
160+
},
161+
},
162+
},
163+
}
128164
default:
129165
return status.Errorf(status.Code(err), "failed to handle request: %v", err)
130166
}
@@ -136,6 +172,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
136172
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
137173
}
138174
}
175+
139176
}
140177

141178
// RequestContext stores context information during the life time of an HTTP request.
@@ -149,4 +186,5 @@ type RequestContext struct {
149186
Response Response
150187
ResponseSize int
151188
ResponseComplete bool
189+
ResponseStatusCode string
152190
}

‎pkg/ext-proc/metrics/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ spec:
4141
| Metric name | Metric Type | Description | Labels | Status |
4242
| ------------|--------------| ----------- | ------ | ------ |
4343
| inference_model_request_total | Counter | The counter of requests broken out for each model. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
44+
| inference_model_request_error_total | Counter | The counter of request errors broken out for each model. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
4445
| inference_model_request_duration_seconds | Distribution | Distribution of response latency. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
4546
| inference_model_request_sizes | Distribution | Distribution of request size in bytes. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
4647
| inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |

‎pkg/ext-proc/metrics/metrics.go

+18
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,16 @@ var (
2828
[]string{"model_name", "target_model_name"},
2929
)
3030

31+
requestErrCounter = compbasemetrics.NewCounterVec(
32+
&compbasemetrics.CounterOpts{
33+
Subsystem: InferenceModelComponent,
34+
Name: "request_error_total",
35+
Help: "Counter of inference model requests errors broken out for each model and target model.",
36+
StabilityLevel: compbasemetrics.ALPHA,
37+
},
38+
[]string{"model_name", "target_model_name", "error_code"},
39+
)
40+
3141
requestLatencies = compbasemetrics.NewHistogramVec(
3242
&compbasemetrics.HistogramOpts{
3343
Subsystem: InferenceModelComponent,
@@ -123,6 +133,7 @@ var registerMetrics sync.Once
123133
func Register() {
124134
registerMetrics.Do(func() {
125135
legacyregistry.MustRegister(requestCounter)
136+
legacyregistry.MustRegister(requestErrCounter)
126137
legacyregistry.MustRegister(requestLatencies)
127138
legacyregistry.MustRegister(requestSizes)
128139
legacyregistry.MustRegister(responseSizes)
@@ -139,6 +150,13 @@ func RecordRequestCounter(modelName, targetModelName string) {
139150
requestCounter.WithLabelValues(modelName, targetModelName).Inc()
140151
}
141152

153+
// RecordRequestErrCounter records the number of error requests.
154+
func RecordRequestErrCounter(modelName, targetModelName string, code string) {
155+
if code != "" {
156+
requestErrCounter.WithLabelValues(modelName, targetModelName, code).Inc()
157+
}
158+
}
159+
142160
// RecordRequestSizes records the request sizes.
143161
func RecordRequestSizes(modelName, targetModelName string, reqSize int) {
144162
requestSizes.WithLabelValues(modelName, targetModelName).Observe(float64(reqSize))

‎pkg/ext-proc/metrics/metrics_test.go

+69-8
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,20 @@ import (
88

99
"k8s.io/component-base/metrics/legacyregistry"
1010
"k8s.io/component-base/metrics/testutil"
11+
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
1112
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1213
)
1314

1415
const (
15-
RequestTotalMetric = InferenceModelComponent + "_request_total"
16-
RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds"
17-
RequestSizesMetric = InferenceModelComponent + "_request_sizes"
18-
ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
19-
InputTokensMetric = InferenceModelComponent + "_input_tokens"
20-
OutputTokensMetric = InferenceModelComponent + "_output_tokens"
21-
KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
22-
QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
16+
RequestTotalMetric = InferenceModelComponent + "_request_total"
17+
RequestErrorTotalMetric = InferenceModelComponent + "_request_error_total"
18+
RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds"
19+
RequestSizesMetric = InferenceModelComponent + "_request_sizes"
20+
ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
21+
InputTokensMetric = InferenceModelComponent + "_input_tokens"
22+
OutputTokensMetric = InferenceModelComponent + "_output_tokens"
23+
KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
24+
QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
2325
)
2426

2527
func TestRecordRequestCounterandSizes(t *testing.T) {
@@ -91,6 +93,65 @@ func TestRecordRequestCounterandSizes(t *testing.T) {
9193
}
9294
}
9395

96+
func TestRecordRequestErrorCounter(t *testing.T) {
97+
type requests struct {
98+
modelName string
99+
targetModelName string
100+
error string
101+
}
102+
scenarios := []struct {
103+
name string
104+
reqs []requests
105+
invalid bool
106+
}{{
107+
name: "multiple requests",
108+
reqs: []requests{
109+
{
110+
modelName: "m10",
111+
targetModelName: "t10",
112+
error: errutil.Internal,
113+
},
114+
{
115+
modelName: "m10",
116+
targetModelName: "t10",
117+
error: errutil.Internal,
118+
},
119+
{
120+
modelName: "m10",
121+
targetModelName: "t11",
122+
error: errutil.ModelServerError,
123+
},
124+
{
125+
modelName: "m20",
126+
targetModelName: "t20",
127+
error: errutil.InferencePoolResourceExhausted,
128+
},
129+
},
130+
},
131+
}
132+
Register()
133+
for _, scenario := range scenarios {
134+
t.Run(scenario.name, func(t *testing.T) {
135+
for _, req := range scenario.reqs {
136+
RecordRequestErrCounter(req.modelName, req.targetModelName, req.error)
137+
}
138+
139+
wantRequestErrorCounter, err := os.Open("testdata/request_error_total_metric")
140+
defer func() {
141+
if err := wantRequestErrorCounter.Close(); err != nil {
142+
t.Error(err)
143+
}
144+
}()
145+
if err != nil {
146+
t.Fatal(err)
147+
}
148+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestErrorCounter, RequestErrorTotalMetric); err != nil {
149+
t.Error(err)
150+
}
151+
})
152+
}
153+
}
154+
94155
func TestRecordRequestLatencies(t *testing.T) {
95156
ctx := logutil.NewTestLoggerIntoContext(context.Background())
96157
timeBaseline := time.Now()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# HELP inference_model_request_error_total [ALPHA] Counter of inference model requests errors broken out for each model and target model.
2+
# TYPE inference_model_request_error_total counter
3+
inference_model_request_error_total{error_code="Internal", model_name="m10",target_model_name="t10"} 2
4+
inference_model_request_error_total{error_code="ModelServerError", model_name="m10",target_model_name="t11"} 1
5+
inference_model_request_error_total{error_code="InferencePoolResourceExhausted", model_name="m20",target_model_name="t20"} 1

‎pkg/ext-proc/scheduling/scheduler.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ import (
77
"math/rand"
88

99
"github.com/go-logr/logr"
10-
"google.golang.org/grpc/codes"
11-
"google.golang.org/grpc/status"
1210
"sigs.k8s.io/controller-runtime/pkg/log"
1311
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
12+
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
1413
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1514
)
1615

@@ -86,8 +85,8 @@ var (
8685
name: "drop request",
8786
filter: func(logger logr.Logger, req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
8887
logger.V(logutil.DEFAULT).Info("Request dropped", "request", req)
89-
return []*backend.PodMetrics{}, status.Errorf(
90-
codes.ResourceExhausted, "dropping request due to limited backend resources")
88+
return []*backend.PodMetrics{}, errutil.Error{
89+
Code: errutil.InferencePoolResourceExhausted, Msg: "dropping request due to limited backend resources"}
9190
},
9291
},
9392
}

‎pkg/ext-proc/util/error/error.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package error
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
// Error is an error struct for errors returned by the epp server.
8+
type Error struct {
9+
Code string
10+
Msg string
11+
}
12+
13+
const (
14+
Unknown = "Unknown"
15+
BadRequest = "BadRequest"
16+
Internal = "Internal"
17+
ModelServerError = "ModelServerError"
18+
InferencePoolResourceExhausted = "InferencePoolResourceExhausted"
19+
)
20+
21+
// Error returns a string version of the error.
22+
func (e Error) Error() string {
23+
return fmt.Sprintf("inference gateway: %s - %s", e.Code, e.Msg)
24+
}
25+
26+
// CanonicalCode returns the error's ErrorCode.
27+
func CanonicalCode(err error) string {
28+
e, ok := err.(Error)
29+
if ok {
30+
return e.Code
31+
}
32+
return Unknown
33+
}

0 commit comments

Comments
 (0)
Please sign in to comment.