From ea4ea9c5c81e5d8a492334670eed5f16997b5eeb Mon Sep 17 00:00:00 2001 From: Benjamin Braun Date: Mon, 3 Feb 2025 21:18:49 +0000 Subject: [PATCH 1/4] Add 2 new test cases to hermetic integration test. Move k8sclient API setup to BeforeSuit() so it is set up once for all test cases. Add getter function to scheduling to reference queue threshold for lora affinity inside integration tests. --- pkg/ext-proc/scheduling/scheduler.go | 4 + test/integration/hermetic_test.go | 247 +++++++++++++++++++-------- 2 files changed, 184 insertions(+), 67 deletions(-) diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index 9fc3e663..5fe1d93d 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -122,3 +122,7 @@ func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) i := rand.Intn(len(pods)) return pods[i].Pod, nil } + +func GetQueueingThresholdLoRA() int { + return queueingThresholdLoRA +} diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index 44c5ae0a..7d244802 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -23,6 +23,7 @@ import ( "google.golang.org/protobuf/testing/protocmp" "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server" extprocutils "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/test" "k8s.io/apimachinery/pkg/runtime" @@ -164,15 +165,49 @@ func TestKubeInferenceModelRequest(t *testing.T) { tests := []struct { name string req *extProcPb.ProcessingRequest + pods []*backend.PodMetrics wantHeaders []*configPb.HeaderValueOption wantBody []byte wantErr bool }{ { - name: "success", + name: "select lower queue, no active lora", req: extprocutils.GenerateRequest("sql-lora"), // pod-1 will be picked because it has relatively low queue size, with the requested // model being active, and has low KV cache. + pods: []*backend.PodMetrics{ + { + Pod: extprocutils.FakePod(0), + Metrics: backend.Metrics{ + WaitingQueueSize: 3, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + "bar": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(1), + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.1, + ActiveModels: map[string]int{ + "foo": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(2), + Metrics: backend.Metrics{ + WaitingQueueSize: 10, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + }, + }, + }, + }, wantHeaders: []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ @@ -190,40 +225,117 @@ func TestKubeInferenceModelRequest(t *testing.T) { wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"), wantErr: false, }, - } - - pods := []*backend.PodMetrics{ { - Pod: extprocutils.FakePod(0), - Metrics: backend.Metrics{ - WaitingQueueSize: 0, - KVCacheUsagePercent: 0.2, - ActiveModels: map[string]int{ - "foo": 1, - "bar": 1, + name: "select active lora, low queue", + req: extprocutils.GenerateRequest("sql-lora"), + // pod-1 will be picked because it has relatively low queue size, with the requested + // model being active, and has low KV cache. + pods: []*backend.PodMetrics{ + { + Pod: extprocutils.FakePod(0), + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + "bar": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(1), + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.1, + ActiveModels: map[string]int{ + "foo": 1, + "sql-lora-1fdg2": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(2), + Metrics: backend.Metrics{ + WaitingQueueSize: 10, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + }, + }, }, }, - }, - { - Pod: extprocutils.FakePod(1), - Metrics: backend.Metrics{ - WaitingQueueSize: 0, - KVCacheUsagePercent: 0.1, - ActiveModels: map[string]int{ - "foo": 1, - "sql-lora-1fdg2": 1, + wantHeaders: []*configPb.HeaderValueOption{ + { + Header: &configPb.HeaderValue{ + Key: runserver.DefaultTargetPodHeader, + RawValue: []byte("address-1"), + }, + }, + { + Header: &configPb.HeaderValue{ + Key: "Content-Length", + RawValue: []byte("76"), + }, }, }, + wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"), + wantErr: false, }, { - Pod: extprocutils.FakePod(2), - Metrics: backend.Metrics{ - WaitingQueueSize: 10, - KVCacheUsagePercent: 0.2, - ActiveModels: map[string]int{ - "foo": 1, + name: "select no lora despite active model, avoid excessive queue size", + req: extprocutils.GenerateRequest("sql-lora"), + // pod-2 will be picked despite it having the requested model being active + // as it's above the affinity for queue size. + pods: []*backend.PodMetrics{ + { + Pod: extprocutils.FakePod(0), + Metrics: backend.Metrics{ + WaitingQueueSize: 3, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + "bar": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(1), + Metrics: backend.Metrics{ + WaitingQueueSize: scheduling.GetQueueingThresholdLoRA(), + KVCacheUsagePercent: 0.1, + ActiveModels: map[string]int{ + "foo": 1, + "sql-lora-1fdg2": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(2), + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + }, + }, + }, + }, + wantHeaders: []*configPb.HeaderValueOption{ + { + Header: &configPb.HeaderValue{ + Key: runserver.DefaultTargetPodHeader, + RawValue: []byte("address-2"), + }, + }, + { + Header: &configPb.HeaderValue{ + Key: "Content-Length", + RawValue: []byte("76"), + }, }, }, + wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"), + wantErr: false, }, } @@ -232,7 +344,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client, cleanup := setUpHermeticServer(t, pods) + client, cleanup := setUpHermeticServer(test.pods) t.Cleanup(cleanup) want := &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_RequestBody{ @@ -286,44 +398,7 @@ func setUpServer(t *testing.T, pods []*backend.PodMetrics, models map[string]*v1 } } -func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { - t.Logf("Setting up hermetic ExtProc server") - klog.InitFlags(nil) - flag.Parse() - // Configure klog verbosity levels to print ext proc logs. - _ = flag.Lookup("v").Value.Set("3") - - // Unmarshal CRDs from file into structs - manifestsPath := filepath.Join("..", "testdata", "inferencepool-with-model-hermetic.yaml") - docs, err := readDocuments(manifestsPath) - if err != nil { - log.Fatalf("Can't read object manifests at path %v, %v", manifestsPath, err) - } - - for _, doc := range docs { - inferenceModel := &v1alpha1.InferenceModel{} - if err = yaml.Unmarshal(doc, inferenceModel); err != nil { - log.Fatalf("Can't unmarshal object: %v", doc) - } - if inferenceModel.Kind == "InferenceModel" { - t.Logf("Creating inference model: %+v", inferenceModel) - if err := k8sClient.Create(context.Background(), inferenceModel); err != nil { - log.Fatalf("unable to create inferenceModel %v: %v", inferenceModel.Name, err) - } - } - } - for _, doc := range docs { - inferencePool := &v1alpha1.InferencePool{} - if err = yaml.Unmarshal(doc, inferencePool); err != nil { - log.Fatalf("Can't unmarshal object: %v", doc) - } - if inferencePool.Kind == "InferencePool" { - t.Logf("Creating inference pool: %+v", inferencePool) - if err := k8sClient.Create(context.Background(), inferencePool); err != nil { - log.Fatalf("unable to create inferencePool %v: %v", inferencePool.Name, err) - } - } - } +func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { ps := make(backend.PodSet) pms := make(map[backend.Pod]*backend.PodMetrics) @@ -334,8 +409,8 @@ func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extPr pmc := &backend.FakePodMetricsClient{Res: pms} server := serverRunner.Start(backend.NewK8sDataStore(backend.WithPods(pods)), pmc) - if err != nil { - log.Fatalf("Ext-proc failed with the err: %v", err) + if server == nil { + log.Fatalf("Ext-proc Start returned nil server. pods: %v, pod metric client: %v", pods, pmc) } // Wait the reconciler to populate the datastore. @@ -396,6 +471,44 @@ func BeforeSuit() { go func() { serverRunner.StartManager() }() + + klog.Info("Setting up hermetic ExtProc server") + klog.InitFlags(nil) + flag.Parse() + // Configure klog verbosity levels to print ext proc logs. + _ = flag.Lookup("v").Value.Set("3") + + // Unmarshal CRDs from file into structs + manifestsPath := filepath.Join("..", "testdata", "inferencepool-with-model-hermetic.yaml") + docs, err := readDocuments(manifestsPath) + if err != nil { + log.Fatalf("Can't read object manifests at path %v, %v", manifestsPath, err) + } + + for _, doc := range docs { + inferenceModel := &v1alpha1.InferenceModel{} + if err = yaml.Unmarshal(doc, inferenceModel); err != nil { + log.Fatalf("Can't unmarshal object: %v", doc) + } + if inferenceModel.Kind == "InferenceModel" { + klog.Infof("Creating inference model: %+v", inferenceModel) + if err := k8sClient.Create(context.Background(), inferenceModel); err != nil { + log.Fatalf("unable to create inferenceModel %v: %v", inferenceModel.Name, err) + } + } + } + for _, doc := range docs { + inferencePool := &v1alpha1.InferencePool{} + if err = yaml.Unmarshal(doc, inferencePool); err != nil { + log.Fatalf("Can't unmarshal object: %v", doc) + } + if inferencePool.Kind == "InferencePool" { + klog.Infof("Creating inference pool: %+v", inferencePool) + if err := k8sClient.Create(context.Background(), inferencePool); err != nil { + log.Fatalf("unable to create inferencePool %v: %v", inferencePool.Name, err) + } + } + } } func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { From a89b0803d70aa8fdc27220c764ace31c4cc8a8bc Mon Sep 17 00:00:00 2001 From: Benjamin Braun Date: Tue, 4 Feb 2025 00:49:36 +0000 Subject: [PATCH 2/4] remove vestigial unit test from hermetic test, minor change to comments, remove unreachable error check. --- pkg/ext-proc/scheduling/scheduler.go | 4 - test/integration/hermetic_test.go | 230 ++++++------------ .../inferencepool-with-model-hermetic.yaml | 13 + 3 files changed, 82 insertions(+), 165 deletions(-) diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index 5fe1d93d..9fc3e663 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -122,7 +122,3 @@ func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) i := rand.Intn(len(pods)) return pods[i].Pod, nil } - -func GetQueueingThresholdLoRA() int { - return queueingThresholdLoRA -} diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index 1dae155d..09f9d04a 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -17,6 +17,7 @@ import ( configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -24,7 +25,6 @@ import ( "google.golang.org/protobuf/types/known/structpb" "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" - "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server" extprocutils "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/test" "k8s.io/apimachinery/pkg/runtime" @@ -48,44 +48,27 @@ var ( scheme = runtime.NewScheme() ) -func SKIPTestHandleRequestBody(t *testing.T) { +func TestKubeInferenceModelRequest(t *testing.T) { tests := []struct { - name string - req *extProcPb.ProcessingRequest - pods []*backend.PodMetrics - models map[string]*v1alpha1.InferenceModel - wantHeaders []*configPb.HeaderValueOption - wantBody []byte - wantErr bool + name string + req *extProcPb.ProcessingRequest + pods []*backend.PodMetrics + wantHeaders []*configPb.HeaderValueOption + wantMetadata *structpb.Struct + wantBody []byte + wantErr bool + immediateResponse *extProcPb.ImmediateResponse }{ { - name: "success", - req: extprocutils.GenerateRequest("my-model"), - models: map[string]*v1alpha1.InferenceModel{ - "my-model": { - Spec: v1alpha1.InferenceModelSpec{ - ModelName: "my-model", - TargetModels: []v1alpha1.TargetModel{ - { - Name: "my-model-v1", - Weight: pointer(100), - }, - }, - }, - }, - }, - // pod-1 will be picked because it has relatively low queue size, with the requested - // model being active, and has low KV cache. + name: "select lower queue and kv cache, no active lora", + req: extprocutils.GenerateRequest("sql-lora"), + // pod-1 will be picked because it has relatively low queue size and low KV cache. pods: []*backend.PodMetrics{ { Pod: extprocutils.FakePod(0), Metrics: backend.Metrics{ - WaitingQueueSize: 0, + WaitingQueueSize: 3, KVCacheUsagePercent: 0.2, - ActiveModels: map[string]int{ - "foo": 1, - "bar": 1, - }, }, }, { @@ -93,10 +76,6 @@ func SKIPTestHandleRequestBody(t *testing.T) { Metrics: backend.Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.1, - ActiveModels: map[string]int{ - "foo": 1, - "my-model-v1": 1, - }, }, }, { @@ -104,9 +83,6 @@ func SKIPTestHandleRequestBody(t *testing.T) { Metrics: backend.Metrics{ WaitingQueueSize: 10, KVCacheUsagePercent: 0.2, - ActiveModels: map[string]int{ - "foo": 1, - }, }, }, }, @@ -120,60 +96,24 @@ func SKIPTestHandleRequestBody(t *testing.T) { { Header: &configPb.HeaderValue{ Key: "Content-Length", - RawValue: []byte("73"), + RawValue: []byte("76"), }, }, }, - wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-v1\",\"prompt\":\"hello\",\"temperature\":0}"), - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - client, cleanup := setUpServer(t, test.pods, test.models) - t.Cleanup(cleanup) - want := &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{ - Response: &extProcPb.CommonResponse{ - HeaderMutation: &extProcPb.HeaderMutation{ - SetHeaders: test.wantHeaders, - }, - BodyMutation: &extProcPb.BodyMutation{ - Mutation: &extProcPb.BodyMutation_Body{ - Body: test.wantBody, - }, - }, + wantMetadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + runserver.DefaultTargetEndpointKey: { + Kind: &structpb.Value_StringValue{ + StringValue: "address-1", }, }, }, - } - res, err := sendRequest(t, client, test.req) - - if (err != nil) != test.wantErr { - t.Fatalf("Unexpected error, got %v, want %v", err, test.wantErr) - } - - if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" { - t.Errorf("Unexpected response, (-want +got): %v", diff) - } - }) - } - -} - -func TestKubeInferenceModelRequest(t *testing.T) { - tests := []struct { - name string - req *extProcPb.ProcessingRequest - pods []*backend.PodMetrics - wantHeaders []*configPb.HeaderValueOption - wantMetadata *structpb.Struct - wantBody []byte - wantErr bool - }{ + }, + wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"), + wantErr: false, + }, { - name: "select lower queue, no active lora", + name: "select active lora, low queue", req: extprocutils.GenerateRequest("sql-lora"), // pod-1 will be picked because it has relatively low queue size, with the requested // model being active, and has low KV cache. @@ -181,7 +121,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { { Pod: extprocutils.FakePod(0), Metrics: backend.Metrics{ - WaitingQueueSize: 3, + WaitingQueueSize: 0, KVCacheUsagePercent: 0.2, ActiveModels: map[string]int{ "foo": 1, @@ -195,7 +135,8 @@ func TestKubeInferenceModelRequest(t *testing.T) { WaitingQueueSize: 0, KVCacheUsagePercent: 0.1, ActiveModels: map[string]int{ - "foo": 1, + "foo": 1, + "sql-lora-1fdg2": 1, }, }, }, @@ -237,15 +178,16 @@ func TestKubeInferenceModelRequest(t *testing.T) { wantErr: false, }, { - name: "select active lora, low queue", + name: "select no lora despite active model, avoid excessive queue size", req: extprocutils.GenerateRequest("sql-lora"), - // pod-1 will be picked because it has relatively low queue size, with the requested - // model being active, and has low KV cache. + // pod-2 will be picked despite it having the requested model being active + // as it's above the affinity for queue size. Also is critical, so we should + // still honor request despite all queues > 5 pods: []*backend.PodMetrics{ { Pod: extprocutils.FakePod(0), Metrics: backend.Metrics{ - WaitingQueueSize: 0, + WaitingQueueSize: 10, KVCacheUsagePercent: 0.2, ActiveModels: map[string]int{ "foo": 1, @@ -256,7 +198,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { { Pod: extprocutils.FakePod(1), Metrics: backend.Metrics{ - WaitingQueueSize: 0, + WaitingQueueSize: 50, KVCacheUsagePercent: 0.1, ActiveModels: map[string]int{ "foo": 1, @@ -267,7 +209,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { { Pod: extprocutils.FakePod(2), Metrics: backend.Metrics{ - WaitingQueueSize: 10, + WaitingQueueSize: 6, KVCacheUsagePercent: 0.2, ActiveModels: map[string]int{ "foo": 1, @@ -279,7 +221,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { { Header: &configPb.HeaderValue{ Key: runserver.DefaultTargetEndpointKey, - RawValue: []byte("address-1"), + RawValue: []byte("address-2"), }, }, { @@ -293,7 +235,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { Fields: map[string]*structpb.Value{ runserver.DefaultTargetEndpointKey: { Kind: &structpb.Value_StringValue{ - StringValue: "address-1", + StringValue: "address-2", }, }, }, @@ -302,69 +244,55 @@ func TestKubeInferenceModelRequest(t *testing.T) { wantErr: false, }, { - name: "select no lora despite active model, avoid excessive queue size", - req: extprocutils.GenerateRequest("sql-lora"), - // pod-2 will be picked despite it having the requested model being active - // as it's above the affinity for queue size. + name: "noncritical and all models past threshold, shed request", + req: extprocutils.GenerateRequest("sql-lora-sheddable"), + // no pods will be picked as all models are either above kv threshold, + // queue threshold, or both. pods: []*backend.PodMetrics{ { Pod: extprocutils.FakePod(0), Metrics: backend.Metrics{ - WaitingQueueSize: 3, + WaitingQueueSize: 6, KVCacheUsagePercent: 0.2, ActiveModels: map[string]int{ - "foo": 1, - "bar": 1, + "foo": 1, + "bar": 1, + "sql-lora-1fdg3": 1, }, }, }, { Pod: extprocutils.FakePod(1), Metrics: backend.Metrics{ - WaitingQueueSize: scheduling.GetQueueingThresholdLoRA(), - KVCacheUsagePercent: 0.1, + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.85, ActiveModels: map[string]int{ "foo": 1, - "sql-lora-1fdg2": 1, + "sql-lora-1fdg3": 1, }, }, }, { Pod: extprocutils.FakePod(2), Metrics: backend.Metrics{ - WaitingQueueSize: 0, - KVCacheUsagePercent: 0.2, + WaitingQueueSize: 10, + KVCacheUsagePercent: 0.9, ActiveModels: map[string]int{ - "foo": 1, + "foo": 1, + "sql-lora-1fdg3": 1, }, }, }, }, - wantHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: runserver.DefaultTargetEndpointKey, - RawValue: []byte("address-2"), - }, - }, - { - Header: &configPb.HeaderValue{ - Key: "Content-Length", - RawValue: []byte("76"), - }, + wantHeaders: []*configPb.HeaderValueOption{}, + wantMetadata: &structpb.Struct{}, + wantBody: []byte(""), + wantErr: false, + immediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_TooManyRequests, }, }, - wantMetadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - runserver.DefaultTargetEndpointKey: { - Kind: &structpb.Value_StringValue{ - StringValue: "address-2", - }, - }, - }, - }, - wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"), - wantErr: false, }, } @@ -394,40 +322,23 @@ func TestKubeInferenceModelRequest(t *testing.T) { } res, err := sendRequest(t, client, test.req) - if err != nil { - if !test.wantErr { - t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr) + if err != nil && !test.wantErr { + t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr) + } + if test.immediateResponse != nil { + want = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: test.immediateResponse, + }, } - } else if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" { + } + if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" { t.Errorf("Unexpected response, (-want +got): %v", diff) } }) } } -func setUpServer(t *testing.T, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { - t.Logf("Setting up ExtProc server") - server := extprocutils.StartExtProc(port, time.Second, time.Second, pods, models) - - address := fmt.Sprintf("localhost:%v", port) - // Create a grpc connection - conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - log.Fatalf("Failed to connect to %v: %v", address, err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx) - if err != nil { - log.Fatalf("Failed to create client: %v", err) - } - return client, func() { - cancel() - conn.Close() - server.GracefulStop() - } -} - func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { ps := make(backend.PodSet) @@ -439,9 +350,6 @@ func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalP pmc := &backend.FakePodMetricsClient{Res: pms} server := serverRunner.Start(backend.NewK8sDataStore(backend.WithPods(pods)), pmc) - if server == nil { - log.Fatalf("Ext-proc Start returned nil server. pods: %v, pod metric client: %v", pods, pmc) - } // Wait the reconciler to populate the datastore. time.Sleep(10 * time.Second) diff --git a/test/testdata/inferencepool-with-model-hermetic.yaml b/test/testdata/inferencepool-with-model-hermetic.yaml index a07e0f35..b098d5f3 100644 --- a/test/testdata/inferencepool-with-model-hermetic.yaml +++ b/test/testdata/inferencepool-with-model-hermetic.yaml @@ -23,3 +23,16 @@ spec: targetModels: - name: sql-lora-1fdg2 weight: 100 +--- +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: InferenceModel +metadata: + name: inferencemodel-sheddable + namespace: default +spec: + modelName: sql-lora-sheddable + poolRef: + name: vllm-llama2-7b-pool + targetModels: + - name: sql-lora-1fdg3 + weight: 100 From cc89e72a0e9d01321c00f36afcebb42fb297b91b Mon Sep 17 00:00:00 2001 From: Benjamin Braun Date: Thu, 6 Feb 2025 23:31:34 +0000 Subject: [PATCH 3/4] Add test-case for sheddable that is not shed, fix nits and rename the non-lora test case to use a different model name. --- test/integration/hermetic_test.go | 75 +++++++++++++++++-- .../inferencepool-with-model-hermetic.yaml | 14 ++++ 2 files changed, 83 insertions(+), 6 deletions(-) diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index 09f9d04a..c7f6d108 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -61,7 +61,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { }{ { name: "select lower queue and kv cache, no active lora", - req: extprocutils.GenerateRequest("sql-lora"), + req: extprocutils.GenerateRequest("my-model"), // pod-1 will be picked because it has relatively low queue size and low KV cache. pods: []*backend.PodMetrics{ { @@ -109,7 +109,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, }, }, - wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"), + wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"hello\",\"temperature\":0}"), wantErr: false, }, { @@ -180,7 +180,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { { name: "select no lora despite active model, avoid excessive queue size", req: extprocutils.GenerateRequest("sql-lora"), - // pod-2 will be picked despite it having the requested model being active + // pod-2 will be picked despite it NOT having the requested model being active // as it's above the affinity for queue size. Also is critical, so we should // still honor request despite all queues > 5 pods: []*backend.PodMetrics{ @@ -294,6 +294,72 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, }, }, + { + name: "noncritical, but one model has capacity, no not shed", + req: extprocutils.GenerateRequest("sql-lora-sheddable"), + // pod 0 will be picked as all other models are above threshold + pods: []*backend.PodMetrics{ + { + Pod: extprocutils.FakePod(0), + Metrics: backend.Metrics{ + WaitingQueueSize: 4, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + "bar": 1, + "sql-lora-1fdg3": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(1), + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.85, + ActiveModels: map[string]int{ + "foo": 1, + "sql-lora-1fdg3": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(2), + Metrics: backend.Metrics{ + WaitingQueueSize: 10, + KVCacheUsagePercent: 0.9, + ActiveModels: map[string]int{ + "foo": 1, + "sql-lora-1fdg3": 1, + }, + }, + }, + }, + wantHeaders: []*configPb.HeaderValueOption{ + { + Header: &configPb.HeaderValue{ + Key: runserver.DefaultTargetEndpointKey, + RawValue: []byte("address-0"), + }, + }, + { + Header: &configPb.HeaderValue{ + Key: "Content-Length", + RawValue: []byte("76"), + }, + }, + }, + wantMetadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + runserver.DefaultTargetEndpointKey: { + Kind: &structpb.Value_StringValue{ + StringValue: "address-0", + }, + }, + }, + }, + wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"hello\",\"temperature\":0}"), + wantErr: false, + }, } // Set up global k8sclient and extproc server runner with test environment config @@ -487,6 +553,3 @@ func readDocuments(fp string) ([][]byte, error) { } return docs, nil } -func pointer(v int32) *int32 { - return &v -} diff --git a/test/testdata/inferencepool-with-model-hermetic.yaml b/test/testdata/inferencepool-with-model-hermetic.yaml index b098d5f3..372a8512 100644 --- a/test/testdata/inferencepool-with-model-hermetic.yaml +++ b/test/testdata/inferencepool-with-model-hermetic.yaml @@ -36,3 +36,17 @@ spec: targetModels: - name: sql-lora-1fdg3 weight: 100 +--- +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: InferenceModel +metadata: + name: inferencemodel-generic + namespace: default +spec: + modelName: my-model + criticality: Critical + poolRef: + name: vllm-llama2-7b-pool + targetModels: + - name: my-model-12345 + weight: 100 From 68c91e47482a5be5b7ccad4a2f0fe0341f0d8c0e Mon Sep 17 00:00:00 2001 From: Benjamin Braun Date: Fri, 7 Feb 2025 00:10:13 +0000 Subject: [PATCH 4/4] Fix small typo. --- test/integration/hermetic_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index c7f6d108..b52cc9d7 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -295,7 +295,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, }, { - name: "noncritical, but one model has capacity, no not shed", + name: "noncritical, but one server has capacity, do not shed", req: extprocutils.GenerateRequest("sql-lora-sheddable"), // pod 0 will be picked as all other models are above threshold pods: []*backend.PodMetrics{