Skip to content

Commit 579ae6a

Browse files
authored
GH-3814: Upgrade Apache Kafka client dependencies to version 4.0.0
Fixes: #3814 Issue link: #3814 This commit upgrades the Apache Kafka client to 4.0.0 with the following changes: - Upgrade Kafka client dependencies to 4.0.0 in build.gradle - Remove ZooKeeper-based broker implementation as Kafka 4.0 fully transitions to KRaft mode - Delete EmbeddedKafkaZKBroker class and related tests - Remove ZooKeeper dependency - Refactor EmbeddedKafkaRule to use KRaft exclusively - Update KafkaClusterTestKit imports to use new packages - Update ConsumerRecords constructor calls to include the new required Map parameter - Add implementations for new Producer interface methods: - registerMetricForSubscription - unregisterMetricFromSubscription - Update KafkaStreamBrancher to use new split() and branch() methods - Remove deprecated partitioner classes from runtime hints - Remove deprecated sendOffsetsToTransaction method that used String consumerGroupId - Update BrokerAddress to use org.apache.kafka.server.network.BrokerEndPoint - Update DeserializationExceptionHandler to use new ErrorHandlerContext The commit also includes test modifications to address limitations with static port assignments in KRaft mode and adjustments to replication factors in tests. Signed-off-by: Soby Chacko <[email protected]> * Addressing PR review --------- Signed-off-by: Soby Chacko <[email protected]>
1 parent 8779cfa commit 579ae6a

File tree

71 files changed

+427
-1461
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+427
-1461
lines changed

Diff for: build.gradle

+3-5
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ ext {
5959
jaywayJsonPathVersion = '2.9.0'
6060
junit4Version = '4.13.2'
6161
junitJupiterVersion = '5.11.4'
62-
kafkaVersion = '3.8.1'
62+
kafkaVersion = '4.0.0'
6363
kotlinCoroutinesVersion = '1.10.1'
6464
log4jVersion = '2.24.3'
6565
micrometerDocsVersion = '1.0.4'
@@ -355,16 +355,14 @@ project ('spring-kafka-test') {
355355
description = 'Spring Kafka Test Support'
356356

357357
dependencies {
358+
api "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
358359
api 'org.springframework:spring-context'
359360
api 'org.springframework:spring-test'
360361
api "org.springframework.retry:spring-retry:$springRetryVersion"
361362

362-
api ("org.apache.zookeeper:zookeeper:$zookeeperVersion") {
363-
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
364-
exclude group: 'log4j'
365-
}
366363
api "org.apache.kafka:kafka-clients:$kafkaVersion:test"
367364
api "org.apache.kafka:kafka-server:$kafkaVersion"
365+
api "org.apache.kafka:kafka-test-common-runtime:$kafkaVersion"
368366
api "org.apache.kafka:kafka-metadata:$kafkaVersion"
369367
api "org.apache.kafka:kafka-server-common:$kafkaVersion"
370368
api "org.apache.kafka:kafka-server-common:$kafkaVersion:test"

Diff for: spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBrokerFactory.java

+3-14
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,12 +63,8 @@ public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka, Function<S
6363
.toArray(String[]::new);
6464

6565
EmbeddedKafkaBroker embeddedKafkaBroker;
66-
if (embeddedKafka.kraft()) {
67-
embeddedKafkaBroker = kraftBroker(embeddedKafka, topics);
68-
}
69-
else {
70-
embeddedKafkaBroker = zkBroker(embeddedKafka, topics);
71-
}
66+
embeddedKafkaBroker = kraftBroker(embeddedKafka, topics);
67+
7268
int[] ports = setupPorts(embeddedKafka);
7369

7470
embeddedKafkaBroker.kafkaPorts(ports)
@@ -133,13 +129,6 @@ private static EmbeddedKafkaBroker kraftBroker(EmbeddedKafka embedded, String[]
133129
return new EmbeddedKafkaKraftBroker(embedded.count(), embedded.partitions(), topics);
134130
}
135131

136-
private static EmbeddedKafkaBroker zkBroker(EmbeddedKafka embedded, String[] topics) {
137-
return new EmbeddedKafkaZKBroker(embedded.count(), embedded.controlledShutdown(), embedded.partitions(), topics)
138-
.zkPort(embedded.zookeeperPort())
139-
.zkConnectionTimeout(embedded.zkConnectionTimeout())
140-
.zkSessionTimeout(embedded.zkSessionTimeout());
141-
}
142-
143132
private EmbeddedKafkaBrokerFactory() {
144133
}
145134

Diff for: spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
import java.util.stream.Collectors;
4242

4343
import kafka.server.KafkaConfig;
44-
import kafka.testkit.KafkaClusterTestKit;
45-
import kafka.testkit.TestKitNodes;
4644
import org.apache.commons.logging.LogFactory;
4745
import org.apache.kafka.clients.CommonClientConfigs;
4846
import org.apache.kafka.clients.admin.AdminClient;
@@ -53,6 +51,8 @@
5351
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
5452
import org.apache.kafka.common.KafkaException;
5553
import org.apache.kafka.common.TopicPartition;
54+
import org.apache.kafka.common.test.KafkaClusterTestKit;
55+
import org.apache.kafka.common.test.TestKitNodes;
5656
import org.apache.kafka.common.utils.Exit;
5757
import org.apache.kafka.common.utils.Utils;
5858
import org.jspecify.annotations.Nullable;

0 commit comments

Comments
 (0)