Add priority based scheduling #25

Merged 2 commits on Oct 24, 2024
2 changes: 1 addition & 1 deletion pkg/ext-proc/backend/provider.go
@@ -113,7 +113,7 @@ func (p *Provider) refreshPodsOnce() error {
new := &PodMetrics{
Pod: pod,
Metrics: Metrics{
CachedModels: make(map[string]int),
ActiveModels: make(map[string]int),
},
}
p.podMetrics.Store(pod, new)
14 changes: 8 additions & 6 deletions pkg/ext-proc/backend/types.go
@@ -12,12 +12,14 @@ type Pod struct {
}

func (p Pod) String() string {
return p.Namespace + "." + p.Name
return p.Namespace + "/" + p.Name
}

type Metrics struct {
// CachedModels is a set of models(including LoRA adapters) that are currently cached to GPU.
CachedModels map[string]int
// ActiveModels is a set of models(including LoRA adapters) that are currently cached to GPU.
ActiveModels map[string]int
// MaxActiveModels is the maximum number of models that can be loaded to GPU.
MaxActiveModels int
RunningQueueSize int
WaitingQueueSize int
KVCacheUsagePercent float64
@@ -34,14 +36,14 @@ func (pm *PodMetrics) String() string {
}

func (pm *PodMetrics) Clone() *PodMetrics {
cm := make(map[string]int, len(pm.CachedModels))
for k, v := range pm.CachedModels {
cm := make(map[string]int, len(pm.ActiveModels))
for k, v := range pm.ActiveModels {
cm[k] = v
}
clone := &PodMetrics{
Pod: pm.Pod,
Metrics: Metrics{
CachedModels: cm,
ActiveModels: cm,
RunningQueueSize: pm.RunningQueueSize,
WaitingQueueSize: pm.WaitingQueueSize,
KVCacheUsagePercent: pm.KVCacheUsagePercent,
7 changes: 5 additions & 2 deletions pkg/ext-proc/backend/vllm/metrics.go
@@ -85,18 +85,21 @@ func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *bac
}
*/

// TODO(https://github.com/kubernetes-sigs/llm-instance-gateway/issues/22): Read from vLLM metrics once the metric is available.
updated.MaxActiveModels = 4

// Update active loras
mf, ok := metricFamilies[ActiveLoRAAdaptersMetricName]
if ok {
// IMPORTANT: replace the map entries instead of appending to it.
updated.CachedModels = make(map[string]int)
updated.ActiveModels = make(map[string]int)
for _, metric := range mf.GetMetric() {
for _, label := range metric.GetLabel() {
if label.GetName() == "active_adapters" {
if label.GetValue() != "" {
adapterList := strings.Split(label.GetValue(), ",")
for _, adapter := range adapterList {
updated.CachedModels[adapter] = 0
updated.ActiveModels[adapter] = 0
}
}
}
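To make the parsing above concrete, here is a minimal, self-contained sketch of how an `active_adapters` label value ends up in the `ActiveModels` map. The adapter names are made up; real values come from the vLLM metrics endpoint.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical label value as it might appear on the active LoRA adapters metric.
	labelValue := "sql-lora,tweet-lora"

	// Replace the map rather than appending to it, mirroring the logic above.
	activeModels := make(map[string]int)
	if labelValue != "" {
		for _, adapter := range strings.Split(labelValue, ",") {
			activeModels[adapter] = 0
		}
	}
	fmt.Println(activeModels) // map[sql-lora:0 tweet-lora:0]
}
```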
1 change: 1 addition & 0 deletions pkg/ext-proc/go.mod
@@ -5,6 +5,7 @@ go 1.21
require (
github.com/bojand/ghz v0.120.0
github.com/envoyproxy/go-control-plane v0.13.0
github.com/google/go-cmp v0.6.0
github.com/jhump/protoreflect v1.15.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.55.0
4 changes: 3 additions & 1 deletion pkg/ext-proc/handlers/request.go
@@ -38,6 +38,8 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
// TODO: Once the API is approved, read the "LLMUseCase" configuration and apply traffic split.
TargetModels: map[string]int{model: 100},
ResolvedTargetModel: model,
// TODO: Read from LLMService CRD.
Critical: true,
}

// Update target models in the body.
@@ -51,7 +53,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces

targetPod, err := s.scheduler.Schedule(llmReq)
if err != nil {
return nil, fmt.Errorf("failed to find target pod: %v", err)
return nil, fmt.Errorf("failed to find target pod: %w", err)
}
klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)

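For readers unfamiliar with the scheduling types, here is a hypothetical, trimmed-down sketch of the request that `HandleRequestBody` hands to the scheduler. Only fields visible in this diff are included; the real `LLMRequest` definition lives in the scheduling package and may carry additional fields.

```go
package main

import "fmt"

// LLMRequest is an illustrative stand-in for the scheduling package's request type.
type LLMRequest struct {
	TargetModels        map[string]int // target model name -> traffic split percentage
	ResolvedTargetModel string
	Critical            bool // false marks the request as sheddable
}

func main() {
	model := "tweet-summary" // hypothetical model name read from the request body
	llmReq := &LLMRequest{
		TargetModels:        map[string]int{model: 100},
		ResolvedTargetModel: model,
		// Hard-coded for now; the TODO above tracks reading this from the LLMService CRD.
		Critical: true,
	}
	fmt.Printf("%+v\n", llmReq)
}
```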
20 changes: 18 additions & 2 deletions pkg/ext-proc/handlers/server.go
@@ -4,6 +4,7 @@ import (
"io"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
klog "k8s.io/klog/v2"
@@ -83,13 +84,28 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {

if err != nil {
klog.Errorf("failed to process request: %v", err)
return status.Errorf(codes.Unknown, "failed to handle request: %v", err)
switch status.Code(err) {
// This code can be returned by scheduler when there is no capacity for sheddable
// requests.
case codes.ResourceExhausted:
resp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Status: &envoyTypePb.HttpStatus{
Code: envoyTypePb.StatusCode_TooManyRequests,
},
},
},
}
default:
return status.Errorf(status.Code(err), "failed to handle request: %w", err)
}
}

klog.V(3).Infof("response: %v", resp)
if err := srv.Send(resp); err != nil {
klog.Errorf("send error %v", err)
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %w", err)
}
}
}
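The new `ResourceExhausted` branch only fires if the scheduler actually returns that gRPC code. Below is a minimal sketch of that contract, assuming the scheduler rejects sheddable requests when no pod has spare capacity; the helper function and message are illustrative, not part of this diff.

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// schedule is a stand-in for scheduler.Schedule: it sheds non-critical requests
// when no backend pod has spare capacity.
func schedule(critical, haveCapacity bool) error {
	if !critical && !haveCapacity {
		return status.Errorf(codes.ResourceExhausted, "no capacity for sheddable request")
	}
	return nil
}

func main() {
	err := schedule(false, false)
	// Process() above checks status.Code(err); ResourceExhausted becomes an
	// immediate HTTP 429 (TooManyRequests) response to Envoy.
	fmt.Println(status.Code(err) == codes.ResourceExhausted) // true
}
```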
73 changes: 44 additions & 29 deletions pkg/ext-proc/scheduling/filter.go
@@ -11,7 +11,7 @@ import (

type Filter interface {
Name() string
Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
}

// filter applies current filterFunc, and then recursively applies next filters depending on success or
@@ -41,42 +41,46 @@ func (f *filter) Name() string {
return f.name
}

func (f *filter) Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
if f == nil {
klog.V(3).Infof("Running nil filter, returning all input pods by default")
return pods, nil
}
klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, b, len(pods))
func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods))

filtered, err := f.filter(b, pods)
filtered, err := f.filter(req, pods)

next := f.nextOnSuccessOrFailure
if err == nil {
klog.V(3).Infof("onSuccess %v -> %v, filtered: %v", f.name, next.Name(), len(filtered))
if err == nil && len(filtered) > 0 {
if f.nextOnSuccess == nil && f.nextOnSuccessOrFailure == nil {
// No succeeding filters to run, return.
return filtered, err
}
if f.nextOnSuccess != nil {
next = f.nextOnSuccess
}
klog.V(3).Infof("onSuccess %q -> %q, filtered: %v", f.name, next.Name(), len(filtered))
// On success, pass the filtered result to the next filter.
return next.Filter(b, filtered)
}

klog.V(3).Infof("onFailure %v -> %v", f.name, next.Name())
if f.nextOnFailure != nil {
next = f.nextOnFailure
return next.Filter(req, filtered)
} else {
if f.nextOnFailure == nil && f.nextOnSuccessOrFailure == nil {
// No succeeding filters to run, return.
return filtered, err
}
if f.nextOnFailure != nil {
next = f.nextOnFailure
}
klog.V(3).Infof("onFailure %q -> %q", f.name, next.Name())
// On failure, pass the initial set of pods to the next filter.
return next.Filter(req, pods)
}
// On failure, pass the initial set of pods to the next filter.
return next.Filter(b, pods)
}

// filterFunc filters a set of input pods to a subset.
type filterFunc func(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
type filterFunc func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)

// toFilterFunc is a helper function to convert a per pod filter func to the FilterFunc.
func toFilterFunc(pp podPredicate) filterFunc {
return func(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
return func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
filtered := []*backend.PodMetrics{}
for _, pod := range pods {
pass := pp(b, pod)
pass := pp(req, pod)
if pass {
filtered = append(filtered, pod)
}
@@ -95,7 +99,7 @@ func toFilterFunc(pp podPredicate) filterFunc {
// the least one as it gives more choices for the next filter, which on aggregate gave better
// results.
// TODO: Compare this strategy with other strategies such as top K.
func leastQueuingFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
func leastQueuingFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
min := math.MaxInt
max := 0
filtered := []*backend.PodMetrics{}
@@ -123,9 +127,9 @@
// should consider them all instead of the absolute minimum one. This worked better than picking the
// least one as it gives more choices for the next filter, which on aggregate gave better results.
// TODO: Compare this strategy with other strategies such as top K.
func leastKVCacheFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
func leastKVCacheFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
min := math.MaxFloat64
max := math.SmallestNonzeroFloat64
var max float64 = 0
filtered := []*backend.PodMetrics{}

for _, pod := range pods {
@@ -146,10 +150,21 @@ func leastKVCacheFilterFunc(b *LLMRequest, pods []*backe
}

// podPredicate is a filter function to check whether a pod is desired.
type podPredicate func(b *LLMRequest, pod *backend.PodMetrics) bool
type podPredicate func(req *LLMRequest, pod *backend.PodMetrics) bool

// We consider serving an adapter low cost if the adapter is active in the model server, or the
// model server has room to load the adapter.
func lowLoRACostPredicate(req *LLMRequest, pod *backend.PodMetrics) bool {
_, ok := pod.ActiveModels[req.ResolvedTargetModel]
return ok || len(pod.ActiveModels) < pod.MaxActiveModels
}

// loraAffinityPredicate return true if the pod have the requested LoRA adapter loaded.
func loraAffinityPredicate(b *LLMRequest, pod *backend.PodMetrics) bool {
_, ok := pod.CachedModels[b.ResolvedTargetModel]
return ok
func criticalRequestPredicate(req *LLMRequest, pod *backend.PodMetrics) bool {
return req.Critical
}

func noQueueAndLessThanKVCacheThresholdPredicate(queueThreshold int, kvCacheThreshold float64) podPredicate {
return func(req *LLMRequest, pod *backend.PodMetrics) bool {
return pod.WaitingQueueSize <= queueThreshold && pod.KVCacheUsagePercent <= kvCacheThreshold
}
}
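Taken together, the new predicates make a priority-aware filter chain possible. The wiring below is only a plausible example of how they could be composed; the actual default chain is defined in scheduler.go, which is not part of this diff, and the thresholds are made up.

```go
// Illustrative wiring inside the scheduling package; field names match the filter
// struct used above, but the chain structure and thresholds are assumptions.
var exampleFilter = &filter{
	name:   "critical request",
	filter: toFilterFunc(criticalRequestPredicate),
	// Critical requests: pick among the least-queued pods, prefer pods that can
	// serve the LoRA adapter cheaply, then break ties by KV cache usage.
	nextOnSuccess: &filter{
		name:   "least queuing",
		filter: leastQueuingFilterFunc,
		nextOnSuccessOrFailure: &filter{
			name:   "low cost LoRA",
			filter: toFilterFunc(lowLoRACostPredicate),
			nextOnSuccessOrFailure: &filter{
				name:   "least KV cache percent",
				filter: leastKVCacheFilterFunc,
			},
		},
	},
	// Sheddable requests: only admit them onto pods with an empty queue and low KV
	// cache usage; if none qualify, the scheduler can return codes.ResourceExhausted,
	// which the ext-proc server translates into HTTP 429 (see handlers/server.go).
	nextOnFailure: &filter{
		name:   "has capacity for sheddable requests",
		filter: toFilterFunc(noQueueAndLessThanKVCacheThresholdPredicate(0, 0.8)),
	},
}
```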