Skip to content

Commit 37e73c4

Browse files
committed
Introduce SaturationDetector component
This commit adds a new `SaturationDetector` component responsible for determining if backend model servers are saturated. It bases its decision on observed metrics like queue depth and KV cache utilization, using configurable thresholds. The detector is designed to be a self-contained unit that can be leveraged by other components for admission control and capacity assessment. This is the first step in a larger refactoring to externalize and centralize saturation detection logic.
1 parent 2b66451 commit 37e73c4

File tree

4 files changed

+590
-4
lines changed

4 files changed

+590
-4
lines changed

pkg/epp/saturationdetector/config.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package saturationdetector
17+
18+
import (
19+
"fmt"
20+
"time"
21+
22+
"sigs.k8s.io/controller-runtime/pkg/log"
23+
schedcfg "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
24+
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
25+
)
26+
27+
// Default configuration values
28+
const (
29+
DefaultQueueDepthThreshold = schedcfg.DefaultQueueThresholdCritical
30+
DefaultKVCacheUtilThreshold = schedcfg.DefaultKVCacheThreshold
31+
// DefaultMetricsStalenessThreshold defines how old metrics can be before they
32+
// are considered stale.
33+
// Given the pod metrics refresh interval is 50ms, a threshold slightly above
34+
// that should be fine.
35+
DefaultMetricsStalenessThreshold = 200 * time.Millisecond
36+
)
37+
38+
// Environment variable names for SaturationDetector configuration
39+
const (
40+
EnvSdQueueDepthThreshold = "SD_QUEUE_DEPTH_THRESHOLD"
41+
EnvSdKVCacheUtilThreshold = "SD_KV_CACHE_UTIL_THRESHOLD"
42+
EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD"
43+
)
44+
45+
// LoadConfigFromEnv loads SaturationDetector Config from environment
46+
// variables.
47+
func LoadConfigFromEnv() *Config {
48+
// Use a default logger for initial configuration loading.
49+
logger := log.Log.WithName("saturation-detector-config")
50+
51+
cfg := &Config{}
52+
53+
cfg.QueueDepthThreshold = envutil.GetEnvInt(EnvSdQueueDepthThreshold, DefaultQueueDepthThreshold, logger)
54+
cfg.KVCacheUtilThreshold = envutil.GetEnvFloat(EnvSdKVCacheUtilThreshold, DefaultKVCacheUtilThreshold, logger)
55+
cfg.MetricsStalenessThreshold = envutil.GetEnvDuration(EnvSdMetricsStalenessThreshold, DefaultMetricsStalenessThreshold, logger)
56+
57+
// NewDetector validates the config and assigns defaults.
58+
logger.Info("SaturationDetector configuration loaded from env",
59+
"config", fmt.Sprintf("%+v", cfg))
60+
return cfg
61+
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package saturationdetector implements a mechanism to determine if the
18+
// backend model servers are considered saturated based on observed metrics.
19+
//
20+
// The current implementation provides a global saturation signal (IsSaturated)
21+
// primarily based on backend queue depths and KV cache utilization, reflecting
22+
// the saturation signals previously used by the Scheduler before the
23+
// introduction of the FlowController. It fetches live metrics from the
24+
// provided Datastore.
25+
//
26+
// TODO: Explore more advanced saturation signals in the future, such as:
27+
// - Latency-objective-based saturation.
28+
// - Predictive saturation based on trends.
29+
// - Hysteresis bands or other smoothing techniques to prevent rapid
30+
// oscillations of the saturation signal.
31+
package saturationdetector
32+
33+
import (
34+
"errors"
35+
"time"
36+
37+
"github.com/go-logr/logr"
38+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
39+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
40+
)
41+
42+
// clock allows mocking time in tests.
43+
type clock interface {
44+
now() time.Time
45+
}
46+
47+
// realClock provides the real time.
48+
type realClock struct{}
49+
50+
func (c realClock) now() time.Time { return time.Now() }
51+
52+
// ErrNilDatastore indicates NewSaturationDetector was called with a nil
53+
// datastore.
54+
var ErrNilDatastore = errors.New("datastore cannot be nil")
55+
56+
// Config holds the configuration for the SaturationDetector.
57+
type Config struct {
58+
// QueueDepthThreshold defines the backend waiting queue size above which a
59+
// pod is considered to have insufficient capacity for new requests.
60+
QueueDepthThreshold int
61+
// KVCacheUtilThreshold defines the KV cache utilization (0.0 to 1.0) above
62+
// which a pod is considered to have insufficient capacity.
63+
KVCacheUtilThreshold float64
64+
// MetricsStalenessThreshold defines how old a pod's metrics can be.
65+
// If a pod's metrics are older than this, it might be excluded from
66+
// "good capacity" considerations or treated as having no capacity for
67+
// safety.
68+
MetricsStalenessThreshold time.Duration
69+
}
70+
71+
// Datastore provides an interface to access backend pod metrics.
72+
type Datastore interface {
73+
PodGetAll() []backendmetrics.PodMetrics
74+
}
75+
76+
// Detector determines system saturation based on metrics from the Datastore.
77+
type Detector struct {
78+
datastore Datastore
79+
config Config
80+
logger logr.Logger
81+
clock clock
82+
}
83+
84+
// NewDetector creates a new SaturationDetector.
85+
// The datastore is expected to provide access to live/recently-updated pod
86+
// metrics.
87+
// The config provides the thresholds for determining saturation.
88+
func NewDetector(config Config, datastore Datastore, logger logr.Logger) (*Detector, error) {
89+
if datastore == nil {
90+
return nil, ErrNilDatastore
91+
}
92+
if config.MetricsStalenessThreshold <= 0 {
93+
config.MetricsStalenessThreshold = DefaultMetricsStalenessThreshold
94+
}
95+
logger.V(logutil.DEFAULT).Info("Creating new SaturationDetector",
96+
"queueDepthThreshold", config.QueueDepthThreshold,
97+
"kvCacheUtilThreshold", config.KVCacheUtilThreshold,
98+
"metricsStalenessThreshold", config.MetricsStalenessThreshold.String())
99+
return &Detector{
100+
datastore: datastore,
101+
config: config,
102+
logger: logger.WithName("SaturationDetector"),
103+
clock: realClock{},
104+
}, nil
105+
}
106+
107+
// IsSaturated checks if the system is currently considered saturated.
108+
// The system is saturated if NO pod currently has "good capacity".
109+
// "Good capacity" means:
110+
// 1. Metrics are fresh (not stale).
111+
// 2. WaitingQueueSize <= QueueDepthThreshold.
112+
// 3. KVCacheUsagePercent <= KVCacheUtilThreshold.
113+
//
114+
// If no pods are found in the datastore, the system is considered saturated
115+
// (no capacity).
116+
func (d *Detector) IsSaturated() bool {
117+
allPodsMetrics := d.datastore.PodGetAll()
118+
if len(allPodsMetrics) == 0 {
119+
d.logger.V(logutil.VERBOSE).Info("No pods found in datastore; system is considered SATURATED (no capacity).")
120+
// If there are no pods, there is no capacity to serve requests.
121+
// Treat this as a saturated state to enable FlowController queuing.
122+
return true
123+
}
124+
125+
now := d.clock.now()
126+
foundPodWithGoodCapacity := false
127+
for _, podMetric := range allPodsMetrics {
128+
metrics := podMetric.GetMetrics()
129+
podNn := "unknown-pod"
130+
if podMetric.GetPod() != nil {
131+
podNn = podMetric.GetPod().NamespacedName.String()
132+
}
133+
134+
if metrics == nil {
135+
d.logger.V(logutil.VERBOSE).Info("Pod has nil metrics, skipping for saturation check",
136+
"pod", podNn)
137+
continue
138+
}
139+
140+
// 1. Check for metric staleness
141+
if now.Sub(metrics.UpdateTime) > d.config.MetricsStalenessThreshold {
142+
d.logger.V(logutil.VERBOSE).Info("Pod metrics are stale, considered as not having good capacity",
143+
"pod", podNn,
144+
"updateTime", metrics.UpdateTime,
145+
"stalenessThreshold", d.config.MetricsStalenessThreshold)
146+
continue
147+
}
148+
149+
// 2. Check queue depth
150+
isQueueGood := metrics.WaitingQueueSize <= d.config.QueueDepthThreshold
151+
152+
// 3. Check KV cache utilization
153+
isKVCacheGood := metrics.KVCacheUsagePercent <= d.config.KVCacheUtilThreshold
154+
155+
if isQueueGood && isKVCacheGood {
156+
d.logger.V(logutil.VERBOSE).Info("Found pod with good capacity",
157+
"pod", podNn,
158+
"waitingQueue", metrics.WaitingQueueSize,
159+
"queueThreshold", d.config.QueueDepthThreshold,
160+
"kvCacheUtil", metrics.KVCacheUsagePercent,
161+
"kvCacheThreshold", d.config.KVCacheUtilThreshold)
162+
foundPodWithGoodCapacity = true
163+
// Found at least one pod with good capacity, so system is NOT saturated.
164+
break
165+
}
166+
}
167+
168+
if !foundPodWithGoodCapacity {
169+
d.logger.V(logutil.VERBOSE).Info("No pods found with good capacity; system is considered SATURATED.")
170+
return true
171+
}
172+
173+
d.logger.V(logutil.VERBOSE).Info("System is considered NOT saturated (at least one pod has good capacity).")
174+
return false
175+
}

0 commit comments

Comments
 (0)