Commit 1f2076f

Authored by: blindspotbounty, felixschlegel, axelandersson, FranzBusch, Samsv77
chore(patch): [sc-12737] update from upstream (#35)
* Feature: expose librdkafka statistics as swift metrics (swift-server#92)
  * introduce statistics for producer
  * add statistics to new consumer with events
  * fix some artefacts
  * adjust to KeyRefreshAttempts
  * draft: statistics with metrics
  * make structures internal
  * Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift (Co-authored-by: Felix Schlegel <[email protected]>)
  * Update Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift (Co-authored-by: Felix Schlegel <[email protected]>)
  * Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift (Co-authored-by: Felix Schlegel <[email protected]>)
  * Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift (Co-authored-by: Felix Schlegel <[email protected]>)
  * address review comments
  * formatting
  * map gauges in one place
  * move JSON mode to rd kafka statistics, misc renaming + DocC
  * address review comments
  * remove import Metrics
  * divide producer/consumer configuration
  * apply swiftformat
  * fix code after conflicts
  * fix formatting
  Co-authored-by: Felix Schlegel <[email protected]>
* Add benchmark infrastructure without actual tests (swift-server#146)
  * add benchmark infrastructure without actual test
  * apply swiftformat
  * fix header in sh file
  * use new async seq methods
* Update to latest librdkafka & add a define for RAND_priv_bytes (swift-server#148)
  Co-authored-by: Franz Busch <[email protected]>
* Exit from consumer batch loop when no more messages are left (swift-server#153)
* Lower requirements for consumer state machine (swift-server#154)
  * lower requirements for kafka consumer
  * add twin test for kafka producer
* Defer source.finish (swift-server#157)
* Add two consumer benchmarks (swift-server#149)
  * benchmark for consumer
  * attempt to speed up benchmarks
  * check CI works for one test
  * enable one more test
  * try to lower poll interval
  * adjust max duration of test
  * keep only the manual-commit test
  * check if commit is the reason for test delays
  * try all with schedule commit
  * revert max test time to 5 seconds
  * dockerfiles
  * test set thresholds
  * create dummy thresholds from CI results
  * disable benchmark in CI
  * add header
  * add stable metrics
  * update thresholds to stable metrics only
  * try use '1' instead of 'true'
  * adjust thresholds to CI results (as a temporary measure)
  * set 20% threshold
  * move ARC to unstable metrics
  * try use 'true' in quotes for CI
  * try reducing the number of messages for more reliable results
  * try upgrading bench
  * disable benchmark in CI
* Update librdkafka for BoringSSL (swift-server#162)
* chore(patch): [sc-8379] use returned error (swift-server#163)
* [producer message] Allow optional key for initializer (swift-server#164)
  Co-authored-by: Harish Yerra <[email protected]>
* Allow groupID to be specified when assigning partition (swift-server#161)
  Motivation: a consumer group can provide many benefits even when the dynamic load-balancing features are not used.
  Modifications: allow an optional group ID when creating a partition consumer.
  Result: consumer groups can now be used with manual assignment.
  * fix format
  Co-authored-by: Ómar Kjartan Yasin <[email protected]>
  Co-authored-by: blindspotbounty <[email protected]>
  Co-authored-by: Franz Busch <[email protected]>
* Wrap rd_kafka_consumer_poll into an iterator (use librdkafka's embedded backpressure) (swift-server#158)
  * remove message sequence
  * test consumer with implicit rebalance
  * misc + format
  * remove artefact
  * don't check a lot of messages
  * fix typo
  * slow down first consumer to lower messages, to fit the CI timeout
  * remove helpers
  * use exact benchmark version to avoid missing-thresholds error (as there are no thresholds so far)
  * add deprecated marks for backpressure, change comment for future dev
  * address comments

Co-authored-by: Felix Schlegel <[email protected]>
Co-authored-by: Axel Andersson <[email protected]>
Co-authored-by: Franz Busch <[email protected]>
Co-authored-by: Samuel M <[email protected]>
Co-authored-by: Harish Yerra <[email protected]>
Co-authored-by: Harish Yerra <[email protected]>
Co-authored-by: Omar Yasin <[email protected]>
Co-authored-by: Ómar Kjartan Yasin <[email protected]>
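The last item above (swift-server#158) replaces a hand-rolled message sequence with an iterator wrapped around `rd_kafka_consumer_poll`, so librdkafka's own poll loop provides the backpressure. A minimal sketch of that general pattern in pure Swift — `PollingSequence` and its fields are hypothetical illustration, not the swift-kafka-client API:

```swift
// Sketch: exposing a blocking-style poll() as an AsyncSequence so the
// underlying library's poll loop supplies the backpressure. Names here
// are illustrative only.
struct PollingSequence<Element>: AsyncSequence {
    let poll: () -> Element?      // nil means "no message ready yet"
    let isFinished: () -> Bool    // true once the source is exhausted

    struct Iterator: AsyncIteratorProtocol {
        let poll: () -> Element?
        let isFinished: () -> Bool

        mutating func next() async throws -> Element? {
            while true {
                if let element = poll() { return element }
                if isFinished() { return nil }
                // Back off briefly instead of busy-waiting; in librdkafka
                // the poll timeout argument plays this role.
                try await Task.sleep(nanoseconds: 1_000_000)
            }
        }
    }

    func makeAsyncIterator() -> Iterator {
        Iterator(poll: poll, isFinished: isFinished)
    }
}
```

Consumers then iterate with `for try await message in sequence { … }`, pulling one message at a time instead of buffering ahead.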
1 parent 52ceca1 · commit 1f2076f

32 files changed: +1407 −572 lines
@@ -0,0 +1,345 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Benchmark
import Crdkafka
import Dispatch
import struct Foundation.Date
import struct Foundation.UUID
import Kafka
import Logging
import ServiceLifecycle

let benchmarks = {
    var uniqueTestTopic: String!
    let messageCount: UInt = 1000

    Benchmark.defaultConfiguration = .init(
        metrics: [
            .wallClock,
            .cpuTotal,
            .contextSwitches,
            .throughput,
            .allocatedResidentMemory,
        ] + .arc,
        warmupIterations: 0,
        scalingFactor: .one,
        maxDuration: .seconds(5),
        maxIterations: 100,
        thresholds: [
            .wallClock: .init(relative: [.p90: 35]),
            .cpuTotal: .init(relative: [.p90: 35]),
            .allocatedResidentMemory: .init(relative: [.p90: 20]),
            .contextSwitches: .init(relative: [.p90: 35]),
            .throughput: .init(relative: [.p90: 35]),
            .objectAllocCount: .init(relative: [.p90: 20]),
            .retainCount: .init(relative: [.p90: 20]),
            .releaseCount: .init(relative: [.p90: 20]),
            .retainReleaseDelta: .init(relative: [.p90: 20]),
        ]
    )

    Benchmark.setup = {
        uniqueTestTopic = try await prepareTopic(messagesCount: messageCount, partitions: 6)
    }

    Benchmark.teardown = {
        if let uniqueTestTopic {
            try deleteTopic(uniqueTestTopic)
        }
        uniqueTestTopic = nil
    }
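The thresholds configured above are relative p90 percentages (e.g. 35% for wall clock): a run fails when its p90 deviates from the stored baseline by more than the given percentage. A simplified, self-contained sketch of what such a check means — this mirrors the idea only and is not the Benchmark package's implementation:

```swift
// Nearest-rank p90 over a sample set.
func p90(_ samples: [Double]) -> Double {
    precondition(!samples.isEmpty, "need at least one sample")
    let sorted = samples.sorted()
    let rank = Int((Double(sorted.count - 1) * 0.9).rounded())
    return sorted[rank]
}

// A run passes a relative threshold if its measurement deviates from the
// recorded baseline by at most `percent` percent.
func withinRelativeThreshold(baseline: Double, measured: Double, percent: Double) -> Bool {
    abs(measured - baseline) / baseline * 100.0 <= percent
}
```

Under this reading, a 35% wall-clock threshold tolerates a p90 of up to 135% of the baseline before the benchmark run is flagged.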

    Benchmark("SwiftKafkaConsumer_basic_consumer_messages_\(messageCount)") { benchmark in
        let uniqueGroupID = UUID().uuidString
        var consumerConfig = KafkaConsumerConfiguration(
            consumptionStrategy: .group(
                id: uniqueGroupID,
                topics: [uniqueTestTopic]
            ),
            bootstrapBrokerAddresses: [brokerAddress]
        )
        consumerConfig.autoOffsetReset = .beginning
        consumerConfig.broker.addressFamily = .v4
        // The poll interval must be well below the default (at most ~10 ms),
        // otherwise the CI run times out.
        consumerConfig.pollInterval = .milliseconds(1)

        let consumer = try KafkaConsumer(
            configuration: consumerConfig,
            logger: .perfLogger
        )

        let serviceGroupConfiguration = ServiceGroupConfiguration(
            services: [consumer],
            gracefulShutdownSignals: [.sigterm, .sigint],
            logger: .perfLogger
        )
        let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)

        try await withThrowingTaskGroup(of: Void.self) { group in
            benchLog("Start consuming")
            defer {
                benchLog("Finish consuming")
            }
            // Service-group run task
            group.addTask {
                try await serviceGroup.run()
            }

            // Consumer task
            group.addTask {
                var ctr: UInt64 = 0
                var tmpCtr: UInt64 = 0
                let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
                let totalStartDate = Date.timeIntervalSinceReferenceDate
                var totalBytes: UInt64 = 0

                try await benchmark.withMeasurement {
                    for try await record in consumer.messages {
                        ctr += 1
                        totalBytes += UInt64(record.value.readableBytes)

                        tmpCtr += 1
                        if tmpCtr >= interval {
                            benchLog("read \(ctr * 100 / UInt64(messageCount))%")
                            tmpCtr = 0
                        }
                        if ctr >= messageCount {
                            break
                        }
                    }
                }

                let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
                let avgRateKb = Double(totalBytes) / timeIntervalTotal / 1024
                benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateKb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
            }

            // Wait for the consumer task to complete
            try await group.next()
            // Shut down the service group
            await serviceGroup.triggerGracefulShutdown()
        }
    }
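The `ctr`/`tmpCtr` bookkeeping in the loop above logs progress roughly every 5% of `messageCount`. That logic can be isolated into a small helper; this is a sketch with illustrative names, not code from the commit:

```swift
// Sketch of the progress-logging logic used inside the consumer loops:
// a secondary counter fires once every `total / 20` messages (about every
// 5%) and records the integer percentage consumed so far.
struct ProgressReporter {
    let total: UInt64
    let interval: UInt64
    private var sinceLastReport: UInt64 = 0
    private(set) var reported: [UInt64] = []  // percentages reported so far

    init(total: UInt64) {
        self.total = total
        self.interval = Swift.max(total / 20, 1)  // never zero, even for tiny totals
    }

    mutating func record(count: UInt64) {
        sinceLastReport += 1
        if sinceLastReport >= interval {
            reported.append(count * 100 / total)
            sinceLastReport = 0
        }
    }
}
```

Clamping the interval to at least 1 keeps the reporter correct for message counts below 20, the same reason the benchmark uses `Swift.max(..., 1)`.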

    Benchmark("SwiftKafkaConsumer_with_offset_commit_messages_\(messageCount)") { benchmark in
        let uniqueGroupID = UUID().uuidString
        var consumerConfig = KafkaConsumerConfiguration(
            consumptionStrategy: .group(
                id: uniqueGroupID,
                topics: [uniqueTestTopic]
            ),
            bootstrapBrokerAddresses: [brokerAddress]
        )
        consumerConfig.autoOffsetReset = .beginning
        consumerConfig.broker.addressFamily = .v4
        consumerConfig.isAutoCommitEnabled = false
        // The poll interval must be well below the default (at most ~10 ms),
        // otherwise the CI run times out.
        consumerConfig.pollInterval = .milliseconds(1)

        let consumer = try KafkaConsumer(
            configuration: consumerConfig,
            logger: .perfLogger
        )

        let serviceGroupConfiguration = ServiceGroupConfiguration(
            services: [consumer],
            gracefulShutdownSignals: [.sigterm, .sigint],
            logger: .perfLogger
        )
        let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)

        try await withThrowingTaskGroup(of: Void.self) { group in
            benchLog("Start consuming")
            defer {
                benchLog("Finish consuming")
            }
            // Service-group run task
            group.addTask {
                try await serviceGroup.run()
            }

            // Consumer task
            group.addTask {
                var ctr: UInt64 = 0
                var tmpCtr: UInt64 = 0
                let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
                let totalStartDate = Date.timeIntervalSinceReferenceDate
                var totalBytes: UInt64 = 0

                try await benchmark.withMeasurement {
                    for try await record in consumer.messages {
                        try consumer.scheduleCommit(record)

                        ctr += 1
                        totalBytes += UInt64(record.value.readableBytes)

                        tmpCtr += 1
                        if tmpCtr >= interval {
                            benchLog("read \(ctr * 100 / UInt64(messageCount))%")
                            tmpCtr = 0
                        }
                        if ctr >= messageCount {
                            break
                        }
                    }
                }

                let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
                let avgRateKb = Double(totalBytes) / timeIntervalTotal / 1024
                benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateKb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
            }

            // Wait for the consumer task to complete
            try await group.next()
            // Shut down the service group
            await serviceGroup.triggerGracefulShutdown()
        }
    }

    Benchmark("librdkafka_basic_consumer_messages_\(messageCount)") { benchmark in
        let uniqueGroupID = UUID().uuidString
        let rdKafkaConsumerConfig: [String: String] = [
            "group.id": uniqueGroupID,
            "bootstrap.servers": "\(brokerAddress.host):\(brokerAddress.port)",
            "broker.address.family": "v4",
            "auto.offset.reset": "beginning",
        ]

        let configPointer: OpaquePointer = rd_kafka_conf_new()
        for (key, value) in rdKafkaConsumerConfig {
            precondition(rd_kafka_conf_set(configPointer, key, value, nil, 0) == RD_KAFKA_CONF_OK)
        }

        let kafkaHandle = rd_kafka_new(RD_KAFKA_CONSUMER, configPointer, nil, 0)
        guard let kafkaHandle else {
            preconditionFailure("Kafka handle was not created")
        }
        defer {
            rd_kafka_destroy(kafkaHandle)
        }

        rd_kafka_poll_set_consumer(kafkaHandle)
        let subscriptionList = rd_kafka_topic_partition_list_new(1)
        defer {
            rd_kafka_topic_partition_list_destroy(subscriptionList)
        }
        rd_kafka_topic_partition_list_add(
            subscriptionList,
            uniqueTestTopic,
            RD_KAFKA_PARTITION_UA
        )
        rd_kafka_subscribe(kafkaHandle, subscriptionList)
        rd_kafka_poll(kafkaHandle, 0)

        var ctr: UInt64 = 0
        var tmpCtr: UInt64 = 0

        let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
        let totalStartDate = Date.timeIntervalSinceReferenceDate
        var totalBytes: UInt64 = 0

        benchmark.withMeasurement {
            while ctr < messageCount {
                guard let record = rd_kafka_consumer_poll(kafkaHandle, 10) else {
                    continue
                }
                defer {
                    rd_kafka_message_destroy(record)
                }
                ctr += 1
                totalBytes += UInt64(record.pointee.len)

                tmpCtr += 1
                if tmpCtr >= interval {
                    benchLog("read \(ctr * 100 / UInt64(messageCount))%")
                    tmpCtr = 0
                }
            }
        }

        rd_kafka_consumer_close(kafkaHandle)

        let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
        let avgRateKb = Double(totalBytes) / timeIntervalTotal / 1024
        benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateKb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
    }
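Each benchmark reports an average consume rate as total bytes divided by elapsed seconds and then by 1024, i.e. KiB/s. A standalone, testable version of that formula (the function name is illustrative, not from the commit):

```swift
// Average consume rate in KiB/s, as computed by the benchmark log lines:
// bytes / seconds / 1024.
func averageRateKiBPerSecond(totalBytes: UInt64, elapsedSeconds: Double) -> Double {
    precondition(elapsedSeconds > 0, "elapsed time must be positive")
    return Double(totalBytes) / elapsedSeconds / 1024.0
}
```

For example, consuming 1 MiB of payload in 2 seconds corresponds to a rate of 512 KiB/s.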

    Benchmark("librdkafka_with_offset_commit_messages_\(messageCount)") { benchmark in
        let uniqueGroupID = UUID().uuidString
        let rdKafkaConsumerConfig: [String: String] = [
            "group.id": uniqueGroupID,
            "bootstrap.servers": "\(brokerAddress.host):\(brokerAddress.port)",
            "broker.address.family": "v4",
            "auto.offset.reset": "beginning",
            "enable.auto.commit": "false",
        ]

        let configPointer: OpaquePointer = rd_kafka_conf_new()
        for (key, value) in rdKafkaConsumerConfig {
            precondition(rd_kafka_conf_set(configPointer, key, value, nil, 0) == RD_KAFKA_CONF_OK)
        }

        let kafkaHandle = rd_kafka_new(RD_KAFKA_CONSUMER, configPointer, nil, 0)
        guard let kafkaHandle else {
            preconditionFailure("Kafka handle was not created")
        }
        defer {
            rd_kafka_destroy(kafkaHandle)
        }

        rd_kafka_poll_set_consumer(kafkaHandle)
        let subscriptionList = rd_kafka_topic_partition_list_new(1)
        defer {
            rd_kafka_topic_partition_list_destroy(subscriptionList)
        }
        rd_kafka_topic_partition_list_add(
            subscriptionList,
            uniqueTestTopic,
            RD_KAFKA_PARTITION_UA
        )
        rd_kafka_subscribe(kafkaHandle, subscriptionList)
        rd_kafka_poll(kafkaHandle, 0)

        var ctr: UInt64 = 0
        var tmpCtr: UInt64 = 0

        let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
        let totalStartDate = Date.timeIntervalSinceReferenceDate
        var totalBytes: UInt64 = 0

        benchmark.withMeasurement {
            while ctr < messageCount {
                guard let record = rd_kafka_consumer_poll(kafkaHandle, 10) else {
                    continue
                }
                defer {
                    rd_kafka_message_destroy(record)
                }
                guard record.pointee.err != RD_KAFKA_RESP_ERR__PARTITION_EOF else {
                    continue
                }
                let result = rd_kafka_commit_message(kafkaHandle, record, 0)
                precondition(result == RD_KAFKA_RESP_ERR_NO_ERROR)

                ctr += 1
                totalBytes += UInt64(record.pointee.len)

                tmpCtr += 1
                if tmpCtr >= interval {
                    benchLog("read \(ctr * 100 / UInt64(messageCount))%")
                    tmpCtr = 0
                }
            }
        }

        rd_kafka_consumer_close(kafkaHandle)

        let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
        let avgRateKb = Double(totalBytes) / timeIntervalTotal / 1024
        benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateKb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
    }
}

0 commit comments
