Skip to content

Commit af4918e

Browse files
committed
Simplify AutoFollowCoordinator with GroupedListener (#39603)
This change simplifies AutoFollowCoordinator by replacing a combination of AtomicArray and CountDown with GroupedActionListener.
1 parent 7b8ff2d commit af4918e

File tree

1 file changed

+16
-30
lines changed

1 file changed

+16
-30
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1616
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
17+
import org.elasticsearch.action.support.GroupedActionListener;
1718
import org.elasticsearch.client.Client;
1819
import org.elasticsearch.cluster.ClusterChangedEvent;
1920
import org.elasticsearch.cluster.ClusterState;
@@ -428,23 +429,21 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
428429
MetaData remoteMetadata,
429430
MetaData localMetadata,
430431
Consumer<AutoFollowResult> resultHandler) {
432+
final GroupedActionListener<Tuple<Index, Exception>> groupedListener = new GroupedActionListener<>(
433+
ActionListener.wrap(
434+
rs -> resultHandler.accept(new AutoFollowResult(autoFollowPattenName, new ArrayList<>(rs))),
435+
e -> { throw new AssertionError("must never happen", e); }),
436+
leaderIndicesToFollow.size(), Collections.emptyList());
431437

432-
final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
433-
final AtomicArray<Tuple<Index, Exception>> results = new AtomicArray<>(leaderIndicesToFollow.size());
434-
for (int i = 0; i < leaderIndicesToFollow.size(); i++) {
435-
final Index indexToFollow = leaderIndicesToFollow.get(i);
436-
final int slot = i;
437-
438+
for (final Index indexToFollow : leaderIndicesToFollow) {
438439
List<String> otherMatchingPatterns = patternsForTheSameRemoteCluster.stream()
439440
.filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName()))
440441
.map(Tuple::v1)
441442
.collect(Collectors.toList());
442443
if (otherMatchingPatterns.size() != 0) {
443-
results.set(slot, new Tuple<>(indexToFollow, new ElasticsearchException("index to follow [" + indexToFollow.getName() +
444-
"] for pattern [" + autoFollowPattenName + "] matches with other patterns " + otherMatchingPatterns + "")));
445-
if (leaderIndicesCountDown.countDown()) {
446-
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
447-
}
444+
groupedListener.onResponse(
445+
new Tuple<>(indexToFollow, new ElasticsearchException("index to follow [" + indexToFollow.getName() +
446+
"] for pattern [" + autoFollowPattenName + "] matches with other patterns " + otherMatchingPatterns + "")));
448447
} else {
449448
final Settings leaderIndexSettings = remoteMetadata.getIndexSafe(indexToFollow).getSettings();
450449
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(leaderIndexSettings) == false) {
@@ -456,28 +455,15 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
456455
if (error != null) {
457456
failure.addSuppressed(error);
458457
}
459-
results.set(slot, new Tuple<>(indexToFollow, failure));
460-
if (leaderIndicesCountDown.countDown()) {
461-
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
462-
}
458+
groupedListener.onResponse(new Tuple<>(indexToFollow, failure));
463459
});
464-
continue;
465460
} else if (leaderIndexAlreadyFollowed(autoFollowPattern, indexToFollow, localMetadata)) {
466-
updateAutoFollowMetadata(recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow), error -> {
467-
results.set(slot, new Tuple<>(indexToFollow, error));
468-
if (leaderIndicesCountDown.countDown()) {
469-
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
470-
}
471-
});
472-
continue;
461+
updateAutoFollowMetadata(recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow),
462+
error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error)));
463+
} else {
464+
followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers,
465+
error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error)));
473466
}
474-
475-
followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, error -> {
476-
results.set(slot, new Tuple<>(indexToFollow, error));
477-
if (leaderIndicesCountDown.countDown()) {
478-
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
479-
}
480-
});
481467
}
482468
}
483469
}

0 commit comments

Comments
 (0)