diff --git a/pkg/body-based-routing/handlers/server.go b/pkg/body-based-routing/handlers/server.go index 36eb3c2f..fee8f78c 100644 --- a/pkg/body-based-routing/handlers/server.go +++ b/pkg/body-based-routing/handlers/server.go @@ -46,7 +46,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { loggerVerbose := logger.V(logutil.VERBOSE) loggerVerbose.Info("Processing") - reader, writer := io.Pipe() + var streamedBody []byte for { select { @@ -78,7 +78,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { } case *extProcPb.ProcessingRequest_RequestBody: loggerVerbose.Info("Incoming body chunk", "body", string(v.RequestBody.Body), "EoS", v.RequestBody.EndOfStream) - responses, err = s.processRequestBody(ctx, req.GetRequestBody(), writer, reader, logger) + responses, err = s.processRequestBody(ctx, req.GetRequestBody(), streamedBody, logger) case *extProcPb.ProcessingRequest_RequestTrailers: responses, err = s.HandleRequestTrailers(req.GetRequestTrailers()) case *extProcPb.ProcessingRequest_ResponseHeaders: @@ -105,35 +105,20 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { } } -func (s *Server) processRequestBody(ctx context.Context, body *extProcPb.HttpBody, bufferWriter *io.PipeWriter, bufferReader *io.PipeReader, logger logr.Logger) ([]*extProcPb.ProcessingResponse, error) { +func (s *Server) processRequestBody(ctx context.Context, body *extProcPb.HttpBody, streamedBody []byte, logger logr.Logger) ([]*extProcPb.ProcessingResponse, error) { loggerVerbose := logger.V(logutil.VERBOSE) var requestBody map[string]interface{} if s.streaming { // In the stream case, we can receive multiple request bodies. - // To buffer the full message, we create a goroutine with a writer.Write() - // call, which will block until the corresponding reader reads from it. - // We do not read until we receive the EndofStream signal, and then - // decode the entire JSON body. - if !body.EndOfStream { - go func() { - loggerVerbose.Info("Writing to stream buffer") - _, err := bufferWriter.Write(body.Body) - if err != nil { - logger.V(logutil.DEFAULT).Error(err, "Error populating writer") - } - }() - - return nil, nil - } + streamedBody = append(streamedBody, body.Body...) if body.EndOfStream { loggerVerbose.Info("Flushing stream buffer") - decoder := json.NewDecoder(bufferReader) - if err := decoder.Decode(&requestBody); err != nil { + err := json.Unmarshal(streamedBody, &requestBody) + if err != nil { logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body") } - bufferReader.Close() } } else { if err := json.Unmarshal(body.GetBody(), &requestBody); err != nil {