diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go index 8ad491a6..0cc21a4f 100644 --- a/pkg/ext-proc/backend/provider.go +++ b/pkg/ext-proc/backend/provider.go @@ -58,9 +58,8 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) { } func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error { - if err := p.refreshPodsOnce(); err != nil { - klog.Errorf("Failed to init pods: %v", err) - } + p.refreshPodsOnce() + if err := p.refreshMetricsOnce(); err != nil { klog.Errorf("Failed to init metrics: %v", err) } @@ -71,9 +70,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio go func() { for { time.Sleep(refreshPodsInterval) - if err := p.refreshPodsOnce(); err != nil { - klog.V(4).Infof("Failed to refresh podslist pods: %v", err) - } + p.refreshPodsOnce() } }() @@ -102,7 +99,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio // refreshPodsOnce lists pods and updates keys in the podMetrics map. // Note this function doesn't update the PodMetrics value, it's done separately. -func (p *Provider) refreshPodsOnce() error { +func (p *Provider) refreshPodsOnce() { // merge new pods with cached ones. // add new pod to the map addNewPods := func(k, v any) bool { @@ -128,7 +125,6 @@ func (p *Provider) refreshPodsOnce() error { } p.podMetrics.Range(mergeFn) p.datastore.pods.Range(addNewPods) - return nil } func (p *Provider) refreshMetricsOnce() error { diff --git a/pkg/ext-proc/backend/vllm/metrics.go b/pkg/ext-proc/backend/vllm/metrics.go index 27a29d9a..018ba230 100644 --- a/pkg/ext-proc/backend/vllm/metrics.go +++ b/pkg/ext-proc/backend/vllm/metrics.go @@ -52,7 +52,9 @@ func (p *PodMetricsClientImpl) FetchMetrics( klog.Errorf("failed to fetch metrics from %s: %v", pod, err) return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err) } - defer resp.Body.Close() + defer func() { + _ = resp.Body.Close() + }() if resp.StatusCode != http.StatusOK { klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode) @@ -76,17 +78,17 @@ func promToPodMetrics( ) (*backend.PodMetrics, error) { var errs error updated := existing.Clone() - runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName) + runningQueueSize, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName) errs = multierr.Append(errs, err) if err == nil { updated.RunningQueueSize = int(runningQueueSize.GetGauge().GetValue()) } - waitingQueueSize, _, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName) + waitingQueueSize, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName) errs = multierr.Append(errs, err) if err == nil { updated.WaitingQueueSize = int(waitingQueueSize.GetGauge().GetValue()) } - cachePercent, _, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName) + cachePercent, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName) errs = multierr.Append(errs, err) if err == nil { updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue() @@ -151,14 +153,14 @@ func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metr // getLatestMetric gets the latest metric of a family. This should be used to get the latest Gauge metric. // Since vllm doesn't set the timestamp in metric, this metric essentially gets the first metric. -func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, time.Time, error) { +func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, error) { mf, ok := metricFamilies[metricName] if !ok { klog.Warningf("metric family %q not found", metricName) - return nil, time.Time{}, fmt.Errorf("metric family %q not found", metricName) + return nil, fmt.Errorf("metric family %q not found", metricName) } if len(mf.GetMetric()) == 0 { - return nil, time.Time{}, fmt.Errorf("no metrics available for %q", metricName) + return nil, fmt.Errorf("no metrics available for %q", metricName) } var latestTs int64 var latest *dto.Metric @@ -169,5 +171,5 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str } } klog.V(4).Infof("Got metric value %+v for metric %v", latest, metricName) - return latest, time.Unix(0, latestTs*1000), nil + return latest, nil } diff --git a/pkg/ext-proc/scheduling/filter.go b/pkg/ext-proc/scheduling/filter.go index bcc4432b..7a483aa6 100644 --- a/pkg/ext-proc/scheduling/filter.go +++ b/pkg/ext-proc/scheduling/filter.go @@ -157,9 +157,10 @@ func leastKVCacheFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*bac type podPredicate func(req *LLMRequest, pod *backend.PodMetrics) bool // We consider serving an adapter low cost it the adapter is active in the model server, or the -// model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by spreading the -// load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to a single pod. -// This gave good performance in our initial benchmarking results in the scenario where # of lora slots > # of lora adapters. +// model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by +// spreading the load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to +// a single pod. This gave good performance in our initial benchmarking results in the scenario +// where # of lora slots > # of lora adapters. func lowLoRACostPredicate(req *LLMRequest, pod *backend.PodMetrics) bool { _, ok := pod.ActiveModels[req.ResolvedTargetModel] return ok || len(pod.ActiveModels) < pod.MaxActiveModels diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index 6078fecc..f2f0a09f 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -84,7 +84,8 @@ var ( name: "drop request", filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { klog.Infof("Dropping request %v", req) - return []*backend.PodMetrics{}, status.Errorf(codes.ResourceExhausted, "dropping request due to limited backend resources") + return []*backend.PodMetrics{}, status.Errorf( + codes.ResourceExhausted, "dropping request due to limited backend resources") }, }, } @@ -114,7 +115,8 @@ func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) klog.V(3).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics()) pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics()) if err != nil || len(pods) == 0 { - return backend.Pod{}, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err) + return backend.Pod{}, fmt.Errorf( + "failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err) } klog.V(3).Infof("Going to randomly select a pod from the candidates: %+v", pods) i := rand.Intn(len(pods))