diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index 20b0e196..fc67e945 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -113,7 +113,7 @@ func (p *Provider) refreshPodsOnce() error {
     new := &PodMetrics{
       Pod: pod,
       Metrics: Metrics{
-        CachedModels: make(map[string]int),
+        ActiveModels: make(map[string]int),
       },
     }
     p.podMetrics.Store(pod, new)
diff --git a/pkg/ext-proc/backend/types.go b/pkg/ext-proc/backend/types.go
index 7d5af51a..c1e1113a 100644
--- a/pkg/ext-proc/backend/types.go
+++ b/pkg/ext-proc/backend/types.go
@@ -12,12 +12,14 @@ type Pod struct {
 }
 
 func (p Pod) String() string {
-  return p.Namespace + "." + p.Name
+  return p.Namespace + "/" + p.Name
 }
 
 type Metrics struct {
-  // CachedModels is a set of models(including LoRA adapters) that are currently cached to GPU.
-  CachedModels map[string]int
+  // ActiveModels is a set of models (including LoRA adapters) that are currently cached to GPU.
+  ActiveModels map[string]int
+  // MaxActiveModels is the maximum number of models that can be loaded to GPU.
+  MaxActiveModels int
   RunningQueueSize    int
   WaitingQueueSize    int
   KVCacheUsagePercent float64
@@ -34,14 +36,14 @@ func (pm *PodMetrics) String() string {
 }
 
 func (pm *PodMetrics) Clone() *PodMetrics {
-  cm := make(map[string]int, len(pm.CachedModels))
-  for k, v := range pm.CachedModels {
+  cm := make(map[string]int, len(pm.ActiveModels))
+  for k, v := range pm.ActiveModels {
     cm[k] = v
   }
   clone := &PodMetrics{
     Pod: pm.Pod,
     Metrics: Metrics{
-      CachedModels:        cm,
+      ActiveModels:        cm,
       RunningQueueSize:    pm.RunningQueueSize,
       WaitingQueueSize:    pm.WaitingQueueSize,
       KVCacheUsagePercent: pm.KVCacheUsagePercent,
diff --git a/pkg/ext-proc/backend/vllm/metrics.go b/pkg/ext-proc/backend/vllm/metrics.go
index 827589d7..c9e2e772 100644
--- a/pkg/ext-proc/backend/vllm/metrics.go
+++ b/pkg/ext-proc/backend/vllm/metrics.go
@@ -85,18 +85,21 @@ func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *bac
     }
   */
 
+  // TODO(https://github.com/kubernetes-sigs/llm-instance-gateway/issues/22): Read from vLLM metrics once it is available.
+  updated.MaxActiveModels = 4
+
   // Update active loras
   mf, ok := metricFamilies[ActiveLoRAAdaptersMetricName]
   if ok {
     // IMPORTANT: replace the map entries instead of appending to it.
-    updated.CachedModels = make(map[string]int)
+    updated.ActiveModels = make(map[string]int)
     for _, metric := range mf.GetMetric() {
       for _, label := range metric.GetLabel() {
         if label.GetName() == "active_adapters" {
           if label.GetValue() != "" {
             adapterList := strings.Split(label.GetValue(), ",")
             for _, adapter := range adapterList {
-              updated.CachedModels[adapter] = 0
+              updated.ActiveModels[adapter] = 0
             }
           }
         }
diff --git a/pkg/ext-proc/go.mod b/pkg/ext-proc/go.mod
index 7a1a1f88..cbb00816 100644
--- a/pkg/ext-proc/go.mod
+++ b/pkg/ext-proc/go.mod
@@ -5,6 +5,7 @@ go 1.21
 require (
   github.com/bojand/ghz v0.120.0
   github.com/envoyproxy/go-control-plane v0.13.0
+  github.com/google/go-cmp v0.6.0
   github.com/jhump/protoreflect v1.15.1
   github.com/prometheus/client_model v0.6.1
   github.com/prometheus/common v0.55.0
diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go
index 150f5cf4..6e0243c9 100644
--- a/pkg/ext-proc/handlers/request.go
+++ b/pkg/ext-proc/handlers/request.go
@@ -38,6 +38,8 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
     // TODO: Once the API is approved, read the "LLMUseCase" configuration and apply traffic split.
     TargetModels:        map[string]int{model: 100},
     ResolvedTargetModel: model,
+    // TODO: Read from LLMService CRD.
+    Critical: true,
   }
 
   // Update target models in the body.
@@ -51,7 +53,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
   targetPod, err := s.scheduler.Schedule(llmReq)
   if err != nil {
-    return nil, fmt.Errorf("failed to find target pod: %v", err)
+    return nil, fmt.Errorf("failed to find target pod: %w", err)
   }
 
   klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go
index 82d9bb69..69fa2448 100644
--- a/pkg/ext-proc/handlers/server.go
+++ b/pkg/ext-proc/handlers/server.go
@@ -4,6 +4,7 @@ import (
   "io"
 
   extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
+  envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
   "google.golang.org/grpc/codes"
   "google.golang.org/grpc/status"
   klog "k8s.io/klog/v2"
@@ -83,13 +84,28 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
     if err != nil {
       klog.Errorf("failed to process request: %v", err)
-      return status.Errorf(codes.Unknown, "failed to handle request: %v", err)
+      switch status.Code(err) {
+      // This code can be returned by scheduler when there is no capacity for sheddable
+      // requests.
+      case codes.ResourceExhausted:
+        resp = &extProcPb.ProcessingResponse{
+          Response: &extProcPb.ProcessingResponse_ImmediateResponse{
+            ImmediateResponse: &extProcPb.ImmediateResponse{
+              Status: &envoyTypePb.HttpStatus{
+                Code: envoyTypePb.StatusCode_TooManyRequests,
+              },
+            },
+          },
+        }
+      default:
+        return status.Errorf(status.Code(err), "failed to handle request: %w", err)
+      }
     }
 
     klog.V(3).Infof("response: %v", resp)
     if err := srv.Send(resp); err != nil {
       klog.Errorf("send error %v", err)
-      return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
+      return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %w", err)
     }
   }
 }
diff --git a/pkg/ext-proc/scheduling/filter.go b/pkg/ext-proc/scheduling/filter.go
index c8097a06..60ce2f71 100644
--- a/pkg/ext-proc/scheduling/filter.go
+++ b/pkg/ext-proc/scheduling/filter.go
@@ -11,7 +11,7 @@ import (
 
 type Filter interface {
   Name() string
-  Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
+  Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
 }
 
 // filter applies current filterFunc, and then recursively applies next filters depending success or
@@ -41,42 +41,46 @@ func (f *filter) Name() string {
   return f.name
 }
 
-func (f *filter) Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
-  if f == nil {
-    klog.V(3).Infof("Running nil filter, returning all input pods by default")
-    return pods, nil
-  }
-  klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, b, len(pods))
+func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
+  klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods))
 
-  filtered, err := f.filter(b, pods)
+  filtered, err := f.filter(req, pods)
 
   next := f.nextOnSuccessOrFailure
-  if err == nil {
-    klog.V(3).Infof("onSuccess %v -> %v, filtered: %v", f.name, next.Name(), len(filtered))
+  if err == nil && len(filtered) > 0 {
+    if f.nextOnSuccess == nil && f.nextOnSuccessOrFailure == nil {
+      // No succeeding filters to run, return.
+      return filtered, err
+    }
     if f.nextOnSuccess != nil {
       next = f.nextOnSuccess
     }
+    klog.V(3).Infof("onSuccess %q -> %q, filtered: %v", f.name, next.Name(), len(filtered))
     // On success, pass the filtered result to the next filter.
-    return next.Filter(b, filtered)
-  }
-
-  klog.V(3).Infof("onFailure %v -> %v", f.name, next.Name())
-  if f.nextOnFailure != nil {
-    next = f.nextOnFailure
+    return next.Filter(req, filtered)
+  } else {
+    if f.nextOnFailure == nil && f.nextOnSuccessOrFailure == nil {
+      // No succeeding filters to run, return.
+      return filtered, err
+    }
+    if f.nextOnFailure != nil {
+      next = f.nextOnFailure
+    }
+    klog.V(3).Infof("onFailure %q -> %q", f.name, next.Name())
+    // On failure, pass the initial set of pods to the next filter.
+    return next.Filter(req, pods)
   }
-  // On failure, pass the initial set of pods to the next filter.
-  return next.Filter(b, pods)
 }
 
 // filterFunc filters a set of input pods to a subset.
-type filterFunc func(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
+type filterFunc func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
 
 // toFilterFunc is a helper function to convert a per pod filter func to the FilterFunc.
 func toFilterFunc(pp podPredicate) filterFunc {
-  return func(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
+  return func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
     filtered := []*backend.PodMetrics{}
     for _, pod := range pods {
-      pass := pp(b, pod)
+      pass := pp(req, pod)
       if pass {
         filtered = append(filtered, pod)
       }
@@ -95,7 +99,7 @@
 // the least one as it gives more choices for the next filter, which on aggregate gave better
 // results.
 // TODO: Compare this strategy with other strategies such as top K.
-func leastQueuingFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
+func leastQueuingFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
   min := math.MaxInt
   max := 0
   filtered := []*backend.PodMetrics{}
@@ -123,9 +127,9 @@
 // should consider them all instead of the absolute minimum one. This worked better than picking the
 // least one as it gives more choices for the next filter, which on aggregate gave better results.
 // TODO: Compare this strategy with other strategies such as top K.
-func leastKVCacheFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
+func leastKVCacheFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
   min := math.MaxFloat64
-  max := math.SmallestNonzeroFloat64
+  var max float64 = 0
   filtered := []*backend.PodMetrics{}
 
   for _, pod := range pods {
@@ -146,10 +150,21 @@ func leastKVCacheFilterFunc(b *LLMRequest, pods []*backe
 }
 
 // podPredicate is a filter function to check whether a pod is desired.
-type podPredicate func(b *LLMRequest, pod *backend.PodMetrics) bool
+type podPredicate func(req *LLMRequest, pod *backend.PodMetrics) bool
+
+// We consider serving an adapter low cost if the adapter is active in the model server, or the
+// model server has room to load the adapter.
+func lowLoRACostPredicate(req *LLMRequest, pod *backend.PodMetrics) bool {
+  _, ok := pod.ActiveModels[req.ResolvedTargetModel]
+  return ok || len(pod.ActiveModels) < pod.MaxActiveModels
+}
 
-// loraAffinityPredicate return true if the pod have the requested LoRA adapter loaded.
-func loraAffinityPredicate(b *LLMRequest, pod *backend.PodMetrics) bool {
-  _, ok := pod.CachedModels[b.ResolvedTargetModel]
-  return ok
+func criticalRequestPredicate(req *LLMRequest, pod *backend.PodMetrics) bool {
+  return req.Critical
+}
+
+func noQueueAndLessThanKVCacheThresholdPredicate(queueThreshold int, kvCacheThreshold float64) podPredicate {
+  return func(req *LLMRequest, pod *backend.PodMetrics) bool {
+    return pod.WaitingQueueSize <= queueThreshold && pod.KVCacheUsagePercent <= kvCacheThreshold
+  }
 }
diff --git a/pkg/ext-proc/scheduling/filter_test.go b/pkg/ext-proc/scheduling/filter_test.go
new file mode 100644
index 00000000..b71fea85
--- /dev/null
+++ b/pkg/ext-proc/scheduling/filter_test.go
@@ -0,0 +1,409 @@
+package scheduling
+
+import (
+  "fmt"
+  "testing"
+
+  "github.com/google/go-cmp/cmp"
+
+  "ext-proc/backend"
+)
+
+func TestFilter(t *testing.T) {
+  tests := []struct {
+    name   string
+    req    *LLMRequest
+    input  []*backend.PodMetrics
+    output []*backend.PodMetrics
+    err    bool
+    filter *filter
+  }{
+    {
+      name: "simple filter without successor, failure",
+      filter: &filter{filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
+        return nil, fmt.Errorf("filter error")
+      }},
+      err: true,
+    },
+    {
+      name:   "default filter, critical request",
+      filter: defaultFilter,
+      req: &LLMRequest{
+        Model:               "critical",
+        ResolvedTargetModel: "critical",
+        Critical:            true,
+      },
+      // pod2 will be picked because it has relatively low queue size, with the requested
+      // model being active, and has low KV cache.
+      input: []*backend.PodMetrics{
+        {
+          Pod: backend.Pod{Name: "pod1"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    0,
+            KVCacheUsagePercent: 0.2,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo": 1,
+              "bar": 1,
+            },
+          },
+        },
+        {
+          Pod: backend.Pod{Name: "pod2"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    3,
+            KVCacheUsagePercent: 0.1,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo":      1,
+              "critical": 1,
+            },
+          },
+        },
+        {
+          Pod: backend.Pod{Name: "pod3"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    10,
+            KVCacheUsagePercent: 0.2,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo": 1,
+            },
+          },
+        },
+      },
+      output: []*backend.PodMetrics{
+        {
+          Pod: backend.Pod{Name: "pod2"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    3,
+            KVCacheUsagePercent: 0.1,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo":      1,
+              "critical": 1,
+            },
+          },
+        },
+      },
+    },
+    {
+      name:   "default filter, sheddable request, accepted",
+      filter: defaultFilter,
+      req: &LLMRequest{
+        Model:               "sheddable",
+        ResolvedTargetModel: "sheddable",
+        Critical:            false,
+      },
+      // pod1 will be picked because it has capacity for the sheddable request.
+      input: []*backend.PodMetrics{
+        {
+          Pod: backend.Pod{Name: "pod1"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    0,
+            KVCacheUsagePercent: 0.2,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo": 1,
+              "bar": 1,
+            },
+          },
+        },
+        {
+          Pod: backend.Pod{Name: "pod2"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    3,
+            KVCacheUsagePercent: 0.1,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo":      1,
+              "critical": 1,
+            },
+          },
+        },
+        {
+          Pod: backend.Pod{Name: "pod3"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    10,
+            KVCacheUsagePercent: 0.2,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo": 1,
+            },
+          },
+        },
+      },
+      output: []*backend.PodMetrics{
+        {
+          Pod: backend.Pod{Name: "pod1"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    0,
+            KVCacheUsagePercent: 0.2,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo": 1,
+              "bar": 1,
+            },
+          },
+        },
+      },
+    },
+    {
+      name:   "default filter, sheddable request, dropped",
+      filter: defaultFilter,
+      req: &LLMRequest{
+        Model:               "sheddable",
+        ResolvedTargetModel: "sheddable",
+        Critical:            false,
+      },
+      // All pods have higher KV cache than the threshold, so the sheddable request will be
+      // dropped.
+      input: []*backend.PodMetrics{
+        {
+          Pod: backend.Pod{Name: "pod1"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    10,
+            KVCacheUsagePercent: 0.9,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo": 1,
+              "bar": 1,
+            },
+          },
+        },
+        {
+          Pod: backend.Pod{Name: "pod2"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    3,
+            KVCacheUsagePercent: 0.85,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo":      1,
+              "critical": 1,
+            },
+          },
+        },
+        {
+          Pod: backend.Pod{Name: "pod3"},
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    10,
+            KVCacheUsagePercent: 0.85,
+            MaxActiveModels:     2,
+            ActiveModels: map[string]int{
+              "foo": 1,
+            },
+          },
+        },
+      },
+      output: []*backend.PodMetrics{},
+      err:    true,
+    },
+  }
+
+  for _, test := range tests {
+    t.Run(test.name, func(t *testing.T) {
+      got, err := test.filter.Filter(test.req, test.input)
+      if test.err != (err != nil) {
+        t.Errorf("Unexpected error, got %v, want %v", err, test.err)
+      }
+
+      if diff := cmp.Diff(test.output, got); diff != "" {
+        t.Errorf("Unexpected output (-want +got): %v", diff)
+      }
+    })
+  }
+}
+
+func TestFilterFunc(t *testing.T) {
+  tests := []struct {
+    name   string
+    f      filterFunc
+    req    *LLMRequest
+    input  []*backend.PodMetrics
+    output []*backend.PodMetrics
+    err    bool
+  }{
+    {
+      name:   "least queuing empty input",
+      f:      leastQueuingFilterFunc,
+      input:  []*backend.PodMetrics{},
+      output: []*backend.PodMetrics{},
+    },
+    {
+      name: "least queuing",
+      f:    leastQueuingFilterFunc,
+      input: []*backend.PodMetrics{
+        {
+          Metrics: backend.Metrics{
+            WaitingQueueSize: 0,
+          },
+        },
+        {
+          Metrics: backend.Metrics{
+            WaitingQueueSize: 3,
+          },
+        },
+        {
+          Metrics: backend.Metrics{
+            WaitingQueueSize: 10,
+          },
+        },
+      },
+      output: []*backend.PodMetrics{
+        {
+          Metrics: backend.Metrics{
+            WaitingQueueSize: 0,
+          },
+        },
+        {
+          Metrics: backend.Metrics{
+            WaitingQueueSize: 3,
+          },
+        },
+      },
+    },
+    {
+      name:   "least kv cache empty input",
+      f:      leastKVCacheFilterFunc,
+      input:  []*backend.PodMetrics{},
+      output: []*backend.PodMetrics{},
+    },
+    {
+      name: "least kv cache",
+      f:    leastKVCacheFilterFunc,
+      input: []*backend.PodMetrics{
+        {
+          Metrics: backend.Metrics{
+            KVCacheUsagePercent: 0,
+          },
+        },
+        {
+          Metrics: backend.Metrics{
+            KVCacheUsagePercent: 0.3,
+          },
+        },
+        {
+          Metrics: backend.Metrics{
+            KVCacheUsagePercent: 1.0,
+          },
+        },
+      },
+      output: []*backend.PodMetrics{
+        {
+          Metrics: backend.Metrics{
+            KVCacheUsagePercent: 0,
+          },
+        },
+        {
+          Metrics: backend.Metrics{
+            KVCacheUsagePercent: 0.3,
+          },
+        },
+      },
+    },
+    {
+      name: "noQueueAndLessThanKVCacheThresholdPredicate",
+      f:    toFilterFunc(noQueueAndLessThanKVCacheThresholdPredicate(0, 0.8)),
+      input: []*backend.PodMetrics{
+        {
+          // This pod should be returned.
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    0,
+            KVCacheUsagePercent: 0,
+          },
+        },
+        {
+          // Queue is non zero, despite low kv cache, should not return.
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    1,
+            KVCacheUsagePercent: 0.3,
+          },
+        },
+        {
+          // High kv cache despite zero queue, should not return.
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    0,
+            KVCacheUsagePercent: 1.0,
+          },
+        },
+      },
+      output: []*backend.PodMetrics{
+        {
+          Metrics: backend.Metrics{
+            WaitingQueueSize:    0,
+            KVCacheUsagePercent: 0,
+          },
+        },
+      },
+    },
+    {
+      name: "low LoRA cost",
+      f:    toFilterFunc(lowLoRACostPredicate),
+      req: &LLMRequest{
+        Model:               "model",
+        ResolvedTargetModel: "model",
+      },
+      input: []*backend.PodMetrics{
+        // ActiveModels include input model, should be returned.
+        {
+          Metrics: backend.Metrics{
+            MaxActiveModels: 2,
+            ActiveModels: map[string]int{
+              "model": 1,
+            },
+          },
+        },
+        // Input model is not active, however the server has room to load another adapter.
+        {
+          Metrics: backend.Metrics{
+            MaxActiveModels: 2,
+            ActiveModels: map[string]int{
+              "another-model": 1,
+            },
+          },
+        },
+        // Input is not active, and the server has reached max active models.
+        {
+          Metrics: backend.Metrics{
+            MaxActiveModels: 2,
+            ActiveModels: map[string]int{
+              "foo": 1,
+              "bar": 1,
+            },
+          },
+        },
+      },
+      output: []*backend.PodMetrics{
+        {
+          Metrics: backend.Metrics{
+            MaxActiveModels: 2,
+            ActiveModels: map[string]int{
+              "model": 1,
+            },
+          },
+        },
+        {
+          Metrics: backend.Metrics{
+            MaxActiveModels: 2,
+            ActiveModels: map[string]int{
+              "another-model": 1,
+            },
+          },
+        },
+      },
+    },
+  }
+
+  for _, test := range tests {
+    t.Run(test.name, func(t *testing.T) {
+      got, err := test.f(test.req, test.input)
+      if test.err != (err != nil) {
+        t.Errorf("Unexpected error, got %v, want %v", err, test.err)
+      }
+
+      if diff := cmp.Diff(test.output, got); diff != "" {
+        t.Errorf("Unexpected output (-want +got): %v", diff)
+      }
+    })
+  }
+}
diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go
index 9bcd25bb..85ea057c 100644
--- a/pkg/ext-proc/scheduling/scheduler.go
+++ b/pkg/ext-proc/scheduling/scheduler.go
@@ -5,24 +5,60 @@ import (
   "fmt"
   "math/rand"
 
+  "google.golang.org/grpc/codes"
+  "google.golang.org/grpc/status"
   klog "k8s.io/klog/v2"
 
   "ext-proc/backend"
 )
 
+const (
+  // TODO(https://github.com/kubernetes-sigs/llm-instance-gateway/issues/16) Make this configurable.
+  kvCacheThreshold = 0.8
+  // TODO(https://github.com/kubernetes-sigs/llm-instance-gateway/issues/16) Make this configurable.
+  queueThreshold = 5
+)
+
 var (
   defaultFilter = &filter{
+    name:          "critical request",
+    filter:        toFilterFunc(criticalRequestPredicate),
+    nextOnSuccess: lowLatencyFilter,
+    nextOnFailure: sheddableRequestFilter,
+  }
+
+  // lowLatencyFilter tries to minimize the latency. The heuristic is to pick a server with a lower
+  // cost to load an adapter and low KV cache, which typically yields lower latency.
+  lowLatencyFilter = &filter{
     name:   "least queuing",
     filter: leastQueuingFilterFunc,
     nextOnSuccessOrFailure: &filter{
-      name:   "lora affinity",
-      filter: toFilterFunc(loraAffinityPredicate),
+      name:   "low cost LoRA",
+      filter: toFilterFunc(lowLoRACostPredicate),
       nextOnSuccessOrFailure: &filter{
         name:   "least KV cache percent",
         filter: leastKVCacheFilterFunc,
       },
     },
   }
+
+  sheddableRequestFilter = &filter{
+    // When there is at least one model server that's not queuing requests, and still has KV
+    // cache below a certain threshold, we consider this model server has capacity to handle
+    // a sheddable request without impacting critical requests.
+    name:          "has capacity for sheddable requests",
+    filter:        toFilterFunc(noQueueAndLessThanKVCacheThresholdPredicate(queueThreshold, kvCacheThreshold)),
+    nextOnSuccess: lowLatencyFilter,
+    // If all pods are queuing or running above the KVCache threshold, we drop the sheddable
+    // request to make room for critical requests.
+    nextOnFailure: &filter{
+      name: "drop request",
+      filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
+        klog.Infof("Dropping request %v", req)
+        return []*backend.PodMetrics{}, status.Errorf(codes.ResourceExhausted, "dropping request due to limited backend resources")
+      },
+    },
+  }
 )
 
 func NewScheduler(pmp PodMetricsProvider) *Scheduler {
@@ -44,12 +80,13 @@ type PodMetricsProvider interface {
 }
 
 // Schedule finds the target pod based on metrics and the requested lora adapter.
-func (s *Scheduler) Schedule(b *LLMRequest) (targetPod *backend.Pod, err error) {
-  klog.V(3).Infof("request: %v; metrics: %+v", b, s.podMetricsProvider.AllPodMetrics())
-  pods, err := s.filter.Filter(b, s.podMetricsProvider.AllPodMetrics())
+func (s *Scheduler) Schedule(req *LLMRequest) (targetPod *backend.Pod, err error) {
+  klog.V(3).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics())
+  pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics())
   if err != nil || len(pods) == 0 {
-    return nil, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %v", len(pods), err)
+    return nil, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
   }
+  klog.V(3).Infof("Going to randomly select a pod from the candidates: %+v", pods)
   i := rand.Intn(len(pods))
   return &pods[i].Pod, nil
 }
diff --git a/pkg/ext-proc/scheduling/types.go b/pkg/ext-proc/scheduling/types.go
index 46578550..cfb9d3b8 100644
--- a/pkg/ext-proc/scheduling/types.go
+++ b/pkg/ext-proc/scheduling/types.go
@@ -7,4 +7,5 @@ type LLMRequest struct {
   TargetModels map[string]int
   // Resolved target model is the final target model after traffic split.
   ResolvedTargetModel string
+  Critical bool
 }
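The reworked `filter.Filter` treats a step as successful only when it returns no error and a non-empty pod list; on success the narrowed list flows to `nextOnSuccess` (falling back to `nextOnSuccessOrFailure` when that is unset), on failure the original list flows to `nextOnFailure` (same fallback), and a node with no applicable successor returns whatever it has. A minimal standalone sketch of that routing, using invented names (`node`, `walk`) rather than the package's own types:

```go
package main

import "fmt"

// node is a hypothetical stand-in for the filter struct in filter.go.
type node struct {
	name                   string
	apply                  func(pods []string) ([]string, error)
	nextOnSuccess          *node
	nextOnFailure          *node
	nextOnSuccessOrFailure *node
}

// walk mirrors the success/failure routing of filter.Filter.
func (n *node) walk(pods []string) ([]string, error) {
	filtered, err := n.apply(pods)
	if err == nil && len(filtered) > 0 {
		next := n.nextOnSuccess
		if next == nil {
			next = n.nextOnSuccessOrFailure
		}
		if next == nil {
			return filtered, nil // no successor: this is the terminal result
		}
		return next.walk(filtered) // success: pass the narrowed set down
	}
	next := n.nextOnFailure
	if next == nil {
		next = n.nextOnSuccessOrFailure
	}
	if next == nil {
		return filtered, err // no successor: surface the failure as-is
	}
	return next.walk(pods) // failure: retry the next branch with the original set
}

func main() {
	leaf := &node{name: "keep-all", apply: func(p []string) ([]string, error) { return p, nil }}
	root := &node{
		name:          "drop-everything",
		apply:         func(p []string) ([]string, error) { return nil, nil },
		nextOnFailure: leaf,
	}
	out, err := root.walk([]string{"pod1", "pod2"})
	fmt.Println(out, err) // [pod1 pod2] <nil> — the failure branch saw the original pods
}
```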
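`lowLoRACostPredicate` and `noQueueAndLessThanKVCacheThresholdPredicate` are what the two branches of the tree evaluate per pod. A small sketch of the same checks on a hand-built `PodMetrics`, assuming it is compiled inside the `ext-proc` module from go.mod; the predicate logic is restated inline because the predicates themselves are unexported, and the thresholds mirror the constants added to scheduler.go:

```go
package main

import (
	"fmt"

	"ext-proc/backend"
)

func main() {
	pod := &backend.PodMetrics{
		Pod: backend.Pod{Name: "pod1", Namespace: "default"},
		Metrics: backend.Metrics{
			WaitingQueueSize:    0,
			KVCacheUsagePercent: 0.2,
			MaxActiveModels:     4,
			ActiveModels:        map[string]int{"base": 0, "adapter-a": 0},
		},
	}

	// Low LoRA cost: the requested adapter is already active, or there is still
	// room (len(ActiveModels) < MaxActiveModels) to load one more.
	_, active := pod.ActiveModels["adapter-b"]
	lowCost := active || len(pod.ActiveModels) < pod.MaxActiveModels
	fmt.Println("low LoRA cost:", lowCost) // true: 2 active < 4 max

	// Capacity for sheddable requests: no queue buildup and KV cache below the threshold.
	const queueThreshold, kvCacheThreshold = 5, 0.8
	hasCapacity := pod.WaitingQueueSize <= queueThreshold && pod.KVCacheUsagePercent <= kvCacheThreshold
	fmt.Println("capacity for sheddable:", hasCapacity) // true
}
```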
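The sheddable drop path uses gRPC status codes as the contract between the scheduler and the ext_proc handler: the drop filter returns `codes.ResourceExhausted`, and `Process` converts exactly that code into an Envoy `ImmediateResponse` carrying HTTP 429 instead of failing the stream. A minimal sketch of the detection side, with `schedule` as a hypothetical stand-in for the real `Scheduler.Schedule`:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// schedule stands in for Scheduler.Schedule: a sheddable request that finds no
// capacity is rejected with a ResourceExhausted status error.
func schedule(critical bool) error {
	if !critical {
		return status.Errorf(codes.ResourceExhausted, "dropping request due to limited backend resources")
	}
	return nil
}

func main() {
	err := schedule(false)
	// status.Code extracts the gRPC code carried by the error (codes.OK for nil);
	// any other non-OK code keeps the existing "fail the stream" behavior.
	switch status.Code(err) {
	case codes.OK:
		fmt.Println("scheduled")
	case codes.ResourceExhausted:
		fmt.Println("reply with ImmediateResponse: HTTP 429 Too Many Requests")
	default:
		fmt.Println("fail the stream:", err)
	}
}
```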