Skip to content

Scheduler config refactor for simplifying plugins registration #835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/multi/prefix"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
Expand Down Expand Up @@ -196,23 +195,21 @@ func run() error {
if schedulerV2 == "true" {
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
scorers := map[plugins.Scorer]int{
&scorer.QueueScorer{}: queueScorerWeight,
&scorer.KVCacheScorer{}: kvCacheScorerWeight,
}
schedConfigOpts := []scheduling.ConfigOption{}

schedulerConfig := scheduling.NewSchedulerConfig().
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another approach is to use reflection, for each plugin we detect what extension points it implements and register them: https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/framework/runtime/framework.go#L522

This will make it easier to add out of tree plugins.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, you already have AddPlugin, why not use it here instead of the extension specific with options?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are two different questions. I'll try to answer both.

Another approach is to use reflection

yes. the two valid approaches we have here are type assertion and reflection, each has pros/cons.

main pros for type assertion are:

  • Type safety: Checked at compile-time; you're guaranteed not to misuse types.
  • Performance: Much faster than reflection; no runtime metadata lookup needed.
  • Readability: Straightforward and idiomatic in Go.
  • No imports: No need to bring in the reflect package, keeping dependencies minimal.

main pros for reflection:

  • Dynamic inspection: You can iterate over available methods, fields, or types dynamically — useful for very generic tooling or plugin systems.
  • No prior knowledge required: Useful if you don’t know all possible interfaces ahead of time.

in our case, we know the interfaces we're checking upfront and I see no benefit for using reflection which is a better fit when the interfaces are not known upfront. type assertions are much more readable & maintainable and much faster. Generally speaking, Go encourages using explicit, compile-time checked patterns like type assertions when the set of interfaces is known.

oh, you already have AddPlugin, why not use it here instead of the extension specific with options?

right. the intention was to allow maximum flexibility with the plugins registration.
for example, let's assume I have a multi extension plugin but I'm running some performance benchmarks and want to register it only as a filter and not as a scorer to compare results. AddPlugin will add to all matching interfaces, while using the With functions sets only the requested plugin type.
This PR is trying to solve the multi extension plugin registration and make it clean, but not in the price of introducing other inconveniences.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. the two valid approaches we have here are type assertion and reflection, each has pros/cons.

I meant to suggest runtime discovery of what extensions to register vs having to explicitly code that at registration. Type assertion vs reflication is an implementation detail.

right. the intention was to allow maximum flexibility with the plugins registration.

I think we are not there yet, but we want to distinguish between registration and creating profiles. We should have a central registry of plugins and the extensions they implement. Profiles reference specific set of extensions to execute by referencing the registry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we're not there yet with the multi profiles and central registry. I agree that all of this is needed.
I believe that as we make progress, the current SchedulerConfig will become SchedulerProfile, and SchedulerConfig will probably be a set of configured profiles and maybe more configuration values.

but all of this is not really related to having the "With" functions which allows flexibility as long as the config (or profile) is defined by code and not by configuration file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sg, good we are aligned on the future direction

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer a single AddPlugins method, it's simple and I don't need to decide which one to use. It's just in the startup path so I don't think performance hit of reflection really matters. Yes we sacrifice some readability but we are already using it anyway. To me the benefit of not having to understand the different options overweighs the others.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the intention was to allow maximum flexibility with the plugins registration.
for example, let's assume I have a multi extension plugin but I'm running some performance benchmarks and want to register it only as a filter and not as a scorer to compare results. AddPlugin will add to all matching interfaces, while using the With functions sets only the requested plugin type.
This PR is trying to solve the multi extension plugin registration and make it clean, but not in the price of introducing other inconveniences.

WithFilters(filter.NewSheddableCapacityFilter()).
WithScorers(scorer.NewWeightedScorer(&scorer.QueueScorer{}, queueScorerWeight),
scorer.NewWeightedScorer(&scorer.KVCacheScorer{}, kvCacheScorerWeight)).
WithPicker(picker.NewMaxScorePicker())

if prefixCacheScheduling == "true" {
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)
schedConfigOpts = append(schedConfigOpts, scheduling.AddPrefixPlugin(loadPrefixCacheConfig(), prefixScorerWeight))
if err := schedulerConfig.AddPlugins(scorer.NewWeightedScorer(prefix.New(loadPrefixCacheConfig()), prefixScorerWeight)); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the fact you need to wrap a plugin with WeightedScorer is error prone. It's easy to miss.

Copy link
Contributor Author

@nirrozenbaum nirrozenbaum May 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't make sense to allow scorer registration without a weight. we should allow only weighted scorer.
on the other hand, I didn't want to add a weight field inside the scorer itself, cause the same scorer can be used in different scheduling cycles with different weights (and it may be required to use the same scorer instance).
This was a requirement I got from internal IBM teams.
therefore, I introduced the weighted scorer struct which from one hand adds the weight, and on the other hand allows reusing the same scorer instance but with a different weight in a different scheduling cycle.

to address your concern (error prone), I added a check in the AddPlugins function and if a scorer is passed (instead of weighted scorer) an appropriate error is returned, so a developer can easily understand and fix it.
also added a comment on the AddPlugins function godoc.

setupLog.Error(err, "Failed to register scheduler plugins")
return err
}
}
schedulerConfig := scheduling.NewSchedulerConfig(
[]plugins.PreSchedule{},
[]plugins.Filter{filter.NewSheddableCapacityFilter()},
scorers,
picker.NewMaxScorePicker(),
[]plugins.PostSchedule{},
[]plugins.PostResponse{},
schedConfigOpts...)

scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
}
serverRunner := &runserver.ExtProcServerRunner{
Expand Down
62 changes: 0 additions & 62 deletions pkg/epp/scheduling/config.go

This file was deleted.

11 changes: 9 additions & 2 deletions pkg/epp/scheduling/plugins/scorer/kvcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,27 @@ limitations under the License.
package scorer

import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const (
DefaultKVCacheScorerWeight = 1
)

// compile-time type validation
var _ plugins.Scorer = &KVCacheScorer{}

// KVCacheScorer scores list of candidate pods based on KV cache utilization.
type KVCacheScorer struct{}

func (ss *KVCacheScorer) Name() string {
// Name returns the name of the scorer.
func (s *KVCacheScorer) Name() string {
return "kv-cache"
}

func (ss *KVCacheScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
// Score returns the scoring result for the given list of pods based on context.
func (s *KVCacheScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
scores := make(map[types.Pod]float64, len(pods))
for _, pod := range pods {
scores[pod] = 1 - pod.GetMetrics().KVCacheUsagePercent
Expand Down
12 changes: 10 additions & 2 deletions pkg/epp/scheduling/plugins/scorer/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,28 @@ package scorer
import (
"math"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const (
DefaultQueueScorerWeight = 1
)

// compile-time type validation
var _ plugins.Scorer = &QueueScorer{}

// QueueScorer scores list of candidate pods based on the pod's waiting queue size.
// the less waiting queue size the pod has, the higher score it will get (since it's more available to serve new request).
type QueueScorer struct{}

func (q *QueueScorer) Name() string {
// Name returns the name of the scorer.
func (s *QueueScorer) Name() string {
return "queue"
}

func (q *QueueScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
// Score returns the scoring result for the given list of pods based on context.
func (s *QueueScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
minQueueSize := math.MaxInt
maxQueueSize := math.MinInt

Expand Down
40 changes: 40 additions & 0 deletions pkg/epp/scheduling/plugins/scorer/weighted_scorer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scorer

import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
)

// NewWeightedScorer initializes a new WeightedScorer and returns its pointer.
func NewWeightedScorer(scorer plugins.Scorer, weight int) *WeightedScorer {
return &WeightedScorer{
Scorer: scorer,
weight: weight,
}
}

// WeightedScorer is a struct that encapsulates a scorer with its weight.
type WeightedScorer struct {
plugins.Scorer
weight int
}

// Weight returns the weight of the scorer.
func (s *WeightedScorer) Weight() int {
return s.weight
}
25 changes: 11 additions & 14 deletions pkg/epp/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
Expand Down Expand Up @@ -64,13 +65,9 @@ func NewScheduler(datastore Datastore) *Scheduler {
},
}

defaultConfig := &SchedulerConfig{
preSchedulePlugins: []plugins.PreSchedule{},
filters: []plugins.Filter{filter.NewSheddableCapacityFilter(), lowLatencyFilter},
scorers: map[plugins.Scorer]int{},
picker: &picker.RandomPicker{},
postSchedulePlugins: []plugins.PostSchedule{},
}
defaultConfig := NewSchedulerConfig().
WithFilters(filter.NewSheddableCapacityFilter(), lowLatencyFilter).
WithPicker(&picker.RandomPicker{})

return NewSchedulerWithConfig(datastore, defaultConfig)
}
Expand All @@ -92,7 +89,7 @@ type Scheduler struct {
datastore Datastore
preSchedulePlugins []plugins.PreSchedule
filters []plugins.Filter
scorers map[plugins.Scorer]int // map from scorer to its weight
scorers []*scorer.WeightedScorer
picker plugins.Picker
postSchedulePlugins []plugins.PostSchedule
postResponsePlugins []plugins.PostResponse
Expand Down Expand Up @@ -172,15 +169,15 @@ func (s *Scheduler) runScorerPlugins(ctx *types.SchedulingContext, pods []types.
weightedScorePerPod[pod] = float64(0) // initialize weighted score per pod with 0 value
}
// Iterate through each scorer in the chain and accumulate the weighted scores.
for scorer, weight := range s.scorers {
loggerDebug.Info("Running scorer", "scorer", scorer.Name())
for _, weightedScorer := range s.scorers {
loggerDebug.Info("Running scorer", "scorer", weightedScorer.Name())
before := time.Now()
scores := scorer.Score(ctx, pods)
metrics.RecordSchedulerPluginProcessingLatency(plugins.ScorerPluginType, scorer.Name(), time.Since(before))
scores := weightedScorer.Score(ctx, pods)
metrics.RecordSchedulerPluginProcessingLatency(plugins.ScorerPluginType, weightedScorer.Name(), time.Since(before))
for pod, score := range scores { // weight is relative to the sum of weights
weightedScorePerPod[pod] += score * float64(weight) // TODO normalize score before multiply with weight
weightedScorePerPod[pod] += score * float64(weightedScorer.Weight())
}
loggerDebug.Info("After running scorer", "scorer", scorer.Name())
loggerDebug.Info("After running scorer", "scorer", weightedScorer.Name())
}
loggerDebug.Info("After running scorer plugins")

Expand Down
123 changes: 123 additions & 0 deletions pkg/epp/scheduling/scheduler_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
"fmt"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
)

// NewSchedulerConfig creates a new SchedulerConfig object and returns its pointer.
func NewSchedulerConfig() *SchedulerConfig {
return &SchedulerConfig{
preSchedulePlugins: []plugins.PreSchedule{},
filters: []plugins.Filter{},
scorers: []*scorer.WeightedScorer{},
postSchedulePlugins: []plugins.PostSchedule{},
postResponsePlugins: []plugins.PostResponse{},
// picker remains nil since config doesn't support multiple pickers
}
}

// SchedulerConfig provides a configuration for the scheduler which influence routing decisions.
type SchedulerConfig struct {
preSchedulePlugins []plugins.PreSchedule
filters []plugins.Filter
scorers []*scorer.WeightedScorer
picker plugins.Picker
postSchedulePlugins []plugins.PostSchedule
postResponsePlugins []plugins.PostResponse
}

// WithPreSchedulePlugins sets the given plugins as the PreSchedule plugins.
// If the SchedulerConfig has PreSchedule plugins, this call replaces the existing plugins with the given ones.
func (c *SchedulerConfig) WithPreSchedulePlugins(plugins ...plugins.PreSchedule) *SchedulerConfig {
c.preSchedulePlugins = plugins
return c
}

// WithFilters sets the given filter plugins as the Filter plugins.
// if the SchedulerConfig has Filter plugins, this call replaces the existing plugins with the given ones.
func (c *SchedulerConfig) WithFilters(filters ...plugins.Filter) *SchedulerConfig {
c.filters = filters
return c
}

// WithScorers sets the given scorer plugins as the Scorer plugins.
// if the SchedulerConfig has Scorer plugins, this call replaces the existing plugins with the given ones.
func (c *SchedulerConfig) WithScorers(scorers ...*scorer.WeightedScorer) *SchedulerConfig {
c.scorers = scorers
return c
}

// WithPicker sets the given picker plugins as the Picker plugin.
// if the SchedulerConfig has Picker plugin, this call replaces the existing plugin with the given one.
func (c *SchedulerConfig) WithPicker(picker plugins.Picker) *SchedulerConfig {
c.picker = picker
return c
}

// WithPostSchedulePlugins sets the given plugins as the PostSchedule plugins.
// If the SchedulerConfig has PostSchedule plugins, this call replaces the existing plugins with the given ones.
func (c *SchedulerConfig) WithPostSchedulePlugins(plugins ...plugins.PostSchedule) *SchedulerConfig {
c.postSchedulePlugins = plugins
return c
}

// WithPostResponsePlugins sets the given plugins as the PostResponse plugins.
// If the SchedulerConfig has PostResponse plugins, this call replaces the existing plugins with the given ones.
func (c *SchedulerConfig) WithPostResponsePlugins(plugins ...plugins.PostResponse) *SchedulerConfig {
c.postResponsePlugins = plugins
return c
}

// AddPlugins adds the given plugins to all scheduler plugins according to the interfaces each plugin implements.
// A plugin may implement more than one scheduler plugin interface.
// Special Case: In order to add a scorer, one must use the scorer.NewWeightedScorer function in order to provide a weight.
// if a scorer implements more than one interface, supplying a WeightedScorer is sufficient. The function will take the internal
// scorer object and register it to all interfaces it implements.
func (c *SchedulerConfig) AddPlugins(pluginObjects ...plugins.Plugin) error {
for _, plugin := range pluginObjects {
if weightedScorer, ok := plugin.(*scorer.WeightedScorer); ok {
c.scorers = append(c.scorers, weightedScorer)
plugin = weightedScorer.Scorer // if we got WeightedScorer, unwrap the plugin
} else if scorer, ok := plugin.(plugins.Scorer); ok { // if we got a Scorer instead of WeightedScorer that's an error.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider adding a unit test

return fmt.Errorf("failed to register scorer '%s' without a weight. follow function documentation to register a scorer", scorer.Name())
}
if preSchedulePlugin, ok := plugin.(plugins.PreSchedule); ok {
c.preSchedulePlugins = append(c.preSchedulePlugins, preSchedulePlugin)
}
if filter, ok := plugin.(plugins.Filter); ok {
c.filters = append(c.filters, filter)
}
if picker, ok := plugin.(plugins.Picker); ok {
if c.picker != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Considering adding a unit test

return fmt.Errorf("failed to set '%s' as picker, already have a registered picker plugin '%s'", picker.Name(), c.picker.Name())
}
c.picker = picker
}
if postSchedulePlugin, ok := plugin.(plugins.PostSchedule); ok {
c.postSchedulePlugins = append(c.postSchedulePlugins, postSchedulePlugin)
}
if postResponsePlugin, ok := plugin.(plugins.PostResponse); ok {
c.postResponsePlugins = append(c.postResponsePlugins, postResponsePlugin)
}
}
return nil
}
Loading