Commit 2f2d828

Merge remote-tracking branch 'origin/main' into feature/sc-1976/gsoc-expose-librdkafka-statistics
2 parents: cac1442 + 5b07fe2

10 files changed: +139 -135 lines

Diff for: Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift

+11
@@ -25,6 +25,17 @@ public struct KafkaProducerConfiguration {
 /// >= 1ms - statistics provided every specified interval
 public var statisticsInterval: Duration = .zero
 
+/// Maximum timeout for flushing outstanding produce requests when the ``KakfaProducer`` is shutting down.
+/// Default: `10000`
+public var flushTimeoutMilliseconds: Int = 10000 {
+    didSet {
+        precondition(
+            0...Int(Int32.max) ~= self.flushTimeoutMilliseconds,
+            "Flush timeout outside of valid range \(0...Int32.max)"
+        )
+    }
+}
+
 // MARK: - Producer-specific Config Properties
 
 /// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
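
For orientation, a usage sketch (not part of this commit, and assuming `KafkaProducerConfiguration` keeps a default initializer) of how the new `flushTimeoutMilliseconds` option sits next to the existing `statisticsInterval`; values outside `0...Int32.max` are rejected by the `didSet` precondition at assignment time:

var config = KafkaProducerConfiguration()       // assumed default initializer
config.statisticsInterval = .milliseconds(500)  // existing option: emit librdkafka statistics every 500 ms
config.flushTimeoutMilliseconds = 5_000         // new option: wait up to 5 s for outstanding messages on shutdown
// config.flushTimeoutMilliseconds = -1         // would trap: "Flush timeout outside of valid range 0...2147483647"

The upper bound mirrors the `Int32` conversion applied before the value is handed to the flush call in `KafkaProducer`'s poll loop (see the KafkaProducer.swift hunks below).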

Diff for: Sources/SwiftKafka/KafkaConsumer.swift

+14 -15
@@ -12,7 +12,6 @@
 //
 //===----------------------------------------------------------------------===//
 
-import Crdkafka
 import Logging
 import NIOConcurrencyHelpers
 import NIOCore
@@ -128,7 +127,7 @@ public final class KafkaConsumer: Sendable, Service {
 self.config = config
 self.logger = logger
 
-let client = try RDKafka.createClient(
+let client = try RDKafkaClient.makeClient(
     type: .consumer,
     configDictionary: config.dictionary,
     events: [.log, .fetch, .offsetCommit],
@@ -164,8 +163,8 @@ public final class KafkaConsumer: Sendable, Service {
 )
 }
 
-// Events that would be triggered by ``KafkaClient/poll(timeout:)``
-// are now triggered by ``KafkaClient/consumerPoll``.
+// Events that would be triggered by ``RDKafkaClient/poll(timeout:)``
+// are now triggered by ``RDKafkaClient/consumerPoll``.
 try client.pollSetConsumer()
 
 switch config.consumptionStrategy._internal {
@@ -302,7 +301,7 @@ public final class KafkaConsumer: Sendable, Service {
 }
 
 private func _triggerGracefulShutdown(
-    client: KafkaClient,
+    client: RDKafkaClient,
     logger: Logger
 ) {
     do {
@@ -336,7 +335,7 @@ extension KafkaConsumer {
 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
 case initializing(
-    client: KafkaClient,
+    client: RDKafkaClient,
     source: Producer.Source,
     statisticsSource: StatisticsProducer.Source?
 )
@@ -346,15 +345,15 @@ extension KafkaConsumer {
 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
 /// - Parameter statisticsSource: ``NIOAsyncSequenceProducer/Source`` used for yielding new statistics elements.
 case consuming(
-    client: KafkaClient,
+    client: RDKafkaClient,
     source: Producer.Source?,
     statisticsSource: StatisticsProducer.Source?
 )
 /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
 /// We are now in the process of commiting our last state to the broker.
 ///
 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
-case finishing(client: KafkaClient)
+case finishing(client: RDKafkaClient)
 /// The ``KafkaConsumer`` is closed.
 case finished
 }
@@ -365,7 +364,7 @@ extension KafkaConsumer {
 /// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are
 /// not yet available when the normal initialization occurs.
 mutating func initialize(
-    client: KafkaClient,
+    client: RDKafkaClient,
     source: Producer.Source,
     statisticsSource: StatisticsProducer.Source?
 ) {
@@ -387,7 +386,7 @@ extension KafkaConsumer {
 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
 /// - Parameter statisticsSource: ``NIOAsyncSequenceProducer/Source`` used for yielding new statistics elements.
 case pollForAndYieldMessage(
-    client: KafkaClient,
+    client: RDKafkaClient,
     source: Producer.Source?,
     statisticsSource: StatisticsProducer.Source?
 )
@@ -423,7 +422,7 @@ extension KafkaConsumer {
 enum SetUpConnectionAction {
     /// Set up the connection through ``subscribe()`` or ``assign()``.
     /// - Parameter client: Client used for handling the connection to the Kafka cluster.
-    case setUpConnection(client: KafkaClient)
+    case setUpConnection(client: RDKafkaClient)
 }
 
 /// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``.
@@ -482,7 +481,7 @@ extension KafkaConsumer {
 enum StoreOffsetAction {
     /// Store the message offset with the given `client`.
     /// - Parameter client: Client used for handling the connection to the Kafka cluster.
-    case storeOffset(client: KafkaClient)
+    case storeOffset(client: RDKafkaClient)
 }
 
 /// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`).
@@ -508,7 +507,7 @@ extension KafkaConsumer {
 ///
 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
 case commitSync(
-    client: KafkaClient
+    client: RDKafkaClient
 )
 /// Throw an error. The ``KafkaConsumer`` is closed.
 case throwClosedError
@@ -540,14 +539,14 @@ extension KafkaConsumer {
 ///
 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
 case triggerGracefulShutdown(
-    client: KafkaClient
+    client: RDKafkaClient
 )
 /// Shut down the ``KafkaConsumer`` and finish the given `source` object.
 ///
 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
 case triggerGracefulShutdownAndFinishSource(
-    client: KafkaClient,
+    client: RDKafkaClient,
     source: Producer.Source,
     statisticsSource: StatisticsProducer.Source?
 )
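
The consumer-side changes are mechanical: the `KafkaClient` type and the `RDKafka.createClient` factory give way to `RDKafkaClient` and `RDKafkaClient.makeClient` throughout the state machine's associated values. As a minimal, self-contained illustration of the action-enum pattern these hunks touch (toy types only; the real state machine carries sources and more cases):

final class ToyClient {}                      // stand-in for RDKafkaClient

enum ToyState {
    case consuming(client: ToyClient)
    case finished
}

enum ToyStoreOffsetAction {
    case storeOffset(client: ToyClient)       // caller stores the offset through this client
    case throwClosedError                      // the consumer has already been shut down
}

func storeOffsetAction(for state: ToyState) -> ToyStoreOffsetAction {
    switch state {
    case .consuming(let client):
        return .storeOffset(client: client)
    case .finished:
        return .throwClosedError
    }
}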

Diff for: Sources/SwiftKafka/KafkaProducer.swift

+27 -29
@@ -12,7 +12,6 @@
 //
 //===----------------------------------------------------------------------===//
 
-import Crdkafka
 import Logging
 import NIOConcurrencyHelpers
 import NIOCore
@@ -136,7 +135,7 @@ public final class KafkaProducer: Service, Sendable {
 }
 }
 
-let client = try RDKafka.createClient(
+let client = try RDKafkaClient.makeClient(
     type: .producer,
     configDictionary: config.dictionary,
     events: [.log], // No .deliveryReport here!
@@ -189,7 +188,7 @@ public final class KafkaProducer: Service, Sendable {
 )
 let source = sourceAndSequence.source
 
-let client = try RDKafka.createClient(
+let client = try RDKafkaClient.makeClient(
     type: .producer,
     configDictionary: config.dictionary,
     events: [.log, .deliveryReport],
@@ -268,9 +267,15 @@ public final class KafkaProducer: Service, Sendable {
 if !flushing || events.isEmpty {
     try await Task.sleep(for: self.config.pollInterval)
 }
-case .terminatePollLoopAndFinishSource(let source, let statisticsSource):
+case .flushFinishSourceAndTerminatePollLoop(let client, let source, let statisticsSource):
+    precondition(
+        0...Int(Int32.max) ~= self.config.flushTimeoutMilliseconds,
+        "Flush timeout outside of valid range \(0...Int32.max)"
+    )
+    try await client.flush(timeoutMilliseconds: Int32(self.config.flushTimeoutMilliseconds))
     source?.finish()
     statisticsSource?.finish()
+    statisticsSource?.finish()
     return
 case .terminatePollLoop:
     return
@@ -329,7 +334,7 @@ extension KafkaProducer {
 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
 /// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer.
 case started(
-    client: KafkaClient,
+    client: RDKafkaClient,
     messageIDCounter: UInt,
     source: Producer.Source?,
     statisticsSource: StatisticsProducer.Source?,
@@ -340,8 +345,8 @@ extension KafkaProducer {
 ///
 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
-case flushing(
-    client: KafkaClient,
+case finishing(
+    client: RDKafkaClient,
     source: Producer.Source?,
     statisticsSource: StatisticsProducer.Source?
 )
@@ -355,7 +360,7 @@ extension KafkaProducer {
 /// Delayed initialization of `StateMachine` as the `source` is not yet available
 /// when the normal initialization occurs.
 mutating func initialize(
-    client: KafkaClient,
+    client: RDKafkaClient,
     source: Producer.Source?,
     statisticsSource: StatisticsProducer.Source?
 ) {
@@ -377,11 +382,12 @@ extension KafkaProducer {
 ///
 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
-case pollAndYield(client: KafkaClient, source: Producer.Source?, statisticsSource: StatisticsProducer.Source?, flushing: Bool = false)
+case pollAndYield(client: RDKafkaClient, source: Producer.Source?, statisticsSource: StatisticsProducer.Source?, flushing: Bool = false)
 /// Terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`.
 ///
+/// - Parameter client: Client used for handling the connection to the Kafka cluster.
 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
-case terminatePollLoopAndFinishSource(source: Producer.Source?, statisticsSource: StatisticsProducer.Source?)
+case flushFinishSourceAndTerminatePollLoop(client: RDKafkaClient, source: Producer.Source?, statisticsSource: StatisticsProducer.Source?)
 /// Terminate the poll loop.
 case terminatePollLoop
 }
@@ -396,13 +402,8 @@ extension KafkaProducer {
 fatalError("\(#function) invoked while still in state \(self.state)")
 case .started(let client, _, let source, let statisticsSource, _):
     return .pollAndYield(client: client, source: source, statisticsSource: statisticsSource)
-case .flushing(let client, let source, let statisticsSource):
-    if client.outgoingQueueSize > 0 {
-        return .pollAndYield(client: client, source: source, statisticsSource: statisticsSource, flushing: true)
-    } else {
-        self.state = .finished
-        return .terminatePollLoopAndFinishSource(source: source, statisticsSource: statisticsSource)
-    }
+case .finishing(let client, let source, let statisticsSource):
+    return .flushFinishSourceAndTerminatePollLoop(client: client, source: source, statisticsSource: statisticsSource)
 case .finished:
     return .terminatePollLoop
 }
@@ -414,7 +415,7 @@ extension KafkaProducer {
 ///
 /// - Important: `newMessageID` is the new message ID assigned to the message to be sent.
 case send(
-    client: KafkaClient,
+    client: RDKafkaClient,
     newMessageID: UInt,
     topicHandles: RDKafkaTopicHandles
 )
@@ -444,8 +445,8 @@ extension KafkaProducer {
     newMessageID: newMessageID,
     topicHandles: topicHandles
 )
-case .flushing:
-    throw KafkaError.connectionClosed(reason: "Producer in the process of flushing and shutting down")
+case .finishing:
+    throw KafkaError.connectionClosed(reason: "Producer in the process of finishing")
 case .finished:
     throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer")
 }
@@ -479,9 +480,9 @@ extension KafkaProducer {
 }
 self.state = .started(client: client, messageIDCounter: counter, source: nil, statisticsSource: statisticsSource, topicHandles: topicHandlers)
 return .finishSource(source: source)
-case .flushing(let client, let source, let statisticsSource):
+case .finishing(let client, let source, let statisticsSource):
     // Setting source to nil to prevent incoming acknowledgements from buffering in `source`
-    self.state = .flushing(client: client, source: nil, statisticsSource: statisticsSource)
+    self.state = .finishing(client: client, source: nil, statisticsSource: statisticsSource)
     return .finishSource(source: source)
 case .finished:
     break
@@ -498,13 +499,10 @@ extension KafkaProducer {
 fatalError("stopStatistics() must not be invoked more than once")
 }
 self.state = .started(client: client, messageIDCounter: counter, source: source, statisticsSource: nil, topicHandles: topicHandlers)
-client.withKafkaHandlePointer { kafkaHandle in
-    rd_kafka_conf_set_stats_cb(<#T##conf: OpaquePointer!##OpaquePointer!#>, <#T##stats_cb: ((OpaquePointer?, UnsafeMutablePointer<CChar>?, Int, UnsafeMutableRawPointer?) -> Int32)!##((OpaquePointer?, UnsafeMutablePointer<CChar>?, Int, UnsafeMutableRawPointer?) -> Int32)!##(OpaquePointer?, UnsafeMutablePointer<CChar>?, Int, UnsafeMutableRawPointer?) -> Int32#>)
-}
 return .finishStatisticsSource(statisticsSource: statisticsSource)
-case .flushing(let client, let source, let statisticsSource):
+case .finishing(let client, let source, let statisticsSource):
     // Setting source to nil to prevent incoming acknowledgements from buffering in `source`
-    self.state = .flushing(client: client, source: source, statisticsSource: nil)
+    self.state = .finishing(client: client, source: source, statisticsSource: nil)
     return .finishStatisticsSource(statisticsSource: statisticsSource)
 case .finished:
     break
@@ -520,8 +518,8 @@ extension KafkaProducer {
 case .uninitialized:
     fatalError("\(#function) invoked while still in state \(self.state)")
 case .started(let client, _, let source, let statisticsSource, _):
-    self.state = .flushing(client: client, source: source, statisticsSource: statisticsSource)
-case .flushing, .finished:
+    self.state = .finishing(client: client, source: source, statisticsSource: statisticsSource)
+case .finishing, .finished:
    break
 }
 }
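
Net effect of the producer changes: shutdown no longer busy-polls while `client.outgoingQueueSize > 0`; instead the poll loop receives a single `flushFinishSourceAndTerminatePollLoop` action, flushes with the configured timeout, finishes the async sources, and returns. A simplified sketch of that path, with a toy protocol standing in for `RDKafkaClient` (only `flush(timeoutMilliseconds:)` is taken from the hunks above):

protocol FlushingClient {
    // Mirrors the call introduced in this commit; the real method lives on RDKafkaClient.
    func flush(timeoutMilliseconds: Int32) async throws
}

func finishProducer(
    client: some FlushingClient,
    flushTimeoutMilliseconds: Int,
    finishSources: () -> Void
) async throws {
    precondition(
        0...Int(Int32.max) ~= flushTimeoutMilliseconds,
        "Flush timeout outside of valid range \(0...Int32.max)"
    )
    // Wait up to the configured timeout for librdkafka to deliver outstanding messages,
    try await client.flush(timeoutMilliseconds: Int32(flushTimeoutMilliseconds))
    // then finish the acknowledgement and statistics sequences and leave the poll loop.
    finishSources()
}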

Diff for: Sources/SwiftKafka/RDKafka/RDKafka.swift

-63
This file was deleted.
