Skip to content

Commit d94680a

Browse files
committed
Initial working hard code of duplex streaming
1 parent ff5ab61 commit d94680a

File tree

4 files changed

+309
-15
lines changed

4 files changed

+309
-15
lines changed

config/manifests/ext_proc.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ spec:
7171
spec:
7272
containers:
7373
- name: inference-gateway-ext-proc
74-
image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:main
74+
image: us-east1-docker.pkg.dev/kfswain-gke-dev/test-repo/ext-proc:test-feb-32
7575
imagePullPolicy: Always
7676
args:
7777
- -poolName

config/manifests/gateway/patch_policy.yaml

+12-13
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,15 @@ spec:
5454
op: replace
5555
path: "/virtual_hosts/0/routes/0/route/cluster"
5656
value: original_destination_cluster
57-
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
58-
# name: "default/inference-gateway/llm-gw"
59-
# operation:
60-
# op: add
61-
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/request_body_mode"
62-
# #value: FULL_DUPLEX_STREAMED
63-
# value: BUFFERED
64-
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
65-
# name: "default/inference-gateway/llm-gw"
66-
# operation:
67-
# op: add
68-
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/request_trailer_mode"
69-
# value: SEND
57+
- type: "type.googleapis.com/envoy.config.listener.v3.Listener"
58+
name: "default/inference-gateway/llm-gw"
59+
operation:
60+
op: add
61+
path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/request_body_mode"
62+
value: FULL_DUPLEX_STREAMED
63+
- type: "type.googleapis.com/envoy.config.listener.v3.Listener"
64+
name: "default/inference-gateway/llm-gw"
65+
operation:
66+
op: add
67+
path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/request_trailer_mode"
68+
value: SEND

pkg/epp/handlers/streamingserver.go

+295
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
package handlers
2+
3+
import (
4+
"io"
5+
"strconv"
6+
7+
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
8+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
9+
"google.golang.org/grpc/codes"
10+
"google.golang.org/grpc/status"
11+
"google.golang.org/protobuf/types/known/structpb"
12+
"sigs.k8s.io/controller-runtime/pkg/log"
13+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
14+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
15+
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
16+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
17+
)
18+
19+
func NewStreamingServer(scheduler Scheduler, destinationEndpointHintMetadataNamespace, destinationEndpointHintKey string, datastore datastore.Datastore) *StreamingServer {
20+
return &StreamingServer{
21+
scheduler: scheduler,
22+
destinationEndpointHintMetadataNamespace: destinationEndpointHintMetadataNamespace,
23+
destinationEndpointHintKey: destinationEndpointHintKey,
24+
datastore: datastore,
25+
}
26+
}
27+
28+
type StreamingServer struct {
29+
scheduler Scheduler
30+
// The key of the header to specify the target pod address. This value needs to match Envoy
31+
// configuration.
32+
destinationEndpointHintKey string
33+
// The key acting as the outer namespace struct in the metadata extproc response to communicate
34+
// back the picked endpoints.
35+
destinationEndpointHintMetadataNamespace string
36+
datastore datastore.Datastore
37+
}
38+
39+
func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
40+
ctx := srv.Context()
41+
logger := log.FromContext(ctx)
42+
loggerVerbose := logger.V(logutil.VERBOSE)
43+
loggerVerbose.Info("Processing")
44+
45+
// Create request context to share states during life time of an HTTP request.
46+
// See https://github.com/envoyproxy/envoy/issues/17540.
47+
reqCtx := &RequestContext{}
48+
49+
// Create variable for error handling as each request should only report once for
50+
// error metric. This doesn't cover the error "Cannot receive stream request" because
51+
// such error might happen even the response is processed.
52+
var err error
53+
defer func(error) {
54+
if reqCtx.ResponseStatusCode != "" {
55+
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseStatusCode)
56+
} else if err != nil {
57+
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, errutil.CanonicalCode(err))
58+
}
59+
}(err)
60+
beeps, boops, bops := 0, 0, 0
61+
for {
62+
select {
63+
case <-ctx.Done():
64+
return ctx.Err()
65+
default:
66+
}
67+
68+
req, recvErr := srv.Recv()
69+
if recvErr == io.EOF || status.Code(recvErr) == codes.Canceled {
70+
return nil
71+
}
72+
if recvErr != nil {
73+
// This error occurs very frequently, though it doesn't seem to have any impact.
74+
// TODO Figure out if we can remove this noise.
75+
loggerVerbose.Error(err, "Cannot receive stream request")
76+
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
77+
}
78+
79+
// var resp *extProcPb.ProcessingResponse
80+
// switch v := req.Request.(type) {
81+
// case *extProcPb.ProcessingRequest_RequestHeaders:
82+
// reqCtx.RequestReceivedTimestamp = time.Now()
83+
// resp = HandleRequestHeaders(ctx, reqCtx, req)
84+
// loggerVerbose.Info("Request context after HandleRequestHeaders", "context", reqCtx)
85+
// case *extProcPb.ProcessingRequest_RequestBody:
86+
// loggerVerbose.Info("[TESTING] Request body before entering func", "body", req.GetRequestBody())
87+
88+
// resp, err = s.HandleRequestBody(ctx, reqCtx, req)
89+
// if !reqCtx.EndofStream {
90+
// break
91+
// }
92+
// if err == nil {
93+
// metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel)
94+
// metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize)
95+
// }
96+
// loggerVerbose.Info("Request context after HandleRequestBody", "context", reqCtx)
97+
// case *extProcPb.ProcessingRequest_ResponseHeaders:
98+
// resp, err = s.HandleResponseHeaders(ctx, reqCtx, req)
99+
// loggerVerbose.Info("Request context after HandleResponseHeaders", "context", reqCtx)
100+
// case *extProcPb.ProcessingRequest_ResponseBody:
101+
// resp, err = s.HandleResponseBody(ctx, reqCtx, req)
102+
// if err == nil && reqCtx.ResponseComplete {
103+
// reqCtx.ResponseCompleteTimestamp = time.Now()
104+
// metrics.RecordRequestLatencies(ctx, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)
105+
// metrics.RecordResponseSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseSize)
106+
// metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
107+
// metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
108+
// }
109+
// loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
110+
// case *extProcPb.ProcessingRequest_RequestTrailers:
111+
// case *extProcPb.ProcessingRequest_ResponseTrailers:
112+
// default:
113+
// logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
114+
// return status.Error(codes.Unknown, "unknown request type")
115+
// }
116+
117+
// resp, err := s.parseError(err)
118+
// if err != nil {
119+
// // Everything is awful, run it up the stack.
120+
// logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req)
121+
// return err
122+
// } else if resp != nil {
123+
// // We have an immediate response we can send. Let the gateway know.
124+
// if err := srv.Send(resp); err != nil {
125+
// logger.V(logutil.DEFAULT).Error(err, "Send failed")
126+
// return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
127+
// }
128+
// }
129+
130+
// loggerVerbose.Info("Response generated", "response", resp)
131+
// if err := srv.Send(resp); err != nil {
132+
// logger.V(logutil.DEFAULT).Error(err, "Send failed")
133+
// return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
134+
// }
135+
136+
pool, err := s.datastore.PoolGet()
137+
if err != nil {
138+
return err
139+
}
140+
endpoint := "10.108.10.4" + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
141+
142+
targetEndpointValue := &structpb.Struct{
143+
Fields: map[string]*structpb.Value{
144+
s.destinationEndpointHintKey: {
145+
Kind: &structpb.Value_StringValue{
146+
StringValue: endpoint,
147+
},
148+
},
149+
},
150+
}
151+
dynamicMetadata := targetEndpointValue
152+
if s.destinationEndpointHintMetadataNamespace != "" {
153+
// If a namespace is defined, wrap the selected endpoint with that.
154+
dynamicMetadata = &structpb.Struct{
155+
Fields: map[string]*structpb.Value{
156+
s.destinationEndpointHintMetadataNamespace: {
157+
Kind: &structpb.Value_StructValue{
158+
StructValue: targetEndpointValue,
159+
},
160+
},
161+
},
162+
}
163+
}
164+
switch v := req.Request.(type) {
165+
case *extProcPb.ProcessingRequest_RequestHeaders:
166+
beeps++
167+
loggerVerbose.Info("BEEP", "beeps", beeps, "boops", boops, "bops", bops)
168+
headerResp := &extProcPb.ProcessingResponse{
169+
Response: &extProcPb.ProcessingResponse_RequestHeaders{
170+
RequestHeaders: &extProcPb.HeadersResponse{
171+
Response: &extProcPb.CommonResponse{
172+
HeaderMutation: &extProcPb.HeaderMutation{
173+
SetHeaders: []*configPb.HeaderValueOption{
174+
{
175+
Header: &configPb.HeaderValue{
176+
Key: s.destinationEndpointHintKey,
177+
RawValue: []byte(endpoint),
178+
},
179+
},
180+
},
181+
},
182+
},
183+
},
184+
},
185+
}
186+
if err := srv.Send(headerResp); err != nil {
187+
logger.V(logutil.DEFAULT).Error(err, "Send failed")
188+
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
189+
}
190+
case *extProcPb.ProcessingRequest_RequestBody:
191+
boops++
192+
loggerVerbose.Info("BOOP", "beeps", beeps, "boops", boops, "bops", bops)
193+
bodyResp := &extProcPb.ProcessingResponse{
194+
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
195+
// and as an unstructure ext-proc response metadata key/value pair. This enables different integration
196+
// options for gateway providers.
197+
Response: &extProcPb.ProcessingResponse_RequestBody{
198+
RequestBody: &extProcPb.BodyResponse{
199+
Response: &extProcPb.CommonResponse{
200+
BodyMutation: &extProcPb.BodyMutation{
201+
Mutation: &extProcPb.BodyMutation_StreamedResponse{
202+
StreamedResponse: &extProcPb.StreamedBodyResponse{
203+
Body: v.RequestBody.Body,
204+
EndOfStream: v.RequestBody.EndOfStream,
205+
},
206+
},
207+
},
208+
},
209+
},
210+
},
211+
DynamicMetadata: dynamicMetadata,
212+
}
213+
if err := srv.Send(bodyResp); err != nil {
214+
logger.V(logutil.DEFAULT).Error(err, "Send failed")
215+
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
216+
}
217+
case *extProcPb.ProcessingRequest_RequestTrailers:
218+
bops++
219+
loggerVerbose.Info("BOP", "beeps", beeps, "boops", boops, "bops", bops)
220+
trailerResp := &extProcPb.ProcessingResponse{
221+
Response: &extProcPb.ProcessingResponse_RequestTrailers{
222+
RequestTrailers: &extProcPb.TrailersResponse{},
223+
},
224+
}
225+
if err := srv.Send(trailerResp); err != nil {
226+
logger.V(logutil.DEFAULT).Error(err, "Send failed")
227+
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
228+
}
229+
}
230+
}
231+
}
232+
233+
// func (s *StreamingServer) parseError(err error) (*extProcPb.ProcessingResponse, error) {
234+
// if err != nil {
235+
// return nil, nil
236+
// }
237+
238+
// resp := &extProcPb.ProcessingResponse{}
239+
// switch errutil.CanonicalCode(err) {
240+
// // This code can be returned by scheduler when there is no capacity for sheddable
241+
// // requests.
242+
// case errutil.InferencePoolResourceExhausted:
243+
// resp = &extProcPb.ProcessingResponse{
244+
// Response: &extProcPb.ProcessingResponse_ImmediateResponse{
245+
// ImmediateResponse: &extProcPb.ImmediateResponse{
246+
// Status: &envoyTypePb.HttpStatus{
247+
// Code: envoyTypePb.StatusCode_TooManyRequests,
248+
// },
249+
// },
250+
// },
251+
// }
252+
// // This code can be returned by when EPP processes the request and run into server-side errors.
253+
// case errutil.Internal:
254+
// resp = &extProcPb.ProcessingResponse{
255+
// Response: &extProcPb.ProcessingResponse_ImmediateResponse{
256+
// ImmediateResponse: &extProcPb.ImmediateResponse{
257+
// Status: &envoyTypePb.HttpStatus{
258+
// Code: envoyTypePb.StatusCode_InternalServerError,
259+
// },
260+
// },
261+
// },
262+
// }
263+
// // This code can be returned when users provide invalid json request.
264+
// case errutil.BadRequest:
265+
// resp = &extProcPb.ProcessingResponse{
266+
// Response: &extProcPb.ProcessingResponse_ImmediateResponse{
267+
// ImmediateResponse: &extProcPb.ImmediateResponse{
268+
// Status: &envoyTypePb.HttpStatus{
269+
// Code: envoyTypePb.StatusCode_BadRequest,
270+
// },
271+
// },
272+
// },
273+
// }
274+
// case errutil.BadConfiguration:
275+
// resp = &extProcPb.ProcessingResponse{
276+
// Response: &extProcPb.ProcessingResponse_ImmediateResponse{
277+
// ImmediateResponse: &extProcPb.ImmediateResponse{
278+
// Status: &envoyTypePb.HttpStatus{
279+
// Code: envoyTypePb.StatusCode_NotFound,
280+
// },
281+
// },
282+
// },
283+
// }
284+
// default:
285+
// return nil, status.Errorf(status.Code(err), "failed to handle request: %v", err)
286+
// }
287+
// return resp, nil
288+
// }
289+
290+
// type requestMutations struct {
291+
// requestHeaderResponse extProcPb.ProcessingResponse_RequestHeaders
292+
// requestBodyResponse extProcPb.ProcessingResponse_RequestBody
293+
// requestTrailerReponse extProcPb.ProcessingResponse_RequestTrailers
294+
// responseToSend extProcPb.ProcessingResponse
295+
// }

pkg/epp/server/runserver.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
151151
}
152152
extProcPb.RegisterExternalProcessorServer(
153153
srv,
154-
handlers.NewServer(scheduling.NewScheduler(r.Datastore), r.DestinationEndpointHintMetadataNamespace, r.DestinationEndpointHintKey, r.Datastore),
154+
handlers.NewStreamingServer(scheduling.NewScheduler(r.Datastore), r.DestinationEndpointHintMetadataNamespace, r.DestinationEndpointHintKey, r.Datastore),
155155
)
156156

157157
// Forward to the gRPC runnable.

0 commit comments

Comments
 (0)