Skip to content

Commit 622dfda

Browse files
committed
simplify body streaming
1 parent bab4331 commit 622dfda

File tree

1 file changed

+6
-21
lines changed

1 file changed

+6
-21
lines changed

Diff for: pkg/body-based-routing/handlers/server.go

+6-21
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
4646
loggerVerbose := logger.V(logutil.VERBOSE)
4747
loggerVerbose.Info("Processing")
4848

49-
reader, writer := io.Pipe()
49+
var streamedBody []byte
5050

5151
for {
5252
select {
@@ -78,7 +78,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
7878
}
7979
case *extProcPb.ProcessingRequest_RequestBody:
8080
loggerVerbose.Info("Incoming body chunk", "body", string(v.RequestBody.Body), "EoS", v.RequestBody.EndOfStream)
81-
responses, err = s.processRequestBody(ctx, req.GetRequestBody(), writer, reader, logger)
81+
responses, err = s.processRequestBody(ctx, req.GetRequestBody(), streamedBody, logger)
8282
case *extProcPb.ProcessingRequest_RequestTrailers:
8383
responses, err = s.HandleRequestTrailers(req.GetRequestTrailers())
8484
case *extProcPb.ProcessingRequest_ResponseHeaders:
@@ -105,35 +105,20 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
105105
}
106106
}
107107

108-
func (s *Server) processRequestBody(ctx context.Context, body *extProcPb.HttpBody, bufferWriter *io.PipeWriter, bufferReader *io.PipeReader, logger logr.Logger) ([]*extProcPb.ProcessingResponse, error) {
108+
func (s *Server) processRequestBody(ctx context.Context, body *extProcPb.HttpBody, streamedBody []byte, logger logr.Logger) ([]*extProcPb.ProcessingResponse, error) {
109109
loggerVerbose := logger.V(logutil.VERBOSE)
110110

111111
var requestBody map[string]interface{}
112112
if s.streaming {
113113
// In the stream case, we can receive multiple request bodies.
114-
// To buffer the full message, we create a goroutine with a writer.Write()
115-
// call, which will block until the corresponding reader reads from it.
116-
// We do not read until we receive the EndofStream signal, and then
117-
// decode the entire JSON body.
118-
if !body.EndOfStream {
119-
go func() {
120-
loggerVerbose.Info("Writing to stream buffer")
121-
_, err := bufferWriter.Write(body.Body)
122-
if err != nil {
123-
logger.V(logutil.DEFAULT).Error(err, "Error populating writer")
124-
}
125-
}()
126-
127-
return nil, nil
128-
}
114+
streamedBody = append(streamedBody, body.Body...)
129115

130116
if body.EndOfStream {
131117
loggerVerbose.Info("Flushing stream buffer")
132-
decoder := json.NewDecoder(bufferReader)
133-
if err := decoder.Decode(&requestBody); err != nil {
118+
err := json.Unmarshal(streamedBody, &requestBody)
119+
if err != nil {
134120
logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body")
135121
}
136-
bufferReader.Close()
137122
}
138123
} else {
139124
if err := json.Unmarshal(body.GetBody(), &requestBody); err != nil {

0 commit comments

Comments
 (0)