Skip to content

Commit a13179a

Browse files
Redesign EPP Metrics Pipeline to be Model Server Agnostic (#461)
* start adding metrics changes for trion support * Refactor metrics to work with any prometheus metric naming convention based on EPP runtime flags. * Finalize metric refactor and testing. * Set streaming env var to false in triton ext_proc.yaml * Update titon server deployment to pull frozen repo branch instead of main for consistency. * Remove model server specific metric files and tests and point EPP image to main AR instead of testing registry. * Remove commented prints and old comments. * Remove triton support for now, make metrics mapping 1-to-1 with load balancing metrics. * moved files for cleaner diff * re-add todos and rename kv flag to reflect percentage usage. * Fix nits, move logging channel for backend/metrics.go from default to trace, fix comments. * Rebase into metric agnostic redesign. * Merge getLatestMetric and getLabeledMetric. * Remove unused datastore types. * Fix lint. * Remove log and fix nits. * Move ext_proc and inferencemodel yaml files back, fix nits and remove all logging from metrics.go. * Remove the rest of logging from metrics.go and tests. * Add trace log to podmetrics and small warning fix to metrics_spec_test.
1 parent a1c95a5 commit a13179a

File tree

8 files changed

+1061
-489
lines changed

8 files changed

+1061
-489
lines changed

cmd/epp/main.go

+24-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
3939
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4040
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
41-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/vllm"
4241
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4342
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4443
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
@@ -92,6 +91,17 @@ var (
9291
"certPath", "", "The path to the certificate for secure serving. The certificate and private key files "+
9392
"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+
9493
"then a self-signed certificate is used.")
94+
// metric flags
95+
totalQueuedRequestsMetric = flag.String("totalQueuedRequestsMetric",
96+
"vllm:num_requests_waiting",
97+
"Prometheus metric for the number of queued requests.")
98+
kvCacheUsagePercentageMetric = flag.String("kvCacheUsagePercentageMetric",
99+
"vllm:gpu_cache_usage_perc",
100+
"Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
101+
// LoRA metrics
102+
loraInfoMetric = flag.String("loraInfoMetric",
103+
"vllm:lora_requests_info",
104+
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
95105

96106
setupLog = ctrl.Log.WithName("setup")
97107
)
@@ -143,9 +153,21 @@ func run() error {
143153

144154
ctx := ctrl.SetupSignalHandler()
145155

146-
pmf := backendmetrics.NewPodMetricsFactory(&vllm.PodMetricsClientImpl{}, *refreshMetricsInterval)
156+
// Set up mapper for metric scraping.
157+
mapping, err := backendmetrics.NewMetricMapping(
158+
*totalQueuedRequestsMetric,
159+
*kvCacheUsagePercentageMetric,
160+
*loraInfoMetric,
161+
)
162+
if err != nil {
163+
setupLog.Error(err, "Failed to create metric mapping from flags.")
164+
return err
165+
}
166+
167+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
147168
// Setup runner.
148169
datastore := datastore.NewDatastore(ctx, pmf)
170+
149171
serverRunner := &runserver.ExtProcServerRunner{
150172
GrpcPort: *grpcPort,
151173
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,

pkg/epp/backend/metrics/metrics.go

+245
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"net/http"
23+
"strconv"
24+
"strings"
25+
26+
dto "github.com/prometheus/client_model/go"
27+
"github.com/prometheus/common/expfmt"
28+
"go.uber.org/multierr"
29+
)
30+
31+
const (
32+
// LoRA metrics based on protocol
33+
LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
34+
LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
35+
LoraInfoMaxAdaptersMetricName = "max_lora"
36+
)
37+
38+
type PodMetricsClientImpl struct {
39+
MetricMapping *MetricMapping
40+
}
41+
42+
// FetchMetrics fetches metrics from a given pod.
43+
func (p *PodMetricsClientImpl) FetchMetrics(
44+
ctx context.Context,
45+
pod *Pod,
46+
existing *Metrics,
47+
port int32,
48+
) (*Metrics, error) {
49+
50+
// Currently the metrics endpoint is hard-coded, which works with vLLM.
51+
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config.
52+
url := "http://" + pod.Address + ":" + strconv.Itoa(int(port)) + "/metrics"
53+
54+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
55+
if err != nil {
56+
return nil, fmt.Errorf("failed to create request: %v", err)
57+
}
58+
resp, err := http.DefaultClient.Do(req)
59+
if err != nil {
60+
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod.NamespacedName, err)
61+
}
62+
defer func() {
63+
_ = resp.Body.Close()
64+
}()
65+
66+
if resp.StatusCode != http.StatusOK {
67+
return nil, fmt.Errorf("unexpected status code from %s: %v", pod.NamespacedName, resp.StatusCode)
68+
}
69+
70+
parser := expfmt.TextParser{}
71+
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
72+
if err != nil {
73+
return nil, err
74+
}
75+
return p.promToPodMetrics(metricFamilies, existing)
76+
}
77+
78+
// promToPodMetrics updates internal pod metrics with scraped Prometheus metrics.
79+
func (p *PodMetricsClientImpl) promToPodMetrics(
80+
metricFamilies map[string]*dto.MetricFamily,
81+
existing *Metrics,
82+
) (*Metrics, error) {
83+
var errs error
84+
updated := existing.Clone()
85+
86+
if p.MetricMapping.TotalQueuedRequests != nil {
87+
queued, err := p.getMetric(metricFamilies, *p.MetricMapping.TotalQueuedRequests)
88+
if err == nil {
89+
updated.WaitingQueueSize = int(queued.GetGauge().GetValue())
90+
} else {
91+
errs = multierr.Append(errs, err)
92+
}
93+
}
94+
95+
if p.MetricMapping.KVCacheUtilization != nil {
96+
usage, err := p.getMetric(metricFamilies, *p.MetricMapping.KVCacheUtilization)
97+
if err == nil {
98+
updated.KVCacheUsagePercent = usage.GetGauge().GetValue()
99+
} else {
100+
errs = multierr.Append(errs, err)
101+
}
102+
}
103+
104+
// Handle LoRA metrics (only if all LoRA MetricSpecs are present)
105+
if p.MetricMapping.LoraRequestInfo != nil {
106+
loraMetrics, err := p.getLatestLoraMetric(metricFamilies)
107+
errs = multierr.Append(errs, err)
108+
109+
if loraMetrics != nil {
110+
updated.ActiveModels = make(map[string]int)
111+
for _, label := range loraMetrics.GetLabel() {
112+
if label.GetName() == LoraInfoRunningAdaptersMetricName {
113+
if label.GetValue() != "" {
114+
adapterList := strings.Split(label.GetValue(), ",")
115+
for _, adapter := range adapterList {
116+
updated.ActiveModels[adapter] = 0
117+
}
118+
}
119+
}
120+
if label.GetName() == LoraInfoWaitingAdaptersMetricName {
121+
if label.GetValue() != "" {
122+
adapterList := strings.Split(label.GetValue(), ",")
123+
for _, adapter := range adapterList {
124+
updated.ActiveModels[adapter] = 0
125+
}
126+
}
127+
}
128+
if label.GetName() == LoraInfoMaxAdaptersMetricName {
129+
if label.GetValue() != "" {
130+
updated.MaxActiveModels, err = strconv.Atoi(label.GetValue())
131+
if err != nil {
132+
errs = multierr.Append(errs, err)
133+
}
134+
}
135+
}
136+
}
137+
}
138+
}
139+
140+
return updated, errs
141+
}
142+
143+
// getLatestLoraMetric gets latest lora metric series in gauge metric family `vllm:lora_requests_info`
144+
// reason its specially fetched is because each label key value pair permutation generates new series
145+
// and only most recent is useful. The value of each series is the creation timestamp so we can
146+
// retrieve the latest by sorting the value.
147+
func (p *PodMetricsClientImpl) getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, error) {
148+
if p.MetricMapping.LoraRequestInfo == nil {
149+
return nil, nil // No LoRA metrics configured
150+
}
151+
152+
loraRequests, ok := metricFamilies[p.MetricMapping.LoraRequestInfo.MetricName]
153+
if !ok {
154+
return nil, fmt.Errorf("metric family %q not found", p.MetricMapping.LoraRequestInfo.MetricName)
155+
}
156+
157+
var latest *dto.Metric
158+
var latestTs float64 // Use float64, as Gauge.Value is float64
159+
160+
// Iterate over all metrics in the family.
161+
for _, m := range loraRequests.GetMetric() {
162+
running := ""
163+
waiting := ""
164+
// Check if the metric has the expected LoRA labels.
165+
for _, lp := range m.GetLabel() {
166+
switch lp.GetName() {
167+
case LoraInfoRunningAdaptersMetricName:
168+
running = lp.GetValue()
169+
case LoraInfoWaitingAdaptersMetricName:
170+
waiting = lp.GetValue()
171+
}
172+
}
173+
// Ignore metrics with both labels empty.
174+
if running == "" && waiting == "" {
175+
continue
176+
}
177+
178+
// Select the metric with the *largest Gauge Value* (which represents the timestamp).
179+
if m.GetGauge().GetValue() > latestTs {
180+
latestTs = m.GetGauge().GetValue()
181+
latest = m
182+
}
183+
}
184+
if latest == nil {
185+
return nil, nil
186+
}
187+
188+
return latest, nil // Convert nanoseconds to time.Time
189+
}
190+
191+
// getMetric retrieves a specific metric based on MetricSpec.
192+
func (p *PodMetricsClientImpl) getMetric(metricFamilies map[string]*dto.MetricFamily, spec MetricSpec) (*dto.Metric, error) {
193+
mf, ok := metricFamilies[spec.MetricName]
194+
if !ok {
195+
return nil, fmt.Errorf("metric family %q not found", spec.MetricName)
196+
}
197+
198+
if len(mf.GetMetric()) == 0 {
199+
return nil, fmt.Errorf("no metrics available for %q", spec.MetricName)
200+
}
201+
202+
return getLatestMetric(mf, &spec)
203+
}
204+
205+
// getLabeledMetric gets the latest metric with matching labels.
206+
func getLatestMetric(mf *dto.MetricFamily, spec *MetricSpec) (*dto.Metric, error) {
207+
var latestMetric *dto.Metric
208+
var latestTimestamp int64 = -1 // Initialize to -1 so any timestamp is greater
209+
210+
for _, m := range mf.GetMetric() {
211+
if spec.Labels == nil || labelsMatch(m.GetLabel(), spec.Labels) {
212+
if m.GetTimestampMs() > latestTimestamp {
213+
latestTimestamp = m.GetTimestampMs()
214+
latestMetric = m
215+
}
216+
}
217+
}
218+
219+
if latestMetric != nil {
220+
return latestMetric, nil
221+
}
222+
223+
return nil, fmt.Errorf("no matching metric found for %q with labels %+v", spec.MetricName, spec.Labels)
224+
}
225+
226+
// labelsMatch checks if a metric's labels contain all the labels in the spec.
227+
func labelsMatch(metricLabels []*dto.LabelPair, specLabels map[string]string) bool {
228+
if len(specLabels) == 0 {
229+
return true // No specific labels required
230+
}
231+
232+
for specName, specValue := range specLabels {
233+
found := false
234+
for _, label := range metricLabels {
235+
if label.GetName() == specName && label.GetValue() == specValue {
236+
found = true
237+
break
238+
}
239+
}
240+
if !found {
241+
return false // A required label is missing
242+
}
243+
}
244+
return true // All required labels are present
245+
}

0 commit comments

Comments
 (0)