Skip to content

Commit 35efa9a

Browse files
committed
add initial integration test for body-based routing extension
1 parent b40de04 commit 35efa9a

File tree

2 files changed

+180
-12
lines changed

2 files changed

+180
-12
lines changed

pkg/body-based-routing/server/runserver.go

+16-12
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ import (
3232

3333
// ExtProcServerRunner provides methods to manage an external process server.
3434
type ExtProcServerRunner struct {
35-
GrpcPort int
35+
GrpcPort int
36+
SecureServing bool
3637
}
3738

3839
// Default values for CLI flags in main
@@ -42,26 +43,29 @@ const (
4243

4344
func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
4445
return &ExtProcServerRunner{
45-
GrpcPort: DefaultGrpcPort,
46+
GrpcPort: DefaultGrpcPort,
47+
SecureServing: true,
4648
}
4749
}
4850

4951
// AsRunnable returns a Runnable that can be used to start the ext-proc gRPC server.
5052
// The runnable implements LeaderElectionRunnable with leader election disabled.
5153
func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
5254
return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error {
53-
cert, err := tlsutil.CreateSelfSignedTLSCertificate(logger)
54-
if err != nil {
55-
logger.Error(err, "Failed to create self signed certificate")
56-
return err
55+
var srv *grpc.Server
56+
if r.SecureServing {
57+
cert, err := tlsutil.CreateSelfSignedTLSCertificate(logger)
58+
if err != nil {
59+
logger.Error(err, "Failed to create self signed certificate")
60+
return err
61+
}
62+
creds := credentials.NewTLS(&tls.Config{Certificates: []tls.Certificate{cert}})
63+
srv = grpc.NewServer(grpc.Creds(creds))
64+
} else {
65+
srv = grpc.NewServer()
5766
}
58-
creds := credentials.NewTLS(&tls.Config{Certificates: []tls.Certificate{cert}})
5967

60-
srv := grpc.NewServer(grpc.Creds(creds))
61-
extProcPb.RegisterExternalProcessorServer(
62-
srv,
63-
handlers.NewServer(),
64-
)
68+
extProcPb.RegisterExternalProcessorServer(srv, handlers.NewServer())
6569

6670
// Forward to the gRPC runnable.
6771
return runnable.GRPCServer("ext-proc", srv, r.GrpcPort).Start(ctx)

test/integration/bbr/hermetic_test.go

+164
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package bbr contains integration tests for the body-based routing extension.
18+
package bbr
19+
20+
import (
21+
"context"
22+
"encoding/json"
23+
"fmt"
24+
"testing"
25+
"time"
26+
27+
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
28+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
29+
"github.com/go-logr/logr"
30+
"github.com/google/go-cmp/cmp"
31+
"google.golang.org/grpc"
32+
"google.golang.org/grpc/credentials/insecure"
33+
"google.golang.org/protobuf/testing/protocmp"
34+
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/body-based-routing/server"
35+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
36+
)
37+
38+
const port = runserver.DefaultGrpcPort
39+
40+
var logger = logutil.NewTestLogger().V(logutil.VERBOSE)
41+
42+
func TestBodyBasedRouting(t *testing.T) {
43+
tests := []struct {
44+
name string
45+
req *extProcPb.ProcessingRequest
46+
wantHeaders []*configPb.HeaderValueOption
47+
wantBody []byte
48+
wantErr bool
49+
}{
50+
{
51+
name: "success adding model parameter to header",
52+
req: generateRequest(logger, "llama"),
53+
wantHeaders: []*configPb.HeaderValueOption{
54+
{
55+
Header: &configPb.HeaderValue{
56+
Key: "X-Gateway-Model-Name",
57+
RawValue: []byte("llama"),
58+
},
59+
},
60+
},
61+
wantBody: []byte("{\"max_tokens\":100,\"model\":\"llama\",\"prompt\":\"test1\",\"temperature\":0}"),
62+
wantErr: false,
63+
},
64+
}
65+
66+
for _, test := range tests {
67+
t.Run(test.name, func(t *testing.T) {
68+
client, cleanup := setUpHermeticServer()
69+
t.Cleanup(cleanup)
70+
71+
want := &extProcPb.ProcessingResponse{
72+
Response: &extProcPb.ProcessingResponse_RequestBody{
73+
RequestBody: &extProcPb.BodyResponse{
74+
Response: &extProcPb.CommonResponse{
75+
HeaderMutation: &extProcPb.HeaderMutation{
76+
SetHeaders: test.wantHeaders,
77+
},
78+
ClearRouteCache: true,
79+
},
80+
},
81+
},
82+
}
83+
84+
res, err := sendRequest(t, client, test.req)
85+
if err != nil && !test.wantErr {
86+
t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr)
87+
}
88+
if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" {
89+
t.Errorf("Unexpected response, (-want +got): %v", diff)
90+
}
91+
})
92+
}
93+
}
94+
95+
func setUpHermeticServer() (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
96+
serverCtx, stopServer := context.WithCancel(context.Background())
97+
serverRunner := runserver.NewDefaultExtProcServerRunner()
98+
serverRunner.SecureServing = false
99+
100+
go func() {
101+
if err := serverRunner.AsRunnable(logger.WithName("ext-proc")).Start(serverCtx); err != nil {
102+
logutil.Fatal(logger, err, "Failed to start ext-proc server")
103+
}
104+
}()
105+
106+
address := fmt.Sprintf("localhost:%v", port)
107+
// Create a grpc connection
108+
conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
109+
if err != nil {
110+
logutil.Fatal(logger, err, "Failed to connect", "address", address)
111+
}
112+
113+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
114+
client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx)
115+
if err != nil {
116+
logutil.Fatal(logger, err, "Failed to create client")
117+
}
118+
return client, func() {
119+
cancel()
120+
conn.Close()
121+
stopServer()
122+
123+
// wait a little until the goroutines actually exit
124+
time.Sleep(5 * time.Second)
125+
}
126+
}
127+
128+
func generateRequest(logger logr.Logger, model string) *extProcPb.ProcessingRequest {
129+
j := map[string]interface{}{
130+
"prompt": "test1",
131+
"max_tokens": 100,
132+
"temperature": 0,
133+
}
134+
if model != "" {
135+
j["model"] = model
136+
}
137+
138+
llmReq, err := json.Marshal(j)
139+
if err != nil {
140+
logutil.Fatal(logger, err, "Failed to unmarshal LLM request")
141+
}
142+
req := &extProcPb.ProcessingRequest{
143+
Request: &extProcPb.ProcessingRequest_RequestBody{
144+
RequestBody: &extProcPb.HttpBody{Body: llmReq},
145+
},
146+
}
147+
return req
148+
}
149+
150+
func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
151+
t.Logf("Sending request: %v", req)
152+
if err := client.Send(req); err != nil {
153+
t.Logf("Failed to send request %+v: %v", req, err)
154+
return nil, err
155+
}
156+
157+
res, err := client.Recv()
158+
if err != nil {
159+
t.Logf("Failed to receive: %v", err)
160+
return nil, err
161+
}
162+
t.Logf("Received request %+v", res)
163+
return res, err
164+
}

0 commit comments

Comments
 (0)