From 269edf3339087ca907b45ae4d20f210bbb61fe6a Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 16 May 2025 13:24:21 +0530 Subject: [PATCH 1/4] Add highwatermark offsets to eachBatch callback --- CHANGELOG.md | 9 +++ examples/typescript/kafkajs.ts | 1 + lib/kafkajs/_consumer.js | 18 +++++- .../consumer/consumeMessages.spec.js | 62 +++++++++++++++++++ types/kafkajs.d.ts | 2 + 5 files changed, 89 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dd2bbcc9..fd6808a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# confluent-kafka-javascript v1.4.0 + +v1.4.0 is a feature release. It is supported for all usage. + +## Enhancements + +1. Adds support for `highWatermark`, `offsetLag()`, and `offsetLagLow()` in `eachBatch` callback (#). + + # confluent-kafka-javascript v1.3.0 v1.3.0 is a feature release. It is supported for all usage. diff --git a/examples/typescript/kafkajs.ts b/examples/typescript/kafkajs.ts index faf11afb..673819f8 100644 --- a/examples/typescript/kafkajs.ts +++ b/examples/typescript/kafkajs.ts @@ -44,6 +44,7 @@ async function runConsumer() { }); const consumer = kafka.consumer({ + 'auto.offset.reset': 'earliest', kafkaJS: { groupId: 'test-group' + Math.random(), fromBeginning: true, diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index c6410547..d13fb7c7 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -863,6 +863,18 @@ class Consumer { #createBatchPayload(messages) { const topic = messages[0].topic; const partition = messages[0].partition; + const watermarkOffsets = this.#internalClient.getWatermarkOffsets(topic, partition); + let highWatermark = '-1001'; + let offsetLag_ = -1; + let offsetLagLow_ = -1; + + if (Number.isInteger(watermarkOffsets.highOffset)) { + highWatermark = watermarkOffsets.highOffset.toString(); + /* While calculating lag, we subtract 1 from the high offset + * for compability reasons with KafkaJS's API */ + offsetLag_ = (watermarkOffsets.highOffset - 1) - messages[messages.length - 1].offset; + offsetLagLow_ = (watermarkOffsets.highOffset - 1) - messages[0].offset; + } const messagesConverted = []; for (let i = 0; i < messages.length; i++) { @@ -892,13 +904,13 @@ class Consumer { const batch = { topic, partition, - highWatermark: '-1001', /* We don't fetch it yet. We can call committed() to fetch it but that might incur network calls. */ + highWatermark, messages: messagesConverted, isEmpty: () => false, firstOffset: () => (messagesConverted[0].offset).toString(), lastOffset: () => (messagesConverted[messagesConverted.length - 1].offset).toString(), - offsetLag: () => notImplemented(), - offsetLagLow: () => notImplemented(), + offsetLag: () => offsetLag_.toString(), + offsetLagLow: () => offsetLagLow_.toString(), }; const returnPayload = { diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index 93a3e694..8b44d894 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -858,4 +858,66 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit await waitFor(() => (messagesConsumed === messages.length || batchesCountExceeds1), () => { }, 100); expect(batchesCountExceeds1).toBe(false); }); + + it('shows the correct high watermark and lag for partition', async () => { + await producer.connect(); + + let messages0 = Array(10).fill().map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 0 }; + }); + let partition0ProducedMessages = messages0.length; + + const messages1 = Array(5).fill().map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 1 }; + }); + + const messages2 = Array(2).fill().map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 2 }; + }); + + for (const messages of [messages0, messages1, messages2]) { + await producer.send({ + topic: topicName, + messages: messages, + }); + } + + await consumer.connect(); + await consumer.subscribe({ topic: topicName }); + + let messagesConsumed = 0; + consumer.run({ + partitionsConsumedConcurrently, + eachBatch: async ({ batch }) => { + if (batch.partition === 0) { + expect(batch.highWatermark).toEqual(String(partition0ProducedMessages)); + } else if (batch.partition === 1) { + expect(batch.highWatermark).toEqual(String(messages1.length)); + } else if (batch.partition === 2) { + expect(batch.highWatermark).toEqual(String(messages2.length)); + } + expect(batch.offsetLag()).toEqual(String(+batch.highWatermark - 1 - +batch.lastOffset())); + expect(batch.offsetLagLow()).toEqual(String(+batch.highWatermark - 1 - +batch.firstOffset())); + messagesConsumed += batch.messages.length; + } + }); + await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100); + + /* Add some more messages to partition 0 to make sure high watermark is updated. */ + messages0 = Array(15).fill().map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 0 }; + }); + partition0ProducedMessages += messages0.length; + await producer.send({ + topic: topicName, + messages: messages0, + }); + + await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100); + }); + }); diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 58d03a5b..3177fb84 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -269,6 +269,8 @@ export type Batch = { isEmpty(): boolean firstOffset(): string | null lastOffset(): string + offsetLag(): string + offsetLagLow(): string } export type KafkaMessage = MessageSetEntry | RecordBatchEntry From 297cfa5c639bd5e5246a1902d036f16c26938ca5 Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 16 May 2025 13:29:38 +0530 Subject: [PATCH 2/4] Update changelog --- CHANGELOG.md | 2 +- examples/typescript/kafkajs.ts | 1 - lib/kafkajs/_consumer.js | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd6808a3..969df2d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ v1.4.0 is a feature release. It is supported for all usage. ## Enhancements -1. Adds support for `highWatermark`, `offsetLag()`, and `offsetLagLow()` in `eachBatch` callback (#). +1. Adds support for `highWatermark`, `offsetLag()`, and `offsetLagLow()` in `eachBatch` callback (#317). # confluent-kafka-javascript v1.3.0 diff --git a/examples/typescript/kafkajs.ts b/examples/typescript/kafkajs.ts index 673819f8..faf11afb 100644 --- a/examples/typescript/kafkajs.ts +++ b/examples/typescript/kafkajs.ts @@ -44,7 +44,6 @@ async function runConsumer() { }); const consumer = kafka.consumer({ - 'auto.offset.reset': 'earliest', kafkaJS: { groupId: 'test-group' + Math.random(), fromBeginning: true, diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index d13fb7c7..5c4975b3 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -871,7 +871,7 @@ class Consumer { if (Number.isInteger(watermarkOffsets.highOffset)) { highWatermark = watermarkOffsets.highOffset.toString(); /* While calculating lag, we subtract 1 from the high offset - * for compability reasons with KafkaJS's API */ + * for compatibility reasons with KafkaJS's API */ offsetLag_ = (watermarkOffsets.highOffset - 1) - messages[messages.length - 1].offset; offsetLagLow_ = (watermarkOffsets.highOffset - 1) - messages[0].offset; } From ea91e6ec0e77a703e1a80d5cd54841e1aa3a7d5c Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 16 May 2025 13:35:05 +0530 Subject: [PATCH 3/4] Add additional error handling --- lib/kafkajs/_consumer.js | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 5c4975b3..20715247 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -863,11 +863,18 @@ class Consumer { #createBatchPayload(messages) { const topic = messages[0].topic; const partition = messages[0].partition; - const watermarkOffsets = this.#internalClient.getWatermarkOffsets(topic, partition); + let watermarkOffsets = {}; let highWatermark = '-1001'; let offsetLag_ = -1; let offsetLagLow_ = -1; + try { + watermarkOffsets = this.#internalClient.getWatermarkOffsets(topic, partition); + } catch (e) { + /* Do nothing but logging. The batch as a whole remains valid but for the facts that highwatermark won't be there. */ + this.#logger.error(`Error while getting watermark offsets: ${e}`, this.#createConsumerBindingMessageMetadata()); + } + if (Number.isInteger(watermarkOffsets.highOffset)) { highWatermark = watermarkOffsets.highOffset.toString(); /* While calculating lag, we subtract 1 from the high offset From 40aec5fe1141c59223aef94e03192351e3c316d5 Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 16 May 2025 13:52:03 +0530 Subject: [PATCH 4/4] Change error to warning --- lib/kafkajs/_consumer.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 20715247..01a7352a 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -871,8 +871,8 @@ class Consumer { try { watermarkOffsets = this.#internalClient.getWatermarkOffsets(topic, partition); } catch (e) { - /* Do nothing but logging. The batch as a whole remains valid but for the facts that highwatermark won't be there. */ - this.#logger.error(`Error while getting watermark offsets: ${e}`, this.#createConsumerBindingMessageMetadata()); + /* Only warn. The batch as a whole remains valid but for the fact that the highwatermark won't be there. */ + this.#logger.warn(`Could not get watermark offsets for batch: ${e}`, this.#createConsumerBindingMessageMetadata()); } if (Number.isInteger(watermarkOffsets.highOffset)) {