diff --git a/examples/poc/manifests/vllm/vllm-lora-deployment.yaml b/examples/poc/manifests/vllm/vllm-lora-deployment.yaml index 4f990c9e..a453eb7e 100644 --- a/examples/poc/manifests/vllm/vllm-lora-deployment.yaml +++ b/examples/poc/manifests/vllm/vllm-lora-deployment.yaml @@ -30,7 +30,7 @@ spec: spec: containers: - name: lora - image: "ghcr.io/tomatillo-and-multiverse/vllm:demo" + image: "vllm/vllm-openai:latest" imagePullPolicy: Always command: ["python3", "-m", "vllm.entrypoints.openai.api_server"] args: @@ -40,7 +40,6 @@ spec: - "1" - "--port" - "8000" - - "--disable-log-requests" - "--enable-lora" - "--max-loras" - "4" diff --git a/go.mod b/go.mod index 6daef914..5df490da 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/onsi/gomega v1.36.0 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.61.0 + github.com/stretchr/testify v1.10.0 go.uber.org/multierr v1.11.0 google.golang.org/grpc v1.68.0 google.golang.org/protobuf v1.35.2 @@ -70,6 +71,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.20.4 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/shopspring/decimal v1.2.0 // indirect diff --git a/pkg/ext-proc/backend/vllm/metrics.go b/pkg/ext-proc/backend/vllm/metrics.go index f4540adf..bd594df9 100644 --- a/pkg/ext-proc/backend/vllm/metrics.go +++ b/pkg/ext-proc/backend/vllm/metrics.go @@ -5,19 +5,21 @@ import ( "context" "fmt" "net/http" + "strconv" "strings" "time" - "inference.networking.x-k8s.io/llm-instance-gateway/pkg/ext-proc/backend" - dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "go.uber.org/multierr" + 
"inference.networking.x-k8s.io/llm-instance-gateway/pkg/ext-proc/backend" klog "k8s.io/klog/v2" ) const ( - ActiveLoRAAdaptersMetricName = "vllm:info_active_adapters_info" + LoraRequestInfoMetricName = "vllm:lora_requests_info" + LoraRequestInfoRunningAdaptersMetricName = "running_lora_adapters" + LoraRequestInfoMaxAdaptersMetricName = "max_lora" // TODO: Replace these with the num_tokens_running/waiting below once we add those to the fork. RunningQueueSizeMetricName = "vllm:num_requests_running" WaitingQueueSizeMetricName = "vllm:num_requests_waiting" @@ -82,6 +84,9 @@ func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *bac if err == nil { updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue() } + + loraMetrics, _, err := getLatestLoraMetric(metricFamilies) + errs = multierr.Append(errs, err) /* TODO: uncomment once this is available in vllm. kvCap, _, err := getGaugeLatestValue(metricFamilies, KvCacheMaxTokenCapacityMetricName) errs = multierr.Append(errs, err) @@ -90,35 +95,55 @@ func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *bac } */ - // TODO(https://github.com/kubernetes-sigs/llm-instance-gateway/issues/22): Read from vLLM metrics once the is available. - updated.MaxActiveModels = 4 - - // Update active loras - mf, ok := metricFamilies[ActiveLoRAAdaptersMetricName] - if ok { - // IMPORTANT: replace the map entries instead of appending to it. 
+ if loraMetrics != nil { updated.ActiveModels = make(map[string]int) - for _, metric := range mf.GetMetric() { - for _, label := range metric.GetLabel() { - if label.GetName() == "active_adapters" { - if label.GetValue() != "" { - adapterList := strings.Split(label.GetValue(), ",") - for _, adapter := range adapterList { - updated.ActiveModels[adapter] = 0 - } + for _, label := range loraMetrics.GetLabel() { + if label.GetName() == LoraRequestInfoRunningAdaptersMetricName { + if label.GetValue() != "" { + adapterList := strings.Split(label.GetValue(), ",") + for _, adapter := range adapterList { + updated.ActiveModels[adapter] = 0 + } + } + } + if label.GetName() == LoraRequestInfoMaxAdaptersMetricName { + if label.GetValue() != "" { + updated.MaxActiveModels, err = strconv.Atoi(label.GetValue()) + if err != nil { + errs = multierr.Append(errs, err) } } } } - } else { - klog.Warningf("metric family %q not found", ActiveLoRAAdaptersMetricName) - errs = multierr.Append(errs, fmt.Errorf("metric family %q not found", ActiveLoRAAdaptersMetricName)) + } return updated, errs } +// getLatestLoraMetric gets latest lora metric series in gauge metric family `vllm:lora_requests_info` +// reason its specially fetched is because each label key value pair permutation generates new series +// and only most recent is useful. The value of each series is the creation timestamp so we can +// retrieve the latest by sorting the value. 
+func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, time.Time, error) { + loraRequests, ok := metricFamilies[LoraRequestInfoMetricName] + if !ok { + klog.Warningf("metric family %q not found", LoraRequestInfoMetricName) + return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName) + } + var latestTs float64 + var latest *dto.Metric + for _, m := range loraRequests.GetMetric() { + if m.GetGauge().GetValue() > latestTs { + latestTs = m.GetGauge().GetValue() + latest = m + } + } + return latest, time.Unix(int64(latestTs), 0), nil +} + // getLatestMetric gets the latest metric of a family. This should be used to get the latest Gauge metric. +// Since vllm doesn't set the timestamp in metric, this metric essentially gets the first metric. func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, time.Time, error) { mf, ok := metricFamilies[metricName] if !ok { diff --git a/pkg/ext-proc/backend/vllm/metrics_test.go b/pkg/ext-proc/backend/vllm/metrics_test.go new file mode 100644 index 00000000..f6ac403f --- /dev/null +++ b/pkg/ext-proc/backend/vllm/metrics_test.go @@ -0,0 +1,232 @@ +package vllm + +import ( + "fmt" + "testing" + + "inference.networking.x-k8s.io/llm-instance-gateway/pkg/ext-proc/backend" + + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" +) + +func TestPromToPodMetrics(t *testing.T) { + testCases := []struct { + name string + metricFamilies map[string]*dto.MetricFamily + expectedMetrics *backend.Metrics + expectedErr error + initialPodMetrics *backend.PodMetrics + }{ + { + name: "all metrics available", + metricFamilies: map[string]*dto.MetricFamily{ + RunningQueueSizeMetricName: { + Metric: []*dto.Metric{ + { + Gauge: &dto.Gauge{ + Value: proto.Float64(10), + }, + TimestampMs: proto.Int64(100), + }, + { + Gauge: &dto.Gauge{ + Value: proto.Float64(15), + }, + TimestampMs:
proto.Int64(200), // This is the latest + }, + }, + }, + WaitingQueueSizeMetricName: { + Metric: []*dto.Metric{ + { + Gauge: &dto.Gauge{ + Value: proto.Float64(20), + }, + TimestampMs: proto.Int64(100), + }, + { + Gauge: &dto.Gauge{ + Value: proto.Float64(25), + }, + TimestampMs: proto.Int64(200), // This is the latest + }, + }, + }, + KVCacheUsagePercentMetricName: { + Metric: []*dto.Metric{ + { + Gauge: &dto.Gauge{ + Value: proto.Float64(0.8), + }, + TimestampMs: proto.Int64(100), + }, + { + Gauge: &dto.Gauge{ + Value: proto.Float64(0.9), + }, + TimestampMs: proto.Int64(200), // This is the latest + }, + }, + }, + LoraRequestInfoMetricName: { + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + { + Name: proto.String(LoraRequestInfoRunningAdaptersMetricName), + Value: proto.String("lora3,lora4"), + }, + { + Name: proto.String(LoraRequestInfoMaxAdaptersMetricName), + Value: proto.String("2"), + }, + }, + Gauge: &dto.Gauge{ + Value: proto.Float64(100), + }, + }, + { + Label: []*dto.LabelPair{ + { + Name: proto.String(LoraRequestInfoRunningAdaptersMetricName), + Value: proto.String("lora2"), + }, + { + Name: proto.String(LoraRequestInfoMaxAdaptersMetricName), + Value: proto.String("2"), + }, + }, + Gauge: &dto.Gauge{ + Value: proto.Float64(90), + }, + }, + }, + }, + }, + expectedMetrics: &backend.Metrics{ + RunningQueueSize: 15, + WaitingQueueSize: 25, + KVCacheUsagePercent: 0.9, + ActiveModels: map[string]int{ + "lora3": 0, + "lora4": 0, + }, + MaxActiveModels: 2, + }, + initialPodMetrics: &backend.PodMetrics{}, + expectedErr: nil, + }, + { + name: "invalid max lora", + metricFamilies: map[string]*dto.MetricFamily{ + RunningQueueSizeMetricName: { + Metric: []*dto.Metric{ + { + Gauge: &dto.Gauge{ + Value: proto.Float64(10), + }, + TimestampMs: proto.Int64(100), + }, + { + Gauge: &dto.Gauge{ + Value: proto.Float64(15), + }, + TimestampMs: proto.Int64(200), // This is the latest + }, + }, + }, + WaitingQueueSizeMetricName: { + Metric: []*dto.Metric{ + { + Gauge: 
&dto.Gauge{ + Value: proto.Float64(20), + }, + TimestampMs: proto.Int64(100), + }, + { + Gauge: &dto.Gauge{ + Value: proto.Float64(25), + }, + TimestampMs: proto.Int64(200), // This is the latest + }, + }, + }, + KVCacheUsagePercentMetricName: { + Metric: []*dto.Metric{ + { + Gauge: &dto.Gauge{ + Value: proto.Float64(0.8), + }, + TimestampMs: proto.Int64(100), + }, + { + Gauge: &dto.Gauge{ + Value: proto.Float64(0.9), + }, + TimestampMs: proto.Int64(200), // This is the latest + }, + }, + }, + LoraRequestInfoMetricName: { + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + { + Name: proto.String(LoraRequestInfoRunningAdaptersMetricName), + Value: proto.String("lora3,lora4"), + }, + { + Name: proto.String(LoraRequestInfoMaxAdaptersMetricName), + Value: proto.String("2a"), + }, + }, + Gauge: &dto.Gauge{ + Value: proto.Float64(100), + }, + }, + { + Label: []*dto.LabelPair{ + { + Name: proto.String(LoraRequestInfoRunningAdaptersMetricName), + Value: proto.String("lora2"), + }, + { + Name: proto.String(LoraRequestInfoMaxAdaptersMetricName), + Value: proto.String("2"), + }, + }, + Gauge: &dto.Gauge{ + Value: proto.Float64(90), + }, + }, + }, + }, + }, + expectedMetrics: &backend.Metrics{ + RunningQueueSize: 15, + WaitingQueueSize: 25, + KVCacheUsagePercent: 0.9, + ActiveModels: map[string]int{ + "lora3": 0, + "lora4": 0, + }, + MaxActiveModels: 0, + }, + initialPodMetrics: &backend.PodMetrics{}, + expectedErr: fmt.Errorf("strconv.Atoi: parsing \"2a\": invalid syntax"), + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + updated, err := promToPodMetrics(tc.metricFamilies, tc.initialPodMetrics) + if tc.expectedErr != nil { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectedMetrics, &updated.Metrics) + } + }) + } +}