diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go
index 95ad4908..b52cc9d7 100644
--- a/test/integration/hermetic_test.go
+++ b/test/integration/hermetic_test.go
@@ -17,6 +17,7 @@ import (
 	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
+	envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
 	"github.com/google/go-cmp/cmp"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
@@ -47,32 +48,73 @@ var (
 	scheme = runtime.NewScheme()
 )
 
-func SKIPTestHandleRequestBody(t *testing.T) {
+func TestKubeInferenceModelRequest(t *testing.T) {
 	tests := []struct {
-		name        string
-		req         *extProcPb.ProcessingRequest
-		pods        []*backend.PodMetrics
-		models      map[string]*v1alpha1.InferenceModel
-		wantHeaders []*configPb.HeaderValueOption
-		wantBody    []byte
-		wantErr     bool
+		name              string
+		req               *extProcPb.ProcessingRequest
+		pods              []*backend.PodMetrics
+		wantHeaders       []*configPb.HeaderValueOption
+		wantMetadata      *structpb.Struct
+		wantBody          []byte
+		wantErr           bool
+		immediateResponse *extProcPb.ImmediateResponse
 	}{
 		{
-			name: "success",
+			name: "select lower queue and kv cache, no active lora",
 			req:  extprocutils.GenerateRequest("my-model"),
-			models: map[string]*v1alpha1.InferenceModel{
-				"my-model": {
-					Spec: v1alpha1.InferenceModelSpec{
-						ModelName: "my-model",
-						TargetModels: []v1alpha1.TargetModel{
-							{
-								Name:   "my-model-v1",
-								Weight: pointer(100),
-							},
+			// pod-1 will be picked because it has relatively low queue size and low KV cache.
+			pods: []*backend.PodMetrics{
+				{
+					Pod: extprocutils.FakePod(0),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    3,
+						KVCacheUsagePercent: 0.2,
+					},
+				},
+				{
+					Pod: extprocutils.FakePod(1),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    0,
+						KVCacheUsagePercent: 0.1,
+					},
+				},
+				{
+					Pod: extprocutils.FakePod(2),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    10,
+						KVCacheUsagePercent: 0.2,
+					},
+				},
+			},
+			wantHeaders: []*configPb.HeaderValueOption{
+				{
+					Header: &configPb.HeaderValue{
+						Key:      runserver.DefaultTargetEndpointKey,
+						RawValue: []byte("address-1"),
+					},
+				},
+				{
+					Header: &configPb.HeaderValue{
+						Key:      "Content-Length",
+						RawValue: []byte("76"),
+					},
+				},
+			},
+			wantMetadata: &structpb.Struct{
+				Fields: map[string]*structpb.Value{
+					runserver.DefaultTargetEndpointKey: {
+						Kind: &structpb.Value_StringValue{
+							StringValue: "address-1",
 						},
 					},
 				},
 			},
+			wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"hello\",\"temperature\":0}"),
+			wantErr:  false,
+		},
+		{
+			name: "select active lora, low queue",
+			req:  extprocutils.GenerateRequest("sql-lora"),
 			// pod-1 will be picked because it has relatively low queue size, with the requested
 			// model being active, and has low KV cache.
 			pods: []*backend.PodMetrics{
@@ -93,8 +135,8 @@ func SKIPTestHandleRequestBody(t *testing.T) {
 					WaitingQueueSize:    0,
 					KVCacheUsagePercent: 0.1,
 					ActiveModels: map[string]int{
-						"foo":         1,
-						"my-model-v1": 1,
+						"foo":            1,
+						"sql-lora-1fdg2": 1,
 					},
 				},
 			},
@@ -119,67 +161,67 @@ func SKIPTestHandleRequestBody(t *testing.T) {
 				{
 					Header: &configPb.HeaderValue{
 						Key:      "Content-Length",
-						RawValue: []byte("73"),
+						RawValue: []byte("76"),
 					},
 				},
 			},
-			wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-v1\",\"prompt\":\"hello\",\"temperature\":0}"),
-		},
-	}
-
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
-			client, cleanup := setUpServer(t, test.pods, test.models)
-			t.Cleanup(cleanup)
-			want := &extProcPb.ProcessingResponse{
-				Response: &extProcPb.ProcessingResponse_RequestBody{
-					RequestBody: &extProcPb.BodyResponse{
-						Response: &extProcPb.CommonResponse{
-							HeaderMutation: &extProcPb.HeaderMutation{
-								SetHeaders: test.wantHeaders,
-							},
-							BodyMutation: &extProcPb.BodyMutation{
-								Mutation: &extProcPb.BodyMutation_Body{
-									Body: test.wantBody,
-								},
-							},
+			wantMetadata: &structpb.Struct{
+				Fields: map[string]*structpb.Value{
+					runserver.DefaultTargetEndpointKey: {
+						Kind: &structpb.Value_StringValue{
+							StringValue: "address-1",
 						},
 					},
 				},
-			}
-			res, err := sendRequest(t, client, test.req)
-
-			if (err != nil) != test.wantErr {
-				t.Fatalf("Unexpected error, got %v, want %v", err, test.wantErr)
-			}
-
-			if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" {
-				t.Errorf("Unexpected response, (-want +got): %v", diff)
-			}
-		})
-	}
-
-}
-
-func TestKubeInferenceModelRequest(t *testing.T) {
-	tests := []struct {
-		name         string
-		req          *extProcPb.ProcessingRequest
-		wantHeaders  []*configPb.HeaderValueOption
-		wantMetadata *structpb.Struct
-		wantBody     []byte
-		wantErr      bool
-	}{
+			},
+			wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"),
+			wantErr:  false,
+		},
 		{
-			name: "success",
+			name: "select no lora despite active model, avoid excessive queue size",
 			req:  extprocutils.GenerateRequest("sql-lora"),
-			// pod-1 will be picked because it has relatively low queue size, with the requested
-			// model being active, and has low KV cache.
+			// pod-2 will be picked despite NOT having the requested model active, because
+			// the pod that does (pod-1) is above the queue-size threshold for LoRA affinity.
+			// The model is also critical, so we still honor the request despite all queues > 5.
+			pods: []*backend.PodMetrics{
+				{
+					Pod: extprocutils.FakePod(0),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    10,
+						KVCacheUsagePercent: 0.2,
+						ActiveModels: map[string]int{
+							"foo": 1,
+							"bar": 1,
+						},
+					},
+				},
+				{
+					Pod: extprocutils.FakePod(1),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    50,
+						KVCacheUsagePercent: 0.1,
+						ActiveModels: map[string]int{
+							"foo":            1,
+							"sql-lora-1fdg2": 1,
+						},
+					},
+				},
+				{
+					Pod: extprocutils.FakePod(2),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    6,
+						KVCacheUsagePercent: 0.2,
+						ActiveModels: map[string]int{
+							"foo": 1,
+						},
+					},
+				},
+			},
 			wantHeaders: []*configPb.HeaderValueOption{
 				{
 					Header: &configPb.HeaderValue{
 						Key:      runserver.DefaultTargetEndpointKey,
-						RawValue: []byte("address-1"),
+						RawValue: []byte("address-2"),
 					},
 				},
 				{
@@ -193,7 +235,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 				Fields: map[string]*structpb.Value{
 					runserver.DefaultTargetEndpointKey: {
 						Kind: &structpb.Value_StringValue{
-							StringValue: "address-1",
+							StringValue: "address-2",
 						},
 					},
 				},
@@ -201,40 +243,122 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 			wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"),
 			wantErr:  false,
 		},
-	}
-
-	pods := []*backend.PodMetrics{
 		{
-			Pod: extprocutils.FakePod(0),
-			Metrics: backend.Metrics{
-				WaitingQueueSize:    0,
-				KVCacheUsagePercent: 0.2,
-				ActiveModels: map[string]int{
-					"foo": 1,
-					"bar": 1,
+			name: "noncritical and all models past threshold, shed request",
+			req:  extprocutils.GenerateRequest("sql-lora-sheddable"),
+			// no pod will be picked, as every pod is above the KV-cache threshold,
+			// the queue threshold, or both.
+			pods: []*backend.PodMetrics{
+				{
+					Pod: extprocutils.FakePod(0),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    6,
+						KVCacheUsagePercent: 0.2,
+						ActiveModels: map[string]int{
+							"foo":            1,
+							"bar":            1,
+							"sql-lora-1fdg3": 1,
+						},
+					},
+				},
+				{
+					Pod: extprocutils.FakePod(1),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    0,
+						KVCacheUsagePercent: 0.85,
+						ActiveModels: map[string]int{
+							"foo":            1,
+							"sql-lora-1fdg3": 1,
+						},
+					},
+				},
+				{
+					Pod: extprocutils.FakePod(2),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    10,
+						KVCacheUsagePercent: 0.9,
+						ActiveModels: map[string]int{
+							"foo":            1,
+							"sql-lora-1fdg3": 1,
+						},
+					},
 				},
 			},
-		},
-		{
-			Pod: extprocutils.FakePod(1),
-			Metrics: backend.Metrics{
-				WaitingQueueSize:    0,
-				KVCacheUsagePercent: 0.1,
-				ActiveModels: map[string]int{
-					"foo":            1,
-					"sql-lora-1fdg2": 1,
+			wantHeaders:       []*configPb.HeaderValueOption{},
+			wantMetadata:      &structpb.Struct{},
+			wantBody:          []byte(""),
+			wantErr:           false,
+			immediateResponse: &extProcPb.ImmediateResponse{
+				Status: &envoyTypePb.HttpStatus{
+					Code: envoyTypePb.StatusCode_TooManyRequests,
 				},
 			},
 		},
 		{
-			Pod: extprocutils.FakePod(2),
-			Metrics: backend.Metrics{
-				WaitingQueueSize:    10,
-				KVCacheUsagePercent: 0.2,
-				ActiveModels: map[string]int{
-					"foo": 1,
+			name: "noncritical, but one server has capacity, do not shed",
+			req:  extprocutils.GenerateRequest("sql-lora-sheddable"),
+			// pod 0 will be picked as all other pods are above threshold
+			pods: []*backend.PodMetrics{
+				{
+					Pod: extprocutils.FakePod(0),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    4,
+						KVCacheUsagePercent: 0.2,
+						ActiveModels: map[string]int{
+							"foo":            1,
+							"bar":            1,
+							"sql-lora-1fdg3": 1,
+						},
+					},
+				},
+				{
+					Pod: extprocutils.FakePod(1),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    0,
+						KVCacheUsagePercent: 0.85,
+						ActiveModels: map[string]int{
+							"foo":            1,
+							"sql-lora-1fdg3": 1,
+						},
+					},
+				},
+				{
+					Pod: extprocutils.FakePod(2),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    10,
+						KVCacheUsagePercent: 0.9,
+						ActiveModels: map[string]int{
+							"foo":            1,
+							"sql-lora-1fdg3": 1,
+						},
+					},
+				},
+			},
+			wantHeaders: []*configPb.HeaderValueOption{
+				{
+					Header: &configPb.HeaderValue{
+						Key:      runserver.DefaultTargetEndpointKey,
+						RawValue: []byte("address-0"),
+					},
+				},
+				{
+					Header: &configPb.HeaderValue{
+						Key:      "Content-Length",
+						RawValue: []byte("76"),
+					},
 				},
 			},
+			wantMetadata: &structpb.Struct{
+				Fields: map[string]*structpb.Value{
+					runserver.DefaultTargetEndpointKey: {
+						Kind: &structpb.Value_StringValue{
+							StringValue: "address-0",
+						},
+					},
+				},
+			},
+			wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"hello\",\"temperature\":0}"),
+			wantErr:  false,
 		},
 	}
 
@@ -243,7 +367,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			client, cleanup := setUpHermeticServer(t, pods)
+			client, cleanup := setUpHermeticServer(test.pods)
 			t.Cleanup(cleanup)
 			want := &extProcPb.ProcessingResponse{
 				Response: &extProcPb.ProcessingResponse_RequestBody{
@@ -264,78 +388,24 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 			}
 			res, err := sendRequest(t, client, test.req)
 
-			if err != nil {
-				if !test.wantErr {
-					t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr)
+			if err != nil && !test.wantErr {
+				t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr)
+			}
+			if test.immediateResponse != nil {
+				want = &extProcPb.ProcessingResponse{
+					Response: &extProcPb.ProcessingResponse_ImmediateResponse{
+						ImmediateResponse: test.immediateResponse,
+					},
 				}
-			} else if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" {
+			}
+			if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" {
 				t.Errorf("Unexpected response, (-want +got): %v", diff)
 			}
 		})
 	}
 }
 
-func setUpServer(t *testing.T, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
-	t.Logf("Setting up ExtProc server")
-	server := extprocutils.StartExtProc(port, time.Second, time.Second, pods, models)
-
-	address := fmt.Sprintf("localhost:%v", port)
-	// Create a grpc connection
-	conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
-	if err != nil {
-		log.Fatalf("Failed to connect to %v: %v", address, err)
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx)
-	if err != nil {
-		log.Fatalf("Failed to create client: %v", err)
-	}
-	return client, func() {
-		cancel()
-		conn.Close()
-		server.GracefulStop()
-	}
-}
-
-func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
-	t.Logf("Setting up hermetic ExtProc server")
-	klog.InitFlags(nil)
-	flag.Parse()
-	// Configure klog verbosity levels to print ext proc logs.
- _ = flag.Lookup("v").Value.Set("3") - - // Unmarshal CRDs from file into structs - manifestsPath := filepath.Join("..", "testdata", "inferencepool-with-model-hermetic.yaml") - docs, err := readDocuments(manifestsPath) - if err != nil { - log.Fatalf("Can't read object manifests at path %v, %v", manifestsPath, err) - } - - for _, doc := range docs { - inferenceModel := &v1alpha1.InferenceModel{} - if err = yaml.Unmarshal(doc, inferenceModel); err != nil { - log.Fatalf("Can't unmarshal object: %v", doc) - } - if inferenceModel.Kind == "InferenceModel" { - t.Logf("Creating inference model: %+v", inferenceModel) - if err := k8sClient.Create(context.Background(), inferenceModel); err != nil { - log.Fatalf("unable to create inferenceModel %v: %v", inferenceModel.Name, err) - } - } - } - for _, doc := range docs { - inferencePool := &v1alpha1.InferencePool{} - if err = yaml.Unmarshal(doc, inferencePool); err != nil { - log.Fatalf("Can't unmarshal object: %v", doc) - } - if inferencePool.Kind == "InferencePool" { - t.Logf("Creating inference pool: %+v", inferencePool) - if err := k8sClient.Create(context.Background(), inferencePool); err != nil { - log.Fatalf("unable to create inferencePool %v: %v", inferencePool.Name, err) - } - } - } +func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { ps := make(backend.PodSet) pms := make(map[backend.Pod]*backend.PodMetrics) @@ -346,9 +416,6 @@ func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extPr pmc := &backend.FakePodMetricsClient{Res: pms} server := serverRunner.Start(backend.NewK8sDataStore(backend.WithPods(pods)), pmc) - if err != nil { - log.Fatalf("Ext-proc failed with the err: %v", err) - } // Wait the reconciler to populate the datastore. time.Sleep(10 * time.Second) @@ -408,6 +475,44 @@ func BeforeSuit() { go func() { serverRunner.StartManager() }() + + klog.Info("Setting up hermetic ExtProc server") + klog.InitFlags(nil) + flag.Parse() + // Configure klog verbosity levels to print ext proc logs. 
+ _ = flag.Lookup("v").Value.Set("3") + + // Unmarshal CRDs from file into structs + manifestsPath := filepath.Join("..", "testdata", "inferencepool-with-model-hermetic.yaml") + docs, err := readDocuments(manifestsPath) + if err != nil { + log.Fatalf("Can't read object manifests at path %v, %v", manifestsPath, err) + } + + for _, doc := range docs { + inferenceModel := &v1alpha1.InferenceModel{} + if err = yaml.Unmarshal(doc, inferenceModel); err != nil { + log.Fatalf("Can't unmarshal object: %v", doc) + } + if inferenceModel.Kind == "InferenceModel" { + klog.Infof("Creating inference model: %+v", inferenceModel) + if err := k8sClient.Create(context.Background(), inferenceModel); err != nil { + log.Fatalf("unable to create inferenceModel %v: %v", inferenceModel.Name, err) + } + } + } + for _, doc := range docs { + inferencePool := &v1alpha1.InferencePool{} + if err = yaml.Unmarshal(doc, inferencePool); err != nil { + log.Fatalf("Can't unmarshal object: %v", doc) + } + if inferencePool.Kind == "InferencePool" { + klog.Infof("Creating inference pool: %+v", inferencePool) + if err := k8sClient.Create(context.Background(), inferencePool); err != nil { + log.Fatalf("unable to create inferencePool %v: %v", inferencePool.Name, err) + } + } + } } func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { @@ -448,6 +553,3 @@ func readDocuments(fp string) ([][]byte, error) { } return docs, nil } -func pointer(v int32) *int32 { - return &v -} diff --git a/test/testdata/inferencepool-with-model-hermetic.yaml b/test/testdata/inferencepool-with-model-hermetic.yaml index a07e0f35..372a8512 100644 --- a/test/testdata/inferencepool-with-model-hermetic.yaml +++ b/test/testdata/inferencepool-with-model-hermetic.yaml @@ -23,3 +23,30 @@ spec: targetModels: - name: sql-lora-1fdg2 weight: 100 +--- +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: InferenceModel +metadata: + name: inferencemodel-sheddable + namespace: default +spec: + modelName: sql-lora-sheddable + poolRef: + name: vllm-llama2-7b-pool + targetModels: + - name: sql-lora-1fdg3 + weight: 100 +--- +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: InferenceModel +metadata: + name: inferencemodel-generic + namespace: default +spec: + modelName: my-model + criticality: Critical + poolRef: + name: vllm-llama2-7b-pool + targetModels: + - name: my-model-12345 + weight: 100