Commit 7149624

[Metrics] Add average kv cache and waiting queue size metrics for inference pool (#304)
1 parent 3ff0af8 commit 7149624

11 files changed: +189 -50 lines changed

pkg/ext-proc/backend/provider.go (+37 -1)

@@ -7,6 +7,7 @@ import (
     "time"

     "go.uber.org/multierr"
+    "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
     logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
     klog "k8s.io/klog/v2"
 )
@@ -58,7 +59,7 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
     return nil, false
 }

-func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
+func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error {
     p.refreshPodsOnce()

     if err := p.refreshMetricsOnce(); err != nil {
@@ -85,6 +86,14 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
         }
     }()

+    // Periodically flush prometheus metrics for inference pool
+    go func() {
+        for {
+            time.Sleep(refreshPrometheusMetricsInterval)
+            p.flushPrometheusMetricsOnce()
+        }
+    }()
+
     // Periodically print out the pods and metrics for DEBUGGING.
     if klog.V(logutil.DEBUG).Enabled() {
         go func() {
@@ -174,3 +183,30 @@ func (p *Provider) refreshMetricsOnce() error {
     }
     return errs
 }
+
+func (p *Provider) flushPrometheusMetricsOnce() {
+    klog.V(logutil.DEBUG).Infof("Flushing Prometheus Metrics")
+
+    pool, _ := p.datastore.getInferencePool()
+    if pool == nil {
+        // No inference pool or not initialize.
+        return
+    }
+
+    var kvCacheTotal float64
+    var queueTotal int
+
+    podMetrics := p.AllPodMetrics()
+    if len(podMetrics) == 0 {
+        return
+    }
+
+    for _, pod := range podMetrics {
+        kvCacheTotal += pod.KVCacheUsagePercent
+        queueTotal += pod.WaitingQueueSize
+    }
+
+    podTotalCount := len(podMetrics)
+    metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount))
+    metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal/podTotalCount))
+}
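
A note on the averaging in `flushPrometheusMetricsOnce` above: `float64(queueTotal/podTotalCount)` divides two integers before converting, so the recorded average queue size is truncated toward zero (a total of 7 waiting requests across 2 pods records 3, not 3.5). For illustration only, a float-precise variant might look like the sketch below; `avgQueueSize` is a hypothetical helper, not part of this commit.

```go
// avgQueueSize is a hypothetical helper (not in this commit) that averages the
// waiting queue sizes across pods without truncating to an integer first.
func avgQueueSize(podMetrics []*PodMetrics) float64 {
    if len(podMetrics) == 0 {
        return 0
    }
    var queueTotal int
    for _, pod := range podMetrics {
        queueTotal += pod.WaitingQueueSize
    }
    // Convert before dividing so the fractional part is preserved.
    return float64(queueTotal) / float64(len(podMetrics))
}
```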

pkg/ext-proc/backend/provider_test.go (+1 -1)

@@ -90,7 +90,7 @@ func TestProvider(t *testing.T) {
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
             p := NewProvider(test.pmc, test.datastore)
-            err := p.Init(time.Millisecond, time.Millisecond)
+            err := p.Init(time.Millisecond, time.Millisecond, time.Millisecond)
             if test.initErr != (err != nil) {
                 t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr)
             }

pkg/ext-proc/main.go (+16 -11)

@@ -69,6 +69,10 @@ var (
         "refreshMetricsInterval",
         runserver.DefaultRefreshMetricsInterval,
         "interval to refresh metrics")
+    refreshPrometheusMetricsInterval = flag.Duration(
+        "refreshPrometheusMetricsInterval",
+        runserver.DefaultRefreshPrometheusMetricsInterval,
+        "interval to flush prometheus metrics")

     scheme = runtime.NewScheme()
 )
@@ -102,17 +106,18 @@ func main() {
     datastore := backend.NewK8sDataStore()

     serverRunner := &runserver.ExtProcServerRunner{
-        GrpcPort:               *grpcPort,
-        TargetEndpointKey:      *targetEndpointKey,
-        PoolName:               *poolName,
-        PoolNamespace:          *poolNamespace,
-        ServiceName:            *serviceName,
-        Zone:                   *zone,
-        RefreshPodsInterval:    *refreshPodsInterval,
-        RefreshMetricsInterval: *refreshMetricsInterval,
-        Scheme:                 scheme,
-        Config:                 ctrl.GetConfigOrDie(),
-        Datastore:              datastore,
+        GrpcPort:                         *grpcPort,
+        TargetEndpointKey:                *targetEndpointKey,
+        PoolName:                         *poolName,
+        PoolNamespace:                    *poolNamespace,
+        ServiceName:                      *serviceName,
+        Zone:                             *zone,
+        RefreshPodsInterval:              *refreshPodsInterval,
+        RefreshMetricsInterval:           *refreshMetricsInterval,
+        RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
+        Scheme:                           scheme,
+        Config:                           ctrl.GetConfigOrDie(),
+        Datastore:                        datastore,
     }
     serverRunner.Setup()

pkg/ext-proc/metrics/README.md (+2)

@@ -46,6 +46,8 @@ spec:
 | inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
 | inference_model_input_tokens | Distribution | Distribution of input token count. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
 | inference_model_output_tokens | Distribution | Distribution of output token count. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
+| inference_pool_average_kv_cache_utilization | Gauge | The average kv cache utilization for an inference server pool. | `name`=&lt;inference-pool-name&gt; | ALPHA |
+| inference_pool_average_queue_size | Gauge | The average number of requests pending in the model server queue. | `name`=&lt;inference-pool-name&gt; | ALPHA |

 ## Scrape Metrics

pkg/ext-proc/metrics/metrics.go (+34)

@@ -11,9 +11,11 @@ import (

 const (
     InferenceModelComponent = "inference_model"
+    InferencePoolComponent  = "inference_pool"
 )

 var (
+    // Inference Model Metrics
     requestCounter = compbasemetrics.NewCounterVec(
         &compbasemetrics.CounterOpts{
             Subsystem: InferenceModelComponent,
@@ -88,6 +90,27 @@ var (
         },
         []string{"model_name", "target_model_name"},
     )
+
+    // Inference Pool Metrics
+    inferencePoolAvgKVCache = compbasemetrics.NewGaugeVec(
+        &compbasemetrics.GaugeOpts{
+            Subsystem:      InferencePoolComponent,
+            Name:           "average_kv_cache_utilization",
+            Help:           "The average kv cache utilization for an inference server pool.",
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{"name"},
+    )
+
+    inferencePoolAvgQueueSize = compbasemetrics.NewGaugeVec(
+        &compbasemetrics.GaugeOpts{
+            Subsystem:      InferencePoolComponent,
+            Name:           "average_queue_size",
+            Help:           "The average number of requests pending in the model server queue.",
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{"name"},
+    )
 )

 var registerMetrics sync.Once
@@ -101,6 +124,9 @@ func Register() {
         legacyregistry.MustRegister(responseSizes)
         legacyregistry.MustRegister(inputTokens)
         legacyregistry.MustRegister(outputTokens)
+
+        legacyregistry.MustRegister(inferencePoolAvgKVCache)
+        legacyregistry.MustRegister(inferencePoolAvgQueueSize)
     })
 }

@@ -143,3 +169,11 @@ func RecordOutputTokens(modelName, targetModelName string, size int) {
         outputTokens.WithLabelValues(modelName, targetModelName).Observe(float64(size))
     }
 }
+
+func RecordInferencePoolAvgKVCache(name string, utilization float64) {
+    inferencePoolAvgKVCache.WithLabelValues(name).Set(utilization)
+}
+
+func RecordInferencePoolAvgQueueSize(name string, queueSize float64) {
+    inferencePoolAvgQueueSize.WithLabelValues(name).Set(queueSize)
+}
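
For context on how these gauges surface, below is a minimal, self-contained sketch of exercising the new recording helpers and dumping the shared registry in Prometheus text exposition format (the same shape as the testdata files further down). It assumes the module import path shown in the provider.go hunk and uses `github.com/prometheus/common/expfmt` for encoding; the pool name `p1` and the sample values simply mirror the test below, and none of this is part of the commit itself.

```go
package main

import (
    "os"

    "github.com/prometheus/common/expfmt"
    "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
    "k8s.io/component-base/metrics/legacyregistry"
)

func main() {
    // Register all ext-proc metrics with the shared legacy registry
    // (guarded by sync.Once, so repeated calls are safe).
    metrics.Register()

    // Record one observation per pool gauge for an example pool "p1".
    metrics.RecordInferencePoolAvgKVCache("p1", 0.3)
    metrics.RecordInferencePoolAvgQueueSize("p1", 0.4)

    // Gather everything registered so far and print it in the
    // Prometheus text exposition format.
    families, err := legacyregistry.DefaultGatherer.Gather()
    if err != nil {
        panic(err)
    }
    for _, family := range families {
        if _, err := expfmt.MetricFamilyToText(os.Stdout, family); err != nil {
            panic(err)
        }
    }
}
```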

pkg/ext-proc/metrics/metrics_test.go (+52)

@@ -15,6 +15,8 @@ const RequestSizesMetric = InferenceModelComponent + "_request_sizes"
 const ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
 const InputTokensMetric = InferenceModelComponent + "_input_tokens"
 const OutputTokensMetric = InferenceModelComponent + "_output_tokens"
+const KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
+const QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"

 func TestRecordRequestCounterandSizes(t *testing.T) {
     type requests struct {
@@ -257,3 +259,53 @@ func TestRecordResponseMetrics(t *testing.T) {
         })
     }
 }
+
+func TestInferencePoolMetrics(t *testing.T) {
+    scenarios := []struct {
+        name         string
+        poolName     string
+        kvCacheAvg   float64
+        queueSizeAvg float64
+    }{
+        {
+            name:         "basic test",
+            poolName:     "p1",
+            kvCacheAvg:   0.3,
+            queueSizeAvg: 0.4,
+        },
+    }
+    Register()
+    for _, scenario := range scenarios {
+        t.Run(scenario.name, func(t *testing.T) {
+
+            RecordInferencePoolAvgKVCache(scenario.poolName, scenario.kvCacheAvg)
+            RecordInferencePoolAvgQueueSize(scenario.poolName, scenario.queueSizeAvg)
+
+            wantKVCache, err := os.Open("testdata/kv_cache_avg_metrics")
+            defer func() {
+                if err := wantKVCache.Close(); err != nil {
+                    t.Error(err)
+                }
+            }()
+            if err != nil {
+                t.Fatal(err)
+            }
+            if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantKVCache, KVCacheAvgUsageMetric); err != nil {
+                t.Error(err)
+            }

+            wantQueueSize, err := os.Open("testdata/queue_avg_size_metrics")
+            defer func() {
+                if err := wantQueueSize.Close(); err != nil {
+                    t.Error(err)
+                }
+            }()
+            if err != nil {
+                t.Fatal(err)
+            }
+            if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantQueueSize, QueueAvgSizeMetric); err != nil {
+                t.Error(err)
+            }
+        })
+    }
+}

pkg/ext-proc/metrics/testdata/kv_cache_avg_metrics (+3)

@@ -0,0 +1,3 @@
+# HELP inference_pool_average_kv_cache_utilization [ALPHA] The average kv cache utilization for an inference server pool.
+# TYPE inference_pool_average_kv_cache_utilization gauge
+inference_pool_average_kv_cache_utilization{name="p1"} 0.3

pkg/ext-proc/metrics/testdata/queue_avg_size_metrics (+3)

@@ -0,0 +1,3 @@
+# HELP inference_pool_average_queue_size [ALPHA] The average number of requests pending in the model server queue.
+# TYPE inference_pool_average_queue_size gauge
+inference_pool_average_queue_size{name="p1"} 0.4

pkg/ext-proc/server/runserver.go (+32 -29)

@@ -19,42 +19,45 @@ import (

 // ExtProcServerRunner provides methods to manage an external process server.
 type ExtProcServerRunner struct {
-    GrpcPort               int
-    TargetEndpointKey      string
-    PoolName               string
-    PoolNamespace          string
-    ServiceName            string
-    Zone                   string
-    RefreshPodsInterval    time.Duration
-    RefreshMetricsInterval time.Duration
-    Scheme                 *runtime.Scheme
-    Config                 *rest.Config
-    Datastore              *backend.K8sDatastore
-    manager                ctrl.Manager
+    GrpcPort                         int
+    TargetEndpointKey                string
+    PoolName                         string
+    PoolNamespace                    string
+    ServiceName                      string
+    Zone                             string
+    RefreshPodsInterval              time.Duration
+    RefreshMetricsInterval           time.Duration
+    RefreshPrometheusMetricsInterval time.Duration
+    Scheme                           *runtime.Scheme
+    Config                           *rest.Config
+    Datastore                        *backend.K8sDatastore
+    manager                          ctrl.Manager
 }

 // Default values for CLI flags in main
 const (
-    DefaultGrpcPort               = 9002                             // default for --grpcPort
-    DefaultTargetEndpointKey      = "x-gateway-destination-endpoint" // default for --targetEndpointKey
-    DefaultPoolName               = ""                               // required but no default
-    DefaultPoolNamespace          = "default"                        // default for --poolNamespace
-    DefaultServiceName            = ""                               // required but no default
-    DefaultZone                   = ""                               // default for --zone
-    DefaultRefreshPodsInterval    = 10 * time.Second                 // default for --refreshPodsInterval
-    DefaultRefreshMetricsInterval = 50 * time.Millisecond            // default for --refreshMetricsInterval
+    DefaultGrpcPort                         = 9002                             // default for --grpcPort
+    DefaultTargetEndpointKey                = "x-gateway-destination-endpoint" // default for --targetEndpointKey
+    DefaultPoolName                         = ""                               // required but no default
+    DefaultPoolNamespace                    = "default"                        // default for --poolNamespace
+    DefaultServiceName                      = ""                               // required but no default
+    DefaultZone                             = ""                               // default for --zone
+    DefaultRefreshPodsInterval              = 10 * time.Second                 // default for --refreshPodsInterval
+    DefaultRefreshMetricsInterval           = 50 * time.Millisecond            // default for --refreshMetricsInterval
+    DefaultRefreshPrometheusMetricsInterval = 5 * time.Second                  // default for --refreshPrometheusMetricsInterval
 )

 func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
     return &ExtProcServerRunner{
-        GrpcPort:               DefaultGrpcPort,
-        TargetEndpointKey:      DefaultTargetEndpointKey,
-        PoolName:               DefaultPoolName,
-        PoolNamespace:          DefaultPoolNamespace,
-        ServiceName:            DefaultServiceName,
-        Zone:                   DefaultZone,
-        RefreshPodsInterval:    DefaultRefreshPodsInterval,
-        RefreshMetricsInterval: DefaultRefreshMetricsInterval,
+        GrpcPort:                         DefaultGrpcPort,
+        TargetEndpointKey:                DefaultTargetEndpointKey,
+        PoolName:                         DefaultPoolName,
+        PoolNamespace:                    DefaultPoolNamespace,
+        ServiceName:                      DefaultServiceName,
+        Zone:                             DefaultZone,
+        RefreshPodsInterval:              DefaultRefreshPodsInterval,
+        RefreshMetricsInterval:           DefaultRefreshMetricsInterval,
+        RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval,
         // Scheme, Config, and Datastore can be assigned later.
     }
 }
@@ -123,7 +126,7 @@ func (r *ExtProcServerRunner) Start(

     // Initialize backend provider
     pp := backend.NewProvider(podMetricsClient, podDatastore)
-    if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
+    if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval, r.RefreshPrometheusMetricsInterval); err != nil {
         klog.Fatalf("Failed to initialize backend provider: %v", err)
     }

pkg/ext-proc/test/benchmark/benchmark.go (+7 -6)

@@ -21,11 +21,12 @@ var (
     svrAddr       = flag.String("server_address", fmt.Sprintf("localhost:%d", runserver.DefaultGrpcPort), "Address of the ext proc server")
     totalRequests = flag.Int("total_requests", 100000, "number of requests to be sent for load test")
     // Flags when running a local ext proc server.
-    numFakePods            = flag.Int("num_fake_pods", 200, "number of fake pods when running a local ext proc server")
-    numModelsPerPod        = flag.Int("num_models_per_pod", 5, "number of fake models per pod when running a local ext proc server")
-    localServer            = flag.Bool("local_server", true, "whether to start a local ext proc server")
-    refreshPodsInterval    = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods")
-    refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics")
+    numFakePods                      = flag.Int("num_fake_pods", 200, "number of fake pods when running a local ext proc server")
+    numModelsPerPod                  = flag.Int("num_models_per_pod", 5, "number of fake models per pod when running a local ext proc server")
+    localServer                      = flag.Bool("local_server", true, "whether to start a local ext proc server")
+    refreshPodsInterval              = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods")
+    refreshMetricsInterval           = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics via polling pods")
+    refreshPrometheusMetricsInterval = flag.Duration("refreshPrometheusMetricsInterval", 5*time.Second, "interval to flush prometheus metrics")
 )

 const (
@@ -37,7 +38,7 @@ func main() {
     flag.Parse()

     if *localServer {
-        test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, fakePods(), fakeModels())
+        test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, *refreshPrometheusMetricsInterval, fakePods(), fakeModels())
         time.Sleep(time.Second) // wait until server is up
         klog.Info("Server started")
     }

pkg/ext-proc/test/utils.go (+2 -2)

@@ -16,7 +16,7 @@ import (
     klog "k8s.io/klog/v2"
 )

-func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server {
+func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server {
     ps := make(backend.PodSet)
     pms := make(map[backend.Pod]*backend.PodMetrics)
     for _, pod := range pods {
@@ -25,7 +25,7 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Dur
     }
     pmc := &backend.FakePodMetricsClient{Res: pms}
     pp := backend.NewProvider(pmc, backend.NewK8sDataStore(backend.WithPods(pods)))
-    if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
+    if err := pp.Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval); err != nil {
         klog.Fatalf("failed to initialize: %v", err)
     }
     return startExtProc(port, pp, models)
