Skip to content

Commit f1800c2

Browse files
KafkaConsumer: back pressure + improved read speed (#139)
* Add Back Pressure to `KafkaConsumer` Motivation: Closes #131. Modifications: * re-add `KafkaConsumerConfiguration.backPressureStrategy: BackPressureStrategy`, currently allowing users to add high-low-watermark backpressure to their `KafkaConsumer`s * `KafkaConsumer`: * make `KafkaConsumerMessages` use `NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark` as backpressure strategy * remove `rd_kafka_poll_set_consumer` -> use two separate queues for consumer events and consumer messages so we can exert backpressure on the consumer message queue * remove idle polling mechanism where incoming messages were discarded when `KafkaConsumerMessages` was terminated -> we now have to independent queues * rename `.pollForAndYieldMessage` -> `.pollForEventsAndMessages` * refactor `State` and add `ConsumerMessagesSequenceState` * `KafkaProducer`: * rename `.consumptionStopped` -> `.eventConsumptionFinished` * `RDKafkaClient`: * bring back `consumerPoll()` * `eventPoll()`: only queue main queue for events since consumer messages are now handled on a different queue * KafkaConsumer: two state machines Modifications: * have two state machines: 1. consumer state itself 2. state of consumer messages async sequence * KafkaConsumer: merge both state machines * Refactor + DocC * Review Franz + Blindspot Modifications: * `KafkaConsumer`: * end consumer message poll loop when async sequence drops message * do not sleep if we picked up reading new messages again after we finished reading a partition * `messageRunLoop`: * fix `fatalError` where `newMessagesProduced()` is invoked after `stopProducing()` * add func `batchConsumerPoll` that reads a batch of messages to avoid acquiring the lock in `messageRunLoop` too often
1 parent 73bc4b4 commit f1800c2

File tree

6 files changed

+368
-189
lines changed

6 files changed

+368
-189
lines changed

Diff for: Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift

+32-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,37 @@ public struct KafkaConsumerConfiguration {
2323
/// Default: `.milliseconds(100)`
2424
public var pollInterval: Duration = .milliseconds(100)
2525

26+
/// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``.
27+
public struct BackPressureStrategy: Sendable, Hashable {
28+
enum _BackPressureStrategy: Sendable, Hashable {
29+
case watermark(low: Int, high: Int)
30+
}
31+
32+
let _internal: _BackPressureStrategy
33+
34+
private init(backPressureStrategy: _BackPressureStrategy) {
35+
self._internal = backPressureStrategy
36+
}
37+
38+
/// A back pressure strategy based on high and low watermarks.
39+
///
40+
/// The consumer maintains a buffer size between a low watermark and a high watermark
41+
/// to control the flow of incoming messages.
42+
///
43+
/// - Parameter low: The lower threshold for the buffer size (low watermark).
44+
/// - Parameter high: The upper threshold for the buffer size (high watermark).
45+
public static func watermark(low: Int, high: Int) -> BackPressureStrategy {
46+
return .init(backPressureStrategy: .watermark(low: low, high: high))
47+
}
48+
}
49+
50+
/// The backpressure strategy to be used for message consumption.
51+
/// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information.
52+
public var backPressureStrategy: BackPressureStrategy = .watermark(
53+
low: 10,
54+
high: 50
55+
)
56+
2657
/// A struct representing the different Kafka message consumption strategies.
2758
public struct ConsumptionStrategy: Sendable, Hashable {
2859
enum _ConsumptionStrategy: Sendable, Hashable {
@@ -63,7 +94,7 @@ public struct KafkaConsumerConfiguration {
6394
}
6495

6596
/// The strategy used for consuming messages.
66-
/// See ``KafkaConfiguration/ConsumptionStrategy`` for more information.
97+
/// See ``KafkaConsumerConfiguration/ConsumptionStrategy-swift.struct-swift.struct`` for more information.
6798
public var consumptionStrategy: ConsumptionStrategy
6899

69100
// MARK: - Consumer-specific Config Properties

0 commit comments

Comments
 (0)