Skip to content

Commit 459da47

Browse files
authoredJun 3, 2024
KAFKA-16525; Dynamic KRaft network manager and channel (#15986)
Allow KRaft replicas to send requests to any node (Node), not just the nodes configured in the controller.quorum.voters property. This flexibility is needed so KRaft can implement the controller.quorum.voters configuration, send requests to the dynamically changing set of voters, and send requests to the leader endpoint (Node) discovered through the KRaft RPCs (specifically the BeginQuorumEpoch request and the Fetch response). This was achieved by changing the RequestManager API to accept a Node instead of just the replica ID. Internally, the request manager tracks connection state using the Node.idString method to match the connection management used by NetworkClient. The API for RequestManager is also changed so that the ConnectState class is not exposed in the API. This allows the request manager to reclaim heap memory for any connection that is ready. The NetworkChannel was updated to receive the endpoint information (Node) through the outbound raft request (RaftRequest.Outbound). This makes the network channel more flexible, as it doesn't need to be configured with the list of all possible endpoints. RaftRequest.Outbound and RaftResponse.Inbound were updated to include the remote node instead of just the remote id. The follower state tracked by KRaft replicas was updated to include both the leader id and the leader's endpoint (Node). In this commit the node value is computed from the set of voters. In a future commit this will be updated so that it is sent through KRaft RPCs, for example the BeginQuorumEpoch request and the Fetch response. Support for configuring controller.quorum.bootstrap.servers was added. This includes changes to KafkaConfig, QuorumConfig, etc. All of the tests using QuorumTestHarness were changed to use controller.quorum.bootstrap.servers instead of controller.quorum.voters for the broker configuration. Finally, the node ids for the bootstrap servers will be decreasing negative numbers starting with -2.
Reviewers: Jason Gustafson <[email protected]>, Luke Chen <[email protected]>, Colin P. McCabe <[email protected]>
1 parent 8a882a7 commit 459da47

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2158
-985
lines changed
 

Diff for: ‎checkstyle/import-control.xml

+1
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@
441441
<allow pkg="org.apache.kafka.common.message" />
442442
<allow pkg="org.apache.kafka.common.metadata" />
443443
<allow pkg="org.apache.kafka.common.metrics" />
444+
<allow pkg="org.apache.kafka.common.network" />
444445
<allow pkg="org.apache.kafka.common.protocol" />
445446
<allow pkg="org.apache.kafka.common.record" />
446447
<allow pkg="org.apache.kafka.common.requests" />

Diff for: ‎core/src/main/scala/kafka/raft/RaftManager.scala

+11-7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.nio.file.Paths
2323
import java.util.OptionalInt
2424
import java.util.concurrent.CompletableFuture
2525
import java.util.{Map => JMap}
26+
import java.util.{Collection => JCollection}
2627
import kafka.log.LogManager
2728
import kafka.log.UnifiedLog
2829
import kafka.server.KafkaConfig
@@ -133,7 +134,7 @@ trait RaftManager[T] {
133134

134135
def replicatedLog: ReplicatedLog
135136

136-
def voterNode(id: Int, listener: String): Option[Node]
137+
def voterNode(id: Int, listener: ListenerName): Option[Node]
137138
}
138139

139140
class KafkaRaftManager[T](
@@ -147,6 +148,7 @@ class KafkaRaftManager[T](
147148
metrics: Metrics,
148149
threadNamePrefixOpt: Option[String],
149150
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
151+
bootstrapServers: JCollection[InetSocketAddress],
150152
fatalFaultHandler: FaultHandler
151153
) extends RaftManager[T] with Logging {
152154

@@ -185,7 +187,6 @@ class KafkaRaftManager[T](
185187
def startup(): Unit = {
186188
client.initialize(
187189
controllerQuorumVotersFuture.get(),
188-
config.controllerListenerNames.head,
189190
new FileQuorumStateStore(new File(dataDir, FileQuorumStateStore.DEFAULT_FILE_NAME)),
190191
metrics
191192
)
@@ -228,14 +229,15 @@ class KafkaRaftManager[T](
228229
expirationService,
229230
logContext,
230231
clusterId,
232+
bootstrapServers,
231233
raftConfig
232234
)
233235
client
234236
}
235237

236238
private def buildNetworkChannel(): KafkaNetworkChannel = {
237-
val netClient = buildNetworkClient()
238-
new KafkaNetworkChannel(time, netClient, config.quorumRequestTimeoutMs, threadNamePrefix)
239+
val (listenerName, netClient) = buildNetworkClient()
240+
new KafkaNetworkChannel(time, listenerName, netClient, config.quorumRequestTimeoutMs, threadNamePrefix)
239241
}
240242

241243
private def createDataDir(): File = {
@@ -254,7 +256,7 @@ class KafkaRaftManager[T](
254256
)
255257
}
256258

257-
private def buildNetworkClient(): NetworkClient = {
259+
private def buildNetworkClient(): (ListenerName, NetworkClient) = {
258260
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
259261
val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(
260262
controllerListenerName,
@@ -292,7 +294,7 @@ class KafkaRaftManager[T](
292294
val reconnectBackoffMsMs = 500
293295
val discoverBrokerVersions = true
294296

295-
new NetworkClient(
297+
val networkClient = new NetworkClient(
296298
selector,
297299
new ManualMetadataUpdater(),
298300
clientId,
@@ -309,13 +311,15 @@ class KafkaRaftManager[T](
309311
apiVersions,
310312
logContext
311313
)
314+
315+
(controllerListenerName, networkClient)
312316
}
313317

314318
override def leaderAndEpoch: LeaderAndEpoch = {
315319
client.leaderAndEpoch
316320
}
317321

318-
override def voterNode(id: Int, listener: String): Option[Node] = {
322+
override def voterNode(id: Int, listener: ListenerName): Option[Node] = {
319323
client.voterNode(id, listener).toScala
320324
}
321325
}

0 commit comments

Comments
 (0)
Please sign in to comment.