Skip to content

Commit 1ef6a91

Browse files
committed
The podMetrics updates the targetPort by reading the pool from the datastore
1 parent 27d60cb commit 1ef6a91

File tree

6 files changed

+43
-20
lines changed

6 files changed

+43
-20
lines changed

pkg/epp/backend/metrics/logger.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ type Datastore interface {
4141
PodList(func(PodMetrics) bool) []PodMetrics
4242
}
4343

44-
func LogMetricsPeriodically(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval time.Duration) {
44+
// StartMetricsLogger starts goroutines to 1) Print metrics debug logs if the DEBUG log level is
45+
// enabled; 2) flushes Prometheus metrics about the backend servers.
46+
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval time.Duration) {
4547
logger := log.FromContext(ctx)
4648

4749
// Periodically flush prometheus metrics for inference pool

pkg/epp/backend/metrics/pod_metrics.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ const (
3535
)
3636

3737
type podMetrics struct {
38-
pod unsafe.Pointer // stores a *Pod
39-
metrics unsafe.Pointer // stores a *Metrics
40-
pmc PodMetricsClient
41-
targetPort int32 // metrics endpoint port in the pod
42-
interval time.Duration
38+
pod unsafe.Pointer // stores a *Pod
39+
metrics unsafe.Pointer // stores a *Metrics
40+
pmc PodMetricsClient
41+
ds Datastore
42+
interval time.Duration
4343

4444
parentCtx context.Context
4545
once sync.Once // ensure the StartRefreshLoop is only called once.
@@ -101,9 +101,14 @@ func (pm *podMetrics) startRefreshLoop() {
101101
}
102102

103103
func (pm *podMetrics) refreshMetrics() error {
104+
pool, err := pm.ds.PoolGet()
105+
if err != nil {
106+
// No inference pool or not initialize.
107+
return err
108+
}
104109
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
105110
defer cancel()
106-
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics(), pm.targetPort)
111+
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics(), pool.Spec.TargetPortNumber)
107112
if err != nil {
108113
// As refresher is running in the background, it's possible that the pod is deleted but
109114
// the refresh goroutine doesn't read the done channel yet. In this case, we just return nil.

pkg/epp/backend/metrics/pod_metrics_test.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
corev1 "k8s.io/api/core/v1"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/types"
29+
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
2930
)
3031

3132
var (
@@ -59,8 +60,9 @@ func TestMetricsRefresh(t *testing.T) {
5960
ctx := context.Background()
6061
pmc := &FakePodMetricsClient{}
6162
pmf := NewPodMetricsFactory(pmc, time.Millisecond)
63+
6264
// The refresher is initialized with empty metrics.
63-
pm := pmf.NewPodMetrics(ctx, pod1, 8000)
65+
pm := pmf.NewPodMetrics(ctx, pod1, &fakeDataStore{})
6466

6567
namespacedName := types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
6668
// Use SetRes to simulate an update of metrics from the pod.
@@ -78,3 +80,17 @@ func TestMetricsRefresh(t *testing.T) {
7880
// Still expect the same condition (no metrics update).
7981
assert.EventuallyWithT(t, condition, time.Second, time.Millisecond)
8082
}
83+
84+
type fakeDataStore struct{}
85+
86+
func (f *fakeDataStore) PoolGet() (*v1alpha2.InferencePool, error) {
87+
return &v1alpha2.InferencePool{Spec: v1alpha2.InferencePoolSpec{TargetPortNumber: 8000}}, nil
88+
}
89+
func (f *fakeDataStore) PodGetAll() []PodMetrics {
90+
// Not implemented.
91+
return nil
92+
}
93+
func (f *fakeDataStore) PodList(func(PodMetrics) bool) []PodMetrics {
94+
// Not implemented.
95+
return nil
96+
}

pkg/epp/backend/metrics/types.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,17 @@ type PodMetricsFactory struct {
4141
refreshMetricsInterval time.Duration
4242
}
4343

44-
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, port int32) PodMetrics {
44+
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
4545
pm := &podMetrics{
46-
pod: unsafe.Pointer(toInternalPod(in)),
47-
metrics: unsafe.Pointer(newMetrics()),
48-
pmc: f.pmc,
49-
targetPort: port,
50-
interval: f.refreshMetricsInterval,
51-
parentCtx: parentCtx,
52-
once: sync.Once{},
53-
done: make(chan struct{}),
54-
logger: log.FromContext(parentCtx),
46+
pod: unsafe.Pointer(toInternalPod(in)),
47+
metrics: unsafe.Pointer(newMetrics()),
48+
pmc: f.pmc,
49+
ds: ds,
50+
interval: f.refreshMetricsInterval,
51+
parentCtx: parentCtx,
52+
once: sync.Once{},
53+
done: make(chan struct{}),
54+
logger: log.FromContext(parentCtx),
5555
}
5656
pm.startRefreshLoop()
5757
return pm

pkg/epp/datastore/datastore.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.In
238238
var pm backendmetrics.PodMetrics
239239
existing, ok := ds.pods.Load(namespacedName)
240240
if !ok {
241-
pm = ds.pmf.NewPodMetrics(ds.parentCtx, pod, pool.Spec.TargetPortNumber)
241+
pm = ds.pmf.NewPodMetrics(ds.parentCtx, pod, ds)
242242
ds.pods.Store(namespacedName, pm)
243243
} else {
244244
pm = existing.(backendmetrics.PodMetrics)

pkg/epp/server/runserver.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man
121121
// The runnable implements LeaderElectionRunnable with leader election disabled.
122122
func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
123123
return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error {
124-
backendmetrics.LogMetricsPeriodically(ctx, r.Datastore, r.RefreshPrometheusMetricsInterval)
124+
backendmetrics.StartMetricsLogger(ctx, r.Datastore, r.RefreshPrometheusMetricsInterval)
125125
var srv *grpc.Server
126126
if r.SecureServing {
127127
var cert tls.Certificate

0 commit comments

Comments
 (0)