Skip to content

Commit 0fc4956

Browse files
authored
Add remote peer info to the server context (#2136)
Motivation: It's often useful to know the identity of the remote peer when handling RPCs. Modifications: - Add a 'peer' to the server context - Implement this for the in-process transport - Make some in-process inits `package`, these should never have been `public` Result: Server RPCs have some idea what the address of remote peer is
1 parent e2629bc commit 0fc4956

File tree

11 files changed

+142
-15
lines changed

11 files changed

+142
-15
lines changed

Sources/GRPCCore/Call/Server/ServerContext.swift

+20-1
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,36 @@ public struct ServerContext: Sendable {
1919
/// A description of the method being called.
2020
public var descriptor: MethodDescriptor
2121

22+
/// A description of the remote peer.
23+
///
24+
/// The format of the description should follow the pattern "<transport>:<address>" where
25+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
26+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
27+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
28+
///
29+
/// Some examples include:
30+
/// - "ipv4:127.0.0.1:31415",
31+
/// - "ipv6:[::1]:443",
32+
/// - "in-process:27182".
33+
public var peer: String
34+
2235
/// A handle for checking the cancellation status of an RPC.
2336
public var cancellation: RPCCancellationHandle
2437

2538
/// Create a new server context.
2639
///
2740
/// - Parameters:
2841
/// - descriptor: A description of the method being called.
42+
/// - peer: A description of the remote peer.
2943
/// - cancellation: A cancellation handle. You can create a cancellation handle
3044
/// using ``withServerContextRPCCancellationHandle(_:)``.
31-
public init(descriptor: MethodDescriptor, cancellation: RPCCancellationHandle) {
45+
public init(
46+
descriptor: MethodDescriptor,
47+
peer: String,
48+
cancellation: RPCCancellationHandle
49+
) {
3250
self.descriptor = descriptor
51+
self.peer = peer
3352
self.cancellation = cancellation
3453
}
3554
}

Sources/GRPCInProcessTransport/InProcessTransport+Client.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ extension InProcessTransport {
109109
/// - Parameters:
110110
/// - server: The in-process server transport to connect to.
111111
/// - serviceConfig: Service configuration.
112-
public init(
112+
package init(
113113
server: InProcessTransport.Server,
114114
serviceConfig: ServiceConfig = ServiceConfig()
115115
) {

Sources/GRPCInProcessTransport/InProcessTransport+Server.swift

+8-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ extension InProcessTransport {
3434

3535
private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
3636
private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation
37+
private let peer: String
3738

3839
private struct State: Sendable {
3940
private var _nextID: UInt64
@@ -73,9 +74,10 @@ extension InProcessTransport {
7374
private let handles: Mutex<State>
7475

7576
/// Creates a new instance of ``Server``.
76-
public init() {
77+
package init(peer: String) {
7778
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
7879
self.handles = Mutex(State())
80+
self.peer = peer
7981
}
8082

8183
/// Publish a new ``RPCStream``, which will be returned by the transport's ``events``
@@ -115,7 +117,11 @@ extension InProcessTransport {
115117
handle.cancel()
116118
}
117119

118-
let context = ServerContext(descriptor: stream.descriptor, cancellation: handle)
120+
let context = ServerContext(
121+
descriptor: stream.descriptor,
122+
peer: self.peer,
123+
cancellation: handle
124+
)
119125
await streamHandler(stream, context)
120126
}
121127
}

Sources/GRPCInProcessTransport/InProcessTransport.swift

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public struct InProcessTransport: Sendable {
2525
/// - Parameters:
2626
/// - serviceConfig: Configuration describing how methods should be executed.
2727
public init(serviceConfig: ServiceConfig = ServiceConfig()) {
28-
self.server = Self.Server()
28+
let peer = "in-process:\(System.pid())"
29+
self.server = Self.Server(peer: peer)
2930
self.client = Self.Client(server: self.server, serviceConfig: serviceConfig)
3031
}
3132
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#if canImport(Darwin)
18+
import Darwin
19+
#elseif canImport(Glibc)
20+
import Glibc
21+
#elseif canImport(Musl)
22+
import Musl
23+
#endif
24+
25+
enum System {
26+
static func pid() -> Int {
27+
#if canImport(Darwin)
28+
let pid = Darwin.getpid()
29+
return Int(pid)
30+
#elseif canImport(Glibc)
31+
let pid = Glibc.getpid()
32+
return Int(pid)
33+
#elseif canImport(Musl)
34+
let pid = Musl.getpid()
35+
return Int(pid)
36+
#endif
37+
}
38+
}

Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,13 @@ struct ClientRPCExecutorTestHarness {
4747

4848
switch transport {
4949
case .inProcess:
50-
let server = InProcessTransport.Server()
50+
let server = InProcessTransport.Server(peer: "in-process:1234")
5151
let client = server.spawnClientTransport()
5252
self.serverTransport = StreamCountingServerTransport(wrapping: server)
5353
self.clientTransport = StreamCountingClientTransport(wrapping: client)
5454

5555
case .throwsOnStreamCreation(let code):
56-
let server = InProcessTransport.Server() // Will never be called.
56+
let server = InProcessTransport.Server(peer: "in-process:1234") // Will never be called.
5757
let client = ThrowOnStreamCreationTransport(code: code)
5858
self.serverTransport = StreamCountingServerTransport(wrapping: server)
5959
self.clientTransport = StreamCountingClientTransport(wrapping: client)

Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift

+1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ struct ServerRPCExecutorTestHarness {
102102
await withServerContextRPCCancellationHandle { cancellation in
103103
let context = ServerContext(
104104
descriptor: MethodDescriptor(fullyQualifiedService: "foo", method: "bar"),
105+
peer: "tests",
105106
cancellation: cancellation
106107
)
107108

Tests/GRPCCoreTests/GRPCServerTests.swift

+4-1
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,10 @@ final class GRPCServerTests: XCTestCase {
334334
}
335335

336336
func testTestRunStoppedServer() async throws {
337-
let server = GRPCServer(transport: InProcessTransport.Server(), services: [])
337+
let server = GRPCServer(
338+
transport: InProcessTransport.Server(peer: "in-process:1234"),
339+
services: []
340+
)
338341
// Run the server.
339342
let task = Task { try await server.serve() }
340343
task.cancel()

Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift

+5-5
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ final class InProcessClientTransportTests: XCTestCase {
142142
}
143143

144144
func testOpenStreamSuccessfullyAndThenClose() async throws {
145-
let server = InProcessTransport.Server()
145+
let server = InProcessTransport.Server(peer: "in-process:1234")
146146
let client = makeClient(server: server)
147147

148148
try await withThrowingTaskGroup(of: Void.self) { group in
@@ -199,7 +199,7 @@ final class InProcessClientTransportTests: XCTestCase {
199199
)
200200

201201
var client = InProcessTransport.Client(
202-
server: InProcessTransport.Server(),
202+
server: InProcessTransport.Server(peer: "in-process:1234"),
203203
serviceConfig: serviceConfig
204204
)
205205

@@ -223,7 +223,7 @@ final class InProcessClientTransportTests: XCTestCase {
223223
)
224224
serviceConfig.methodConfig.append(overrideConfiguration)
225225
client = InProcessTransport.Client(
226-
server: InProcessTransport.Server(),
226+
server: InProcessTransport.Server(peer: "in-process:1234"),
227227
serviceConfig: serviceConfig
228228
)
229229

@@ -239,7 +239,7 @@ final class InProcessClientTransportTests: XCTestCase {
239239
}
240240

241241
func testOpenMultipleStreamsThenClose() async throws {
242-
let server = InProcessTransport.Server()
242+
let server = InProcessTransport.Server(peer: "in-process:1234")
243243
let client = makeClient(server: server)
244244

245245
try await withThrowingTaskGroup(of: Void.self) { group in
@@ -269,7 +269,7 @@ final class InProcessClientTransportTests: XCTestCase {
269269
}
270270

271271
func makeClient(
272-
server: InProcessTransport.Server = InProcessTransport.Server()
272+
server: InProcessTransport.Server = InProcessTransport.Server(peer: "in-process:1234")
273273
) -> InProcessTransport.Client {
274274
let defaultPolicy = RetryPolicy(
275275
maxAttempts: 10,

Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import XCTest
2121

2222
final class InProcessServerTransportTests: XCTestCase {
2323
func testStartListening() async throws {
24-
let transport = InProcessTransport.Server()
24+
let transport = InProcessTransport.Server(peer: "in-process:1234")
2525

2626
let outbound = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self)
2727
let stream = RPCStream<
@@ -53,7 +53,7 @@ final class InProcessServerTransportTests: XCTestCase {
5353
}
5454

5555
func testStopListening() async throws {
56-
let transport = InProcessTransport.Server()
56+
let transport = InProcessTransport.Server(peer: "in-process:1234")
5757

5858
let firstStreamOutbound = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self)
5959
let firstStream = RPCStream<

Tests/GRPCInProcessTransportTests/InProcessTransportTests.swift

+59
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,29 @@ struct InProcessTransportTests {
6464
client.beginGracefulShutdown()
6565
}
6666
}
67+
68+
@Test("Peer info")
69+
func peerInfo() async throws {
70+
try await self.withTestServerAndClient { server, client in
71+
defer {
72+
client.beginGracefulShutdown()
73+
server.beginGracefulShutdown()
74+
}
75+
76+
let peerInfo = try await client.unary(
77+
request: ClientRequest(message: ()),
78+
descriptor: .peerInfo,
79+
serializer: VoidSerializer(),
80+
deserializer: UTF8Deserializer(),
81+
options: .defaults
82+
) {
83+
try $0.message
84+
}
85+
86+
let match = peerInfo.wholeMatch(of: /in-process:\d+/)
87+
#expect(match != nil)
88+
}
89+
}
6790
}
6891

6992
private struct TestService: RegistrableRPCService {
@@ -96,6 +119,13 @@ private struct TestService: RegistrableRPCService {
96119
}
97120
}
98121

122+
func peerInfo(
123+
request: ServerRequest<Void>,
124+
context: ServerContext
125+
) async throws -> ServerResponse<String> {
126+
return ServerResponse(message: context.peer)
127+
}
128+
99129
func registerMethods(with router: inout RPCRouter) {
100130
router.registerHandler(
101131
forMethod: .testCancellation,
@@ -105,6 +135,19 @@ private struct TestService: RegistrableRPCService {
105135
try await self.cancellation(request: ServerRequest(stream: $0), context: $1)
106136
}
107137
)
138+
139+
router.registerHandler(
140+
forMethod: .peerInfo,
141+
deserializer: VoidDeserializer(),
142+
serializer: UTF8Serializer(),
143+
handler: {
144+
let response = try await self.peerInfo(
145+
request: ServerRequest<Void>(stream: $0),
146+
context: $1
147+
)
148+
return StreamingServerResponse(single: response)
149+
}
150+
)
108151
}
109152
}
110153

@@ -113,6 +156,11 @@ extension MethodDescriptor {
113156
fullyQualifiedService: "test",
114157
method: "cancellation"
115158
)
159+
160+
fileprivate static let peerInfo = Self(
161+
fullyQualifiedService: "test",
162+
method: "peerInfo"
163+
)
116164
}
117165

118166
private struct UTF8Serializer: MessageSerializer {
@@ -126,3 +174,14 @@ private struct UTF8Deserializer: MessageDeserializer {
126174
String(decoding: serializedMessageBytes, as: UTF8.self)
127175
}
128176
}
177+
178+
private struct VoidSerializer: MessageSerializer {
179+
func serialize(_ message: Void) throws -> [UInt8] {
180+
[]
181+
}
182+
}
183+
184+
private struct VoidDeserializer: MessageDeserializer {
185+
func deserialize(_ serializedMessageBytes: [UInt8]) throws {
186+
}
187+
}

0 commit comments

Comments
 (0)