Each pod has independent loops to refresh metrics #460

Merged (5 commits, Mar 10, 2025)
Changes from 2 commits
2 changes: 1 addition & 1 deletion Makefile
@@ -119,7 +119,7 @@ vet: ## Run go vet against code.

.PHONY: test
test: manifests generate fmt vet envtest ## Run tests.
-	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -coverprofile cover.out
+	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -race -coverprofile cover.out

.PHONY: test-integration
test-integration: manifests generate fmt vet envtest ## Run tests.
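With each pod now refreshing metrics on its own goroutine, the scheduler reads data that a background goroutine writes concurrently, so enabling the race detector in CI is a natural companion change. A minimal, hypothetical illustration (not code from this PR) of the access pattern `-race` guards against, using a typed atomic pointer as this PR's `podMetrics` effectively does:

```go
package example

import (
	"sync/atomic"
	"testing"
)

type metrics struct{ waiting int }

func TestConcurrentRefresh(t *testing.T) {
	var current atomic.Pointer[metrics]
	current.Store(&metrics{})

	done := make(chan struct{})
	go func() { // plays the role of the per-pod refresh loop
		defer close(done)
		current.Store(&metrics{waiting: 42})
	}()
	_ = current.Load() // plays the role of the scheduler reading metrics
	<-done
	// Swapping atomic.Pointer for a plain *metrics field here makes
	// `go test -race` report a data race between the two goroutines.
}
```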
10 changes: 4 additions & 6 deletions cmd/epp/main.go
@@ -37,7 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/vllm"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -143,22 +143,20 @@ func run() error {

ctx := ctrl.SetupSignalHandler()

+	pmf := backendmetrics.NewPodMetricsFactory(&vllm.PodMetricsClientImpl{}, *refreshMetricsInterval)
// Setup runner.
-	datastore := datastore.NewDatastore()
-	provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
+	datastore := datastore.NewDatastore(ctx, pmf)
serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
DestinationEndpointHintKey: *destinationEndpointHintKey,
PoolName: *poolName,
PoolNamespace: *poolNamespace,
-		RefreshMetricsInterval:           *refreshMetricsInterval,
-		RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
		Datastore:                        datastore,
		SecureServing:                    *secureServing,
		CertPath:                         *certPath,
-		Provider:                         provider,
		UseStreaming:                     useStreamingServer,
+		RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "Failed to setup ext-proc controllers")
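The `PodMetricsFactory` constructed above isn't shown in this excerpt. Conceptually it bundles the metrics client and the refresh interval so the datastore can spin up one refresh loop per pod. A hypothetical reconstruction, inferred from its call sites here and in the test below (the merged definition may differ in detail):

```go
package metrics

import (
	"context"
	"sync/atomic"
	"time"
	"unsafe"

	"sigs.k8s.io/controller-runtime/pkg/log"
)

// PodMetricsFactory is a sketch inferred from NewPodMetricsFactory(pmc, interval)
// and pmf.NewPodMetrics(ctx, pod, port); names and fields are assumptions.
type PodMetricsFactory struct {
	pmc             PodMetricsClient
	refreshInterval time.Duration
}

func NewPodMetricsFactory(pmc PodMetricsClient, refreshInterval time.Duration) *PodMetricsFactory {
	return &PodMetricsFactory{pmc: pmc, refreshInterval: refreshInterval}
}

// NewPodMetrics creates one refresher per pod and starts its loop immediately.
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *Pod, port int32) PodMetrics {
	pm := &podMetrics{
		pmc:        f.pmc,
		targetPort: port,
		interval:   f.refreshInterval,
		parentCtx:  parentCtx,
		done:       make(chan struct{}),
		logger:     log.FromContext(parentCtx),
	}
	pm.UpdatePod(in)
	// Start with empty metrics so GetMetrics never returns nil.
	atomic.StorePointer(&pm.metrics, unsafe.Pointer(&Metrics{}))
	pm.startRefreshLoop()
	return pm
}
```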
57 changes: 49 additions & 8 deletions pkg/epp/backend/fake.go → pkg/epp/backend/metrics/fake.go
@@ -14,29 +14,70 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

-package backend
+package metrics

import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

// FakePodMetrics is an implementation of PodMetrics that doesn't run the async refresh loop.
type FakePodMetrics struct {
Pod *Pod
Metrics *Metrics
}

func (fpm *FakePodMetrics) GetPod() *Pod {
return fpm.Pod
}
func (fpm *FakePodMetrics) GetMetrics() *Metrics {
return fpm.Metrics
}
func (fpm *FakePodMetrics) UpdatePod(pod *Pod) {
fpm.Pod = pod
}
func (fpm *FakePodMetrics) StopRefreshLoop() {} // noop

type FakePodMetricsClient struct {
-	Err map[types.NamespacedName]error
-	Res map[types.NamespacedName]*datastore.PodMetrics
+	errMu sync.RWMutex
+	Err   map[types.NamespacedName]error
+	resMu sync.RWMutex
+	Res   map[types.NamespacedName]*Metrics
}

-func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *datastore.PodMetrics, port int32) (*datastore.PodMetrics, error) {
-	if err, ok := f.Err[existing.NamespacedName]; ok {
+func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod *Pod, existing *Metrics, port int32) (*Metrics, error) {
+	f.errMu.RLock()
+	err, ok := f.Err[pod.NamespacedName]
+	f.errMu.RUnlock()
+	if ok {
		return nil, err
	}
-	log.FromContext(ctx).V(logutil.VERBOSE).Info("Fetching metrics for pod", "existing", existing, "new", f.Res[existing.NamespacedName])
-	return f.Res[existing.NamespacedName], nil
+	f.resMu.RLock()
+	res, ok := f.Res[pod.NamespacedName]
+	f.resMu.RUnlock()
+	if !ok {
+		return nil, fmt.Errorf("no pod found: %v", pod.NamespacedName)
+	}
+	log.FromContext(ctx).V(logutil.VERBOSE).Info("Fetching metrics for pod", "existing", existing, "new", res)
+	return res.Clone(), nil
}

func (f *FakePodMetricsClient) SetRes(new map[types.NamespacedName]*Metrics) {
f.resMu.Lock()
defer f.resMu.Unlock()
f.Res = new
}

func (f *FakePodMetricsClient) SetErr(new map[types.NamespacedName]error) {
f.errMu.Lock()
defer f.errMu.Unlock()
f.Err = new
}

type FakeDataStore struct {
61 changes: 61 additions & 0 deletions pkg/epp/backend/metrics/logger.go
@@ -0,0 +1,61 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"context"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

const (
// Note: currently the EPP treats stale metrics the same as fresh ones.
// TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336
metricsValidityPeriod = 5 * time.Second
)

type Datastore interface {
PodList(func(PodMetrics) bool) []PodMetrics
}

func PrintMetricsForDebugging(ctx context.Context, datastore Datastore) {
logger := log.FromContext(ctx)

// Periodically print out the pods and metrics for DEBUGGING.
if logger := logger.V(logutil.DEBUG); logger.Enabled() {
go func() {
for {
select {
case <-ctx.Done():
logger.V(logutil.DEFAULT).Info("Shutting down metrics logger thread")
return
default:
time.Sleep(5 * time.Second)
podsWithFreshMetrics := datastore.PodList(func(pm PodMetrics) bool {
return time.Since(pm.GetMetrics().UpdateTime) <= metricsValidityPeriod
})
podsWithStaleMetrics := datastore.PodList(func(pm PodMetrics) bool {
return time.Since(pm.GetMetrics().UpdateTime) > metricsValidityPeriod
})
logger.Info("Current Pods and metrics gathered", "fresh metrics", podsWithFreshMetrics, "stale metrics", podsWithStaleMetrics)
}
}
}()
}
}
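PrintMetricsForDebugging spawns its own goroutine (and only when DEBUG verbosity is enabled), then returns immediately, so wiring it up is a single call at startup. A hypothetical call site; the actual wiring isn't shown in this diff:

```go
package main

import (
	"context"

	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)

// startDebugLogger is an assumed helper: it relies on the datastore satisfying
// the narrow Datastore interface above (PodList only).
func startDebugLogger(ctx context.Context, ds backendmetrics.Datastore) {
	backendmetrics.PrintMetricsForDebugging(ctx, ds)
}
```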
112 changes: 112 additions & 0 deletions pkg/epp/backend/metrics/pod_metrics.go
@@ -0,0 +1,112 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"context"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/go-logr/logr"

logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

const (
fetchMetricsTimeout = 5 * time.Second
)

type podMetrics struct {
pod unsafe.Pointer // stores a *Pod
metrics unsafe.Pointer // stores a *Metrics
pmc PodMetricsClient
targetPort int32 // metrics endpoint port in the pod
Contributor:
If the port on the pool changes, we will need to update this. To avoid that, I suggest adding a GetPool function to the Datastore interface you defined in this pkg, to get the port directly instead of storing it here and risking it getting stale.

Contributor Author:
We should separate the concerns. It's the reconciler's responsibility to make sure it terminates a stale PodMetrics and creates a new one with the new port number. The PodMetrics is purely responsible for refreshing metrics and has no knowledge of the datastore.

Contributor:
We need to balance that with complexity. Storing the port in every entry and updating it for all entries on every change to the pool is redundant and error prone. The port number is defined by the inference pool, so it is reasonable to call into the store to get it.

Contributor Author:
I would prefer keeping the podMetrics as simple as possible, i.e., keeping the metrics of a pod fresh. It should not depend on the datastore. Someone needs to be responsible for properly managing the lifecycle of these refreshers, and IMO the "someone" should be the reconcilers who manage the pool and pod lifecycles. Otherwise this PodMetrics becomes another "reconciler" to some extent: it needs to monitor pool updates and update itself accordingly.

Note the PodMetrics doesn't depend on the Datastore currently. The Datastore interface in this package is only used by the logger to print debug logs. Also note that just letting the PodMetrics have access to GetPool won't fix the stale problem; it needs to monitor pool updates.

Contributor:
As it is right now, this PR introduces a regression: if the port changes on the pool, it will not be reflected here. How do you suggest we address this?

> Also note that just letting the PodMetrics have access to GetPool won't fix the stale problem; it needs to monitor pool updates.

It does fix the problem: when the pool gets updated, the port stored in the pool object is updated with it. The existing code gets the pool on every scrape, and so it always works with the most up-to-date version of the pool. In the new code, we will need to pass the Datastore interface to the NewPodMetrics func, and in refreshMetrics get the pool at the beginning to obtain the port.

Contributor Author:
> if the port changes on the pool, it will not be reflected here, how do you suggest we address this?

Ah, I see what you mean. I wasn't aware that we currently don't resync if the port changes in the pool, so this would lead to a regression. Sorry for the confusion.

One option is what you suggested. This will lead to frequent mutex access to the pool, but that's probably OK since writes to the pool should be very infrequent. The other option is to expose an interface on the PodMetrics to update the port, like we do with the UpdatePod interface today, but this requires careful control logic in the pool reconciler. And if there is a new property in the future we need to watch, we'd have to remember to add it as well, unless we unconditionally resync all pods upon any change in the pool.

Upon weighing the tradeoffs, I think your suggestion makes sense.
interval time.Duration

parentCtx context.Context
	once sync.Once // ensures startRefreshLoop is only called once.
done chan struct{}

logger logr.Logger
}
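A minimal sketch of the direction the review thread above converges on: instead of caching targetPort per entry, the refresh step reads the port from the pool on every scrape. Illustrative only, not the merged code; it assumes this lives in the same package, with context, time, sync/atomic, unsafe, and the v1alpha2 API types imported, and assumes the v1alpha2 pool spec exposes a TargetPortNumber field:

```go
// poolReader is an assumed narrow interface so the refresher never sees
// the full datastore, only the pool lookup it needs.
type poolReader interface {
	PoolGet() (*v1alpha2.InferencePool, error)
}

func (pm *podMetrics) refreshMetricsViaPool(ds poolReader) error {
	pool, err := ds.PoolGet()
	if err != nil {
		return nil // pool not synced yet; skip this scrape
	}
	ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
	defer cancel()
	// Reading the port from the pool on each scrape picks up pool updates
	// without any per-entry resync bookkeeping.
	updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics(), pool.Spec.TargetPortNumber)
	if err != nil {
		return nil
	}
	updated.UpdateTime = time.Now()
	atomic.StorePointer(&pm.metrics, unsafe.Pointer(updated))
	return nil
}
```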

type PodMetricsClient interface {
FetchMetrics(ctx context.Context, pod *Pod, existing *Metrics, port int32) (*Metrics, error)
}

func (pm *podMetrics) GetPod() *Pod {
return (*Pod)(atomic.LoadPointer(&pm.pod))
}

func (pm *podMetrics) GetMetrics() *Metrics {
return (*Metrics)(atomic.LoadPointer(&pm.metrics))
}

func (pm *podMetrics) UpdatePod(pod *Pod) {
atomic.StorePointer(&pm.pod, unsafe.Pointer(pod))
}
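The unsafe.Pointer plus atomic.LoadPointer/StorePointer pattern above predates generics-era atomics; on Go 1.19+ the same lock-free reads and writes can be written without unsafe using atomic.Pointer[T]. An equivalent sketch, offered as a design alternative rather than the merged code:

```go
// Equivalent shape with typed atomics (Go 1.19+), avoiding unsafe.Pointer.
type podMetricsTyped struct {
	pod     atomic.Pointer[Pod]
	metrics atomic.Pointer[Metrics]
}

func (pm *podMetricsTyped) GetPod() *Pod         { return pm.pod.Load() }
func (pm *podMetricsTyped) GetMetrics() *Metrics { return pm.metrics.Load() }
func (pm *podMetricsTyped) UpdatePod(p *Pod)     { pm.pod.Store(p) }
```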

// startRefreshLoop starts a goroutine exactly once to periodically update metrics. The
// goroutine is stopped either when StopRefreshLoop() is called or when the parentCtx is cancelled.
func (pm *podMetrics) startRefreshLoop() {
pm.once.Do(func() {
go func() {
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod())
for {
select {
case <-pm.done:
return
case <-pm.parentCtx.Done():
return
default:
}

err := pm.refreshMetrics()
if err != nil {
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod())
}

time.Sleep(pm.interval)
}
}()
})
}
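One consequence of the select-with-default plus time.Sleep shape above is that a stop signal is only observed between sleeps, so shutdown can lag by up to one interval. A common alternative shape for the same loop, sketched here rather than taken from the merged code, uses a ticker so the goroutine wakes promptly on done or parentCtx:

```go
// Alternative sketch: same refresh semantics, but the select blocks on all
// three channels, so cancellation is observed immediately.
func (pm *podMetrics) refreshLoopWithTicker() {
	ticker := time.NewTicker(pm.interval)
	defer ticker.Stop()
	for {
		select {
		case <-pm.done:
			return
		case <-pm.parentCtx.Done():
			return
		case <-ticker.C:
			if err := pm.refreshMetrics(); err != nil {
				pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod())
			}
		}
	}
}
```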

func (pm *podMetrics) refreshMetrics() error {
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
defer cancel()
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics(), pm.targetPort)
if err != nil {
	// Since the refresher runs in the background, the pod may already have been deleted
	// while the goroutine hasn't read the done channel yet. In that case we just return nil;
	// the refresher will stop after this interval.
return nil
}
updated.UpdateTime = time.Now()

pm.logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", updated)

atomic.StorePointer(&pm.metrics, unsafe.Pointer(updated))
return nil
}

func (pm *podMetrics) StopRefreshLoop() {
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod())
close(pm.done)
}
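One subtlety worth noting: closing an already-closed channel panics, so StopRefreshLoop must be called at most once per entry, which the reconciler presumably guarantees. A defensive variant, sketched as an alternative rather than as the merged behavior, guards the close with sync.Once:

```go
// Defensive sketch: makes stopping idempotent so a double stop cannot
// panic with "close of closed channel".
type stoppable struct {
	done     chan struct{}
	stopOnce sync.Once
}

func (s *stoppable) StopRefreshLoop() {
	s.stopOnce.Do(func() { close(s.done) })
}
```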
80 changes: 80 additions & 0 deletions pkg/epp/backend/metrics/pod_metrics_test.go
@@ -0,0 +1,80 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics

import (
"context"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"
)

var (
pod1 = &Pod{
NamespacedName: types.NamespacedName{
Name: "pod1",
},
}
metrics = &Metrics{
WaitingQueueSize: 0,
KVCacheUsagePercent: 0.2,
MaxActiveModels: 2,
ActiveModels: map[string]int{
"foo": 1,
"bar": 1,
},
}
updated = &Metrics{
WaitingQueueSize: 9999,
KVCacheUsagePercent: 0.99,
MaxActiveModels: 99,
ActiveModels: map[string]int{
"foo": 1,
"bar": 1,
},
}
)

func TestMetricsRefresh(t *testing.T) {
ctx := context.Background()
pmc := &FakePodMetricsClient{}
pmf := NewPodMetricsFactory(pmc, time.Millisecond)
// The refresher is initialized with empty metrics.
pm := pmf.NewPodMetrics(ctx, pod1, 8000)

// Use SetRes to simulate an update of metrics from the pod.
// Verify that the metrics are updated.
pmc.SetRes(map[types.NamespacedName]*Metrics{
pod1.NamespacedName: metrics,
})
condition := func(collect *assert.CollectT) {
assert.True(collect, cmp.Equal(pm.GetMetrics(), metrics, cmpopts.IgnoreFields(Metrics{}, "UpdateTime")))
}
assert.EventuallyWithT(t, condition, time.Second, time.Millisecond)

	// Stop the loop, then simulate another metrics update. This time the PodMetrics
	// won't pick up the new values.
pm.StopRefreshLoop()
pmc.SetRes(map[types.NamespacedName]*Metrics{
pod1.NamespacedName: updated,
})
// Still expect the same condition (no metrics update).
assert.EventuallyWithT(t, condition, time.Second, time.Millisecond)
}