@@ -58,9 +58,9 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
58
58
decoder := json .NewDecoder (reader )
59
59
60
60
var requestBody , responseBody map [string ]interface {}
61
- // Create variable for error handling as each request should only report once for
62
- // error metric . This doesn't cover the error "Cannot receive stream request" because
63
- // such error might happen even the response is processed.
61
+ // Create error handling var as each request should only report once for
62
+ // error metrics . This doesn't cover the error "Cannot receive stream request" because
63
+ // such errors might happen even though response is processed.
64
64
var err error
65
65
defer func (error ) {
66
66
if reqCtx .ResponseStatusCode != "" {
@@ -93,6 +93,11 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
93
93
// Do nothing. Header info is handled in the HandleRequestBody func
94
94
case * extProcPb.ProcessingRequest_RequestBody :
95
95
loggerVerbose .Info ("Incoming body chunk" , "body" , string (v .RequestBody .Body ), "EoS" , v .RequestBody .EndOfStream )
96
+ // In the stream case, we can receive multiple request bodies.
97
+ // To buffer the full message, we create a goroutine with a writer.Write()
98
+ // call, which will block until the corresponding reader reads from it.
99
+ // We do not read until we receive the EndofStream signal, and then
100
+ // decode the entire JSON body.
96
101
go func () {
97
102
_ , err := writer .Write (v .RequestBody .Body )
98
103
if err != nil {
@@ -260,7 +265,6 @@ type StreamingRequestContext struct {
260
265
Model string
261
266
ResolvedTargetModel string
262
267
RequestState StreamRequestState
263
- EndOfStream bool
264
268
RequestReceivedTimestamp time.Time
265
269
ResponseCompleteTimestamp time.Time
266
270
RequestSize int
0 commit comments