Skip to content

Commit 0d08a07

Browse files
authored
fix metric scrape port not updated when inference pool target port updated (#417)
* fix metric scrape port not updated when inference pool target port updated Signed-off-by: Kuromesi <[email protected]> * bug fix Signed-off-by: Kuromesi <[email protected]> * fix ut Signed-off-by: Kuromesi <[email protected]> * add log Signed-off-by: Kuromesi <[email protected]> --------- Signed-off-by: Kuromesi <[email protected]>
1 parent 5137c59 commit 0d08a07

File tree

7 files changed

+27
-25
lines changed

7 files changed

+27
-25
lines changed

pkg/epp/backend/fake.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type FakePodMetricsClient struct {
3131
Res map[types.NamespacedName]*datastore.PodMetrics
3232
}
3333

34-
func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *datastore.PodMetrics) (*datastore.PodMetrics, error) {
34+
func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *datastore.PodMetrics, port int32) (*datastore.PodMetrics, error) {
3535
if err, ok := f.Err[existing.NamespacedName]; ok {
3636
return nil, err
3737
}

pkg/epp/backend/provider.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type Provider struct {
4949
}
5050

5151
type PodMetricsClient interface {
52-
FetchMetrics(ctx context.Context, existing *datastore.PodMetrics) (*datastore.PodMetrics, error)
52+
FetchMetrics(ctx context.Context, existing *datastore.PodMetrics, port int32) (*datastore.PodMetrics, error)
5353
}
5454

5555
func (p *Provider) Init(ctx context.Context, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error {
@@ -105,6 +105,11 @@ func (p *Provider) Init(ctx context.Context, refreshMetricsInterval, refreshProm
105105

106106
func (p *Provider) refreshMetricsOnce(logger logr.Logger) error {
107107
loggerTrace := logger.V(logutil.TRACE)
108+
pool, _ := p.datastore.PoolGet()
109+
if pool == nil {
110+
loggerTrace.Info("No inference pool or not initialized")
111+
return nil
112+
}
108113
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
109114
defer cancel()
110115
start := time.Now()
@@ -113,6 +118,7 @@ func (p *Provider) refreshMetricsOnce(logger logr.Logger) error {
113118
// TODO: add a metric instead of logging
114119
loggerTrace.Info("Metrics refreshed", "duration", d)
115120
}()
121+
116122
var wg sync.WaitGroup
117123
errCh := make(chan error)
118124
processOnePod := func(key, value any) bool {
@@ -121,7 +127,7 @@ func (p *Provider) refreshMetricsOnce(logger logr.Logger) error {
121127
wg.Add(1)
122128
go func() {
123129
defer wg.Done()
124-
updated, err := p.pmc.FetchMetrics(ctx, existing)
130+
updated, err := p.pmc.FetchMetrics(ctx, existing, pool.Spec.TargetPortNumber)
125131
if err != nil {
126132
errCh <- fmt.Errorf("failed to parse metrics from %s: %v", existing.NamespacedName, err)
127133
return
@@ -151,8 +157,6 @@ func (p *Provider) refreshMetricsOnce(logger logr.Logger) error {
151157
}
152158

153159
func (p *Provider) flushPrometheusMetricsOnce(logger logr.Logger) {
154-
logger.V(logutil.DEBUG).Info("Flushing Prometheus Metrics")
155-
156160
pool, _ := p.datastore.PoolGet()
157161
if pool == nil {
158162
// No inference pool or not initialize.
@@ -163,6 +167,7 @@ func (p *Provider) flushPrometheusMetricsOnce(logger logr.Logger) {
163167
var queueTotal int
164168

165169
podMetrics := p.datastore.PodGetAll()
170+
logger.V(logutil.VERBOSE).Info("Flushing Prometheus Metrics", "ReadyPods", len(podMetrics))
166171
if len(podMetrics) == 0 {
167172
return
168173
}

pkg/epp/backend/provider_test.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/google/go-cmp/cmp/cmpopts"
2727
"github.com/stretchr/testify/assert"
2828
"k8s.io/apimachinery/pkg/types"
29+
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
2930
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3031
)
3132

@@ -68,6 +69,12 @@ var (
6869
},
6970
},
7071
}
72+
73+
inferencePool = &v1alpha2.InferencePool{
74+
Spec: v1alpha2.InferencePoolSpec{
75+
TargetPortNumber: 8000,
76+
},
77+
}
7178
)
7279

7380
func TestProvider(t *testing.T) {
@@ -127,7 +134,7 @@ func TestProvider(t *testing.T) {
127134

128135
for _, test := range tests {
129136
t.Run(test.name, func(t *testing.T) {
130-
ds := datastore.NewFakeDatastore(test.storePods, nil, nil)
137+
ds := datastore.NewFakeDatastore(test.storePods, nil, inferencePool)
131138
p := NewProvider(test.pmc, ds)
132139
ctx, cancel := context.WithCancel(context.Background())
133140
defer cancel()

pkg/epp/backend/vllm/metrics.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,15 @@ type PodMetricsClientImpl struct{}
5555
func (p *PodMetricsClientImpl) FetchMetrics(
5656
ctx context.Context,
5757
existing *datastore.PodMetrics,
58+
port int32,
5859
) (*datastore.PodMetrics, error) {
5960
logger := log.FromContext(ctx)
6061
loggerDefault := logger.V(logutil.DEFAULT)
6162

6263
// Currently the metrics endpoint is hard-coded, which works with vLLM.
6364
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config.
64-
url := existing.BuildScrapeEndpoint()
65+
url := "http://" + existing.Address + ":" + strconv.Itoa(int(port)) + "/metrics"
66+
6567
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
6668
if err != nil {
6769
loggerDefault.Error(err, "Failed create HTTP request", "method", http.MethodGet, "url", url)

pkg/epp/controller/pod_reconciler_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ import (
3535
)
3636

3737
var (
38-
basePod1 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-1", ScrapePath: "/metrics", ScrapePort: 8000}}
39-
basePod2 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}, Address: "address-2", ScrapePath: "/metrics", ScrapePort: 8000}}
40-
basePod3 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}, Address: "address-3", ScrapePath: "/metrics", ScrapePort: 8000}}
41-
basePod11 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-11", ScrapePath: "/metrics", ScrapePort: 8000}}
38+
basePod1 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-1"}}
39+
basePod2 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}, Address: "address-2"}}
40+
basePod3 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}, Address: "address-3"}}
41+
basePod11 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-11"}}
4242
)
4343

4444
func TestPodReconciler(t *testing.T) {

pkg/epp/datastore/datastore.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -263,16 +263,13 @@ func (ds *datastore) PodDelete(namespacedName types.NamespacedName) {
263263
}
264264

265265
func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
266-
pool, _ := ds.PoolGet()
267266
new := &PodMetrics{
268267
Pod: Pod{
269268
NamespacedName: types.NamespacedName{
270269
Name: pod.Name,
271270
Namespace: pod.Namespace,
272271
},
273-
Address: pod.Status.PodIP,
274-
ScrapePath: "/metrics",
275-
ScrapePort: pool.Spec.TargetPortNumber,
272+
Address: pod.Status.PodIP,
276273
},
277274
Metrics: Metrics{
278275
ActiveModels: make(map[string]int),

pkg/epp/datastore/types.go

+1-10
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@ import (
2626
type Pod struct {
2727
NamespacedName types.NamespacedName
2828
Address string
29-
30-
// metrics scrape options
31-
ScrapePort int32
32-
ScrapePath string
3329
}
3430

3531
type Metrics struct {
@@ -61,11 +57,10 @@ func (pm *PodMetrics) Clone() *PodMetrics {
6157
Pod: Pod{
6258
NamespacedName: pm.NamespacedName,
6359
Address: pm.Address,
64-
ScrapePort: pm.ScrapePort,
65-
ScrapePath: pm.ScrapePath,
6660
},
6761
Metrics: Metrics{
6862
ActiveModels: cm,
63+
MaxActiveModels: pm.MaxActiveModels,
6964
RunningQueueSize: pm.RunningQueueSize,
7065
WaitingQueueSize: pm.WaitingQueueSize,
7166
KVCacheUsagePercent: pm.KVCacheUsagePercent,
@@ -74,7 +69,3 @@ func (pm *PodMetrics) Clone() *PodMetrics {
7469
}
7570
return clone
7671
}
77-
78-
func (pm *PodMetrics) BuildScrapeEndpoint() string {
79-
return fmt.Sprintf("http://%s:%d%s", pm.Address, pm.ScrapePort, pm.ScrapePath)
80-
}

0 commit comments

Comments
 (0)