
Commit c3bbb75

Feature: expose librdkafka statistics as swift metrics (#92)
* introduce statistics for producer
* add statistics to new consumer with events
* fix some artefacts
* adjust to KeyRefreshAttempts
* draft: statistics with metrics
* make structures internal
* Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift (Co-authored-by: Felix Schlegel <[email protected]>)
* Update Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift (Co-authored-by: Felix Schlegel <[email protected]>)
* Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift (Co-authored-by: Felix Schlegel <[email protected]>)
* Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift (Co-authored-by: Felix Schlegel <[email protected]>)
* address review comments
* formatting
* map gauges in one place
* move json mode as rd kafka statistics, misc renaming + docc
* address review comments
* remove import Metrics
* divide producer/consumer configuration
* apply swiftformat
* fix code after conflicts
* fix formatting

Co-authored-by: Felix Schlegel <[email protected]>
1 parent f1800c2 · commit c3bbb75

12 files changed: +357 −7 lines

Diff for: Package.swift (+6 −1)
```diff
@@ -49,6 +49,7 @@ let package = Package(
         .package(url: "https://github.com/apple/swift-nio-ssl", from: "2.25.0"),
         .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.1.0"),
         .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
+        .package(url: "https://github.com/apple/swift-metrics", from: "2.4.1"),
         // The zstd Swift package produces warnings that we cannot resolve:
         // https://github.com/facebook/zstd/issues/3328
         .package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
@@ -83,6 +84,7 @@ let package = Package(
                 .product(name: "NIOCore", package: "swift-nio"),
                 .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
                 .product(name: "Logging", package: "swift-log"),
+                .product(name: "Metrics", package: "swift-metrics"),
             ]
         ),
         .target(
@@ -93,7 +95,10 @@ let package = Package(
         ),
         .testTarget(
             name: "KafkaTests",
-            dependencies: ["Kafka"]
+            dependencies: [
+                "Kafka",
+                .product(name: "MetricsTestKit", package: "swift-metrics"),
+            ]
         ),
         .testTarget(
             name: "IntegrationTests",
```
Diff for: Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift (+146, new file)

```diff
@@ -0,0 +1,146 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-client open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import Metrics
+
+extension KafkaConfiguration {
+    // MARK: - Metrics
+
+    /// Configuration for the consumer metrics emitted by `SwiftKafka`.
+    public struct ConsumerMetrics: Sendable {
+        internal var enabled: Bool {
+            self.updateInterval != nil &&
+                (self.queuedOperation != nil ||
+                    self.totalKafkaBrokerRequests != nil ||
+                    self.totalKafkaBrokerBytesSent != nil ||
+                    self.totalKafkaBrokerResponses != nil ||
+                    self.totalKafkaBrokerResponsesSize != nil ||
+                    self.totalKafkaBrokerMessagesBytesRecieved != nil ||
+                    self.topicsInMetadataCache != nil)
+        }
+
+        /// Update interval for statistics.
+        public var updateInterval: Duration?
+
+        /// Number of operations (callbacks, events, etc) waiting in the queue.
+        public var queuedOperation: Gauge?
+
+        /// Total number of requests sent to Kafka brokers.
+        public var totalKafkaBrokerRequests: Gauge?
+        /// Total number of bytes transmitted to Kafka brokers.
+        public var totalKafkaBrokerBytesSent: Gauge?
+        /// Total number of responses received from Kafka brokers.
+        public var totalKafkaBrokerResponses: Gauge?
+        /// Total number of bytes received from Kafka brokers.
+        public var totalKafkaBrokerResponsesSize: Gauge?
+
+        /// Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers.
+        public var totalKafkaBrokerMessagesRecieved: Gauge?
+        /// Total number of message bytes (including framing) received from Kafka brokers.
+        public var totalKafkaBrokerMessagesBytesRecieved: Gauge?
+
+        /// Number of topics in the metadata cache.
+        public var topicsInMetadataCache: Gauge?
+
+        private static func record<T: BinaryInteger>(_ value: T?, to: Gauge?) {
+            guard let value,
+                  let to else {
+                return
+            }
+            to.record(value)
+        }
+
+        internal func update(with rdKafkaStatistics: RDKafkaStatistics) {
+            Self.record(rdKafkaStatistics.queuedOperation, to: self.queuedOperation)
+
+            Self.record(rdKafkaStatistics.totalKafkaBrokerRequests, to: self.totalKafkaBrokerRequests)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerBytesSent, to: self.totalKafkaBrokerBytesSent)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerResponses, to: self.totalKafkaBrokerResponses)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerResponsesSize, to: self.totalKafkaBrokerResponsesSize)
+
+            Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesRecieved, to: self.totalKafkaBrokerMessagesRecieved)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesBytesRecieved, to: self.totalKafkaBrokerMessagesBytesRecieved)
+
+            Self.record(rdKafkaStatistics.topicsInMetadataCache, to: self.topicsInMetadataCache)
+        }
+    }
+
+    /// Configuration for the producer metrics emitted by `SwiftKafka`.
+    public struct ProducerMetrics: Sendable {
+        internal var enabled: Bool {
+            self.updateInterval != nil &&
+                (self.queuedOperation != nil ||
+                    self.queuedProducerMessages != nil ||
+                    self.queuedProducerMessagesSize != nil ||
+                    self.totalKafkaBrokerRequests != nil ||
+                    self.totalKafkaBrokerBytesSent != nil ||
+                    self.totalKafkaBrokerResponses != nil ||
+                    self.totalKafkaBrokerResponsesSize != nil ||
+                    self.totalKafkaBrokerMessagesSent != nil ||
+                    self.totalKafkaBrokerMessagesBytesSent != nil ||
+                    self.topicsInMetadataCache != nil)
+        }
+
+        /// Update interval for statistics.
+        public var updateInterval: Duration?
+
+        /// Number of operations (callbacks, events, etc) waiting in the queue.
+        public var queuedOperation: Gauge?
+        /// Current number of queued producer messages.
+        public var queuedProducerMessages: Gauge?
+        /// Current total size in bytes of queued producer messages.
+        public var queuedProducerMessagesSize: Gauge?
+
+        /// Total number of requests sent to Kafka brokers.
+        public var totalKafkaBrokerRequests: Gauge?
+        /// Total number of bytes transmitted to Kafka brokers.
+        public var totalKafkaBrokerBytesSent: Gauge?
+        /// Total number of responses received from Kafka brokers.
+        public var totalKafkaBrokerResponses: Gauge?
+        /// Total number of bytes received from Kafka brokers.
+        public var totalKafkaBrokerResponsesSize: Gauge?
+
+        /// Total number of messages transmitted (produced) to Kafka brokers.
+        public var totalKafkaBrokerMessagesSent: Gauge?
+        /// Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers.
+        public var totalKafkaBrokerMessagesBytesSent: Gauge?
+
+        /// Number of topics in the metadata cache.
+        public var topicsInMetadataCache: Gauge?
+
+        private static func record<T: BinaryInteger>(_ value: T?, to: Gauge?) {
+            guard let value,
+                  let to else {
+                return
+            }
+            to.record(value)
+        }
+
+        internal func update(with rdKafkaStatistics: RDKafkaStatistics) {
+            Self.record(rdKafkaStatistics.queuedOperation, to: self.queuedOperation)
+            Self.record(rdKafkaStatistics.queuedProducerMessages, to: self.queuedProducerMessages)
+            Self.record(rdKafkaStatistics.queuedProducerMessagesSize, to: self.queuedProducerMessagesSize)
+
+            Self.record(rdKafkaStatistics.totalKafkaBrokerRequests, to: self.totalKafkaBrokerRequests)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerBytesSent, to: self.totalKafkaBrokerBytesSent)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerResponses, to: self.totalKafkaBrokerResponses)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerResponsesSize, to: self.totalKafkaBrokerResponsesSize)
+
+            Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesSent, to: self.totalKafkaBrokerMessagesSent)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesBytesSent, to: self.totalKafkaBrokerMessagesBytesSent)
+
+            Self.record(rdKafkaStatistics.topicsInMetadataCache, to: self.topicsInMetadataCache)
+        }
+    }
+}
```
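
As the `enabled` checks above imply, statistics are only requested from librdkafka when an update interval *and* at least one gauge are configured. A minimal consumer-side sketch (the configuration initializer is assumed to follow the shape in the project README; gauge labels are illustrative):

```swift
import Kafka
import Metrics

// A metrics backend (e.g. a Prometheus or StatsD client) should be
// bootstrapped once at startup via MetricsSystem.bootstrap(_:); otherwise
// the gauges record into the default no-op backend.
var config = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "metrics-demo", topics: ["demo-topic"]),
    bootstrapBrokerAddresses: [.init(host: "localhost", port: 9092)]
)
config.metrics.updateInterval = .seconds(5) // serialized as statistics.interval.ms
config.metrics.totalKafkaBrokerRequests = Gauge(label: "kafka.consumer.broker.requests")
config.metrics.totalKafkaBrokerResponses = Gauge(label: "kafka.consumer.broker.responses")
config.metrics.topicsInMetadataCache = Gauge(label: "kafka.consumer.metadata.topics")
```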

Diff for: Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift (+8)

```diff
@@ -238,6 +238,9 @@ public struct KafkaConsumerConfiguration {
     /// Reconnect options.
     public var reconnect: KafkaConfiguration.ReconnectOptions = .init()
 
+    /// Options for librdkafka metrics updates
+    public var metrics: KafkaConfiguration.ConsumerMetrics = .init()
+
     /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
     /// Default: `.plaintext`
     public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
@@ -302,6 +305,11 @@ extension KafkaConsumerConfiguration {
         resultDict["reconnect.backoff.ms"] = String(reconnect.backoff.rawValue)
         resultDict["reconnect.backoff.max.ms"] = String(reconnect.maximumBackoff.inMilliseconds)
 
+        if self.metrics.enabled,
+           let updateInterval = self.metrics.updateInterval {
+            resultDict["statistics.interval.ms"] = String(updateInterval.inMilliseconds)
+        }
+
         // Merge with SecurityProtocol configuration dictionary
         resultDict.merge(securityProtocol.dictionary) { _, _ in
             fatalError("securityProtocol and \(#file) should not have duplicate keys")
```

Diff for: Sources/Kafka/Configuration/KafkaProducerConfiguration.swift (+8)

```diff
@@ -161,6 +161,9 @@ public struct KafkaProducerConfiguration {
     /// Reconnect options.
     public var reconnect: KafkaConfiguration.ReconnectOptions = .init()
 
+    /// Options for librdkafka metrics updates
+    public var metrics: KafkaConfiguration.ProducerMetrics = .init()
+
     /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
     /// Default: `.plaintext`
     public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
@@ -212,6 +215,11 @@ extension KafkaProducerConfiguration {
         resultDict["reconnect.backoff.ms"] = String(self.reconnect.backoff.rawValue)
         resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.maximumBackoff.inMilliseconds)
 
+        if self.metrics.enabled,
+           let updateInterval = self.metrics.updateInterval {
+            resultDict["statistics.interval.ms"] = String(updateInterval.inMilliseconds)
+        }
+
         // Merge with SecurityProtocol configuration dictionary
         resultDict.merge(self.securityProtocol.dictionary) { _, _ in
             fatalError("securityProtocol and \(#file) should not have duplicate keys")
```

Diff for: Sources/Kafka/KafkaConsumer.swift (+16 −1)

```diff
@@ -146,6 +146,7 @@ public final class KafkaConsumer: Sendable, Service {
         NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
         KafkaConsumerMessagesDelegate
     >
+
     /// The configuration object of the consumer client.
     private let configuration: KafkaConsumerConfiguration
     /// A logger.
@@ -222,6 +223,9 @@ public final class KafkaConsumer: Sendable, Service {
         if configuration.isAutoCommitEnabled == false {
             subscribedEvents.append(.offsetCommit)
         }
+        if configuration.metrics.enabled {
+            subscribedEvents.append(.statistics)
+        }
 
         let client = try RDKafkaClient.makeClient(
             type: .consumer,
@@ -262,6 +266,9 @@ public final class KafkaConsumer: Sendable, Service {
         if configuration.isAutoCommitEnabled == false {
             subscribedEvents.append(.offsetCommit)
         }
+        if configuration.metrics.enabled {
+            subscribedEvents.append(.statistics)
+        }
 
         let client = try RDKafkaClient.makeClient(
             type: .consumer,
@@ -374,7 +381,15 @@ public final class KafkaConsumer: Sendable, Service {
             switch nextAction {
             case .pollForEvents(let client):
                 // Event poll to serve any events queued inside of `librdkafka`.
-                _ = client.eventPoll()
+                let events = client.eventPoll()
+                for event in events {
+                    switch event {
+                    case .statistics(let statistics):
+                        self.configuration.metrics.update(with: statistics)
+                    default:
+                        break
+                    }
+                }
                 try await Task.sleep(for: self.configuration.pollInterval)
             case .terminatePollLoop:
                 return
```
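
For completeness, a sketch of how the poll loop above gets driven in practice: `KafkaConsumer` is a `Service`, so running it inside a `ServiceGroup` keeps the event poll, and with it the statistics-to-gauge updates, alive. Initializer shapes are assumed from the project README:

```swift
import Kafka
import Logging
import ServiceLifecycle

let logger = Logger(label: "kafka-consumer")
// `config` is a KafkaConsumerConfiguration with metrics enabled,
// as sketched earlier.
let consumer = try KafkaConsumer(configuration: config, logger: logger)

let serviceGroup = ServiceGroup(
    services: [consumer],
    gracefulShutdownSignals: [.sigterm],
    logger: logger
)

try await withThrowingTaskGroup(of: Void.self) { group in
    // Run the consumer's poll loop.
    group.addTask { try await serviceGroup.run() }
    group.addTask {
        // Statistics events never surface here; they are intercepted in
        // the poll loop and routed to the configured gauges.
        for try await message in consumer.messages {
            logger.info("received message at offset \(message.offset)")
        }
    }
    try await group.waitForAll()
}
```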

Diff for: Sources/Kafka/KafkaConsumerEvent.swift (+2)

```diff
@@ -19,6 +19,8 @@ public enum KafkaConsumerEvent: Sendable, Hashable {
 
     internal init(_ event: RDKafkaClient.KafkaEvent) {
         switch event {
+        case .statistics:
+            fatalError("Cannot cast \(event) to KafkaConsumerEvent")
         case .deliveryReport:
             fatalError("Cannot cast \(event) to KafkaConsumerEvent")
         }
```

Diff for: Sources/Kafka/KafkaProducer.swift (+20 −5)

```diff
@@ -116,10 +116,16 @@ public final class KafkaProducer: Service, Sendable {
     ) throws {
         let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
 
+        var subscribedEvents: [RDKafkaEvent] = [.log] // No .deliveryReport here!
+
+        if configuration.metrics.enabled {
+            subscribedEvents.append(.statistics)
+        }
+
         let client = try RDKafkaClient.makeClient(
             type: .producer,
             configDictionary: configuration.dictionary,
-            events: [.log], // No .deliveryReport here!
+            events: subscribedEvents,
             logger: logger
         )
 
@@ -156,10 +162,16 @@ public final class KafkaProducer: Service, Sendable {
     ) throws -> (KafkaProducer, KafkaProducerEvents) {
         let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
 
+        var subscribedEvents: [RDKafkaEvent] = [.log, .deliveryReport]
+        // Listen to statistics events when statistics enabled
+        if configuration.metrics.enabled {
+            subscribedEvents.append(.statistics)
+        }
+
         let client = try RDKafkaClient.makeClient(
             type: .producer,
             configDictionary: configuration.dictionary,
-            events: [.log, .deliveryReport],
+            events: subscribedEvents,
             logger: logger
         )
 
@@ -212,9 +224,12 @@ public final class KafkaProducer: Service, Sendable {
             case .pollAndYield(let client, let source):
                 let events = client.eventPoll()
                 for event in events {
-                    let producerEvent = KafkaProducerEvent(event)
-                    // Ignore YieldResult as we don't support back pressure in KafkaProducer
-                    _ = source?.yield(producerEvent)
+                    switch event {
+                    case .statistics(let statistics):
+                        self.configuration.metrics.update(with: statistics)
+                    case .deliveryReport(let reports):
+                        _ = source?.yield(.deliveryReports(reports))
+                    }
                 }
                 try await Task.sleep(for: self.configuration.pollInterval)
             case .flushFinishSourceAndTerminatePollLoop(let client, let source):
```
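
Because `.statistics` is consumed inside the poll loop, the producer's public event stream still only carries delivery reports; that is also why the `KafkaProducerEvent` initializer in the next diff traps on `.statistics`. A usage sketch (factory signature assumed from the project README):

```swift
import Kafka
import Logging

let logger = Logger(label: "kafka-producer")
// `config` is a KafkaProducerConfiguration with metrics enabled.
let (producer, events) = try KafkaProducer.makeProducerWithEvents(
    configuration: config,
    logger: logger
)

// The producer itself still needs to run (e.g. in a ServiceGroup, omitted
// here). Statistics are reported via the configured gauges, never below.
for await event in events {
    switch event {
    case .deliveryReports(let reports):
        logger.info("delivered \(reports.count) message(s)")
    default:
        break // future event cases
    }
}
```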

Diff for: Sources/Kafka/KafkaProducerEvent.swift (+2)

```diff
@@ -23,6 +23,8 @@ public enum KafkaProducerEvent: Sendable, Hashable {
         switch event {
         case .deliveryReport(results: let results):
             self = .deliveryReports(results)
+        case .statistics:
+            fatalError("Cannot cast \(event) to KafkaProducerEvent")
         }
     }
 }
```

Diff for: Sources/Kafka/RDKafka/RDKafkaClient.swift (+22)

```diff
@@ -14,6 +14,7 @@
 
 import Crdkafka
 import Dispatch
+import class Foundation.JSONDecoder
 import Logging
 
 /// Base class for ``KafkaProducer`` and ``KafkaConsumer``,
@@ -295,6 +296,7 @@ final class RDKafkaClient: Sendable {
     /// Swift wrapper for events from `librdkafka`'s event queue.
     enum KafkaEvent {
         case deliveryReport(results: [KafkaDeliveryReport])
+        case statistics(RDKafkaStatistics)
     }
 
     /// Poll the event `rd_kafka_queue_t` for new events.
@@ -321,6 +323,10 @@ final class RDKafkaClient: Sendable {
                 self.handleLogEvent(event)
             case .offsetCommit:
                 self.handleOffsetCommitEvent(event)
+            case .statistics:
+                if let forwardEvent = self.handleStatistics(event) {
+                    events.append(forwardEvent)
+                }
             case .none:
                 // Finished reading events, return early
                 return events
@@ -352,6 +358,22 @@ final class RDKafkaClient: Sendable {
         return .deliveryReport(results: deliveryReportResults)
     }
 
+    /// Handle event of type `RDKafkaEvent.statistics`.
+    ///
+    /// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
+    private func handleStatistics(_ event: OpaquePointer?) -> KafkaEvent? {
+        let jsonStr = String(cString: rd_kafka_event_stats(event))
+        do {
+            if let jsonData = jsonStr.data(using: .utf8) {
+                let json = try JSONDecoder().decode(RDKafkaStatistics.self, from: jsonData)
+                return .statistics(json)
+            }
+        } catch {
+            assertionFailure("Error occurred when decoding JSON statistics: \(error) when decoding \(jsonStr)")
+        }
+        return nil
+    }
+
     /// Handle event of type `RDKafkaEvent.log`.
     ///
     /// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
```