From 45017093de4f50eae94ced9669d9dc446a82cb2d Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 15 Jan 2025 13:20:27 +0000 Subject: [PATCH 1/6] Add more properties to ClientContext and have the ClientTransport provide it --- .../StructuredSwift+ServiceMetadata.swift | 1 + .../GRPCCore/Call/Client/ClientContext.swift | 44 ++++++++++++++++++- .../ClientRPCExecutor+HedgingExecutor.swift | 4 +- .../ClientRPCExecutor+OneShotExecutor.swift | 4 +- .../ClientRPCExecutor+RetryExecutor.swift | 6 ++- .../Client/Internal/ClientRPCExecutor.swift | 6 +-- .../GRPCCore/Transport/ClientTransport.swift | 4 +- .../InProcessTransport+Client.swift | 14 ++++-- Tests/GRPCCoreTests/GRPCServerTests.swift | 30 ++++++------- .../Transport/AnyTransport.swift | 8 ++-- .../Transport/StreamCountingTransport.swift | 6 +-- .../Transport/ThrowingTransport.swift | 2 +- .../InProcessClientTransportTests.swift | 10 ++--- 13 files changed, 96 insertions(+), 43 deletions(-) diff --git a/Sources/GRPCCodeGen/Internal/StructuredSwift+ServiceMetadata.swift b/Sources/GRPCCodeGen/Internal/StructuredSwift+ServiceMetadata.swift index ad7109a9d..2804eaab0 100644 --- a/Sources/GRPCCodeGen/Internal/StructuredSwift+ServiceMetadata.swift +++ b/Sources/GRPCCodeGen/Internal/StructuredSwift+ServiceMetadata.swift @@ -45,6 +45,7 @@ extension VariableDescription { /// static let descriptor = GRPCCore.MethodDescriptor( /// service: GRPCCore.ServiceDescriptor(fullyQualifiedServiceName: ""), /// method: "" + /// ) /// ``` package static func methodDescriptor( accessModifier: AccessModifier? = nil, diff --git a/Sources/GRPCCore/Call/Client/ClientContext.swift b/Sources/GRPCCore/Call/Client/ClientContext.swift index 51eaa1a21..a06c1db32 100644 --- a/Sources/GRPCCore/Call/Client/ClientContext.swift +++ b/Sources/GRPCCore/Call/Client/ClientContext.swift @@ -19,8 +19,50 @@ public struct ClientContext: Sendable { /// A description of the method being called. public var descriptor: MethodDescriptor + /// A description of the remote peer. + /// + /// The format of the description should follow the pattern ":
" where + /// "" indicates the underlying network transport (such as "ipv4", "unix", or + /// "in-process"). This is a guideline for how descriptions should be formatted; different + /// implementations may not follow this format so you shouldn't make assumptions based on it. + /// + /// Some examples include: + /// - "ipv4:127.0.0.1:31415", + /// - "ipv6:[::1]:443", + /// - "in-process:27182". + public var remotePeer: String + + /// The hostname of the RPC server. + public var serverHostname: String + + /// A description of the local peer. + /// + /// The format of the description should follow the pattern ":
" where + /// "" indicates the underlying network transport (such as "ipv4", "unix", or + /// "in-process"). This is a guideline for how descriptions should be formatted; different + /// implementations may not follow this format so you shouldn't make assumptions based on it. + /// + /// Some examples include: + /// - "ipv4:127.0.0.1:31415", + /// - "ipv6:[::1]:443", + /// - "in-process:27182". + public var localPeer: String + + /// The transport in use (e.g. "tcp", "udp"). + public var networkTransportMethod: String + /// Create a new client interceptor context. - public init(descriptor: MethodDescriptor) { + public init( + descriptor: MethodDescriptor, + remotePeer: String, + localPeer: String, + serverHostname: String, + networkTransportMethod: String + ) { self.descriptor = descriptor + self.remotePeer = remotePeer + self.localPeer = localPeer + self.serverHostname = serverHostname + self.networkTransportMethod = networkTransportMethod } } diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift index cb44fefff..480b23817 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift @@ -322,7 +322,7 @@ extension ClientRPCExecutor.HedgingExecutor { return try await self.transport.withStream( descriptor: method, options: options - ) { stream -> _HedgingAttemptTaskResult.AttemptResult in + ) { stream, context -> _HedgingAttemptTaskResult.AttemptResult in return await withTaskGroup(of: _HedgingAttemptTaskResult.self) { group in group.addTask { do { @@ -348,8 +348,8 @@ extension ClientRPCExecutor.HedgingExecutor { let response = await ClientRPCExecutor._execute( in: &group, + context: context, request: request, - method: method, attempt: attempt, serializer: self.serializer, deserializer: self.deserializer, diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift index 32dde4a66..c04158995 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift @@ -98,11 +98,11 @@ extension ClientRPCExecutor.OneShotExecutor { ) async -> Result { return await withTaskGroup(of: Void.self, returning: Result.self) { group in do { - return try await self.transport.withStream(descriptor: method, options: options) { stream in + return try await self.transport.withStream(descriptor: method, options: options) { stream, context in let response = await ClientRPCExecutor._execute( in: &group, + context: context, request: request, - method: method, attempt: 1, serializer: self.serializer, deserializer: self.deserializer, diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift index 6e7da3433..03acd0543 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift @@ -118,7 +118,7 @@ extension ClientRPCExecutor.RetryExecutor { let attemptResult = try await self.transport.withStream( descriptor: method, options: options - ) { stream in + ) { stream, context in group.addTask { var metadata = request.metadata // Work out the timeout from the deadline. @@ -127,6 +127,7 @@ extension ClientRPCExecutor.RetryExecutor { } return await self.executeAttempt( + context: context, stream: stream, metadata: metadata, retryStream: retry.stream, @@ -194,6 +195,7 @@ extension ClientRPCExecutor.RetryExecutor { @inlinable func executeAttempt( + context: ClientContext, stream: RPCStream, metadata: Metadata, retryStream: BroadcastAsyncSequence, @@ -211,8 +213,8 @@ extension ClientRPCExecutor.RetryExecutor { let response = await ClientRPCExecutor._execute( in: &group, + context: context, request: request, - method: method, attempt: attempt, serializer: self.serializer, deserializer: self.deserializer, diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift index ade536d65..0f94d817e 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift @@ -104,25 +104,25 @@ extension ClientRPCExecutor { /// /// - Parameters: /// - request: The request to execute. - /// - method: A description of the method to execute the request against. + /// - context: The ``ClientContext`` related to this request. /// - attempt: The attempt number of the request. /// - serializer: A serializer to convert input messages to bytes. /// - deserializer: A deserializer to convert bytes to output messages. /// - interceptors: An array of interceptors which the request and response pass through. The /// interceptors will be called in the order of the array. + /// - stream: The stream to excecute the RPC on. /// - Returns: The deserialized response. @inlinable // would be private static func _execute( in group: inout TaskGroup, + context: ClientContext, request: StreamingClientRequest, - method: MethodDescriptor, attempt: Int, serializer: some MessageSerializer, deserializer: some MessageDeserializer, interceptors: [any ClientInterceptor], stream: RPCStream ) async -> StreamingClientResponse { - let context = ClientContext(descriptor: method) if interceptors.isEmpty { return await ClientStreamExecutor.execute( diff --git a/Sources/GRPCCore/Transport/ClientTransport.swift b/Sources/GRPCCore/Transport/ClientTransport.swift index a86a79fea..6ec624c85 100644 --- a/Sources/GRPCCore/Transport/ClientTransport.swift +++ b/Sources/GRPCCore/Transport/ClientTransport.swift @@ -59,12 +59,12 @@ public protocol ClientTransport: Sendable { /// - Parameters: /// - descriptor: A description of the method to open a stream for. /// - options: Options specific to the stream. - /// - closure: A closure that takes the opened stream as parameter. + /// - closure: A closure that takes the opened stream and the client context as its parameters. /// - Returns: Whatever value was returned from `closure`. func withStream( descriptor: MethodDescriptor, options: CallOptions, - _ closure: (_ stream: RPCStream) async throws -> T + _ closure: (_ stream: RPCStream, _ context: ClientContext) async throws -> T ) async throws -> T /// Returns the configuration for a given method. diff --git a/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift b/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift index b24ec07e1..6245822c9 100644 --- a/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift +++ b/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift @@ -225,12 +225,12 @@ extension InProcessTransport { /// - Parameters: /// - descriptor: A description of the method to open a stream for. /// - options: Options specific to the stream. - /// - closure: A closure that takes the opened stream as parameter. + /// - closure: A closure that takes the opened stream and the client context as its parameters. /// - Returns: Whatever value was returned from `closure`. public func withStream( descriptor: MethodDescriptor, options: CallOptions, - _ closure: (RPCStream) async throws -> T + _ closure: (RPCStream, ClientContext) async throws -> T ) async throws -> T { let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self) let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) @@ -297,11 +297,19 @@ extension InProcessTransport { } } + let clientContext = ClientContext( + descriptor: descriptor, + remotePeer: "", + localPeer: "", + serverHostname: "", + networkTransportMethod: "in-process" + ) + switch acceptStream { case .success(let streamID): let streamHandlingResult: Result do { - let result = try await closure(clientStream) + let result = try await closure(clientStream, clientContext) streamHandlingResult = .success(result) } catch { streamHandlingResult = .failure(error) diff --git a/Tests/GRPCCoreTests/GRPCServerTests.swift b/Tests/GRPCCoreTests/GRPCServerTests.swift index 388940e83..02a0672e1 100644 --- a/Tests/GRPCCoreTests/GRPCServerTests.swift +++ b/Tests/GRPCCoreTests/GRPCServerTests.swift @@ -48,7 +48,7 @@ final class GRPCServerTests: XCTestCase { try await client.withStream( descriptor: BinaryEcho.Methods.get, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) try await stream.outbound.write(.message([3, 1, 4, 1, 5])) await stream.outbound.finish() @@ -75,7 +75,7 @@ final class GRPCServerTests: XCTestCase { try await client.withStream( descriptor: BinaryEcho.Methods.collect, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) try await stream.outbound.write(.message([3])) try await stream.outbound.write(.message([1])) @@ -106,7 +106,7 @@ final class GRPCServerTests: XCTestCase { try await client.withStream( descriptor: BinaryEcho.Methods.expand, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) try await stream.outbound.write(.message([3, 1, 4, 1, 5])) await stream.outbound.finish() @@ -135,7 +135,7 @@ final class GRPCServerTests: XCTestCase { try await client.withStream( descriptor: BinaryEcho.Methods.update, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) for byte in [3, 1, 4, 1, 5] as [UInt8] { try await stream.outbound.write(.message([byte])) @@ -166,7 +166,7 @@ final class GRPCServerTests: XCTestCase { try await client.withStream( descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"), options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) await stream.outbound.finish() @@ -187,7 +187,7 @@ final class GRPCServerTests: XCTestCase { try await client.withStream( descriptor: BinaryEcho.Methods.get, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) try await stream.outbound.write(.message([i])) await stream.outbound.finish() @@ -225,7 +225,7 @@ final class GRPCServerTests: XCTestCase { try await client.withStream( descriptor: BinaryEcho.Methods.get, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) await stream.outbound.finish() @@ -250,7 +250,7 @@ final class GRPCServerTests: XCTestCase { try await client.withStream( descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"), options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) await stream.outbound.finish() @@ -277,7 +277,7 @@ final class GRPCServerTests: XCTestCase { try await client.withStream( descriptor: BinaryEcho.Methods.get, options: .defaults - ) { stream in + ) { stream, _ in XCTFail("Stream shouldn't be opened") } } errorHandler: { error in @@ -291,7 +291,7 @@ final class GRPCServerTests: XCTestCase { try await client.withStream( descriptor: BinaryEcho.Methods.update, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) var iterator = stream.inbound.makeAsyncIterator() // Don't need to validate the response, just that the server is running. @@ -364,7 +364,7 @@ final class GRPCServerTests: XCTestCase { try await transport.withStream( descriptor: BinaryEcho.Methods.get, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) try await stream.outbound.write(.message([0])) await stream.outbound.finish() @@ -407,7 +407,7 @@ struct ServerTests { try await client.withStream( descriptor: BinaryEcho.Methods.get, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) try await stream.outbound.write(.message(Array("hello".utf8))) await stream.outbound.finish() @@ -437,7 +437,7 @@ struct ServerTests { try await client.withStream( descriptor: HelloWorld.Methods.sayHello, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) try await stream.outbound.write(.message(Array("Swift".utf8))) await stream.outbound.finish() @@ -494,7 +494,7 @@ struct ServerTests { try await client.withStream( descriptor: BinaryEcho.Methods.get, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) try await stream.outbound.write(.message(Array("hello".utf8))) await stream.outbound.finish() @@ -524,7 +524,7 @@ struct ServerTests { try await client.withStream( descriptor: BinaryEcho.Methods.collect, options: .defaults - ) { stream in + ) { stream, _ in try await stream.outbound.write(.metadata([:])) try await stream.outbound.write(.message(Array("hello".utf8))) await stream.outbound.finish() diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift index 7ca178fef..e63b5e5f1 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift @@ -24,7 +24,7 @@ struct AnyClientTransport: ClientTransport, Sendable { @Sendable ( _ method: MethodDescriptor, _ options: CallOptions, - _ body: (RPCStream) async throws -> (any Sendable) + _ body: (RPCStream, ClientContext) async throws -> (any Sendable) ) async throws -> Any private let _connect: @Sendable () async throws -> Void private let _close: @Sendable () -> Void @@ -34,8 +34,8 @@ struct AnyClientTransport: ClientTransport, Sendable { where Transport.Inbound == Inbound, Transport.Outbound == Outbound { self._retryThrottle = { transport.retryThrottle } self._withStream = { descriptor, options, closure in - try await transport.withStream(descriptor: descriptor, options: options) { stream in - try await closure(stream) as (any Sendable) + try await transport.withStream(descriptor: descriptor, options: options) { stream, context in + try await closure(stream, context) as (any Sendable) } } @@ -67,7 +67,7 @@ struct AnyClientTransport: ClientTransport, Sendable { func withStream( descriptor: MethodDescriptor, options: CallOptions, - _ closure: (RPCStream) async throws -> T + _ closure: (RPCStream, ClientContext) async throws -> T ) async throws -> T { let result = try await self._withStream(descriptor, options, closure) return result as! T diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift index 970109286..abd052e7c 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift @@ -54,15 +54,15 @@ struct StreamCountingClientTransport: ClientTransport, Sendable { func withStream( descriptor: MethodDescriptor, options: CallOptions, - _ closure: (RPCStream) async throws -> T + _ closure: (RPCStream, ClientContext) async throws -> T ) async throws -> T { do { return try await self.transport.withStream( descriptor: descriptor, options: options - ) { stream in + ) { stream, context in self._streamsOpened.increment() - return try await closure(stream) + return try await closure(stream, context) } } catch { self._streamFailures.increment() diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift index e73bdbdf1..7f7649eb1 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift @@ -44,7 +44,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport { func withStream( descriptor: MethodDescriptor, options: CallOptions, - _ closure: (RPCStream) async throws -> T + _ closure: (RPCStream, ClientContext) async throws -> T ) async throws -> T { throw RPCError(code: self.code, message: "") } diff --git a/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift b/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift index 8d4e3a2ae..c4f739d42 100644 --- a/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift +++ b/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift @@ -110,7 +110,7 @@ final class InProcessClientTransportTests: XCTestCase { try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { - try await client.withStream(descriptor: .testTest, options: .defaults) { _ in + try await client.withStream(descriptor: .testTest, options: .defaults) { _, _ in // Once the pending stream is opened, close the client to new connections, // so that, once this closure is executed and this stream is closed, // the client will return from `connect()`. @@ -135,7 +135,7 @@ final class InProcessClientTransportTests: XCTestCase { client.beginGracefulShutdown() await XCTAssertThrowsErrorAsync(ofType: RPCError.self) { - try await client.withStream(descriptor: .testTest, options: .defaults) { _ in } + try await client.withStream(descriptor: .testTest, options: .defaults) { _, _ in } } errorHandler: { error in XCTAssertEqual(error.code, .failedPrecondition) } @@ -151,7 +151,7 @@ final class InProcessClientTransportTests: XCTestCase { } group.addTask { - try await client.withStream(descriptor: .testTest, options: .defaults) { stream in + try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in try await stream.outbound.write(.message([1])) await stream.outbound.finish() let receivedMessages = try await stream.inbound.reduce(into: []) { $0.append($1) } @@ -248,13 +248,13 @@ final class InProcessClientTransportTests: XCTestCase { } group.addTask { - try await client.withStream(descriptor: .testTest, options: .defaults) { stream in + try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in try await Task.sleep(for: .milliseconds(100)) } } group.addTask { - try await client.withStream(descriptor: .testTest, options: .defaults) { stream in + try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in try await Task.sleep(for: .milliseconds(100)) } } From e2f177b152eb50d1757e76717e9fcfc790a31efa Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 15 Jan 2025 13:26:57 +0000 Subject: [PATCH 2/6] Formatting --- .../Client/Internal/ClientRPCExecutor+OneShotExecutor.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift index c04158995..f77494b8e 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift @@ -98,7 +98,9 @@ extension ClientRPCExecutor.OneShotExecutor { ) async -> Result { return await withTaskGroup(of: Void.self, returning: Result.self) { group in do { - return try await self.transport.withStream(descriptor: method, options: options) { stream, context in + return try await self.transport.withStream(descriptor: method, options: options) { + stream, + context in let response = await ClientRPCExecutor._execute( in: &group, context: context, From ca14edf537b857769aac847a27fd31f8c300292d Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 15 Jan 2025 15:27:31 +0000 Subject: [PATCH 3/6] PR changes --- Sources/GRPCCore/Call/Client/ClientContext.swift | 12 +----------- .../Internal/ClientRPCExecutor+OneShotExecutor.swift | 7 ++++--- .../InProcessTransport+Client.swift | 4 +--- 3 files changed, 6 insertions(+), 17 deletions(-) diff --git a/Sources/GRPCCore/Call/Client/ClientContext.swift b/Sources/GRPCCore/Call/Client/ClientContext.swift index a06c1db32..613cf0c36 100644 --- a/Sources/GRPCCore/Call/Client/ClientContext.swift +++ b/Sources/GRPCCore/Call/Client/ClientContext.swift @@ -32,9 +32,6 @@ public struct ClientContext: Sendable { /// - "in-process:27182". public var remotePeer: String - /// The hostname of the RPC server. - public var serverHostname: String - /// A description of the local peer. /// /// The format of the description should follow the pattern ":
" where @@ -48,21 +45,14 @@ public struct ClientContext: Sendable { /// - "in-process:27182". public var localPeer: String - /// The transport in use (e.g. "tcp", "udp"). - public var networkTransportMethod: String - /// Create a new client interceptor context. public init( descriptor: MethodDescriptor, remotePeer: String, - localPeer: String, - serverHostname: String, - networkTransportMethod: String + localPeer: String ) { self.descriptor = descriptor self.remotePeer = remotePeer self.localPeer = localPeer - self.serverHostname = serverHostname - self.networkTransportMethod = networkTransportMethod } } diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift index f77494b8e..cc21ad4fc 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift @@ -98,9 +98,10 @@ extension ClientRPCExecutor.OneShotExecutor { ) async -> Result { return await withTaskGroup(of: Void.self, returning: Result.self) { group in do { - return try await self.transport.withStream(descriptor: method, options: options) { - stream, - context in + return try await self.transport.withStream( + descriptor: method, + options: options + ) { stream, context in let response = await ClientRPCExecutor._execute( in: &group, context: context, diff --git a/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift b/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift index 6245822c9..93720dec3 100644 --- a/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift +++ b/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift @@ -300,9 +300,7 @@ extension InProcessTransport { let clientContext = ClientContext( descriptor: descriptor, remotePeer: "", - localPeer: "", - serverHostname: "", - networkTransportMethod: "in-process" + localPeer: "" ) switch acceptStream { From 1cea53b93bf1319cf16cce5be3a8377066949536 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 15 Jan 2025 16:06:16 +0000 Subject: [PATCH 4/6] PR changes --- .../InProcessTransport+Client.swift | 10 +++++++--- .../InProcessTransport+Server.swift | 3 +++ .../GRPCInProcessTransport/InProcessTransport.swift | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift b/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift index 93720dec3..aa28fbac2 100644 --- a/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift +++ b/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift @@ -103,19 +103,23 @@ extension InProcessTransport { private let methodConfig: MethodConfigs private let state: Mutex + private let peer: String /// Creates a new in-process client transport. /// /// - Parameters: /// - server: The in-process server transport to connect to. /// - serviceConfig: Service configuration. + /// - peer: The system's PID for the running client and server. package init( server: InProcessTransport.Server, - serviceConfig: ServiceConfig = ServiceConfig() + serviceConfig: ServiceConfig = ServiceConfig(), + peer: String ) { self.retryThrottle = serviceConfig.retryThrottling.map { RetryThrottle(policy: $0) } self.methodConfig = MethodConfigs(serviceConfig: serviceConfig) self.state = Mutex(.unconnected(.init(serverTransport: server))) + self.peer = peer } /// Establish and maintain a connection to the remote destination. @@ -299,8 +303,8 @@ extension InProcessTransport { let clientContext = ClientContext( descriptor: descriptor, - remotePeer: "", - localPeer: "" + remotePeer: self.peer, + localPeer: self.peer ) switch acceptStream { diff --git a/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift b/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift index 90e291b6e..0c32e61d0 100644 --- a/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift +++ b/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift @@ -74,6 +74,9 @@ extension InProcessTransport { private let handles: Mutex /// Creates a new instance of ``Server``. + /// + /// - Parameters: + /// - peer: The system's PID for the running client and server. package init(peer: String) { (self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream() self.handles = Mutex(State()) diff --git a/Sources/GRPCInProcessTransport/InProcessTransport.swift b/Sources/GRPCInProcessTransport/InProcessTransport.swift index cd891a64c..e73beee91 100644 --- a/Sources/GRPCInProcessTransport/InProcessTransport.swift +++ b/Sources/GRPCInProcessTransport/InProcessTransport.swift @@ -27,6 +27,6 @@ public struct InProcessTransport: Sendable { public init(serviceConfig: ServiceConfig = ServiceConfig()) { let peer = "in-process:\(System.pid())" self.server = Self.Server(peer: peer) - self.client = Self.Client(server: self.server, serviceConfig: serviceConfig) + self.client = Self.Client(server: self.server, serviceConfig: serviceConfig, peer: peer) } } From 6d4954af9044471f2a01b0370a95662311066a3c Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 15 Jan 2025 17:50:23 +0000 Subject: [PATCH 5/6] Fix tests --- Sources/GRPCCore/Transport/ClientTransport.swift | 2 +- .../InProcessTransport+Server.swift | 2 +- .../ClientRPCExecutorTestHarness+Transport.swift | 4 ++-- .../InProcessClientTransportTests.swift | 16 ++++++++++------ 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/Sources/GRPCCore/Transport/ClientTransport.swift b/Sources/GRPCCore/Transport/ClientTransport.swift index 6ec624c85..659244e11 100644 --- a/Sources/GRPCCore/Transport/ClientTransport.swift +++ b/Sources/GRPCCore/Transport/ClientTransport.swift @@ -47,7 +47,7 @@ public protocol ClientTransport: Sendable { /// running ``connect()``. func beginGracefulShutdown() - /// Opens a stream using the transport, and uses it as input into a user-provided closure. + /// Opens a stream using the transport, and uses it as input into a user-provided closure alongisde the given context. /// /// - Important: The opened stream is closed after the closure is finished. /// diff --git a/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift b/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift index 0c32e61d0..659c53465 100644 --- a/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift +++ b/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift @@ -34,7 +34,7 @@ extension InProcessTransport { private let newStreams: AsyncStream> private let newStreamsContinuation: AsyncStream>.Continuation - private let peer: String + package let peer: String private struct State: Sendable { private var _nextID: UInt64 diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift index aa5a21332..c80a63b7b 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift @@ -1,5 +1,5 @@ /* - * Copyright 2023, gRPC Authors All rights reserved. + * Copyright 2023-2025, gRPC Authors All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,6 @@ extension InProcessTransport.Server { func spawnClientTransport( throttle: RetryThrottle = RetryThrottle(maxTokens: 10, tokenRatio: 0.1) ) -> InProcessTransport.Client { - return InProcessTransport.Client(server: self) + return InProcessTransport.Client(server: self, peer: self.peer) } } diff --git a/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift b/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift index c4f739d42..c1e5dfc9b 100644 --- a/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift +++ b/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift @@ -1,5 +1,5 @@ /* - * Copyright 2023, gRPC Authors All rights reserved. + * Copyright 2023-2025, gRPC Authors All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -198,9 +198,11 @@ final class InProcessClientTransportTests: XCTestCase { ] ) + let peer = "in-process:1234" var client = InProcessTransport.Client( - server: InProcessTransport.Server(peer: "in-process:1234"), - serviceConfig: serviceConfig + server: InProcessTransport.Server(peer: peer), + serviceConfig: serviceConfig, + peer: peer ) let firstDescriptor = MethodDescriptor(fullyQualifiedService: "test", method: "first") @@ -223,8 +225,9 @@ final class InProcessClientTransportTests: XCTestCase { ) serviceConfig.methodConfig.append(overrideConfiguration) client = InProcessTransport.Client( - server: InProcessTransport.Server(peer: "in-process:1234"), - serviceConfig: serviceConfig + server: InProcessTransport.Server(peer: peer), + serviceConfig: serviceConfig, + peer: peer ) let secondDescriptor = MethodDescriptor(fullyQualifiedService: "test", method: "second") @@ -290,7 +293,8 @@ final class InProcessClientTransportTests: XCTestCase { return InProcessTransport.Client( server: server, - serviceConfig: serviceConfig + serviceConfig: serviceConfig, + peer: server.peer ) } } From c4cb2e7b4df40cf16a4532d4632656436859d605 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 16 Jan 2025 10:46:08 +0000 Subject: [PATCH 6/6] Fix license header check script --- dev/license-check.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/license-check.sh b/dev/license-check.sh index 889478b31..be92bcf85 100755 --- a/dev/license-check.sh +++ b/dev/license-check.sh @@ -88,7 +88,7 @@ check_copyright_headers() { actual_sha=$(head -n "$((drop_first + expected_lines))" "$filename" \ | tail -n "$expected_lines" \ - | sed -e 's/201[56789]-20[12][0-9]/YEARS/' -e 's/20[12][0-9]/YEARS/' \ + | sed -e 's/20[12][0-9]-20[12][0-9]/YEARS/' -e 's/20[12][0-9]/YEARS/' \ | shasum \ | awk '{print $1}')