diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 4d709b6d35bc6..ab78dbf3d62b6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -428,23 +429,21 @@ private void checkAutoFollowPattern(String autoFollowPattenName, MetaData remoteMetadata, MetaData localMetadata, Consumer resultHandler) { + final GroupedActionListener> groupedListener = new GroupedActionListener<>( + ActionListener.wrap( + rs -> resultHandler.accept(new AutoFollowResult(autoFollowPattenName, new ArrayList<>(rs))), + e -> { throw new AssertionError("must never happen", e); }), + leaderIndicesToFollow.size(), Collections.emptyList()); - final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); - final AtomicArray> results = new AtomicArray<>(leaderIndicesToFollow.size()); - for (int i = 0; i < leaderIndicesToFollow.size(); i++) { - final Index indexToFollow = leaderIndicesToFollow.get(i); - final int slot = i; - + for (final Index indexToFollow : leaderIndicesToFollow) { List otherMatchingPatterns = patternsForTheSameRemoteCluster.stream() .filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName())) .map(Tuple::v1) .collect(Collectors.toList()); if (otherMatchingPatterns.size() != 0) { - results.set(slot, new Tuple<>(indexToFollow, new ElasticsearchException("index to follow [" + indexToFollow.getName() + - "] for pattern [" + autoFollowPattenName + "] matches with other patterns " + otherMatchingPatterns + ""))); - if (leaderIndicesCountDown.countDown()) { - resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); - } + groupedListener.onResponse( + new Tuple<>(indexToFollow, new ElasticsearchException("index to follow [" + indexToFollow.getName() + + "] for pattern [" + autoFollowPattenName + "] matches with other patterns " + otherMatchingPatterns + ""))); } else { final Settings leaderIndexSettings = remoteMetadata.getIndexSafe(indexToFollow).getSettings(); if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(leaderIndexSettings) == false) { @@ -456,28 +455,15 @@ private void checkAutoFollowPattern(String autoFollowPattenName, if (error != null) { failure.addSuppressed(error); } - results.set(slot, new Tuple<>(indexToFollow, failure)); - if (leaderIndicesCountDown.countDown()) { - resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); - } + groupedListener.onResponse(new Tuple<>(indexToFollow, failure)); }); - continue; } else if (leaderIndexAlreadyFollowed(autoFollowPattern, indexToFollow, localMetadata)) { - updateAutoFollowMetadata(recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow), error -> { - results.set(slot, new Tuple<>(indexToFollow, error)); - if (leaderIndicesCountDown.countDown()) { - resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); - } - }); - continue; + updateAutoFollowMetadata(recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow), + error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error))); + } else { + followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, + error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error))); } - - followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, error -> { - results.set(slot, new Tuple<>(indexToFollow, error)); - if (leaderIndicesCountDown.countDown()) { - resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); - } - }); } } }