Skip to content

Commit 70965a0

Browse files
authored
Support full duplex streaming (#450)
This PR supports the FULL_DUPLEX_STREAMED mode for ext-proc.
1 parent 0aa142d commit 70965a0

File tree

7 files changed

+613
-50
lines changed

7 files changed

+613
-50
lines changed

cmd/epp/main.go

+6
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ func run() error {
110110
flag.Parse()
111111
initLogging(&opts)
112112

113+
useStreamingServer, err := strconv.ParseBool(os.Getenv("USE_STREAMING"))
114+
if err != nil {
115+
setupLog.Error(err, "Failed to parse env var USE_STREAMING, defaulting to false")
116+
}
117+
113118
// Validate flags
114119
if err := validateFlags(); err != nil {
115120
setupLog.Error(err, "Failed to validate flags")
@@ -153,6 +158,7 @@ func run() error {
153158
SecureServing: *secureServing,
154159
CertPath: *certPath,
155160
Provider: provider,
161+
UseStreaming: useStreamingServer,
156162
}
157163
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
158164
setupLog.Error(err, "Failed to setup ext-proc controllers")

config/manifests/ext_proc.yaml

+4-1
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,14 @@ spec:
7777
- -poolName
7878
- "my-pool"
7979
- -v
80-
- "3"
80+
- "4"
8181
- -grpcPort
8282
- "9002"
8383
- -grpcHealthPort
8484
- "9003"
85+
env:
86+
- name: USE_STREAMING
87+
value: "false"
8588
ports:
8689
- containerPort: 9002
8790
- containerPort: 9003

config/manifests/gateway/extension_policy.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ spec:
1111
name: inference-gateway-ext-proc
1212
port: 9002
1313
processingMode:
14+
allowModeOverride: true
1415
request:
1516
body: Buffered
1617
response:

config/manifests/gateway/patch_policy.yaml

+32-1
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,41 @@ spec:
4848
typed_config:
4949
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext"
5050
common_tls_context: {}
51-
5251
- type: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"
5352
name: default/inference-gateway/llm-gw
5453
operation:
5554
op: replace
5655
path: "/virtual_hosts/0/routes/0/route/cluster"
5756
value: original_destination_cluster
57+
# Uncomment the below to enable full duplex streaming
58+
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
59+
# name: "default/inference-gateway/llm-gw"
60+
# operation:
61+
# op: add
62+
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/request_body_mode"
63+
# value: FULL_DUPLEX_STREAMED
64+
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
65+
# name: "default/inference-gateway/llm-gw"
66+
# operation:
67+
# op: add
68+
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/request_trailer_mode"
69+
# value: SEND
70+
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
71+
# name: "default/inference-gateway/llm-gw"
72+
# operation:
73+
# op: add
74+
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/response_body_mode"
75+
# value: FULL_DUPLEX_STREAMED
76+
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
77+
# name: "default/inference-gateway/llm-gw"
78+
# operation:
79+
# op: replace
80+
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/response_trailer_mode"
81+
# value: SEND
82+
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
83+
# name: "default/inference-gateway/llm-gw"
84+
# operation:
85+
# op: replace
86+
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/response_header_mode"
87+
# value: SEND
88+

pkg/epp/handlers/server.go

+57-47
Original file line numberDiff line numberDiff line change
@@ -132,53 +132,9 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
132132

133133
if err != nil {
134134
logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req)
135-
switch errutil.CanonicalCode(err) {
136-
// This code can be returned by scheduler when there is no capacity for sheddable
137-
// requests.
138-
case errutil.InferencePoolResourceExhausted:
139-
resp = &extProcPb.ProcessingResponse{
140-
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
141-
ImmediateResponse: &extProcPb.ImmediateResponse{
142-
Status: &envoyTypePb.HttpStatus{
143-
Code: envoyTypePb.StatusCode_TooManyRequests,
144-
},
145-
},
146-
},
147-
}
148-
// This code can be returned by when EPP processes the request and run into server-side errors.
149-
case errutil.Internal:
150-
resp = &extProcPb.ProcessingResponse{
151-
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
152-
ImmediateResponse: &extProcPb.ImmediateResponse{
153-
Status: &envoyTypePb.HttpStatus{
154-
Code: envoyTypePb.StatusCode_InternalServerError,
155-
},
156-
},
157-
},
158-
}
159-
// This code can be returned when users provide invalid json request.
160-
case errutil.BadRequest:
161-
resp = &extProcPb.ProcessingResponse{
162-
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
163-
ImmediateResponse: &extProcPb.ImmediateResponse{
164-
Status: &envoyTypePb.HttpStatus{
165-
Code: envoyTypePb.StatusCode_BadRequest,
166-
},
167-
},
168-
},
169-
}
170-
case errutil.BadConfiguration:
171-
resp = &extProcPb.ProcessingResponse{
172-
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
173-
ImmediateResponse: &extProcPb.ImmediateResponse{
174-
Status: &envoyTypePb.HttpStatus{
175-
Code: envoyTypePb.StatusCode_NotFound,
176-
},
177-
},
178-
},
179-
}
180-
default:
181-
return status.Errorf(status.Code(err), "failed to handle request: %v", err)
135+
resp, err = BuildErrResponse(err)
136+
if err != nil {
137+
return err
182138
}
183139
}
184140

@@ -190,6 +146,60 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
190146
}
191147
}
192148

149+
func BuildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
150+
var resp *extProcPb.ProcessingResponse
151+
152+
switch errutil.CanonicalCode(err) {
153+
// This code can be returned by scheduler when there is no capacity for sheddable
154+
// requests.
155+
case errutil.InferencePoolResourceExhausted:
156+
resp = &extProcPb.ProcessingResponse{
157+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
158+
ImmediateResponse: &extProcPb.ImmediateResponse{
159+
Status: &envoyTypePb.HttpStatus{
160+
Code: envoyTypePb.StatusCode_TooManyRequests,
161+
},
162+
},
163+
},
164+
}
165+
// This code can be returned by when EPP processes the request and run into server-side errors.
166+
case errutil.Internal:
167+
resp = &extProcPb.ProcessingResponse{
168+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
169+
ImmediateResponse: &extProcPb.ImmediateResponse{
170+
Status: &envoyTypePb.HttpStatus{
171+
Code: envoyTypePb.StatusCode_InternalServerError,
172+
},
173+
},
174+
},
175+
}
176+
// This code can be returned when users provide invalid json request.
177+
case errutil.BadRequest:
178+
resp = &extProcPb.ProcessingResponse{
179+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
180+
ImmediateResponse: &extProcPb.ImmediateResponse{
181+
Status: &envoyTypePb.HttpStatus{
182+
Code: envoyTypePb.StatusCode_BadRequest,
183+
},
184+
},
185+
},
186+
}
187+
case errutil.BadConfiguration:
188+
resp = &extProcPb.ProcessingResponse{
189+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
190+
ImmediateResponse: &extProcPb.ImmediateResponse{
191+
Status: &envoyTypePb.HttpStatus{
192+
Code: envoyTypePb.StatusCode_NotFound,
193+
},
194+
},
195+
},
196+
}
197+
default:
198+
return nil, status.Errorf(status.Code(err), "failed to handle request: %v", err)
199+
}
200+
return resp, nil
201+
}
202+
193203
// RequestContext stores context information during the life time of an HTTP request.
194204
type RequestContext struct {
195205
TargetPod string

0 commit comments

Comments
 (0)