
Redesign EPP Metrics Pipeline to be Model Server Agnostic #461

Merged Mar 14, 2025 (19 commits)
Commits
214905d
Start adding metrics changes for Triton support
BenjaminBraunDev Mar 4, 2025
6125054
Refactor metrics to work with any prometheus metric naming convention…
BenjaminBraunDev Mar 6, 2025
71e00ad
Finalize metric refactor and testing.
BenjaminBraunDev Mar 6, 2025
dd2825f
Set streaming env var to false in triton ext_proc.yaml
BenjaminBraunDev Mar 6, 2025
aa2ee06
Update Triton server deployment to pull frozen repo branch instead of …
BenjaminBraunDev Mar 6, 2025
d4c083e
Remove model server specific metric files and tests and point EPP ima…
BenjaminBraunDev Mar 6, 2025
df3f3e3
Remove commented prints and old comments.
BenjaminBraunDev Mar 7, 2025
558132e
Remove Triton support for now, make metrics mapping 1-to-1 with load …
BenjaminBraunDev Mar 7, 2025
5838459
Moved files for cleaner diff
BenjaminBraunDev Mar 7, 2025
1c367a6
Re-add TODOs and rename KV flag to reflect percentage usage.
BenjaminBraunDev Mar 7, 2025
3356bd3
Fix nits, move logging channel for backend/metrics.go from default to…
BenjaminBraunDev Mar 13, 2025
371fd58
Rebase into metric agnostic redesign.
BenjaminBraunDev Mar 13, 2025
97fd0de
Merge getLatestMetric and getLabeledMetric.
BenjaminBraunDev Mar 14, 2025
27b34e9
Remove unused datastore types.
BenjaminBraunDev Mar 14, 2025
4b84744
Fix lint.
BenjaminBraunDev Mar 14, 2025
66e0376
Remove log and fix nits.
BenjaminBraunDev Mar 14, 2025
9f4859b
Move ext_proc and inferencemodel yaml files back, fix nits and remove…
BenjaminBraunDev Mar 14, 2025
c082e86
Remove the rest of logging from metrics.go and tests.
BenjaminBraunDev Mar 14, 2025
81ee1e6
Add trace log to podmetrics and small warning fix to metrics_spec_test.
BenjaminBraunDev Mar 14, 2025
26 changes: 24 additions & 2 deletions cmd/epp/main.go
@@ -38,7 +38,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/vllm"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
@@ -92,6 +91,17 @@ var (
"certPath", "", "The path to the certificate for secure serving. The certificate and private key files "+
"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+
"then a self-signed certificate is used.")
// metric flags
totalQueuedRequestsMetric = flag.String("totalQueuedRequestsMetric",
"vllm:num_requests_waiting",
"Prometheus metric for the number of queued requests.")
kvCacheUsagePercentageMetric = flag.String("kvCacheUsagePercentageMetric",
"vllm:gpu_cache_usage_perc",
"Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
// LoRA metrics
loraInfoMetric = flag.String("loraInfoMetric",
"vllm:lora_requests_info",
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")

setupLog = ctrl.Log.WithName("setup")
)
@@ -143,9 +153,21 @@ func run() error {

ctx := ctrl.SetupSignalHandler()

pmf := backendmetrics.NewPodMetricsFactory(&vllm.PodMetricsClientImpl{}, *refreshMetricsInterval)
// Set up mapper for metric scraping.
mapping, err := backendmetrics.NewMetricMapping(
*totalQueuedRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric,
)
if err != nil {
setupLog.Error(err, "Failed to create metric mapping from flags.")
return err
}

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
// Setup runner.
datastore := datastore.NewDatastore(ctx, pmf)

serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
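
With the vLLM-specific client removed, the scraped metric names are ordinary flag values, so the EPP can target any model server that exposes equivalent Prometheus gauges. Below is a minimal sketch of wiring alternative metric names through the types this PR adds; the example:* metric names are purely illustrative, and the time.Duration refresh interval is an assumption based on the *refreshMetricsInterval flag used in main.go.

package main

import (
    "fmt"
    "time"

    backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)

func main() {
    // Illustrative metric names for a hypothetical non-vLLM server; the defaults
    // in main.go remain the vllm:* names.
    mapping, err := backendmetrics.NewMetricMapping(
        "example:num_requests_waiting",   // queued requests gauge
        "example:kv_cache_usage_percent", // KV-cache utilization gauge (0 to 1)
        "example:lora_requests_info",     // must still expose the vLLM-style LoRA labels
    )
    if err != nil {
        fmt.Println("invalid metric mapping:", err)
        return
    }

    client := &backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}

    // Assumed to take a time.Duration, mirroring *refreshMetricsInterval in main.go.
    pmf := backendmetrics.NewPodMetricsFactory(client, 50*time.Millisecond)
    _ = pmf // handed to the datastore and ExtProcServerRunner as in run()
}
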
245 changes: 245 additions & 0 deletions pkg/epp/backend/metrics/metrics.go
@@ -0,0 +1,245 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"context"
"fmt"
"net/http"
"strconv"
"strings"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.uber.org/multierr"
)

const (
// Label names on the LoRA info metric (vLLM label format).
LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
LoraInfoMaxAdaptersMetricName = "max_lora"
)

type PodMetricsClientImpl struct {
MetricMapping *MetricMapping
}

// FetchMetrics fetches metrics from a given pod.
func (p *PodMetricsClientImpl) FetchMetrics(
ctx context.Context,
pod *Pod,
existing *Metrics,
port int32,
) (*Metrics, error) {

// Currently the metrics endpoint is hard-coded, which works with vLLM.
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config.
url := "http://" + pod.Address + ":" + strconv.Itoa(int(port)) + "/metrics"

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %v", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod.NamespacedName, err)
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code from %s: %v", pod.NamespacedName, resp.StatusCode)
}

parser := expfmt.TextParser{}
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return nil, err
}
return p.promToPodMetrics(metricFamilies, existing)
}

// promToPodMetrics updates internal pod metrics with scraped Prometheus metrics.
func (p *PodMetricsClientImpl) promToPodMetrics(
metricFamilies map[string]*dto.MetricFamily,
existing *Metrics,
) (*Metrics, error) {
var errs error
updated := existing.Clone()

if p.MetricMapping.TotalQueuedRequests != nil {
queued, err := p.getMetric(metricFamilies, *p.MetricMapping.TotalQueuedRequests)
if err == nil {
updated.WaitingQueueSize = int(queued.GetGauge().GetValue())
} else {
errs = multierr.Append(errs, err)
}
}

if p.MetricMapping.KVCacheUtilization != nil {
usage, err := p.getMetric(metricFamilies, *p.MetricMapping.KVCacheUtilization)
if err == nil {
updated.KVCacheUsagePercent = usage.GetGauge().GetValue()
} else {
errs = multierr.Append(errs, err)
}
}

// Handle LoRA metrics (only if all LoRA MetricSpecs are present)
if p.MetricMapping.LoraRequestInfo != nil {
loraMetrics, err := p.getLatestLoraMetric(metricFamilies)
errs = multierr.Append(errs, err)

if loraMetrics != nil {
updated.ActiveModels = make(map[string]int)
for _, label := range loraMetrics.GetLabel() {
if label.GetName() == LoraInfoRunningAdaptersMetricName {
if label.GetValue() != "" {
adapterList := strings.Split(label.GetValue(), ",")
for _, adapter := range adapterList {
updated.ActiveModels[adapter] = 0
}
}
}
if label.GetName() == LoraInfoWaitingAdaptersMetricName {
if label.GetValue() != "" {
adapterList := strings.Split(label.GetValue(), ",")
for _, adapter := range adapterList {
updated.ActiveModels[adapter] = 0
}
}
}
if label.GetName() == LoraInfoMaxAdaptersMetricName {
if label.GetValue() != "" {
updated.MaxActiveModels, err = strconv.Atoi(label.GetValue())
if err != nil {
errs = multierr.Append(errs, err)
}
}
}
}
}
}

return updated, errs
}

// getLatestLoraMetric gets the latest series of the LoRA info gauge metric family
// (`vllm:lora_requests_info` by default). It is handled specially because each permutation of
// label key-value pairs produces a new series, and only the most recent one is useful. The value
// of each series is its creation timestamp, so the latest series is the one with the largest value.
func (p *PodMetricsClientImpl) getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, error) {
if p.MetricMapping.LoraRequestInfo == nil {
return nil, nil // No LoRA metrics configured
}

loraRequests, ok := metricFamilies[p.MetricMapping.LoraRequestInfo.MetricName]
if !ok {
return nil, fmt.Errorf("metric family %q not found", p.MetricMapping.LoraRequestInfo.MetricName)
}

var latest *dto.Metric
var latestTs float64 // Use float64, as Gauge.Value is float64

// Iterate over all metrics in the family.
for _, m := range loraRequests.GetMetric() {
running := ""
waiting := ""
// Check if the metric has the expected LoRA labels.
for _, lp := range m.GetLabel() {
switch lp.GetName() {
case LoraInfoRunningAdaptersMetricName:
running = lp.GetValue()
case LoraInfoWaitingAdaptersMetricName:
waiting = lp.GetValue()
}
}
// Ignore metrics with both labels empty.
if running == "" && waiting == "" {
continue
}

// Select the metric with the *largest Gauge Value* (which represents the timestamp).
if m.GetGauge().GetValue() > latestTs {
latestTs = m.GetGauge().GetValue()
latest = m
}
}
if latest == nil {
return nil, nil
}

return latest, nil
}

// getMetric retrieves a specific metric based on MetricSpec.
func (p *PodMetricsClientImpl) getMetric(metricFamilies map[string]*dto.MetricFamily, spec MetricSpec) (*dto.Metric, error) {
mf, ok := metricFamilies[spec.MetricName]
if !ok {
return nil, fmt.Errorf("metric family %q not found", spec.MetricName)
}

if len(mf.GetMetric()) == 0 {
return nil, fmt.Errorf("no metrics available for %q", spec.MetricName)
}

return getLatestMetric(mf, &spec)
}

// getLatestMetric gets the latest metric in the family whose labels match the spec.
func getLatestMetric(mf *dto.MetricFamily, spec *MetricSpec) (*dto.Metric, error) {
var latestMetric *dto.Metric
var latestTimestamp int64 = -1 // Initialize to -1 so any timestamp is greater

for _, m := range mf.GetMetric() {
if spec.Labels == nil || labelsMatch(m.GetLabel(), spec.Labels) {
if m.GetTimestampMs() > latestTimestamp {
latestTimestamp = m.GetTimestampMs()
latestMetric = m
}
}
}

if latestMetric != nil {
return latestMetric, nil
}

return nil, fmt.Errorf("no matching metric found for %q with labels %+v", spec.MetricName, spec.Labels)
}

// labelsMatch checks if a metric's labels contain all the labels in the spec.
func labelsMatch(metricLabels []*dto.LabelPair, specLabels map[string]string) bool {
if len(specLabels) == 0 {
return true // No specific labels required
}

for specName, specValue := range specLabels {
found := false
for _, label := range metricLabels {
if label.GetName() == specName && label.GetValue() == specValue {
found = true
break
}
}
if !found {
return false // A required label is missing
}
}
return true // All required labels are present
}