56
56
import org .apache .kafka .server .common .MetadataVersion ;
57
57
import org .apache .kafka .timeline .SnapshotRegistry ;
58
58
import org .apache .kafka .timeline .TimelineHashMap ;
59
+ import org .apache .kafka .timeline .TimelineHashSet ;
59
60
60
61
import org .slf4j .Logger ;
61
62
@@ -292,6 +293,11 @@ boolean check() {
292
293
* The real next available node id is generally one greater than this value.
293
294
*/
294
295
private AtomicInteger nextNodeId = new AtomicInteger (-1 );
296
+
297
+ /**
298
+ * A set of node IDs that have been unregistered and can be reused for new node assignments.
299
+ */
300
+ private final TimelineHashSet <Integer > reusableNodeIds ;
295
301
// AutoMQ for Kafka inject end
296
302
297
303
private ClusterControlManager (
@@ -323,6 +329,7 @@ private ClusterControlManager(
323
329
this .brokerUncleanShutdownHandler = brokerUncleanShutdownHandler ;
324
330
// AutoMQ for Kafka inject start
325
331
this .maxControllerId = QuorumConfig .parseVoterConnections (quorumVoters ).keySet ().stream ().max (Integer ::compareTo ).orElse (0 );
332
+ this .reusableNodeIds = new TimelineHashSet <>(snapshotRegistry , 0 );
326
333
// AutoMQ for Kafka inject end
327
334
}
328
335
@@ -369,11 +376,21 @@ boolean zkRegistrationAllowed() {
369
376
370
377
// AutoMQ for Kafka inject start
371
378
public ControllerResult <Integer > getNextNodeId () {
372
- int maxBrokerId = brokerRegistrations .keySet ().stream ().max (Integer ::compareTo ).orElse (0 );
373
- int maxNodeId = Math .max (maxBrokerId , maxControllerId );
374
- int nextId = this .nextNodeId .accumulateAndGet (maxNodeId , (x , y ) -> Math .max (x , y ) + 1 );
375
- // Let the broker's nodeId start from 1000 to easily distinguish broker and controller.
376
- nextId = Math .max (nextId , 1000 );
379
+ int nextId ;
380
+ if (!reusableNodeIds .isEmpty ()) {
381
+ Iterator <Integer > iterator = reusableNodeIds .iterator ();
382
+ nextId = iterator .next ();
383
+ // we simply remove the id from reusable id set because we're unable to determine if the id
384
+ // will finally be used.
385
+ iterator .remove ();
386
+ } else {
387
+ int maxBrokerId = brokerRegistrations .keySet ().stream ().max (Integer ::compareTo ).orElse (0 );
388
+ int maxNodeId = Math .max (maxBrokerId , maxControllerId );
389
+ nextId = this .nextNodeId .accumulateAndGet (maxNodeId , (x , y ) -> Math .max (x , y ) + 1 );
390
+ // Let the broker's nodeId start from 1000 to easily distinguish broker and controller.
391
+ nextId = Math .max (nextId , 1000 );
392
+ }
393
+
377
394
UpdateNextNodeIdRecord record = new UpdateNextNodeIdRecord ().setNodeId (nextId );
378
395
379
396
List <ApiMessageAndVersion > records = new ArrayList <>();
@@ -583,6 +600,11 @@ public void replay(RegisterBrokerRecord record, long offset) {
583
600
if (prevRegistration != null ) heartbeatManager .remove (brokerId );
584
601
heartbeatManager .register (brokerId , record .fenced ());
585
602
}
603
+
604
+ // AutoMQ injection start
605
+ reusableNodeIds .remove (brokerId );
606
+ // AutoMQ injection end
607
+
586
608
if (prevRegistration == null ) {
587
609
log .info ("Replayed initial RegisterBrokerRecord for broker {}: {}" , record .brokerId (), record );
588
610
} else if (prevRegistration .incarnationId ().equals (record .incarnationId ())) {
@@ -608,6 +630,9 @@ public void replay(UnregisterBrokerRecord record) {
608
630
if (heartbeatManager != null ) heartbeatManager .remove (brokerId );
609
631
updateDirectories (brokerId , registration .directories (), null );
610
632
brokerRegistrations .remove (brokerId );
633
+ // AutoMQ injection start
634
+ reusableNodeIds .add (brokerId );
635
+ // AutoMQ injection end
611
636
log .info ("Replayed {}" , record );
612
637
}
613
638
}
0 commit comments