diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index d25079c9..dac27212 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -102,10 +102,18 @@ blocks: commands: - make test - artifact push workflow coverage/mocha/coverage-final.json --destination "mocha-coverage.json" - - name: "Promisified Tests" + - name: "Promisified Tests (Classic Protocol)" commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' - - docker compose up -d && sleep 30 + - docker compose -f test/docker/docker-compose.yml up -d && sleep 30 + - export NODE_OPTIONS='--max-old-space-size=1536' + - npx jest --no-colors --ci test/promisified/ + - artifact push workflow coverage/jest/coverage-final.json --destination "jest-coverage.json" + - name: "Promisified Tests (Consumer Protocol)" + commands: + - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' + - docker compose -f test/docker/docker-compose-kraft.yml up -d && sleep 30 + - export TEST_CONSUMER_GROUP_PROTOCOL=consumer - export NODE_OPTIONS='--max-old-space-size=1536' - npx jest --no-colors --ci test/promisified/ - artifact push workflow coverage/jest/coverage-final.json --destination "jest-coverage.json" @@ -163,10 +171,10 @@ blocks: - export BUILD_LIBRDKAFKA=0 - npm run install-from-source jobs: - - name: "Performance Test" + - name: "Performance Test (Classic Protocol)" commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' - - docker compose up -d && sleep 30 + - docker compose -f test/docker/docker-compose.yml up -d && sleep 30 - export NODE_OPTIONS='--max-old-space-size=1536' - cd examples/performance - npm install diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 8de46c6e..27cbebb9 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -474,7 +474,7 @@ class Consumer { } } - #kafkaJSToConsumerConfig(kjsConfig) { + #kafkaJSToConsumerConfig(kjsConfig, isClassicProtocol = true) { if (!kjsConfig || Object.keys(kjsConfig).length === 0) { return {}; } @@ -498,26 +498,46 @@ class Consumer { } if (Object.hasOwn(kjsConfig, 'partitionAssignors')) { + if (!isClassicProtocol) { + throw new error.KafkaJSError( + "partitionAssignors is not supported when group.protocol is not 'classic'.", + { code: error.ErrorCodes.ERR__INVALID_ARG } + ); + } if (!Array.isArray(kjsConfig.partitionAssignors)) { throw new error.KafkaJSError(CompatibilityErrorMessages.partitionAssignors(), { code: error.ErrorCodes.ERR__INVALID_ARG }); } - kjsConfig.partitionAssignors.forEach(assignor => { if (typeof assignor !== 'string') throw new error.KafkaJSError(CompatibilityErrorMessages.partitionAssignors(), { code: error.ErrorCodes.ERR__INVALID_ARG }); }); - rdKafkaConfig['partition.assignment.strategy'] = kjsConfig.partitionAssignors.join(','); - } else { + } else if (isClassicProtocol) { rdKafkaConfig['partition.assignment.strategy'] = PartitionAssigners.roundRobin; } if (Object.hasOwn(kjsConfig, 'sessionTimeout')) { + if (!isClassicProtocol) { + throw new error.KafkaJSError( + "sessionTimeout is not supported when group.protocol is not 'classic'.", + { code: error.ErrorCodes.ERR__INVALID_ARG } + ); + } rdKafkaConfig['session.timeout.ms'] = kjsConfig.sessionTimeout; - } else { + } else if (isClassicProtocol) { rdKafkaConfig['session.timeout.ms'] = 30000; } + if (Object.hasOwn(kjsConfig, 'heartbeatInterval')) { + if (!isClassicProtocol) { + throw new error.KafkaJSError( + "heartbeatInterval is not supported when group.protocol is not 'classic'.", + { code: error.ErrorCodes.ERR__INVALID_ARG } + ); + } + rdKafkaConfig['heartbeat.interval.ms'] = kjsConfig.heartbeatInterval; + } + if (Object.hasOwn(kjsConfig, 'rebalanceTimeout')) { /* In librdkafka, we use the max poll interval as the rebalance timeout as well. */ rdKafkaConfig['max.poll.interval.ms'] = +kjsConfig.rebalanceTimeout; @@ -525,10 +545,6 @@ class Consumer { rdKafkaConfig['max.poll.interval.ms'] = 300000; /* librdkafka default */ } - if (Object.hasOwn(kjsConfig, 'heartbeatInterval')) { - rdKafkaConfig['heartbeat.interval.ms'] = kjsConfig.heartbeatInterval; - } - if (Object.hasOwn(kjsConfig, 'metadataMaxAge')) { rdKafkaConfig['topic.metadata.refresh.interval.ms'] = kjsConfig.metadataMaxAge; } @@ -605,8 +621,10 @@ class Consumer { } #finalizedConfig() { + const protocol = this.#userConfig['group.protocol']; + const isClassicProtocol = protocol === undefined || protocol === 'classic'; /* Creates an rdkafka config based off the kafkaJS block. Switches to compatibility mode if the block exists. */ - let compatibleConfig = this.#kafkaJSToConsumerConfig(this.#userConfig.kafkaJS); + let compatibleConfig = this.#kafkaJSToConsumerConfig(this.#userConfig.kafkaJS, isClassicProtocol); /* There can be multiple different and conflicting config directives for setting the log level: * 1. If there's a kafkaJS block: diff --git a/test/docker/docker-compose-kraft.yml b/test/docker/docker-compose-kraft.yml new file mode 100644 index 00000000..f1976dd3 --- /dev/null +++ b/test/docker/docker-compose-kraft.yml @@ -0,0 +1,14 @@ +services: + kafka: + image: apache/kafka:4.0.0 + restart: unless-stopped + container_name: kafka + ports: + - 9092:29092 + - 9093:29093 + volumes: + - ./kafka_jaas.conf:/etc/kafka/kafka_jaas.conf + - ./kraft/server.properties:/mnt/shared/config/server.properties + environment: + KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf" + diff --git a/docker-compose.yml b/test/docker/docker-compose.yml similarity index 100% rename from docker-compose.yml rename to test/docker/docker-compose.yml diff --git a/test/docker/kafka_jaas.conf b/test/docker/kafka_jaas.conf new file mode 100644 index 00000000..b241ce22 --- /dev/null +++ b/test/docker/kafka_jaas.conf @@ -0,0 +1,13 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin" + user_admin="admin" + user_testuser="testpass"; +}; + +Client { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin"; +}; diff --git a/test/docker/kraft/server.properties b/test/docker/kraft/server.properties new file mode 100644 index 00000000..70bfb052 --- /dev/null +++ b/test/docker/kraft/server.properties @@ -0,0 +1,31 @@ +broker.id=0 +port=9092 +reserved.broker.max.id=65536 +listeners=PLAINTEXT://:9092,CONTROLLER://:38705,SASL_PLAINTEXT://:9093,DOCKER://:29092,DOCKER_SASL_PLAINTEXT://:29093 +advertised.listeners=PLAINTEXT://kafka:9092,SASL_PLAINTEXT://kafka:9093,DOCKER://localhost:9092,DOCKER_SASL_PLAINTEXT://localhost:9093 +num.partitions=4 +auto.create.topics.enable=true +delete.topic.enable=true +default.replication.factor=1 +offsets.topic.replication.factor=1 +transaction.state.log.replication.factor=1 +transaction.state.log.min.isr=1 +security.inter.broker.protocol=SASL_PLAINTEXT +sasl.mechanism.controller.protocol=PLAIN +sasl.mechanism.inter.broker.protocol=PLAIN +super.users=User:admin +allow.everyone.if.no.acl.found=true + +broker.rack=RACK1 +replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector +group.coordinator.rebalance.protocols=classic,consumer +connections.max.reauth.ms=10000 +log.retention.bytes=1000000000 +process.roles=broker,controller +controller.listener.names=CONTROLLER +listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,CONTROLLER:SASL_PLAINTEXT,DOCKER:PLAINTEXT,DOCKER_SASL_PLAINTEXT:SASL_PLAINTEXT +authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer +sasl.enabled.mechanisms=PLAIN +controller.quorum.voters=0@kafka:38705 +group.consumer.min.session.timeout.ms =6000 +group.consumer.session.timeout.ms = 10000 diff --git a/test/promisified/admin/describe_groups.spec.js b/test/promisified/admin/describe_groups.spec.js index d70daf70..bcaeebd3 100644 --- a/test/promisified/admin/describe_groups.spec.js +++ b/test/promisified/admin/describe_groups.spec.js @@ -1,6 +1,7 @@ jest.setTimeout(30000); const { + testConsumerGroupProtocolClassic, createConsumer, createProducer, secureRandom, @@ -66,6 +67,9 @@ describe('Admin > describeGroups', () => { }); it('should describe consumer groups', async () => { + if (testConsumerGroupProtocolClassic()) { + return ; + } let messagesConsumed = 0; await consumer.connect(); @@ -81,8 +85,8 @@ describe('Admin > describeGroups', () => { expect(describeGroupsResult.groups[0]).toEqual( expect.objectContaining({ groupId, - protocol: 'roundrobin', - partitionAssignor: 'roundrobin', + protocol: expect.any(String), + partitionAssignor: expect.any(String), isSimpleConsumerGroup: false, protocolType: 'consumer', state: ConsumerGroupStates.STABLE, @@ -134,8 +138,8 @@ describe('Admin > describeGroups', () => { expect(describeGroupsResult.groups[0]).toEqual( expect.objectContaining({ groupId, - protocol: '', - partitionAssignor: '', + protocol: expect.any(String), + partitionAssignor: expect.any(String), state: ConsumerGroupStates.EMPTY, protocolType: 'consumer', isSimpleConsumerGroup: false, diff --git a/test/promisified/consumer/groupInstanceId.spec.js b/test/promisified/consumer/groupInstanceId.spec.js index bf814000..a8d9a688 100644 --- a/test/promisified/consumer/groupInstanceId.spec.js +++ b/test/promisified/consumer/groupInstanceId.spec.js @@ -3,6 +3,7 @@ jest.setTimeout(30000); const { waitFor, secureRandom, createTopic, + testConsumerGroupProtocolClassic, createConsumer, sleep, } = require("../testhelpers"); const { ErrorCodes } = require('../../../lib').KafkaJS; @@ -128,9 +129,12 @@ describe('Consumer with static membership', () => { expect(consumer2.assignment().length).toBe(1); await waitFor(() => consumer2.assignment().length === 2, () => null, 1000); + expect(consumer2.assignment().length).toBe(2); - expect(assigns).toBe(2); - expect(revokes).toBe(1); + if (testConsumerGroupProtocolClassic()) { + expect(assigns).toBe(2); + expect(revokes).toBe(1); + } await consumer2.disconnect(); }); diff --git a/test/promisified/consumer/incrementalRebalance.spec.js b/test/promisified/consumer/incrementalRebalance.spec.js index 61647ad4..1e5e6bcf 100644 --- a/test/promisified/consumer/incrementalRebalance.spec.js +++ b/test/promisified/consumer/incrementalRebalance.spec.js @@ -3,7 +3,8 @@ jest.setTimeout(30000); const { waitFor, secureRandom, createTopic, - createConsumer, } = require("../testhelpers"); + createConsumer, + testConsumerGroupProtocolClassic } = require("../testhelpers"); const { PartitionAssigners, ErrorCodes } = require('../../../lib').KafkaJS; describe('Consumer > incremental rebalance', () => { @@ -12,7 +13,9 @@ describe('Consumer > incremental rebalance', () => { const consumerConfig = { groupId, - partitionAssigners: [PartitionAssigners.cooperativeSticky], + ...(testConsumerGroupProtocolClassic() && { + partitionAssigners: [PartitionAssigners.cooperativeSticky], + }), }; beforeEach(async () => { diff --git a/test/promisified/testhelpers.js b/test/promisified/testhelpers.js index a642a5bf..05a205c2 100644 --- a/test/promisified/testhelpers.js +++ b/test/promisified/testhelpers.js @@ -15,6 +15,15 @@ const clusterInformation = { const debug = process.env.TEST_DEBUG; +function testConsumerGroupProtocol() { + return process.env.TEST_CONSUMER_GROUP_PROTOCOL ?? null; +} + +function testConsumerGroupProtocolClassic() { + const protocol = testConsumerGroupProtocol(); + return protocol === null || protocol === "classic"; +} + function makeConfig(config, common) { const kafkaJS = Object.assign(config, clusterInformation.kafkaJS); if (debug) { @@ -27,6 +36,34 @@ function makeConfig(config, common) { } function createConsumer(config, common = {}) { + const protocol = testConsumerGroupProtocol(); + if (protocol !== null && !('group.protocol' in common)) { + common['group.protocol'] = protocol; + } + if (!testConsumerGroupProtocolClassic()) { + const forbiddenProperties = [ + "session.timeout.ms", + "partition.assignment.strategy", + "heartbeat.interval.ms", + "group.protocol.type" + ]; + const forbiddenPropertiesKafkaJS = [ + "sessionTimeout", + "partitionAssignors", + "partitionAssigners", + "heartbeatInterval" + ]; + for (const prop of forbiddenProperties) { + if (prop in common) { + delete common[prop]; + } + } + for (const prop of forbiddenPropertiesKafkaJS) { + if (prop in config) { + delete config[prop]; + } + } + } const kafka = new Kafka(makeConfig(config, common)); return kafka.consumer(); } @@ -129,6 +166,7 @@ class SequentialPromises { } module.exports = { + testConsumerGroupProtocolClassic, createConsumer, createProducer, createAdmin,