Skip to content

Commit ae2932c

Browse files
glbrnttWendellXY
authored andcommitted
Expose closeFuture in server interceptor context (grpc#1553)
Motivation: Server calls expose a `closeFuture` where users can register callbacks to tear things down when the RPC ends. Interceptors don't have this capability and must rely on observing an `.end`. Modifications: Expose the `closeFuture` from `ServerCallContext` to the `ServerInterceptorContext`. Result: - Users can be notified in interceptors when the call ends. - Resolves grpc#1552
1 parent d7183a2 commit ae2932c

9 files changed

+78
-1
lines changed

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift

+1
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ internal final class AsyncServerHandler<
283283
callType: callType,
284284
remoteAddress: context.remoteAddress,
285285
userInfoRef: self.userInfoRef,
286+
closeFuture: context.closeFuture,
286287
interceptors: interceptors,
287288
onRequestPart: self.receiveInterceptedPart(_:),
288289
onResponsePart: self.sendInterceptedPart(_:promise:)

Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift

+1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public final class BidirectionalStreamingServerHandler<
9292
callType: .bidirectionalStreaming,
9393
remoteAddress: context.remoteAddress,
9494
userInfoRef: userInfoRef,
95+
closeFuture: context.closeFuture,
9596
interceptors: interceptors,
9697
onRequestPart: self.receiveInterceptedPart(_:),
9798
onResponsePart: self.sendInterceptedPart(_:promise:)

Sources/GRPC/CallHandlers/ClientStreamingServerHandler.swift

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public final class ClientStreamingServerHandler<
9393
callType: .clientStreaming,
9494
remoteAddress: context.remoteAddress,
9595
userInfoRef: userInfoRef,
96+
closeFuture: context.closeFuture,
9697
interceptors: interceptors,
9798
onRequestPart: self.receiveInterceptedPart(_:),
9899
onResponsePart: self.sendInterceptedPart(_:promise:)

Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public final class ServerStreamingServerHandler<
8989
callType: .serverStreaming,
9090
remoteAddress: context.remoteAddress,
9191
userInfoRef: userInfoRef,
92+
closeFuture: context.closeFuture,
9293
interceptors: interceptors,
9394
onRequestPart: self.receiveInterceptedPart(_:),
9495
onResponsePart: self.sendInterceptedPart(_:promise:)

Sources/GRPC/CallHandlers/UnaryServerHandler.swift

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public final class UnaryServerHandler<
8787
callType: .unary,
8888
remoteAddress: context.remoteAddress,
8989
userInfoRef: userInfoRef,
90+
closeFuture: context.closeFuture,
9091
interceptors: interceptors,
9192
onRequestPart: self.receiveInterceptedPart(_:),
9293
onResponsePart: self.sendInterceptedPart(_:promise:)

Sources/GRPC/Interceptor/ServerInterceptorContext.swift

+6
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ public struct ServerInterceptorContext<Request, Response> {
5454
return self._pipeline.remoteAddress
5555
}
5656

57+
/// A future which completes when the call closes. This may be used to register callbacks which
58+
/// free up resources used by the interceptor.
59+
public var closeFuture: EventLoopFuture<Void> {
60+
return self._pipeline.closeFuture
61+
}
62+
5763
/// A 'UserInfo' dictionary.
5864
///
5965
/// - Important: While `UserInfo` has value-semantics, this property retrieves from, and sets a

Sources/GRPC/Interceptor/ServerInterceptorPipeline.swift

+7
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ internal final class ServerInterceptorPipeline<Request, Response> {
4242
@usableFromInline
4343
internal let userInfoRef: Ref<UserInfo>
4444

45+
/// A future which completes when the call closes. This may be used to register callbacks which
46+
/// free up resources used by the interceptor.
47+
@usableFromInline
48+
internal let closeFuture: EventLoopFuture<Void>
49+
4550
/// Called when a response part has traversed the interceptor pipeline.
4651
@usableFromInline
4752
internal let _onResponsePart: (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
@@ -99,6 +104,7 @@ internal final class ServerInterceptorPipeline<Request, Response> {
99104
callType: GRPCCallType,
100105
remoteAddress: SocketAddress?,
101106
userInfoRef: Ref<UserInfo>,
107+
closeFuture: EventLoopFuture<Void>,
102108
interceptors: [ServerInterceptor<Request, Response>],
103109
onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void,
104110
onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
@@ -109,6 +115,7 @@ internal final class ServerInterceptorPipeline<Request, Response> {
109115
self.type = callType
110116
self.remoteAddress = remoteAddress
111117
self.userInfoRef = userInfoRef
118+
self.closeFuture = closeFuture
112119

113120
self._onResponsePart = onResponsePart
114121
self._onRequestPart = onRequestPart

Tests/GRPCTests/InterceptorsTests.swift

+59-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
import Atomics
1617
import EchoImplementation
1718
import EchoModel
1819
import GRPC
@@ -28,14 +29,15 @@ class InterceptorsTests: GRPCTestCase {
2829
private var server: Server!
2930
private var connection: ClientConnection!
3031
private var echo: Echo_EchoNIOClient!
32+
private let onCloseCounter = ManagedAtomic<Int>(0)
3133

3234
override func setUp() {
3335
super.setUp()
3436
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
3537

3638
self.server = try! Server.insecure(group: self.group)
3739
.withServiceProviders([
38-
EchoProvider(),
40+
EchoProvider(interceptors: CountOnCloseInterceptors(counter: self.onCloseCounter)),
3941
HelloWorldProvider(interceptors: HelloWorldServerInterceptorFactory()),
4042
])
4143
.withLogger(self.serverLogger)
@@ -64,6 +66,8 @@ class InterceptorsTests: GRPCTestCase {
6466
let get = self.echo.get(.with { $0.text = "hello" })
6567
assertThat(try get.response.wait(), .is(.with { $0.text = "hello :teg ohce tfiwS" }))
6668
assertThat(try get.status.wait(), .hasCode(.ok))
69+
70+
XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
6771
}
6872

6973
func testCollect() {
@@ -73,6 +77,8 @@ class InterceptorsTests: GRPCTestCase {
7377
collect.sendEnd(promise: nil)
7478
assertThat(try collect.response.wait(), .is(.with { $0.text = "3 4 1 2 :tcelloc ohce tfiwS" }))
7579
assertThat(try collect.status.wait(), .hasCode(.ok))
80+
81+
XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
7682
}
7783

7884
func testExpand() {
@@ -81,6 +87,8 @@ class InterceptorsTests: GRPCTestCase {
8187
assertThat(response, .is(.with { $0.text = "hello :)0( dnapxe ohce tfiwS" }))
8288
}
8389
assertThat(try expand.status.wait(), .hasCode(.ok))
90+
91+
XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
8492
}
8593

8694
func testUpdate() {
@@ -91,6 +99,8 @@ class InterceptorsTests: GRPCTestCase {
9199
update.sendMessage(.with { $0.text = "hello" }, promise: nil)
92100
update.sendEnd(promise: nil)
93101
assertThat(try update.status.wait(), .hasCode(.ok))
102+
103+
XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
94104
}
95105

96106
func testSayHello() {
@@ -360,6 +370,54 @@ final class ReversingInterceptors: Echo_EchoClientInterceptorFactoryProtocol {
360370
}
361371
}
362372

373+
final class CountOnCloseInterceptors: Echo_EchoServerInterceptorFactoryProtocol {
374+
// This interceptor is stateless, let's just share it.
375+
private let interceptors: [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>]
376+
377+
init(counter: ManagedAtomic<Int>) {
378+
self.interceptors = [CountOnCloseServerInterceptor(counter: counter)]
379+
}
380+
381+
func makeGetInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
382+
return self.interceptors
383+
}
384+
385+
func makeExpandInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
386+
return self.interceptors
387+
}
388+
389+
func makeCollectInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
390+
return self.interceptors
391+
}
392+
393+
func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
394+
return self.interceptors
395+
}
396+
}
397+
398+
final class CountOnCloseServerInterceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {
399+
private let counter: ManagedAtomic<Int>
400+
401+
init(counter: ManagedAtomic<Int>) {
402+
self.counter = counter
403+
}
404+
405+
override func receive(
406+
_ part: GRPCServerRequestPart<Echo_EchoRequest>,
407+
context: ServerInterceptorContext<Echo_EchoRequest, Echo_EchoResponse>
408+
) {
409+
switch part {
410+
case .metadata:
411+
context.closeFuture.whenComplete { _ in
412+
self.counter.wrappingIncrement(ordering: .sequentiallyConsistent)
413+
}
414+
default:
415+
()
416+
}
417+
context.receive(part)
418+
}
419+
}
420+
363421
private enum MagicKey: UserInfo.Key {
364422
typealias Value = String
365423
}

Tests/GRPCTests/ServerInterceptorPipelineTests.swift

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class ServerInterceptorPipelineTests: GRPCTestCase {
4343
callType: callType,
4444
remoteAddress: nil,
4545
userInfoRef: Ref(UserInfo()),
46+
closeFuture: self.embeddedEventLoop.makeSucceededVoidFuture(),
4647
interceptors: interceptors,
4748
onRequestPart: onRequestPart,
4849
onResponsePart: onResponsePart

0 commit comments

Comments
 (0)