
Commit cac37ae

multi lora changes for better load balancing
1 parent 2612ead commit cac37ae

4 files changed: +82 -66 lines changed


config/manifests/vllm/deployment.yaml (+1 -16)
@@ -3,7 +3,7 @@ kind: Deployment
 metadata:
   name: vllm-llama2-7b-pool
 spec:
-  replicas: 2
+  replicas: 6
   selector:
     matchLabels:
       app: vllm-llama2-7b-pool
@@ -38,19 +38,6 @@ spec:
         - "--lora-modules"
         - '{"name": "tweet-summary-0", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
         - '{"name": "tweet-summary-1", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-2", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-3", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-4", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-5", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-6", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-7", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-8", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-9", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-10", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-11", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-12", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-13", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
-        - '{"name": "tweet-summary-14", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
         env:
         - name: VLLM_USE_V1
           value: "1"
@@ -63,8 +50,6 @@ spec:
               key: token
         - name: VLLM_ALLOW_RUNTIME_LORA_UPDATING
           value: "true"
-        - name: VLLM_ALLOW_RUNTIME_LORA_UPDATING
-          value: "true"
         ports:
         - containerPort: 8000
           name: http
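With thirteen of the static --lora-modules entries removed and VLLM_ALLOW_RUNTIME_LORA_UPDATING still set, additional adapters would presumably be registered at runtime rather than baked into the manifest. The following is a minimal sketch of that flow, assuming the pod exposes vLLM's dynamic /v1/load_lora_adapter endpoint on port 8000; the adapter name and the localhost address (e.g. via a port-forward) are illustrative only.

// load_adapter.go — sketch of registering a LoRA adapter at runtime,
// assuming vLLM's dynamic LoRA endpoint is reachable on localhost:8000.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	body, _ := json.Marshal(map[string]string{
		"lora_name": "tweet-summary-2", // hypothetical adapter name
		"lora_path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm",
	})
	resp, err := http.Post("http://localhost:8000/v1/load_lora_adapter",
		"application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}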

pkg/epp/backend/vllm/metrics.go (+1 -1)
@@ -168,7 +168,7 @@ func getLatestLoraMetric(logger logr.Logger, metricFamilies map[string]*dto.Metr
 	loraRequests, ok := metricFamilies[LoraRequestInfoMetricName]
 	if !ok {
 		logger.V(logutil.DEFAULT).Error(nil, "Metric family not found", "name", LoraRequestInfoMetricName)
-		return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName)
+		return nil, time.Time{}, nil
 	}
 
 	var latest *dto.Metric
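The one-line change relaxes the contract of getLatestLoraMetric: a missing LoRA metric family now yields (nil, zero time, nil) instead of an error, so a backend that has not yet reported any adapters is no longer treated as a failed scrape. Below is a self-contained stand-in for that contract using the same prometheus client_model types; the family name is assumed for illustration, not taken from the file.

package main

import (
	"fmt"
	"time"

	dto "github.com/prometheus/client_model/go"
)

// latestFromFamily mimics the new behavior: an absent family is not an error,
// it simply means there is no LoRA info to report yet.
func latestFromFamily(families map[string]*dto.MetricFamily, name string) (*dto.Metric, time.Time, error) {
	mf, ok := families[name]
	if !ok || len(mf.GetMetric()) == 0 {
		return nil, time.Time{}, nil
	}
	m := mf.GetMetric()[0]
	return m, time.UnixMilli(m.GetTimestampMs()), nil
}

func main() {
	// Empty scrape: the caller gets three zero values and simply skips the LoRA update.
	m, ts, err := latestFromFamily(map[string]*dto.MetricFamily{}, "vllm:lora_requests_info")
	fmt.Println(m, ts.IsZero(), err) // <nil> true <nil>
}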

pkg/epp/scheduling/filter.go (+64 -34)
@@ -19,22 +19,19 @@ package scheduling
 import (
 	"errors"
 	"math"
+	"math/rand"
+	"time"
 
 	"github.com/go-logr/logr"
-	klog "k8s.io/klog/v2"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
-	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
+	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
 type Filter interface {
 	Name() string
 	Filter(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error)
 }
 
-const (
-	maxLoRACost = 4
-)
-
 // filter applies current filterFunc, and then recursively applies next filters depending success or
 // failure of the current filterFunc.
 // It can be used to construct a flow chart algorithm.
@@ -62,8 +59,9 @@ func (f *filter) Name() string {
 	return f.name
 }
 
-func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
-	klog.InfoS("Running a filter", "name", f.Name(), "request", req, "podCount", len(pods))
+func (f *filter) Filter(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) {
+	loggerTrace := logger.V(logutil.TRACE)
+	loggerTrace.Info("Running a filter", "name", f.Name(), "podCount", len(pods))
 
 	filtered, err := f.filter(logger, req, pods)
 
@@ -76,8 +74,7 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend
 		if f.nextOnSuccess != nil {
 			next = f.nextOnSuccess
 		}
-		klog.InfoS("Filter succeeded", "filter", f.Name(), "next", next.Name(), "filteredPodCount", len(filtered))
-
+		loggerTrace.Info("Filter succeeded", "filter", f.Name(), "next", next.Name(), "filteredPodCount", len(filtered))
 		// On success, pass the filtered result to the next filter.
 		return next.Filter(logger, req, filtered)
 	} else {
@@ -88,7 +85,7 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend
 		if f.nextOnFailure != nil {
 			next = f.nextOnFailure
 		}
-		klog.InfoS("Filter failed", "filter", f.Name(), "next", next.Name())
+		loggerTrace.Info("Filter failed", "filter", f.Name(), "next", next.Name())
 		// On failure, pass the initial set of pods to the next filter.
 		return next.Filter(logger, req, pods)
 	}
@@ -175,22 +172,6 @@ func leastKVCacheFilterFunc(logger logr.Logger, req *LLMRequest, pods []*datasto
 	return filtered, nil
 }
 
-func minLoRACostFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
-
-	var min int = math.MaxInt
-	filtered := []*backend.PodMetrics{}
-
-	for _, pod := range pods {
-		if len(pod.ActiveModels) < min {
-			min = len(pod.ActiveModels)
-			filtered = []*backend.PodMetrics{pod}
-		} else if len(pod.ActiveModels) == min {
-			filtered = append(filtered, pod)
-		}
-	}
-	return filtered, nil
-}
-
 // podPredicate is a filter function to check whether a pod is desired.
 type podPredicate func(req *LLMRequest, pod *datastore.PodMetrics) bool
 
@@ -201,18 +182,67 @@ type podPredicate func(req *LLMRequest, pod *datastore.PodMetrics) bool
 // where # of lora slots > # of lora adapters.
 func lowLoRACostPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
 	_, ok := pod.ActiveModels[req.ResolvedTargetModel]
-	return ok || len(pod.ActiveModels) < maxLoRACost
+	return ok || len(pod.ActiveModels) < pod.MaxActiveModels
 }
 
-// loRAAffinityPredicate is a filter function to check whether a pod has affinity to the lora requested.
-func loRAAffinityPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
-	_, ok := pod.ActiveModels[req.ResolvedTargetModel]
-	return ok
+// loRASoftAffinityPredicate implements a pod selection strategy that prioritizes pods
+// with existing LoRA model affinity while allowing for load balancing through randomization.
+//
+// The function works by:
+// 1. Separating pods into two groups: those with target model affinity and those with available capacity
+// 2. Using a probability threshold to sometimes select from non-affinity pods to enable load balancing
+// 3. Falling back to whatever group has pods if one group is empty
+//
+// Parameters:
+//   - logger: Logger interface for diagnostic output
+//   - req: LLM request containing the resolved target model
+//   - pods: Slice of pod metrics to filter
+//
+// Returns:
+//   - Filtered slice of pod metrics based on affinity and availability
+//   - Error if any issues occur during filtering
+func loRASoftAffinityPredicate(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) {
+
+	// Pre-allocate slices with estimated capacity
+	filtered_affinity := make([]*datastore.PodMetrics, 0, len(pods))
+	filtered_available := make([]*datastore.PodMetrics, 0, len(pods))
+
+	// Categorize pods based on affinity and availability
+	for _, pod := range pods {
+		if pod == nil {
+			continue
+		}
+
+		if _, exists := pod.ActiveModels[req.ResolvedTargetModel]; exists {
+			filtered_affinity = append(filtered_affinity, pod)
+		} else if len(pod.ActiveModels) < pod.MaxActiveModels {
+			filtered_available = append(filtered_available, pod)
+		}
+	}
+
+	// Use crypto/rand for better randomization in production environments
+	randSource := rand.NewSource(time.Now().UnixNano())
+	randGen := rand.New(randSource)
+
+	// If both groups have pods, use probability to select which group to return
+	if len(filtered_affinity) > 0 && len(filtered_available) > 0 {
+		if randGen.Float64() < loraAffinityThreshold {
+			return filtered_affinity, nil
+		}
+		return filtered_available, nil
+	}
+
+	// Return whichever group has pods
+	if len(filtered_affinity) > 0 {
+		return filtered_affinity, nil
+	}
+
+	return filtered_available, nil
 }
 
 // canAcceptNewLoraPredicate is a filter function to check whether a pod has room to load the adapter.
-func canAcceptNewLoraPredicate(req *LLMRequest, pod *backend.PodMetrics) bool {
-	return len(pod.ActiveModels) < maxLoRACost
+func canAcceptNewLoraPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
+	return len(pod.ActiveModels) < pod.MaxActiveModels
 }
 
 func criticalRequestPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
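To see what the new predicate buys over the removed minLoRACostFilterFunc, here is a self-contained sketch of the two-group split with the 0.999 threshold, using simplified stand-in types rather than the EPP's datastore.PodMetrics: pods already serving the requested adapter win almost every time, while pods with free adapter slots still receive a trickle of traffic, which is what lets freshly scaled replicas pick up adapters.

// softaffinity_sketch.go — illustrative only, not the EPP code.
package main

import (
	"fmt"
	"math/rand"
)

type pod struct {
	name            string
	activeModels    map[string]struct{}
	maxActiveModels int
}

// pick mirrors the soft-affinity split: affinity group with probability
// threshold, otherwise the group that still has free LoRA slots.
func pick(pods []pod, target string, threshold float64, r *rand.Rand) []pod {
	var affinity, available []pod
	for _, p := range pods {
		if _, ok := p.activeModels[target]; ok {
			affinity = append(affinity, p)
		} else if len(p.activeModels) < p.maxActiveModels {
			available = append(available, p)
		}
	}
	if len(affinity) > 0 && len(available) > 0 {
		if r.Float64() < threshold {
			return affinity
		}
		return available
	}
	if len(affinity) > 0 {
		return affinity
	}
	return available
}

func main() {
	r := rand.New(rand.NewSource(42))
	pods := []pod{
		{"pod-a", map[string]struct{}{"tweet-summary-1": {}}, 4}, // already serves the adapter
		{"pod-b", map[string]struct{}{}, 4},                      // free slots only
	}
	hits := 0
	for i := 0; i < 100000; i++ {
		if pick(pods, "tweet-summary-1", 0.999, r)[0].name == "pod-a" {
			hits++
		}
	}
	fmt.Printf("affinity group chosen %.1f%% of the time\n", float64(hits)/1000) // ~99.9%
}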

pkg/epp/scheduling/scheduler.go (+16 -15)
@@ -18,11 +18,12 @@ limitations under the License.
 package scheduling
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
 
 	"github.com/go-logr/logr"
-	klog "k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
 	errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
 	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -35,8 +36,11 @@ const (
 	queueThresholdCritical = 5
 	// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable.
 	// the threshold for queued requests to be considered low below which we can prioritize LoRA affinity.
-	// The value of 50 is arrived heuristicically based on experiments.
+	// The value of 128 is arrived heuristicically based on experiments.
 	queueingThresholdLoRA = 128
+	// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable.
+	// loraAffinityThreshold indicates the probability with which we prefer a pod with LoRA affinity over a pod without but having room to fit more LoRA adapters.
+	loraAffinityThreshold = 0.999
 )
 
 var (
@@ -53,7 +57,7 @@ var (
 			filter: leastQueuingFilterFunc,
 			nextOnSuccessOrFailure: &filter{
 				name:   "low cost LoRA",
-				filter: minLoRACostFilterFunc,
+				filter: loRASoftAffinityPredicate,
 				nextOnSuccessOrFailure: &filter{
 					name:   "least KV cache percent",
 					filter: leastKVCacheFilterFunc,
@@ -75,14 +79,9 @@ var (
 		name:   "low queueing filter",
 		filter: toFilterFunc((lowQueueingPodPredicate)),
 		nextOnSuccess: &filter{
-			name:          "affinity LoRA",
-			filter:        toFilterFunc(loRAAffinityPredicate),
-			nextOnSuccess: queueAndKVCacheFilter,
-			nextOnFailure: &filter{
-				name:                   "can accept LoRA Adapter",
-				filter:                 minLoRACostFilterFunc,
-				nextOnSuccessOrFailure: queueAndKVCacheFilter,
-			},
+			name:                   "affinity LoRA",
+			filter:                 loRASoftAffinityPredicate,
+			nextOnSuccessOrFailure: queueAndKVCacheFilter,
 		},
 		nextOnFailure: queueLoRAAndKVCacheFilter,
 	}
@@ -121,14 +120,16 @@ type Scheduler struct {
 }
 
 // Schedule finds the target pod based on metrics and the requested lora adapter.
-func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) {
-	klog.InfoS("Scheduling a request", "request", req, "metrics", s.podMetricsProvider.AllPodMetrics())
-	pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics())
+func (s *Scheduler) Schedule(ctx context.Context, req *LLMRequest) (targetPod datastore.PodMetrics, err error) {
+	logger := log.FromContext(ctx).WithValues("request", req)
+	podMetrics := s.datastore.PodGetAll()
+	logger.V(logutil.VERBOSE).Info("Scheduling a request", "metrics", podMetrics)
+	pods, err := s.filter.Filter(logger, req, podMetrics)
 	if err != nil || len(pods) == 0 {
 		return datastore.PodMetrics{}, fmt.Errorf(
 			"failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
 	}
-	klog.InfoS("Selecting a random pod from the candidates", "candidatePods", pods)
+	logger.V(logutil.VERBOSE).Info("Selecting a random pod from the candidates", "candidatePods", pods)
 	i := rand.Intn(len(pods))
 	return *pods[i], nil
 }
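The scheduler wiring is easier to follow with the decision-tree pattern spelled out. The sketch below reimplements the nextOnSuccess / nextOnFailure routing with throwaway types (podInfo and node are illustrative, not the package's filter struct): on success the filtered set flows to the next node, on failure the original set falls back, which is how the low-queueing branch now hands off directly to the soft-affinity LoRA filter.

// filterchain_sketch.go — illustrative only, not the EPP types.
package main

import (
	"errors"
	"fmt"
)

type podInfo struct {
	name      string
	queueSize int
}

type filterFunc func(pods []podInfo) ([]podInfo, error)

type node struct {
	name                   string
	filter                 filterFunc
	nextOnSuccess          *node
	nextOnFailure          *node
	nextOnSuccessOrFailure *node
}

func (n *node) run(pods []podInfo) ([]podInfo, error) {
	filtered, err := n.filter(pods)
	next := n.nextOnSuccessOrFailure
	if err == nil && len(filtered) > 0 {
		if n.nextOnSuccess != nil {
			next = n.nextOnSuccess
		}
		if next == nil {
			return filtered, nil
		}
		return next.run(filtered) // success: pass the filtered set on
	}
	if n.nextOnFailure != nil {
		next = n.nextOnFailure
	}
	if next == nil {
		return nil, errors.New("no pods survived the filter chain")
	}
	return next.run(pods) // failure: fall back with the original set
}

func main() {
	lowQueue := func(pods []podInfo) ([]podInfo, error) {
		var out []podInfo
		for _, p := range pods {
			if p.queueSize < 128 { // mirrors queueingThresholdLoRA
				out = append(out, p)
			}
		}
		return out, nil
	}
	all := func(pods []podInfo) ([]podInfo, error) { return pods, nil }

	chain := &node{
		name:          "low queueing filter",
		filter:        lowQueue,
		nextOnSuccess: &node{name: "affinity LoRA", filter: all},
		nextOnFailure: &node{name: "queue and KV cache", filter: all},
	}
	result, err := chain.run([]podInfo{{"pod-a", 3}, {"pod-b", 400}})
	fmt.Println(result, err) // [{pod-a 3}] <nil>
}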
