diff --git a/pkg/ext-proc/benchmark/benchmark.go b/pkg/ext-proc/benchmark/benchmark.go deleted file mode 100644 index da5b2188..00000000 --- a/pkg/ext-proc/benchmark/benchmark.go +++ /dev/null @@ -1,151 +0,0 @@ -package main - -import ( - "encoding/json" - "flag" - "fmt" - "net" - "os" - "time" - - "github.com/bojand/ghz/printer" - "github.com/bojand/ghz/runner" - extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" - "github.com/jhump/protoreflect/desc" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" - "google.golang.org/protobuf/proto" - klog "k8s.io/klog/v2" - - "ext-proc/backend" - "ext-proc/handlers" - "ext-proc/scheduling" -) - -var ( - svrAddr = flag.String("server_address", "localhost:9002", "Address of the ext proc server") - totalRequests = flag.Int("total_requests", 100000, "number of requests to be sent for load test") - targetPodHeader = flag.String("targetPodHeader", "target-pod", "the header key for the target pod address to instruct Envoy to send the request to. This must match Envoy configuration.") - - // Flags when running a local ext proc server. 
- numFakePods = flag.Int("num_fake_pods", 200, "number of fake pods when running a local ext proc server") - numModelsPerPod = flag.Int("num_models_per_pod", 5, "number of fake models per pod when running a local ext proc server") - localServer = flag.Bool("local_server", true, "whether to start a local ext proc server") - refreshPodsInterval = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods") - refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics") -) - -const ( - port = 9002 -) - -func main() { - klog.InitFlags(nil) - flag.Parse() - - if *localServer { - go startExtProc() - time.Sleep(time.Second) // wait until server is up - klog.Info("Server started") - } - - report, err := runner.Run( - "envoy.service.ext_proc.v3.ExternalProcessor.Process", - *svrAddr, - runner.WithInsecure(true), - runner.WithBinaryDataFunc(generateRequest), - runner.WithTotalRequests(uint(*totalRequests)), - ) - if err != nil { - klog.Fatal(err) - } - - printer := printer.ReportPrinter{ - Out: os.Stdout, - Report: report, - } - - printer.Print("summary") -} - -func generateRequest(mtd *desc.MethodDescriptor, callData *runner.CallData) []byte { - numModels := *numFakePods * (*numModelsPerPod) - j := map[string]interface{}{ - "model": modelName(int(callData.RequestNumber) % numModels), - "prompt": "Write as if you were a critic: San Francisco", - "max_tokens": 100, - "temperature": 0, - } - - llmReq, err := json.Marshal(j) - if err != nil { - klog.Fatal(err) - } - req := &extProcPb.ProcessingRequest{ - Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{Body: llmReq}, - }, - } - data, err := proto.Marshal(req) - if err != nil { - klog.Fatal("marshaling error: ", err) - } - return data -} - -// startExtProc starts an extProc server with fake pods. 
-func startExtProc() { - pods, fm := fakePods() - pmc := &backend.FakePodMetricsClient{Res: fm} - - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) - if err != nil { - klog.Fatalf("failed to listen: %v", err) - } - - s := grpc.NewServer() - - pp := backend.NewProvider(pmc, &backend.FakePodLister{Pods: pods}) - if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil { - klog.Fatalf("failed to initialize: %v", err) - } - extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(pp, scheduling.NewScheduler(pp), *targetPodHeader)) - - klog.Infof("Starting gRPC server on port :%v", port) - reflection.Register(s) - s.Serve(lis) -} - -func fakePods() (backend.PodSet, map[backend.Pod]*backend.PodMetrics) { - pods := make(backend.PodSet) - metrics := make(map[backend.Pod]*backend.PodMetrics, *numFakePods) - for i := 0; i < *numFakePods; i++ { - address := fmt.Sprintf("address-%v", i) - pod := backend.Pod{ - Namespace: "default", - Name: fmt.Sprintf("pod-%v", i), - Address: address, - } - pods[pod] = true - metrics[pod] = fakeMetrics(i) - } - - return pods, metrics -} - -// fakeMetrics adds numModelsPerPod number of adapters to the pod metrics. 
-func fakeMetrics(podNumber int) *backend.PodMetrics {
-	metrics := &backend.PodMetrics{
-		Metrics: backend.Metrics{
-			CachedModels: make(map[string]int),
-		},
-	}
-	for i := 0; i < *numModelsPerPod; i++ {
-		metrics.CachedModels[modelName(podNumber*(*numModelsPerPod)+i)] = 0
-	}
-	return metrics
-}
-
-func modelName(i int) string {
-	return fmt.Sprintf("adapter-%v", i)
-}
diff --git a/pkg/ext-proc/test/benchmark/benchmark.go b/pkg/ext-proc/test/benchmark/benchmark.go
new file mode 100644
index 00000000..2673078a
--- /dev/null
+++ b/pkg/ext-proc/test/benchmark/benchmark.go
@@ -0,0 +1,110 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/bojand/ghz/printer"
+	"github.com/bojand/ghz/runner"
+	"github.com/jhump/protoreflect/desc"
+	"google.golang.org/protobuf/proto"
+	klog "k8s.io/klog/v2"
+
+	"ext-proc/backend"
+	"ext-proc/test"
+)
+
+var (
+	svrAddr       = flag.String("server_address", "localhost:9002", "Address of the ext proc server")
+	totalRequests = flag.Int("total_requests", 100000, "number of requests to be sent for load test")
+	// Flags when running a local ext proc server.
+	numFakePods            = flag.Int("num_fake_pods", 200, "number of fake pods when running a local ext proc server")
+	numModelsPerPod        = flag.Int("num_models_per_pod", 5, "number of fake models per pod when running a local ext proc server")
+	localServer            = flag.Bool("local_server", true, "whether to start a local ext proc server")
+	refreshPodsInterval    = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods")
+	refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics")
+)
+
+const (
+	port = 9002
+)
+
+// main optionally starts a local ext proc server backed by fake pods, then load
+// tests the server at -server_address with ghz and prints a summary report.
+func main() {
+	klog.InitFlags(nil)
+	flag.Parse()
+
+	// generateRequest takes the request number modulo numFakePods*numModelsPerPod, so a
+	// zero product would panic with an integer divide by zero on the first request.
+	if *numFakePods <= 0 || *numModelsPerPod <= 0 {
+		klog.Fatalf("num_fake_pods and num_models_per_pod must be positive, got %d and %d", *numFakePods, *numModelsPerPod)
+	}
+
+	if *localServer {
+		test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, fakePods())
+		time.Sleep(time.Second) // wait until server is up
+		klog.Info("Server started")
+	}
+
+	report, err := runner.Run(
+		"envoy.service.ext_proc.v3.ExternalProcessor.Process",
+		*svrAddr,
+		runner.WithInsecure(true),
+		runner.WithBinaryDataFunc(generateRequest),
+		runner.WithTotalRequests(uint(*totalRequests)),
+	)
+	if err != nil {
+		klog.Fatal(err)
+	}
+
+	// Named rp rather than printer so the local does not shadow the printer package.
+	rp := printer.ReportPrinter{
+		Out:    os.Stdout,
+		Report: report,
+	}
+	if err := rp.Print("summary"); err != nil {
+		klog.Fatalf("failed to print report: %v", err)
+	}
+}
+
+// generateRequest builds the serialized ProcessingRequest for one ghz call. Requests
+// cycle through every fake model name based on the call's request number.
+func generateRequest(mtd *desc.MethodDescriptor, callData *runner.CallData) []byte {
+	numModels := *numFakePods * (*numModelsPerPod)
+	req := test.GenerateRequest(modelName(int(callData.RequestNumber) % numModels))
+	data, err := proto.Marshal(req)
+	if err != nil {
+		klog.Fatal("marshaling error: ", err)
+	}
+	return data
+}
+
+// fakePods returns numFakePods fake pods, each paired with the metrics from fakeMetrics.
+func fakePods() []*backend.PodMetrics {
+	pms := make([]*backend.PodMetrics, 0, *numFakePods)
+	for i := 0; i < *numFakePods; i++ {
+		metrics := fakeMetrics(i)
+		pod := test.FakePod(i)
+		pms = append(pms, &backend.PodMetrics{Pod: pod, Metrics: metrics})
+	}
+
+	return pms
+}
+
+// fakeMetrics adds numModelsPerPod number of adapters to the pod metrics.
+func fakeMetrics(podNumber int) backend.Metrics {
+	// Pod k owns the contiguous adapter IDs [k*numModelsPerPod, (k+1)*numModelsPerPod).
+	metrics := backend.Metrics{ActiveModels: make(map[string]int)}
+	for i := 0; i < *numModelsPerPod; i++ {
+		metrics.ActiveModels[modelName(podNumber*(*numModelsPerPod)+i)] = 0
+	}
+	return metrics
+}
+
+// modelName returns the fake adapter name for model index i, e.g. "adapter-3".
+func modelName(i int) string {
+	return fmt.Sprintf("adapter-%v", i)
+}
diff --git a/pkg/ext-proc/test/hermetic_test.go b/pkg/ext-proc/test/hermetic_test.go
new file mode 100644
index 00000000..51c4a219
--- /dev/null
+++ b/pkg/ext-proc/test/hermetic_test.go
@@ -0,0 +1,171 @@
+// Package test contains e2e tests for the ext proc while faking the backend pods.
+package test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/protobuf/testing/protocmp"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+
+	"ext-proc/backend"
+)
+
+const (
+	port = 9002
+)
+
+// TestHandleRequestBody sends a request body to an ext proc server backed by fake
+// pods and checks the header mutation carried by the response.
+func TestHandleRequestBody(t *testing.T) {
+	tests := []struct {
+		name        string
+		req         *extProcPb.ProcessingRequest
+		pods        []*backend.PodMetrics
+		wantHeaders []*configPb.HeaderValueOption
+		wantBody    []byte
+		wantErr     bool
+	}{
+		{
+			name: "success",
+			req:  GenerateRequest("my-model"),
+			// pod-1 will be picked because it has relatively low queue size, with the requested
+			// model being active, and has low KV cache.
+			pods: []*backend.PodMetrics{
+				{
+					Pod: FakePod(0),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    0,
+						KVCacheUsagePercent: 0.2,
+						ActiveModels: map[string]int{
+							"foo": 1,
+							"bar": 1,
+						},
+					},
+				},
+				{
+					Pod: FakePod(1),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    3,
+						KVCacheUsagePercent: 0.1,
+						ActiveModels: map[string]int{
+							"foo":      1,
+							"my-model": 1,
+						},
+					},
+				},
+				{
+					Pod: FakePod(2),
+					Metrics: backend.Metrics{
+						WaitingQueueSize:    10,
+						KVCacheUsagePercent: 0.2,
+						ActiveModels: map[string]int{
+							"foo": 1,
+						},
+					},
+				},
+			},
+			wantHeaders: []*configPb.HeaderValueOption{
+				{
+					Header: &configPb.HeaderValue{
+						Key:      "target-pod",
+						RawValue: []byte("address-1"),
+					},
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			client, cleanup := setUpServer(t, test.pods)
+			t.Cleanup(cleanup)
+			want := &extProcPb.ProcessingResponse{
+				Response: &extProcPb.ProcessingResponse_RequestBody{
+					RequestBody: &extProcPb.BodyResponse{
+						Response: &extProcPb.CommonResponse{
+							HeaderMutation: &extProcPb.HeaderMutation{
+								SetHeaders: test.wantHeaders,
+							},
+							// TODO: Also check body once it's added.
+							// BodyMutation: &extProcPb.BodyMutation{
+							// 	Mutation: &extProcPb.BodyMutation_Body{
+							// 		Body: test.wantBody,
+							// 	},
+							// },
+						},
+					},
+				},
+			}
+			res, err := sendRequest(t, client, test.req)
+
+			if (err != nil) != test.wantErr {
+				t.Fatalf("Unexpected error, got %v, want %v", err, test.wantErr)
+			}
+			// An expected error carries no response, so there is nothing left to compare.
+			if err != nil {
+				return
+			}
+
+			if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" {
+				t.Errorf("Unexpected response, (-want +got): %v", diff)
+			}
+		})
+	}
+}
+
+// setUpServer starts an ext proc server on port with the given fake pods and opens a
+// Process stream to it. The returned cleanup cancels the stream context, closes the
+// connection and gracefully stops the server.
+func setUpServer(t *testing.T, pods []*backend.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
+	t.Helper()
+	server := StartExtProc(port, time.Second, time.Second, pods)
+
+	address := fmt.Sprintf("localhost:%v", port)
+	// Create a grpc connection
+	conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		// Release the port before failing so later tests can still bind it.
+		server.Stop()
+		t.Fatalf("Failed to connect to %v: %v", address, err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	cleanup = func() {
+		cancel()
+		conn.Close()
+		server.GracefulStop()
+	}
+	client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx)
+	if err != nil {
+		cleanup()
+		t.Fatalf("Failed to create client: %v", err)
+	}
+	return client, cleanup
+}
+
+// sendRequest sends req on the stream and returns the first response received,
+// writing the request, the response and any failure to the test log.
+func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
+	t.Helper()
+	t.Logf("Sending request: %v", req)
+	if err := client.Send(req); err != nil {
+		t.Logf("Failed to send request %+v: %v", req, err)
+		return nil, err
+	}
+
+	res, err := client.Recv()
+	if err != nil {
+		t.Logf("Failed to receive: %v", err)
+		return nil, err
+	}
+	t.Logf("Received response %+v", res)
+	return res, nil
+}
diff --git a/pkg/ext-proc/test/utils.go b/pkg/ext-proc/test/utils.go
new file mode 100644
index 00000000..1fafbcb3
--- /dev/null
+++ b/pkg/ext-proc/test/utils.go
@@ -0,0 +1,87 @@
+package test
+
+import (
+	"encoding/json"
+	"fmt"
+	"net"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/reflection"
+	klog "k8s.io/klog/v2"
+
+	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
+
+	"ext-proc/backend"
+	"ext-proc/handlers"
+	"ext-proc/scheduling"
+)
+
+// StartExtProc starts an ext proc gRPC server on port that serves the given fake pods
+// and their metrics. It returns the server so the caller can stop it.
+func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Duration, pods []*backend.PodMetrics) *grpc.Server {
+	ps := make(backend.PodSet, len(pods))
+	pms := make(map[backend.Pod]*backend.PodMetrics, len(pods))
+	for _, pod := range pods {
+		ps[pod.Pod] = true
+		pms[pod.Pod] = pod
+	}
+	pmc := &backend.FakePodMetricsClient{Res: pms}
+	pp := backend.NewProvider(pmc, &backend.FakePodLister{Pods: ps})
+	if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
+		klog.Fatalf("failed to initialize: %v", err)
+	}
+	return startExtProc(port, pp)
+}
+
+// startExtProc listens on port and serves an ext proc gRPC server backed by pp in a background goroutine.
+func startExtProc(port int, pp *backend.Provider) *grpc.Server {
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+	if err != nil {
+		klog.Fatalf("failed to listen: %v", err)
+	}
+
+	s := grpc.NewServer()
+
+	extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(pp, scheduling.NewScheduler(pp), "target-pod"))
+
+	klog.Infof("Starting gRPC server on port :%v", port)
+	reflection.Register(s)
+	go func() {
+		// Serve returns nil after Stop or GracefulStop, so only real failures are logged.
+		if err := s.Serve(lis); err != nil {
+			klog.Errorf("gRPC server on port :%v stopped: %v", port, err)
+		}
+	}()
+	return s
+}
+
+// GenerateRequest returns a ProcessingRequest whose request body is a JSON-encoded
+// completion request for model.
+func GenerateRequest(model string) *extProcPb.ProcessingRequest {
+	j := map[string]any{
+		"model":       model,
+		"prompt":      "hello",
+		"max_tokens":  100,
+		"temperature": 0,
+	}
+
+	llmReq, err := json.Marshal(j)
+	if err != nil {
+		klog.Fatal(err)
+	}
+	return &extProcPb.ProcessingRequest{
+		Request: &extProcPb.ProcessingRequest_RequestBody{
+			RequestBody: &extProcPb.HttpBody{Body: llmReq},
+		},
+	}
+}
+
+// FakePod returns a pod in the "default" namespace named "pod-<index>" with address "address-<index>".
+func FakePod(index int) backend.Pod {
+	return backend.Pod{
+		Namespace: "default",
+		Name:      fmt.Sprintf("pod-%v", index),
+		Address:   fmt.Sprintf("address-%v", index),
+	}
+}