Skip to content

Auto extract IDs from headers on the server #1510

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,25 @@ let packageDependencies: [Package.Dependency] = [
from: "1.20.2"
),
.package(
url: "https://github.com/apple/swift-log.git",
from: "1.4.4"
// url: "https://github.com/apple/swift-log.git",
// from: "1.4.4"
url: "https://github.com/slashmo/swift-log.git",
branch: "feature/baggage"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now merged and available in 1.5.x

),
.package(
url: "https://github.com/apple/swift-argument-parser.git",
// Version is higher than in other Package@swift manifests: 1.1.0 raised the minimum Swift
// version and indluded async support.
// version and included async support.
from: "1.1.1"
),
.package(
url: "https://github.com/apple/swift-docc-plugin",
from: "1.0.0"
),
.package(
url: "https://github.com/apple/swift-distributed-tracing",
branch: "main"
),
].appending(
.package(
url: "https://github.com/apple/swift-nio-ssl.git",
Expand Down Expand Up @@ -114,6 +120,7 @@ extension Target.Dependency {
package: "swift-nio-transport-services"
)
static let logging: Self = .product(name: "Logging", package: "swift-log")
static let tracing: Self = .product(name: "Tracing", package: "swift-distributed-tracing")
static let protobuf: Self = .product(name: "SwiftProtobuf", package: "swift-protobuf")
static let protobufPluginLibrary: Self = .product(
name: "SwiftProtobufPluginLibrary",
Expand All @@ -139,6 +146,7 @@ extension Target {
.nioHTTP2,
.nioExtras,
.logging,
.tracing,
.protobuf,
].appending(
.nioSSL, if: includeNIOSSL
Expand Down
27 changes: 25 additions & 2 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import DequeModule
import Logging
import NIOCore
import NIOHPACK
import Tracing

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncServerHandler<
Expand Down Expand Up @@ -191,7 +192,15 @@ internal final class AsyncServerHandler<

/// A logger.
@usableFromInline
internal let logger: Logger
internal var logger: Logger

/// Contextual baggage which can be used to start tracing spans,
/// or carry additional contextual information through the handler.
@usableFromInline
internal var baggage: Baggage

@usableFromInline
internal let tracer: Tracing.Tracer? // FIXME: tri state "don't trace" / "global" / "configured"

/// A reference to the user info. This is shared with the interceptor pipeline and may be accessed
/// from the async call context. `UserInfo` is _not_ `Sendable` and must always be accessed from
Expand Down Expand Up @@ -267,6 +276,8 @@ internal final class AsyncServerHandler<
self.compressionEnabledOnRPC = context.encoding.isEnabled
self.compressResponsesIfPossible = true
self.logger = context.logger
self.baggage = .topLevel // TODO: or carry from context?
self.tracer = context.tracer

self.userInfoRef = Ref(UserInfo())
self.handlerStateMachine = .init()
Expand Down Expand Up @@ -295,6 +306,18 @@ internal final class AsyncServerHandler<
internal func receiveMetadata(_ headers: HPACKHeaders) {
switch self.interceptorStateMachine.interceptRequestMetadata() {
case .intercept:
if let tracer = self.tracer {
tracer.extract(headers, into: &baggage, using: HPACKHeadersExtractor())

if let metadata: Logger.Metadata = self.logger.metadataProvider?.metadata(baggage) { // FIXME: function naming a bit ugly here
for (k, v) in metadata {
self.logger[metadataKey: k] = v
self.interceptors?.logger[metadataKey: k] = v
// self.logger[metadataKey: extractor.loggerKey] = "\(id)"
// self.interceptors?.logger[metadataKey: extractor.loggerKey] = "\(id)"
}
}
}
self.interceptors?.receive(.metadata(headers))
case .cancel:
self.cancel(error: nil)
Expand All @@ -315,7 +338,7 @@ internal final class AsyncServerHandler<

switch self.interceptorStateMachine.interceptRequestMessage() {
case .intercept:
self.interceptors?.receive(.message(request))
self.interceptors?.receive(.message(request)) // TODO: if we wrapped this in a withSpan, would that be correct?
case .cancel:
self.cancel(error: nil)
case .drop:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
import NIOCore
import NIOHPACK
import Tracing

public final class BidirectionalStreamingServerHandler<
Serializer: MessageSerializer,
Expand Down Expand Up @@ -56,6 +58,12 @@ public final class BidirectionalStreamingServerHandler<
@usableFromInline
internal var state: State = .idle

@usableFromInline
internal var logger: Logger

@usableFromInline
internal var baggage: Baggage

@usableFromInline
internal enum State {
// No headers have been received.
Expand Down Expand Up @@ -85,6 +93,8 @@ public final class BidirectionalStreamingServerHandler<

let userInfoRef = Ref(UserInfo())
self.userInfoRef = userInfoRef
self.logger = context.logger
self.baggage = .topLevel
self.interceptors = ServerInterceptorPipeline(
logger: context.logger,
eventLoop: context.eventLoop,
Expand All @@ -102,6 +112,16 @@ public final class BidirectionalStreamingServerHandler<

@inlinable
public func receiveMetadata(_ headers: HPACKHeaders) {
if let tracer = self.context.tracer {
tracer.extract(headers, into: &self.baggage, using: HPACKHeadersExtractor())

if let metadata: Logger.Metadata = self.logger.metadataProvider?.metadata(baggage) { // FIXME: function naming a bit ugly here
for (k, v) in metadata {
self.logger[metadataKey: k] = v
self.interceptors?.logger[metadataKey: k] = v
}
}
}
self.interceptors.receive(.metadata(headers))
}

Expand Down Expand Up @@ -164,7 +184,7 @@ public final class BidirectionalStreamingServerHandler<
let context = _StreamingResponseCallContext<Request, Response>(
eventLoop: self.context.eventLoop,
headers: headers,
logger: self.context.logger,
logger: self.logger,
userInfoRef: self.userInfoRef,
compressionIsEnabled: self.context.encoding.isEnabled,
closeFuture: self.context.closeFuture,
Expand Down
24 changes: 23 additions & 1 deletion Sources/GRPC/CallHandlers/ClientStreamingServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
import NIOCore
import NIOHPACK
import Tracing

public final class ClientStreamingServerHandler<
Serializer: MessageSerializer,
Expand Down Expand Up @@ -56,6 +58,12 @@ public final class ClientStreamingServerHandler<
@usableFromInline
internal var state: State = .idle

@usableFromInline
internal var logger: Logger

@usableFromInline
internal var baggage: Baggage

@usableFromInline
internal enum State {
// Nothing has happened yet.
Expand Down Expand Up @@ -86,6 +94,8 @@ public final class ClientStreamingServerHandler<

let userInfoRef = Ref(UserInfo())
self.userInfoRef = userInfoRef
self.logger = context.logger
self.baggage = .topLevel
self.interceptors = ServerInterceptorPipeline(
logger: context.logger,
eventLoop: context.eventLoop,
Expand All @@ -103,6 +113,18 @@ public final class ClientStreamingServerHandler<

@inlinable
public func receiveMetadata(_ headers: HPACKHeaders) {
if let tracer = self.context.tracer {
var baggage = Baggage.topLevel
tracer.extract(headers, into: &baggage, using: HPACKHeadersExtractor())
self.baggage = baggage

if let metadata: Logger.Metadata = self.logger.metadataProvider?.metadata(baggage) { // FIXME: function naming a bit ugly here
for (k, v) in metadata {
self.logger[metadataKey: k] = v
self.interceptors?.logger[metadataKey: k] = v
}
}
}
self.interceptors.receive(.metadata(headers))
}

Expand Down Expand Up @@ -165,7 +187,7 @@ public final class ClientStreamingServerHandler<
let context = UnaryResponseCallContext<Response>(
eventLoop: self.context.eventLoop,
headers: headers,
logger: self.context.logger,
logger: self.logger,
userInfoRef: self.userInfoRef,
closeFuture: self.context.closeFuture
)
Expand Down
22 changes: 21 additions & 1 deletion Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
import NIOCore
import NIOHPACK
import Tracing

public final class ServerStreamingServerHandler<
Serializer: MessageSerializer,
Expand Down Expand Up @@ -52,6 +54,12 @@ public final class ServerStreamingServerHandler<
@usableFromInline
internal var state: State = .idle

@usableFromInline
internal var logger: Logger

@usableFromInline
internal var baggage: Baggage

@usableFromInline
internal enum State {
// Initial state. Nothing has happened yet.
Expand Down Expand Up @@ -82,6 +90,8 @@ public final class ServerStreamingServerHandler<

let userInfoRef = Ref(UserInfo())
self.userInfoRef = userInfoRef
self.logger = context.logger
self.baggage = .topLevel
self.interceptors = ServerInterceptorPipeline(
logger: context.logger,
eventLoop: context.eventLoop,
Expand All @@ -99,6 +109,16 @@ public final class ServerStreamingServerHandler<

@inlinable
public func receiveMetadata(_ headers: HPACKHeaders) {
if let tracer = self.context.tracer {
tracer.extract(headers, into: &self.baggage, using: HPACKHeadersExtractor())

if let metadata: Logger.Metadata = self.logger.metadataProvider?.metadata(baggage) { // FIXME: function naming a bit ugly here
for (k, v) in metadata {
self.logger[metadataKey: k] = v
self.interceptors?.logger[metadataKey: k] = v
}
}
}
self.interceptors.receive(.metadata(headers))
}

Expand Down Expand Up @@ -161,7 +181,7 @@ public final class ServerStreamingServerHandler<
let context = _StreamingResponseCallContext<Request, Response>(
eventLoop: self.context.eventLoop,
headers: headers,
logger: self.context.logger,
logger: self.logger,
userInfoRef: self.userInfoRef,
compressionIsEnabled: self.context.encoding.isEnabled,
closeFuture: self.context.closeFuture,
Expand Down
22 changes: 21 additions & 1 deletion Sources/GRPC/CallHandlers/UnaryServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
import NIOCore
import NIOHPACK
import Tracing

public final class UnaryServerHandler<
Serializer: MessageSerializer,
Expand Down Expand Up @@ -51,6 +53,12 @@ public final class UnaryServerHandler<
@usableFromInline
internal var state: State = .idle

@usableFromInline
internal var logger: Logger

@usableFromInline
internal var baggage: Baggage

@usableFromInline
internal enum State {
// Initial state. Nothing has happened yet.
Expand All @@ -77,6 +85,8 @@ public final class UnaryServerHandler<
self.serializer = responseSerializer
self.deserializer = requestDeserializer
self.context = context
self.baggage = .topLevel // TODO: decide if rather we set it from somewhere here?
self.logger = context.logger

let userInfoRef = Ref(UserInfo())
self.userInfoRef = userInfoRef
Expand All @@ -97,6 +107,16 @@ public final class UnaryServerHandler<

@inlinable
public func receiveMetadata(_ metadata: HPACKHeaders) {
if let tracer = self.context.tracer {
tracer.extract(metadata, into: &baggage, using: HPACKHeadersExtractor())

if let metadata: Logger.Metadata = self.logger.metadataProvider?.metadata(baggage) { // FIXME: function naming a bit ugly here
for (k, v) in metadata {
self.logger[metadataKey: k] = v
self.interceptors?.logger[metadataKey: k] = v
}
}
}
self.interceptors.receive(.metadata(metadata))
}

Expand Down Expand Up @@ -159,7 +179,7 @@ public final class UnaryServerHandler<
let context = UnaryResponseCallContext<Response>(
eventLoop: self.context.eventLoop,
headers: headers,
logger: self.context.logger,
logger: self.logger,
userInfoRef: self.userInfoRef,
closeFuture: self.context.closeFuture
)
Expand Down
3 changes: 2 additions & 1 deletion Sources/GRPC/GRPCServerPipelineConfigurator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan
errorDelegate: self.configuration.errorDelegate,
normalizeHeaders: normalizeHeaders,
maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
logger: logger
logger: logger,
tracer: self.configuration.tracer
)
}

Expand Down
4 changes: 4 additions & 0 deletions Sources/GRPC/GRPCServerRequestRoutingHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import NIOHPACK
import NIOHTTP1
import NIOHTTP2
import SwiftProtobuf
import Tracing

/// Provides ``GRPCServerHandlerProtocol`` objects for the methods on a particular service name.
///
Expand All @@ -45,6 +46,7 @@ public struct CallHandlerContext {
internal var errorDelegate: ServerErrorDelegate?
@usableFromInline
internal var logger: Logger
// TODO: does this need baggage?
@usableFromInline
internal var encoding: ServerMessageEncoding
@usableFromInline
Expand All @@ -59,6 +61,8 @@ public struct CallHandlerContext {
internal var allocator: ByteBufferAllocator
@usableFromInline
internal var closeFuture: EventLoopFuture<Void>
@usableFromInline
internal var tracer: Tracing.Tracer?
}

/// A call URI split into components.
Expand Down
Loading