Skip to content

Commit b52f777

Browse files
committed
Improve the filter to return multiple preferred pods instead of one; also fix metric update bug
Returning multiple preferred pods improves the probability to match the next preferrence filter. Also updated log levels: - Level 1, 2: For non-spammy logs like initialization - Level 3+: For per request debug logs - Level 4+: For per loop logs like periodically scrape metrics
1 parent 6f9869d commit b52f777

File tree

7 files changed

+73
-41
lines changed

7 files changed

+73
-41
lines changed

pkg/ext-proc/backend/metrics.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func (p *Provider) refreshMetricsOnce() error {
3535
var wg sync.WaitGroup
3636
var errs error
3737
processOnePod := func(key, value any) bool {
38+
klog.V(3).Infof("Processing pod %v and metric %v", key, value)
3839
pod := key.(Pod)
3940
metrics := value.(*PodMetrics)
4041
wg.Add(1)
@@ -67,17 +68,17 @@ func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *Pod
6768
updated := existing.Clone()
6869
runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName)
6970
multierr.Append(errs, err)
70-
if err != nil {
71-
updated.RunningQueueSize = int(runningQueueSize.GetCounter().GetValue())
71+
if err == nil {
72+
updated.RunningQueueSize = int(runningQueueSize.GetGauge().GetValue())
7273
}
7374
waitingQueueSize, _, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName)
7475
multierr.Append(errs, err)
75-
if err != nil {
76+
if err == nil {
7677
updated.WaitingQueueSize = int(waitingQueueSize.GetGauge().GetValue())
7778
}
7879
cachePercent, _, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName)
7980
multierr.Append(errs, err)
80-
if err != nil {
81+
if err == nil {
8182
updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue()
8283
}
8384
/* TODO: uncomment once this is available in vllm.
@@ -126,10 +127,11 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str
126127
var latestTs int64
127128
var latest *dto.Metric
128129
for _, m := range mf.GetMetric() {
129-
if m.GetTimestampMs() > latestTs {
130+
if m.GetTimestampMs() >= latestTs {
130131
latestTs = m.GetTimestampMs()
131132
latest = m
132133
}
133134
}
135+
klog.V(3).Infof("Got metric value %+v for metric %v", latest, metricName)
134136
return latest, time.Unix(0, latestTs*1000), nil
135137
}

pkg/ext-proc/backend/provider.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,14 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
6464
return fmt.Errorf("failed to init metrics: %v", err)
6565
}
6666

67-
klog.V(2).Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
67+
klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
6868

6969
// periodically refresh pods
7070
go func() {
7171
for {
7272
time.Sleep(refreshPodsInterval)
7373
if err := p.refreshPodsOnce(); err != nil {
74-
klog.V(1).Infof("Failed to refresh podslist pods: %v", err)
74+
klog.V(4).Infof("Failed to refresh podslist pods: %v", err)
7575
}
7676
}
7777
}()
@@ -81,11 +81,21 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
8181
for {
8282
time.Sleep(refreshMetricsInterval)
8383
if err := p.refreshMetricsOnce(); err != nil {
84-
klog.V(1).Infof("Failed to refresh metrics: %v", err)
84+
klog.V(4).Infof("Failed to refresh metrics: %v", err)
8585
}
8686
}
8787
}()
8888

89+
// Periodically print out the pods and metrics for DEBUGGING.
90+
if klog.V(2).Enabled() {
91+
go func() {
92+
for {
93+
time.Sleep(5 * time.Second)
94+
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
95+
}
96+
}()
97+
}
98+
8999
return nil
90100
}
91101

pkg/ext-proc/handlers/request.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
// parameter.
1616
// Envoy sends the request body to ext proc before sending the request to the backend server.
1717
func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
18-
klog.V(2).Infof("Handling request body")
18+
klog.V(3).Infof("Handling request body")
1919

2020
// Unmarshal request body (must be JSON).
2121
v := req.Request.(*extProcPb.ProcessingRequest_RequestBody)
@@ -24,14 +24,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
2424
klog.Errorf("Error unmarshaling request body: %v", err)
2525
return nil, fmt.Errorf("error unmarshaling request body: %v", err)
2626
}
27-
klog.V(2).Infof("Request body: %v", rb)
27+
klog.V(3).Infof("Request body: %v", rb)
2828

2929
// Resolve target models.
3030
model, ok := rb["model"].(string)
3131
if !ok {
3232
return nil, fmt.Errorf("model not found in request")
3333
}
34-
klog.V(2).Infof("Model requested: %v", model)
34+
klog.V(3).Infof("Model requested: %v", model)
3535
llmReq := &scheduling.LLMRequest{
3636
Model: model,
3737
// For now use the model as the target model.
@@ -47,13 +47,13 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
4747
klog.Errorf("Error marshaling request body: %v", err)
4848
return nil, fmt.Errorf("error marshaling request body: %v", err)
4949
}
50-
klog.V(2).Infof("Updated body: %v", updatedBody)
50+
klog.V(3).Infof("Updated body: %v", updatedBody)
5151

5252
targetPod, err := s.scheduler.Schedule(llmReq)
5353
if err != nil {
5454
return nil, fmt.Errorf("failed to find target pod: %v", err)
5555
}
56-
klog.V(2).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
56+
klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
5757

5858
reqCtx.Model = llmReq.Model
5959
reqCtx.TargetPod = targetPod
@@ -69,7 +69,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
6969
}
7070
// Print headers for debugging
7171
for _, header := range headers {
72-
klog.V(2).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
72+
klog.V(3).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
7373
}
7474

7575
resp := &extProcPb.ProcessingResponse{
@@ -93,10 +93,10 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
9393
}
9494

9595
func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
96-
klog.V(2).Info("--- In RequestHeaders processing ...")
96+
klog.V(3).Info("--- In RequestHeaders processing ...")
9797
r := req.Request
9898
h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
99-
klog.V(2).Infof("Headers: %+v\n", h)
99+
klog.V(3).Infof("Headers: %+v\n", h)
100100

101101
resp := &extProcPb.ProcessingResponse{
102102
Response: &extProcPb.ProcessingResponse_RequestHeaders{

pkg/ext-proc/handlers/response.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88

99
// HandleResponseHeaders processes response headers from the backend model server.
1010
func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
11-
klog.V(2).Info("Processing ResponseHeaders")
11+
klog.V(3).Info("Processing ResponseHeaders")
1212
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
13-
klog.V(2).Infof("Headers before: %+v\n", h)
13+
klog.V(3).Infof("Headers before: %+v\n", h)
1414

1515
resp := &extProcPb.ProcessingResponse{
1616
Response: &extProcPb.ProcessingResponse_ResponseHeaders{

pkg/ext-proc/handlers/server.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type PodProvider interface {
4141
}
4242

4343
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
44-
klog.V(2).Info("Processing")
44+
klog.V(3).Info("Processing")
4545
ctx := srv.Context()
4646
// Create request context to share states during life time of an HTTP request.
4747
// See https://github.com/envoyproxy/envoy/issues/17540.
@@ -59,22 +59,25 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
5959
return nil
6060
}
6161
if err != nil {
62+
// This error occurs very frequently, though it doesn't seem to have any impact.
63+
// TODO Figure out if we can clean up the
64+
klog.V(3).Infof("cannot receive stream request: %v", err)
6265
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
6366
}
6467

6568
resp := &extProcPb.ProcessingResponse{}
6669
switch v := req.Request.(type) {
6770
case *extProcPb.ProcessingRequest_RequestHeaders:
6871
resp = HandleRequestHeaders(reqCtx, req)
69-
klog.V(2).Infof("Request context after HandleRequestHeaders: %v", reqCtx)
72+
klog.V(3).Infof("Request context after HandleRequestHeaders: %v", reqCtx)
7073
case *extProcPb.ProcessingRequest_RequestBody:
7174
resp, err = s.HandleRequestBody(reqCtx, req)
72-
klog.V(2).Infof("Request context after HandleRequestBody: %v", reqCtx)
75+
klog.V(3).Infof("Request context after HandleRequestBody: %v", reqCtx)
7376
case *extProcPb.ProcessingRequest_ResponseHeaders:
7477
resp, err = s.HandleResponseHeaders(reqCtx, req)
75-
klog.V(2).Infof("Request context after HandleResponseHeaders: %v", reqCtx)
78+
klog.V(3).Infof("Request context after HandleResponseHeaders: %v", reqCtx)
7679
default:
77-
klog.Infof("Unknown Request type %+v", v)
80+
klog.Errorf("Unknown Request type %+v", v)
7881
return status.Error(codes.Unknown, "unknown request type")
7982
}
8083

@@ -83,9 +86,9 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
8386
return status.Errorf(codes.Unknown, "failed to handle request: %v", err)
8487
}
8588

86-
klog.V(2).Infof("response: %v", resp)
89+
klog.V(3).Infof("response: %v", resp)
8790
if err := srv.Send(resp); err != nil {
88-
klog.Infof("send error %v", err)
91+
klog.Errorf("send error %v", err)
8992
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
9093
}
9194
}

pkg/ext-proc/scheduling/filter.go

+30-14
Original file line numberDiff line numberDiff line change
@@ -43,24 +43,24 @@ func (f *filter) Name() string {
4343

4444
func (f *filter) Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
4545
if f == nil {
46-
klog.V(2).Infof("Running nil filter, returning all input pods by default")
46+
klog.V(3).Infof("Running nil filter, returning all input pods by default")
4747
return pods, nil
4848
}
49-
klog.V(2).Infof("Running filter %q on request %v with %v pods", f.name, b, len(pods))
49+
klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, b, len(pods))
5050

5151
filtered, err := f.filter(b, pods)
5252

5353
next := f.nextOnSuccessOrFailure
5454
if err == nil {
55-
klog.V(2).Infof("onSuccess %v -> %v, filtered: %v", f.name, next.Name(), len(filtered))
55+
klog.V(3).Infof("onSuccess %v -> %v, filtered: %v", f.name, next.Name(), len(filtered))
5656
if f.nextOnSuccess != nil {
5757
next = f.nextOnSuccess
5858
}
5959
// On success, pass the filtered result to the next filter.
6060
return next.Filter(b, filtered)
6161
}
6262

63-
klog.V(2).Infof("onFailure %v -> %v", f.name, next.Name())
63+
klog.V(3).Infof("onFailure %v -> %v", f.name, next.Name())
6464
if f.nextOnFailure != nil {
6565
next = f.nextOnFailure
6666
}
@@ -88,32 +88,48 @@ func toFilterFunc(pp podPredicate) filterFunc {
8888
}
8989
}
9090

91+
// leastQueuingFilterFunc finds the max and min queue size of all pods, divides the whole range
92+
// (max-min) by the number of pods, and finds the pods that fall into the first range.
9193
func leastQueuingFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
9294
min := math.MaxInt
95+
max := 0
9396
filtered := []*backend.PodMetrics{}
97+
9498
for _, pod := range pods {
95-
if pod.WaitingQueueSize < min {
99+
if pod.WaitingQueueSize <= min {
96100
min = pod.WaitingQueueSize
97-
filtered = []*backend.PodMetrics{}
98101
}
99-
if pod.WaitingQueueSize == min {
102+
if pod.WaitingQueueSize >= max {
103+
max = pod.WaitingQueueSize
104+
}
105+
}
106+
107+
for _, pod := range pods {
108+
if pod.WaitingQueueSize >= min && pod.WaitingQueueSize <= min+(max-min)/len(pods) {
100109
filtered = append(filtered, pod)
101110
}
102111
}
103112
return filtered, nil
104113
}
105114

115+
// leastKVCacheFilterFunc finds the max and min KV cache of all pods, divides the whole range
116+
// (max-min) by the number of pods, and finds the pods that fall into the first range.
106117
func leastKVCacheFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
107-
min := math.MaxInt
118+
min := math.MaxFloat64
119+
max := math.SmallestNonzeroFloat64
108120
filtered := []*backend.PodMetrics{}
109-
margin := 5
121+
110122
for _, pod := range pods {
111-
cur := int(pod.KVCacheUsagePercent) / margin
112-
if cur < min {
113-
min = cur
114-
filtered = []*backend.PodMetrics{}
123+
if pod.KVCacheUsagePercent <= min {
124+
min = pod.KVCacheUsagePercent
125+
}
126+
if pod.KVCacheUsagePercent >= max {
127+
max = pod.KVCacheUsagePercent
115128
}
116-
if cur == min {
129+
}
130+
131+
for _, pod := range pods {
132+
if pod.KVCacheUsagePercent >= min && pod.KVCacheUsagePercent <= min+(max-min)/float64(len(pods)) {
117133
filtered = append(filtered, pod)
118134
}
119135
}

pkg/ext-proc/scheduling/scheduler.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package scheduling
33

44
import (
5+
"fmt"
56
"math/rand"
67

78
klog "k8s.io/klog/v2"
@@ -44,10 +45,10 @@ type PodMetricsProvider interface {
4445

4546
// Schedule finds the target pod based on metrics and the requested lora adapter.
4647
func (s *Scheduler) Schedule(b *LLMRequest) (targetPod *backend.Pod, err error) {
47-
klog.V(2).Infof("request: %v; metrics: %+v", b, s.podMetricsProvider.AllPodMetrics())
48+
klog.V(3).Infof("request: %v; metrics: %+v", b, s.podMetricsProvider.AllPodMetrics())
4849
pods, err := s.filter.Filter(b, s.podMetricsProvider.AllPodMetrics())
4950
if err != nil || len(pods) == 0 {
50-
klog.Errorf("Failed to apply filter, this should never happen: %v", err)
51+
return nil, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %v", len(pods), err)
5152
}
5253
i := rand.Intn(len(pods))
5354
return &pods[i].Pod, nil

0 commit comments

Comments
 (0)