Skip to content

Commit a93aa22

Browse files
committed
add request metrics
Signed-off-by: Jie WU <[email protected]> add request metrics Signed-off-by: Jie WU <[email protected]> rename api and metrics fix go mod Adding metrics handler Signed-off-by: Jie WU <[email protected]> Adding metrics handler Signed-off-by: Jie WU <[email protected]> add request metrics rename api and metrics fix mod Updated request metrics to be handled in server processing loop Signed-off-by: Jie WU <[email protected]> Updated request metrics to be handled in server processing loop Signed-off-by: Jie WU <[email protected]> fix go mod Signed-off-by: Jie WU <[email protected]> fix go mod Signed-off-by: Jie WU <[email protected]> remove preconfigured buffered response Signed-off-by: Jie WU <[email protected]> Add streamed response Signed-off-by: Jie WU <[email protected]> Handle latency with response Signed-off-by: Jie WU <[email protected]> refactor Signed-off-by: Jie WU <[email protected]> fmt Signed-off-by: Jie WU <[email protected]> fmt Signed-off-by: Jie WU <[email protected]> fmt Signed-off-by: Jie WU <[email protected]> refactor server Signed-off-by: Jie WU <[email protected]> metrics auth
1 parent 25f5156 commit a93aa22

11 files changed

+582
-6
lines changed

go.mod

+22-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/jhump/protoreflect v1.17.0
1313
github.com/onsi/ginkgo/v2 v2.22.2
1414
github.com/onsi/gomega v1.36.2
15+
github.com/prometheus/client_golang v1.20.4
1516
github.com/prometheus/client_model v0.6.1
1617
github.com/prometheus/common v0.61.0
1718
github.com/stretchr/testify v1.10.0
@@ -20,8 +21,10 @@ require (
2021
google.golang.org/protobuf v1.36.2
2122
k8s.io/api v0.31.4
2223
k8s.io/apimachinery v0.31.4
24+
k8s.io/apiserver v0.31.4
2325
k8s.io/client-go v0.31.4
2426
k8s.io/code-generator v0.31.4
27+
k8s.io/component-base v0.31.4
2528
k8s.io/klog/v2 v2.130.1
2629
sigs.k8s.io/controller-runtime v0.19.4
2730
sigs.k8s.io/structured-merge-diff/v4 v4.5.0
@@ -37,8 +40,12 @@ require (
3740
github.com/Masterminds/sprig v2.22.0+incompatible // indirect
3841
github.com/Masterminds/sprig/v3 v3.2.3 // indirect
3942
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
43+
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
44+
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
4045
github.com/beorn7/perks v1.0.1 // indirect
46+
github.com/blang/semver/v4 v4.0.0 // indirect
4147
github.com/bufbuild/protocompile v0.14.1 // indirect
48+
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
4249
github.com/cespare/xxhash/v2 v2.3.0 // indirect
4350
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 // indirect
4451
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
@@ -48,9 +55,11 @@ require (
4855
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
4956
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
5057
github.com/fatih/color v1.16.0 // indirect
58+
github.com/felixge/httpsnoop v1.0.4 // indirect
5159
github.com/fsnotify/fsnotify v1.7.0 // indirect
5260
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
5361
github.com/go-logr/logr v1.4.2 // indirect
62+
github.com/go-logr/stdr v1.2.2 // indirect
5463
github.com/go-openapi/jsonpointer v0.19.6 // indirect
5564
github.com/go-openapi/jsonreference v0.20.2 // indirect
5665
github.com/go-openapi/swag v0.22.4 // indirect
@@ -60,17 +69,20 @@ require (
6069
github.com/gogo/protobuf v1.3.2 // indirect
6170
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
6271
github.com/golang/protobuf v1.5.4 // indirect
72+
github.com/google/cel-go v0.20.1 // indirect
6373
github.com/google/gnostic-models v0.6.8 // indirect
6474
github.com/google/gofuzz v1.2.0 // indirect
6575
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
6676
github.com/google/uuid v1.6.0 // indirect
77+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
6778
github.com/huandu/xstrings v1.3.3 // indirect
6879
github.com/imdario/mergo v0.3.11 // indirect
6980
github.com/inconshreveable/mousetrap v1.1.0 // indirect
7081
github.com/jinzhu/configor v1.2.1 // indirect
7182
github.com/josharian/intern v1.0.0 // indirect
7283
github.com/json-iterator/go v1.1.12 // indirect
7384
github.com/klauspost/compress v1.17.9 // indirect
85+
github.com/kylelemons/godebug v1.1.0 // indirect
7486
github.com/mailru/easyjson v0.7.7 // indirect
7587
github.com/mattn/go-colorable v0.1.13 // indirect
7688
github.com/mattn/go-isatty v0.0.20 // indirect
@@ -82,13 +94,21 @@ require (
8294
github.com/pkg/errors v0.9.1 // indirect
8395
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
8496
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
85-
github.com/prometheus/client_golang v1.20.4 // indirect
8697
github.com/prometheus/procfs v0.15.1 // indirect
8798
github.com/shopspring/decimal v1.2.0 // indirect
8899
github.com/spf13/cast v1.4.1 // indirect
89100
github.com/spf13/cobra v1.8.1 // indirect
90101
github.com/spf13/pflag v1.0.5 // indirect
102+
github.com/stoewer/go-strcase v1.2.0 // indirect
91103
github.com/x448/float16 v0.8.4 // indirect
104+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
105+
go.opentelemetry.io/otel v1.31.0 // indirect
106+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
107+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect
108+
go.opentelemetry.io/otel/metric v1.31.0 // indirect
109+
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
110+
go.opentelemetry.io/otel/trace v1.31.0 // indirect
111+
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
92112
go.uber.org/zap v1.27.0 // indirect
93113
golang.org/x/crypto v0.31.0 // indirect
94114
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
@@ -113,6 +133,7 @@ require (
113133
k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70 // indirect
114134
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
115135
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
136+
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.4 // indirect
116137
sigs.k8s.io/controller-tools v0.14.0 // indirect
117138
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
118139
sigs.k8s.io/yaml v1.4.0 // indirect

pkg/ext-proc/handlers/request.go

+2
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
7676
klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
7777

7878
reqCtx.Model = llmReq.Model
79+
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
80+
reqCtx.RequestSize = len(v.RequestBody.Body)
7981
reqCtx.TargetPod = targetPod
8082

8183
// Insert "target-pod" to instruct Envoy to route requests to the specified target pod.

pkg/ext-proc/handlers/response.go

+6
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.Proce
7373
return nil, fmt.Errorf("unmarshaling response body: %v", err)
7474
}
7575
reqCtx.Response = res
76+
// ResponseComplete indicates whether the response is complete. In the
77+
// non-streaming case it is set to true once the response is processed; in the
78+
// streaming case it is set to true once the last chunk is processed.
79+
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178):
80+
// add the processing for the streaming case.
81+
reqCtx.ResponseComplete = true
7682
klog.V(3).Infof("Response: %+v", res)
7783

7884
resp := &extProcPb.ProcessingResponse{

pkg/ext-proc/handlers/server.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ package handlers
22

33
import (
44
"io"
5+
"time"
56

67
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
78
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
89
"google.golang.org/grpc/codes"
910
"google.golang.org/grpc/status"
1011
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
1112
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
13+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
1214
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
1315
klog "k8s.io/klog/v2"
1416
)
@@ -75,22 +77,30 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
7577
var resp *extProcPb.ProcessingResponse
7678
switch v := req.Request.(type) {
7779
case *extProcPb.ProcessingRequest_RequestHeaders:
80+
reqCtx.RequestReceivedTimestamp = time.Now()
7881
resp = HandleRequestHeaders(reqCtx, req)
7982
klog.V(3).Infof("Request context after HandleRequestHeaders: %+v", reqCtx)
8083
case *extProcPb.ProcessingRequest_RequestBody:
8184
resp, err = s.HandleRequestBody(reqCtx, req)
85+
if err == nil {
86+
metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel)
87+
metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize)
88+
}
8289
klog.V(3).Infof("Request context after HandleRequestBody: %+v", reqCtx)
8390
case *extProcPb.ProcessingRequest_ResponseHeaders:
8491
resp, err = s.HandleResponseHeaders(reqCtx, req)
8592
klog.V(3).Infof("Request context after HandleResponseHeaders: %+v", reqCtx)
8693
case *extProcPb.ProcessingRequest_ResponseBody:
8794
resp, err = s.HandleResponseBody(reqCtx, req)
95+
if err == nil && reqCtx.ResponseComplete {
96+
reqCtx.ResponseCompleteTimestamp = time.Now()
97+
metrics.RecordRequestLatencies(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)
98+
}
8899
klog.V(3).Infof("Request context after HandleResponseBody: %+v", reqCtx)
89100
default:
90101
klog.Errorf("Unknown Request type %+v", v)
91102
return status.Error(codes.Unknown, "unknown request type")
92103
}
93-
94104
if err != nil {
95105
klog.Errorf("failed to process request: %v", err)
96106
switch status.Code(err) {
@@ -121,7 +131,12 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
121131

122132
// RequestContext stores context information for the lifetime of an HTTP request.
123133
type RequestContext struct {
124-
TargetPod backend.Pod
125-
Model string
126-
Response Response
134+
TargetPod backend.Pod
135+
Model string
136+
ResolvedTargetModel string
137+
RequestReceivedTimestamp time.Time
138+
ResponseCompleteTimestamp time.Time
139+
RequestSize int
140+
Response Response
141+
ResponseComplete bool
127142
}

pkg/ext-proc/main.go

+64-1
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,33 @@
11
package main
22

33
import (
4+
"context"
45
"flag"
56
"fmt"
67
"net"
8+
"net/http"
9+
"strconv"
710
"time"
811

912
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
13+
"github.com/prometheus/client_golang/prometheus/promhttp"
1014
"google.golang.org/grpc"
1115
healthPb "google.golang.org/grpc/health/grpc_health_v1"
1216
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
1317
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1418
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
1519
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
20+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
1621
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
1722
"k8s.io/apimachinery/pkg/runtime"
1823
"k8s.io/apimachinery/pkg/types"
1924
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2025
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
26+
"k8s.io/client-go/rest"
27+
"k8s.io/component-base/metrics/legacyregistry"
2128
klog "k8s.io/klog/v2"
2229
ctrl "sigs.k8s.io/controller-runtime"
30+
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
2331
)
2432

2533
var (
@@ -31,6 +39,8 @@ var (
3139
"grpcHealthPort",
3240
9003,
3341
"The port used for gRPC liveness and readiness probes")
42+
metricsPort = flag.Int(
43+
"metricsPort", 9090, "The metrics port")
3444
targetPodHeader = flag.String(
3545
"targetPodHeader",
3646
"target-pod",
@@ -73,7 +83,10 @@ func main() {
7383
flag.Parse()
7484

7585
ctrl.SetLogger(klog.TODO())
76-
86+
cfg, err := ctrl.GetConfig()
87+
if err != nil {
88+
klog.Fatalf("Failed to get rest config: %v", err)
89+
}
7790
// Validate flags
7891
if err := validateFlags(); err != nil {
7992
klog.Fatalf("Failed to validate flags: %v", err)
@@ -142,6 +155,8 @@ func main() {
142155
*refreshMetricsInterval,
143156
*targetPodHeader,
144157
)
158+
// Start metrics handler
159+
metricsSvr := startMetricsHandler(*metricsPort, cfg)
145160

146161
// Start the controller manager. Blocking and will return when shutdown is complete.
147162
klog.Infof("Starting controller manager")
@@ -159,6 +174,12 @@ func main() {
159174
klog.Info("Ext-proc server shutting down")
160175
extProcSvr.GracefulStop()
161176
}
177+
if metricsSvr != nil {
178+
klog.Info("Metrics server shutting down")
179+
if err := metricsSvr.Shutdown(context.Background()); err != nil {
180+
klog.Infof("Metrics server Shutdown: %v", err)
181+
}
182+
}
162183

163184
klog.Info("All components shutdown")
164185
}
@@ -221,6 +242,48 @@ func startExternalProcessorServer(
221242
return svr
222243
}
223244

245+
func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
246+
metrics.Register()
247+
248+
var svr *http.Server
249+
go func() {
250+
klog.Info("Starting metrics HTTP handler ...")
251+
252+
mux := http.NewServeMux()
253+
mux.Handle("/metrics", metricsHandlerWithAuthenticationAndAuthorization(cfg))
254+
255+
svr = &http.Server{
256+
Addr: net.JoinHostPort("", strconv.Itoa(port)),
257+
Handler: mux,
258+
}
259+
if err := svr.ListenAndServe(); err != http.ErrServerClosed {
260+
klog.Fatalf("failed to start metrics HTTP handler: %v", err)
261+
}
262+
}()
263+
return svr
264+
}
265+
266+
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) http.Handler {
267+
h := promhttp.HandlerFor(
268+
legacyregistry.DefaultGatherer,
269+
promhttp.HandlerOpts{},
270+
)
271+
httpClient, err := rest.HTTPClientFor(cfg)
272+
if err != nil {
273+
klog.Fatalf("failed to create http client for metrics auth: %v", err)
274+
}
275+
276+
filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient)
277+
if err != nil {
278+
klog.Fatalf("failed to create metrics filter for auth: %v", err)
279+
}
280+
metricsAuthHandler, err := filter(klog.TODO(), h)
281+
if err != nil {
282+
klog.Fatalf("failed to create metrics auth handler: %v", err)
283+
}
284+
return metricsAuthHandler
285+
}
286+
224287
func validateFlags() error {
225288
if *poolName == "" {
226289
return fmt.Errorf("required %q flag not set", "poolName")

pkg/ext-proc/metrics/metrics.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package metrics
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
compbasemetrics "k8s.io/component-base/metrics"
8+
"k8s.io/component-base/metrics/legacyregistry"
9+
klog "k8s.io/klog/v2"
10+
)
11+
12+
const (
13+
InferenceModelComponent = "inference_model"
14+
)
15+
16+
var (
17+
requestCounter = compbasemetrics.NewCounterVec(
18+
&compbasemetrics.CounterOpts{
19+
Subsystem: InferenceModelComponent,
20+
Name: "request_total",
21+
Help: "Counter of inference model requests broken out for each model and target model.",
22+
StabilityLevel: compbasemetrics.ALPHA,
23+
},
24+
[]string{"model_name", "target_model_name"},
25+
)
26+
27+
requestLatencies = compbasemetrics.NewHistogramVec(
28+
&compbasemetrics.HistogramOpts{
29+
Subsystem: InferenceModelComponent,
30+
Name: "request_duration_seconds",
31+
Help: "Inference model response latency distribution in seconds for each model and target model.",
32+
Buckets: []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3,
33+
4, 5, 6, 8, 10, 15, 20, 30, 45, 60, 120, 180, 240, 300, 360, 480, 600, 900, 1200, 1800, 2700, 3600},
34+
StabilityLevel: compbasemetrics.ALPHA,
35+
},
36+
[]string{"model_name", "target_model_name"},
37+
)
38+
39+
requestSizes = compbasemetrics.NewHistogramVec(
40+
&compbasemetrics.HistogramOpts{
41+
Subsystem: InferenceModelComponent,
42+
Name: "request_sizes",
43+
Help: "Inference model requests size distribution in bytes for each model and target model.",
44+
// Use power-of-two buckets ranging from 64 bytes to 2^30 bytes (1 GiB).
45+
Buckets: []float64{
46+
64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, // More fine-grained up to 64KB
47+
131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608, // Exponential up to 8MB
48+
16777216, 33554432, 67108864, 134217728, 268435456, 536870912, 1073741824, // Exponential up to 1GB
49+
},
50+
StabilityLevel: compbasemetrics.ALPHA,
51+
},
52+
[]string{"model_name", "target_model_name"},
53+
)
54+
)
55+
56+
var registerMetrics sync.Once
57+
58+
// Register all metrics.
59+
func Register() {
60+
registerMetrics.Do(func() {
61+
legacyregistry.MustRegister(requestCounter)
62+
legacyregistry.MustRegister(requestLatencies)
63+
legacyregistry.MustRegister(requestSizes)
64+
})
65+
}
66+
67+
// RecordRequstCounter records the number of requests.
68+
func RecordRequestCounter(modelName, targetModelName string) {
69+
requestCounter.WithLabelValues(modelName, targetModelName).Inc()
70+
}
71+
72+
// RecordRequestSizes records the request sizes.
73+
func RecordRequestSizes(modelName, targetModelName string, reqSize int) {
74+
requestSizes.WithLabelValues(modelName, targetModelName).Observe(float64(reqSize))
75+
}
76+
77+
// RecordRequstLatencies records duration of request.
78+
func RecordRequestLatencies(modelName, targetModelName string, received time.Time, complete time.Time) bool {
79+
if !complete.After(received) {
80+
klog.Errorf("request latency value error for model name %v, target model name %v: complete time %v is before received time %v", modelName, targetModelName, complete, received)
81+
return false
82+
}
83+
elapsedSeconds := complete.Sub(received).Seconds()
84+
requestLatencies.WithLabelValues(modelName, targetModelName).Observe(elapsedSeconds)
85+
return true
86+
}

0 commit comments

Comments
 (0)