Skip to content

Commit b121c67

Browse files
committed
small refactor of scheduler config
handles how to register a plugin that implements multiple scheduler plugins interfaces with a single registration command Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent c2e3fa9 commit b121c67

File tree

8 files changed

+196
-89
lines changed

8 files changed

+196
-89
lines changed

cmd/epp/main.go

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import (
4444
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4545
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
4646
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
47-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
4847
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
4948
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
5049
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
@@ -196,23 +195,21 @@ func run() error {
196195
if schedulerV2 == "true" {
197196
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
198197
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
199-
scorers := map[plugins.Scorer]int{
200-
&scorer.QueueScorer{}: queueScorerWeight,
201-
&scorer.KVCacheScorer{}: kvCacheScorerWeight,
202-
}
203-
schedConfigOpts := []scheduling.ConfigOption{}
198+
199+
schedulerConfig := scheduling.NewSchedulerConfig().
200+
WithFilters(filter.NewSheddableCapacityFilter()).
201+
WithScorers(scorer.NewWeightedScorer(&scorer.QueueScorer{}, queueScorerWeight),
202+
scorer.NewWeightedScorer(&scorer.KVCacheScorer{}, kvCacheScorerWeight)).
203+
WithPicker(picker.NewMaxScorePicker())
204+
204205
if prefixCacheScheduling == "true" {
205206
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)
206-
schedConfigOpts = append(schedConfigOpts, scheduling.AddPrefixPlugin(loadPrefixCacheConfig(), prefixScorerWeight))
207+
if err := schedulerConfig.AddPlugins(scorer.NewWeightedScorer(prefix.New(loadPrefixCacheConfig()), prefixScorerWeight)); err != nil {
208+
setupLog.Error(err, "Failed to register scheduler plugins")
209+
return err
210+
}
207211
}
208-
schedulerConfig := scheduling.NewSchedulerConfig(
209-
[]plugins.PreSchedule{},
210-
[]plugins.Filter{filter.NewSheddableCapacityFilter()},
211-
scorers,
212-
picker.NewMaxScorePicker(),
213-
[]plugins.PostSchedule{},
214-
[]plugins.PostResponse{},
215-
schedConfigOpts...)
212+
216213
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
217214
}
218215
serverRunner := &runserver.ExtProcServerRunner{

pkg/epp/scheduling/config.go

Lines changed: 0 additions & 62 deletions
This file was deleted.

pkg/epp/scheduling/plugins/scorer/kvcache.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,27 @@ limitations under the License.
1717
package scorer
1818

1919
import (
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
2021
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2122
)
2223

2324
const (
2425
DefaultKVCacheScorerWeight = 1
2526
)
2627

28+
// compile-time type validation
29+
var _ plugins.Scorer = &KVCacheScorer{}
30+
31+
// KVCacheScorer scores list of candidate pods based on KV cache utilization.
2732
type KVCacheScorer struct{}
2833

29-
func (ss *KVCacheScorer) Name() string {
34+
// Name returns the name of the scorer.
35+
func (s *KVCacheScorer) Name() string {
3036
return "kv-cache"
3137
}
3238

33-
func (ss *KVCacheScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
39+
// Score returns the scoring result for the given list of pods based on context.
40+
func (s *KVCacheScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
3441
scores := make(map[types.Pod]float64, len(pods))
3542
for _, pod := range pods {
3643
scores[pod] = 1 - pod.GetMetrics().KVCacheUsagePercent

pkg/epp/scheduling/plugins/scorer/queue.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,28 @@ package scorer
1919
import (
2020
"math"
2121

22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
2223
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2324
)
2425

2526
const (
2627
DefaultQueueScorerWeight = 1
2728
)
2829

30+
// compile-time type validation
31+
var _ plugins.Scorer = &QueueScorer{}
32+
33+
// QueueScorer scores list of candidate pods based on the pod's waiting queue size.
34+
// the less waiting queue size the pod has, the higher score it will get (since it's more available to serve new request).
2935
type QueueScorer struct{}
3036

31-
func (q *QueueScorer) Name() string {
37+
// Name returns the name of the scorer.
38+
func (s *QueueScorer) Name() string {
3239
return "queue"
3340
}
3441

35-
func (q *QueueScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
42+
// Score returns the scoring result for the given list of pods based on context.
43+
func (s *QueueScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
3644
minQueueSize := math.MaxInt
3745
maxQueueSize := math.MinInt
3846

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package scorer
18+
19+
import (
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
21+
)
22+
23+
// NewWeightedScorer initializes a new WeightedScorer and returns its pointer.
24+
func NewWeightedScorer(scorer plugins.Scorer, weight int) *WeightedScorer {
25+
return &WeightedScorer{
26+
Scorer: scorer,
27+
weight: weight,
28+
}
29+
}
30+
31+
// WeightedScorer is a struct that encapsulates a scorer with its weight.
32+
type WeightedScorer struct {
33+
plugins.Scorer
34+
weight int
35+
}
36+
37+
// Weight returns the weight of the scorer.
38+
func (s *WeightedScorer) Weight() int {
39+
return s.weight
40+
}

pkg/epp/scheduling/scheduler.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,9 @@ func NewScheduler(datastore Datastore) *Scheduler {
6464
},
6565
}
6666

67-
defaultConfig := &SchedulerConfig{
68-
preSchedulePlugins: []plugins.PreSchedule{},
69-
filters: []plugins.Filter{filter.NewSheddableCapacityFilter(), lowLatencyFilter},
70-
scorers: map[plugins.Scorer]int{},
71-
picker: &picker.RandomPicker{},
72-
postSchedulePlugins: []plugins.PostSchedule{},
73-
}
67+
defaultConfig := NewSchedulerConfig().
68+
WithFilters(filter.NewSheddableCapacityFilter(), lowLatencyFilter).
69+
WithPicker(&picker.RandomPicker{})
7470

7571
return NewSchedulerWithConfig(datastore, defaultConfig)
7672
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package scheduling
18+
19+
import (
20+
"fmt"
21+
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
24+
)
25+
26+
// NewSchedulerConfig creates a new SchedulerConfig object and returns its pointer.
27+
func NewSchedulerConfig() *SchedulerConfig {
28+
return &SchedulerConfig{
29+
preSchedulePlugins: []plugins.PreSchedule{},
30+
filters: []plugins.Filter{},
31+
scorers: map[plugins.Scorer]int{},
32+
postSchedulePlugins: []plugins.PostSchedule{},
33+
postResponsePlugins: []plugins.PostResponse{},
34+
// picker remains nil since config doesn't support multiple pickers
35+
}
36+
}
37+
38+
// SchedulerConfig provides a configuration for the scheduler which influence routing decisions.
39+
type SchedulerConfig struct {
40+
preSchedulePlugins []plugins.PreSchedule
41+
filters []plugins.Filter
42+
scorers map[plugins.Scorer]int // map from scorer to weight
43+
picker plugins.Picker
44+
postSchedulePlugins []plugins.PostSchedule
45+
postResponsePlugins []plugins.PostResponse
46+
}
47+
48+
// WithPreSchedulePlugins sets the given plugins as the PreSchedule plugins.
49+
// If the SchedulerConfig has PreSchedule plugins, this call replaces the existing plugins with the given ones.
50+
func (c *SchedulerConfig) WithPreSchedulePlugins(plugins ...plugins.PreSchedule) *SchedulerConfig {
51+
c.preSchedulePlugins = plugins
52+
return c
53+
}
54+
55+
// WithFilters sets the given filter plugins as the Filter plugins.
56+
// if the SchedulerConfig has Filter plugins, this call replaces the existing plugins with the given ones.
57+
func (c *SchedulerConfig) WithFilters(filters ...plugins.Filter) *SchedulerConfig {
58+
c.filters = filters
59+
return c
60+
}
61+
62+
// WithScorers sets the given scorer plugins as the Scorer plugins.
63+
// if the SchedulerConfig has Scorer plugins, this call replaces the existing plugins with the given ones.
64+
func (c *SchedulerConfig) WithScorers(scorers ...*scorer.WeightedScorer) *SchedulerConfig {
65+
scorersMap := make(map[plugins.Scorer]int, len(scorers))
66+
for _, scorer := range scorers {
67+
scorersMap[scorer.Scorer] = scorer.Weight()
68+
}
69+
c.scorers = scorersMap
70+
return c
71+
}
72+
73+
// WithPicker sets the given picker plugins as the Picker plugin.
74+
// if the SchedulerConfig has Picker plugin, this call replaces the existing plugin with the given one.
75+
func (c *SchedulerConfig) WithPicker(picker plugins.Picker) *SchedulerConfig {
76+
c.picker = picker
77+
return c
78+
}
79+
80+
// WithPostSchedulePlugins sets the given plugins as the PostSchedule plugins.
81+
// If the SchedulerConfig has PostSchedule plugins, this call replaces the existing plugins with the given ones.
82+
func (c *SchedulerConfig) WithPostSchedulePlugins(plugins ...plugins.PostSchedule) *SchedulerConfig {
83+
c.postSchedulePlugins = plugins
84+
return c
85+
}
86+
87+
// WithPostResponsePlugins sets the given plugins as the PostResponse plugins.
88+
// If the SchedulerConfig has PostResponse plugins, this call replaces the existing plugins with the given ones.
89+
func (c *SchedulerConfig) WithPostResponsePlugins(plugins ...plugins.PostResponse) *SchedulerConfig {
90+
c.postResponsePlugins = plugins
91+
return c
92+
}
93+
94+
// AddPlugins adds the given plugins to all scheduler plugins according to the interfaces each plugin implements.
95+
// A plugin may implement more than one scheduler plugin interface.
96+
func (c *SchedulerConfig) AddPlugins(pluginObjects ...plugins.Plugin) error {
97+
for _, plugin := range pluginObjects {
98+
if weightedScorer, ok := plugin.(*scorer.WeightedScorer); ok {
99+
c.scorers[weightedScorer.Scorer] = weightedScorer.Weight()
100+
plugin = weightedScorer.Scorer // if we got WeightedScorer, unwrap the plugin
101+
}
102+
if preSchedulePlugin, ok := plugin.(plugins.PreSchedule); ok {
103+
c.preSchedulePlugins = append(c.preSchedulePlugins, preSchedulePlugin)
104+
}
105+
if filter, ok := plugin.(plugins.Filter); ok {
106+
c.filters = append(c.filters, filter)
107+
}
108+
if picker, ok := plugin.(plugins.Picker); ok {
109+
if c.picker != nil {
110+
return fmt.Errorf("failed to set '%s' as picker, already have a registered picker plugin '%s'", picker.Name(), c.picker.Name())
111+
}
112+
c.picker = picker
113+
}
114+
if postSchedulePlugin, ok := plugin.(plugins.PostSchedule); ok {
115+
c.postSchedulePlugins = append(c.postSchedulePlugins, postSchedulePlugin)
116+
}
117+
if postResponsePlugin, ok := plugin.(plugins.PostResponse); ok {
118+
c.postResponsePlugins = append(c.postResponsePlugins, postResponsePlugin)
119+
}
120+
}
121+
return nil
122+
}

pkg/epp/scheduling/types/scheduling_context.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
)
2525

2626
func NewSchedulingContext(ctx context.Context, req *LLMRequest, resp *LLMResponse, pods []Pod) *SchedulingContext {
27-
2827
logger := log.FromContext(ctx).WithValues("request", req)
2928
return &SchedulingContext{
3029
Context: ctx,

0 commit comments

Comments
 (0)