
Commit f272cc2

Add priority based scheduling

1 parent fcad109 · commit f272cc2

8 files changed (+150 -32)

pkg/ext-proc/backend/metrics.go (+5 -2)

@@ -89,18 +89,21 @@ func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *Pod
 	}
 	*/
 
+	// TODO: Read from vLLM metrics once it is available.
+	updated.MaxActiveModels = 4
+
 	// Update active loras
 	mf, ok := metricFamilies[ActiveLoRAAdaptersMetricName]
 	if ok {
 		// IMPORTANT: replace the map entries instead of appending to it.
-		updated.CachedModels = make(map[string]int)
+		updated.ActiveModels = make(map[string]int)
 		for _, metric := range mf.GetMetric() {
 			for _, label := range metric.GetLabel() {
 				if label.GetName() == "active_adapters" {
 					if label.GetValue() != "" {
 						adapterList := strings.Split(label.GetValue(), ",")
 						for _, adapter := range adapterList {
-							updated.CachedModels[adapter] = 0
+							updated.ActiveModels[adapter] = 0
 						}
 					}
 				}
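As context for the hunk above: the adapter set arrives as a single comma-separated label value rather than as separate metric series, so the handler rebuilds the ActiveModels set from scratch on every scrape. A minimal, self-contained sketch of just that parsing step (the label value here is hypothetical):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical value of the "active_adapters" label as scraped from
	// the model server's metrics endpoint.
	labelValue := "lora-a,lora-b"

	// Replace, not append: a fresh map so stale adapters drop out.
	activeModels := make(map[string]int)
	if labelValue != "" {
		for _, adapter := range strings.Split(labelValue, ",") {
			activeModels[adapter] = 0
		}
	}
	fmt.Println(activeModels) // map[lora-a:0 lora-b:0]
}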

pkg/ext-proc/backend/provider.go (+1 -1)

@@ -113,7 +113,7 @@ func (p *Provider) refreshPodsOnce() error {
 		new := &PodMetrics{
 			Pod: pod,
 			Metrics: Metrics{
-				CachedModels: make(map[string]int),
+				ActiveModels: make(map[string]int),
 			},
 		}
 		p.podMetrics.Store(pod, new)

pkg/ext-proc/backend/types.go (+8 -6)

@@ -12,12 +12,14 @@ type Pod struct {
 }
 
 func (p Pod) String() string {
-	return p.Namespace + "." + p.Name
+	return p.Namespace + "/" + p.Name
 }
 
 type Metrics struct {
-	// CachedModels is a set of models(including LoRA adapters) that are currently cached to GPU.
-	CachedModels map[string]int
+	// ActiveModels is a set of models (including LoRA adapters) that are currently cached to GPU.
+	ActiveModels map[string]int
+	// MaxActiveModels is the maximum number of models that can be loaded to GPU.
+	MaxActiveModels int
 	RunningQueueSize    int
 	WaitingQueueSize    int
 	KVCacheUsagePercent float64
@@ -34,14 +36,14 @@ func (pm *PodMetrics) String() string {
 }
 
 func (pm *PodMetrics) Clone() *PodMetrics {
-	cm := make(map[string]int, len(pm.CachedModels))
-	for k, v := range pm.CachedModels {
+	cm := make(map[string]int, len(pm.ActiveModels))
+	for k, v := range pm.ActiveModels {
 		cm[k] = v
 	}
 	clone := &PodMetrics{
 		Pod: pm.Pod,
 		Metrics: Metrics{
-			CachedModels: cm,
+			ActiveModels:        cm,
 			RunningQueueSize:    pm.RunningQueueSize,
 			WaitingQueueSize:    pm.WaitingQueueSize,
 			KVCacheUsagePercent: pm.KVCacheUsagePercent,
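Why Clone copies the map entry by entry: Go maps are reference types, so a plain struct copy would leave the clone and the original sharing one ActiveModels map. A small illustration with types simplified from the diff (not part of the commit):

package main

import "fmt"

type Metrics struct {
	ActiveModels    map[string]int
	MaxActiveModels int
}

func main() {
	orig := Metrics{ActiveModels: map[string]int{"lora-a": 0}}
	shallow := orig                     // copies the struct, but the map header is shared
	shallow.ActiveModels["lora-b"] = 0  // mutates orig.ActiveModels too
	fmt.Println(len(orig.ActiveModels)) // 2 — which is why Clone() copies key by key
}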

pkg/ext-proc/handlers/request.go (+3 -1)

@@ -38,6 +38,8 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 		// TODO: Once the API is approved, read the "LLMUseCase" configuration and apply traffic split.
 		TargetModels:        map[string]int{model: 100},
 		ResolvedTargetModel: model,
+		// TODO: Read from LLMService CRD.
+		Critical: true,
 	}
 
 	// Update target models in the body.
@@ -51,7 +53,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 
 	targetPod, err := s.scheduler.Schedule(llmReq)
 	if err != nil {
-		return nil, fmt.Errorf("failed to find target pod: %v", err)
+		return nil, fmt.Errorf("failed to find target pod: %w", err)
 	}
 	klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
 
pkg/ext-proc/handlers/server.go (+16 -2)

@@ -4,6 +4,7 @@ import (
 	"io"
 
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
+	envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	klog "k8s.io/klog/v2"
@@ -83,13 +84,26 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 
 		if err != nil {
 			klog.Errorf("failed to process request: %v", err)
-			return status.Errorf(codes.Unknown, "failed to handle request: %v", err)
+			switch status.Code(err) {
+			case codes.ResourceExhausted:
+				resp = &extProcPb.ProcessingResponse{
+					Response: &extProcPb.ProcessingResponse_ImmediateResponse{
+						ImmediateResponse: &extProcPb.ImmediateResponse{
+							Status: &envoyTypePb.HttpStatus{
+								Code: envoyTypePb.StatusCode_TooManyRequests,
+							},
+						},
+					},
+				}
+			default:
+				return status.Errorf(status.Code(err), "failed to handle request: %w", err)
+			}
 		}
 
 		klog.V(3).Infof("response: %v", resp)
 		if err := srv.Send(resp); err != nil {
 			klog.Errorf("send error %v", err)
-			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
+			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %w", err)
 		}
 	}
 }
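One note on the control flow above: returning a gRPC error from Process tears down the ext_proc stream, whereas an ImmediateResponse asks Envoy to answer the client directly with the given HTTP status. So a ResourceExhausted error from the scheduler (see scheduler.go below) surfaces to the client as 429 Too Many Requests. A sketch of that mapping factored into a standalone helper — the helper name is hypothetical, not part of this commit:

package handlers

import (
	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
	envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// errorToResponse mirrors the switch above: resource exhaustion becomes an
// HTTP 429 that Envoy sends to the client; anything else remains a terminal
// gRPC error for the stream.
func errorToResponse(err error) (*extProcPb.ProcessingResponse, error) {
	if status.Code(err) == codes.ResourceExhausted {
		return &extProcPb.ProcessingResponse{
			Response: &extProcPb.ProcessingResponse_ImmediateResponse{
				ImmediateResponse: &extProcPb.ImmediateResponse{
					Status: &envoyTypePb.HttpStatus{
						Code: envoyTypePb.StatusCode_TooManyRequests,
					},
				},
			},
		}, nil
	}
	return nil, status.Errorf(status.Code(err), "failed to handle request: %v", err)
}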

pkg/ext-proc/scheduling/filter.go (+62 -17)

@@ -42,30 +42,34 @@ func (f *filter) Name() string {
 }
 
 func (f *filter) Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
-	if f == nil {
-		klog.V(3).Infof("Running nil filter, returning all input pods by default")
-		return pods, nil
-	}
 	klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, b, len(pods))
 
 	filtered, err := f.filter(b, pods)
 
 	next := f.nextOnSuccessOrFailure
 	if err == nil {
-		klog.V(3).Infof("onSuccess %v -> %v, filtered: %v", f.name, next.Name(), len(filtered))
+		if f.nextOnSuccess == nil && f.nextOnSuccessOrFailure == nil {
+			// No succeeding filters to run, return.
+			return filtered, err
+		}
 		if f.nextOnSuccess != nil {
 			next = f.nextOnSuccess
 		}
+		klog.V(3).Infof("onSuccess %q -> %q, filtered: %v", f.name, next.Name(), len(filtered))
 		// On success, pass the filtered result to the next filter.
 		return next.Filter(b, filtered)
+	} else {
+		if f.nextOnFailure == nil && f.nextOnSuccessOrFailure == nil {
+			// No succeeding filters to run, return.
+			return filtered, err
+		}
+		if f.nextOnFailure != nil {
+			next = f.nextOnFailure
+		}
+		klog.V(3).Infof("onFailure %q -> %q", f.name, next.Name())
+		// On failure, pass the initial set of pods to the next filter.
+		return next.Filter(b, pods)
 	}
-
-	klog.V(3).Infof("onFailure %v -> %v", f.name, next.Name())
-	if f.nextOnFailure != nil {
-		next = f.nextOnFailure
-	}
-	// On failure, pass the initial set of pods to the next filter.
-	return next.Filter(b, pods)
 }
 
 // filterFunc filters a set of input pods to a subset.
@@ -125,7 +129,7 @@ func leastQueuingFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backe
 // TODO: Compare this strategy with other strategies such as top K.
 func leastKVCacheFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
 	min := math.MaxFloat64
-	max := math.SmallestNonzeroFloat64
+	var max float64 = 0
 	filtered := []*backend.PodMetrics{}
 
 	for _, pod := range pods {
@@ -145,11 +149,52 @@ func leastKVCacheFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backe
 	return filtered, nil
 }
 
+// mostKVCacheFilterFunc is similar to leastKVCacheFilterFunc but prefers pods with higher KV cache.
+func mostKVCacheFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
+	min := math.MaxFloat64
+	var max float64 = 0
+	filtered := []*backend.PodMetrics{}
+
+	for _, pod := range pods {
+		if pod.KVCacheUsagePercent <= min {
+			min = pod.KVCacheUsagePercent
+		}
+		if pod.KVCacheUsagePercent >= max {
+			max = pod.KVCacheUsagePercent
+		}
+	}
+
+	klog.V(3).Infof("mostKVCacheFilterFunc, max=%v, min=%v", max, min)
+	for _, pod := range pods {
+		klog.V(3).Infof("Evaluating pod %v", pod.KVCacheUsagePercent)
+		if pod.KVCacheUsagePercent <= max && pod.KVCacheUsagePercent >= max-(max-min)/float64(len(pods)) {
+			klog.V(3).Infof("Selected pod %v", pod.KVCacheUsagePercent)
+			filtered = append(filtered, pod)
+		}
+	}
+	return filtered, nil
+}
+
 // podPredicate is a filter function to check whether a pod is desired.
 type podPredicate func(b *LLMRequest, pod *backend.PodMetrics) bool
 
-// loraAffinityPredicate return true if the pod have the requested LoRA adapter loaded.
-func loraAffinityPredicate(b *LLMRequest, pod *backend.PodMetrics) bool {
-	_, ok := pod.CachedModels[b.ResolvedTargetModel]
-	return ok
+// We consider serving an adapter low cost if the adapter is active in the model server, or the
+// model server has room to load the adapter.
+func lowLoRACostPredicate(b *LLMRequest, pod *backend.PodMetrics) bool {
+	_, ok := pod.ActiveModels[b.ResolvedTargetModel]
+	return ok || len(pod.ActiveModels) < pod.MaxActiveModels
+}
+
+func criticalRequestPredicate(b *LLMRequest, pod *backend.PodMetrics) bool {
+	return b.Critical
+}
+
+func noQueueAndLessThanKVCacheThresholdPredicate(threshold float64) podPredicate {
+	return func(b *LLMRequest, pod *backend.PodMetrics) bool {
+		return pod.WaitingQueueSize <= 0 && pod.KVCacheUsagePercent <= threshold
+	}
+}
+
+func allowAllPredicate(b *LLMRequest, pod *backend.PodMetrics) bool {
+	return true
 }
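The selection band in mostKVCacheFilterFunc keeps every pod within (max-min)/len(pods) of the maximum, so the band tightens as the candidate set grows. A worked example with hypothetical usage numbers:

package main

import "fmt"

func main() {
	// Hypothetical KV cache usage (percent) for three candidate pods.
	usage := []float64{10, 50, 80}
	min, max := 10.0, 80.0 // as computed by the first loop in mostKVCacheFilterFunc

	// Band used by the second loop: [max - (max-min)/len(pods), max].
	lower := max - (max-min)/float64(len(usage)) // 80 - 70/3 ≈ 56.67
	for _, u := range usage {
		if u <= max && u >= lower {
			fmt.Println("selected:", u) // only the 80% pod falls in the band
		}
	}
}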

pkg/ext-proc/scheduling/scheduler.go (+54 -3)

@@ -5,24 +5,74 @@ import (
 	"fmt"
 	"math/rand"
 
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	klog "k8s.io/klog/v2"
 
 	"ext-proc/backend"
 )
 
+const (
+	// TODO: Consider making this configurable.
+	kvCacheThreshold = 80
+)
+
 var (
+	allowAllFilter = &filter{
+		name:   "noop",
+		filter: toFilterFunc(allowAllPredicate),
+	}
+
 	defaultFilter = &filter{
+		name:          "critical request",
+		filter:        toFilterFunc(criticalRequestPredicate),
+		nextOnSuccess: criticalRequestFilter,
+		nextOnFailure: sheddableRequestFilter,
+	}
+
+	// The goal for scheduling critical requests is to minimize latency. The heuristic is to
+	// pick a server with the least "load" (KV cache), which typically yields lower latency.
+	criticalRequestFilter = &filter{
 		name:   "least queuing",
 		filter: leastQueuingFilterFunc,
 		nextOnSuccessOrFailure: &filter{
-			name:   "lora affinity",
-			filter: toFilterFunc(loraAffinityPredicate),
+			name:   "low cost LoRA",
+			filter: toFilterFunc(lowLoRACostPredicate),
 			nextOnSuccessOrFailure: &filter{
 				name:   "least KV cache percent",
 				filter: leastKVCacheFilterFunc,
 			},
 		},
 	}
+
+	// The goal for scheduling sheddable requests is to optimize for throughput while reducing
+	// queuing, and leave low load (KV cache) servers to serve critical requests.
+	sheddableRequestFilter = &filter{
+		// When there is at least one model server that's not queuing requests, and still has KV
+		// cache below a certain threshold, we consider this model server to have capacity to
+		// handle a sheddable request without impacting critical requests.
+		name:   "has capacity for sheddable requests",
+		filter: toFilterFunc(noQueueAndLessThanKVCacheThresholdPredicate(kvCacheThreshold)),
+		nextOnSuccess: &filter{
+			name:   "most KV cache percent",
+			filter: mostKVCacheFilterFunc,
+			nextOnSuccessOrFailure: &filter{
+				name:          "low cost LoRA",
+				filter:        toFilterFunc(lowLoRACostPredicate),
+				nextOnFailure: allowAllFilter,
+			},
+		},
+		// If all pods are queuing or running above the KV cache threshold, we drop the sheddable
+		// request to make room for critical requests.
+		nextOnFailure: &filter{
+			name: "drop request",
+			filter: func(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
+				klog.Infof("Dropping request %v", b)
+				return []*backend.PodMetrics{}, status.Errorf(codes.ResourceExhausted, "dropping request due to limited backend resources")
+			},
+		},
+	}
 )
 
 func NewScheduler(pmp PodMetricsProvider) *Scheduler {
@@ -48,8 +98,9 @@ func (s *Scheduler) Schedule(b *LLMRequest) (targetPod *backend.Pod, err error)
 	klog.V(3).Infof("request: %v; metrics: %+v", b, s.podMetricsProvider.AllPodMetrics())
 	pods, err := s.filter.Filter(b, s.podMetricsProvider.AllPodMetrics())
 	if err != nil || len(pods) == 0 {
-		return nil, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %v", len(pods), err)
+		return nil, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
 	}
+	klog.V(3).Infof("Going to randomly select a pod from the candidates: %+v", pods)
 	i := rand.Intn(len(pods))
 	return &pods[i].Pod, nil
 }
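Reading the filter wiring above as a decision tree (a restatement of the var block, with the threshold from kvCacheThreshold):

defaultFilter: is the request critical?
├─ yes → least queuing → low cost LoRA → least KV cache percent
└─ no  → any pod with no queue and KV cache ≤ threshold?
         ├─ yes → most KV cache percent → low cost LoRA (noop fallback on failure)
         └─ no  → drop request (ResourceExhausted → HTTP 429 via the ext-proc server)

Note the asymmetry: critical requests are spread toward the least-loaded servers, while sheddable requests are packed onto the busiest servers that still have headroom, leaving low-load servers free for critical traffic. The scheduler then picks uniformly at random among whatever pods survive the chain.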

pkg/ext-proc/scheduling/types.go (+1 -0)

@@ -7,4 +7,5 @@ type LLMRequest struct {
 	TargetModels map[string]int
 	// Resolved target model is the final target model after traffic split.
 	ResolvedTargetModel string
+	Critical            bool
 }
