Skip to content

Commit a70d66e

Browse files
authored
Each pod has independent loops to refresh metrics (#460)
* Each pod has independent loops to refresh metrics * Major refactoring, move metrics logic from datastore to backend/metrics package * Address comments * Fix test and fmt * The podMetrics updates the targetPort by reading the pool from the datastore
1 parent 23bab8c commit a70d66e

32 files changed

+1115
-1098
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ vet: ## Run go vet against code.
119119

120120
.PHONY: test
121121
test: manifests generate fmt vet envtest ## Run tests.
122-
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -coverprofile cover.out
122+
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -race -coverprofile cover.out
123123

124124
.PHONY: test-integration
125125
test-integration: manifests generate fmt vet envtest ## Run tests.

cmd/epp/main.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import (
3737
"sigs.k8s.io/controller-runtime/pkg/manager"
3838
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
3939
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
40-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
40+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4141
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/vllm"
4242
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4343
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -143,22 +143,20 @@ func run() error {
143143

144144
ctx := ctrl.SetupSignalHandler()
145145

146+
pmf := backendmetrics.NewPodMetricsFactory(&vllm.PodMetricsClientImpl{}, *refreshMetricsInterval)
146147
// Setup runner.
147-
datastore := datastore.NewDatastore()
148-
provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
148+
datastore := datastore.NewDatastore(ctx, pmf)
149149
serverRunner := &runserver.ExtProcServerRunner{
150150
GrpcPort: *grpcPort,
151151
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
152152
DestinationEndpointHintKey: *destinationEndpointHintKey,
153153
PoolName: *poolName,
154154
PoolNamespace: *poolNamespace,
155-
RefreshMetricsInterval: *refreshMetricsInterval,
156-
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
157155
Datastore: datastore,
158156
SecureServing: *secureServing,
159157
CertPath: *certPath,
160-
Provider: provider,
161158
UseStreaming: useStreamingServer,
159+
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
162160
}
163161
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
164162
setupLog.Error(err, "Failed to setup ext-proc controllers")

pkg/epp/backend/fake.go

-48
This file was deleted.

pkg/epp/backend/metrics/fake.go

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
24+
corev1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/types"
26+
"sigs.k8s.io/controller-runtime/pkg/log"
27+
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
28+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
29+
)
30+
31+
// FakePodMetrics is an implementation of PodMetrics that doesn't run the async refresh loop.
32+
type FakePodMetrics struct {
33+
Pod *Pod
34+
Metrics *Metrics
35+
}
36+
37+
func (fpm *FakePodMetrics) GetPod() *Pod {
38+
return fpm.Pod
39+
}
40+
func (fpm *FakePodMetrics) GetMetrics() *Metrics {
41+
return fpm.Metrics
42+
}
43+
func (fpm *FakePodMetrics) UpdatePod(pod *corev1.Pod) {
44+
fpm.Pod = toInternalPod(pod)
45+
}
46+
func (fpm *FakePodMetrics) StopRefreshLoop() {} // noop
47+
48+
type FakePodMetricsClient struct {
49+
errMu sync.RWMutex
50+
Err map[types.NamespacedName]error
51+
resMu sync.RWMutex
52+
Res map[types.NamespacedName]*Metrics
53+
}
54+
55+
func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod *Pod, existing *Metrics, port int32) (*Metrics, error) {
56+
f.errMu.RLock()
57+
err, ok := f.Err[pod.NamespacedName]
58+
f.errMu.RUnlock()
59+
if ok {
60+
return nil, err
61+
}
62+
f.resMu.RLock()
63+
res, ok := f.Res[pod.NamespacedName]
64+
f.resMu.RUnlock()
65+
if !ok {
66+
return nil, fmt.Errorf("no pod found: %v", pod.NamespacedName)
67+
}
68+
log.FromContext(ctx).V(logutil.VERBOSE).Info("Fetching metrics for pod", "existing", existing, "new", res)
69+
return res.Clone(), nil
70+
}
71+
72+
func (f *FakePodMetricsClient) SetRes(new map[types.NamespacedName]*Metrics) {
73+
f.resMu.Lock()
74+
defer f.resMu.Unlock()
75+
f.Res = new
76+
}
77+
78+
func (f *FakePodMetricsClient) SetErr(new map[types.NamespacedName]error) {
79+
f.errMu.Lock()
80+
defer f.errMu.Unlock()
81+
f.Err = new
82+
}
83+
84+
type FakeDataStore struct {
85+
Res map[string]*v1alpha2.InferenceModel
86+
}
87+
88+
func (fds *FakeDataStore) FetchModelData(modelName string) (returnModel *v1alpha2.InferenceModel) {
89+
return fds.Res[modelName]
90+
}

pkg/epp/backend/metrics/logger.go

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"github.com/go-logr/logr"
24+
"sigs.k8s.io/controller-runtime/pkg/log"
25+
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
27+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
28+
)
29+
30+
const (
31+
// Note currently the EPP treats stale metrics same as fresh.
32+
// TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336
33+
metricsValidityPeriod = 5 * time.Second
34+
)
35+
36+
type Datastore interface {
37+
PoolGet() (*v1alpha2.InferencePool, error)
38+
// PodMetrics operations
39+
// PodGetAll returns all pods and metrics, including fresh and stale.
40+
PodGetAll() []PodMetrics
41+
PodList(func(PodMetrics) bool) []PodMetrics
42+
}
43+
44+
// StartMetricsLogger starts goroutines to 1) Print metrics debug logs if the DEBUG log level is
45+
// enabled; 2) flushes Prometheus metrics about the backend servers.
46+
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval time.Duration) {
47+
logger := log.FromContext(ctx)
48+
49+
// Periodically flush prometheus metrics for inference pool
50+
go func() {
51+
for {
52+
select {
53+
case <-ctx.Done():
54+
logger.V(logutil.DEFAULT).Info("Shutting down prometheus metrics thread")
55+
return
56+
default:
57+
time.Sleep(refreshPrometheusMetricsInterval)
58+
flushPrometheusMetricsOnce(logger, datastore)
59+
}
60+
}
61+
}()
62+
63+
// Periodically print out the pods and metrics for DEBUGGING.
64+
if logger := logger.V(logutil.DEBUG); logger.Enabled() {
65+
go func() {
66+
for {
67+
select {
68+
case <-ctx.Done():
69+
logger.V(logutil.DEFAULT).Info("Shutting down metrics logger thread")
70+
return
71+
default:
72+
time.Sleep(5 * time.Second)
73+
podsWithFreshMetrics := datastore.PodList(func(pm PodMetrics) bool {
74+
return time.Since(pm.GetMetrics().UpdateTime) <= metricsValidityPeriod
75+
})
76+
podsWithStaleMetrics := datastore.PodList(func(pm PodMetrics) bool {
77+
return time.Since(pm.GetMetrics().UpdateTime) > metricsValidityPeriod
78+
})
79+
logger.Info("Current Pods and metrics gathered", "fresh metrics", podsWithFreshMetrics, "stale metrics", podsWithStaleMetrics)
80+
}
81+
}
82+
}()
83+
}
84+
}
85+
86+
func flushPrometheusMetricsOnce(logger logr.Logger, datastore Datastore) {
87+
pool, err := datastore.PoolGet()
88+
if err != nil {
89+
// No inference pool or not initialize.
90+
logger.V(logutil.VERBOSE).Info("pool is not initialized, skipping flushing metrics")
91+
return
92+
}
93+
94+
var kvCacheTotal float64
95+
var queueTotal int
96+
97+
podMetrics := datastore.PodGetAll()
98+
logger.V(logutil.VERBOSE).Info("Flushing Prometheus Metrics", "ReadyPods", len(podMetrics))
99+
if len(podMetrics) == 0 {
100+
return
101+
}
102+
103+
for _, pod := range podMetrics {
104+
kvCacheTotal += pod.GetMetrics().KVCacheUsagePercent
105+
queueTotal += pod.GetMetrics().WaitingQueueSize
106+
}
107+
108+
podTotalCount := len(podMetrics)
109+
metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount))
110+
metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal/podTotalCount))
111+
}

0 commit comments

Comments
 (0)