@@ -18,30 +18,36 @@ package handlers
18
18
19
19
import (
20
20
"context"
21
+ "encoding/json"
21
22
"errors"
22
23
"io"
23
24
24
25
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
26
+ "github.com/go-logr/logr"
25
27
"google.golang.org/grpc/codes"
26
28
"google.golang.org/grpc/status"
27
29
"sigs.k8s.io/controller-runtime/pkg/log"
28
30
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
29
31
)
30
32
31
- func NewServer () * Server {
32
- return & Server {}
33
+ func NewServer (streaming bool ) * Server {
34
+ return & Server {streaming : streaming }
33
35
}
34
36
35
37
// Server implements the Envoy external processing server.
36
38
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
37
- type Server struct {}
39
+ type Server struct {
40
+ streaming bool
41
+ }
38
42
39
43
func (s * Server ) Process (srv extProcPb.ExternalProcessor_ProcessServer ) error {
40
44
ctx := srv .Context ()
41
45
logger := log .FromContext (ctx )
42
46
loggerVerbose := logger .V (logutil .VERBOSE )
43
47
loggerVerbose .Info ("Processing" )
44
48
49
+ reader , writer := io .Pipe ()
50
+
45
51
for {
46
52
select {
47
53
case <- ctx .Done ():
@@ -61,12 +67,19 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
61
67
}
62
68
63
69
var resp * extProcPb.ProcessingResponse
70
+ var requestBody []byte
64
71
var err error
65
72
switch v := req .Request .(type ) {
66
73
case * extProcPb.ProcessingRequest_RequestHeaders :
67
- resp , err = s .HandleRequestHeaders (req .GetRequestHeaders ())
74
+ if ! s .streaming {
75
+ // If streaming, then headers are handled when processing request body.
76
+ resp , err = s .HandleRequestHeaders (req .GetRequestHeaders ())
77
+ } else {
78
+ loggerVerbose .Info ("Received headers, passing off header processing until body arrives..." )
79
+ }
68
80
case * extProcPb.ProcessingRequest_RequestBody :
69
- resp , err = s .HandleRequestBody (ctx , req .GetRequestBody ())
81
+ loggerVerbose .Info ("Incoming body chunk" , "body" , string (v .RequestBody .Body ), "EoS" , v .RequestBody .EndOfStream )
82
+ resp , requestBody , err = s .processRequestBody (ctx , req .GetRequestBody (), writer , reader , logger )
70
83
case * extProcPb.ProcessingRequest_ResponseHeaders :
71
84
resp , err = s .HandleResponseHeaders (req .GetResponseHeaders ())
72
85
case * extProcPb.ProcessingRequest_ResponseBody :
@@ -81,10 +94,84 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
81
94
return status .Errorf (status .Code (err ), "failed to handle request: %v" , err )
82
95
}
83
96
84
- loggerVerbose .Info ("Response generated" , "response" , resp )
85
- if err := srv .Send (resp ); err != nil {
86
- logger .V (logutil .DEFAULT ).Error (err , "Send failed" )
87
- return status .Errorf (codes .Unknown , "failed to send response back to Envoy: %v" , err )
97
+ if resp != nil {
98
+ loggerVerbose .Info ("Response generated" , "response" , resp )
99
+ if err := srv .Send (resp ); err != nil {
100
+ logger .V (logutil .DEFAULT ).Error (err , "Send failed" )
101
+ return status .Errorf (codes .Unknown , "failed to send response back to Envoy: %v" , err )
102
+ }
103
+
104
+ if s .streaming {
105
+ bodyResp := & extProcPb.ProcessingResponse {
106
+ Response : & extProcPb.ProcessingResponse_RequestBody {
107
+ RequestBody : & extProcPb.BodyResponse {
108
+ Response : & extProcPb.CommonResponse {
109
+ BodyMutation : & extProcPb.BodyMutation {
110
+ Mutation : & extProcPb.BodyMutation_StreamedResponse {
111
+ StreamedResponse : & extProcPb.StreamedBodyResponse {
112
+ Body : requestBody ,
113
+ EndOfStream : true ,
114
+ },
115
+ },
116
+ },
117
+ },
118
+ },
119
+ },
120
+ }
121
+ loggerVerbose .Info ("Response generated" , "response" , bodyResp )
122
+ if err := srv .Send (bodyResp ); err != nil {
123
+ logger .V (logutil .DEFAULT ).Error (err , "Send failed" )
124
+ return status .Errorf (codes .Unknown , "failed to send response back to Envoy: %v" , err )
125
+ }
126
+ }
88
127
}
89
128
}
90
129
}
130
+
131
+ func (s * Server ) processRequestBody (ctx context.Context , body * extProcPb.HttpBody , bufferWriter * io.PipeWriter , bufferReader * io.PipeReader , logger logr.Logger ) (* extProcPb.ProcessingResponse , []byte , error ) {
132
+ loggerVerbose := logger .V (logutil .VERBOSE )
133
+
134
+ var requestBody map [string ]interface {}
135
+ if s .streaming {
136
+ // In the stream case, we can receive multiple request bodies.
137
+ // To buffer the full message, we create a goroutine with a writer.Write()
138
+ // call, which will block until the corresponding reader reads from it.
139
+ // We do not read until we receive the EndofStream signal, and then
140
+ // decode the entire JSON body.
141
+ if ! body .EndOfStream {
142
+ go func () {
143
+ loggerVerbose .Info ("Writing to stream buffer" )
144
+ _ , err := bufferWriter .Write (body .Body )
145
+ if err != nil {
146
+ logger .V (logutil .DEFAULT ).Error (err , "Error populating writer" )
147
+ }
148
+ }()
149
+
150
+ return nil , nil , nil
151
+ }
152
+
153
+ if body .EndOfStream {
154
+ loggerVerbose .Info ("Flushing stream buffer" )
155
+ decoder := json .NewDecoder (bufferReader )
156
+ if err := decoder .Decode (& requestBody ); err != nil {
157
+ logger .V (logutil .DEFAULT ).Error (err , "Error unmarshaling request body" )
158
+ }
159
+ bufferReader .Close ()
160
+ }
161
+ } else {
162
+ if err := json .Unmarshal (body .GetBody (), & requestBody ); err != nil {
163
+ return nil , nil , err
164
+ }
165
+ }
166
+
167
+ requestBodyResp , err := s .HandleRequestBody (ctx , requestBody )
168
+ if err != nil {
169
+ return nil , nil , err
170
+ }
171
+
172
+ requestBodyBytes , err := json .Marshal (requestBody )
173
+ if err != nil {
174
+ return nil , nil , err
175
+ }
176
+ return requestBodyResp , requestBodyBytes , nil
177
+ }
0 commit comments