Skip to content

Commit 0e4eebe

Browse files
authored
KAFKA-12895 Drop support for Scala 2.12 in Kafka 4.0 (#17313)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent d38a90d commit 0e4eebe

File tree

55 files changed

+175
-369
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+175
-369
lines changed

LICENSE-binary

-7
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ jackson-jaxrs-json-provider-2.16.2
226226
jackson-module-afterburner-2.16.2
227227
jackson-module-jaxb-annotations-2.16.2
228228
jackson-module-scala_2.13-2.16.2
229-
jackson-module-scala_2.12-2.16.2
230229
jakarta.validation-api-2.0.2
231230
javassist-3.29.2-GA
232231
jetty-client-9.4.54.v20240208
@@ -257,15 +256,9 @@ opentelemetry-proto-1.0.0-alpha
257256
plexus-utils-3.5.1
258257
reload4j-1.2.25
259258
rocksdbjni-7.9.2
260-
scala-collection-compat_2.12-2.10.0
261-
scala-collection-compat_2.13-2.10.0
262-
scala-library-2.12.19
263259
scala-library-2.13.15
264-
scala-logging_2.12-3.9.5
265260
scala-logging_2.13-3.9.5
266-
scala-reflect-2.12.19
267261
scala-reflect-2.13.15
268-
scala-java8-compat_2.12-1.0.2
269262
scala-java8-compat_2.13-1.0.2
270263
snappy-java-1.1.10.5
271264
swagger-annotations-2.2.8

README.md

+1-50
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ the broker and tools has been deprecated since Apache Kafka 3.7 and removal of b
1111
see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) and
1212
[KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details).
1313

14-
Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since
15-
Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
16-
for more details). See below for how to use a specific Scala version or all of the supported Scala versions.
14+
Scala 2.13 is the only supported version in Apache Kafka.
1715

1816
### Build a jar and run it ###
1917
./gradlew jar
@@ -122,23 +120,6 @@ Using compiled files:
122120
### Cleaning the build ###
123121
./gradlew clean
124122

125-
### Running a task with one of the Scala versions available (2.12.x or 2.13.x) ###
126-
*Note that if building the jars with a version other than 2.13.x, you need to set the `SCALA_VERSION` variable or change it in `bin/kafka-run-class.sh` to run the quick start.*
127-
128-
You can pass either the major version (eg 2.12) or the full version (eg 2.12.7):
129-
130-
./gradlew -PscalaVersion=2.12 jar
131-
./gradlew -PscalaVersion=2.12 test
132-
./gradlew -PscalaVersion=2.12 releaseTarGz
133-
134-
### Running a task with all the scala versions enabled by default ###
135-
136-
Invoke the `gradlewAll` script followed by the task(s):
137-
138-
./gradlewAll test
139-
./gradlewAll jar
140-
./gradlewAll releaseTarGz
141-
142123
### Running a task for a specific project ###
143124
This is for `core`, `examples` and `clients`
144125

@@ -162,24 +143,6 @@ The `eclipse` task has been configured to use `${project_dir}/build_eclipse` as
162143
build directory (`${project_dir}/bin`) clashes with Kafka's scripts directory and we don't use Gradle's build directory
163144
to avoid known issues with this configuration.
164145

165-
### Publishing the jar for all versions of Scala and for all projects to maven ###
166-
The recommended command is:
167-
168-
./gradlewAll publish
169-
170-
For backwards compatibility, the following also works:
171-
172-
./gradlewAll uploadArchives
173-
174-
Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradle.properties` (typically, `~/.gradle/gradle.properties`) and assign the following variables
175-
176-
mavenUrl=
177-
mavenUsername=
178-
mavenPassword=
179-
signing.keyId=
180-
signing.password=
181-
signing.secretKeyRingFile=
182-
183146
### Publishing the streams quickstart archetype artifact to maven ###
184147
For the Streams archetype project, one cannot use gradle to upload to maven; instead the `mvn deploy` command needs to be called at the quickstart folder:
185148

@@ -209,22 +172,10 @@ Please note for this to work you should create/update user maven settings (typic
209172
</servers>
210173
...
211174

212-
213-
### Installing ALL the jars to the local Maven repository ###
214-
The recommended command to build for both Scala 2.12 and 2.13 is:
215-
216-
./gradlewAll publishToMavenLocal
217-
218-
For backwards compatibility, the following also works:
219-
220-
./gradlewAll install
221-
222175
### Installing specific projects to the local Maven repository ###
223176

224177
./gradlew -PskipSigning=true :streams:publishToMavenLocal
225178

226-
If needed, you can specify the Scala version with `-PscalaVersion=2.13`.
227-
228179
### Building the test jar ###
229180
./gradlew testJar
230181

build.gradle

+4-27
Original file line numberDiff line numberDiff line change
@@ -773,25 +773,11 @@ subprojects {
773773
scalaCompileOptions.additionalParameters += inlineFrom
774774
}
775775

776-
if (versions.baseScala != '2.12') {
777-
scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"]
778-
// Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
779-
scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
780-
}
781-
782-
// these options are valid for Scala versions < 2.13 only
783-
// Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
784-
if (versions.baseScala == '2.12') {
785-
scalaCompileOptions.additionalParameters += [
786-
"-Xlint:by-name-right-associative",
787-
"-Xlint:nullary-override",
788-
"-Xlint:unsound-match"
789-
]
790-
}
776+
scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"]
777+
// Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
778+
scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
791779

792-
// Scalac 2.12 `-release` requires Java 9 or higher, but Scala 2.13 doesn't have that restriction
793-
if (versions.baseScala == "2.13" || JavaVersion.current().isJava9Compatible())
794-
scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)]
780+
scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)]
795781

796782
addParametersForTests(name, options)
797783

@@ -1096,7 +1082,6 @@ project(':core') {
10961082
implementation libs.joptSimple
10971083
implementation libs.jose4j
10981084
implementation libs.metrics
1099-
implementation libs.scalaCollectionCompat
11001085
implementation libs.scalaJava8Compat
11011086
// only needed transitively, but set it explicitly to ensure it has the same version as scala-library
11021087
implementation libs.scalaReflect
@@ -2813,14 +2798,6 @@ project(':streams:streams-scala') {
28132798
api project(':streams')
28142799

28152800
api libs.scalaLibrary
2816-
if ( versions.baseScala == '2.12' ) {
2817-
// Scala-Collection-Compat isn't required when compiling with Scala 2.13 or later,
2818-
// and having it in the dependencies could lead to classpath conflicts in Scala 3
2819-
// projects that use kafka-streams-kafka_2.13 (because we don't have a Scala 3 version yet)
2820-
// but also pull in scala-collection-compat_3 via another dependency.
2821-
// So we make sure to not include it in the dependencies.
2822-
api libs.scalaCollectionCompat
2823-
}
28242801
testImplementation project(':group-coordinator')
28252802
testImplementation project(':core')
28262803
testImplementation project(':test-common')

core/src/main/scala/kafka/admin/ConfigCommand.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ object ConfigCommand extends Logging {
628628

629629
private def describeQuotaConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): Unit = {
630630
val quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames)
631-
quotaConfigs.forKeyValue { (entity, entries) =>
631+
quotaConfigs.foreachEntry { (entity, entries) =>
632632
val entityEntries = entity.entries.asScala
633633

634634
def entitySubstr(entityType: String): Option[String] =

core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package kafka.admin
1919

2020
import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
2121
import kafka.server.KafkaConfig
22-
import kafka.utils.Implicits._
2322
import kafka.utils.{Logging, ToolsUtils}
2423
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
2524
import org.apache.kafka.common.security.JaasUtils
@@ -130,7 +129,7 @@ object ZkSecurityMigrator extends Logging {
130129
// Now override any set system properties with explicitly-provided values from the config file
131130
// Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make
132131
info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration properties in file $filename")
133-
zkTlsConfigFileProps.asScala.forKeyValue { (key, value) =>
132+
zkTlsConfigFileProps.asScala.foreachEntry { (key, value) =>
134133
info(s"Setting $key")
135134
KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, value)
136135
}

core/src/main/scala/kafka/controller/ControllerChannelManager.scala

+6-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package kafka.controller
1919
import com.yammer.metrics.core.{Gauge, Timer}
2020
import kafka.cluster.Broker
2121
import kafka.server.KafkaConfig
22-
import kafka.utils.Implicits._
2322
import kafka.utils._
2423
import org.apache.kafka.clients._
2524
import org.apache.kafka.common._
@@ -524,11 +523,11 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
524523
else if (metadataVersion.isAtLeast(IBP_1_0_IV0)) 1
525524
else 0
526525

527-
leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) =>
526+
leaderAndIsrRequestMap.foreachEntry { (broker, leaderAndIsrPartitionStates) =>
528527
if (metadataInstance.liveOrShuttingDownBrokerIds.contains(broker)) {
529528
val leaderIds = mutable.Set.empty[Int]
530529
var numBecomeLeaders = 0
531-
leaderAndIsrPartitionStates.forKeyValue { (topicPartition, state) =>
530+
leaderAndIsrPartitionStates.foreachEntry { (topicPartition, state) =>
532531
leaderIds += state.leader
533532
val typeOfRequest = if (broker == state.leader) {
534533
numBecomeLeaders += 1
@@ -669,18 +668,18 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
669668
handleStopReplicaResponse(stopReplicaResponse, brokerId, partitionErrorsForDeletingTopics.toMap)
670669
}
671670

672-
stopReplicaRequestMap.forKeyValue { (brokerId, partitionStates) =>
671+
stopReplicaRequestMap.foreachEntry { (brokerId, partitionStates) =>
673672
if (metadataInstance.liveOrShuttingDownBrokerIds.contains(brokerId)) {
674673
if (traceEnabled)
675-
partitionStates.forKeyValue { (topicPartition, partitionState) =>
674+
partitionStates.foreachEntry { (topicPartition, partitionState) =>
676675
stateChangeLog.trace(s"Sending StopReplica request $partitionState to " +
677676
s"broker $brokerId for partition $topicPartition")
678677
}
679678

680679
val brokerEpoch = metadataInstance.liveBrokerIdAndEpochs(brokerId)
681680
if (stopReplicaRequestVersion >= 3) {
682681
val stopReplicaTopicState = mutable.Map.empty[String, StopReplicaTopicState]
683-
partitionStates.forKeyValue { (topicPartition, partitionState) =>
682+
partitionStates.foreachEntry { (topicPartition, partitionState) =>
684683
val topicState = stopReplicaTopicState.getOrElseUpdate(topicPartition.topic,
685684
new StopReplicaTopicState().setTopicName(topicPartition.topic))
686685
topicState.partitionStates().add(partitionState)
@@ -699,7 +698,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
699698
val topicStatesWithDelete = mutable.Map.empty[String, StopReplicaTopicState]
700699
val topicStatesWithoutDelete = mutable.Map.empty[String, StopReplicaTopicState]
701700

702-
partitionStates.forKeyValue { (topicPartition, partitionState) =>
701+
partitionStates.foreachEntry { (topicPartition, partitionState) =>
703702
val topicStates = if (partitionState.deletePartition()) {
704703
numPartitionStateWithDelete += 1
705704
topicStatesWithDelete

core/src/main/scala/kafka/controller/ControllerContext.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package kafka.controller
1919

2020
import kafka.cluster.Broker
21-
import kafka.utils.Implicits._
2221
import org.apache.kafka.common.{TopicPartition, Uuid}
2322
import org.apache.kafka.metadata.LeaderAndIsr
2423

@@ -522,7 +521,7 @@ class ControllerContext extends ControllerChannelContext {
522521
}
523522

524523
private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
525-
partitionAssignments.getOrElse(topic, mutable.Map.empty).forKeyValue { (partition, replicaAssignment) =>
524+
partitionAssignments.getOrElse(topic, mutable.Map.empty).foreachEntry { (partition, replicaAssignment) =>
526525
partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo =>
527526
if (!hasPreferredLeader(replicaAssignment, leadershipInfo))
528527
preferredReplicaImbalanceCount -= 1

core/src/main/scala/kafka/controller/KafkaController.scala

+8-9
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import kafka.coordinator.transaction.ZkProducerIdManager
2626
import kafka.server._
2727
import kafka.server.metadata.ZkFinalizedFeatureCache
2828
import kafka.utils._
29-
import kafka.utils.Implicits._
3029
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
3130
import kafka.zk.TopicZNode.TopicIdReplicaAssignment
3231
import kafka.zk.{FeatureZNodeStatus, _}
@@ -1030,7 +1029,7 @@ class KafkaController(val config: KafkaConfig,
10301029

10311030
private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq): Unit = {
10321031
val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
1033-
leaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
1032+
leaderIsrAndControllerEpochs.foreachEntry { (partition, leaderIsrAndControllerEpoch) =>
10341033
controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
10351034
}
10361035
}
@@ -1297,7 +1296,7 @@ class KafkaController(val config: KafkaConfig,
12971296
}.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
12981297

12991298
// for each broker, check if a preferred replica election needs to be triggered
1300-
preferredReplicasForTopicsByBrokers.forKeyValue { (leaderBroker, topicPartitionsForBroker) =>
1299+
preferredReplicasForTopicsByBrokers.foreachEntry { (leaderBroker, topicPartitionsForBroker) =>
13011300
val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case (topicPartition, _) =>
13021301
val leadershipInfo = controllerContext.partitionLeadershipInfo(topicPartition)
13031302
leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
@@ -1776,7 +1775,7 @@ class KafkaController(val config: KafkaConfig,
17761775
}
17771776
} else if (partitionsToBeAdded.nonEmpty) {
17781777
info(s"New partitions to be added $partitionsToBeAdded")
1779-
partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) =>
1778+
partitionsToBeAdded.foreachEntry { (topicPartition, assignedReplicas) =>
17801779
controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
17811780
}
17821781
onNewPartitionCreation(partitionsToBeAdded.keySet)
@@ -1821,7 +1820,7 @@ class KafkaController(val config: KafkaConfig,
18211820
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
18221821
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
18231822

1824-
zkClient.getPartitionReassignment.forKeyValue { (tp, targetReplicas) =>
1823+
zkClient.getPartitionReassignment.foreachEntry { (tp, targetReplicas) =>
18251824
maybeBuildReassignment(tp, Some(targetReplicas)) match {
18261825
case Some(context) => partitionsToReassign.put(tp, context)
18271826
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
@@ -1858,7 +1857,7 @@ class KafkaController(val config: KafkaConfig,
18581857
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
18591858
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
18601859

1861-
reassignments.forKeyValue { (tp, targetReplicas) =>
1860+
reassignments.foreachEntry { (tp, targetReplicas) =>
18621861
val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _))
18631862
maybeApiError match {
18641863
case None =>
@@ -2304,7 +2303,7 @@ class KafkaController(val config: KafkaConfig,
23042303

23052304
// After we have returned the result of the `AlterPartition` request, we should check whether
23062305
// there are any reassignments which can be completed by a successful ISR expansion.
2307-
partitionResponses.forKeyValue { (topicPartition, partitionResponse) =>
2306+
partitionResponses.foreachEntry { (topicPartition, partitionResponse) =>
23082307
if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
23092308
val isSuccessfulUpdate = partitionResponse.isRight
23102309
if (isSuccessfulUpdate) {
@@ -2480,7 +2479,7 @@ class KafkaController(val config: KafkaConfig,
24802479
partitionsToAlter.keySet
24812480
)
24822481

2483-
partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, partitionResponses) =>
2482+
partitionResponses.groupBy(_._1.topic).foreachEntry { (topicName, partitionResponses) =>
24842483
// Add each topic part to the response
24852484
val topicResponse = if (useTopicsIds) {
24862485
new AlterPartitionResponseData.TopicData()
@@ -2491,7 +2490,7 @@ class KafkaController(val config: KafkaConfig,
24912490
}
24922491
alterPartitionResponse.topics.add(topicResponse)
24932492

2494-
partitionResponses.forKeyValue { (tp, errorOrIsr) =>
2493+
partitionResponses.foreachEntry { (tp, errorOrIsr) =>
24952494
// Add each partition part to the response (new ISR or error)
24962495
errorOrIsr match {
24972496
case Left(error) =>

core/src/main/scala/kafka/controller/PartitionStateMachine.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package kafka.controller
1919
import kafka.common.StateChangeFailedException
2020
import kafka.controller.Election._
2121
import kafka.server.KafkaConfig
22-
import kafka.utils.Implicits._
2322
import kafka.utils.Logging
2423
import kafka.zk.KafkaZkClient
2524
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@@ -437,7 +436,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
437436
val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
438437
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
439438
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
440-
finishedUpdates.forKeyValue { (partition, result) =>
439+
finishedUpdates.foreachEntry { (partition, result) =>
441440
result.foreach { leaderAndIsr =>
442441
val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
443442
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)

core/src/main/scala/kafka/controller/ReplicaStateMachine.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package kafka.controller
1818

1919
import kafka.common.StateChangeFailedException
2020
import kafka.server.KafkaConfig
21-
import kafka.utils.Implicits._
2221
import kafka.utils.Logging
2322
import kafka.zk.KafkaZkClient
2423
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@@ -110,7 +109,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
110109
if (replicas.nonEmpty) {
111110
try {
112111
controllerBrokerRequestBatch.newBatch()
113-
replicas.groupBy(_.replica).forKeyValue { (replicaId, replicas) =>
112+
replicas.groupBy(_.replica).foreachEntry { (replicaId, replicas) =>
114113
doHandleStateChanges(replicaId, replicas, targetState)
115114
}
116115
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
@@ -227,7 +226,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
227226
controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined
228227
}
229228
val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
230-
updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
229+
updatedLeaderIsrAndControllerEpochs.foreachEntry { (partition, leaderIsrAndControllerEpoch) =>
231230
stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
232231
if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
233232
val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)

0 commit comments

Comments
 (0)