Skip to content

Refactor: Externalize Scheduler's saturation logic and criticality-based service differentiation #805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 49 additions & 20 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/multi/prefix"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
Expand Down Expand Up @@ -157,13 +157,19 @@ func run() error {
})
setupLog.Info("Flags processed", "flags", flags)

// Init runtime.
// --- Load Configurations from Environment Variables ---
// Note: Scheduler config is currently loaded via its package init; we may
// want to load it explicitly here instead.
sdConfig := saturationdetector.LoadConfigFromEnv()

// --- Get Kubernetes Config ---
cfg, err := ctrl.GetConfig()
if err != nil {
setupLog.Error(err, "Failed to get rest config")
setupLog.Error(err, "Failed to get Kubernetes rest config")
return err
}

// --- Setup Manager ---
poolNamespacedName := types.NamespacedName{
Name: *poolName,
Namespace: *poolNamespace,
Expand All @@ -174,7 +180,7 @@ func run() error {
return err
}

// Set up mapper for metric scraping.
// --- Setup Datastore ---
mapping, err := backendmetrics.NewMetricMapping(
*totalQueuedRequestsMetric,
*kvCacheUsagePercentageMetric,
Expand All @@ -185,14 +191,12 @@ func run() error {
return err
}
verifyMetricMapping(*mapping, setupLog)

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
// Setup runner.
ctx := ctrl.SetupSignalHandler()
appDatastore := datastore.NewDatastore(ctx, pmf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why appDatastore and appScheduler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this to avoid collisions with the package names. This is not strictly necessary though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps call them `ds` and `sched`.


datastore := datastore.NewDatastore(ctx, pmf)

scheduler := scheduling.NewScheduler(datastore)
// --- Initialize EPP Components ---
appScheduler := scheduling.NewScheduler(appDatastore)
if schedulerV2 == "true" {
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
Expand All @@ -207,47 +211,62 @@ func run() error {
}
schedulerConfig := scheduling.NewSchedulerConfig(
[]plugins.PreSchedule{},
[]plugins.Filter{filter.NewSheddableCapacityFilter()},
[]plugins.Filter{},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liu-cong I can also do this in the next PR when I actually remove this from the scheduler. Right now, I have only removed it from scheduler v2, not the original decision tree filter. If we want to bundle that together in a single PR, I can revert this line for now.

scorers,
picker.NewMaxScorePicker(),
[]plugins.PostSchedule{},
[]plugins.PostResponse{},
schedConfigOpts...)
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
appScheduler = scheduling.NewSchedulerWithConfig(appDatastore, schedulerConfig)
}

appSaturationDetector, err := saturationdetector.NewDetector(
*sdConfig,
appDatastore,
ctrl.Log.WithName("saturation-detector"),
)
if err != nil {
setupLog.Error(err, "Failed to create SaturationDetector")
return err
}

// --- Setup ExtProc Server Runner ---
serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
DestinationEndpointHintKey: *destinationEndpointHintKey,
PoolNamespacedName: poolNamespacedName,
Datastore: datastore,
Datastore: appDatastore,
SecureServing: *secureServing,
CertPath: *certPath,
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
Scheduler: scheduler,
Scheduler: appScheduler,
SaturationDetector: appSaturationDetector,
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "Failed to setup ext-proc controllers")
setupLog.Error(err, "Failed to setup EPP controllers")
return err
}

// --- Add Runnables to Manager ---

// Register health server.
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), appDatastore, *grpcHealthPort); err != nil {
return err
}

// Register ext-proc server.
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
setupLog.Error(err, "Failed to register ext-proc gRPC server")
if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
return err
}

// Register metrics handler.
if err := registerMetricsHandler(mgr, *metricsPort, cfg, datastore); err != nil {
if err := registerMetricsHandler(mgr, *metricsPort, cfg, appDatastore); err != nil {
return err
}

// Start the manager. This blocks until a signal is received.
// --- Start Manager ---
// This blocks until a signal is received.
setupLog.Info("Controller manager starting")
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "Error starting controller manager")
Expand Down Expand Up @@ -275,6 +294,17 @@ func initLogging(opts *zap.Options) {
ctrl.SetLogger(logger)
}

// registerExtProcServer wraps the given ExtProcServerRunner as a Runnable
// (bound to logger) and hands it to mgr. Both the failure and the success
// outcome are reported through setupLog; any error from mgr.Add is returned
// to the caller unchanged.
func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error {
	extProcRunnable := runner.AsRunnable(logger)

	addErr := mgr.Add(extProcRunnable)
	if addErr == nil {
		setupLog.Info("ExtProc server runner added to manager.")
		return nil
	}

	setupLog.Error(addErr, "Failed to register ext-proc gRPC server runnable")
	return addErr
}

// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
srv := grpc.NewServer()
Expand Down Expand Up @@ -364,5 +394,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
if mapping.LoraRequestInfo == nil {
logger.Info("Not scraping metric: LoraRequestInfo")
}

}
Loading