Skip to content

Commit a4ce5fd

Browse files
glbrnttWendellXY
authored andcommitted
Allow server handlers to send response headers directly (grpc#1599)
Motivation: The async server call context allows users to set headers which are sent when the first message is sent. In many cases this is fine, however, some use cases require the headers to be sent immediately. Modifications: - Add `sendHeaders(_:)` to the `GRPCAsyncServerCallContext` which sends headers to the client and throws if headers have already been written or it's too late to send them. Result: Headers can be sent directly from a server call handler.
1 parent 08b85f2 commit a4ce5fd

9 files changed

+252
-24
lines changed

Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Actions.swift

+3-2
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,13 @@ extension ServerHandlerStateMachine {
7070

7171
/// Update the metadata. It must not have been written yet.
7272
@inlinable
73-
mutating func update(_ metadata: HPACKHeaders) {
73+
mutating func update(_ metadata: HPACKHeaders) -> Bool {
7474
switch self {
7575
case .notWritten:
7676
self = .notWritten(metadata)
77+
return true
7778
case .written:
78-
assertionFailure("Metadata must not be set after it has been sent")
79+
return false
7980
}
8081
}
8182

Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Draining.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,16 @@ extension ServerHandlerStateMachine {
5050
@inlinable
5151
mutating func setResponseHeaders(
5252
_ metadata: HPACKHeaders
53-
) -> Self.NextStateAndOutput<Void> {
54-
self.responseHeaders.update(metadata)
55-
return .init(nextState: .draining(self))
53+
) -> Self.NextStateAndOutput<Bool> {
54+
let output = self.responseHeaders.update(metadata)
55+
return .init(nextState: .draining(self), output: output)
5656
}
5757

5858
@inlinable
5959
mutating func setResponseTrailers(
6060
_ metadata: HPACKHeaders
6161
) -> Self.NextStateAndOutput<Void> {
62-
self.responseTrailers.update(metadata)
62+
_ = self.responseTrailers.update(metadata)
6363
return .init(nextState: .draining(self))
6464
}
6565

Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Finished.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ extension ServerHandlerStateMachine {
3535
@inlinable
3636
mutating func setResponseHeaders(
3737
_ headers: HPACKHeaders
38-
) -> Self.NextStateAndOutput<Void> {
39-
return .init(nextState: .finished(self))
38+
) -> Self.NextStateAndOutput<Bool> {
39+
return .init(nextState: .finished(self), output: false)
4040
}
4141

4242
@inlinable

Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Handling.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,16 @@ extension ServerHandlerStateMachine {
5050
@inlinable
5151
mutating func setResponseHeaders(
5252
_ metadata: HPACKHeaders
53-
) -> Self.NextStateAndOutput<Void> {
54-
self.responseHeaders.update(metadata)
55-
return .init(nextState: .handling(self))
53+
) -> Self.NextStateAndOutput<Bool> {
54+
let output = self.responseHeaders.update(metadata)
55+
return .init(nextState: .handling(self), output: output)
5656
}
5757

5858
@inlinable
5959
mutating func setResponseTrailers(
6060
_ metadata: HPACKHeaders
6161
) -> Self.NextStateAndOutput<Void> {
62-
self.responseTrailers.update(metadata)
62+
_ = self.responseTrailers.update(metadata)
6363
return .init(nextState: .handling(self))
6464
}
6565

Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ internal struct ServerHandlerStateMachine {
2727
}
2828

2929
@inlinable
30-
mutating func setResponseHeaders(_ headers: HPACKHeaders) {
30+
mutating func setResponseHeaders(_ headers: HPACKHeaders) -> Bool {
3131
switch self.state {
3232
case var .handling(handling):
3333
let nextStateAndOutput = handling.setResponseHeaders(headers)

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift

+17-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,21 @@ public struct GRPCAsyncServerCallContext: Sendable {
3030
Response(contextProvider: self.contextProvider)
3131
}
3232

33+
/// Notifies the client that the RPC has been accepted for processing by the server.
34+
///
35+
/// On accepting the RPC the server will send the given headers (which may be empty) along with
36+
/// any transport specific headers (such the ":status" pseudo header) to the client.
37+
///
38+
/// It is not necessary to call this function: the RPC is implicitly accepted when the first
39+
/// response message is sent, however this may be useful when clients require an early indication
40+
/// that the RPC has been accepted.
41+
///
42+
/// If the RPC has already been accepted (either implicitly or explicitly) then this function is
43+
/// a no-op.
44+
public func acceptRPC(headers: HPACKHeaders) async {
45+
await self.contextProvider.acceptRPC(headers)
46+
}
47+
3348
/// Access the ``UserInfo`` dictionary which is shared with the interceptor contexts for this RPC.
3449
///
3550
/// - Important: While ``UserInfo`` has value-semantics, this function accesses a reference
@@ -87,8 +102,8 @@ extension GRPCAsyncServerCallContext {
87102
/// Set the metadata to return at the start of the RPC.
88103
///
89104
/// - Important: If this is required it should be updated _before_ the first response is sent
90-
/// via the response stream writer. Updates must not be made after the first response has
91-
/// been sent.
105+
/// via the response stream writer. Updates must not be made after the RPC has been accepted
106+
/// or the first response has been sent otherwise this method will throw an error.
92107
public func setHeaders(_ headers: HPACKHeaders) async throws {
93108
try await self.contextProvider.setResponseHeaders(headers)
94109
}

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift

+49-2
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,16 @@ internal final class AsyncServerHandler<
207207
@usableFromInline
208208
internal var compressResponsesIfPossible: Bool
209209

210+
/// The interceptor pipeline does not track flushing as a separate event. The flush decision is
211+
/// included with metadata alongside each message. For the status and trailers the flush is
212+
/// implicit. For headers we track whether to flush here.
213+
///
214+
/// In most cases the flush will be delayed until the first message is flushed and this will
215+
/// remain unset. However, this may be set when the server handler
216+
/// uses ``GRPCAsyncServerCallContext/sendHeaders(_:)``.
217+
@usableFromInline
218+
internal var flushNextHeaders: Bool
219+
210220
/// A state machine for the interceptor pipeline.
211221
@usableFromInline
212222
internal private(set) var interceptorStateMachine: ServerInterceptorStateMachine
@@ -265,6 +275,7 @@ internal final class AsyncServerHandler<
265275
self.errorDelegate = context.errorDelegate
266276
self.compressionEnabledOnRPC = context.encoding.isEnabled
267277
self.compressResponsesIfPossible = true
278+
self.flushNextHeaders = false
268279
self.logger = context.logger
269280

270281
self.userInfoRef = Ref(UserInfo())
@@ -685,7 +696,9 @@ internal final class AsyncServerHandler<
685696
switch self.interceptorStateMachine.interceptedResponseMetadata() {
686697
case .forward:
687698
if let responseWriter = self.responseWriter {
688-
responseWriter.sendMetadata(metadata, flush: false, promise: promise)
699+
let flush = self.flushNextHeaders
700+
self.flushNextHeaders = false
701+
responseWriter.sendMetadata(metadata, flush: flush, promise: promise)
689702
} else if let promise = promise {
690703
promise.fail(GRPCStatus.processingError)
691704
}
@@ -747,11 +760,44 @@ extension AsyncServerHandler: AsyncServerCallContextProvider {
747760
@usableFromInline
748761
internal func setResponseHeaders(_ headers: HPACKHeaders) async throws {
749762
let completed = self.eventLoop.submit {
750-
self.handlerStateMachine.setResponseHeaders(headers)
763+
if !self.handlerStateMachine.setResponseHeaders(headers) {
764+
throw GRPCStatus(
765+
code: .failedPrecondition,
766+
message: "Tried to send response headers in an invalid state"
767+
)
768+
}
751769
}
752770
try await completed.get()
753771
}
754772

773+
@usableFromInline
774+
internal func acceptRPC(_ headers: HPACKHeaders) async {
775+
let completed = self.eventLoop.submit {
776+
guard self.handlerStateMachine.setResponseHeaders(headers) else { return }
777+
778+
// Shh,it's a lie! We don't really have a message to send but the state machine doesn't know
779+
// (or care) about that. It will, however, tell us if we can send the headers or not.
780+
switch self.handlerStateMachine.sendMessage() {
781+
case let .intercept(.some(headers)):
782+
switch self.interceptorStateMachine.interceptResponseMetadata() {
783+
case .intercept:
784+
self.flushNextHeaders = true
785+
self.interceptors?.send(.metadata(headers), promise: nil)
786+
case .cancel:
787+
return self.cancel(error: nil)
788+
case .drop:
789+
()
790+
}
791+
792+
case .intercept(.none), .drop:
793+
// intercept(.none) means headers have already been sent; we should never hit this because
794+
// we guard on setting the response headers above.
795+
()
796+
}
797+
}
798+
try? await completed.get()
799+
}
800+
755801
@usableFromInline
756802
internal func setResponseTrailers(_ headers: HPACKHeaders) async throws {
757803
let completed = self.eventLoop.submit {
@@ -798,6 +844,7 @@ extension AsyncServerHandler: AsyncServerCallContextProvider {
798844
@usableFromInline
799845
protocol AsyncServerCallContextProvider: Sendable {
800846
func setResponseHeaders(_ headers: HPACKHeaders) async throws
847+
func acceptRPC(_ headers: HPACKHeaders) async
801848
func setResponseTrailers(_ trailers: HPACKHeaders) async throws
802849
func setResponseCompression(_ enabled: Bool) async throws
803850

Tests/GRPCTests/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachineTests.swift

+4-5
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,15 @@ internal final class ServerHandlerStateMachineTests: GRPCTestCase {
210210

211211
func testSetResponseHeadersWhenHandling() {
212212
var stateMachine = self.makeStateMachine(inState: .handling)
213-
stateMachine.setResponseHeaders(["foo": "bar"])
213+
XCTAssertTrue(stateMachine.setResponseHeaders(["foo": "bar"]))
214214
stateMachine.sendMessage().assertInterceptHeadersThenMessage { headers in
215215
XCTAssertEqual(headers, ["foo": "bar"])
216216
}
217217
}
218218

219219
func testSetResponseHeadersWhenHandlingAreMovedToDraining() {
220220
var stateMachine = self.makeStateMachine(inState: .handling)
221-
stateMachine.setResponseHeaders(["foo": "bar"])
221+
XCTAssertTrue(stateMachine.setResponseHeaders(["foo": "bar"]))
222222
stateMachine.handleEnd().assertForward()
223223
stateMachine.sendMessage().assertInterceptHeadersThenMessage { headers in
224224
XCTAssertEqual(headers, ["foo": "bar"])
@@ -227,16 +227,15 @@ internal final class ServerHandlerStateMachineTests: GRPCTestCase {
227227

228228
func testSetResponseHeadersWhenDraining() {
229229
var stateMachine = self.makeStateMachine(inState: .draining)
230-
stateMachine.setResponseHeaders(["foo": "bar"])
230+
XCTAssertTrue(stateMachine.setResponseHeaders(["foo": "bar"]))
231231
stateMachine.sendMessage().assertInterceptHeadersThenMessage { headers in
232232
XCTAssertEqual(headers, ["foo": "bar"])
233233
}
234234
}
235235

236236
func testSetResponseHeadersWhenFinished() {
237237
var stateMachine = self.makeStateMachine(inState: .finished)
238-
stateMachine.setResponseHeaders(["foo": "bar"])
239-
// Nothing we can assert on, only that we don't crash.
238+
XCTAssertFalse(stateMachine.setResponseHeaders(["foo": "bar"]))
240239
}
241240

242241
func testSetResponseTrailersWhenHandling() {

0 commit comments

Comments
 (0)