Add highwatermark offset to eachBatch callback #317

Open
wants to merge 4 commits into
base: master

Conversation

milindl
Contributor

@milindl milindl commented May 16, 2025

Fixes #282.

@Copilot Copilot AI review requested due to automatic review settings May 16, 2025 07:59
@milindl milindl requested review from a team as code owners May 16, 2025 07:59
@Copilot Copilot AI left a comment

Pull Request Overview

This PR adds support for exposing the high watermark offset and lag information in the eachBatch callback, including updating tests and documentation.

  • Updates consumer batch payload computation to include highWatermark, offsetLag, and offsetLagLow.
  • Enhances tests to verify the correct reporting of these values for multiple partitions.
  • Updates examples and the CHANGELOG to reflect these changes.

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| test/promisified/consumer/consumeMessages.spec.js | Adds a new test case to validate highWatermark and lag values. |
| lib/kafkajs/_consumer.js | Updates batch payload construction with highWatermark and lag calculations. |
| examples/typescript/kafkajs.ts | Adds configuration for auto.offset.reset. |
| CHANGELOG.md | Documents new features added in this release. |

@milindl milindl requested a review from Copilot May 16, 2025 08:00
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_add_highwatermark_to_batch branch from 3f6861d to 2c473e1 Compare May 16, 2025 08:00
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_add_highwatermark_to_batch branch from 2c473e1 to 297cfa5 Compare May 16, 2025 08:01
@Copilot Copilot AI left a comment

Pull Request Overview

Adds support for reporting the high watermark and lag metrics in each batch callback and updates the consumer implementation and tests accordingly.

  • Fetches and exposes highWatermark, offsetLag(), and offsetLagLow() in eachBatch
  • Updates the internal consumer to call getWatermarkOffsets and calculate lag
  • Adds tests and example usage demonstrating the new metrics
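For readers who want to see how the new fields might be read inside an eachBatch handler, here is a minimal sketch. It does not connect to a broker: `makeMockBatch` and `summarizeBatch` are hypothetical helpers, the values are made up, and the assumption that `highWatermark`, `offsetLag()`, and `offsetLagLow()` return numeric strings follows KafkaJS's convention rather than anything confirmed in this thread.

```javascript
// Hypothetical stand-in for the batch object handed to eachBatch.
// Field names follow the PR description; all values are invented.
function makeMockBatch() {
  return {
    topic: 'demo-topic',
    partition: 0,
    highWatermark: '10', // assumed to be a numeric string
    messages: [{ offset: '4' }, { offset: '5' }, { offset: '6' }],
    offsetLag: () => '3', // lag measured from the last message in the batch
    offsetLagLow: () => '5', // lag measured from the first message in the batch
  };
}

// Hypothetical helper: collects the watermark and lag values into plain numbers.
// Number() is fine for small offsets; very large offsets would need BigInt.
function summarizeBatch(batch) {
  return {
    topic: batch.topic,
    partition: batch.partition,
    highWatermark: Number(batch.highWatermark),
    lagAfterBatch: Number(batch.offsetLag()),
    lagBeforeBatch: Number(batch.offsetLagLow()),
  };
}

// Same argument shape as an eachBatch callback: a payload with a `batch` field.
async function eachBatch({ batch }) {
  const summary = summarizeBatch(batch);
  console.log(
    `${summary.topic}[${summary.partition}] ` +
      `highWatermark=${summary.highWatermark} lag=${summary.lagAfterBatch}`
  );
  return summary;
}

eachBatch({ batch: makeMockBatch() });
```

In a real consumer the handler would be passed as `eachBatch` to `consumer.run`; the mock only exists so the sketch can run on its own.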

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| test/promisified/consumer/consumeMessages.spec.js | New test case validating highWatermark and lag values |
| lib/kafkajs/_consumer.js | Batch payload now fetches watermark offsets and computes lag |
| examples/typescript/kafkajs.ts | Example consumer config updated with offset reset option |
| CHANGELOG.md | Notes the new highWatermark and lag enhancements (v1.4.0) |
Comments suppressed due to low confidence (3)

test/promisified/consumer/consumeMessages.spec.js:920

  • The test waits for new messages but doesn’t assert that batch.highWatermark or lag metrics are updated after the second send; consider adding explicit assertions for the updated watermark and lag.
await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100);

lib/kafkajs/_consumer.js:866

  • getWatermarkOffsets is likely asynchronous; you need to await its result and mark #createBatchPayload as async to ensure correct watermark values.
const watermarkOffsets = this.#internalClient.getWatermarkOffsets(topic, partition);

examples/typescript/kafkajs.ts:47

  • [nitpick] The option 'auto.offset.reset' may not be recognized at this level; document or move it into the correct consumer config namespace or use fromBeginning: true instead.
'auto.offset.reset': 'earliest',

highWatermark = watermarkOffsets.highOffset.toString();
/* While calculating lag, we subtract 1 from the high offset
 * for compatibility reasons with KafkaJS's API */
offsetLag_ = (watermarkOffsets.highOffset - 1) - messages[messages.length - 1].offset;

Copilot AI May 16, 2025

Calling getWatermarkOffsets on every batch may incur extra broker requests; consider caching per-partition watermarks or fetching them less frequently.

Contributor Author

For reference to other reviewers: getWatermarkOffsets is locally cached and does not have a discernible impact on performance.
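To make the lag arithmetic in the snippet under review concrete, here is a small worked sketch. `computeLag` is a hypothetical helper, not the code in lib/kafkajs/_consumer.js. The `offsetLag` line mirrors the subtraction shown in the diff; the `offsetLagLow` line assumes the KafkaJS meaning (lag measured from the first message of the batch), which this thread does not spell out.

```javascript
// Hypothetical helper mirroring the lag calculation quoted in the review thread.
// highOffset: the partition's high watermark as a number.
// messages: non-empty array of objects with an `offset` field, in offset order.
function computeLag(highOffset, messages) {
  // The diff subtracts 1 from the high offset for KafkaJS compatibility,
  // so lag is measured against the last existing message, not the next offset.
  const lastExistingOffset = highOffset - 1;

  const firstOffset = Number(messages[0].offset);
  const lastOffset = Number(messages[messages.length - 1].offset);

  return {
    highWatermark: String(highOffset),
    // Same arithmetic as the diff: (highOffset - 1) - offset of the last message.
    offsetLag: String(lastExistingOffset - lastOffset),
    // Assumption: computed the same way, but from the first message in the batch.
    offsetLagLow: String(lastExistingOffset - firstOffset),
  };
}

// Worked example: high watermark 10, batch covers offsets 4 through 6.
const lag = computeLag(10, [{ offset: '4' }, { offset: '5' }, { offset: '6' }]);
console.log(lag); // offsetLag is 9 - 6, offsetLagLow is 9 - 4
```

With a high watermark of 10, the last existing message sits at offset 9, so a batch ending at offset 6 leaves three messages still to consume.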

@cprovencher cprovencher added rebase and removed rebase labels May 16, 2025
Development

Successfully merging this pull request may close these issues.

batch highWatermark always returns -1001
2 participants