Skip to content

Commit 883940a

Browse files
authored
[CCR] Change AutoFollowCoordinator to use wait_for_metadata_version (#36264)
Changed AutoFollowCoordinator to make use of the wait_for_metadata_version feature in the cluster state API, and removed the hard-coded poll interval. Originates from #35895. Relates to #33007.
1 parent c6de68c commit 883940a

File tree

5 files changed

+183
-55
lines changed

5 files changed

+183
-55
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public Collection<Object> createComponents(
156156

157157
return Arrays.asList(
158158
ccrLicenseChecker,
159-
new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker)
159+
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
160160
);
161161
}
162162

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,9 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
121121
client.getRemoteClusterClient(clusterAlias),
122122
request,
123123
onFailure,
124-
leaderClusterState -> {
125-
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
124+
remoteClusterStateResponse -> {
125+
ClusterState remoteClusterState = remoteClusterStateResponse.getState();
126+
IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex);
126127
if (leaderIndexMetaData == null) {
127128
onFailure.accept(new IndexNotFoundException(leaderIndex));
128129
return;
@@ -159,7 +160,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
159160
final String clusterAlias,
160161
final ClusterStateRequest request,
161162
final Consumer<Exception> onFailure,
162-
final Consumer<ClusterState> leaderClusterStateConsumer) {
163+
final Consumer<ClusterStateResponse> leaderClusterStateConsumer) {
163164
try {
164165
Client remoteClient = systemClient(client.getRemoteClusterClient(clusterAlias));
165166
checkRemoteClusterLicenseAndFetchClusterState(
@@ -199,7 +200,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
199200
final Client remoteClient,
200201
final ClusterStateRequest request,
201202
final Consumer<Exception> onFailure,
202-
final Consumer<ClusterState> leaderClusterStateConsumer,
203+
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
203204
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
204205
final Function<Exception, ElasticsearchStatusException> unknownLicense) {
205206
// we have to check the license on the remote cluster
@@ -211,7 +212,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
211212
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
212213
if (licenseCheck.isSuccess()) {
213214
final ActionListener<ClusterStateResponse> clusterStateListener =
214-
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
215+
ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure);
215216
// following an index in remote cluster, so use remote client to fetch leader index metadata
216217
remoteClient.admin().cluster().state(request, clusterStateListener);
217218
} else {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.Version;
1515
import org.elasticsearch.action.ActionListener;
1616
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
17+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1718
import org.elasticsearch.client.Client;
1819
import org.elasticsearch.cluster.ClusterChangedEvent;
1920
import org.elasticsearch.cluster.ClusterState;
@@ -26,13 +27,11 @@
2627
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
2728
import org.elasticsearch.common.collect.Tuple;
2829
import org.elasticsearch.common.settings.Settings;
29-
import org.elasticsearch.common.unit.TimeValue;
3030
import org.elasticsearch.common.util.concurrent.AtomicArray;
3131
import org.elasticsearch.common.util.concurrent.CountDown;
3232
import org.elasticsearch.index.Index;
3333
import org.elasticsearch.index.IndexSettings;
3434
import org.elasticsearch.license.LicenseUtils;
35-
import org.elasticsearch.threadpool.ThreadPool;
3635
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
3736
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
3837
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
@@ -66,7 +65,6 @@ public class AutoFollowCoordinator implements ClusterStateListener {
6665
private static final int MAX_AUTO_FOLLOW_ERRORS = 256;
6766

6867
private final Client client;
69-
private final ThreadPool threadPool;
7068
private final ClusterService clusterService;
7169
private final CcrLicenseChecker ccrLicenseChecker;
7270

@@ -80,11 +78,9 @@ public class AutoFollowCoordinator implements ClusterStateListener {
8078

8179
public AutoFollowCoordinator(
8280
Client client,
83-
ThreadPool threadPool,
8481
ClusterService clusterService,
8582
CcrLicenseChecker ccrLicenseChecker) {
8683
this.client = client;
87-
this.threadPool = threadPool;
8884
this.clusterService = clusterService;
8985
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
9086
clusterService.addListener(this);
@@ -150,22 +146,24 @@ void updateAutoFollowers(ClusterState followerClusterState) {
150146

151147
Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
152148
for (String remoteCluster : newRemoteClusters) {
153-
AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) {
149+
AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {
154150

155151
@Override
156152
void getRemoteClusterState(final String remoteCluster,
157-
final BiConsumer<ClusterState, Exception> handler) {
153+
final long metadataVersion,
154+
final BiConsumer<ClusterStateResponse, Exception> handler) {
158155
final ClusterStateRequest request = new ClusterStateRequest();
159156
request.clear();
160157
request.metaData(true);
161158
request.routingTable(true);
159+
request.waitForMetaDataVersion(metadataVersion);
162160
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
163161
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
164162
client,
165163
remoteCluster,
166164
request,
167165
e -> handler.accept(null, e),
168-
remoteClusterState -> handler.accept(remoteClusterState, null));
166+
remoteClusterStateResponse -> handler.accept(remoteClusterStateResponse, null));
169167
}
170168

171169
@Override
@@ -239,19 +237,17 @@ public void clusterChanged(ClusterChangedEvent event) {
239237
abstract static class AutoFollower {
240238

241239
private final String remoteCluster;
242-
private final ThreadPool threadPool;
243240
private final Consumer<List<AutoFollowResult>> statsUpdater;
244241
private final Supplier<ClusterState> followerClusterStateSupplier;
245242

243+
private volatile long metadataVersion = 0;
246244
private volatile CountDown autoFollowPatternsCountDown;
247245
private volatile AtomicArray<AutoFollowResult> autoFollowResults;
248246

249247
AutoFollower(final String remoteCluster,
250-
final ThreadPool threadPool,
251248
final Consumer<List<AutoFollowResult>> statsUpdater,
252249
final Supplier<ClusterState> followerClusterStateSupplier) {
253250
this.remoteCluster = remoteCluster;
254-
this.threadPool = threadPool;
255251
this.statsUpdater = statsUpdater;
256252
this.followerClusterStateSupplier = followerClusterStateSupplier;
257253
}
@@ -276,9 +272,15 @@ void start() {
276272
this.autoFollowPatternsCountDown = new CountDown(patterns.size());
277273
this.autoFollowResults = new AtomicArray<>(patterns.size());
278274

279-
getRemoteClusterState(remoteCluster, (remoteClusterState, remoteError) -> {
280-
if (remoteClusterState != null) {
275+
getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> {
276+
if (remoteClusterStateResponse != null) {
281277
assert remoteError == null;
278+
if (remoteClusterStateResponse.isWaitForTimedOut()) {
279+
start();
280+
return;
281+
}
282+
ClusterState remoteClusterState = remoteClusterStateResponse.getState();
283+
metadataVersion = remoteClusterState.metaData().version();
282284
autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
283285
} else {
284286
assert remoteError != null;
@@ -402,8 +404,7 @@ private void finalise(int slot, AutoFollowResult result) {
402404
autoFollowResults.set(slot, result);
403405
if (autoFollowPatternsCountDown.countDown()) {
404406
statsUpdater.accept(autoFollowResults.asList());
405-
// TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion:
406-
threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::start);
407+
start();
407408
}
408409
}
409410

@@ -525,13 +526,15 @@ static Function<ClusterState, ClusterState> cleanFollowedRemoteIndices(
525526
}
526527

527528
/**
528-
* Fetch the cluster state from the leader with the specified cluster alias
529+
* Fetch the cluster state from the remote cluster with the specified cluster alias
529530
* @param remoteCluster the name of the leader cluster
531+
* @param metadataVersion the metadata version to wait for (callers pass the last seen metadata version plus one)
530532
* @param handler the callback to invoke
531533
*/
532534
abstract void getRemoteClusterState(
533535
String remoteCluster,
534-
BiConsumer<ClusterState, Exception> handler
536+
long metadataVersion,
537+
BiConsumer<ClusterStateResponse, Exception> handler
535538
);
536539

537540
abstract void createAndFollow(

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
10+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1011
import org.elasticsearch.action.support.ActionFilters;
1112
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1213
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -80,7 +81,7 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request,
8081
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
8182
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
8283

83-
Consumer<ClusterState> consumer = remoteClusterState -> {
84+
Consumer<ClusterStateResponse> consumer = remoteClusterState -> {
8485
String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
8586
ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> {
8687
if (e == null) {
@@ -94,7 +95,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {
9495

9596
@Override
9697
public ClusterState execute(ClusterState currentState) throws Exception {
97-
return innerPut(request, filteredHeaders, currentState, remoteClusterState);
98+
return innerPut(request, filteredHeaders, currentState, remoteClusterState.getState());
9899
}
99100
});
100101
} else {

0 commit comments

Comments
 (0)