@@ -55,8 +55,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
55
55
RequestState : RequestReceived ,
56
56
}
57
57
58
- reader , writer := io .Pipe ()
59
- decoder := json .NewDecoder (reader )
58
+ var body []byte
60
59
61
60
var requestBody , responseBody map [string ]interface {}
62
61
// Create error handling var as each request should only report once for
@@ -95,28 +94,18 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
95
94
case * extProcPb.ProcessingRequest_RequestBody :
96
95
loggerVerbose .Info ("Incoming body chunk" , "body" , string (v .RequestBody .Body ), "EoS" , v .RequestBody .EndOfStream )
97
96
// In the stream case, we can receive multiple request bodies.
98
- // To buffer the full message, we create a goroutine with a writer.Write()
99
- // call, which will block until the corresponding reader reads from it.
100
- // We do not read until we receive the EndofStream signal, and then
101
- // decode the entire JSON body.
102
- go func () {
103
- _ , err := writer .Write (v .RequestBody .Body )
104
- if err != nil {
105
- logger .V (logutil .DEFAULT ).Error (err , "Error populating writer" )
106
- }
107
- }()
97
+ body = append (body , v .RequestBody .Body ... )
108
98
109
99
// Message is buffered, we can read and decode.
110
100
if v .RequestBody .EndOfStream {
111
101
loggerVerbose .Info ("decoding" )
112
- err = decoder . Decode ( & requestBody )
102
+ err = json . Unmarshal ( body , & requestBody )
113
103
if err != nil {
114
104
logger .V (logutil .DEFAULT ).Error (err , "Error unmarshaling request body" )
115
105
}
116
- // Body stream complete. Close the reader pipe, and start anew for response.
117
- reader .Close ()
118
- reader , writer = io .Pipe ()
119
- decoder = json .NewDecoder (reader )
106
+
107
+ // Body stream complete. Allocate empty slice for response to use.
108
+ body = []byte {}
120
109
121
110
reqCtx , err = s .HandleRequestBody (ctx , reqCtx , req , requestBody )
122
111
if err != nil {
@@ -184,25 +173,18 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
184
173
},
185
174
}
186
175
} else {
187
- go func () {
188
- _ , err := writer .Write (v .ResponseBody .Body )
189
- if err != nil {
190
- logger .V (logutil .DEFAULT ).Error (err , "Error populating writer" )
191
- }
192
- }()
176
+ body = append (body , v .ResponseBody .Body ... )
193
177
194
178
// Message is buffered, we can read and decode.
195
179
if v .ResponseBody .EndOfStream {
196
180
// Don't send a 500 on a response error. Just let the message passthrough and log our error for debugging purposes.
197
181
// We assume the body is valid JSON, err messages are not guaranteed to be json, and so capturing and sending a 500 obfuscates the response message.
198
182
// using the standard 'err' var will send an immediate error response back to the caller.
199
183
var responseErr error
200
- responseErr = decoder . Decode ( & responseBody )
184
+ responseErr = json . Unmarshal ( body , & responseBody )
201
185
if responseErr != nil {
202
186
logger .V (logutil .DEFAULT ).Error (responseErr , "Error unmarshaling request body" )
203
187
}
204
- // Body stream complete. Close the reader pipe.
205
- reader .Close ()
206
188
207
189
reqCtx , responseErr = s .HandleResponseBody (ctx , reqCtx , responseBody )
208
190
if responseErr != nil {
0 commit comments