Skip to content

Commit 5ebb47a

Browse files
Add two consumer benchmark (#149)
* benchmark for consumer * attempty to speedup benchmarks * check CI works for one test * enable one more test * try to lower poll interval * adjust max duration of test * remain only manual commit test * check if commit is the reason for test delays * try all with schedule commit * revert max test time to 5 seconds * dockerfiles * test set threasholds * create dummy thresholds from ci results * disable benchmark in CI * add header * add stable metrics * update thresholds to stable metrics only * try use '1' instead of 'true' * adjust thresholds to CI results (as temporary measure) * set 20% threshold.. * move arc to unstable metrics * try use 'true' in quotes for CI * try reduce number of messages for more reliable results * try upgrade bench * disable benchmark in CI
1 parent 20faefb commit 5ebb47a

16 files changed

+782
-216
lines changed

Diff for: Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift

+319-4
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,333 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Benchmark
16+
import Crdkafka
17+
import Dispatch
18+
import struct Foundation.Date
19+
import struct Foundation.UUID
1620
import Kafka
21+
import Logging
22+
import ServiceLifecycle
1723

1824
/// Entry point consumed by package-benchmark: registers the default benchmark
/// configuration plus four consumer benchmarks over a freshly created test
/// topic — swift-kafka-client vs. raw librdkafka, each in a "basic" variant
/// and a "manual offset commit" variant.
let benchmarks = {
    // Created in `Benchmark.setup`, deleted in `Benchmark.teardown`.
    // Implicitly unwrapped because every benchmark body runs strictly
    // between setup and teardown.
    var uniqueTestTopic: String!
    let messageCount: UInt = 1000

    Benchmark.defaultConfiguration = .init(
        metrics: [
            .wallClock,
            .cpuTotal,
            .contextSwitches,
            .throughput,
            .allocatedResidentMemory,
        ] + .arc,
        warmupIterations: 0,
        scalingFactor: .one,
        maxDuration: .seconds(5),
        maxIterations: 100,
        // Relative p90 thresholds derived from CI runs (see commit history);
        // ARC metrics get a tighter 20% band than the timing metrics.
        thresholds: [
            .wallClock: .init(relative: [.p90: 35]),
            .cpuTotal: .init(relative: [.p90: 35]),
            .allocatedResidentMemory: .init(relative: [.p90: 20]),
            .contextSwitches: .init(relative: [.p90: 35]),
            .throughput: .init(relative: [.p90: 35]),
            .objectAllocCount: .init(relative: [.p90: 20]),
            .retainCount: .init(relative: [.p90: 20]),
            .releaseCount: .init(relative: [.p90: 20]),
            .retainReleaseDelta: .init(relative: [.p90: 20]),
        ]
    )

    Benchmark.setup = {
        // Pre-fill the topic so the benchmarks measure pure consumption.
        uniqueTestTopic = try await prepareTopic(messagesCount: messageCount, partitions: 6)
    }

    Benchmark.teardown = {
        if let uniqueTestTopic {
            try deleteTopic(uniqueTestTopic)
        }
        uniqueTestTopic = nil
    }

    Benchmark("SwiftKafkaConsumer_basic_consumer_messages_\(messageCount)") { benchmark in
        // Fresh group ID per run so the consumer starts from the beginning of
        // the pre-filled topic rather than from a previously committed offset.
        let uniqueGroupID = UUID().uuidString
        var consumerConfig = KafkaConsumerConfiguration(
            consumptionStrategy: .group(
                id: uniqueGroupID,
                topics: [uniqueTestTopic]
            ),
            bootstrapBrokerAddresses: [brokerAddress]
        )
        consumerConfig.autoOffsetReset = .beginning
        consumerConfig.broker.addressFamily = .v4
        // NOTE(review): the original comment claimed the poll interval must be
        // at least 10ms to avoid CI timeouts, but the value actually used is
        // 1ms (lowered later per the commit log) — confirm which is intended
        // before re-enabling this benchmark in CI.
        consumerConfig.pollInterval = .milliseconds(1)

        let consumer = try KafkaConsumer(
            configuration: consumerConfig,
            logger: .perfLogger
        )

        let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .perfLogger)
        let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)

        try await withThrowingTaskGroup(of: Void.self) { group in
            benchLog("Start consuming")
            defer {
                benchLog("Finish consuming")
            }
            // Run Task: drives the consumer's poll loop via the service group.
            group.addTask {
                try await serviceGroup.run()
            }

            // Consumer Task: the measured work — read `messageCount` messages.
            group.addTask {
                var ctr: UInt64 = 0
                var tmpCtr: UInt64 = 0
                // Emit a progress log roughly every 5% of the expected messages.
                let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
                let totalStartDate = Date.timeIntervalSinceReferenceDate
                var totalBytes: UInt64 = 0

                try await benchmark.withMeasurement {
                    for try await record in consumer.messages {
                        ctr += 1
                        totalBytes += UInt64(record.value.readableBytes)

                        tmpCtr += 1
                        if tmpCtr >= interval {
                            benchLog("read \(ctr * 100 / UInt64(messageCount))%")
                            tmpCtr = 0
                        }
                        if ctr >= messageCount {
                            break
                        }
                    }
                }

                let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
                // bytes / seconds / 1024 => KB/s (was misleadingly named `avgRateMb`).
                let avgRateKb = Double(totalBytes) / timeIntervalTotal / 1024
                benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateKb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
            }

            // Wait for the consuming task to complete, then shut everything down.
            try await group.next()
            await serviceGroup.triggerGracefulShutdown()
        }
    }

    Benchmark("SwiftKafkaConsumer_with_offset_commit_messages_\(messageCount)") { benchmark in
        // Same as the basic benchmark above, but with auto-commit disabled and
        // a manual `scheduleCommit` per message to measure commit overhead.
        let uniqueGroupID = UUID().uuidString
        var consumerConfig = KafkaConsumerConfiguration(
            consumptionStrategy: .group(
                id: uniqueGroupID,
                topics: [uniqueTestTopic]
            ),
            bootstrapBrokerAddresses: [brokerAddress]
        )
        consumerConfig.autoOffsetReset = .beginning
        consumerConfig.broker.addressFamily = .v4
        consumerConfig.isAutoCommitEnabled = false
        // NOTE(review): original comment said ">= 10ms for CI", value is 1ms —
        // see the matching note in the basic benchmark above.
        consumerConfig.pollInterval = .milliseconds(1)

        let consumer = try KafkaConsumer(
            configuration: consumerConfig,
            logger: .perfLogger
        )

        let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .perfLogger)
        let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)

        try await withThrowingTaskGroup(of: Void.self) { group in
            benchLog("Start consuming")
            defer {
                benchLog("Finish consuming")
            }
            // Run Task: drives the consumer's poll loop via the service group.
            group.addTask {
                try await serviceGroup.run()
            }

            // Consumer Task: read `messageCount` messages, committing each one.
            group.addTask {
                var ctr: UInt64 = 0
                var tmpCtr: UInt64 = 0
                // Emit a progress log roughly every 5% of the expected messages.
                let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
                let totalStartDate = Date.timeIntervalSinceReferenceDate
                var totalBytes: UInt64 = 0

                try await benchmark.withMeasurement {
                    for try await record in consumer.messages {
                        // Non-blocking commit; the alternative blocking `commit`
                        // was measured to be far slower (see commit history).
                        try consumer.scheduleCommit(record)

                        ctr += 1
                        totalBytes += UInt64(record.value.readableBytes)

                        tmpCtr += 1
                        if tmpCtr >= interval {
                            benchLog("read \(ctr * 100 / UInt64(messageCount))%")
                            tmpCtr = 0
                        }
                        if ctr >= messageCount {
                            break
                        }
                    }
                }

                let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
                // bytes / seconds / 1024 => KB/s.
                let avgRateKb = Double(totalBytes) / timeIntervalTotal / 1024
                benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateKb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
            }

            // Wait for the consuming task to complete, then shut everything down.
            try await group.next()
            await serviceGroup.triggerGracefulShutdown()
        }
    }

    Benchmark("librdkafka_basic_consumer_messages_\(messageCount)") { benchmark in
        // Baseline: consume the same topic directly through the librdkafka C API.
        let uniqueGroupID = UUID().uuidString
        let rdKafkaConsumerConfig: [String: String] = [
            "group.id": uniqueGroupID,
            "bootstrap.servers": "\(brokerAddress.host):\(brokerAddress.port)",
            "broker.address.family": "v4",
            "auto.offset.reset": "beginning",
        ]

        let configPointer: OpaquePointer = rd_kafka_conf_new()
        for (key, value) in rdKafkaConsumerConfig {
            precondition(rd_kafka_conf_set(configPointer, key, value, nil, 0) == RD_KAFKA_CONF_OK)
        }

        // On success rd_kafka_new takes ownership of `configPointer`.
        let kafkaHandle = rd_kafka_new(RD_KAFKA_CONSUMER, configPointer, nil, 0)
        guard let kafkaHandle else {
            preconditionFailure("Kafka handle was not created")
        }
        defer {
            rd_kafka_destroy(kafkaHandle)
        }

        rd_kafka_poll_set_consumer(kafkaHandle)
        let subscriptionList = rd_kafka_topic_partition_list_new(1)
        defer {
            rd_kafka_topic_partition_list_destroy(subscriptionList)
        }
        rd_kafka_topic_partition_list_add(
            subscriptionList,
            uniqueTestTopic,
            RD_KAFKA_PARTITION_UA
        )
        // Fail fast if the subscription was rejected (was previously ignored).
        precondition(rd_kafka_subscribe(kafkaHandle, subscriptionList) == RD_KAFKA_RESP_ERR_NO_ERROR)
        rd_kafka_poll(kafkaHandle, 0)

        var ctr: UInt64 = 0
        var tmpCtr: UInt64 = 0

        // Emit a progress log roughly every 5% of the expected messages.
        let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
        let totalStartDate = Date.timeIntervalSinceReferenceDate
        var totalBytes: UInt64 = 0

        benchmark.withMeasurement {
            while ctr < messageCount {
                guard let record = rd_kafka_consumer_poll(kafkaHandle, 10) else {
                    continue
                }
                defer {
                    rd_kafka_message_destroy(record)
                }
                // Skip partition-EOF events so only real messages are counted
                // (fix: the offset-commit variant below already did this, the
                // basic variant previously counted EOF events as messages).
                guard record.pointee.err != RD_KAFKA_RESP_ERR__PARTITION_EOF else {
                    continue
                }
                ctr += 1
                totalBytes += UInt64(record.pointee.len)

                tmpCtr += 1
                if tmpCtr >= interval {
                    benchLog("read \(ctr * 100 / UInt64(messageCount))%")
                    tmpCtr = 0
                }
            }
        }

        rd_kafka_consumer_close(kafkaHandle)

        let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
        // bytes / seconds / 1024 => KB/s.
        let avgRateKb = Double(totalBytes) / timeIntervalTotal / 1024
        benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateKb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
    }

    Benchmark("librdkafka_with_offset_commit_messages_\(messageCount)") { benchmark in
        // Baseline with manual commits: auto-commit disabled, synchronous-API
        // commit per message via rd_kafka_commit_message (async flag = 0... the
        // last argument 0 means blocking commit).
        let uniqueGroupID = UUID().uuidString
        let rdKafkaConsumerConfig: [String: String] = [
            "group.id": uniqueGroupID,
            "bootstrap.servers": "\(brokerAddress.host):\(brokerAddress.port)",
            "broker.address.family": "v4",
            "auto.offset.reset": "beginning",
            "enable.auto.commit": "false",
        ]

        let configPointer: OpaquePointer = rd_kafka_conf_new()
        for (key, value) in rdKafkaConsumerConfig {
            precondition(rd_kafka_conf_set(configPointer, key, value, nil, 0) == RD_KAFKA_CONF_OK)
        }

        // On success rd_kafka_new takes ownership of `configPointer`.
        let kafkaHandle = rd_kafka_new(RD_KAFKA_CONSUMER, configPointer, nil, 0)
        guard let kafkaHandle else {
            preconditionFailure("Kafka handle was not created")
        }
        defer {
            rd_kafka_destroy(kafkaHandle)
        }

        rd_kafka_poll_set_consumer(kafkaHandle)
        let subscriptionList = rd_kafka_topic_partition_list_new(1)
        defer {
            rd_kafka_topic_partition_list_destroy(subscriptionList)
        }
        rd_kafka_topic_partition_list_add(
            subscriptionList,
            uniqueTestTopic,
            RD_KAFKA_PARTITION_UA
        )
        // Fail fast if the subscription was rejected (was previously ignored).
        precondition(rd_kafka_subscribe(kafkaHandle, subscriptionList) == RD_KAFKA_RESP_ERR_NO_ERROR)
        rd_kafka_poll(kafkaHandle, 0)

        var ctr: UInt64 = 0
        var tmpCtr: UInt64 = 0

        // Emit a progress log roughly every 5% of the expected messages.
        let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
        let totalStartDate = Date.timeIntervalSinceReferenceDate
        var totalBytes: UInt64 = 0

        benchmark.withMeasurement {
            while ctr < messageCount {
                guard let record = rd_kafka_consumer_poll(kafkaHandle, 10) else {
                    continue
                }
                defer {
                    rd_kafka_message_destroy(record)
                }
                // Skip partition-EOF events so only real messages are counted.
                guard record.pointee.err != RD_KAFKA_RESP_ERR__PARTITION_EOF else {
                    continue
                }
                let result = rd_kafka_commit_message(kafkaHandle, record, 0)
                precondition(result == RD_KAFKA_RESP_ERR_NO_ERROR)

                ctr += 1
                totalBytes += UInt64(record.pointee.len)

                tmpCtr += 1
                if tmpCtr >= interval {
                    benchLog("read \(ctr * 100 / UInt64(messageCount))%")
                    tmpCtr = 0
                }
            }
        }

        rd_kafka_consumer_close(kafkaHandle)

        let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
        // bytes / seconds / 1024 => KB/s.
        let avgRateKb = Double(totalBytes) / timeIntervalTotal / 1024
        benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateKb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
    }
}

0 commit comments

Comments
 (0)