From 66883070cf36fde81e1ea0b4eee100799b9c1ec5 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 26 May 2025 09:49:12 +0530 Subject: [PATCH 1/3] Added new fields in DescribeConsumerGroup --- examples/kafkajs/admin/describe-groups.js | 2 +- lib/admin.js | 7 +++++++ lib/kafkajs/_admin.js | 8 ++++++++ lib/kafkajs/_kafka.js | 3 ++- lib/rdkafka.js | 1 + src/common.cc | 25 +++++++++++++++++++++++ types/rdkafka.d.ts | 8 ++++++++ 7 files changed, 52 insertions(+), 2 deletions(-) diff --git a/examples/kafkajs/admin/describe-groups.js b/examples/kafkajs/admin/describe-groups.js index b0317254..27ba2dea 100644 --- a/examples/kafkajs/admin/describe-groups.js +++ b/examples/kafkajs/admin/describe-groups.js @@ -1,5 +1,5 @@ // require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS. -const { Kafka, ConsumerGroupStates } = require('@confluentinc/kafka-javascript').KafkaJS; +const { Kafka, ConsumerGroupStates, ConsumerGroupTypes } = require('@confluentinc/kafka-javascript').KafkaJS; const { parseArgs } = require('node:util'); function printNode(node, prefix = '') { diff --git a/lib/admin.js b/lib/admin.js index bde01f72..5bd8d25d 100644 --- a/lib/admin.js +++ b/lib/admin.js @@ -29,6 +29,12 @@ const ConsumerGroupStates = { EMPTY: 5, }; +const ConsumerGroupTypes = { + UNKNOWN: 0, + CONSUMER: 1, + CLASSIC: 2, +}; + /** * A list of ACL operation types. * @enum {number} @@ -95,6 +101,7 @@ module.exports = { create: createAdminClient, createFrom: createAdminClientFrom, ConsumerGroupStates: Object.freeze(ConsumerGroupStates), + ConsumerGroupTypes: Object.freeze(ConsumerGroupTypes), AclOperationTypes: Object.freeze(AclOperationTypes), IsolationLevel: Object.freeze(IsolationLevel), OffsetSpec, diff --git a/lib/kafkajs/_admin.js b/lib/kafkajs/_admin.js index 9011ed5f..89d22e7d 100644 --- a/lib/kafkajs/_admin.js +++ b/lib/kafkajs/_admin.js @@ -1009,6 +1009,14 @@ module.exports = { * @see RdKafka.ConsumerGroupStates */ ConsumerGroupStates: RdKafka.AdminClient.ConsumerGroupStates, + /** + * A list of consumer group types. + * @enum {number} + * @readonly + * @memberof KafkaJS + * @see RdKafka.ConsumerGroupTypes + */ + ConsumerGroupTypes: RdKafka.AdminClient.ConsumerGroupTypes, /** * A list of ACL operation types. * @enum {number} diff --git a/lib/kafkajs/_kafka.js b/lib/kafkajs/_kafka.js index e671d94a..d0e19b06 100644 --- a/lib/kafkajs/_kafka.js +++ b/lib/kafkajs/_kafka.js @@ -1,6 +1,6 @@ const { Producer, CompressionTypes } = require('./_producer'); const { Consumer, PartitionAssigners } = require('./_consumer'); -const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin'); +const { Admin, ConsumerGroupStates, ConsumerGroupTypes, AclOperationTypes, IsolationLevel } = require('./_admin'); const error = require('./_error'); const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common'); @@ -119,6 +119,7 @@ module.exports = { PartitionAssigners, PartitionAssignors: PartitionAssigners, CompressionTypes, + ConsumerGroupTypes, ConsumerGroupStates, AclOperationTypes, IsolationLevel}; diff --git a/lib/rdkafka.js b/lib/rdkafka.js index 5d8a5c30..8c931116 100644 --- a/lib/rdkafka.js +++ b/lib/rdkafka.js @@ -39,5 +39,6 @@ module.exports = { IsolationLevel: Admin.IsolationLevel, OffsetSpec: Admin.OffsetSpec, ConsumerGroupStates: Admin.ConsumerGroupStates, + ConsumerGroupTypes: Admin.ConsumerGroupTypes, AclOperationTypes: Admin.AclOperationTypes, }; diff --git a/src/common.cc b/src/common.cc index e488e02e..5fe5a50c 100644 --- a/src/common.cc +++ b/src/common.cc @@ -980,6 +980,9 @@ v8::Local FromMemberDescription( assignment: { topicPartitions: TopicPartition[] }, + targetAssignment?: { + topicPartitions: TopicPartition[] + } } */ v8::Local returnObject = Nan::New(); @@ -1028,6 +1031,23 @@ v8::Local FromMemberDescription( Nan::Set(returnObject, Nan::New("assignment").ToLocalChecked(), assignmentObject); + // targetAssignment + const rd_kafka_MemberAssignment_t* target_assignment = + rd_kafka_MemberDescription_target_assignment(member); + if (target_assignment) { + const rd_kafka_topic_partition_list_t* target_partitions = + rd_kafka_MemberAssignment_partitions(target_assignment); + v8::Local targetTopicPartitions = + Conversion::TopicPartition::ToTopicPartitionV8Array( + target_partitions, false); + v8::Local targetAssignmentObject = Nan::New(); + Nan::Set(targetAssignmentObject, + Nan::New("topicPartitions").ToLocalChecked(), + targetTopicPartitions); + Nan::Set(returnObject, Nan::New("targetAssignment").ToLocalChecked(), + targetAssignmentObject); + } + return returnObject; } @@ -1105,6 +1125,11 @@ v8::Local FromConsumerGroupDescription( Nan::Set(returnObject, Nan::New("state").ToLocalChecked(), Nan::New(rd_kafka_ConsumerGroupDescription_state(desc))); + // type + Nan::Set(returnObject, Nan::New("type").ToLocalChecked(), + Nan::New(rd_kafka_ConsumerGroupDescription_type(desc)) + .ToLocalChecked()); + // coordinator const rd_kafka_Node_t* coordinator = rd_kafka_ConsumerGroupDescription_coordinator(desc); diff --git a/types/rdkafka.d.ts b/types/rdkafka.d.ts index 097587f9..99a16183 100644 --- a/types/rdkafka.d.ts +++ b/types/rdkafka.d.ts @@ -352,6 +352,12 @@ export enum ConsumerGroupStates { EMPTY = 5, } +export enum ConsumerGroupTypes { + UNKNOWN = 0, + CONSUMER = 1, + CLASSIC = 2, +} + export interface GroupOverview { groupId: string; protocolType: string; @@ -383,6 +389,7 @@ export type MemberDescription = { memberMetadata: Buffer groupInstanceId?: string, assignment: TopicPartition[] + targetAssignment?: TopicPartition[] } export type Node = { @@ -407,6 +414,7 @@ export type GroupDescription = { protocolType: string partitionAssignor: string state: ConsumerGroupStates + type: ConsumerGroupTypes coordinator: Node authorizedOperations?: AclOperationTypes[] } From 1e0ad09c0e8a938f57d24756f59efe4d12a0f062 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 26 May 2025 11:27:09 +0530 Subject: [PATCH 2/3] error debug --- examples/kafkajs/admin/describe-groups.js | 1 + src/common.cc | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/kafkajs/admin/describe-groups.js b/examples/kafkajs/admin/describe-groups.js index 27ba2dea..6c4612d1 100644 --- a/examples/kafkajs/admin/describe-groups.js +++ b/examples/kafkajs/admin/describe-groups.js @@ -72,6 +72,7 @@ async function adminStart() { console.log(`\tProtocol type: ${group.protocolType}`); console.log(`\tPartition assignor: ${group.partitionAssignor}`); console.log(`\tState: ${group.state}`); + console.log(`\tType: ${group.type}`); console.log(`\tCoordinator: ${group.coordinator ? group.coordinator.id : group.coordinator}`); printNode(group.coordinator, '\t'); console.log(`\tAuthorized operations: ${group.authorizedOperations}`); diff --git a/src/common.cc b/src/common.cc index 5fe5a50c..bbd5be9e 100644 --- a/src/common.cc +++ b/src/common.cc @@ -1127,8 +1127,7 @@ v8::Local FromConsumerGroupDescription( // type Nan::Set(returnObject, Nan::New("type").ToLocalChecked(), - Nan::New(rd_kafka_ConsumerGroupDescription_type(desc)) - .ToLocalChecked()); + Nan::New(rd_kafka_ConsumerGroupDescription_type(desc))); // coordinator const rd_kafka_Node_t* coordinator = From 7c59a2f027e6c0f5253c0f68e4538dfabc8a6177 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 26 May 2025 15:43:09 +0530 Subject: [PATCH 3/3] test changes --- test/promisified/admin/describe_groups.spec.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/promisified/admin/describe_groups.spec.js b/test/promisified/admin/describe_groups.spec.js index d70daf70..bdee03d6 100644 --- a/test/promisified/admin/describe_groups.spec.js +++ b/test/promisified/admin/describe_groups.spec.js @@ -9,7 +9,7 @@ const { createAdmin, sleep, } = require('../testhelpers'); -const { ConsumerGroupStates, ErrorCodes, AclOperationTypes } = require('../../../lib').KafkaJS; +const { ConsumerGroupStates, ConsumerGroupTypes, ErrorCodes, AclOperationTypes } = require('../../../lib').KafkaJS; describe('Admin > describeGroups', () => { let topicName, groupId, consumer, admin, groupInstanceId, producer; @@ -86,6 +86,7 @@ describe('Admin > describeGroups', () => { isSimpleConsumerGroup: false, protocolType: 'consumer', state: ConsumerGroupStates.STABLE, + type: ConsumerGroupTypes.CLASSIC, coordinator: expect.objectContaining({ id: expect.any(Number), host: expect.any(String), @@ -137,6 +138,7 @@ describe('Admin > describeGroups', () => { protocol: '', partitionAssignor: '', state: ConsumerGroupStates.EMPTY, + type: ConsumerGroupTypes.CLASSIC, protocolType: 'consumer', isSimpleConsumerGroup: false, coordinator: expect.objectContaining({