Skip to content

Commit 6f9869d

Browse files
authored
Add initial ext proc implementation with LoRA affinity (#14)
* Add initial ext proc implementation with LoRA affinity This is a refactor of the POC implementation in ./examples/ext-proc with the following notable changes: - Re-structured repo to make it more modular: "handlers" implements the ext proc API and handles request/response; "scheduling" implements the request scheduling algorithm; "backend" contains logic to interact with backend pods - Introduced a "filter" concept in the scheduling algorithm to make it easier to write the flow chart style algorithm that's being proposed. - Removed metric update from response headers since benchmarking didn't show benefits. Current implementation scrapes metrics every 200ms. Will add response based metrics update back if more benchmarking confirms the benefits. - Simplifies POC code, such as: replaced the freecache package with a sync.Map; consolidated various metrics objects into a single PodMetric object; - Various improvements including adding leveled logging, handling errors, simplifying code, etc. - The algorithm is simplified a bit from POC - it finds pods with least queuing first, then with active LoRA adapters, then the least KV cache percent. Initial benchmarking shows slightly better throughput than POC. * Make target pod an optional argument * Clarify nextOnSuccessOrFailure behavior * Address comments
1 parent 0f4522e commit 6f9869d

File tree

16 files changed

+1368
-0
lines changed

16 files changed

+1368
-0
lines changed

pkg/ext-proc/Dockerfile

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
## Multistage build
FROM golang:1.22.5-alpine AS build
ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOARCH=amd64

WORKDIR /src
COPY . .
RUN go mod download
RUN go build -o /ext-proc

## Multistage deploy
# Distroless keeps the runtime image minimal; it has no shell or package
# manager, so nothing beyond the static binary is (or can be) installed.
FROM gcr.io/distroless/base-debian10

WORKDIR /
COPY --from=build /ext-proc /ext-proc

ENTRYPOINT ["/ext-proc"]

pkg/ext-proc/backend/fake.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package backend
2+
3+
import (
4+
dto "github.com/prometheus/client_model/go"
5+
)
6+
7+
type FakePodLister struct {
8+
Err error
9+
Pods PodSet
10+
}
11+
12+
type FakePodMetricsClient struct {
13+
Err map[Pod]error
14+
Res map[Pod]map[string]*dto.MetricFamily
15+
}
16+
17+
func (f *FakePodMetricsClient) FetchMetrics(pod Pod) (map[string]*dto.MetricFamily, error) {
18+
if err, ok := f.Err[pod]; ok {
19+
return nil, err
20+
}
21+
return f.Res[pod], nil
22+
}
23+
24+
func (fpl *FakePodLister) List() (PodSet, error) {
25+
return fpl.Pods, fpl.Err
26+
}

pkg/ext-proc/backend/metrics.go

+135
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package backend

import (
	"fmt"
	"strings"
	"sync"
	"time"

	dto "github.com/prometheus/client_model/go"
	"go.uber.org/multierr"
	klog "k8s.io/klog/v2"
)

const (
	ActiveLoRAAdaptersMetricName        = "vllm:info_active_adapters_info"
	LoRAAdapterPendingRequestMetricName = "vllm:active_lora_adapters"
	// TODO: Replace these with the num_tokens_running/waiting below once we add those to the fork.
	RunningQueueSizeMetricName = "vllm:num_requests_running"
	WaitingQueueSizeMetricName = "vllm:num_requests_waiting"
	/* TODO: Uncomment this once the following are added to the fork.
	RunningQueueSizeMetricName = "vllm:num_tokens_running"
	WaitingQueueSizeMetricName = "vllm:num_tokens_waiting"
	*/
	KVCacheUsagePercentMetricName     = "vllm:gpu_cache_usage_perc"
	KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity"
)

// refreshMetricsOnce scrapes metrics from all known pods in parallel and
// updates the pod metrics map. Per-pod failures are collected into a single
// combined error so one bad pod does not block refreshing the others.
func (p *Provider) refreshMetricsOnce() error {
	start := time.Now()
	defer func() {
		// TODO: add a metric instead of logging
		klog.V(3).Infof("Refreshed metrics in %v", time.Since(start))
	}()
	var wg sync.WaitGroup
	// errs is appended to from one goroutine per pod; guard it with a mutex
	// and re-assign the result of multierr.Append (its return value is the
	// combined error — discarding it silently drops every error).
	var (
		errsMu sync.Mutex
		errs   error
	)
	addErr := func(err error) {
		errsMu.Lock()
		defer errsMu.Unlock()
		errs = multierr.Append(errs, err)
	}
	processOnePod := func(key, value any) bool {
		pod := key.(Pod)
		metrics := value.(*PodMetrics)
		wg.Add(1)
		go func() {
			defer wg.Done()
			metricFamilies, err := p.pmc.FetchMetrics(pod)
			if err != nil {
				addErr(fmt.Errorf("failed to parse metrics from %s: %v", pod, err))
				return
			}
			// updated is always non-nil (a clone of the existing entry), even
			// when some individual metrics failed to parse.
			updated, err := promToPodMetrics(metricFamilies, metrics)
			klog.V(3).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
			if err != nil {
				addErr(fmt.Errorf("failed to get all pod metrics updated from prometheus: %v", err))
			}
			p.UpdatePodMetrics(pod, updated)
		}()
		return true
	}
	p.podMetrics.Range(processOnePod)
	wg.Wait()
	return errs
}
61+
62+
// promToPodMetrics updates internal pod metrics with scraped prometheus metrics.
63+
// A combined error is returned if errors occur in one or more metric processing.
64+
// it returns a new PodMetrics pointer which can be used to atomically update the pod metrics map.
65+
func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *PodMetrics) (*PodMetrics, error) {
66+
var errs error
67+
updated := existing.Clone()
68+
runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName)
69+
multierr.Append(errs, err)
70+
if err != nil {
71+
updated.RunningQueueSize = int(runningQueueSize.GetCounter().GetValue())
72+
}
73+
waitingQueueSize, _, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName)
74+
multierr.Append(errs, err)
75+
if err != nil {
76+
updated.WaitingQueueSize = int(waitingQueueSize.GetGauge().GetValue())
77+
}
78+
cachePercent, _, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName)
79+
multierr.Append(errs, err)
80+
if err != nil {
81+
updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue()
82+
}
83+
/* TODO: uncomment once this is available in vllm.
84+
kvCap, _, err := getGaugeLatestValue(metricFamilies, KvCacheMaxTokenCapacityMetricName)
85+
multierr.Append(errs, err)
86+
if err != nil {
87+
updated.KvCacheMaxTokenCapacity = int(kvCap)
88+
}
89+
*/
90+
91+
// Update active loras
92+
mf, ok := metricFamilies[ActiveLoRAAdaptersMetricName]
93+
if ok {
94+
// IMPORTANT: replace the map entries instead of appending to it.
95+
updated.CachedModels = make(map[string]int)
96+
for _, metric := range mf.GetMetric() {
97+
for _, label := range metric.GetLabel() {
98+
if label.GetName() == "active_adapters" {
99+
if label.GetValue() != "" {
100+
adapterList := strings.Split(label.GetValue(), ",")
101+
for _, adapter := range adapterList {
102+
updated.CachedModels[adapter] = 0
103+
}
104+
}
105+
}
106+
}
107+
}
108+
} else {
109+
klog.Warningf("metric family %q not found", ActiveLoRAAdaptersMetricName)
110+
multierr.Append(errs, fmt.Errorf("metric family %q not found", ActiveLoRAAdaptersMetricName))
111+
}
112+
113+
return updated, errs
114+
}
115+
116+
// getLatestMetric gets the latest metric of a family. This should be used to get the latest Gauge metric.
117+
func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, time.Time, error) {
118+
mf, ok := metricFamilies[metricName]
119+
if !ok {
120+
klog.Warningf("metric family %q not found", metricName)
121+
return nil, time.Time{}, fmt.Errorf("metric family %q not found", metricName)
122+
}
123+
if len(mf.GetMetric()) == 0 {
124+
return nil, time.Time{}, fmt.Errorf("no metrics available for %q", metricName)
125+
}
126+
var latestTs int64
127+
var latest *dto.Metric
128+
for _, m := range mf.GetMetric() {
129+
if m.GetTimestampMs() > latestTs {
130+
latestTs = m.GetTimestampMs()
131+
latest = m
132+
}
133+
}
134+
return latest, time.Unix(0, latestTs*1000), nil
135+
}

pkg/ext-proc/backend/pod_client.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package backend

import (
	"fmt"
	"net/http"
	"time"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
)

// metricsHTTPClient bounds each scrape so a single unresponsive pod cannot
// stall the periodic metrics refresh indefinitely (http.Get has no timeout).
var metricsHTTPClient = &http.Client{Timeout: 10 * time.Second}

// PodMetricsClientImpl fetches and parses prometheus metrics from pods over HTTP.
type PodMetricsClientImpl struct {
}

// FetchMetrics fetches metrics from a given pod.
// Errors are wrapped and returned (not logged here as well) so the caller
// decides how to report them; the original logged and returned the same
// error, producing duplicate handling.
func (p *PodMetricsClientImpl) FetchMetrics(pod Pod) (map[string]*dto.MetricFamily, error) {
	// Currently the metrics endpoint is hard-coded, which works with vLLM.
	// TODO(https://github.com/kubernetes-sigs/llm-instance-gateway/issues/16): Consume this from LLMServerPool config.
	url := fmt.Sprintf("http://%s/metrics", pod.Address)
	resp, err := metricsHTTPClient.Get(url)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
	}

	parser := expfmt.TextParser{}
	return parser.TextToMetricFamilies(resp.Body)
}

pkg/ext-proc/backend/provider.go

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package backend

import (
	"fmt"
	"sync"
	"time"

	dto "github.com/prometheus/client_model/go"
	klog "k8s.io/klog/v2"
)

// NewProvider constructs a Provider backed by the given metrics client and
// pod lister. The internal map starts empty; pods are discovered via Init.
func NewProvider(pmc PodMetricsClient, pl PodLister) *Provider {
	return &Provider{
		pmc: pmc,
		pl:  pl,
	}
}

// Provider provides backend pods and information such as metrics.
type Provider struct {
	// podMetrics maps Pod -> *PodMetrics.
	podMetrics sync.Map
	pmc        PodMetricsClient
	pl         PodLister
}

// PodMetricsClient scrapes the metric families of a single pod.
type PodMetricsClient interface {
	FetchMetrics(pod Pod) (map[string]*dto.MetricFamily, error)
}

// PodLister returns the current set of backend pods.
type PodLister interface {
	List() (PodSet, error)
}
36+
37+
func (p *Provider) AllPodMetrics() []*PodMetrics {
38+
res := []*PodMetrics{}
39+
fn := func(k, v any) bool {
40+
res = append(res, v.(*PodMetrics))
41+
return true
42+
}
43+
p.podMetrics.Range(fn)
44+
return res
45+
}
46+
47+
func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
48+
p.podMetrics.Store(pod, pm)
49+
}
50+
51+
func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
52+
val, ok := p.podMetrics.Load(pod)
53+
if ok {
54+
return val.(*PodMetrics), true
55+
}
56+
return nil, false
57+
}
58+
59+
func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
60+
if err := p.refreshPodsOnce(); err != nil {
61+
return fmt.Errorf("failed to init pods: %v", err)
62+
}
63+
if err := p.refreshMetricsOnce(); err != nil {
64+
return fmt.Errorf("failed to init metrics: %v", err)
65+
}
66+
67+
klog.V(2).Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
68+
69+
// periodically refresh pods
70+
go func() {
71+
for {
72+
time.Sleep(refreshPodsInterval)
73+
if err := p.refreshPodsOnce(); err != nil {
74+
klog.V(1).Infof("Failed to refresh podslist pods: %v", err)
75+
}
76+
}
77+
}()
78+
79+
// periodically refresh metrics
80+
go func() {
81+
for {
82+
time.Sleep(refreshMetricsInterval)
83+
if err := p.refreshMetricsOnce(); err != nil {
84+
klog.V(1).Infof("Failed to refresh metrics: %v", err)
85+
}
86+
}
87+
}()
88+
89+
return nil
90+
}
91+
92+
// refreshPodsOnce lists pods and updates keys in the podMetrics map.
93+
// Note this function doesn't update the PodMetrics value, it's done separately.
94+
func (p *Provider) refreshPodsOnce() error {
95+
pods, err := p.pl.List()
96+
if err != nil {
97+
return err
98+
}
99+
// merge new pods with cached ones.
100+
// add new pod to the map
101+
for pod := range pods {
102+
if _, ok := p.podMetrics.Load(pod); !ok {
103+
new := &PodMetrics{
104+
Pod: pod,
105+
Metrics: Metrics{
106+
CachedModels: make(map[string]int),
107+
},
108+
}
109+
p.podMetrics.Store(pod, new)
110+
}
111+
}
112+
// remove pods that don't exist any more.
113+
mergeFn := func(k, v any) bool {
114+
pod := k.(Pod)
115+
if _, ok := pods[pod]; !ok {
116+
p.podMetrics.Delete(pod)
117+
}
118+
return true
119+
}
120+
p.podMetrics.Range(mergeFn)
121+
return nil
122+
}

pkg/ext-proc/backend/types.go

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Package backend is a library to interact with backend model servers such as probing metrics.
2+
package backend
3+
4+
import "fmt"
5+
6+
// PodSet is a set of pods; the bool value carries no meaning.
type PodSet map[Pod]bool

// Pod identifies a backend pod and where to reach it.
type Pod struct {
	Namespace string
	Name      string
	Address   string
}

// String renders the pod as "namespace.name".
func (p Pod) String() string {
	return p.Namespace + "." + p.Name
}

// Metrics holds the most recently scraped metrics of a single pod.
type Metrics struct {
	// CachedModels is a set of models(including LoRA adapters) that are currently cached to GPU.
	CachedModels            map[string]int
	RunningQueueSize        int
	WaitingQueueSize        int
	KVCacheUsagePercent     float64
	KvCacheMaxTokenCapacity int
}

// PodMetrics pairs a pod's identity with its scraped metrics.
type PodMetrics struct {
	Pod
	Metrics
}

// String renders both the pod identity and its metrics for logging.
func (pm *PodMetrics) String() string {
	return fmt.Sprintf("Pod: %+v; Metrics: %+v", pm.Pod, pm.Metrics)
}

// Clone returns a deep copy of pm (including the CachedModels map) so the
// copy can replace the original atomically without racing with readers.
func (pm *PodMetrics) Clone() *PodMetrics {
	cached := make(map[string]int, len(pm.CachedModels))
	for model, count := range pm.CachedModels {
		cached[model] = count
	}
	return &PodMetrics{
		Pod: pm.Pod,
		Metrics: Metrics{
			CachedModels:            cached,
			RunningQueueSize:        pm.RunningQueueSize,
			WaitingQueueSize:        pm.WaitingQueueSize,
			KVCacheUsagePercent:     pm.KVCacheUsagePercent,
			KvCacheMaxTokenCapacity: pm.KvCacheMaxTokenCapacity,
		},
	}
}

0 commit comments

Comments
 (0)