Skip to content

Commit c870864

Browse files
introduce statistics for producer
1 parent f8cb0a0 commit c870864

9 files changed

+390
-0
lines changed

Diff for: Package.swift

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ let package = Package(
4747
// The zstd Swift package produces warnings that we cannot resolve:
4848
// https://github.com/facebook/zstd/issues/3328
4949
.package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
50+
.package(url: "https://github.com/swift-extras/swift-extras-json.git", .upToNextMajor(from: "0.6.0")),
5051
],
5152
targets: [
5253
.target(
@@ -76,6 +77,7 @@ let package = Package(
7677
.product(name: "NIOCore", package: "swift-nio"),
7778
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
7879
.product(name: "Logging", package: "swift-log"),
80+
.product(name: "ExtrasJSON", package: "swift-extras-json"),
7981
]
8082
),
8183
.systemLibrary(

Diff for: Sources/SwiftKafka/Configuration/KafkaConfiguration.swift

+7
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,10 @@ public enum KafkaConfiguration {
206206
public static let v6 = IPAddressFamily(description: "v6")
207207
}
208208
}
209+
210+
extension Duration {
    /// The duration expressed as a whole number of milliseconds.
    /// Any sub-millisecond remainder is truncated toward zero.
    internal var totalMilliseconds: Int64 {
        let (seconds, attoseconds) = self.components
        // 1 ms == 10^15 attoseconds.
        return seconds * 1000 + attoseconds / 1_000_000_000_000_000
    }
}

Diff for: Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift

+11
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ public struct KafkaProducerConfiguration {
2020
/// Default: `.milliseconds(100)`
2121
public var pollInterval: Duration = .milliseconds(100)
2222

23+
/// Interval for librdkafka statistics reports
24+
/// 0ms - disabled
25+
/// >= 1ms - statistics provided every specified interval
26+
public var statisticsInterval: Duration = .zero
27+
2328
/// Maximum timeout for flushing outstanding produce requests when the ``KafkaProducer`` is shutting down.
2429
/// Default: `10000`
2530
public var flushTimeoutMilliseconds: Int = 10000 {
@@ -107,6 +112,12 @@ extension KafkaProducerConfiguration {
107112
internal var dictionary: [String: String] {
108113
var resultDict: [String: String] = [:]
109114

115+
// We only check that the interval is 0 or >= 1 ms; librdkafka validates
// non-negativity itself, in both debug and release.
// NOTE(review): the assert below uses `>` (strictly greater than 1 ms),
// which rejects an interval of exactly 1 ms — per the documented contract
// ("0ms - disabled, >= 1ms - enabled") it should likely be `>=`.
// FIXME: should we make `get throws` and throw an error instead of assert?
118+
assert(self.statisticsInterval == .zero || self.statisticsInterval > Duration.milliseconds(1), "Statistics interval must be expressed in milliseconds")
119+
resultDict["statistics.interval.ms"] = String(self.statisticsInterval.totalMilliseconds)
120+
110121
resultDict["enable.idempotence"] = String(self.enableIdempotence)
111122
resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
112123
resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)

Diff for: Sources/SwiftKafka/KafkaProducerEvent.swift

+4
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
public enum KafkaProducerEvent: Sendable, Hashable {
1717
/// A collection of delivery reports received from the Kafka cluster indicating the status of produced messages.
1818
case deliveryReports([KafkaDeliveryReport])
19+
/// Statistics from librdkafka
20+
case statistics(KafkaStatistics)
1921
/// - Important: Always provide a `default` case when switching over this `enum`.
2022
case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY
2123

2224
internal init(_ event: RDKafkaClient.KafkaEvent) {
2325
switch event {
2426
case .deliveryReport(results: let results):
2527
self = .deliveryReports(results)
28+
case .statistics(let stat):
29+
self = .statistics(stat)
2630
case .consumerMessages:
2731
fatalError("Cannot cast \(event) to KafkaProducerEvent")
2832
}

Diff for: Sources/SwiftKafka/RDKafka/RDKafkaClient.swift

+8
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ final class RDKafkaClient: Sendable {
136136
enum KafkaEvent {
137137
case deliveryReport(results: [KafkaDeliveryReport])
138138
case consumerMessages(result: Result<KafkaConsumerMessage, Error>)
139+
case statistics(KafkaStatistics)
139140
}
140141

141142
/// Poll the event `rd_kafka_queue_t` for new events.
@@ -166,6 +167,8 @@ final class RDKafkaClient: Sendable {
166167
self.handleLogEvent(event)
167168
case .offsetCommit:
168169
self.handleOffsetCommitEvent(event)
170+
case .statistics:
171+
events.append(self.handleStatistics(event))
169172
case .none:
170173
// Finished reading events, return early
171174
return events
@@ -217,6 +220,11 @@ final class RDKafkaClient: Sendable {
217220
// The returned message(s) MUST NOT be freed with rd_kafka_message_destroy().
218221
}
219222

223+
/// Handle event of type `RDKafkaEvent.statistics`.
///
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
/// - Returns: A `.statistics` ``KafkaEvent`` carrying the raw JSON payload.
private func handleStatistics(_ event: OpaquePointer?) -> KafkaEvent {
    // Copy the C string into a Swift String before the event is destroyed
    // (the pointer returned by rd_kafka_event_stats is tied to the event's lifetime).
    let payload = String(cString: rd_kafka_event_stats(event))
    return .statistics(KafkaStatistics(jsonString: payload))
}
227+
220228
/// Handle event of type `RDKafkaEvent.log`.
221229
///
222230
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.

Diff for: Sources/SwiftKafka/Utilities/KafkaStatistics.swift

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import ExtrasJSON
16+
17+
public struct KafkaStatistics: Sendable, Hashable {
    /// Raw JSON statistics payload as emitted by librdkafka.
    public let jsonString: String

    /// Decodes ``jsonString`` into a strongly-typed statistics model.
    ///
    /// - Throws: A decoding error if the payload does not match ``KafkaStatisticsJson``.
    public var json: KafkaStatisticsJson {
        get throws {
            let decoder = XJSONDecoder()
            return try decoder.decode(KafkaStatisticsJson.self, from: Array(self.jsonString.utf8))
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
// This file was generated from JSON Schema using quicktype, do not modify it directly.
16+
// To parse the JSON, add this file to your project and do:
17+
//
18+
// let statistics = try? newJSONDecoder().decode(KafkaStatisticsJsonModel.self, from: jsonData)
19+
20+
// MARK: - Statistics
21+
22+
/// Top-level statistics snapshot emitted by librdkafka.
///
/// All fields are optional because the emitted payload varies by
/// client role (producer/consumer) and librdkafka version.
public struct KafkaStatisticsJson: Hashable, Codable {
    let name: String?
    let clientID: String?
    let type: String?
    let ts: Int?
    let time: Int?
    let age: Int?
    let replyq: Int?
    let msgCnt: Int?
    let msgSize: Int?
    let msgMax: Int?
    let msgSizeMax: Int?
    let simpleCnt: Int?
    let metadataCacheCnt: Int?
    let brokers: [String: Broker]?
    let topics: [String: Topic]?
    let cgrp: Cgrp?
    let tx: Int?
    let txBytes: Int?
    let rx: Int?
    let rxBytes: Int?
    let txmsgs: Int?
    let txmsgBytes: Int?
    let rxmsgs: Int?
    let rxmsgBytes: Int?

    // Maps the snake_case keys used by librdkafka onto camelCase properties.
    enum CodingKeys: String, CodingKey {
        case name
        case clientID = "client_id"
        case type
        case ts
        case time
        case age
        case replyq
        case msgCnt = "msg_cnt"
        case msgSize = "msg_size"
        case msgMax = "msg_max"
        case msgSizeMax = "msg_size_max"
        case simpleCnt = "simple_cnt"
        case metadataCacheCnt = "metadata_cache_cnt"
        case brokers
        case topics
        case cgrp
        case tx
        case txBytes = "tx_bytes"
        case rx
        case rxBytes = "rx_bytes"
        case txmsgs
        case txmsgBytes = "txmsg_bytes"
        case rxmsgs
        case rxmsgBytes = "rxmsg_bytes"
    }
}
53+
54+
// MARK: - Broker
55+
56+
/// Per-broker statistics from the librdkafka stats payload.
public struct Broker: Hashable, Codable {
    let name: String?
    let nodeid: Int?
    let nodename: String?
    let source: String?
    let state: String?
    let stateage: Int?
    let outbufCnt: Int?
    let outbufMsgCnt: Int?
    let waitrespCnt: Int?
    let waitrespMsgCnt: Int?
    let tx: Int?
    let txbytes: Int?
    let txerrs: Int?
    let txretries: Int?
    let txidle: Int?
    let reqTimeouts: Int?
    let rx: Int?
    let rxbytes: Int?
    let rxerrs: Int?
    let rxcorriderrs: Int?
    let rxpartial: Int?
    let rxidle: Int?
    let zbufGrow: Int?
    let bufGrow: Int?
    let wakeups: Int?
    let connects: Int?
    let disconnects: Int?
    // Latency/throttle windows are emitted as string-keyed integer maps.
    let intLatency: [String: Int]?
    let outbufLatency: [String: Int]?
    let rtt: [String: Int]?
    let throttle: [String: Int]?
    let req: [String: Int]?
    let toppars: [String: Toppar]?

    // Maps the snake_case keys used by librdkafka onto camelCase properties.
    enum CodingKeys: String, CodingKey {
        case name, nodeid, nodename, source, state, stateage
        case outbufCnt = "outbuf_cnt"
        case outbufMsgCnt = "outbuf_msg_cnt"
        case waitrespCnt = "waitresp_cnt"
        case waitrespMsgCnt = "waitresp_msg_cnt"
        case tx, txbytes, txerrs, txretries, txidle
        case reqTimeouts = "req_timeouts"
        case rx, rxbytes, rxerrs, rxcorriderrs, rxpartial, rxidle
        case zbufGrow = "zbuf_grow"
        case bufGrow = "buf_grow"
        case wakeups, connects, disconnects
        case intLatency = "int_latency"
        case outbufLatency = "outbuf_latency"
        case rtt, throttle, req, toppars
    }
}
87+
88+
// MARK: - Toppars
89+
90+
/// Topic/partition pair referenced by broker statistics.
struct Toppar: Hashable, Codable {
    let topic: String?
    let partition: Int?

    enum CodingKeys: String, CodingKey {
        case topic
        case partition
    }
}
98+
99+
// MARK: - Cgrp
100+
101+
/// Consumer-group statistics from the librdkafka stats payload.
struct Cgrp: Hashable, Codable {
    let state: String?
    let stateage: Int?
    let joinState: String?
    let rebalanceAge: Int?
    let rebalanceCnt: Int?
    let rebalanceReason: String?
    let assignmentSize: Int?

    // Maps the snake_case keys used by librdkafka onto camelCase properties.
    enum CodingKeys: String, CodingKey {
        case state
        case stateage
        case joinState = "join_state"
        case rebalanceAge = "rebalance_age"
        case rebalanceCnt = "rebalance_cnt"
        case rebalanceReason = "rebalance_reason"
        case assignmentSize = "assignment_size"
    }
}
118+
119+
// MARK: - Topic
120+
121+
/// Per-topic statistics from the librdkafka stats payload.
struct Topic: Hashable, Codable {
    let topic: String?
    let age: Int?
    let metadataAge: Int?
    // Batch-size/count histograms are emitted as string-keyed integer maps.
    let batchsize: [String: Int]?
    let batchcnt: [String: Int]?
    let partitions: [String: Partition]?

    enum CodingKeys: String, CodingKey {
        case topic
        case age
        case metadataAge = "metadata_age"
        case batchsize
        case batchcnt
        case partitions
    }
}
133+
134+
// MARK: - Partition
135+
136+
/// Per-partition statistics from the librdkafka stats payload.
struct Partition: Hashable, Codable {
    let partition: Int?
    let broker: Int?
    let leader: Int?
    let desired: Bool?
    let unknown: Bool?
    let msgqCnt: Int?
    let msgqBytes: Int?
    let xmitMsgqCnt: Int?
    let xmitMsgqBytes: Int?
    let fetchqCnt: Int?
    let fetchqSize: Int?
    let fetchState: String?
    let queryOffset: Int?
    let nextOffset: Int?
    let appOffset: Int?
    let storedOffset: Int?
    // NOTE(review): both the misspelled "commited_offset" and the corrected
    // "committed_offset" keys appear to be modeled — presumably for
    // compatibility across librdkafka versions; confirm against librdkafka's
    // STATISTICS.md before removing either.
    let commitedOffset: Int?
    let committedOffset: Int?
    let eofOffset: Int?
    let loOffset: Int?
    let hiOffset: Int?
    let lsOffset: Int?
    let consumerLag: Int?
    let consumerLagStored: Int?
    let txmsgs: Int?
    let txbytes: Int?
    let rxmsgs: Int?
    let rxbytes: Int?
    let msgs: Int?
    let rxVerDrops: Int?
    let msgsInflight: Int?
    let nextACKSeq: Int?
    let nextErrSeq: Int?
    let ackedMsgid: Int?

    // Maps the snake_case keys used by librdkafka onto camelCase properties.
    enum CodingKeys: String, CodingKey {
        case partition, broker, leader, desired, unknown
        case msgqCnt = "msgq_cnt"
        case msgqBytes = "msgq_bytes"
        case xmitMsgqCnt = "xmit_msgq_cnt"
        case xmitMsgqBytes = "xmit_msgq_bytes"
        case fetchqCnt = "fetchq_cnt"
        case fetchqSize = "fetchq_size"
        case fetchState = "fetch_state"
        case queryOffset = "query_offset"
        case nextOffset = "next_offset"
        case appOffset = "app_offset"
        case storedOffset = "stored_offset"
        case commitedOffset = "commited_offset"
        case committedOffset = "committed_offset"
        case eofOffset = "eof_offset"
        case loOffset = "lo_offset"
        case hiOffset = "hi_offset"
        case lsOffset = "ls_offset"
        case consumerLag = "consumer_lag"
        case consumerLagStored = "consumer_lag_stored"
        case txmsgs, txbytes, rxmsgs, rxbytes, msgs
        case rxVerDrops = "rx_ver_drops"
        case msgsInflight = "msgs_inflight"
        case nextACKSeq = "next_ack_seq"
        case nextErrSeq = "next_err_seq"
        case ackedMsgid = "acked_msgid"
    }
}

0 commit comments

Comments
 (0)