diff --git a/docs/proposals/003-model-server-protocol/README.md b/docs/proposals/003-model-server-protocol/README.md
index 2ab557f7..02efbe5c 100644
--- a/docs/proposals/003-model-server-protocol/README.md
+++ b/docs/proposals/003-model-server-protocol/README.md
@@ -47,3 +47,4 @@ The model server MUST expose the following LoRA adapter metrics via the same Pro
   requested adapter. Example: `"max_lora": "8"`.
 * `running_lora_adapters`: A comma separated list of adapters that are currently loaded in GPU
   memory and ready to serve requests. Example: `"running_lora_adapters": "adapter1, adapter2"`
+* `waiting_lora_adapters`: A comma separated list of adapters that are waiting to be served. Example: `"waiting_lora_adapters": "adapter1, adapter2"`
diff --git a/pkg/epp/backend/vllm/metrics.go b/pkg/epp/backend/vllm/metrics.go
index 4973c93e..5b36b930 100644
--- a/pkg/epp/backend/vllm/metrics.go
+++ b/pkg/epp/backend/vllm/metrics.go
@@ -34,9 +34,13 @@ import (
 	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
+// Metric names used in the vLLM metrics implementation.
+// Refer to the protocol doc for more details:
+// https://github.com/kubernetes-sigs/gateway-api-inference-extension/tree/main/docs/proposals/003-model-server-protocol
 const (
 	LoraRequestInfoMetricName                = "vllm:lora_requests_info"
 	LoraRequestInfoRunningAdaptersMetricName = "running_lora_adapters"
+	LoraRequestInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
 	LoraRequestInfoMaxAdaptersMetricName     = "max_lora"
 	// TODO: Replace these with the num_tokens_running/waiting below once we add those to the fork.
 	RunningQueueSizeMetricName = "vllm:num_requests_running"
@@ -45,8 +49,7 @@ const (
 		RunningQueueSizeMetricName = "vllm:num_tokens_running"
 		WaitingQueueSizeMetricName = "vllm:num_tokens_waiting"
 	*/
-	KVCacheUsagePercentMetricName     = "vllm:gpu_cache_usage_perc"
-	KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity"
+	KVCacheUsagePercentMetricName = "vllm:gpu_cache_usage_perc"
 )
 
 type PodMetricsClientImpl struct{}
@@ -138,6 +141,14 @@ func promToPodMetrics(
 				}
 			}
 		}
+		if label.GetName() == LoraRequestInfoWaitingAdaptersMetricName {
+			if label.GetValue() != "" {
+				adapterList := strings.Split(label.GetValue(), ",")
+				for _, adapter := range adapterList {
+					updated.ActiveModels[adapter] = 0
+				}
+			}
+		}
 		if label.GetName() == LoraRequestInfoMaxAdaptersMetricName {
 			if label.GetValue() != "" {
 				updated.MaxActiveModels, err = strconv.Atoi(label.GetValue())
@@ -163,14 +174,42 @@ func getLatestLoraMetric(logger logr.Logger, metricFamilies map[string]*dto.Metr
 		logger.V(logutil.DEFAULT).Error(nil, "Metric family not found", "name", LoraRequestInfoMetricName)
 		return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName)
 	}
-	var latestTs float64
+	var latest *dto.Metric
+	var latestTs float64
+
+	// Iterate over all metrics in the family.
 	for _, m := range loraRequests.GetMetric() {
+		var running, waiting string
+		// Read the label values for running and waiting adapters.
+		for _, lp := range m.GetLabel() {
+			switch lp.GetName() {
+			case LoraRequestInfoRunningAdaptersMetricName:
+				running = lp.GetValue()
+			case LoraRequestInfoWaitingAdaptersMetricName:
+				waiting = lp.GetValue()
+			}
+		}
+
+		// Ignore metrics with both labels empty. This happens when there are no running or waiting requests on
+		// the server; in this case it is best to use the last set of active adapters.
+		if running == "" && waiting == "" {
+			continue
+		}
+
+		// Select the metric with the latest creation timestamp.
 		if m.GetGauge().GetValue() > latestTs {
 			latestTs = m.GetGauge().GetValue()
 			latest = m
 		}
 	}
+
+	if latest == nil {
+		logger.V(logutil.TRACE).Info("Metric value Empty", "value", latest, "metric", LoraRequestInfoMetricName)
+		return nil, time.Time{}, nil
+	}
+
+	// Convert the gauge value (creation timestamp) to time.Time.
 	return latest, time.Unix(0, int64(latestTs*1000)), nil
 }
diff --git a/pkg/epp/scheduling/filter.go b/pkg/epp/scheduling/filter.go
index b7881468..d3c22673 100644
--- a/pkg/epp/scheduling/filter.go
+++ b/pkg/epp/scheduling/filter.go
@@ -19,6 +19,8 @@ package scheduling
 import (
 	"errors"
 	"math"
+	"math/rand"
+	"time"
 
 	"github.com/go-logr/logr"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
@@ -183,18 +185,59 @@ func lowLoRACostPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
 	return ok || len(pod.ActiveModels) < pod.MaxActiveModels
 }
 
-// loRAAffinityPredicate is a filter function to check whether a pod has affinity to the lora requested.
-func loRAAffinityPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
-	_, ok := pod.ActiveModels[req.ResolvedTargetModel]
-	return ok
-}
+// loRASoftAffinityFilter implements a pod selection strategy that prioritizes pods
+// with existing LoRA model affinity while allowing for load balancing through randomization.
+//
+// The function works by:
+// 1. Separating pods into two groups: those with target model affinity and those with available capacity
+// 2. Using a probability threshold to sometimes select from non-affinity pods to enable load balancing
+// 3. Falling back to whichever group has pods if one group is empty
+//
+// Parameters:
+//   - logger: Logger interface for diagnostic output
+//   - req: LLM request containing the resolved target model
+//   - pods: Slice of pod metrics to filter
+//
+// Returns:
+//   - Filtered slice of pod metrics based on affinity and availability
+//   - Error if any issues occur during filtering
+func loRASoftAffinityFilter(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) {
+
+	// Pre-allocate slices with estimated capacity
+	filtered_affinity := make([]*datastore.PodMetrics, 0, len(pods))
+	filtered_available := make([]*datastore.PodMetrics, 0, len(pods))
+
+	// Categorize pods based on affinity and availability
+	for _, pod := range pods {
+
+		if _, exists := pod.ActiveModels[req.ResolvedTargetModel]; exists {
+			filtered_affinity = append(filtered_affinity, pod)
+		} else if len(pod.ActiveModels) < pod.MaxActiveModels {
+			filtered_available = append(filtered_available, pod)
+		}
+	}
+
+	// Seed a math/rand generator; the randomization only needs to split traffic between the two groups.
+	randSource := rand.NewSource(time.Now().UnixNano())
+	randGen := rand.New(randSource)
+
+	// If both groups have pods, use probability to select which group to return
+	if len(filtered_affinity) > 0 && len(filtered_available) > 0 {
+		if randGen.Float64() < loraAffinityThreshold {
+			return filtered_affinity, nil
+		}
+		return filtered_available, nil
+	}
+
+	// Return whichever group has pods
+	if len(filtered_affinity) > 0 {
+		return filtered_affinity, nil
+	}
 
-// canAcceptNewLoraPredicate is a filter function to check whether a pod has room to load the adapter.
-func canAcceptNewLoraPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
-	return len(pod.ActiveModels) < pod.MaxActiveModels
+	return filtered_available, nil
 }
 
-func criticalRequestPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
+func criticalRequestPredicate(req *LLMRequest, _ *datastore.PodMetrics) bool {
 	return req.Critical
 }
diff --git a/pkg/epp/scheduling/filter_test.go b/pkg/epp/scheduling/filter_test.go
index ac765b78..f76cece9 100644
--- a/pkg/epp/scheduling/filter_test.go
+++ b/pkg/epp/scheduling/filter_test.go
@@ -429,3 +429,93 @@ func TestFilterFunc(t *testing.T) {
 		})
 	}
 }
+
+// TestLoRASoftAffinityDistribution tests that the loRASoftAffinityFilter function
+// properly distributes requests according to the loraAffinityThreshold
+func TestLoRASoftAffinityDistribution(t *testing.T) {
+	logger := logutil.NewTestLogger()
+
+	const (
+		testModelName     = "test-model"
+		testAffinityModel = "test-affinity-model"
+		numIterations     = 10000
+		tolerancePercent  = 5.0 // Allow 5% tolerance from expected distribution
+	)
+
+	// Create a test request and pods
+	req := &LLMRequest{
+		Model:               testAffinityModel,
+		ResolvedTargetModel: testAffinityModel,
+	}
+
+	// Test setup: One affinity pod and one available pod
+	pods := []*datastore.PodMetrics{
+		{
+			Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "affinity-pod"}},
+			Metrics: datastore.Metrics{
+				MaxActiveModels: 2,
+				ActiveModels: map[string]int{
+					testAffinityModel: 1,
+				},
+			},
+		},
+		{
+			Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "available-pod"}},
+			Metrics: datastore.Metrics{
+				MaxActiveModels: 2,
+				ActiveModels:    map[string]int{},
+			},
+		},
+	}
+
+	// Run the filter function multiple times and count the results
+	affinityCount := 0
+	availableCount := 0
+
+	// Use the actual loraAffinityThreshold as defined in the original code
+	// This test should work with whatever value is set there
+	expectedAffinityPercent := loraAffinityThreshold * 100
+	for i := 0; i < numIterations; i++ {
+		result, err := loRASoftAffinityFilter(logger, req, pods)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+
+		// Check which type of pod was returned
+		if len(result) != 1 {
+			t.Fatalf("Expected exactly one pod in result, got %d", len(result))
+		}
+
+		// Identify if the returned pod is the affinity pod or the available pod
+		if _, exists := result[0].ActiveModels[testAffinityModel]; exists {
+			affinityCount++
+		} else {
+			availableCount++
+		}
+	}
+
+	// Calculate the actual percentages
+	actualAffinityPercent := float64(affinityCount) / float64(numIterations) * 100
+	actualAvailablePercent := float64(availableCount) / float64(numIterations) * 100
+
+	// Check if the distribution matches the expected threshold within tolerance
+	affinityLowerBound := expectedAffinityPercent - tolerancePercent
+	affinityUpperBound := expectedAffinityPercent + tolerancePercent
+
+	availableLowerBound := (100 - expectedAffinityPercent) - tolerancePercent
+	availableUpperBound := (100 - expectedAffinityPercent) + tolerancePercent
+
+	t.Logf("Distribution results over %d iterations:", numIterations)
+	t.Logf("Expected affinity percent: %.2f%% (threshold: %.2f)", expectedAffinityPercent, loraAffinityThreshold)
+	t.Logf("Actual affinity percent: %.2f%% (%d out of %d)", actualAffinityPercent, affinityCount, numIterations)
+	t.Logf("Actual available percent: %.2f%% (%d out of %d)", actualAvailablePercent, availableCount, numIterations)
+
+	if actualAffinityPercent < affinityLowerBound || actualAffinityPercent > affinityUpperBound {
t.Errorf("Affinity selection percent %.2f%% outside expected range %.2f%% to %.2f%%", + actualAffinityPercent, affinityLowerBound, affinityUpperBound) + } + if actualAvailablePercent < availableLowerBound || actualAvailablePercent > availableUpperBound { + t.Errorf("Availability selection percent %.2f%% outside expected range %.2f%% to %.2f%%", + actualAvailablePercent, availableLowerBound, availableUpperBound) + } +} diff --git a/pkg/epp/scheduling/scheduler.go b/pkg/epp/scheduling/scheduler.go index a969948e..bdddd972 100644 --- a/pkg/epp/scheduling/scheduler.go +++ b/pkg/epp/scheduling/scheduler.go @@ -36,8 +36,11 @@ const ( queueThresholdCritical = 5 // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable. // the threshold for queued requests to be considered low below which we can prioritize LoRA affinity. - // The value of 50 is arrived heuristicically based on experiments. - queueingThresholdLoRA = 50 + // The value of 128 is arrived heuristicically based on experiments. + queueingThresholdLoRA = 128 + // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable. + // loraAffinityThreshold indicates the probability with which we prefer a pod with LoRA affinity over a pod without but having room to fit more LoRA adapters. + loraAffinityThreshold = 0.999 ) var ( @@ -54,7 +57,7 @@ var ( filter: leastQueuingFilterFunc, nextOnSuccessOrFailure: &filter{ name: "low cost LoRA", - filter: toFilterFunc(lowLoRACostPredicate), + filter: loRASoftAffinityFilter, nextOnSuccessOrFailure: &filter{ name: "least KV cache percent", filter: leastKVCacheFilterFunc, @@ -76,14 +79,9 @@ var ( name: "low queueing filter", filter: toFilterFunc((lowQueueingPodPredicate)), nextOnSuccess: &filter{ - name: "affinity LoRA", - filter: toFilterFunc(loRAAffinityPredicate), - nextOnSuccess: queueAndKVCacheFilter, - nextOnFailure: &filter{ - name: "can accept LoRA Adapter", - filter: toFilterFunc(canAcceptNewLoraPredicate), - nextOnSuccessOrFailure: queueAndKVCacheFilter, - }, + name: "affinity LoRA", + filter: loRASoftAffinityFilter, + nextOnSuccessOrFailure: queueAndKVCacheFilter, }, nextOnFailure: queueLoRAAndKVCacheFilter, } diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index 7755795b..cc836504 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -158,6 +158,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { KVCacheUsagePercent: 0.2, ActiveModels: map[string]int{ "foo": 1, + "bar": 1, }, }), }, @@ -200,7 +201,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, }), extprocutils.FakePodMetrics(1, datastore.Metrics{ - WaitingQueueSize: 50, + WaitingQueueSize: 200, KVCacheUsagePercent: 0.1, ActiveModels: map[string]int{ "foo": 1,