From 95359e7f525864aedd70bce3b206084f934eaef2 Mon Sep 17 00:00:00 2001 From: ahg-g Date: Sat, 15 Mar 2025 01:40:13 +0000 Subject: [PATCH] Refactor beforeSuite in integration tests --- pkg/epp/server/controller_manager.go | 14 +-- pkg/epp/util/testing/wrappers.go | 11 ++ test/integration/epp/hermetic_test.go | 35 +++++- test/integration/epp/test_suite.go | 162 ++++++++------------------ 4 files changed, 98 insertions(+), 124 deletions(-) diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index 46694f7b..05b11a2b 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -28,7 +28,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" ) @@ -39,9 +38,9 @@ func init() { utilruntime.Must(v1alpha2.AddToScheme(scheme)) } -// NewDefaultManager creates a new controller manager with default configuration. -func NewDefaultManager(namespace, name string, restConfig *rest.Config) (ctrl.Manager, error) { - defaultOpts := ctrl.Options{ +// DefaultManagerOptions returns the default options used to create the manager. +func DefaultManagerOptions(namespace, name string) ctrl.Options { + return ctrl.Options{ Scheme: scheme, Cache: cache.Options{ ByObject: map[client.Object]cache.ByObject{ @@ -67,12 +66,11 @@ func NewDefaultManager(namespace, name string, restConfig *rest.Config) (ctrl.Ma }, }, } - return NewManagerWithOptions(restConfig, defaultOpts) } -// NewManagerWithOptions creates a new controller manager with injectable options. -func NewManagerWithOptions(restConfig *rest.Config, opts manager.Options) (ctrl.Manager, error) { - manager, err := ctrl.NewManager(restConfig, opts) +// NewDefaultManager creates a new controller manager with default configuration. +func NewDefaultManager(namespace, name string, restConfig *rest.Config) (ctrl.Manager, error) { + manager, err := ctrl.NewManager(restConfig, DefaultManagerOptions(namespace, name)) if err != nil { return nil, fmt.Errorf("failed to create controller manager: %v", err) } diff --git a/pkg/epp/util/testing/wrappers.go b/pkg/epp/util/testing/wrappers.go index ed57d01f..130f017e 100644 --- a/pkg/epp/util/testing/wrappers.go +++ b/pkg/epp/util/testing/wrappers.go @@ -71,6 +71,17 @@ func (p *PodWrapper) Labels(labels map[string]string) *PodWrapper { return p } +// Labels sets the pod labels. +func (p *PodWrapper) LabelsFromPoolSelector(selector map[v1alpha2.LabelKey]v1alpha2.LabelValue) *PodWrapper { + if p.ObjectMeta.Labels == nil { + p.ObjectMeta.Labels = map[string]string{} + } + for k, v := range selector { + p.ObjectMeta.Labels[string(k)] = string(v) + } + return p +} + // SetReadyCondition sets a PodReay=true condition. func (p *PodWrapper) ReadyCondition() *PodWrapper { p.Status.Conditions = []corev1.PodCondition{{ diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index d02c9c13..5a3109e1 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -31,11 +31,42 @@ import ( "google.golang.org/protobuf/types/known/structpb" "k8s.io/component-base/metrics/legacyregistry" metricsutils "k8s.io/component-base/metrics/testutil" + "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" utiltesting "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" ) +var models = []*v1alpha2.InferenceModel{ + utiltesting.MakeInferenceModel("sample"). + Namespace(pool.Namespace). + ModelName("sql-lora"). + Criticality(v1alpha2.Critical). + PoolName(pool.Name). + TargetModel("sql-lora-1fdg2"). + ObjRef(), + utiltesting.MakeInferenceModel("sheddable"). + Namespace(pool.Namespace). + ModelName("sql-lora-sheddable"). + Criticality(v1alpha2.Sheddable). + PoolName(pool.Name). + TargetModel("sql-lora-1fdg3"). + ObjRef(), + utiltesting.MakeInferenceModel("generic"). + Namespace(pool.Namespace). + ModelName("my-model"). + Criticality(v1alpha2.Critical). + PoolName(pool.Name). + TargetModel("my-model-12345"). + ObjRef(), + utiltesting.MakeInferenceModel("direct-model"). + Namespace(pool.Namespace). + ModelName("direct-model"). + Criticality(v1alpha2.Critical). + PoolName(pool.Name). + ObjRef(), +} + func TestMain(m *testing.M) { cleanup := BeforeSuite() code := m.Run() @@ -304,7 +335,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client, cleanup := setUpHermeticServer(t, test.pods, false) + client, cleanup := startEPPServer(t, &eppOptions{podMetrics: test.pods, models: models}) t.Cleanup(cleanup) want := &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_RequestBody{ @@ -1336,7 +1367,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client, cleanup := setUpHermeticServer(t, test.pods, true) + client, cleanup := startEPPServer(t, &eppOptions{podMetrics: test.pods, models: models, streamed: true}) t.Cleanup(cleanup) responses, err := streamedRequest(t, client, test.requests, len(test.wantResponses)) diff --git a/test/integration/epp/test_suite.go b/test/integration/epp/test_suite.go index b63a6775..c02fca52 100644 --- a/test/integration/epp/test_suite.go +++ b/test/integration/epp/test_suite.go @@ -34,8 +34,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/structpb" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -43,8 +41,6 @@ import ( "k8s.io/component-base/metrics/legacyregistry" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/envtest" @@ -54,45 +50,49 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" - runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" utiltesting "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" ) const ( - port = runserver.DefaultGrpcPort + port = server.DefaultGrpcPort metricsPort = 8888 ) var ( - serverRunner *runserver.ExtProcServerRunner + serverRunner *server.ExtProcServerRunner k8sClient k8sclient.Client testEnv *envtest.Environment scheme = runtime.NewScheme() logger = logutil.NewTestLogger().V(logutil.VERBOSE) + pool = utiltesting.MakeInferencePool("vllm-llama2-7b-pool"). + Namespace("default"). + TargetPortNumber(8000). + Selector(map[string]string{"app": "vllm-llama2-7b-pool"}). + ExtensionRef("epp"). + ObjRef() ) -func setUpHermeticServer(t *testing.T, podAndMetrics map[backendmetrics.Pod]*backendmetrics.Metrics, streamed bool) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { +type eppOptions struct { + podMetrics map[backendmetrics.Pod]*backendmetrics.Metrics + models []*v1alpha2.InferenceModel + streamed bool +} + +func startEPPServer(t *testing.T, opts *eppOptions) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { // Reconfigure the TestPodMetricsClient. res := map[types.NamespacedName]*backendmetrics.Metrics{} - for pod, metrics := range podAndMetrics { + for pod, metrics := range opts.podMetrics { res[pod.NamespacedName] = metrics } serverRunner.TestPodMetricsClient.SetRes(res) - serverRunner.UseStreaming = streamed - - serverCtx, stopServer := context.WithCancel(context.Background()) + serverRunner.UseStreaming = opts.streamed - // TODO: this should be consistent with the inference pool - podLabels := map[string]string{ - "app": "vllm-llama2-7b-pool", - } - - for pod := range podAndMetrics { + for pod := range opts.podMetrics { pod := utiltesting.MakePod(pod.NamespacedName.Name). Namespace(pod.NamespacedName.Namespace). ReadyCondition(). - Labels(podLabels). + LabelsFromPoolSelector(pool.Spec.Selector). IP(pod.Address). Complete(). ObjRef() @@ -108,6 +108,16 @@ func setUpHermeticServer(t *testing.T, podAndMetrics map[backendmetrics.Pod]*bac logutil.Fatal(logger, err, "Failed to update pod status", "pod", pod) } } + + for i := range opts.models { + m := opts.models[i].DeepCopy() + logger.Info("Creating inference model", "model", m.Name) + if err := k8sClient.Create(context.Background(), m); err != nil { + logutil.Fatal(logger, err, "Unable to create inferenceModel", "modelName", m.Name) + } + } + + serverCtx, stopServer := context.WithCancel(context.Background()) go func() { if err := serverRunner.AsRunnable(logger.WithName("ext-proc")).Start(serverCtx); err != nil { logutil.Fatal(logger, err, "Failed to start ext-proc server") @@ -116,7 +126,7 @@ func setUpHermeticServer(t *testing.T, podAndMetrics map[backendmetrics.Pod]*bac // check if all pods are synced to datastore assert.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Len(t, serverRunner.Datastore.PodGetAll(), len(podAndMetrics), "Datastore not synced") + assert.Len(t, serverRunner.Datastore.PodGetAll(), len(opts.podMetrics), "Datastore not synced") }, 10*time.Second, time.Second) address := fmt.Sprintf("localhost:%v", port) @@ -137,7 +147,7 @@ func setUpHermeticServer(t *testing.T, podAndMetrics map[backendmetrics.Pod]*bac stopServer() // clear created pods - for pod := range podAndMetrics { + for pod := range opts.podMetrics { pod := utiltesting.MakePod(pod.NamespacedName.Name). Namespace(pod.NamespacedName.Namespace).Complete().ObjRef() @@ -145,6 +155,11 @@ func setUpHermeticServer(t *testing.T, podAndMetrics map[backendmetrics.Pod]*bac logutil.Fatal(logger, err, "Failed to delete pod", "pod", fakePod) } } + for _, m := range opts.models { + if err := k8sClient.Delete(context.Background(), m); err != nil { + logutil.Fatal(logger, err, "Failed to delete model", "model", m.Name) + } + } // wait a little until the goroutines actually exit time.Sleep(5 * time.Second) } @@ -175,14 +190,15 @@ func BeforeSuite() func() { k8sClient, err = k8sclient.New(cfg, k8sclient.Options{Scheme: scheme}) if err != nil { logutil.Fatal(logger, err, "Failed to start k8s Client") - } else if k8sClient == nil { - logutil.Fatal(logger, nil, "No error, but returned kubernetes client is nil", "config", cfg) } // Init runtime. ctrl.SetLogger(logger) - - mgr, err := server.NewManagerWithOptions(cfg, managerTestOptions("default", "vllm-llama2-7b-pool")) + // inject options that allow multiple test runs to run + // https://github.com/kubernetes-sigs/controller-runtime/issues/2937 + opts := server.DefaultManagerOptions(pool.Namespace, pool.Name) + opts.Controller = config.Controller{SkipNameValidation: ptr.To(true)} + mgr, err := ctrl.NewManager(cfg, opts) if err != nil { logutil.Fatal(logger, err, "Failed to create controller manager") } @@ -191,80 +207,32 @@ func BeforeSuite() func() { logutil.Fatal(logger, err, "Failed to register metrics handler") } - serverRunner = runserver.NewDefaultExtProcServerRunner() + serverRunner = server.NewDefaultExtProcServerRunner() serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond) // Adjust from defaults - serverRunner.PoolName = "vllm-llama2-7b-pool" + serverRunner.PoolName = pool.Name serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf) serverRunner.SecureServing = false - if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil { + ctx := ctrl.SetupSignalHandler() + if err := serverRunner.SetupWithManager(ctx, mgr); err != nil { logutil.Fatal(logger, err, "Failed to setup server runner") } // Start the controller manager in a go routine, not blocking go func() { - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + if err := mgr.Start(ctx); err != nil { logutil.Fatal(logger, err, "Failed to start manager") } }() logger.Info("Setting up hermetic ExtProc server") - ns := "default" - pool := utiltesting.MakeInferencePool("vllm-llama2-7b-pool"). - Namespace(ns). - TargetPortNumber(8000). - Selector(map[string]string{"app": "vllm-llama2-7b-pool"}). - ExtensionRef("epp"). - ObjRef() if err := k8sClient.Create(context.Background(), pool); err != nil { logutil.Fatal(logger, err, "Unable to create inferencePool", "pool", pool.Name) } - models := []*v1alpha2.InferenceModel{ - utiltesting.MakeInferenceModel("sample"). - Namespace(ns). - ModelName("sql-lora"). - Criticality(v1alpha2.Critical). - PoolName(pool.Name). - TargetModel("sql-lora-1fdg2"). - ObjRef(), - utiltesting.MakeInferenceModel("sheddable"). - Namespace(ns). - ModelName("sql-lora-sheddable"). - Criticality(v1alpha2.Sheddable). - PoolName(pool.Name). - TargetModel("sql-lora-1fdg3"). - ObjRef(), - utiltesting.MakeInferenceModel("generic"). - Namespace(ns). - ModelName("my-model"). - Criticality(v1alpha2.Critical). - PoolName(pool.Name). - TargetModel("my-model-12345"). - ObjRef(), - utiltesting.MakeInferenceModel("direct-model"). - Namespace(ns). - ModelName("direct-model"). - Criticality(v1alpha2.Critical). - PoolName(pool.Name). - ObjRef(), - } - for i := range models { - logger.Info("Creating inference model", "model", models[i]) - if err := k8sClient.Create(context.Background(), models[i]); err != nil { - logutil.Fatal(logger, err, "Unable to create inferenceModel", "modelName", models[i].Name) - } - } - - assert.Eventually(nil, func() bool { - modelExist := serverRunner.Datastore.ModelGet("my-model") - synced := serverRunner.Datastore.PoolHasSynced() && modelExist != nil - return synced - }, 10*time.Second, 10*time.Millisecond) - return func() { _ = testEnv.Stop() _ = k8sClient.DeleteAllOf(context.Background(), &v1alpha2.InferencePool{}) @@ -329,11 +297,11 @@ func streamedRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessCli func makeMetadata(endpoint string) *structpb.Struct { return &structpb.Struct{ Fields: map[string]*structpb.Value{ - runserver.DefaultDestinationEndpointHintMetadataNamespace: { + server.DefaultDestinationEndpointHintMetadataNamespace: { Kind: &structpb.Value_StructValue{ StructValue: &structpb.Struct{ Fields: map[string]*structpb.Value{ - runserver.DefaultDestinationEndpointHintKey: { + server.DefaultDestinationEndpointHintKey: { Kind: &structpb.Value_StringValue{ StringValue: endpoint, }, @@ -373,37 +341,3 @@ func registerMetricsHandler(mgr manager.Manager, port int) error { } return nil } - -// inject options that allow multiple test runs to run -// https://github.com/kubernetes-sigs/controller-runtime/issues/2937 -func managerTestOptions(namespace, name string) ctrl.Options { - return ctrl.Options{ - Scheme: scheme, - Cache: cache.Options{ - ByObject: map[client.Object]cache.ByObject{ - &corev1.Pod{}: { - Namespaces: map[string]cache.Config{ - namespace: {}, - }, - }, - &v1alpha2.InferencePool{}: { - Namespaces: map[string]cache.Config{ - namespace: { - FieldSelector: fields.SelectorFromSet(fields.Set{ - "metadata.name": name, - }), - }, - }, - }, - &v1alpha2.InferenceModel{}: { - Namespaces: map[string]cache.Config{ - namespace: {}, - }, - }, - }, - }, - Controller: config.Controller{ - SkipNameValidation: ptr.To(true), - }, - } -}