Skip to content

Commit e80791b

Browse files
authored
Merge pull request #33 from liu-cong/fix
Fix mutierr appending; add a unit test.
2 parents 2285e94 + 3d5ee6c commit e80791b

File tree

4 files changed

+156
-20
lines changed

4 files changed

+156
-20
lines changed

pkg/ext-proc/backend/fake.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package backend
22

3+
import "context"
4+
35
type FakePodLister struct {
46
Err error
57
Pods PodSet
@@ -10,7 +12,7 @@ type FakePodMetricsClient struct {
1012
Res map[Pod]*PodMetrics
1113
}
1214

13-
func (f *FakePodMetricsClient) FetchMetrics(pod Pod, existing *PodMetrics) (*PodMetrics, error) {
15+
func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error) {
1416
if err, ok := f.Err[pod]; ok {
1517
return nil, err
1618
}

pkg/ext-proc/backend/provider.go

+29-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package backend
22

33
import (
4+
"context"
45
"fmt"
56
"sync"
67
"time"
@@ -9,6 +10,10 @@ import (
910
klog "k8s.io/klog/v2"
1011
)
1112

13+
const (
14+
fetchMetricsTimeout = 5 * time.Second
15+
)
16+
1217
func NewProvider(pmc PodMetricsClient, pl PodLister) *Provider {
1318
p := &Provider{
1419
podMetrics: sync.Map{},
@@ -27,7 +32,7 @@ type Provider struct {
2732
}
2833

2934
type PodMetricsClient interface {
30-
FetchMetrics(pod Pod, existing *PodMetrics) (*PodMetrics, error)
35+
FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
3136
}
3237

3338
type PodLister interface {
@@ -60,7 +65,8 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
6065
if err := p.refreshPodsOnce(); err != nil {
6166
return fmt.Errorf("failed to init pods: %v", err)
6267
}
63-
if err := p.refreshMetricsOnce(); err != nil {
68+
err := p.refreshMetricsOnce()
69+
if err != nil {
6470
return fmt.Errorf("failed to init metrics: %v", err)
6571
}
6672

@@ -132,35 +138,48 @@ func (p *Provider) refreshPodsOnce() error {
132138
}
133139

134140
func (p *Provider) refreshMetricsOnce() error {
141+
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
142+
defer cancel()
135143
start := time.Now()
136144
defer func() {
137145
d := time.Since(start)
138146
// TODO: add a metric instead of logging
139147
klog.V(4).Infof("Refreshed metrics in %v", d)
140148
}()
141149
var wg sync.WaitGroup
142-
var errs error
150+
errCh := make(chan error)
143151
processOnePod := func(key, value any) bool {
144152
klog.V(4).Infof("Processing pod %v and metric %v", key, value)
145153
pod := key.(Pod)
146154
existing := value.(*PodMetrics)
147155
wg.Add(1)
148156
go func() {
149157
defer wg.Done()
150-
updated, err := p.pmc.FetchMetrics(pod, existing)
158+
updated, err := p.pmc.FetchMetrics(ctx, pod, existing)
151159
if err != nil {
152-
multierr.Append(errs, fmt.Errorf("failed to parse metrics from %s: %v", pod, err))
160+
errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)
153161
return
154162
}
155-
klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
156-
if err != nil {
157-
multierr.Append(errs, fmt.Errorf("failed to get all pod metrics updated from prometheus: %v", err))
158-
}
159163
p.UpdatePodMetrics(pod, updated)
164+
klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
160165
}()
161166
return true
162167
}
163168
p.podMetrics.Range(processOnePod)
164-
wg.Wait()
169+
170+
// Wait for metric collection for all pods to complete and close the error channel in a
171+
// goroutine so this is unblocking, allowing the code to proceed to the error collection code
172+
// below.
173+
// Note we couldn't use a buffered error channel with a size because the size of the podMetrics
174+
// sync.Map is unknown beforehand.
175+
go func() {
176+
wg.Wait()
177+
close(errCh)
178+
}()
179+
180+
var errs error
181+
for err := range errCh {
182+
errs = multierr.Append(errs, err)
183+
}
165184
return errs
166185
}

pkg/ext-proc/backend/provider_test.go

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package backend
2+
3+
import (
4+
"errors"
5+
"testing"
6+
"time"
7+
8+
"github.com/google/go-cmp/cmp"
9+
"github.com/google/go-cmp/cmp/cmpopts"
10+
)
11+
12+
var (
13+
pod1 = &PodMetrics{
14+
Pod: Pod{Name: "pod1"},
15+
Metrics: Metrics{
16+
WaitingQueueSize: 0,
17+
KVCacheUsagePercent: 0.2,
18+
MaxActiveModels: 2,
19+
ActiveModels: map[string]int{
20+
"foo": 1,
21+
"bar": 1,
22+
},
23+
},
24+
}
25+
pod2 = &PodMetrics{
26+
Pod: Pod{Name: "pod2"},
27+
Metrics: Metrics{
28+
WaitingQueueSize: 1,
29+
KVCacheUsagePercent: 0.2,
30+
MaxActiveModels: 2,
31+
ActiveModels: map[string]int{
32+
"foo1": 1,
33+
"bar1": 1,
34+
},
35+
},
36+
}
37+
)
38+
39+
func TestProvider(t *testing.T) {
40+
tests := []struct {
41+
name string
42+
pmc PodMetricsClient
43+
pl PodLister
44+
initErr bool
45+
want []*PodMetrics
46+
}{
47+
{
48+
name: "Init success",
49+
pl: &FakePodLister{
50+
Pods: map[Pod]bool{
51+
pod1.Pod: true,
52+
pod2.Pod: true,
53+
},
54+
},
55+
pmc: &FakePodMetricsClient{
56+
Res: map[Pod]*PodMetrics{
57+
pod1.Pod: pod1,
58+
pod2.Pod: pod2,
59+
},
60+
},
61+
want: []*PodMetrics{pod1, pod2},
62+
},
63+
{
64+
name: "Fetch metrics error",
65+
pl: &FakePodLister{
66+
Pods: map[Pod]bool{
67+
pod1.Pod: true,
68+
pod2.Pod: true,
69+
},
70+
},
71+
pmc: &FakePodMetricsClient{
72+
Err: map[Pod]error{
73+
pod2.Pod: errors.New("injected error"),
74+
},
75+
Res: map[Pod]*PodMetrics{
76+
pod1.Pod: pod1,
77+
},
78+
},
79+
initErr: true,
80+
want: []*PodMetrics{
81+
pod1,
82+
// Failed to fetch pod2 metrics so it remains the default values.
83+
&PodMetrics{
84+
Pod: Pod{Name: "pod2"},
85+
Metrics: Metrics{
86+
WaitingQueueSize: 0,
87+
KVCacheUsagePercent: 0,
88+
MaxActiveModels: 0,
89+
ActiveModels: map[string]int{},
90+
},
91+
}},
92+
},
93+
}
94+
95+
for _, test := range tests {
96+
t.Run(test.name, func(t *testing.T) {
97+
p := NewProvider(test.pmc, test.pl)
98+
err := p.Init(time.Millisecond, time.Millisecond)
99+
if test.initErr != (err != nil) {
100+
t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr)
101+
}
102+
metrics := p.AllPodMetrics()
103+
lessFunc := func(a, b *PodMetrics) bool {
104+
return a.String() < b.String()
105+
}
106+
if diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(lessFunc)); diff != "" {
107+
t.Errorf("Unexpected output (-want +got): %v", diff)
108+
}
109+
})
110+
}
111+
}

pkg/ext-proc/backend/vllm/metrics.go

+13-9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package vllm
33

44
import (
5+
"context"
56
"ext-proc/backend"
67
"fmt"
78
"net/http"
@@ -15,8 +16,7 @@ import (
1516
)
1617

1718
const (
18-
ActiveLoRAAdaptersMetricName = "vllm:info_active_adapters_info"
19-
LoRAAdapterPendingRequestMetricName = "vllm:active_lora_adapters"
19+
ActiveLoRAAdaptersMetricName = "vllm:info_active_adapters_info"
2020
// TODO: Replace these with the num_tokens_running/waiting below once we add those to the fork.
2121
RunningQueueSizeMetricName = "vllm:num_requests_running"
2222
WaitingQueueSizeMetricName = "vllm:num_requests_waiting"
@@ -32,11 +32,15 @@ type PodMetricsClientImpl struct {
3232
}
3333

3434
// FetchMetrics fetches metrics from a given pod.
35-
func (p *PodMetricsClientImpl) FetchMetrics(pod backend.Pod, existing *backend.PodMetrics) (*backend.PodMetrics, error) {
35+
func (p *PodMetricsClientImpl) FetchMetrics(ctx context.Context, pod backend.Pod, existing *backend.PodMetrics) (*backend.PodMetrics, error) {
3636
// Currently the metrics endpoint is hard-coded, which works with vLLM.
3737
// TODO(https://github.com/kubernetes-sigs/llm-instance-gateway/issues/16): Consume this from LLMServerPool config.
3838
url := fmt.Sprintf("http://%s/metrics", pod.Address)
39-
resp, err := http.Get(url)
39+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
40+
if err != nil {
41+
return nil, fmt.Errorf("failed to create request: %v", err)
42+
}
43+
resp, err := http.DefaultClient.Do(req)
4044
if err != nil {
4145
klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
4246
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
@@ -63,23 +67,23 @@ func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *bac
6367
var errs error
6468
updated := existing.Clone()
6569
runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName)
66-
multierr.Append(errs, err)
70+
errs = multierr.Append(errs, err)
6771
if err == nil {
6872
updated.RunningQueueSize = int(runningQueueSize.GetGauge().GetValue())
6973
}
7074
waitingQueueSize, _, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName)
71-
multierr.Append(errs, err)
75+
errs = multierr.Append(errs, err)
7276
if err == nil {
7377
updated.WaitingQueueSize = int(waitingQueueSize.GetGauge().GetValue())
7478
}
7579
cachePercent, _, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName)
76-
multierr.Append(errs, err)
80+
errs = multierr.Append(errs, err)
7781
if err == nil {
7882
updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue()
7983
}
8084
/* TODO: uncomment once this is available in vllm.
8185
kvCap, _, err := getGaugeLatestValue(metricFamilies, KvCacheMaxTokenCapacityMetricName)
82-
multierr.Append(errs, err)
86+
errs = multierr.Append(errs, err)
8387
if err != nil {
8488
updated.KvCacheMaxTokenCapacity = int(kvCap)
8589
}
@@ -107,7 +111,7 @@ func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *bac
107111
}
108112
} else {
109113
klog.Warningf("metric family %q not found", ActiveLoRAAdaptersMetricName)
110-
multierr.Append(errs, fmt.Errorf("metric family %q not found", ActiveLoRAAdaptersMetricName))
114+
errs = multierr.Append(errs, fmt.Errorf("metric family %q not found", ActiveLoRAAdaptersMetricName))
111115
}
112116

113117
return updated, errs

0 commit comments

Comments
 (0)