Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support full duplex streaming #450

Merged
merged 18 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ func run() error {
flag.Parse()
initLogging(&opts)

useStreamingServer, err := strconv.ParseBool(os.Getenv("USE_STREAMING"))
if err != nil {
setupLog.Error(err, "Failed to parse env var USE_STREAMING, defaulting to false")
}

// Validate flags
if err := validateFlags(); err != nil {
setupLog.Error(err, "Failed to validate flags")
Expand Down Expand Up @@ -153,6 +158,7 @@ func run() error {
SecureServing: *secureServing,
CertPath: *certPath,
Provider: provider,
UseStreaming: useStreamingServer,
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "Failed to setup ext-proc controllers")
Expand Down
5 changes: 4 additions & 1 deletion config/manifests/ext_proc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@ spec:
- -poolName
- "my-pool"
- -v
- "3"
- "4"
- -grpcPort
- "9002"
- -grpcHealthPort
- "9003"
env:
- name: USE_STREAMING
value: "false"
ports:
- containerPort: 9002
- containerPort: 9003
Expand Down
1 change: 1 addition & 0 deletions config/manifests/gateway/extension_policy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ spec:
name: inference-gateway-ext-proc
port: 9002
processingMode:
allowModeOverride: true
request:
body: Buffered
response:
Expand Down
33 changes: 32 additions & 1 deletion config/manifests/gateway/patch_policy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,41 @@ spec:
typed_config:
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext"
common_tls_context: {}

- type: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"
name: default/inference-gateway/llm-gw
operation:
op: replace
path: "/virtual_hosts/0/routes/0/route/cluster"
value: original_destination_cluster
# Uncomment the below to enable full duplex streaming
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
# name: "default/inference-gateway/llm-gw"
# operation:
# op: add
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/request_body_mode"
# value: FULL_DUPLEX_STREAMED
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
# name: "default/inference-gateway/llm-gw"
# operation:
# op: add
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/request_trailer_mode"
# value: SEND
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
# name: "default/inference-gateway/llm-gw"
# operation:
# op: add
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/response_body_mode"
# value: FULL_DUPLEX_STREAMED
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
# name: "default/inference-gateway/llm-gw"
# operation:
# op: replace
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/response_trailer_mode"
# value: SEND
# - type: "type.googleapis.com/envoy.config.listener.v3.Listener"
# name: "default/inference-gateway/llm-gw"
# operation:
# op: replace
# path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/response_header_mode"
# value: SEND

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you set the request_header_mode to SEND as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually already is via:

We just need to populate the request field for it to be included as per: https://gateway.envoyproxy.io/latest/api/extension_types/#extprocprocessingmode

I suppose we could include it here for completeness though. Open to either

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I would like to do away with this specific patch policy stuff and use EnvoyExtensionPolicy for this instead. I have PRs out to Envoy to support this: envoyproxy/gateway#5349 & envoyproxy/envoy#38578. I just need to follow up on those and get them unstuck.


104 changes: 57 additions & 47 deletions pkg/epp/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,53 +132,9 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {

if err != nil {
logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req)
switch errutil.CanonicalCode(err) {
// This code can be returned by scheduler when there is no capacity for sheddable
// requests.
case errutil.InferencePoolResourceExhausted:
resp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Status: &envoyTypePb.HttpStatus{
Code: envoyTypePb.StatusCode_TooManyRequests,
},
},
},
}
// This code can be returned by when EPP processes the request and run into server-side errors.
case errutil.Internal:
resp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Status: &envoyTypePb.HttpStatus{
Code: envoyTypePb.StatusCode_InternalServerError,
},
},
},
}
// This code can be returned when users provide invalid json request.
case errutil.BadRequest:
resp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Status: &envoyTypePb.HttpStatus{
Code: envoyTypePb.StatusCode_BadRequest,
},
},
},
}
case errutil.BadConfiguration:
resp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Status: &envoyTypePb.HttpStatus{
Code: envoyTypePb.StatusCode_NotFound,
},
},
},
}
default:
return status.Errorf(status.Code(err), "failed to handle request: %v", err)
resp, err = BuildErrResponse(err)
if err != nil {
return err
}
}

Expand All @@ -190,6 +146,60 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
}
}

// BuildErrResponse translates a request-handling error into an ext-proc
// ImmediateResponse carrying the HTTP status that corresponds to the error's
// canonical code.
//
// It returns the response and a nil error when the canonical code is one of
// the recognized cases below. For any other code it returns a nil response and
// a gRPC status error built from status.Code(err), which the caller is expected
// to propagate.
func BuildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
	httpStatus := &envoyTypePb.HttpStatus{}

	// Pick the HTTP status first; the response envelope is identical for every
	// recognized code and is assembled once after the switch.
	switch errutil.CanonicalCode(err) {
	case errutil.InferencePoolResourceExhausted:
		// Can be returned by the scheduler when there is no capacity for
		// sheddable requests.
		httpStatus.Code = envoyTypePb.StatusCode_TooManyRequests
	case errutil.Internal:
		// Can be returned when EPP processes the request and runs into
		// server-side errors.
		httpStatus.Code = envoyTypePb.StatusCode_InternalServerError
	case errutil.BadRequest:
		// Can be returned when users provide an invalid JSON request.
		httpStatus.Code = envoyTypePb.StatusCode_BadRequest
	case errutil.BadConfiguration:
		httpStatus.Code = envoyTypePb.StatusCode_NotFound
	default:
		// Unrecognized code: surface it as a gRPC error instead of an
		// immediate HTTP response.
		return nil, status.Errorf(status.Code(err), "failed to handle request: %v", err)
	}

	immediate := &extProcPb.ImmediateResponse{Status: httpStatus}
	return &extProcPb.ProcessingResponse{
		Response: &extProcPb.ProcessingResponse_ImmediateResponse{
			ImmediateResponse: immediate,
		},
	}, nil
}

// RequestContext stores context information during the life time of an HTTP request.
type RequestContext struct {
TargetPod string
Expand Down
Loading