Skip to content

Commit f5e7f3e

Browse files
authored
Refactor to always use a barrier for pending operation (#26)
Readers writer lock with reentrant calls Refactor to always use a barrier for pending operation to avoid problems or regressions with async pause or resume calls that will be introduced later. Uses linked lists for the cache. Removes heap-js code. Test name improvement to run single parametric tests Seek signature and add partitionsConsumedConcurrently to ConsumerRunConfig type definition Make final max poll interval double of configured value so even last message processed before cache cleanup can take that time to process. Fix to restart max poll interval timer on fetch. Marking batch stale after cache clear was requested and max poll interval is reached before it's cleared. Add assignmentLost function to the rebalance callback. Fix to nextN size, version with max.poll.interval.ms applied to each message or batch (only for messages after cache reset) Performance test, removing outliers Start performance timer from first message received after resuming
1 parent 7e09cb7 commit f5e7f3e

22 files changed

+1915
-3551
lines changed

LICENSE.heap-js

-36
This file was deleted.

LICENSE.kafkajs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
The promisified API (lib/kafkajs) is inspired by kafkajs (github.com/tulios/kafkajs).
22
The promisified tests (test/promisified) are also adapted from there.
3-
An async lock implementation and many error types are also adapted from there.
3+
Many error types are also adapted from there.
44
The license notice is reproduced below.
55

66
----

eslint.config.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ module.exports = ts.config(
3737
{
3838
...js.configs.recommended,
3939
files: ["lib/**/*.js", "test/promisified/**/*.js"],
40-
ignores: ["lib/kafkajs/_heap.js"]
40+
ignores: []
4141
},
4242
{
4343
...ckjsSpecificSettings,
4444
files: ["lib/**/*.js", "test/promisified/**/*.js"],
45-
ignores: ["lib/kafkajs/_heap.js"]
45+
ignores: []
4646
},
4747
{
4848
...jest.configs['flat/recommended'],

examples/performance/performance-primitives-kafkajs.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
128128
totalMessageSize += message.value.length;
129129
if (messagesReceived === 1) {
130130
consumer.pause([{ topic }]);
131+
} else if (messagesReceived === 2) {
132+
startTime = hrtime();
131133
} else if (messagesReceived === totalMessageCnt) {
132134
let elapsed = hrtime(startTime);
133135
let durationNanos = elapsed[0] * 1e9 + elapsed[1];
@@ -153,7 +155,6 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
153155
console.log("Starting consumer.")
154156

155157
totalMessageSize = 0;
156-
startTime = hrtime();
157158
consumer.resume([{ topic }]);
158159
await new Promise((resolve) => {
159160
let interval = setInterval(() => {
@@ -203,6 +204,8 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, t
203204
totalMessageSize += message.value.length;
204205
if (messagesReceived === 1) {
205206
consumer.pause([{ topic }]);
207+
} else if (messagesReceived === 2) {
208+
startTime = hrtime();
206209
} else if (messagesReceived === totalMessageCnt) {
207210
let elapsed = hrtime(startTime);
208211
let durationNanos = elapsed[0] * 1e9 + elapsed[1];
@@ -228,7 +231,6 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, t
228231
console.log("Starting consume-transform-produce.")
229232

230233
totalMessageSize = 0;
231-
startTime = hrtime();
232234
consumer.resume([{ topic: consumeTopic }]);
233235
await new Promise((resolve) => {
234236
let interval = setInterval(() => {

examples/performance/performance-primitives.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
134134
totalMessageSize += message.value.length;
135135
if (messagesReceived === 1) {
136136
consumer.pause([{ topic }]);
137+
} else if (messagesReceived === 2) {
138+
startTime = hrtime();
137139
} else if (messagesReceived === totalMessageCnt) {
138140
let elapsed = hrtime(startTime);
139141
let durationNanos = elapsed[0] * 1e9 + elapsed[1];
@@ -159,7 +161,6 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
159161
console.log("Starting consumer.")
160162

161163
totalMessageSize = 0;
162-
startTime = hrtime();
163164
consumer.resume([{ topic }]);
164165
await new Promise((resolve) => {
165166
let interval = setInterval(() => {
@@ -222,6 +223,8 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, t
222223
totalMessageSize += message.value.length;
223224
if (messagesReceived === 1) {
224225
consumer.pause([{ topic }]);
226+
} else if (messagesReceived === 2) {
227+
startTime = hrtime();
225228
} else if (messagesReceived === totalMessageCnt) {
226229
let elapsed = hrtime(startTime);
227230
let durationNanos = elapsed[0] * 1e9 + elapsed[1];
@@ -247,7 +250,6 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, t
247250
console.log("Starting consume-transform-produce.")
248251

249252
totalMessageSize = 0;
250-
startTime = hrtime();
251253
consumer.resume([{ topic: consumeTopic }]);
252254
await new Promise((resolve) => {
253255
let interval = setInterval(() => {

lib/kafka-consumer.js

+13
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,19 @@ KafkaConsumer.prototype.assignments = function() {
340340
return this._errorWrap(this._client.assignments(), true);
341341
};
342342

343+
/**
344+
* Is current assignment in rebalance callback lost?
345+
*
346+
* @note This method should only be called from within the rebalance callback
347+
* when partitions are revoked.
348+
*
349+
* @return {boolean} true if assignment was lost
350+
*/
351+
352+
KafkaConsumer.prototype.assignmentLost = function() {
353+
return this._client.assignmentLost();
354+
};
355+
343356
/**
344357
* Get the type of rebalance protocol used in the consumer group.
345358
*

0 commit comments

Comments
 (0)