Skip to content

Commit f09c5a9

Browse files
committed
Add some more unit tests for BBR
1 parent bab4331 commit f09c5a9

File tree

2 files changed

+157
-21
lines changed

2 files changed

+157
-21
lines changed

Diff for: pkg/body-based-routing/handlers/server.go

+11-21
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
4646
loggerVerbose := logger.V(logutil.VERBOSE)
4747
loggerVerbose.Info("Processing")
4848

49-
reader, writer := io.Pipe()
49+
streamedBody := &streamedBody{}
5050

5151
for {
5252
select {
@@ -78,7 +78,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
7878
}
7979
case *extProcPb.ProcessingRequest_RequestBody:
8080
loggerVerbose.Info("Incoming body chunk", "body", string(v.RequestBody.Body), "EoS", v.RequestBody.EndOfStream)
81-
responses, err = s.processRequestBody(ctx, req.GetRequestBody(), writer, reader, logger)
81+
responses, err = s.processRequestBody(ctx, req.GetRequestBody(), streamedBody, logger)
8282
case *extProcPb.ProcessingRequest_RequestTrailers:
8383
responses, err = s.HandleRequestTrailers(req.GetRequestTrailers())
8484
case *extProcPb.ProcessingRequest_ResponseHeaders:
@@ -105,35 +105,25 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
105105
}
106106
}
107107

108-
func (s *Server) processRequestBody(ctx context.Context, body *extProcPb.HttpBody, bufferWriter *io.PipeWriter, bufferReader *io.PipeReader, logger logr.Logger) ([]*extProcPb.ProcessingResponse, error) {
108+
type streamedBody struct {
109+
body []byte
110+
}
111+
112+
func (s *Server) processRequestBody(ctx context.Context, body *extProcPb.HttpBody, streamedBody *streamedBody, logger logr.Logger) ([]*extProcPb.ProcessingResponse, error) {
109113
loggerVerbose := logger.V(logutil.VERBOSE)
110114

111115
var requestBody map[string]interface{}
112116
if s.streaming {
113117
// In the stream case, we can receive multiple request bodies.
114-
// To buffer the full message, we create a goroutine with a writer.Write()
115-
// call, which will block until the corresponding reader reads from it.
116-
// We do not read until we receive the EndofStream signal, and then
117-
// decode the entire JSON body.
118118
if !body.EndOfStream {
119-
go func() {
120-
loggerVerbose.Info("Writing to stream buffer")
121-
_, err := bufferWriter.Write(body.Body)
122-
if err != nil {
123-
logger.V(logutil.DEFAULT).Error(err, "Error populating writer")
124-
}
125-
}()
126-
119+
streamedBody.body = append(streamedBody.body, body.Body...)
127120
return nil, nil
128-
}
129-
130-
if body.EndOfStream {
121+
} else {
131122
loggerVerbose.Info("Flushing stream buffer")
132-
decoder := json.NewDecoder(bufferReader)
133-
if err := decoder.Decode(&requestBody); err != nil {
123+
err := json.Unmarshal(streamedBody.body, &requestBody)
124+
if err != nil {
134125
logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body")
135126
}
136-
bufferReader.Close()
137127
}
138128
} else {
139129
if err := json.Unmarshal(body.GetBody(), &requestBody); err != nil {

Diff for: pkg/body-based-routing/handlers/server_test.go

+146
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package handlers
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
24+
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
25+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
26+
"github.com/google/go-cmp/cmp"
27+
"google.golang.org/protobuf/testing/protocmp"
28+
"sigs.k8s.io/controller-runtime/pkg/log"
29+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
30+
)
31+
32+
func TestProcessRequestBody(t *testing.T) {
33+
ctx := logutil.NewTestLoggerIntoContext(context.Background())
34+
35+
cases := []struct {
36+
desc string
37+
streaming bool
38+
bodys []*extProcPb.HttpBody
39+
want []*extProcPb.ProcessingResponse
40+
}{
41+
{
42+
desc: "no-streaming",
43+
bodys: []*extProcPb.HttpBody{
44+
{
45+
Body: mapToBytes(t, map[string]any{
46+
"model": "foo",
47+
}),
48+
},
49+
},
50+
want: []*extProcPb.ProcessingResponse{
51+
{
52+
Response: &extProcPb.ProcessingResponse_RequestBody{
53+
RequestBody: &extProcPb.BodyResponse{
54+
Response: &extProcPb.CommonResponse{
55+
// Necessary so that the new headers are used in the routing decision.
56+
ClearRouteCache: true,
57+
HeaderMutation: &extProcPb.HeaderMutation{
58+
SetHeaders: []*basepb.HeaderValueOption{
59+
{
60+
Header: &basepb.HeaderValue{
61+
Key: modelHeader,
62+
RawValue: []byte("foo"),
63+
},
64+
},
65+
},
66+
},
67+
},
68+
},
69+
},
70+
},
71+
},
72+
},
73+
{
74+
desc: "streaming",
75+
streaming: true,
76+
bodys: []*extProcPb.HttpBody{
77+
{
78+
Body: mapToBytes(t, map[string]any{
79+
"model": "foo",
80+
}),
81+
},
82+
{
83+
EndOfStream: true,
84+
},
85+
},
86+
want: []*extProcPb.ProcessingResponse{
87+
{
88+
Response: &eppb.ProcessingResponse_RequestHeaders{
89+
RequestHeaders: &eppb.HeadersResponse{
90+
Response: &eppb.CommonResponse{
91+
ClearRouteCache: true,
92+
HeaderMutation: &eppb.HeaderMutation{
93+
SetHeaders: []*basepb.HeaderValueOption{
94+
{
95+
Header: &basepb.HeaderValue{
96+
Key: modelHeader,
97+
RawValue: []byte("foo"),
98+
},
99+
},
100+
},
101+
},
102+
},
103+
},
104+
},
105+
},
106+
{
107+
Response: &extProcPb.ProcessingResponse_RequestBody{
108+
RequestBody: &extProcPb.BodyResponse{
109+
Response: &extProcPb.CommonResponse{
110+
BodyMutation: &extProcPb.BodyMutation{
111+
Mutation: &extProcPb.BodyMutation_StreamedResponse{
112+
StreamedResponse: &extProcPb.StreamedBodyResponse{
113+
Body: mapToBytes(t, map[string]any{
114+
"model": "foo",
115+
}),
116+
EndOfStream: true,
117+
},
118+
},
119+
},
120+
},
121+
},
122+
},
123+
},
124+
},
125+
},
126+
}
127+
128+
for _, tc := range cases {
129+
t.Run(tc.desc, func(t *testing.T) {
130+
srv := NewServer(tc.streaming)
131+
streamedBody := &streamedBody{}
132+
for i, body := range tc.bodys {
133+
got, err := srv.processRequestBody(context.Background(), body, streamedBody, log.FromContext(ctx))
134+
if err != nil {
135+
t.Fatalf("processRequestBody(): %v", err)
136+
}
137+
138+
if i == len(tc.bodys)-1 {
139+
if diff := cmp.Diff(tc.want, got, protocmp.Transform()); diff != "" {
140+
t.Errorf("processRequestBody returned unexpected response, diff(-want, +got): %v", diff)
141+
}
142+
}
143+
}
144+
})
145+
}
146+
}

0 commit comments

Comments
 (0)