Skip to content

Commit b167c1e

Browse files
committed
[Metrics] Add average kv cache and waiting queue size metrics for
inference pool
1 parent d5f5507 commit b167c1e

File tree

5 files changed

+139
-0
lines changed

5 files changed

+139
-0
lines changed

pkg/ext-proc/backend/provider.go

+47
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"go.uber.org/multierr"
12+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
1213
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1314
corev1 "k8s.io/api/core/v1"
1415
klog "k8s.io/klog/v2"
@@ -89,6 +90,15 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
8990
}
9091
}()
9192

93+
// Periodically flush prometheus metrics for inference pool
94+
flushPrometheusMetricsInterval := 10 * time.Second
95+
go func() {
96+
for {
97+
time.Sleep(flushPrometheusMetricsInterval)
98+
p.flushPrometheusMetricsOnce()
99+
}
100+
}()
101+
92102
// Periodically print out the pods and metrics for DEBUGGING.
93103
if klog.V(logutil.DEBUG).Enabled() {
94104
go func() {
@@ -217,3 +227,40 @@ func (p *Provider) refreshMetricsOnce() error {
217227
}
218228
return errs
219229
}
230+
231+
func (p *Provider) flushPrometheusMetricsOnce() {
232+
klog.V(logutil.DEFAULT).Infof("Flushing Prometheus Metrics")
233+
234+
pool, _ := p.datastore.getInferencePool()
235+
if pool == nil {
236+
// No inference pool or not initialize.
237+
return
238+
}
239+
240+
var kvCacheTotal float64
241+
var queueTotal int
242+
243+
podTotalCount := 0
244+
pods, err := p.datastore.getPods()
245+
if err != nil {
246+
klog.V(logutil.DEFAULT).Infof("Couldn't list pods: %v", err)
247+
return
248+
}
249+
250+
for _, pod := range pods {
251+
if !podIsReady(pod) {
252+
continue
253+
}
254+
podTotalCount++
255+
if val, ok := p.podMetrics.Load(pod.Name); ok {
256+
pm := val.(*PodMetrics)
257+
kvCacheTotal += pm.KVCacheUsagePercent
258+
queueTotal += pm.WaitingQueueSize
259+
}
260+
}
261+
262+
if podTotalCount != 0 {
263+
metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount))
264+
metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal/podTotalCount))
265+
}
266+
}

pkg/ext-proc/metrics/metrics.go

+34
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import (
1111

1212
const (
1313
InferenceModelComponent = "inference_model"
14+
InferencePoolComponent = "inference_pool"
1415
)
1516

1617
var (
18+
// Inference Model Metrics
1719
requestCounter = compbasemetrics.NewCounterVec(
1820
&compbasemetrics.CounterOpts{
1921
Subsystem: InferenceModelComponent,
@@ -88,6 +90,27 @@ var (
8890
},
8991
[]string{"model_name", "target_model_name"},
9092
)
93+
94+
// Inference Pool Metrics
95+
inferencePoolAvgKVCache = compbasemetrics.NewGaugeVec(
96+
&compbasemetrics.GaugeOpts{
97+
Subsystem: InferencePoolComponent,
98+
Name: "average_kv_cache_utilization",
99+
Help: "The average kv cache utilization for an inference server pool.",
100+
StabilityLevel: compbasemetrics.ALPHA,
101+
},
102+
[]string{"name"},
103+
)
104+
105+
inferencePoolAvgQueueSize = compbasemetrics.NewGaugeVec(
106+
&compbasemetrics.GaugeOpts{
107+
Subsystem: InferencePoolComponent,
108+
Name: "average_queue_size",
109+
Help: " The average number of requests pending in the model server queue.",
110+
StabilityLevel: compbasemetrics.ALPHA,
111+
},
112+
[]string{"name"},
113+
)
91114
)
92115

93116
var registerMetrics sync.Once
@@ -101,6 +124,9 @@ func Register() {
101124
legacyregistry.MustRegister(responseSizes)
102125
legacyregistry.MustRegister(inputTokens)
103126
legacyregistry.MustRegister(outputTokens)
127+
128+
legacyregistry.MustRegister(inferencePoolAvgKVCache)
129+
legacyregistry.MustRegister(inferencePoolAvgQueueSize)
104130
})
105131
}
106132

@@ -143,3 +169,11 @@ func RecordOutputTokens(modelName, targetModelName string, size int) {
143169
outputTokens.WithLabelValues(modelName, targetModelName).Observe(float64(size))
144170
}
145171
}
172+
173+
func RecordInferencePoolAvgKVCache(name string, utilization float64) {
174+
inferencePoolAvgKVCache.WithLabelValues(name).Set(utilization)
175+
}
176+
177+
func RecordInferencePoolAvgQueueSize(name string, utilization float64) {
178+
inferencePoolAvgQueueSize.WithLabelValues(name).Set(utilization)
179+
}

pkg/ext-proc/metrics/metrics_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ const RequestSizesMetric = InferenceModelComponent + "_request_sizes"
1515
const ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
1616
const InputTokensMetric = InferenceModelComponent + "_input_tokens"
1717
const OutputTokensMetric = InferenceModelComponent + "_output_tokens"
18+
const KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
19+
const QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
1820

1921
func TestRecordRequestCounterandSizes(t *testing.T) {
2022
type requests struct {
@@ -257,3 +259,53 @@ func TestRecordResponseMetrics(t *testing.T) {
257259
})
258260
}
259261
}
262+
263+
func TestInferencePoolMetrics(t *testing.T) {
264+
scenarios := []struct {
265+
name string
266+
poolName string
267+
kvCacheAvg float64
268+
queueSizeAvg float64
269+
}{
270+
{
271+
name: "basic test",
272+
poolName: "p1",
273+
kvCacheAvg: 0.3,
274+
queueSizeAvg: 0.4,
275+
},
276+
}
277+
Register()
278+
for _, scenario := range scenarios {
279+
t.Run(scenario.name, func(t *testing.T) {
280+
281+
RecordInferencePoolAvgKVCache(scenario.poolName, scenario.kvCacheAvg)
282+
RecordInferencePoolAvgQueueSize(scenario.poolName, scenario.queueSizeAvg)
283+
284+
wantKVCache, err := os.Open("testdata/kv_cache_avg_metrics")
285+
defer func() {
286+
if err := wantKVCache.Close(); err != nil {
287+
t.Error(err)
288+
}
289+
}()
290+
if err != nil {
291+
t.Fatal(err)
292+
}
293+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantKVCache, KVCacheAvgUsageMetric); err != nil {
294+
t.Error(err)
295+
}
296+
297+
wantQueueSize, err := os.Open("testdata/queue_avg_size_metrics")
298+
defer func() {
299+
if err := wantQueueSize.Close(); err != nil {
300+
t.Error(err)
301+
}
302+
}()
303+
if err != nil {
304+
t.Fatal(err)
305+
}
306+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantQueueSize, QueueAvgSizeMetric); err != nil {
307+
t.Error(err)
308+
}
309+
})
310+
}
311+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# HELP inference_pool_average_kv_cache_utilization [ALPHA] The average kv cache utilization for an inference server pool.
2+
# TYPE inference_pool_average_kv_cache_utilization gauge
3+
inference_pool_average_kv_cache_utilization{name="p1"} 0.3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# HELP inference_pool_average_queue_size [ALPHA] The average number of requests pending in the model server queue.
2+
# TYPE inference_pool_average_queue_size gauge
3+
inference_pool_average_queue_size{name="p1"} 0.4

0 commit comments

Comments
 (0)