Skip to content

Auto extract IDs from headers on the server #1510

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ internal final class AsyncServerHandler<

/// A logger.
@usableFromInline
internal let logger: Logger
internal var logger: Logger

@usableFromInline
internal let traceIDExtractor: Server.Configuration.TraceIDExtractor?

/// A reference to the user info. This is shared with the interceptor pipeline and may be accessed
/// from the async call context. `UserInfo` is _not_ `Sendable` and must always be accessed from
Expand Down Expand Up @@ -267,6 +270,7 @@ internal final class AsyncServerHandler<
self.compressionEnabledOnRPC = context.encoding.isEnabled
self.compressResponsesIfPossible = true
self.logger = context.logger
self.traceIDExtractor = context.traceIDExtractor

self.userInfoRef = Ref(UserInfo())
self.handlerStateMachine = .init()
Expand Down Expand Up @@ -295,6 +299,10 @@ internal final class AsyncServerHandler<
internal func receiveMetadata(_ headers: HPACKHeaders) {
switch self.interceptorStateMachine.interceptRequestMetadata() {
case .intercept:
if let extractor = self.traceIDExtractor, let id = extractor.extract(from: headers) {
self.logger[metadataKey: extractor.loggerKey] = "\(id)"
self.interceptors?.logger[metadataKey: extractor.loggerKey] = "\(id)"
}
self.interceptors?.receive(.metadata(headers))
case .cancel:
self.cancel(error: nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
import NIOCore
import NIOHPACK

Expand Down Expand Up @@ -56,6 +57,9 @@ public final class BidirectionalStreamingServerHandler<
@usableFromInline
internal var state: State = .idle

@usableFromInline
internal var logger: Logger

@usableFromInline
internal enum State {
// No headers have been received.
Expand Down Expand Up @@ -85,6 +89,7 @@ public final class BidirectionalStreamingServerHandler<

let userInfoRef = Ref(UserInfo())
self.userInfoRef = userInfoRef
self.logger = context.logger
self.interceptors = ServerInterceptorPipeline(
logger: context.logger,
eventLoop: context.eventLoop,
Expand All @@ -102,6 +107,10 @@ public final class BidirectionalStreamingServerHandler<

@inlinable
public func receiveMetadata(_ headers: HPACKHeaders) {
if let extractor = self.context.traceIDExtractor, let id = extractor.extract(from: headers) {
self.logger[metadataKey: extractor.loggerKey] = "\(id)"
self.interceptors.logger[metadataKey: extractor.loggerKey] = "\(id)"
}
self.interceptors.receive(.metadata(headers))
}

Expand Down Expand Up @@ -164,7 +173,7 @@ public final class BidirectionalStreamingServerHandler<
let context = _StreamingResponseCallContext<Request, Response>(
eventLoop: self.context.eventLoop,
headers: headers,
logger: self.context.logger,
logger: self.logger,
userInfoRef: self.userInfoRef,
compressionIsEnabled: self.context.encoding.isEnabled,
closeFuture: self.context.closeFuture,
Expand Down
11 changes: 10 additions & 1 deletion Sources/GRPC/CallHandlers/ClientStreamingServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
import NIOCore
import NIOHPACK

Expand Down Expand Up @@ -56,6 +57,9 @@ public final class ClientStreamingServerHandler<
@usableFromInline
internal var state: State = .idle

@usableFromInline
internal var logger: Logger

@usableFromInline
internal enum State {
// Nothing has happened yet.
Expand Down Expand Up @@ -86,6 +90,7 @@ public final class ClientStreamingServerHandler<

let userInfoRef = Ref(UserInfo())
self.userInfoRef = userInfoRef
self.logger = context.logger
self.interceptors = ServerInterceptorPipeline(
logger: context.logger,
eventLoop: context.eventLoop,
Expand All @@ -103,6 +108,10 @@ public final class ClientStreamingServerHandler<

@inlinable
public func receiveMetadata(_ headers: HPACKHeaders) {
if let extractor = self.context.traceIDExtractor, let id = extractor.extract(from: headers) {
self.logger[metadataKey: extractor.loggerKey] = "\(id)"
self.interceptors.logger[metadataKey: extractor.loggerKey] = "\(id)"
}
self.interceptors.receive(.metadata(headers))
}

Expand Down Expand Up @@ -165,7 +174,7 @@ public final class ClientStreamingServerHandler<
let context = UnaryResponseCallContext<Response>(
eventLoop: self.context.eventLoop,
headers: headers,
logger: self.context.logger,
logger: self.logger,
userInfoRef: self.userInfoRef,
closeFuture: self.context.closeFuture
)
Expand Down
11 changes: 10 additions & 1 deletion Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
import NIOCore
import NIOHPACK

Expand Down Expand Up @@ -52,6 +53,9 @@ public final class ServerStreamingServerHandler<
@usableFromInline
internal var state: State = .idle

@usableFromInline
internal var logger: Logger

@usableFromInline
internal enum State {
// Initial state. Nothing has happened yet.
Expand Down Expand Up @@ -82,6 +86,7 @@ public final class ServerStreamingServerHandler<

let userInfoRef = Ref(UserInfo())
self.userInfoRef = userInfoRef
self.logger = context.logger
self.interceptors = ServerInterceptorPipeline(
logger: context.logger,
eventLoop: context.eventLoop,
Expand All @@ -99,6 +104,10 @@ public final class ServerStreamingServerHandler<

@inlinable
public func receiveMetadata(_ headers: HPACKHeaders) {
if let extractor = self.context.traceIDExtractor, let id = extractor.extract(from: headers) {
self.logger[metadataKey: extractor.loggerKey] = "\(id)"
self.interceptors.logger[metadataKey: extractor.loggerKey] = "\(id)"
}
self.interceptors.receive(.metadata(headers))
}

Expand Down Expand Up @@ -161,7 +170,7 @@ public final class ServerStreamingServerHandler<
let context = _StreamingResponseCallContext<Request, Response>(
eventLoop: self.context.eventLoop,
headers: headers,
logger: self.context.logger,
logger: self.logger,
userInfoRef: self.userInfoRef,
compressionIsEnabled: self.context.encoding.isEnabled,
closeFuture: self.context.closeFuture,
Expand Down
11 changes: 10 additions & 1 deletion Sources/GRPC/CallHandlers/UnaryServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
import NIOCore
import NIOHPACK

Expand Down Expand Up @@ -51,6 +52,9 @@ public final class UnaryServerHandler<
@usableFromInline
internal var state: State = .idle

@usableFromInline
internal var logger: Logger

@usableFromInline
internal enum State {
// Initial state. Nothing has happened yet.
Expand All @@ -77,6 +81,7 @@ public final class UnaryServerHandler<
self.serializer = responseSerializer
self.deserializer = requestDeserializer
self.context = context
self.logger = context.logger

let userInfoRef = Ref(UserInfo())
self.userInfoRef = userInfoRef
Expand All @@ -97,6 +102,10 @@ public final class UnaryServerHandler<

@inlinable
public func receiveMetadata(_ metadata: HPACKHeaders) {
if let extractor = self.context.traceIDExtractor, let id = extractor.extract(from: metadata) {
self.logger[metadataKey: extractor.loggerKey] = "\(id)"
self.interceptors.logger[metadataKey: extractor.loggerKey] = "\(id)"
}
self.interceptors.receive(.metadata(metadata))
}

Expand Down Expand Up @@ -159,7 +168,7 @@ public final class UnaryServerHandler<
let context = UnaryResponseCallContext<Response>(
eventLoop: self.context.eventLoop,
headers: headers,
logger: self.context.logger,
logger: self.logger,
userInfoRef: self.userInfoRef,
closeFuture: self.context.closeFuture
)
Expand Down
3 changes: 2 additions & 1 deletion Sources/GRPC/GRPCServerPipelineConfigurator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan
errorDelegate: self.configuration.errorDelegate,
normalizeHeaders: normalizeHeaders,
maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
logger: logger
logger: logger,
traceIDExtractor: self.configuration.traceIDExtractor
)
}

Expand Down
2 changes: 2 additions & 0 deletions Sources/GRPC/GRPCServerRequestRoutingHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public struct CallHandlerContext {
internal var allocator: ByteBufferAllocator
@usableFromInline
internal var closeFuture: EventLoopFuture<Void>
@usableFromInline
internal var traceIDExtractor: Server.Configuration.TraceIDExtractor?
}

/// A call URI split into components.
Expand Down
8 changes: 6 additions & 2 deletions Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
typealias OutboundOut = HTTP2Frame.FramePayload

private var logger: Logger
private let traceIDExtractor: Server.Configuration.TraceIDExtractor?
private var state: HTTP2ToRawGRPCStateMachine
private let errorDelegate: ServerErrorDelegate?
private var context: ChannelHandlerContext!
Expand Down Expand Up @@ -73,9 +74,11 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
errorDelegate: ServerErrorDelegate?,
normalizeHeaders: Bool,
maximumReceiveMessageLength: Int,
logger: Logger
logger: Logger,
traceIDExtractor: Server.Configuration.TraceIDExtractor?
) {
self.logger = logger
self.traceIDExtractor = traceIDExtractor
self.errorDelegate = errorDelegate
self.servicesByName = servicesByName
self.encoding = encoding
Expand Down Expand Up @@ -127,7 +130,8 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
closeFuture: context.channel.closeFuture,
services: self.servicesByName,
encoding: self.encoding,
normalizeHeaders: self.normalizeHeaders
normalizeHeaders: self.normalizeHeaders,
traceIDExtractor: self.traceIDExtractor
)

switch receiveHeaders {
Expand Down
18 changes: 12 additions & 6 deletions Sources/GRPC/HTTP2ToRawGRPCStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ extension HTTP2ToRawGRPCStateMachine.State {
closeFuture: EventLoopFuture<Void>,
services: [Substring: CallHandlerProvider],
encoding: ServerMessageEncoding,
normalizeHeaders: Bool
normalizeHeaders: Bool,
traceIDExtractor: Server.Configuration.TraceIDExtractor?
) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
// Extract and validate the content type. If it's nil we need to close.
guard let contentType = self.extractContentType(from: headers) else {
Expand Down Expand Up @@ -326,7 +327,8 @@ extension HTTP2ToRawGRPCStateMachine.State {
remoteAddress: remoteAddress,
responseWriter: responseWriter,
allocator: allocator,
closeFuture: closeFuture
closeFuture: closeFuture,
traceIDExtractor: traceIDExtractor
)

// We have a matching service, hopefully we have a provider for the method too.
Expand Down Expand Up @@ -865,7 +867,8 @@ extension HTTP2ToRawGRPCStateMachine {
closeFuture: EventLoopFuture<Void>,
services: [Substring: CallHandlerProvider],
encoding: ServerMessageEncoding,
normalizeHeaders: Bool
normalizeHeaders: Bool,
traceIDExtractor: Server.Configuration.TraceIDExtractor?
) -> ReceiveHeadersAction {
return self.withStateAvoidingCoWs { state in
state.receive(
Expand All @@ -879,7 +882,8 @@ extension HTTP2ToRawGRPCStateMachine {
closeFuture: closeFuture,
services: services,
encoding: encoding,
normalizeHeaders: normalizeHeaders
normalizeHeaders: normalizeHeaders,
traceIDExtractor: traceIDExtractor
)
}
}
Expand Down Expand Up @@ -974,7 +978,8 @@ extension HTTP2ToRawGRPCStateMachine.State {
closeFuture: EventLoopFuture<Void>,
services: [Substring: CallHandlerProvider],
encoding: ServerMessageEncoding,
normalizeHeaders: Bool
normalizeHeaders: Bool,
traceIDExtractor: Server.Configuration.TraceIDExtractor?
) -> HTTP2ToRawGRPCStateMachine.ReceiveHeadersAction {
switch self {
// These are the only states in which we can receive headers. Everything else is invalid.
Expand All @@ -991,7 +996,8 @@ extension HTTP2ToRawGRPCStateMachine.State {
closeFuture: closeFuture,
services: services,
encoding: encoding,
normalizeHeaders: normalizeHeaders
normalizeHeaders: normalizeHeaders,
traceIDExtractor: traceIDExtractor
)
self = stateAndAction.state
return stateAndAction.action
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPC/Interceptor/ServerInterceptorPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal final class ServerInterceptorPipeline<Request, Response> {

/// A logger.
@usableFromInline
internal let logger: Logger
internal var logger: Logger

/// A reference to a 'UserInfo'.
@usableFromInline
Expand Down
Loading