Skip to content

Commit df2cfb5

Browse files
committed
Refactor scheduler
1 parent 206ef93 commit df2cfb5

File tree

12 files changed

+538
-407
lines changed

12 files changed

+538
-407
lines changed

pkg/epp/backend/metrics/metrics.go

+2-1
Original file line number | Diff line number | Diff line change
@@ -109,6 +109,7 @@ func (p *PodMetricsClientImpl) promToPodMetrics(
109109

110110
if loraMetrics != nil {
111111
updated.ActiveModels = make(map[string]int)
112+
updated.WaitingModels = make(map[string]int)
112113
for _, label := range loraMetrics.GetLabel() {
113114
if label.GetName() == LoraInfoRunningAdaptersMetricName {
114115
if label.GetValue() != "" {
@@ -122,7 +123,7 @@ func (p *PodMetricsClientImpl) promToPodMetrics(
122123
if label.GetValue() != "" {
123124
adapterList := strings.Split(label.GetValue(), ",")
124125
for _, adapter := range adapterList {
125-
updated.ActiveModels[adapter] = 0
126+
updated.WaitingModels[adapter] = 0
126127
}
127128
}
128129
}

pkg/epp/backend/metrics/metrics_test.go

+7-4
Original file line number | Diff line number | Diff line change
@@ -404,7 +404,8 @@ func TestPromToPodMetrics(t *testing.T) {
404404
expectedMetrics: &Metrics{
405405
WaitingQueueSize: 7,
406406
KVCacheUsagePercent: 0.8,
407-
ActiveModels: map[string]int{"lora1": 0, "lora2": 0, "lora3": 0},
407+
ActiveModels: map[string]int{"lora1": 0, "lora2": 0},
408+
WaitingModels: map[string]int{"lora3": 0},
408409
MaxActiveModels: 3,
409410
},
410411
},
@@ -416,8 +417,8 @@ func TestPromToPodMetrics(t *testing.T) {
416417
KVCacheUtilization: &MetricSpec{MetricName: "vllm_usage"},
417418
LoraRequestInfo: &MetricSpec{MetricName: "vllm:lora_requests_info"},
418419
},
419-
existingMetrics: &Metrics{ActiveModels: map[string]int{}},
420-
expectedMetrics: &Metrics{ActiveModels: map[string]int{}},
420+
existingMetrics: &Metrics{ActiveModels: map[string]int{}, WaitingModels: map[string]int{}},
421+
expectedMetrics: &Metrics{ActiveModels: map[string]int{}, WaitingModels: map[string]int{}},
421422
expectedErr: multierr.Combine(errors.New("metric family \"vllm_waiting\" not found"), errors.New("metric family \"vllm_usage\" not found"), errors.New("metric family \"vllm:lora_requests_info\" not found")),
422423
},
423424
{
@@ -439,7 +440,8 @@ func TestPromToPodMetrics(t *testing.T) {
439440
expectedMetrics: &Metrics{
440441
WaitingQueueSize: 0,
441442
KVCacheUsagePercent: 0.8,
442-
ActiveModels: map[string]int{"lora1": 0, "lora2": 0, "lora3": 0},
443+
ActiveModels: map[string]int{"lora1": 0, "lora2": 0},
444+
WaitingModels: map[string]int{"lora3": 0},
443445
MaxActiveModels: 3,
444446
},
445447
expectedErr: errors.New("metric family \"vllm_waiting\" not found"),
@@ -457,6 +459,7 @@ func TestPromToPodMetrics(t *testing.T) {
457459
existingMetrics: &Metrics{},
458460
expectedMetrics: &Metrics{
459461
ActiveModels: map[string]int{"lora1": 0},
462+
WaitingModels: map[string]int{},
460463
MaxActiveModels: 0, // Should still default to 0.
461464

462465
},

pkg/epp/backend/metrics/pod_metrics_test.go

+2
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ var (
4444
"foo": 1,
4545
"bar": 1,
4646
},
47+
WaitingModels: map[string]int{},
4748
}
4849
updated = &Metrics{
4950
WaitingQueueSize: 9999,
@@ -53,6 +54,7 @@ var (
5354
"foo": 1,
5455
"bar": 1,
5556
},
57+
WaitingModels: map[string]int{},
5658
}
5759
)
5860

pkg/epp/backend/metrics/types.go

+22-4
Original file line number | Diff line number | Diff line change
@@ -41,16 +41,17 @@ type PodMetricsFactory struct {
4141
}
4242

4343
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
44+
pod := toInternalPod(in)
4445
pm := &podMetrics{
4546
pmc: f.pmc,
4647
ds: ds,
4748
interval: f.refreshMetricsInterval,
4849
parentCtx: parentCtx,
4950
once: sync.Once{},
5051
done: make(chan struct{}),
51-
logger: log.FromContext(parentCtx),
52+
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
5253
}
53-
pm.pod.Store(toInternalPod(in))
54+
pm.pod.Store(pod)
5455
pm.metrics.Store(newMetrics())
5556

5657
pm.startRefreshLoop()
@@ -77,9 +78,20 @@ func (p *Pod) String() string {
7778
return fmt.Sprintf("%+v", *p)
7879
}
7980

81+
func (p *Pod) Clone() *Pod {
82+
return &Pod{
83+
NamespacedName: types.NamespacedName{
84+
Name: p.NamespacedName.Name,
85+
Namespace: p.NamespacedName.Namespace,
86+
},
87+
Address: p.Address,
88+
}
89+
}
90+
8091
type Metrics struct {
8192
// ActiveModels is a set of models(including LoRA adapters) that are currently cached to GPU.
82-
ActiveModels map[string]int
93+
ActiveModels map[string]int
94+
WaitingModels map[string]int
8395
// MaxActiveModels is the maximum number of models that can be loaded to GPU.
8496
MaxActiveModels int
8597
RunningQueueSize int
@@ -93,7 +105,8 @@ type Metrics struct {
93105

94106
func newMetrics() *Metrics {
95107
return &Metrics{
96-
ActiveModels: make(map[string]int),
108+
ActiveModels: make(map[string]int),
109+
WaitingModels: make(map[string]int),
97110
}
98111
}
99112

@@ -109,8 +122,13 @@ func (m *Metrics) Clone() *Metrics {
109122
for k, v := range m.ActiveModels {
110123
cm[k] = v
111124
}
125+
wm := make(map[string]int, len(m.WaitingModels))
126+
for k, v := range m.WaitingModels {
127+
wm[k] = v
128+
}
112129
clone := &Metrics{
113130
ActiveModels: cm,
131+
WaitingModels: wm,
114132
MaxActiveModels: m.MaxActiveModels,
115133
RunningQueueSize: m.RunningQueueSize,
116134
WaitingQueueSize: m.WaitingQueueSize,

pkg/epp/datastore/datastore_test.go

+3
Original file line number | Diff line number | Diff line change
@@ -236,6 +236,7 @@ var (
236236
"foo": 1,
237237
"bar": 1,
238238
},
239+
WaitingModels: map[string]int{},
239240
}
240241
pod2 = &corev1.Pod{
241242
ObjectMeta: metav1.ObjectMeta{
@@ -250,6 +251,7 @@ var (
250251
"foo1": 1,
251252
"bar1": 1,
252253
},
254+
WaitingModels: map[string]int{},
253255
}
254256
pod1NamespacedName = types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
255257
pod2NamespacedName = types.NamespacedName{Name: pod2.Name, Namespace: pod2.Namespace}
@@ -305,6 +307,7 @@ func TestMetrics(t *testing.T) {
305307
// Failed to fetch pod2 metrics so it remains the default values.
306308
{
307309
ActiveModels: map[string]int{},
310+
WaitingModels: map[string]int{},
308311
WaitingQueueSize: 0,
309312
KVCacheUsagePercent: 0,
310313
MaxActiveModels: 0,

pkg/epp/handlers/server.go

+1-2
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@ import (
2626
"google.golang.org/grpc/codes"
2727
"google.golang.org/grpc/status"
2828
"sigs.k8s.io/controller-runtime/pkg/log"
29-
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3029
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3130
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
3231
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
@@ -57,7 +56,7 @@ type Server struct {
5756
}
5857

5958
type Scheduler interface {
60-
Schedule(ctx context.Context, b *scheduling.LLMRequest) (targetPod backendmetrics.PodMetrics, err error)
59+
Schedule(ctx context.Context, b *scheduling.LLMRequest) (targetPod scheduling.Pod, err error)
6160
}
6261

6362
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {

0 commit comments

Comments
 (0)