Support full duplex streaming #450
Conversation
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: kfswain. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files.
Approvers can indicate their approval by writing `/approve` in a comment.
✅ Deploy Preview for gateway-api-inference-extension ready!
```yaml
# operation:
#   op: replace
#   path: "/default_filter_chain/filters/0/typed_config/http_filters/0/typed_config/processing_mode/response_header_mode"
#   value: SEND
```
Should you set the request_header_mode to SEND as well?
It actually already is, via: `body: Buffered`. We just need to populate the request field for it to be included, as per https://gateway.envoyproxy.io/latest/api/extension_types/#extprocprocessingmode.
I suppose we could include it here for completeness, though. Open to either.
But I would like to do away with this specific patch-policy stuff and use EnvoyExtensionPolicy for this. I have PRs out to Envoy to support it: envoyproxy/gateway#5349 & envoyproxy/envoy#38578. I just need to follow up on those and get them unstuck.
```go
}

switch v := req.Request.(type) {
case *extProcPb.ProcessingRequest_RequestHeaders:
```
A few questions:
- Does the streaming server buffer the entire body, or buffer just a portion of the body, then send the response?
- Do you have some integration tests with Envoy <-> StreamingServer to test the end-to-end functionality?
- Currently we are buffering the whole body, as it was faster to implement; a follow-up is to stream the response back.
- Not yet. I've been testing this all manually on my own cluster. I also have that as a follow-up.
EasyCLA started happy hour a little early today: communitybridge/easycla#4605
/check-cla
Thanks Kellen, mostly asking questions
config/manifests/ext_proc.yaml (Outdated)
```yaml
- -grpcPort
- "9002"
- -grpcHealthPort
- "9003"
env:
- name: USE_STREAMING
  value: "true"
```
Can we use USE_STREAMING even if the ext-proc is set up in buffered mode?
I tried it, and it didn't work out of the box. I think it's because we have to use the streaming response body.
We can probably do some generalizing (i.e., if we receive a stream in, we can assume stream out).
Ok, then we probably want to set this to false, since we are commenting out the streaming part in the patch.
```go
type StreamRequestState int

const (
	RequestReceived StreamRequestState = 0
```
Can you comment on whether those are states that we defined, or states that are part of the ext-proc streaming protocol?
These are states I defined, but based on the protocol we need to follow. I used the states because simple nil-checking of the response object we need to send back is not enough. (Imagine the case where we receive a stream chunk, but that chunk doesn't have the `model` field. So even though the `bodyResponse` is non-nil, we can't send it yet, because we haven't sent the header yet.)
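The gating described above can be sketched as a small state machine. Only `RequestReceived` appears in the PR's snippet; the other state names and the `canSendBody` helper are illustrative assumptions, not the PR's actual code:

```go
package main

import "fmt"

// StreamRequestState tracks how far we've progressed through the
// ext-proc exchange; these are states defined by the server, not
// by the ext-proc protocol itself.
type StreamRequestState int

const (
	RequestReceived               StreamRequestState = iota // headers seen, body still streaming
	HeaderRequestResponseComplete                           // header response already sent to Envoy
	BodyRequestResponsesComplete                            // body response(s) sent
)

// canSendBody shows why a nil check alone is insufficient: a buffered
// body response may exist before the header response has gone out,
// and it must not be sent until the headers have been.
func canSendBody(state StreamRequestState, bodyResponse any) bool {
	return bodyResponse != nil && state == HeaderRequestResponseComplete
}

func main() {
	resp := struct{}{}
	fmt.Println(canSendBody(RequestReceived, resp))               // false: headers not sent yet
	fmt.Println(canSendBody(HeaderRequestResponseComplete, resp)) // true
}
```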
```go
case *extProcPb.ProcessingRequest_RequestBody:
	loggerVerbose.Info("Incoming body chunk", "body", string(v.RequestBody.Body), "EoS", v.RequestBody.EndOfStream)
	go func() {
		_, err := writer.Write(v.RequestBody.Body)
```
Is this where we block to get the whole stream?
Yeah, each stream chunk that comes in is passed to the writer; the writer then blocks until it's read. Once we get an `EndOfStream` we let the decoder use the reader to read them all, and close the reader (which closes the corresponding writer(s)).
For a true stream, we would need to leverage `decoder.Token()` and stream back what we can. Although, since we modify the body, it may be challenging not to buffer.
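The pipe-based buffering described here can be sketched with `io.Pipe` from the standard library. The `bufferAndDecode` helper and the chunked JSON are illustrative; the real server receives chunks from the gRPC stream rather than a slice:

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
)

// bufferAndDecode mimics the buffering approach: each incoming chunk is
// written to an io.Pipe from a goroutine (Write blocks until the reader
// consumes it), and once the final chunk arrives (EndOfStream in the
// real server) we decode the full JSON body in one pass.
func bufferAndDecode(chunks [][]byte) (map[string]any, error) {
	reader, writer := io.Pipe()
	go func() {
		for _, c := range chunks {
			if _, err := writer.Write(c); err != nil {
				writer.CloseWithError(err)
				return
			}
		}
		writer.Close() // signals EOF to the decoder, like EndOfStream
	}()
	var body map[string]any
	if err := json.NewDecoder(reader).Decode(&body); err != nil {
		return nil, err
	}
	return body, nil
}

func main() {
	// A JSON request body split into arbitrary stream chunks.
	chunks := [][]byte{[]byte(`{"model":"llama",`), []byte(`"prompt":"hi"}`)}
	body, err := bufferAndDecode(chunks)
	if err != nil {
		panic(err)
	}
	fmt.Println(body["model"]) // llama
}
```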
```go
// To buffer the full message, we create a goroutine with a writer.Write()
// call, which will block until the corresponding reader reads from it.
// We do not read until we receive the EndOfStream signal, and then
// decode the entire JSON body.
```
Is there a sequence ID on the chunks that we need to adhere to when writing to the pipe? Is it possible that we receive the chunks out of order?
I think all the web-protocol portion is handled before the `Process()` func is called, as I didn't see any race-condition issues related to this in my testing.
Additionally, the only fields exposed in `RequestBody` are `Body` and `EndOfStream`: https://github.com/envoyproxy/go-control-plane/blob/66fc0a3b55b04be5eeb362fb9639a51845218b38/envoy/service/ext_proc/v3/external_processor.pb.go#L605
This looks good to me, but we need to add proper test coverage as a follow-up. Let me know when ready and I can tag it.
/check-cla
/lgtm
I force-merged this due to EasyCLA having an outage: communitybridge/easycla#4605. That was the only blocker holding this PR.
/easycla |
This PR supports the FULL_DUPLEX_STREAMED mode for ext-proc.

Fixes #388

This feature is currently only enabled via an env var, as it is still experimental.

Follow-ups:
- Use `decoder.Token()` to read JSON tokens and stream back the body as it comes in.