fixing some lint errors #126

Merged 1 commit on Dec 20, 2024
pkg/ext-proc/backend/provider.go (12 changes: 4 additions & 8 deletions)

@@ -58,9 +58,8 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
 }
 
 func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
-    if err := p.refreshPodsOnce(); err != nil {
-        klog.Errorf("Failed to init pods: %v", err)
-    }
+    p.refreshPodsOnce()
+
     if err := p.refreshMetricsOnce(); err != nil {
         klog.Errorf("Failed to init metrics: %v", err)
     }
@@ -71,9 +70,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
     go func() {
         for {
             time.Sleep(refreshPodsInterval)
-            if err := p.refreshPodsOnce(); err != nil {
-                klog.V(4).Infof("Failed to refresh podslist pods: %v", err)
-            }
+            p.refreshPodsOnce()
         }
     }()
 
@@ -102,7 +99,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
 
 // refreshPodsOnce lists pods and updates keys in the podMetrics map.
 // Note this function doesn't update the PodMetrics value, it's done separately.
-func (p *Provider) refreshPodsOnce() error {
+func (p *Provider) refreshPodsOnce() {
     // merge new pods with cached ones.
     // add new pod to the map
     addNewPods := func(k, v any) bool {
@@ -128,7 +125,6 @@ func (p *Provider) refreshPodsOnce() error {
     }
     p.podMetrics.Range(mergeFn)
     p.datastore.pods.Range(addNewPods)
-    return nil
 }
 
 func (p *Provider) refreshMetricsOnce() error {
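Note on the provider.go change: refreshPodsOnce could never fail, so its error return only forced callers to handle an error that was always nil, which is what the linter flagged. Dropping the error from the signature lets Init and the refresh goroutine call it directly. A minimal, self-contained Go sketch of the same pattern follows; the cache type and refresh method are hypothetical stand-ins, not the repository's types.

package main

import "fmt"

// cache stands in for the Provider's pod cache.
type cache struct{ pods map[string]struct{} }

// refresh mirrors the new refreshPodsOnce shape: it cannot fail, so it
// returns nothing instead of an error that was always nil.
func (c *cache) refresh() {
    if c.pods == nil {
        c.pods = map[string]struct{}{}
    }
    c.pods["pod-a"] = struct{}{} // placeholder for listing pods
}

func main() {
    c := &cache{}
    c.refresh() // no error to check or log, matching the simplified Init
    fmt.Println(len(c.pods), "pod(s) cached")
}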
pkg/ext-proc/backend/vllm/metrics.go (18 changes: 10 additions & 8 deletions)

@@ -52,7 +52,9 @@ func (p *PodMetricsClientImpl) FetchMetrics(
     klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
     return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
 }
-    defer resp.Body.Close()
+    defer func() {
+        _ = resp.Body.Close()
+    }()
 
     if resp.StatusCode != http.StatusOK {
         klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
@@ -76,17 +78,17 @@ func promToPodMetrics(
 ) (*backend.PodMetrics, error) {
     var errs error
     updated := existing.Clone()
-    runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName)
+    runningQueueSize, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName)
     errs = multierr.Append(errs, err)
     if err == nil {
         updated.RunningQueueSize = int(runningQueueSize.GetGauge().GetValue())
     }
-    waitingQueueSize, _, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName)
+    waitingQueueSize, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName)
     errs = multierr.Append(errs, err)
     if err == nil {
         updated.WaitingQueueSize = int(waitingQueueSize.GetGauge().GetValue())
     }
-    cachePercent, _, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName)
+    cachePercent, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName)
     errs = multierr.Append(errs, err)
     if err == nil {
         updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue()
@@ -151,14 +153,14 @@ func getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metr
 
 // getLatestMetric gets the latest metric of a family. This should be used to get the latest Gauge metric.
 // Since vllm doesn't set the timestamp in metric, this metric essentially gets the first metric.
-func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, time.Time, error) {
+func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, error) {
     mf, ok := metricFamilies[metricName]
     if !ok {
         klog.Warningf("metric family %q not found", metricName)
-        return nil, time.Time{}, fmt.Errorf("metric family %q not found", metricName)
+        return nil, fmt.Errorf("metric family %q not found", metricName)
     }
     if len(mf.GetMetric()) == 0 {
-        return nil, time.Time{}, fmt.Errorf("no metrics available for %q", metricName)
+        return nil, fmt.Errorf("no metrics available for %q", metricName)
     }
     var latestTs int64
     var latest *dto.Metric
@@ -169,5 +171,5 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, time.Time, error) {
         }
     }
     klog.V(4).Infof("Got metric value %+v for metric %v", latest, metricName)
-    return latest, time.Unix(0, latestTs*1000), nil
+    return latest, nil
 }
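Note on the metrics.go changes: the deferred resp.Body.Close() is wrapped in a closure that explicitly discards the returned error (so errcheck-style linters stop flagging it), and getLatestMetric drops the time.Time return value that every caller ignored. A small sketch of the deferred-close pattern; the fetch helper and URL below are illustrative, not the repository's FetchMetrics.

package main

import (
    "fmt"
    "io"
    "net/http"
)

func fetch(url string) (string, error) {
    resp, err := http.Get(url)
    if err != nil {
        return "", fmt.Errorf("failed to fetch %s: %w", url, err)
    }
    // Close returns an error; discarding it with `_ =` documents that the
    // result is intentionally ignored.
    defer func() {
        _ = resp.Body.Close()
    }()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }
    return string(body), nil
}

func main() {
    // The endpoint is a placeholder; any metrics URL works the same way.
    if body, err := fetch("http://localhost:9090/metrics"); err == nil {
        fmt.Println(len(body), "bytes of metrics")
    }
}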
pkg/ext-proc/scheduling/filter.go (7 changes: 4 additions & 3 deletions)

@@ -157,9 +157,10 @@ func leastKVCacheFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
 type podPredicate func(req *LLMRequest, pod *backend.PodMetrics) bool
 
 // We consider serving an adapter low cost it the adapter is active in the model server, or the
-// model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by spreading the
-// load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to a single pod.
-// This gave good performance in our initial benchmarking results in the scenario where # of lora slots > # of lora adapters.
+// model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by
+// spreading the load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to
+// a single pod. This gave good performance in our initial benchmarking results in the scenario
+// where # of lora slots > # of lora adapters.
 func lowLoRACostPredicate(req *LLMRequest, pod *backend.PodMetrics) bool {
     _, ok := pod.ActiveModels[req.ResolvedTargetModel]
     return ok || len(pod.ActiveModels) < pod.MaxActiveModels
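Note on the filter.go change: only the comment is re-wrapped to satisfy the line-length linter; lowLoRACostPredicate itself is untouched. For readers of that comment, here is a simplified sketch of how such a predicate keeps several low-cost pods in the candidate set so LoRA load is spread rather than pinned; the types below are illustrative stand-ins, not the repository's backend types.

package main

import "fmt"

type podMetrics struct {
    Name            string
    ActiveModels    map[string]bool
    MaxActiveModels int
}

type request struct{ ResolvedTargetModel string }

type podPredicate func(req request, pod podMetrics) bool

// lowCost accepts a pod if the adapter is already active there, or if the
// pod still has a free adapter slot.
func lowCost(req request, pod podMetrics) bool {
    _, active := pod.ActiveModels[req.ResolvedTargetModel]
    return active || len(pod.ActiveModels) < pod.MaxActiveModels
}

// filterPods keeps every pod the predicate accepts, so multiple candidates
// survive and requests are not pinned to a single pod.
func filterPods(req request, pods []podMetrics, pred podPredicate) []podMetrics {
    var out []podMetrics
    for _, p := range pods {
        if pred(req, p) {
            out = append(out, p)
        }
    }
    return out
}

func main() {
    req := request{ResolvedTargetModel: "lora-a"}
    pods := []podMetrics{
        {Name: "pod-1", ActiveModels: map[string]bool{"lora-a": true}, MaxActiveModels: 2},
        {Name: "pod-2", ActiveModels: map[string]bool{}, MaxActiveModels: 2},
        {Name: "pod-3", ActiveModels: map[string]bool{"lora-b": true, "lora-c": true}, MaxActiveModels: 2},
    }
    for _, p := range filterPods(req, pods, lowCost) {
        fmt.Println("candidate:", p.Name) // pod-1 and pod-2 remain candidates
    }
}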
pkg/ext-proc/scheduling/scheduler.go (6 changes: 4 additions & 2 deletions)

@@ -84,7 +84,8 @@ var (
     name: "drop request",
     filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
         klog.Infof("Dropping request %v", req)
-        return []*backend.PodMetrics{}, status.Errorf(codes.ResourceExhausted, "dropping request due to limited backend resources")
+        return []*backend.PodMetrics{}, status.Errorf(
+            codes.ResourceExhausted, "dropping request due to limited backend resources")
     },
 },
 }
@@ -114,7 +115,8 @@ func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error)
     klog.V(3).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics())
     pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics())
     if err != nil || len(pods) == 0 {
-        return backend.Pod{}, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
+        return backend.Pod{}, fmt.Errorf(
+            "failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
     }
     klog.V(3).Infof("Going to randomly select a pod from the candidates: %+v", pods)
     i := rand.Intn(len(pods))
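Note on the scheduler.go changes: both edits just wrap long status.Errorf and fmt.Errorf calls to satisfy the line-length linter; behavior is unchanged. As context for the first one, a hedged sketch of how a gRPC ResourceExhausted status produced this way can be recognized by a caller; dropRequest is a hypothetical helper, and the imports are the standard grpc-go packages.

package main

import (
    "fmt"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// dropRequest returns the same kind of status error the "drop request"
// filter produces when backend resources are exhausted.
func dropRequest() error {
    return status.Errorf(
        codes.ResourceExhausted, "dropping request due to limited backend resources")
}

func main() {
    err := dropRequest()
    // status.Code extracts the gRPC code, so callers can treat this error
    // differently from ordinary failures.
    if status.Code(err) == codes.ResourceExhausted {
        fmt.Println("request dropped:", err)
    }
}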