Skip to content

feat(core): reuse unregistered node when requesting for next node id #2183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;

import org.slf4j.Logger;

Expand Down Expand Up @@ -305,6 +306,11 @@ boolean check() {
* The real next available node id is generally one greater than this value.
*/
private AtomicInteger nextNodeId = new AtomicInteger(-1);

/**
* A set of node IDs that have been unregistered and can be reused for new node assignments.
*/
private final TimelineHashSet<Integer> reusableNodeIds;
// AutoMQ for Kafka inject end

private ClusterControlManager(
Expand Down Expand Up @@ -339,6 +345,7 @@ private ClusterControlManager(
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
// AutoMQ for Kafka inject start
this.maxControllerId = QuorumConfig.parseVoterConnections(quorumVoters).keySet().stream().max(Integer::compareTo).orElse(0);
this.reusableNodeIds = new TimelineHashSet<>(snapshotRegistry, 0);
// AutoMQ for Kafka inject end
this.interBrokerListenerName = interBrokerListenerName;
}
Expand Down Expand Up @@ -386,11 +393,21 @@ boolean zkRegistrationAllowed() {

// AutoMQ for Kafka inject start
public ControllerResult<Integer> getNextNodeId() {
int maxBrokerId = brokerRegistrations.keySet().stream().max(Integer::compareTo).orElse(0);
int maxNodeId = Math.max(maxBrokerId, maxControllerId);
int nextId = this.nextNodeId.accumulateAndGet(maxNodeId, (x, y) -> Math.max(x, y) + 1);
// Let the broker's nodeId start from 1000 to easily distinguish broker and controller.
nextId = Math.max(nextId, 1000);
int nextId;
if (!reusableNodeIds.isEmpty()) {
Iterator<Integer> iterator = reusableNodeIds.iterator();
nextId = iterator.next();
// we simply remove the id from reusable id set because we're unable to determine if the id
// will finally be used.
iterator.remove();
} else {
int maxBrokerId = brokerRegistrations.keySet().stream().max(Integer::compareTo).orElse(0);
int maxNodeId = Math.max(maxBrokerId, maxControllerId);
nextId = this.nextNodeId.accumulateAndGet(maxNodeId, (x, y) -> Math.max(x, y) + 1);
// Let the broker's nodeId start from 1000 to easily distinguish broker and controller.
nextId = Math.max(nextId, 1000);
}

UpdateNextNodeIdRecord record = new UpdateNextNodeIdRecord().setNodeId(nextId);

List<ApiMessageAndVersion> records = new ArrayList<>();
Expand Down Expand Up @@ -623,6 +640,11 @@ public void replay(RegisterBrokerRecord record, long offset) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
heartbeatManager.register(brokerId, record.fenced());
}

// AutoMQ injection start
reusableNodeIds.remove(brokerId);
// AutoMQ injection end

if (prevRegistration == null) {
log.info("Replayed initial RegisterBrokerRecord for broker {}: {}", record.brokerId(), record);
} else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
Expand All @@ -648,6 +670,9 @@ public void replay(UnregisterBrokerRecord record) {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
updateDirectories(brokerId, registration.directories(), null);
brokerRegistrations.remove(brokerId);
// AutoMQ injection start
reusableNodeIds.add(brokerId);
// AutoMQ injection end
log.info("Replayed {}", record);
}
}
Expand Down