Commit aa43105
feat(circuit): support node circuit breaker (#2409)
* feat(circuit): add circuit object storage

  Signed-off-by: Robin Han <[email protected]>

* fix(circuit): fix code review comment

  Signed-off-by: Robin Han <[email protected]>

---------

Signed-off-by: Robin Han <[email protected]>
1 parent 0fd8e9a commit aa43105

41 files changed: +971 -107 lines
Diff for: clients/src/main/java/org/apache/kafka/common/errors/s3/NodeLockedException.java

+22

@@ -0,0 +1,22 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package org.apache.kafka.common.errors.s3;
+
+import org.apache.kafka.common.errors.ApiException;
+
+public class NodeLockedException extends ApiException {
+
+    public NodeLockedException(String message) {
+        super(message);
+    }
+
+}
Diff for: clients/src/main/java/org/apache/kafka/common/errors/s3/ObjectNotCommittedException.java

+20

@@ -0,0 +1,20 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package org.apache.kafka.common.errors.s3;
+
+import org.apache.kafka.common.errors.ApiException;
+
+public class ObjectNotCommittedException extends ApiException {
+    public ObjectNotCommittedException(String message) {
+        super(message);
+    }
+}

Diff for: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

+4

@@ -151,6 +151,8 @@
 import org.apache.kafka.common.errors.s3.NodeEpochExpiredException;
 import org.apache.kafka.common.errors.s3.NodeEpochNotExistException;
 import org.apache.kafka.common.errors.s3.NodeFencedException;
+import org.apache.kafka.common.errors.s3.NodeLockedException;
+import org.apache.kafka.common.errors.s3.ObjectNotCommittedException;
 import org.apache.kafka.common.errors.s3.ObjectNotExistException;
 import org.apache.kafka.common.errors.s3.OffsetNotMatchedException;
 import org.apache.kafka.common.errors.s3.RedundantOperationException;
@@ -435,6 +437,8 @@ public enum Errors {
     KEY_EXIST(512, "The key already exists.", KeyExistException::new),
     KEY_NOT_EXIST(513, "The key does not exist.", ObjectNotExistException::new),
     NODE_FENCED(514, "The node is fenced.", NodeFencedException::new),
+    NODE_LOCKED(515, "The node is locked.", NodeLockedException::new),
+    OBJECT_NOT_COMMITED(516, "The object is not committed.", ObjectNotCommittedException::new),
     STREAM_INNER_ERROR(599, "The stream inner error.", StreamInnerErrorException::new),
     // AutoMQ inject end
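
Both new codes land in AutoMQ's reserved range next to NODE_FENCED (514), and both construct ApiException subclasses, so they flow through Kafka's normal error machinery. A minimal sketch of mapping a wire code back to the new exceptions via the existing Errors.forCode path (ErrorCodeSketch and handle are illustrative names, not part of the commit):

    import org.apache.kafka.common.protocol.Errors;

    // Illustrative sketch only: shows how the new codes surface as exceptions.
    public class ErrorCodeSketch {
        static void handle(short errorCode) {
            Errors code = Errors.forCode(errorCode);
            switch (code) {
                case NODE_LOCKED:
                    // The node is circuit-broken: retrying on it cannot succeed,
                    // so surface NodeLockedException to the caller.
                    throw code.exception();
                case OBJECT_NOT_COMMITED:
                    // The request referenced an object the controller never saw committed.
                    throw code.exception();
                default:
                    break; // other codes keep their existing handling
            }
        }
    }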

Diff for: core/src/main/java/kafka/autobalancer/AutoBalancerManager.java

+1

@@ -75,6 +75,7 @@ protected void init() {
 
         this.anomalyDetector = new AnomalyDetectorImpl(config.originals(),
             new LogContext(String.format("[AnomalyDetector id=%d] ", nodeId)), clusterModel, actionExecutorService);
+        ((AnomalyDetectorImpl) this.anomalyDetector).lockedNodes(() -> quorumController.nodeControlManager().lockedNodes());
 
         this.reconfigurables.add(anomalyDetector);
 
Diff for: core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorImpl.java

+7

@@ -44,6 +44,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 public class AnomalyDetectorImpl extends AbstractAnomalyDetector implements LeaderChangeListener {
@@ -56,6 +57,7 @@ public class AnomalyDetectorImpl extends AbstractAnomalyDetector implements Lead
     private final Lock configChangeLock = new ReentrantLock();
     private List<Goal> goalsByPriority;
     private Set<Integer> excludedBrokers;
+    private Supplier<Set<Integer>> lockedNodes = Collections::emptySet;
     private Set<String> excludedTopics;
     private long detectInterval;
     private long maxTolerateMetricsDelayMs;
@@ -266,6 +268,10 @@ public void detect() {
         this.executorService.schedule(this::detect, nextExecutionDelay, TimeUnit.MILLISECONDS);
     }
 
+    public void lockedNodes(Supplier<Set<Integer>> lockedNodes) {
+        this.lockedNodes = lockedNodes;
+    }
+
     private boolean isRunnable() {
         return this.running.get() && this.isLeader;
     }
@@ -283,6 +289,7 @@ long detect0() throws Exception {
         try {
             detectInterval = this.detectInterval;
             excludedBrokers = new HashSet<>(this.excludedBrokers);
+            excludedBrokers.addAll(lockedNodes.get());
             excludedTopics = new HashSet<>(this.excludedTopics);
             maxTolerateMetricsDelayMs = this.maxTolerateMetricsDelayMs;
             maxExecutionConcurrency = this.executionConcurrency;
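
The detector takes locked nodes through a Supplier so the controller's NodeControlManager is consulted fresh at the start of every detect0() cycle rather than captured once at wiring time; locked nodes are then treated exactly like configured excluded brokers. A minimal sketch of the pattern (DetectorSketch is illustrative, not the real class):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.function.Supplier;

    // Illustrative sketch only: mirrors the lazy Supplier wiring above.
    class DetectorSketch {
        // Defaults to an empty set until the controller wires in a real source.
        private Supplier<Set<Integer>> lockedNodes = Collections::emptySet;

        void lockedNodes(Supplier<Set<Integer>> lockedNodes) {
            this.lockedNodes = lockedNodes;
        }

        Set<Integer> effectiveExcludedBrokers(Set<Integer> configured) {
            // Re-evaluated on every detect cycle, so newly locked nodes are
            // picked up without any reconfiguration.
            Set<Integer> excluded = new HashSet<>(configured);
            excluded.addAll(lockedNodes.get());
            return excluded;
        }
    }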

Diff for: core/src/main/scala/kafka/controller/streamaspect/client/s3/StreamClientFactory.java

+10 -6

@@ -18,23 +18,27 @@
 import org.apache.kafka.controller.stream.StreamClient;
 
 import com.automq.stream.s3.Config;
+import com.automq.stream.s3.operator.ObjectStorage;
 import com.automq.stream.s3.operator.ObjectStorageFactory;
 
+import static com.automq.stream.s3.operator.ObjectStorageFactory.EXTENSION_TYPE_BACKGROUND;
+import static com.automq.stream.s3.operator.ObjectStorageFactory.EXTENSION_TYPE_KEY;
+
 public class StreamClientFactory {
 
     /**
     * This method will be called by {@link StreamClientFactoryProxy}
     */
     public static StreamClient get(Context context) {
         Config streamConfig = ConfigUtils.to(context.kafkaConfig);
+        ObjectStorage objectStorage = ObjectStorageFactory.instance().builder()
+            .buckets(streamConfig.dataBuckets())
+            .tagging(streamConfig.objectTagging())
+            .extension(EXTENSION_TYPE_KEY, EXTENSION_TYPE_BACKGROUND)
+            .build();
         return StreamClient.builder()
             .streamConfig(streamConfig)
-            .objectStorage(
-                ObjectStorageFactory.instance()
-                    .builder(streamConfig.dataBuckets().get(0))
-                    .tagging(streamConfig.objectTagging())
-                    .build()
-            )
+            .objectStorage(objectStorage)
             .build();
     }
 }
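
This is the builder migration the rest of the commit repeats: the old entry point builder(bucket) accepted a single BucketURI, while the new builder() takes the full data-bucket list plus an extension entry labeling the storage's role. A hedged before/after sketch (BuilderMigrationSketch is an illustrative name; the builder calls themselves are taken from the diff):

    import com.automq.stream.s3.Config;
    import com.automq.stream.s3.operator.ObjectStorage;
    import com.automq.stream.s3.operator.ObjectStorageFactory;

    import static com.automq.stream.s3.operator.ObjectStorageFactory.EXTENSION_TYPE_BACKGROUND;
    import static com.automq.stream.s3.operator.ObjectStorageFactory.EXTENSION_TYPE_KEY;

    // Illustrative sketch contrasting the old and new builder shapes.
    public class BuilderMigrationSketch {
        static ObjectStorage build(Config streamConfig) {
            // Before: single bucket, no role tag.
            //   ObjectStorageFactory.instance()
            //       .builder(streamConfig.dataBuckets().get(0))
            //       .tagging(streamConfig.objectTagging())
            //       .build();
            // After: full bucket list plus an extension naming the storage's role.
            return ObjectStorageFactory.instance().builder()
                .buckets(streamConfig.dataBuckets())
                .tagging(streamConfig.objectTagging())
                .extension(EXTENSION_TYPE_KEY, EXTENSION_TYPE_BACKGROUND)
                .build();
        }
    }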

Diff for: core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java

+38 -23

@@ -65,6 +65,10 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static com.automq.stream.s3.operator.ObjectStorageFactory.EXTENSION_TYPE_BACKGROUND;
+import static com.automq.stream.s3.operator.ObjectStorageFactory.EXTENSION_TYPE_KEY;
+import static com.automq.stream.s3.operator.ObjectStorageFactory.EXTENSION_TYPE_MAIN;
+
 public class DefaultS3Client implements Client {
     private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3Client.class);
     protected final Config config;
@@ -74,8 +78,8 @@ public class DefaultS3Client implements Client {
 
     protected ControllerRequestSender requestSender;
 
-    protected ObjectStorage objectStorage;
-    protected ObjectStorage compactionObjectStorage;
+    protected ObjectStorage mainObjectStorage;
+    protected ObjectStorage backgroundObjectStorage;
 
     protected WriteAheadLog writeAheadLog;
     protected StorageFailureHandlerChain storageFailureHandlerChain;
@@ -109,7 +113,6 @@ public DefaultS3Client(BrokerServer brokerServer, Config config) {
 
     @Override
     public void start() {
-        BucketURI dataBucket = config.dataBuckets().get(0);
         long refillToken = (long) (config.networkBaselineBandwidth() * ((double) config.refillPeriodMs() / 1000));
         if (refillToken <= 0) {
             throw new IllegalArgumentException(String.format("refillToken must be greater than 0, bandwidth: %d, refill period: %dms",
@@ -127,34 +130,30 @@ public void start() {
         S3StreamMetricsManager.registerNetworkAvailableBandwidthSupplier(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, () ->
             config.networkBaselineBandwidth() - (long) networkOutboundRate.derive(
                 TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()), NetworkStats.getInstance().networkOutboundUsageTotal().get()));
-        this.objectStorage = ObjectStorageFactory.instance().builder(dataBucket).tagging(config.objectTagging())
-            .inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).readWriteIsolate(true)
-            .threadPrefix("dataflow").build();
-        if (!objectStorage.readinessCheck()) {
+
+        this.localIndexCache = new LocalStreamRangeIndexCache();
+        this.objectReaderFactory = new DefaultObjectReaderFactory(() -> this.mainObjectStorage);
+        this.metadataManager = new StreamMetadataManager(brokerServer, config.nodeId(), objectReaderFactory, localIndexCache);
+        this.requestSender = new ControllerRequestSender(brokerServer, new ControllerRequestSender.RetryPolicyContext(config.controllerRequestRetryMaxCount(),
+            config.controllerRequestRetryBaseDelayMs()));
+        this.streamManager = newStreamManager(config.nodeId(), config.nodeEpoch(), false);
+        this.objectManager = newObjectManager(config.nodeId(), config.nodeEpoch(), false);
+        this.mainObjectStorage = newMainObjectStorage();
+        if (!mainObjectStorage.readinessCheck()) {
             throw new IllegalArgumentException(String.format("%s is not ready", config.dataBuckets()));
         }
-        this.compactionObjectStorage = ObjectStorageFactory.instance().builder(dataBucket).tagging(config.objectTagging())
-            .inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter)
-            .threadPrefix("compaction").build();
-        ControllerRequestSender.RetryPolicyContext retryPolicyContext = new ControllerRequestSender.RetryPolicyContext(config.controllerRequestRetryMaxCount(),
-            config.controllerRequestRetryBaseDelayMs());
-        localIndexCache = new LocalStreamRangeIndexCache();
-        localIndexCache.init(config.nodeId(), objectStorage);
+        this.backgroundObjectStorage = newBackgroundObjectStorage();
+        localIndexCache.init(config.nodeId(), backgroundObjectStorage);
         localIndexCache.start();
-        this.objectReaderFactory = new DefaultObjectReaderFactory(objectStorage);
-        this.metadataManager = new StreamMetadataManager(brokerServer, config.nodeId(), objectReaderFactory, localIndexCache);
-        this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext);
-        this.streamManager = newStreamManager(config.nodeId(), config.nodeEpoch(), false);
         this.streamManager.setStreamCloseHook(streamId -> localIndexCache.uploadOnStreamClose());
-        this.objectManager = newObjectManager(config.nodeId(), config.nodeEpoch(), false);
         this.objectManager.setCommitStreamSetObjectHook(localIndexCache::updateIndexFromRequest);
-        this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, objectStorage, objectReaderFactory);
-        this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionObjectStorage);
+        this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, mainObjectStorage, objectReaderFactory);
+        this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, backgroundObjectStorage);
         this.writeAheadLog = buildWAL();
         this.storageFailureHandlerChain = new StorageFailureHandlerChain();
         this.storage = newS3Storage();
         // stream object compactions share the same object storage with stream set object compactions
-        this.streamClient = new S3StreamClient(this.streamManager, this.storage, this.objectManager, compactionObjectStorage, this.config, networkInboundLimiter, networkOutboundLimiter);
+        this.streamClient = new S3StreamClient(this.streamManager, this.storage, this.objectManager, backgroundObjectStorage, this.config, networkInboundLimiter, networkOutboundLimiter);
         storageFailureHandlerChain.addHandler(new ForceCloseStorageFailureHandler(streamClient));
         storageFailureHandlerChain.addHandler(new HaltStorageFailureHandler());
         this.streamClient.registerStreamLifeCycleListener(localIndexCache);
@@ -221,6 +220,22 @@ protected WriteAheadLog buildWAL() {
         }
     }
 
+    protected ObjectStorage newMainObjectStorage() {
+        return ObjectStorageFactory.instance().builder()
+            .buckets(config.dataBuckets())
+            .extension(EXTENSION_TYPE_KEY, EXTENSION_TYPE_MAIN)
+            .readWriteIsolate(true)
+            .build();
+    }
+
+    protected ObjectStorage newBackgroundObjectStorage() {
+        return ObjectStorageFactory.instance().builder()
+            .buckets(config.dataBuckets())
+            .extension(EXTENSION_TYPE_KEY, EXTENSION_TYPE_BACKGROUND)
+            .readWriteIsolate(false)
+            .build();
+    }
+
     protected StreamManager newStreamManager(int nodeId, long nodeEpoch, boolean failoverMode) {
         return new ControllerStreamManager(this.metadataManager, this.requestSender, nodeId, nodeEpoch,
             this::getAutoMQVersion, failoverMode);
@@ -232,7 +247,7 @@ protected ObjectManager newObjectManager(int nodeId, long nodeEpoch, boolean fai
     }
 
     protected S3Storage newS3Storage() {
-        return new S3Storage(config, writeAheadLog, streamManager, objectManager, blockCache, objectStorage, storageFailureHandlerChain);
+        return new S3Storage(config, writeAheadLog, streamManager, objectManager, blockCache, mainObjectStorage, storageFailureHandlerChain);
    }
 
     protected Failover failover() {
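
The single objectStorage/compactionObjectStorage pair becomes an explicit main/background split: the main storage keeps readWriteIsolate(true) for foreground dataflow (S3Storage, the StreamReaders block cache, object readers), while the background storage serves compaction, stream-object compaction, and local index uploads. Both are created through protected factory methods, so a subclass can substitute its own storage. A hedged sketch of that extension point (the subclass and its import paths are assumptions, not part of the commit):

    import com.automq.stream.s3.Config;
    import com.automq.stream.s3.operator.ObjectStorage;

    import kafka.log.stream.s3.DefaultS3Client;
    import kafka.server.BrokerServer;

    // Hypothetical subclass: illustrates why newMainObjectStorage() and
    // newBackgroundObjectStorage() are protected factories rather than
    // inline construction in start().
    public class InstrumentedS3Client extends DefaultS3Client {
        public InstrumentedS3Client(BrokerServer brokerServer, Config config) {
            super(brokerServer, config);
        }

        @Override
        protected ObjectStorage newMainObjectStorage() {
            // Wrap or decorate here (e.g. metrics); the base method already
            // tags the storage EXTENSION_TYPE_MAIN with read/write isolation.
            return super.newMainObjectStorage();
        }
    }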

Diff for: core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java

+6 -2

@@ -34,6 +34,8 @@
 
 import com.automq.stream.s3.compact.CompactOperations;
 import com.automq.stream.s3.exceptions.AutoMQException;
+import com.automq.stream.s3.exceptions.CompactedObjectsNotFoundException;
+import com.automq.stream.s3.exceptions.ObjectNotCommittedException;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import com.automq.stream.s3.objects.CommitStreamSetObjectHook;
 import com.automq.stream.s3.objects.CommitStreamSetObjectRequest;
@@ -180,7 +182,7 @@ public Builder toRequestBuilder() {
                     throw Errors.forCode(resp.errorCode()).exception();
                 case OBJECT_NOT_EXIST:
                 case COMPACTED_OBJECTS_NOT_FOUND:
-                    throw code.exception();
+                    throw new CompactedObjectsNotFoundException();
                 default:
                     LOGGER.error("Error while committing stream set object: {}, code: {}, retry later", request, code);
                     return ResponseHandleResult.withRetry();
@@ -231,7 +233,9 @@ public Builder toRequestBuilder() {
                     throw Errors.forCode(resp.errorCode()).exception();
                 case OBJECT_NOT_EXIST:
                 case COMPACTED_OBJECTS_NOT_FOUND:
-                    throw code.exception();
+                    throw new CompactedObjectsNotFoundException();
+                case OBJECT_NOT_COMMITED:
+                    throw new ObjectNotCommittedException();
                 case STREAM_NOT_EXIST:
                 case STREAM_FENCED:
                     LOGGER.warn("Stream fenced or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));

Diff for: core/src/main/scala/kafka/log/stream/s3/streams/ControllerStreamManager.java

+3

@@ -259,6 +259,9 @@ public String toString() {
                 case STREAM_NOT_CLOSED:
                     logger.warn("open stream fail: {}, epoch {}, code: STREAM_NOT_CLOSED, retry later", streamId, epoch);
                     return ResponseHandleResult.withRetry();
+                case NODE_LOCKED:
+                    logger.warn("[NODE_LOCKED]open stream fail: {}, epoch {}", streamId, epoch);
+                    throw code.exception();
                 default:
                     logger.error("Error while opening stream: {}, epoch {}, code: {}, retry later", streamId, epoch, code);
                     return ResponseHandleResult.withRetry();
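
STREAM_NOT_CLOSED stays retriable, but NODE_LOCKED is terminal: code.exception() materializes the NodeLockedException registered in Errors, so the open fails fast on a circuit-broken node instead of looping. A minimal caller-side sketch under that assumption (StreamOpener and tryOpen are illustrative stand-ins for the real request path):

    import org.apache.kafka.common.errors.s3.NodeLockedException;

    // Illustrative sketch only.
    public class OpenStreamSketch {
        interface StreamOpener {
            void openStream(long streamId, long epoch);
        }

        static boolean tryOpen(StreamOpener opener, long streamId, long epoch) {
            try {
                opener.openStream(streamId, epoch);
                return true;
            } catch (NodeLockedException e) {
                // The controller has circuit-broken this node; unlike the
                // retriable codes, retrying locally cannot succeed.
                return false;
            }
        }
    }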

Diff for: core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java

+23 -3

@@ -22,19 +22,23 @@
 
 import com.automq.stream.s3.metadata.StreamState;
 
+import java.util.Arrays;
+
 public class DefaultOpenStreamChecker implements OpenStreamChecker {
+    private final int nodeId;
     private final KRaftMetadataCache metadataCache;
 
-    public DefaultOpenStreamChecker(KRaftMetadataCache metadataCache) {
+    public DefaultOpenStreamChecker(int nodeId, KRaftMetadataCache metadataCache) {
+        this.nodeId = nodeId;
         this.metadataCache = metadataCache;
     }
 
     @Override
     public boolean check(Uuid topicId, int partition, long streamId, long epoch) throws StreamFencedException {
-        return metadataCache.safeRun(image -> DefaultOpenStreamChecker.check(image, topicId, partition, streamId, epoch));
+        return metadataCache.safeRun(image -> DefaultOpenStreamChecker.check(image, topicId, partition, streamId, epoch, nodeId));
     }
 
-    public static boolean check(MetadataImage image, Uuid topicId, int partition, long streamId, long epoch) throws StreamFencedException {
+    public static boolean check(MetadataImage image, Uuid topicId, int partition, long streamId, long epoch, int currentNodeId) throws StreamFencedException {
         // When ABA reassign happens:
         // 1. Assign P0 to broker0 with epoch=0, broker0 opens the partition
         // 2. Assign P0 to broker1 with epoch=1, broker1 waits for the partition to be closed
@@ -52,6 +56,10 @@ public static boolean check(MetadataImage image, Uuid topicId, int partition, lo
         if (currentEpoch > epoch) {
             throw new StreamFencedException(String.format("partition=%s-%d with epoch=%d is fenced by new leader epoch=%d", topicId, partition, epoch, currentEpoch));
         }
+        if (!contains(partitionImage.isr, currentNodeId)) {
+            throw new StreamFencedException(String.format("partition=%s-%d with epoch=%d moved to other nodes %s", topicId, partition, epoch, Arrays.toString(partitionImage.isr)));
+        }
+
         S3StreamMetadataImage stream = image.streamsMetadata().getStreamMetadata(streamId);
         if (stream == null) {
             throw new StreamFencedException(String.format("streamId=%d cannot be found, it may be deleted or not created yet", streamId));
@@ -60,4 +68,16 @@ public static boolean check(MetadataImage image, Uuid topicId, int partition, lo
             throw new StreamFencedException(String.format("streamId=%d with epoch=%d is fenced by new leader epoch=%d", streamId, epoch, stream.getEpoch()));
         return StreamState.CLOSED.equals(stream.state());
     }
+
+    private static boolean contains(int[] isr, int nodeId) {
+        if (isr == null) {
+            return false;
+        }
+        for (int replica : isr) {
+            if (replica == nodeId) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
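
The new ISR guard is what actually stops a circuit-broken node from re-opening streams for partitions that have moved elsewhere: if the current node has dropped out of the partition's ISR, check(...) throws StreamFencedException before the stream state is even consulted. A standalone restatement of the membership test with a worked example (IsrGuardSketch is illustrative, not the class's private helper):

    // Illustrative re-statement of the ISR guard above.
    public class IsrGuardSketch {
        static boolean contains(int[] isr, int nodeId) {
            if (isr == null) {
                return false;
            }
            for (int replica : isr) {
                if (replica == nodeId) {
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            int[] isr = {1, 2};
            System.out.println(contains(isr, 1)); // true  -> open may proceed
            System.out.println(contains(isr, 3)); // false -> check(...) would fence node 3
        }
    }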

Diff for: core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala

+1 -1

@@ -133,7 +133,7 @@ object ElasticLogManager {
     context.config = config
     context.brokerServer = broker
     val openStreamChecker = if (broker != null) {
-      new DefaultOpenStreamChecker(broker.metadataCache)
+      new DefaultOpenStreamChecker(config.nodeId, broker.metadataCache)
     } else {
       OpenStreamChecker.NOOP
     }

Diff for: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

+2 -2

@@ -145,7 +145,7 @@
 import org.apache.kafka.controller.errors.ControllerExceptions;
 import org.apache.kafka.controller.errors.EventHandlerExceptionInfo;
 import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
-import org.apache.kafka.controller.stream.DefaultNodeRuntimeInfoGetter;
+import org.apache.kafka.controller.stream.DefaultNodeRuntimeInfoManager;
 import org.apache.kafka.controller.stream.KVControlManager;
 import org.apache.kafka.controller.stream.NodeControlManager;
 import org.apache.kafka.controller.stream.S3ObjectControlManager;
@@ -2165,7 +2165,7 @@ private QuorumController(
         this.streamControlManager = new StreamControlManager(this, snapshotRegistry, logContext,
             this.s3ObjectControlManager, clusterControl, featureControl, replicationControl);
         this.topicDeletionManager = new TopicDeletionManager(snapshotRegistry, this, streamControlManager, kvControlManager);
-        this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoGetter(clusterControl, streamControlManager));
+        this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoManager(clusterControl, streamControlManager));
         this.extension = extension.apply(this);
 
         // set the nodeControlManager here to avoid circular dependency
