Skip to content

Commit 9eca0de

Browse files
Chillax-0v0superhx
authored andcommitted
refactor(controller): consider brokers that has recently CONTROLLED_SHUTDOWN as SHUTTING_DOWN (#2261)
* refactor(controller): consider brokers that has recently `CONTROLLED_SHUTDOWN` as `SHUTTING_DOWN` Signed-off-by: Ning Yu <[email protected]> * test: test `BrokerHeartbeatManager#brokerState` Signed-off-by: Ning Yu <[email protected]> * revert(NodeState): revert `SHUTDOWN` and `SHUTTING_DOWN` to `FENCED` and `CONTROLLED_SHUTDOWN` Signed-off-by: Ning Yu <[email protected]> --------- Signed-off-by: Ning Yu <[email protected]>
1 parent 9f3b55b commit 9eca0de

File tree

6 files changed

+115
-11
lines changed

6 files changed

+115
-11
lines changed

metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java

+41
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
2121
import org.apache.kafka.common.utils.LogContext;
2222
import org.apache.kafka.common.utils.Time;
23+
import org.apache.kafka.controller.stream.NodeState;
2324
import org.apache.kafka.controller.stream.OverloadCircuitBreaker;
2425
import org.apache.kafka.metadata.placement.UsableBroker;
2526

@@ -83,6 +84,16 @@ static class BrokerHeartbeatState {
8384
*/
8485
private long controlledShutdownOffset;
8586

87+
// AutoMQ inject start
88+
/**
89+
* The last time the broker was controlled shutdown, in monotonic nanoseconds, or 0
90+
* if the broker has never been controlled shutdown since the most recent start.
91+
* It will be updated on receiving a broker heartbeat with controlled shutdown request.
92+
* It will be reset to 0 when the broker is active again.
93+
*/
94+
private long lastControlledShutdownNs;
95+
// AutoMQ inject end
96+
8697
/**
8798
* The previous entry in the unfenced list, or null if the broker is not in that list.
8899
*/
@@ -100,6 +111,9 @@ static class BrokerHeartbeatState {
100111
this.next = null;
101112
this.metadataOffset = -1;
102113
this.controlledShutdownOffset = -1;
114+
// AutoMQ inject start
115+
this.lastControlledShutdownNs = 0;
116+
// AutoMQ inject end
103117
}
104118

105119
/**
@@ -122,6 +136,12 @@ boolean fenced() {
122136
boolean shuttingDown() {
123137
return controlledShutdownOffset >= 0;
124138
}
139+
140+
// AutoMQ inject start
141+
long lastControlledShutdownNs() {
142+
return lastControlledShutdownNs;
143+
}
144+
// AutoMQ inject end
125145
}
126146

127147
static class MetadataOffsetComparator implements Comparator<BrokerHeartbeatState> {
@@ -441,6 +461,9 @@ void maybeUpdateControlledShutdownOffset(int brokerId, long controlledShutDownOf
441461
throw new RuntimeException("Fenced brokers cannot enter controlled shutdown.");
442462
}
443463
active.remove(broker);
464+
// AutoMQ inject start
465+
broker.lastControlledShutdownNs = time.nanoseconds();
466+
// AutoMQ inject end
444467
if (broker.controlledShutdownOffset < 0) {
445468
broker.controlledShutdownOffset = controlledShutDownOffset;
446469
log.debug("Updated the controlled shutdown offset for broker {} to {}.",
@@ -489,6 +512,24 @@ Iterator<UsableBroker> usableBrokers(
489512
}
490513

491514
// AutoMQ inject start
515+
public NodeState brokerState(int brokerId, long shutdownTimeoutNs) {
516+
BrokerHeartbeatState broker = brokers.get(brokerId);
517+
if (broker == null) {
518+
return NodeState.UNKNOWN;
519+
}
520+
if (broker.shuttingDown()) {
521+
return NodeState.CONTROLLED_SHUTDOWN;
522+
}
523+
if (broker.fenced()) {
524+
if (broker.lastControlledShutdownNs() + shutdownTimeoutNs > time.nanoseconds()) {
525+
// The broker is still in controlled shutdown.
526+
return NodeState.CONTROLLED_SHUTDOWN;
527+
}
528+
return NodeState.FENCED;
529+
}
530+
return NodeState.ACTIVE;
531+
}
532+
492533
long nextCheckTimeNs() {
493534
if (overloadCircuitBreaker.isOverload()) {
494535
return Long.MAX_VALUE;

metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

+4
Original file line numberDiff line numberDiff line change
@@ -1003,5 +1003,9 @@ public List<BrokerRegistration> getActiveBrokers() {
10031003
.filter(b -> isActive(b.id()))
10041004
.collect(Collectors.toList());
10051005
}
1006+
1007+
public BrokerHeartbeatManager getHeartbeatManager() {
1008+
return heartbeatManager;
1009+
}
10061010
// AutoMQ inject end
10071011
}

metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@
1111

1212
package org.apache.kafka.controller.stream;
1313

14+
import org.apache.kafka.controller.BrokerHeartbeatManager;
1415
import org.apache.kafka.controller.ClusterControlManager;
15-
import org.apache.kafka.metadata.BrokerRegistration;
16+
17+
import java.util.concurrent.TimeUnit;
1618

1719
public class DefaultNodeRuntimeInfoGetter implements NodeRuntimeInfoGetter {
20+
private static final long SHUTDOWN_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(60);
21+
1822
private final ClusterControlManager clusterControlManager;
1923
private final StreamControlManager streamControlManager;
2024

@@ -25,17 +29,12 @@ public DefaultNodeRuntimeInfoGetter(ClusterControlManager clusterControlManager,
2529

2630
@Override
2731
public NodeState state(int nodeId) {
28-
BrokerRegistration brokerRegistration = clusterControlManager.registration(nodeId);
29-
if (brokerRegistration == null) {
32+
BrokerHeartbeatManager brokerHeartbeatManager = clusterControlManager.getHeartbeatManager();
33+
if (null == brokerHeartbeatManager) {
34+
// This controller is not the active controller, so we don't have the heartbeat manager.
3035
return NodeState.UNKNOWN;
3136
}
32-
if (brokerRegistration.fenced()) {
33-
return NodeState.FENCED;
34-
}
35-
if (brokerRegistration.inControlledShutdown()) {
36-
return NodeState.CONTROLLED_SHUTDOWN;
37-
}
38-
return NodeState.ACTIVE;
37+
return brokerHeartbeatManager.brokerState(nodeId, SHUTDOWN_TIMEOUT_NS);
3938
}
4039

4140
@Override

metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,25 @@
1111

1212
package org.apache.kafka.controller.stream;
1313

14+
import org.apache.kafka.controller.BrokerControlState;
15+
1416
public enum NodeState {
15-
ACTIVE, FENCED, CONTROLLED_SHUTDOWN, UNKNOWN
17+
/**
18+
* The node is active and can handle requests.
19+
*/
20+
ACTIVE,
21+
/**
22+
* The node is shut down and cannot handle requests.
23+
*/
24+
FENCED,
25+
/**
26+
* The node is shutting down in a controlled manner.
27+
* Note: In AutoMQ, this state is different from {@link BrokerControlState#CONTROLLED_SHUTDOWN}. In some cases,
28+
* a node in {@link BrokerControlState#FENCED} state may still be shutting down in a controlled manner.
29+
*/
30+
CONTROLLED_SHUTDOWN,
31+
/**
32+
* The state of the node is unknown, possibly because it has not yet registered.
33+
*/
34+
UNKNOWN
1635
}

metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java

+40
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateIterator;
2525
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateList;
2626
import org.apache.kafka.controller.BrokerHeartbeatManager.UsableBrokerIterator;
27+
import org.apache.kafka.controller.stream.NodeState;
2728
import org.apache.kafka.metadata.placement.UsableBroker;
2829

2930
import org.junit.jupiter.api.Test;
@@ -362,4 +363,43 @@ public void testTouchThrowsExceptionUnlessRegistered() {
362363
assertThrows(IllegalStateException.class,
363364
() -> manager.touch(4, false, 0)).getMessage());
364365
}
366+
367+
// AutoMQ inject start
368+
@Test
369+
public void testBrokerState() {
370+
final long shutdownTimeoutNs = 10_000_000; // 10ms
371+
// init
372+
BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
373+
manager.time().sleep(1000);
374+
manager.register(0, true);
375+
376+
// FENCED Broker
377+
assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs));
378+
379+
// UNFENCED Broker
380+
manager.touch(0, false, 100);
381+
assertEquals(NodeState.ACTIVE, manager.brokerState(0, shutdownTimeoutNs));
382+
383+
// CONTROLLED_SHUTDOWN Broker
384+
manager.maybeUpdateControlledShutdownOffset(0, 100);
385+
assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs));
386+
387+
// SHUTDOWN_NOW Broker within shutdownTimeoutNs
388+
manager.touch(0, true, 100);
389+
manager.time().sleep(5);
390+
assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs));
391+
392+
// SHUTDOWN_NOW Broker after shutdownTimeoutNs
393+
manager.time().sleep(6);
394+
assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs));
395+
396+
// UNFENCED Broker after SHUTDOWN
397+
manager.touch(0, false, 100);
398+
assertEquals(NodeState.ACTIVE, manager.brokerState(0, shutdownTimeoutNs));
399+
400+
// UNREGISTERED Broker
401+
manager.remove(0);
402+
assertEquals(NodeState.UNKNOWN, manager.brokerState(0, shutdownTimeoutNs));
403+
}
404+
// AutoMQ inject end
365405
}

metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void testRegister() {
107107
assertEquals(0, nodes.get(0).nodeId());
108108
assertEquals(2L, nodes.get(0).nodeEpoch());
109109
assertEquals("wal2", nodes.get(0).walConfig());
110+
assertEquals(NodeState.FENCED.name(), nodes.get(0).state());
110111
}
111112

112113
AutomqRegisterNodeRequestData.TagCollection tags(Map<String, String> tags) {

0 commit comments

Comments
 (0)