forked from kubernetes-sigs/gateway-api-inference-extension
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetrics.go
135 lines (127 loc) · 4.49 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package backend
import (
"fmt"
"strings"
"sync"
"time"
dto "github.com/prometheus/client_model/go"
"go.uber.org/multierr"
klog "k8s.io/klog/v2"
)
// Names of the vLLM Prometheus metric families referenced by this package.
const (
	// RunningQueueSizeMetricName and WaitingQueueSizeMetricName are the
	// families read for the running and waiting queue sizes. They are
	// request-based for now.
	// TODO: Replace these with the num_tokens_running/waiting below once we add those to the fork.
	RunningQueueSizeMetricName = "vllm:num_requests_running"
	WaitingQueueSizeMetricName = "vllm:num_requests_waiting"
	/* TODO: Uncomment this once the following are added to the fork.
	RunningQueueSizeMetricName = "vllm:num_tokens_running"
	WaitingQueueSizeMetricName = "vllm:num_tokens_waiting"
	*/

	// KVCacheUsagePercentMetricName is the family read for the KV cache usage.
	KVCacheUsagePercentMetricName = "vllm:gpu_cache_usage_perc"
	// KvCacheMaxTokenCapacityMetricName is only referenced from commented-out
	// code in this part of the file.
	KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity"

	// ActiveLoRAAdaptersMetricName is the family whose "active_adapters" label
	// carries a comma-separated list of adapter names.
	ActiveLoRAAdaptersMetricName = "vllm:info_active_adapters_info"
	// LoRAAdapterPendingRequestMetricName is declared here but not referenced
	// in this part of the file.
	LoRAAdapterPendingRequestMetricName = "vllm:active_lora_adapters"
)
// refreshMetricsOnce scrapes every pod currently held in p.podMetrics once,
// one goroutine per pod, and stores each refreshed result through
// p.UpdatePodMetrics.
//
// It blocks until all scrapes have finished and returns the combined error of
// every fetch or parse failure, or nil when all pods refreshed cleanly. A pod
// whose fetch fails is left untouched; a pod whose metrics could only be
// partially parsed is still updated with whatever was read.
func (p *Provider) refreshMetricsOnce() error {
	start := time.Now()
	defer func() {
		// TODO: add a metric instead of logging
		klog.V(3).Infof("Refreshed metrics in %v", time.Since(start))
	}()

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards errs, which is appended to from the per-pod goroutines
		errs error
	)
	// recordErr folds err into errs. multierr.Append returns the combined
	// error rather than mutating its argument, so the result must be assigned.
	recordErr := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		errs = multierr.Append(errs, err)
	}

	processOnePod := func(key, value any) bool {
		// Unchecked assertions: these panic if the map holds other types.
		pod := key.(Pod)
		metrics := value.(*PodMetrics)
		wg.Add(1)
		go func() {
			defer wg.Done()
			metricFamilies, err := p.pmc.FetchMetrics(pod)
			if err != nil {
				recordErr(fmt.Errorf("failed to parse metrics from %s: %w", pod, err))
				return
			}
			updated, err := promToPodMetrics(metricFamilies, metrics)
			klog.V(3).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
			if err != nil {
				recordErr(fmt.Errorf("failed to get all pod metrics updated from prometheus: %w", err))
			}
			// Store the result even on a partial parse so the fields that
			// were read successfully are not lost.
			p.UpdatePodMetrics(pod, updated)
		}()
		// Returning true keeps Range iterating over the remaining pods.
		return true
	}
	p.podMetrics.Range(processOnePod)
	wg.Wait()
	return errs
}
// promToPodMetrics builds an updated copy of existing from the scraped
// prometheus metric families.
//
// existing is not modified: the result starts as existing.Clone() and each
// field is overwritten only when its metric was found, so a missing metric
// leaves the previous value in place. The returned pointer can be used to
// atomically update the pod metrics map.
//
// The returned error combines every metric that could not be read. The
// returned PodMetrics is non-nil and usable even when the error is not nil.
func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *PodMetrics) (*PodMetrics, error) {
	var errs error
	updated := existing.Clone()

	// multierr.Append returns the combined error (and ignores a nil err), so
	// its result must be assigned back to errs.
	runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName)
	errs = multierr.Append(errs, err)
	if err == nil {
		// vllm:num_requests_running is a gauge, like the waiting queue size.
		updated.RunningQueueSize = int(runningQueueSize.GetGauge().GetValue())
	}

	waitingQueueSize, _, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName)
	errs = multierr.Append(errs, err)
	if err == nil {
		updated.WaitingQueueSize = int(waitingQueueSize.GetGauge().GetValue())
	}

	cachePercent, _, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName)
	errs = multierr.Append(errs, err)
	if err == nil {
		updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue()
	}

	/* TODO: uncomment once this is available in vllm.
	kvCap, _, err := getGaugeLatestValue(metricFamilies, KvCacheMaxTokenCapacityMetricName)
	errs = multierr.Append(errs, err)
	if err == nil {
		updated.KvCacheMaxTokenCapacity = int(kvCap)
	}
	*/

	// Update active loras
	mf, ok := metricFamilies[ActiveLoRAAdaptersMetricName]
	if !ok {
		klog.Warningf("metric family %q not found", ActiveLoRAAdaptersMetricName)
		errs = multierr.Append(errs, fmt.Errorf("metric family %q not found", ActiveLoRAAdaptersMetricName))
		return updated, errs
	}

	// IMPORTANT: replace the map entries instead of appending to it, so
	// adapters that are no longer reported drop out of the result.
	updated.CachedModels = make(map[string]int)
	for _, metric := range mf.GetMetric() {
		for _, label := range metric.GetLabel() {
			if label.GetName() != "active_adapters" || label.GetValue() == "" {
				continue
			}
			// The label value is a comma-separated list of adapter names.
			for _, adapter := range strings.Split(label.GetValue(), ",") {
				updated.CachedModels[adapter] = 0
			}
		}
	}
	return updated, errs
}
// getLatestMetric returns the sample of the family metricName with the
// greatest timestamp, together with that timestamp as a time.Time. This should
// be used to get the latest Gauge metric.
//
// When several samples share the greatest timestamp, the first one in the
// family wins. That includes the case where every sample reports a timestamp
// of 0, which is presumably what samples without a timestamp look like.
//
// An error is returned when the family is absent from metricFamilies or
// contains no samples; the returned metric is nil in both cases.
func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, time.Time, error) {
	mf, ok := metricFamilies[metricName]
	if !ok {
		klog.Warningf("metric family %q not found", metricName)
		return nil, time.Time{}, fmt.Errorf("metric family %q not found", metricName)
	}
	samples := mf.GetMetric()
	if len(samples) == 0 {
		return nil, time.Time{}, fmt.Errorf("no metrics available for %q", metricName)
	}

	// Seed with the first sample rather than a zero timestamp, so a family
	// whose timestamps are all 0 still yields a metric instead of nil.
	latest := samples[0]
	latestTs := latest.GetTimestampMs()
	for _, m := range samples[1:] {
		if ts := m.GetTimestampMs(); ts > latestTs {
			latestTs, latest = ts, m
		}
	}
	// The timestamp is in milliseconds since the Unix epoch.
	return latest, time.UnixMilli(latestTs), nil
}