[Metrics] Add average kv cache and waiting queue size metrics for inference pool #304

Merged
merged 1 commit on Feb 10, 2025
38 changes: 37 additions & 1 deletion pkg/ext-proc/backend/provider.go
@@ -7,6 +7,7 @@ import (
"time"

"go.uber.org/multierr"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
klog "k8s.io/klog/v2"
)
@@ -58,7 +59,7 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
return nil, false
}

func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error {
p.refreshPodsOnce()

if err := p.refreshMetricsOnce(); err != nil {
@@ -85,6 +86,14 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
}
}()

// Periodically flush Prometheus metrics for the inference pool.
go func() {
for {
time.Sleep(refreshPrometheusMetricsInterval)
p.flushPrometheusMetricsOnce()
}
}()

// Periodically print out the pods and metrics for DEBUGGING.
if klog.V(logutil.DEBUG).Enabled() {
go func() {
@@ -174,3 +183,30 @@ func (p *Provider) refreshMetricsOnce() error {
}
return errs
}

func (p *Provider) flushPrometheusMetricsOnce() {
klog.V(logutil.DEBUG).Infof("Flushing Prometheus Metrics")

pool, _ := p.datastore.getInferencePool()
if pool == nil {
// No inference pool yet, or it has not been initialized.
return
}

var kvCacheTotal float64
var queueTotal int

podMetrics := p.AllPodMetrics()
if len(podMetrics) == 0 {
return
}

for _, pod := range podMetrics {
kvCacheTotal += pod.KVCacheUsagePercent
queueTotal += pod.WaitingQueueSize
}

podTotalCount := len(podMetrics)
metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount))
metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal)/float64(podTotalCount)) // floating-point division so the average keeps its fractional part
}
2 changes: 1 addition & 1 deletion pkg/ext-proc/backend/provider_test.go
@@ -90,7 +90,7 @@ func TestProvider(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := NewProvider(test.pmc, test.datastore)
err := p.Init(time.Millisecond, time.Millisecond)
err := p.Init(time.Millisecond, time.Millisecond, time.Millisecond)
if test.initErr != (err != nil) {
t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr)
}
27 changes: 16 additions & 11 deletions pkg/ext-proc/main.go
@@ -69,6 +69,10 @@ var (
"refreshMetricsInterval",
runserver.DefaultRefreshMetricsInterval,
"interval to refresh metrics")
refreshPrometheusMetricsInterval = flag.Duration(
"refreshPrometheusMetricsInterval",
runserver.DefaultRefreshPrometheusMetricsInterval,
"interval to flush prometheus metrics")

scheme = runtime.NewScheme()
)
@@ -102,17 +106,18 @@ func main() {
datastore := backend.NewK8sDataStore()

serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
TargetEndpointKey: *targetEndpointKey,
PoolName: *poolName,
PoolNamespace: *poolNamespace,
ServiceName: *serviceName,
Zone: *zone,
RefreshPodsInterval: *refreshPodsInterval,
RefreshMetricsInterval: *refreshMetricsInterval,
Scheme: scheme,
Config: ctrl.GetConfigOrDie(),
Datastore: datastore,
GrpcPort: *grpcPort,
TargetEndpointKey: *targetEndpointKey,
PoolName: *poolName,
PoolNamespace: *poolNamespace,
ServiceName: *serviceName,
Zone: *zone,
RefreshPodsInterval: *refreshPodsInterval,
RefreshMetricsInterval: *refreshMetricsInterval,
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
Scheme: scheme,
Config: ctrl.GetConfigOrDie(),
Datastore: datastore,
}
serverRunner.Setup()

2 changes: 2 additions & 0 deletions pkg/ext-proc/metrics/README.md
@@ -46,6 +46,8 @@ spec:
| inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_model_input_tokens | Distribution | Distribution of input token count. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_model_output_tokens | Distribution | Distribution of output token count. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_pool_average_kv_cache_utilization | Gauge | The average kv cache utilization for an inference server pool. | `name`=&lt;inference-pool-name&gt; | ALPHA |
| inference_pool_average_queue_size | Gauge | The average number of requests pending in the model server queue. | `name`=&lt;inference-pool-name&gt; | ALPHA |

## Scrape Metrics

34 changes: 34 additions & 0 deletions pkg/ext-proc/metrics/metrics.go
@@ -11,9 +11,11 @@ import (

const (
InferenceModelComponent = "inference_model"
InferencePoolComponent = "inference_pool"
)

var (
// Inference Model Metrics
requestCounter = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Subsystem: InferenceModelComponent,
Expand Down Expand Up @@ -88,6 +90,27 @@ var (
},
[]string{"model_name", "target_model_name"},
)

// Inference Pool Metrics
inferencePoolAvgKVCache = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Subsystem: InferencePoolComponent,
Name: "average_kv_cache_utilization",
Help: "The average kv cache utilization for an inference server pool.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"name"},
)

inferencePoolAvgQueueSize = compbasemetrics.NewGaugeVec(
Review thread on the inference_pool_average_queue_size gauge:

Contributor: @raywainman we are reporting the average queue length across model servers; what alternatives would you suggest for using this with HPA? Can HPA consume a distribution and do the aggregation on its end, so the user has more flexibility in how to aggregate? /cc @smarterclayton

Reply: HPA can't consume a distribution directly today unless we put a Prometheus adapter in front of the metric and convert it to a direct gauge metric (which is doable). For example, you could do something like "get the 90th-percentile queue size over the last 5 minutes" this way. Do we anticipate that being useful? If so, we could maybe emit both: one simple gauge metric emitting the instantaneous average queue size across all model servers, and another metric with a distribution. @JeffLuoo what do you think?

Contributor Author: In our benchmarking we scrape gauge metrics for cache utilization and queue size. Let's discuss whether a distribution for queue size, or other metrics from the model servers, would be more helpful. Inference pool metrics are calculated directly from model server metrics (vLLM in the current implementation).

Contributor Author (JeffLuoo, Feb 10, 2025): Let's target adding the new metrics (e.g. percentiles) in a follow-up CL to unblock this one.

Collaborator: That sounds great, made #306 to track.

(A hypothetical sketch of such a distribution metric is shown after the metric definitions below.)
&compbasemetrics.GaugeOpts{
Subsystem: InferencePoolComponent,
Name: "average_queue_size",
Help: "The average number of requests pending in the model server queue.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"name"},
)
)
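To make the review thread above concrete: below is a minimal, hypothetical sketch of the distribution metric discussed there (the follow-up tracked in #306), written as if it sat next to the gauges in pkg/ext-proc/metrics/metrics.go and reused its existing imports (compbasemetrics is k8s.io/component-base/metrics). The metric name, buckets, and helper function are illustrative assumptions, not part of this PR.

```go
// Hypothetical follow-up (tracked in #306), not part of this PR: a per-pod queue-size
// distribution emitted alongside the average gauge, so consumers can choose their own
// aggregation (e.g. percentiles via a Prometheus adapter).
var inferencePoolQueueSizes = compbasemetrics.NewHistogramVec(
	&compbasemetrics.HistogramOpts{
		Subsystem:      InferencePoolComponent,
		Name:           "queue_sizes", // illustrative name only
		Help:           "Distribution of per-pod waiting queue sizes in an inference server pool.",
		Buckets:        []float64{0, 1, 2, 4, 8, 16, 32, 64, 128},
		StabilityLevel: compbasemetrics.ALPHA,
	},
	[]string{"name"},
)

// RecordInferencePoolQueueSize would be registered in Register() via
// legacyregistry.MustRegister(inferencePoolQueueSizes) and called once per pod in
// flushPrometheusMetricsOnce, e.g. RecordInferencePoolQueueSize(pool.Name, pod.WaitingQueueSize).
func RecordInferencePoolQueueSize(name string, queueSize int) {
	inferencePoolQueueSizes.WithLabelValues(name).Observe(float64(queueSize))
}
```

Emitting a histogram alongside the average gauge would let a Prometheus adapter expose percentile queries (for example, the 90th-percentile queue size over a window) for HPA, while the gauge remains available as a simple instantaneous scaling signal.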

var registerMetrics sync.Once
@@ -101,6 +124,9 @@ func Register() {
legacyregistry.MustRegister(responseSizes)
legacyregistry.MustRegister(inputTokens)
legacyregistry.MustRegister(outputTokens)

legacyregistry.MustRegister(inferencePoolAvgKVCache)
legacyregistry.MustRegister(inferencePoolAvgQueueSize)
})
}

Expand Down Expand Up @@ -143,3 +169,11 @@ func RecordOutputTokens(modelName, targetModelName string, size int) {
outputTokens.WithLabelValues(modelName, targetModelName).Observe(float64(size))
}
}

func RecordInferencePoolAvgKVCache(name string, utilization float64) {
inferencePoolAvgKVCache.WithLabelValues(name).Set(utilization)
}

func RecordInferencePoolAvgQueueSize(name string, queueSize float64) {
inferencePoolAvgQueueSize.WithLabelValues(name).Set(queueSize)
}
52 changes: 52 additions & 0 deletions pkg/ext-proc/metrics/metrics_test.go
@@ -15,6 +15,8 @@ const RequestSizesMetric = InferenceModelComponent + "_request_sizes"
const ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
const InputTokensMetric = InferenceModelComponent + "_input_tokens"
const OutputTokensMetric = InferenceModelComponent + "_output_tokens"
const KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
const QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"

func TestRecordRequestCounterandSizes(t *testing.T) {
type requests struct {
@@ -257,3 +259,53 @@ func TestRecordResponseMetrics(t *testing.T) {
})
}
}

func TestInferencePoolMetrics(t *testing.T) {
scenarios := []struct {
name string
poolName string
kvCacheAvg float64
queueSizeAvg float64
}{
{
name: "basic test",
poolName: "p1",
kvCacheAvg: 0.3,
queueSizeAvg: 0.4,
},
}
Register()
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {

RecordInferencePoolAvgKVCache(scenario.poolName, scenario.kvCacheAvg)
RecordInferencePoolAvgQueueSize(scenario.poolName, scenario.queueSizeAvg)

wantKVCache, err := os.Open("testdata/kv_cache_avg_metrics")
if err != nil {
t.Fatal(err)
}
defer func() {
if err := wantKVCache.Close(); err != nil {
t.Error(err)
}
}()
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantKVCache, KVCacheAvgUsageMetric); err != nil {
t.Error(err)
}

wantQueueSize, err := os.Open("testdata/queue_avg_size_metrics")
if err != nil {
t.Fatal(err)
}
defer func() {
if err := wantQueueSize.Close(); err != nil {
t.Error(err)
}
}()
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantQueueSize, QueueAvgSizeMetric); err != nil {
t.Error(err)
}
})
}
}
3 changes: 3 additions & 0 deletions pkg/ext-proc/metrics/testdata/kv_cache_avg_metrics
@@ -0,0 +1,3 @@
# HELP inference_pool_average_kv_cache_utilization [ALPHA] The average kv cache utilization for an inference server pool.
# TYPE inference_pool_average_kv_cache_utilization gauge
inference_pool_average_kv_cache_utilization{name="p1"} 0.3
3 changes: 3 additions & 0 deletions pkg/ext-proc/metrics/testdata/queue_avg_size_metrics
@@ -0,0 +1,3 @@
# HELP inference_pool_average_queue_size [ALPHA] The average number of requests pending in the model server queue.
# TYPE inference_pool_average_queue_size gauge
inference_pool_average_queue_size{name="p1"} 0.4
61 changes: 32 additions & 29 deletions pkg/ext-proc/server/runserver.go
@@ -19,42 +19,45 @@ import (

// ExtProcServerRunner provides methods to manage an external process server.
type ExtProcServerRunner struct {
GrpcPort int
TargetEndpointKey string
PoolName string
PoolNamespace string
ServiceName string
Zone string
RefreshPodsInterval time.Duration
RefreshMetricsInterval time.Duration
Scheme *runtime.Scheme
Config *rest.Config
Datastore *backend.K8sDatastore
manager ctrl.Manager
GrpcPort int
TargetEndpointKey string
PoolName string
PoolNamespace string
ServiceName string
Zone string
RefreshPodsInterval time.Duration
RefreshMetricsInterval time.Duration
RefreshPrometheusMetricsInterval time.Duration
Scheme *runtime.Scheme
Config *rest.Config
Datastore *backend.K8sDatastore
manager ctrl.Manager
}

// Default values for CLI flags in main
const (
DefaultGrpcPort = 9002 // default for --grpcPort
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
DefaultPoolName = "" // required but no default
DefaultPoolNamespace = "default" // default for --poolNamespace
DefaultServiceName = "" // required but no default
DefaultZone = "" // default for --zone
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
DefaultGrpcPort = 9002 // default for --grpcPort
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
DefaultPoolName = "" // required but no default
DefaultPoolNamespace = "default" // default for --poolNamespace
DefaultServiceName = "" // required but no default
DefaultZone = "" // default for --zone
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
)

func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
return &ExtProcServerRunner{
GrpcPort: DefaultGrpcPort,
TargetEndpointKey: DefaultTargetEndpointKey,
PoolName: DefaultPoolName,
PoolNamespace: DefaultPoolNamespace,
ServiceName: DefaultServiceName,
Zone: DefaultZone,
RefreshPodsInterval: DefaultRefreshPodsInterval,
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
GrpcPort: DefaultGrpcPort,
TargetEndpointKey: DefaultTargetEndpointKey,
PoolName: DefaultPoolName,
PoolNamespace: DefaultPoolNamespace,
ServiceName: DefaultServiceName,
Zone: DefaultZone,
RefreshPodsInterval: DefaultRefreshPodsInterval,
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval,
// Scheme, Config, and Datastore can be assigned later.
}
}
@@ -123,7 +126,7 @@ func (r *ExtProcServerRunner) Start(

// Initialize backend provider
pp := backend.NewProvider(podMetricsClient, podDatastore)
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval, r.RefreshPrometheusMetricsInterval); err != nil {
klog.Fatalf("Failed to initialize backend provider: %v", err)
}

13 changes: 7 additions & 6 deletions pkg/ext-proc/test/benchmark/benchmark.go
@@ -21,11 +21,12 @@ var (
svrAddr = flag.String("server_address", fmt.Sprintf("localhost:%d", runserver.DefaultGrpcPort), "Address of the ext proc server")
totalRequests = flag.Int("total_requests", 100000, "number of requests to be sent for load test")
// Flags when running a local ext proc server.
numFakePods = flag.Int("num_fake_pods", 200, "number of fake pods when running a local ext proc server")
numModelsPerPod = flag.Int("num_models_per_pod", 5, "number of fake models per pod when running a local ext proc server")
localServer = flag.Bool("local_server", true, "whether to start a local ext proc server")
refreshPodsInterval = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods")
refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics")
numFakePods = flag.Int("num_fake_pods", 200, "number of fake pods when running a local ext proc server")
numModelsPerPod = flag.Int("num_models_per_pod", 5, "number of fake models per pod when running a local ext proc server")
localServer = flag.Bool("local_server", true, "whether to start a local ext proc server")
refreshPodsInterval = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods")
refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics via polling pods")
refreshPrometheusMetricsInterval = flag.Duration("refreshPrometheusMetricsInterval", 5*time.Second, "interval to flush prometheus metrics")
)

const (
Expand All @@ -37,7 +38,7 @@ func main() {
flag.Parse()

if *localServer {
test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, fakePods(), fakeModels())
test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, *refreshPrometheusMetricsInterval, fakePods(), fakeModels())
time.Sleep(time.Second) // wait until server is up
klog.Info("Server started")
}
4 changes: 2 additions & 2 deletions pkg/ext-proc/test/utils.go
@@ -16,7 +16,7 @@ import (
klog "k8s.io/klog/v2"
)

func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server {
func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server {
ps := make(backend.PodSet)
pms := make(map[backend.Pod]*backend.PodMetrics)
for _, pod := range pods {
Expand All @@ -25,7 +25,7 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Dur
}
pmc := &backend.FakePodMetricsClient{Res: pms}
pp := backend.NewProvider(pmc, backend.NewK8sDataStore(backend.WithPods(pods)))
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval); err != nil {
klog.Fatalf("failed to initialize: %v", err)
}
return startExtProc(port, pp, models)