Skip to content

[KIP-848]: Structure change for DescribeConsumerGroups #327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/kafkajs/admin/describe-groups.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka, ConsumerGroupStates } = require('@confluentinc/kafka-javascript').KafkaJS;
const { Kafka, ConsumerGroupStates, ConsumerGroupTypes } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

function printNode(node, prefix = '') {
Expand Down Expand Up @@ -72,6 +72,7 @@ async function adminStart() {
console.log(`\tProtocol type: ${group.protocolType}`);
console.log(`\tPartition assignor: ${group.partitionAssignor}`);
console.log(`\tState: ${group.state}`);
console.log(`\tType: ${group.type}`);
console.log(`\tCoordinator: ${group.coordinator ? group.coordinator.id : group.coordinator}`);
printNode(group.coordinator, '\t');
console.log(`\tAuthorized operations: ${group.authorizedOperations}`);
Expand Down
7 changes: 7 additions & 0 deletions lib/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ const ConsumerGroupStates = {
EMPTY: 5,
};

const ConsumerGroupTypes = {
UNKNOWN: 0,
CONSUMER: 1,
CLASSIC: 2,
};

/**
* A list of ACL operation types.
* @enum {number}
Expand Down Expand Up @@ -95,6 +101,7 @@ module.exports = {
create: createAdminClient,
createFrom: createAdminClientFrom,
ConsumerGroupStates: Object.freeze(ConsumerGroupStates),
ConsumerGroupTypes: Object.freeze(ConsumerGroupTypes),
AclOperationTypes: Object.freeze(AclOperationTypes),
IsolationLevel: Object.freeze(IsolationLevel),
OffsetSpec,
Expand Down
8 changes: 8 additions & 0 deletions lib/kafkajs/_admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,14 @@ module.exports = {
* @see RdKafka.ConsumerGroupStates
*/
ConsumerGroupStates: RdKafka.AdminClient.ConsumerGroupStates,
/**
* A list of consumer group types.
* @enum {number}
* @readonly
* @memberof KafkaJS
* @see RdKafka.ConsumerGroupTypes
Copy link
Preview

Copilot AI May 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The @see tag references RdKafka.ConsumerGroupTypes but the enum is on RdKafka.AdminClient.ConsumerGroupTypes. Update the annotation to match the actual export path.

Suggested change
* @see RdKafka.ConsumerGroupTypes
* @see RdKafka.AdminClient.ConsumerGroupTypes

Copilot uses AI. Check for mistakes.

*/
ConsumerGroupTypes: RdKafka.AdminClient.ConsumerGroupTypes,
/**
* A list of ACL operation types.
* @enum {number}
Expand Down
3 changes: 2 additions & 1 deletion lib/kafkajs/_kafka.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const { Producer, CompressionTypes } = require('./_producer');
const { Consumer, PartitionAssigners } = require('./_consumer');
const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin');
const { Admin, ConsumerGroupStates, ConsumerGroupTypes, AclOperationTypes, IsolationLevel } = require('./_admin');
const error = require('./_error');
const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');

Expand Down Expand Up @@ -119,6 +119,7 @@ module.exports = {
PartitionAssigners,
PartitionAssignors: PartitionAssigners,
CompressionTypes,
ConsumerGroupTypes,
ConsumerGroupStates,
AclOperationTypes,
IsolationLevel};
1 change: 1 addition & 0 deletions lib/rdkafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ module.exports = {
IsolationLevel: Admin.IsolationLevel,
OffsetSpec: Admin.OffsetSpec,
ConsumerGroupStates: Admin.ConsumerGroupStates,
ConsumerGroupTypes: Admin.ConsumerGroupTypes,
AclOperationTypes: Admin.AclOperationTypes,
};
24 changes: 24 additions & 0 deletions src/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,9 @@ v8::Local<v8::Object> FromMemberDescription(
assignment: {
topicPartitions: TopicPartition[]
},
targetAssignment?: {
topicPartitions: TopicPartition[]
}
}
*/
v8::Local<v8::Object> returnObject = Nan::New<v8::Object>();
Expand Down Expand Up @@ -1028,6 +1031,23 @@ v8::Local<v8::Object> FromMemberDescription(
Nan::Set(returnObject, Nan::New("assignment").ToLocalChecked(),
assignmentObject);

// targetAssignment
const rd_kafka_MemberAssignment_t* target_assignment =
rd_kafka_MemberDescription_target_assignment(member);
if (target_assignment) {
const rd_kafka_topic_partition_list_t* target_partitions =
rd_kafka_MemberAssignment_partitions(target_assignment);
v8::Local<v8::Array> targetTopicPartitions =
Conversion::TopicPartition::ToTopicPartitionV8Array(
target_partitions, false);
v8::Local<v8::Object> targetAssignmentObject = Nan::New<v8::Object>();
Nan::Set(targetAssignmentObject,
Nan::New("topicPartitions").ToLocalChecked(),
targetTopicPartitions);
Nan::Set(returnObject, Nan::New("targetAssignment").ToLocalChecked(),
targetAssignmentObject);
}

return returnObject;
}

Expand Down Expand Up @@ -1105,6 +1125,10 @@ v8::Local<v8::Object> FromConsumerGroupDescription(
Nan::Set(returnObject, Nan::New("state").ToLocalChecked(),
Nan::New<v8::Number>(rd_kafka_ConsumerGroupDescription_state(desc)));

// type
Nan::Set(returnObject, Nan::New("type").ToLocalChecked(),
Nan::New<v8::Number>(rd_kafka_ConsumerGroupDescription_type(desc)));

// coordinator
const rd_kafka_Node_t* coordinator =
rd_kafka_ConsumerGroupDescription_coordinator(desc);
Expand Down
4 changes: 3 additions & 1 deletion test/promisified/admin/describe_groups.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const {
createAdmin,
sleep,
} = require('../testhelpers');
const { ConsumerGroupStates, ErrorCodes, AclOperationTypes } = require('../../../lib').KafkaJS;
const { ConsumerGroupStates, ConsumerGroupTypes, ErrorCodes, AclOperationTypes } = require('../../../lib').KafkaJS;

describe('Admin > describeGroups', () => {
let topicName, groupId, consumer, admin, groupInstanceId, producer;
Expand Down Expand Up @@ -86,6 +86,7 @@ describe('Admin > describeGroups', () => {
isSimpleConsumerGroup: false,
protocolType: 'consumer',
state: ConsumerGroupStates.STABLE,
type: ConsumerGroupTypes.CLASSIC,
coordinator: expect.objectContaining({
id: expect.any(Number),
host: expect.any(String),
Expand Down Expand Up @@ -137,6 +138,7 @@ describe('Admin > describeGroups', () => {
protocol: '',
partitionAssignor: '',
state: ConsumerGroupStates.EMPTY,
type: ConsumerGroupTypes.CLASSIC,
protocolType: 'consumer',
isSimpleConsumerGroup: false,
coordinator: expect.objectContaining({
Expand Down
8 changes: 8 additions & 0 deletions types/rdkafka.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ export enum ConsumerGroupStates {
EMPTY = 5,
}

export enum ConsumerGroupTypes {
UNKNOWN = 0,
CONSUMER = 1,
CLASSIC = 2,
}

export interface GroupOverview {
groupId: string;
protocolType: string;
Expand Down Expand Up @@ -383,6 +389,7 @@ export type MemberDescription = {
memberMetadata: Buffer
groupInstanceId?: string,
assignment: TopicPartition[]
targetAssignment?: TopicPartition[]
}

export type Node = {
Expand All @@ -407,6 +414,7 @@ export type GroupDescription = {
protocolType: string
partitionAssignor: string
state: ConsumerGroupStates
type: ConsumerGroupTypes
coordinator: Node
authorizedOperations?: AclOperationTypes[]
}
Expand Down