Skip to content

Commit 3d99aa1

Browse files
authored
Introduce SaturationDetector component (#808)
This commit adds a new `SaturationDetector` component responsible for determining if backend model servers are saturated. It bases its decision on observed metrics like queue depth and KV cache utilization, using configurable thresholds. The detector is designed to be a self-contained unit that can be leveraged by other components for admission control and capacity assessment. This is the first step in a larger refactoring to externalize and centralize saturation detection logic.
1 parent 8687feb commit 3d99aa1

File tree

5 files changed

+636
-7
lines changed

5 files changed

+636
-7
lines changed

pkg/epp/common/config/defaults.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package config holds common configuration default values used across
18+
// different EPP components.
19+
package config
20+
21+
const (
22+
// DefaultKVCacheThreshold is the default KV cache utilization (0.0 to 1.0)
23+
// threshold.
24+
DefaultKVCacheThreshold = 0.8
25+
// DefaultQueueThresholdCritical is the default backend waiting queue size
26+
// threshold.
27+
DefaultQueueThresholdCritical = 5
28+
)

pkg/epp/saturationdetector/config.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package saturationdetector
17+
18+
import (
19+
"fmt"
20+
"time"
21+
22+
"sigs.k8s.io/controller-runtime/pkg/log"
23+
commonconfig "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
24+
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
25+
)
26+
27+
// Default configuration values
28+
const (
29+
DefaultQueueDepthThreshold = commonconfig.DefaultQueueThresholdCritical
30+
DefaultKVCacheUtilThreshold = commonconfig.DefaultKVCacheThreshold
31+
// DefaultMetricsStalenessThreshold defines how old metrics can be before they
32+
// are considered stale.
33+
// Given the pod metrics refresh interval is 50ms, a threshold slightly above
34+
// that should be fine.
35+
DefaultMetricsStalenessThreshold = 200 * time.Millisecond
36+
)
37+
38+
// Environment variable names for SaturationDetector configuration
39+
const (
40+
EnvSdQueueDepthThreshold = "SD_QUEUE_DEPTH_THRESHOLD"
41+
EnvSdKVCacheUtilThreshold = "SD_KV_CACHE_UTIL_THRESHOLD"
42+
EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD"
43+
)
44+
45+
// LoadConfigFromEnv loads SaturationDetector Config from environment
46+
// variables.
47+
func LoadConfigFromEnv() *Config {
48+
// Use a default logger for initial configuration loading.
49+
logger := log.Log.WithName("saturation-detector-config")
50+
51+
cfg := &Config{}
52+
53+
cfg.QueueDepthThreshold = envutil.GetEnvInt(EnvSdQueueDepthThreshold, DefaultQueueDepthThreshold, logger)
54+
cfg.KVCacheUtilThreshold = envutil.GetEnvFloat(EnvSdKVCacheUtilThreshold, DefaultKVCacheUtilThreshold, logger)
55+
cfg.MetricsStalenessThreshold = envutil.GetEnvDuration(EnvSdMetricsStalenessThreshold, DefaultMetricsStalenessThreshold, logger)
56+
57+
// NewDetector validates the config and assigns defaults.
58+
logger.Info("SaturationDetector configuration loaded from env",
59+
"config", fmt.Sprintf("%+v", cfg))
60+
return cfg
61+
}
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package saturationdetector implements a mechanism to determine if the
18+
// backend model servers are considered saturated based on observed metrics.
19+
//
20+
// The current implementation provides a global saturation signal (IsSaturated)
21+
// primarily based on backend queue depths and KV cache utilization, reflecting
22+
// the saturation signals previously used by the Scheduler before the
23+
// introduction of the FlowController. It fetches live metrics from the
24+
// provided Datastore.
25+
//
26+
// TODO: Explore more advanced saturation signals in the future, such as:
27+
// - Latency-objective-based saturation.
28+
// - Predictive saturation based on trends.
29+
// - Hysteresis bands or other smoothing techniques to prevent rapid
30+
// oscillations of the saturation signal.
31+
package saturationdetector
32+
33+
import (
34+
"context"
35+
"errors"
36+
"time"
37+
38+
"github.com/go-logr/logr"
39+
"sigs.k8s.io/controller-runtime/pkg/log"
40+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
41+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
42+
)
43+
44+
// clock allows mocking time in tests.
45+
type clock interface {
46+
now() time.Time
47+
}
48+
49+
// realClock provides the real time.
50+
type realClock struct{}
51+
52+
func (c realClock) now() time.Time { return time.Now() }
53+
54+
const (
55+
// loggerName is the name to use for loggers created by this package.
56+
loggerName = "SaturationDetector"
57+
)
58+
59+
// ErrNilDatastore indicates NewSaturationDetector was called with a nil
60+
// datastore.
61+
var ErrNilDatastore = errors.New("datastore cannot be nil")
62+
63+
// Config holds the configuration for the SaturationDetector.
64+
type Config struct {
65+
// QueueDepthThreshold defines the backend waiting queue size above which a
66+
// pod is considered to have insufficient capacity for new requests.
67+
QueueDepthThreshold int
68+
// KVCacheUtilThreshold defines the KV cache utilization (0.0 to 1.0) above
69+
// which a pod is considered to have insufficient capacity.
70+
KVCacheUtilThreshold float64
71+
// MetricsStalenessThreshold defines how old a pod's metrics can be.
72+
// If a pod's metrics are older than this, it might be excluded from
73+
// "good capacity" considerations or treated as having no capacity for
74+
// safety.
75+
MetricsStalenessThreshold time.Duration
76+
}
77+
78+
// Datastore provides an interface to access backend pod metrics.
79+
type Datastore interface {
80+
PodGetAll() []backendmetrics.PodMetrics
81+
}
82+
83+
// Detector determines system saturation based on metrics from the Datastore.
84+
//
85+
// The Detector currently holds a direct dependency on a Datastore interface.
86+
// This design choice was made to encapsulate the logic of fetching and
87+
// interpreting metrics for saturation, thereby simplifying the dependencies
88+
// for primary consumers like the FlowController--to be added soon--(which
89+
// would otherwise need to manage Datastore interactions itself).
90+
// This architectural decision may be revisited in the future if a more
91+
// decoupled approach (e.g., passing metrics directly to IsSaturated) proves
92+
// more beneficial.
93+
type Detector struct {
94+
datastore Datastore
95+
config Config
96+
clock clock
97+
}
98+
99+
// NewDetector creates a new SaturationDetector.
100+
// The datastore is expected to provide access to live/recently-updated pod
101+
// metrics.
102+
// The config provides the thresholds for determining saturation.
103+
func NewDetector(config Config, datastore Datastore, logger logr.Logger) (*Detector, error) {
104+
if datastore == nil {
105+
return nil, ErrNilDatastore
106+
}
107+
if config.MetricsStalenessThreshold <= 0 {
108+
config.MetricsStalenessThreshold = DefaultMetricsStalenessThreshold
109+
}
110+
logger.WithName(loggerName).V(logutil.DEFAULT).Info("Creating new SaturationDetector",
111+
"queueDepthThreshold", config.QueueDepthThreshold,
112+
"kvCacheUtilThreshold", config.KVCacheUtilThreshold,
113+
"metricsStalenessThreshold", config.MetricsStalenessThreshold.String())
114+
return &Detector{
115+
datastore: datastore,
116+
config: config,
117+
clock: realClock{},
118+
}, nil
119+
}
120+
121+
// IsSaturated checks if the system is currently considered saturated.
122+
// The system is saturated if NO pod currently has "good capacity".
123+
// "Good capacity" means:
124+
// 1. Metrics are fresh (not stale).
125+
// 2. WaitingQueueSize <= QueueDepthThreshold.
126+
// 3. KVCacheUsagePercent <= KVCacheUtilThreshold.
127+
//
128+
// If no pods are found in the datastore, the system is considered saturated
129+
// (no capacity).
130+
func (d *Detector) IsSaturated(ctx context.Context) bool {
131+
logger := log.FromContext(ctx).WithName(loggerName)
132+
allPodsMetrics := d.datastore.PodGetAll()
133+
if len(allPodsMetrics) == 0 {
134+
logger.V(logutil.VERBOSE).Info("No pods found in datastore; system is considered SATURATED (no capacity).")
135+
// If there are no pods, there is no capacity to serve requests.
136+
// Treat this as a saturated state to enable FlowController queuing.
137+
return true
138+
}
139+
140+
now := d.clock.now()
141+
foundPodWithGoodCapacity := false
142+
for _, podMetric := range allPodsMetrics {
143+
metrics := podMetric.GetMetrics()
144+
podNn := "unknown-pod"
145+
if podMetric.GetPod() != nil {
146+
podNn = podMetric.GetPod().NamespacedName.String()
147+
}
148+
149+
if metrics == nil {
150+
logger.V(logutil.VERBOSE).Info("Pod has nil metrics, skipping for saturation check",
151+
"pod", podNn)
152+
continue
153+
}
154+
155+
// 1. Check for metric staleness
156+
if now.Sub(metrics.UpdateTime) > d.config.MetricsStalenessThreshold {
157+
logger.V(logutil.VERBOSE).Info("Pod metrics are stale, considered as not having good capacity",
158+
"pod", podNn,
159+
"updateTime", metrics.UpdateTime,
160+
"stalenessThreshold", d.config.MetricsStalenessThreshold)
161+
continue
162+
}
163+
164+
// 2. Check queue depth
165+
isQueueGood := metrics.WaitingQueueSize <= d.config.QueueDepthThreshold
166+
167+
// 3. Check KV cache utilization
168+
isKVCacheGood := metrics.KVCacheUsagePercent <= d.config.KVCacheUtilThreshold
169+
170+
if isQueueGood && isKVCacheGood {
171+
logger.V(logutil.VERBOSE).Info("Found pod with good capacity",
172+
"pod", podNn,
173+
"waitingQueue", metrics.WaitingQueueSize,
174+
"queueThreshold", d.config.QueueDepthThreshold,
175+
"kvCacheUtil", metrics.KVCacheUsagePercent,
176+
"kvCacheThreshold", d.config.KVCacheUtilThreshold)
177+
foundPodWithGoodCapacity = true
178+
// Found at least one pod with good capacity, so system is NOT saturated.
179+
break
180+
}
181+
}
182+
183+
if !foundPodWithGoodCapacity {
184+
logger.V(logutil.VERBOSE).Info("No pods found with good capacity; system is considered SATURATED.")
185+
return true
186+
}
187+
188+
logger.V(logutil.VERBOSE).Info("System is considered NOT saturated (at least one pod has good capacity).")
189+
return false
190+
}

0 commit comments

Comments
 (0)