
Commit 98f17fa

benchmark for consumer
1 parent aec326e commit 98f17fa

File tree

8 files changed: +696 −209 lines changed


Diff for: Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift

+296 −2
@@ -13,9 +13,18 @@
 //===----------------------------------------------------------------------===//
 
 import Benchmark
+import Crdkafka
+import Dispatch
+import struct Foundation.Date
+import struct Foundation.UUID
 import Kafka
+import Logging
+import ServiceLifecycle
 
 let benchmarks = {
+    var uniqueTestTopic: String!
+    let messageCount: UInt = 1000
+
     Benchmark.defaultConfiguration = .init(
         metrics: [.wallClock, .cpuTotal, .allocatedResidentMemory, .contextSwitches, .throughput] + .arc,
         warmupIterations: 0,
@@ -24,7 +33,292 @@ let benchmarks = {
         maxIterations: 100
     )
 
-    Benchmark.setup = {}
+    Benchmark.setup = {
+        uniqueTestTopic = try await prepareTopic(messagesCount: messageCount, partitions: 6)
+    }
+
+    Benchmark.teardown = {
+        if let uniqueTestTopic {
+            try deleteTopic(uniqueTestTopic)
+        }
+        uniqueTestTopic = nil
+    }
+
+    Benchmark("SwiftKafkaConsumer - basic consumer (messages: \(messageCount))") { benchmark in
+        let uniqueGroupID = UUID().uuidString
+        var consumerConfig = KafkaConsumerConfiguration(
+            consumptionStrategy: .group(
+                id: uniqueGroupID,
+                topics: [uniqueTestTopic]
+            ),
+            bootstrapBrokerAddresses: [brokerAddress]
+        )
+        consumerConfig.autoOffsetReset = .beginning
+        consumerConfig.broker.addressFamily = .v4
+
+        let consumer = try KafkaConsumer(
+            configuration: consumerConfig,
+            logger: .perfLogger
+        )
+
+        let serviceGroupConfiguration2 = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .perfLogger)
+        let serviceGroup2 = ServiceGroup(configuration: serviceGroupConfiguration2)
+
+        try await withThrowingTaskGroup(of: Void.self) { group in
+            benchLog("Start consuming")
+            defer {
+                benchLog("Finish consuming")
+            }
+            // Run Task
+            group.addTask {
+                try await serviceGroup2.run()
+            }
+
+            // Second Consumer Task
+            group.addTask {
+                var ctr: UInt64 = 0
+                var tmpCtr: UInt64 = 0
+                let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
+                let totalStartDate = Date.timeIntervalSinceReferenceDate
+                var totalBytes: UInt64 = 0
+
+                try await benchmark.withMeasurement {
+                    for try await record in consumer.messages {
+                        ctr += 1
+                        totalBytes += UInt64(record.value.readableBytes)
+
+                        tmpCtr += 1
+                        if tmpCtr >= interval {
+                            benchLog("read \(ctr * 100 / UInt64(messageCount))%")
+                            tmpCtr = 0
+                        }
+                        if ctr >= messageCount {
+                            break
+                        }
+                    }
+                }
+
+                let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
+                let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024
+                benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
+            }
+
+            // Wait for second Consumer Task to complete
+            try await group.next()
+            // Shutdown the serviceGroup
+            await serviceGroup2.triggerGracefulShutdown()
+        }
+    }
+
+    Benchmark("SwiftKafkaConsumer - with offset commit (messages: \(messageCount))") { benchmark in
+        let uniqueGroupID = UUID().uuidString
+        var consumerConfig = KafkaConsumerConfiguration(
+            consumptionStrategy: .group(
+                id: uniqueGroupID,
+                topics: [uniqueTestTopic]
+            ),
+            bootstrapBrokerAddresses: [brokerAddress]
+        )
+        consumerConfig.autoOffsetReset = .beginning
+        consumerConfig.broker.addressFamily = .v4
+        consumerConfig.isAutoCommitEnabled = false
+
+        let consumer = try KafkaConsumer(
+            configuration: consumerConfig,
+            logger: .perfLogger
+        )
+
+        let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .perfLogger)
+        let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
+
+        try await withThrowingTaskGroup(of: Void.self) { group in
+            benchLog("Start consuming")
+            defer {
+                benchLog("Finish consuming")
+            }
+            // Run Task
+            group.addTask {
+                try await serviceGroup.run()
+            }
+
+            // Second Consumer Task
+            group.addTask {
+                var ctr: UInt64 = 0
+                var tmpCtr: UInt64 = 0
+                let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
+                let totalStartDate = Date.timeIntervalSinceReferenceDate
+                var totalBytes: UInt64 = 0
+
+                try await benchmark.withMeasurement {
+                    for try await record in consumer.messages {
+                        try await consumer.commit(record)
+
+                        ctr += 1
+                        totalBytes += UInt64(record.value.readableBytes)
+
+                        tmpCtr += 1
+                        if tmpCtr >= interval {
+                            benchLog("read \(ctr * 100 / UInt64(messageCount))%")
+                            tmpCtr = 0
+                        }
+                        if ctr >= messageCount {
+                            break
+                        }
+                    }
+                }
+
+                let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
+                let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024
+                benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
+            }
+
+            // Wait for second Consumer Task to complete
+            try await group.next()
+            // Shutdown the serviceGroup
+            await serviceGroup.triggerGracefulShutdown()
+        }
+    }
+
+    Benchmark("librdkafka - basic consumer (messages: \(messageCount))") { benchmark in
+        let uniqueGroupID = UUID().uuidString
+        let rdKafkaConsumerConfig: [String: String] = [
+            "group.id": uniqueGroupID,
+            "bootstrap.servers": "\(brokerAddress.host):\(brokerAddress.port)",
+            "broker.address.family": "v4",
+            "auto.offset.reset": "beginning",
+        ]
+
+        let configPointer: OpaquePointer = rd_kafka_conf_new()
+        for (key, value) in rdKafkaConsumerConfig {
+            precondition(rd_kafka_conf_set(configPointer, key, value, nil, 0) == RD_KAFKA_CONF_OK)
+        }
+
+        let kafkaHandle = rd_kafka_new(RD_KAFKA_CONSUMER, configPointer, nil, 0)
+        guard let kafkaHandle else {
+            preconditionFailure("Kafka handle was not created")
+        }
+        defer {
+            rd_kafka_destroy(kafkaHandle)
+        }
+
+        rd_kafka_poll_set_consumer(kafkaHandle)
+        let subscriptionList = rd_kafka_topic_partition_list_new(1)
+        defer {
+            rd_kafka_topic_partition_list_destroy(subscriptionList)
+        }
+        rd_kafka_topic_partition_list_add(
+            subscriptionList,
+            uniqueTestTopic,
+            RD_KAFKA_PARTITION_UA
+        )
+        rd_kafka_subscribe(kafkaHandle, subscriptionList)
+        rd_kafka_poll(kafkaHandle, 0)
+
+        var ctr: UInt64 = 0
+        var tmpCtr: UInt64 = 0
+
+        let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
+        let totalStartDate = Date.timeIntervalSinceReferenceDate
+        var totalBytes: UInt64 = 0
+
+        benchmark.withMeasurement {
+            while ctr < messageCount {
+                guard let record = rd_kafka_consumer_poll(kafkaHandle, 10) else {
+                    continue
+                }
+                defer {
+                    rd_kafka_message_destroy(record)
+                }
+                ctr += 1
+                totalBytes += UInt64(record.pointee.len)
+
+                tmpCtr += 1
+                if tmpCtr >= interval {
+                    benchLog("read \(ctr * 100 / UInt64(messageCount))%")
+                    tmpCtr = 0
+                }
+            }
+        }
+
+        rd_kafka_consumer_close(kafkaHandle)
+
+        let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
+        let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024
+        benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
+    }
+
+    Benchmark("librdkafka - with offset commit (messages: \(messageCount))") { benchmark in
+        let uniqueGroupID = UUID().uuidString
+        let rdKafkaConsumerConfig: [String: String] = [
+            "group.id": uniqueGroupID,
+            "bootstrap.servers": "\(brokerAddress.host):\(brokerAddress.port)",
+            "broker.address.family": "v4",
+            "auto.offset.reset": "beginning",
+            "enable.auto.commit": "false",
+        ]
+
+        let configPointer: OpaquePointer = rd_kafka_conf_new()
+        for (key, value) in rdKafkaConsumerConfig {
+            precondition(rd_kafka_conf_set(configPointer, key, value, nil, 0) == RD_KAFKA_CONF_OK)
+        }
+
+        let kafkaHandle = rd_kafka_new(RD_KAFKA_CONSUMER, configPointer, nil, 0)
+        guard let kafkaHandle else {
+            preconditionFailure("Kafka handle was not created")
+        }
+        defer {
+            rd_kafka_destroy(kafkaHandle)
+        }
+
+        rd_kafka_poll_set_consumer(kafkaHandle)
+        let subscriptionList = rd_kafka_topic_partition_list_new(1)
+        defer {
+            rd_kafka_topic_partition_list_destroy(subscriptionList)
+        }
+        rd_kafka_topic_partition_list_add(
+            subscriptionList,
+            uniqueTestTopic,
+            RD_KAFKA_PARTITION_UA
+        )
+        rd_kafka_subscribe(kafkaHandle, subscriptionList)
+        rd_kafka_poll(kafkaHandle, 0)
+
+        var ctr: UInt64 = 0
+        var tmpCtr: UInt64 = 0
+
+        let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1)
+        let totalStartDate = Date.timeIntervalSinceReferenceDate
+        var totalBytes: UInt64 = 0
+
+        benchmark.withMeasurement {
+            while ctr < messageCount {
+                guard let record = rd_kafka_consumer_poll(kafkaHandle, 10) else {
+                    continue
+                }
+                defer {
+                    rd_kafka_message_destroy(record)
+                }
+                guard record.pointee.err != RD_KAFKA_RESP_ERR__PARTITION_EOF else {
+                    continue
+                }
+                let result = rd_kafka_commit_message(kafkaHandle, record, 0)
+                precondition(result == RD_KAFKA_RESP_ERR_NO_ERROR)
+
+                ctr += 1
+                totalBytes += UInt64(record.pointee.len)
+
+                tmpCtr += 1
+                if tmpCtr >= interval {
+                    benchLog("read \(ctr * 100 / UInt64(messageCount))%")
+                    tmpCtr = 0
+                }
+            }
+        }
+
+        rd_kafka_consumer_close(kafkaHandle)
 
-    Benchmark.teardown = {}
+        let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
+        let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024
+        benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
+    }
 }
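
Note: the benchmark file references a handful of helpers that are not part of this diff — brokerAddress, Logger.perfLogger, benchLog, and the prepareTopic/deleteTopic functions used in setup and teardown — which presumably live in the benchmark target's shared utilities. Below is a minimal sketch, under that assumption, of what the broker-address and logging helpers could look like; the host, port, label, and log level are illustrative placeholders, not values taken from this commit.

    import Kafka
    import Logging

    // Assumed broker address shared by the SwiftKafkaConsumer and librdkafka
    // benchmarks above (host and port are placeholders, not from this commit).
    let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)

    extension Logger {
        // A quiet logger so log handling does not distort the measurements.
        static let perfLogger: Logger = {
            var logger = Logger(label: "swift-kafka-consumer-benchmark")
            logger.logLevel = .critical
            return logger
        }()
    }

    // Progress output used by the benchmarks; printed only in debug builds so
    // console I/O does not skew the measured consumer throughput.
    func benchLog(_ message: @autoclosure () -> String) {
        #if DEBUG
        print(message())
        #endif
    }

Assuming the target uses the package-benchmark plugin (which the Benchmark.defaultConfiguration, setup/teardown, and withMeasurement APIs suggest), the suite would typically be run via swift package benchmark against a reachable Kafka broker.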
