24
24
import org .elasticsearch .cluster .ClusterState ;
25
25
import org .elasticsearch .cluster .metadata .IndexMetaData ;
26
26
import org .elasticsearch .cluster .metadata .MappingMetaData ;
27
+ import org .elasticsearch .cluster .metadata .MetaData ;
27
28
import org .elasticsearch .cluster .routing .IndexRoutingTable ;
28
29
import org .elasticsearch .cluster .service .ClusterService ;
29
30
import org .elasticsearch .common .CheckedConsumer ;
30
31
import org .elasticsearch .common .settings .IndexScopedSettings ;
31
32
import org .elasticsearch .common .settings .Settings ;
33
+ import org .elasticsearch .common .settings .SettingsModule ;
32
34
import org .elasticsearch .common .unit .TimeValue ;
33
35
import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
34
36
import org .elasticsearch .index .Index ;
46
48
import org .elasticsearch .tasks .TaskId ;
47
49
import org .elasticsearch .threadpool .ThreadPool ;
48
50
import org .elasticsearch .xpack .ccr .Ccr ;
51
+ import org .elasticsearch .xpack .ccr .CcrSettings ;
49
52
import org .elasticsearch .xpack .ccr .action .bulk .BulkShardOperationsAction ;
50
53
import org .elasticsearch .xpack .ccr .action .bulk .BulkShardOperationsRequest ;
51
54
import org .elasticsearch .xpack .ccr .action .bulk .BulkShardOperationsResponse ;
@@ -69,16 +72,20 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
69
72
private final ThreadPool threadPool ;
70
73
private final ClusterService clusterService ;
71
74
private final IndexScopedSettings indexScopedSettings ;
75
+ private volatile TimeValue waitForMetadataTimeOut ;
72
76
73
77
public ShardFollowTasksExecutor (Client client ,
74
78
ThreadPool threadPool ,
75
79
ClusterService clusterService ,
76
- IndexScopedSettings indexScopedSettings ) {
80
+ SettingsModule settingsModule ) {
77
81
super (ShardFollowTask .NAME , Ccr .CCR_THREAD_POOL_NAME );
78
82
this .client = client ;
79
83
this .threadPool = threadPool ;
80
84
this .clusterService = clusterService ;
81
- this .indexScopedSettings = indexScopedSettings ;
85
+ this .indexScopedSettings = settingsModule .getIndexScopedSettings ();
86
+ this .waitForMetadataTimeOut = CcrSettings .CCR_WAIT_FOR_METADATA_TIMEOUT .get (settingsModule .getSettings ());
87
+ clusterService .getClusterSettings ().addSettingsUpdateConsumer (CcrSettings .CCR_WAIT_FOR_METADATA_TIMEOUT ,
88
+ newVal -> this .waitForMetadataTimeOut = newVal );
82
89
}
83
90
84
91
@ Override
@@ -112,33 +119,25 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
112
119
scheduler , System ::nanoTime ) {
113
120
114
121
@ Override
115
- protected void innerUpdateMapping (LongConsumer handler , Consumer <Exception > errorHandler ) {
116
- Index leaderIndex = params .getLeaderShardId ().getIndex ();
117
- Index followIndex = params .getFollowShardId ().getIndex ();
118
-
119
- ClusterStateRequest clusterStateRequest = CcrRequests .metaDataRequest (leaderIndex .getName ());
120
- CheckedConsumer <ClusterStateResponse , Exception > onResponse = clusterStateResponse -> {
121
- IndexMetaData indexMetaData = clusterStateResponse .getState ().metaData ().getIndexSafe (leaderIndex );
122
- if (indexMetaData .getMappings ().isEmpty ()) {
123
- assert indexMetaData .getMappingVersion () == 1 ;
124
- handler .accept (indexMetaData .getMappingVersion ());
125
- return ;
126
- }
127
-
128
- assert indexMetaData .getMappings ().size () == 1 : "expected exactly one mapping, but got [" +
129
- indexMetaData .getMappings ().size () + "]" ;
130
- MappingMetaData mappingMetaData = indexMetaData .getMappings ().iterator ().next ().value ;
131
-
132
- PutMappingRequest putMappingRequest = CcrRequests .putMappingRequest (followIndex .getName (), mappingMetaData );
133
- followerClient .admin ().indices ().putMapping (putMappingRequest , ActionListener .wrap (
134
- putMappingResponse -> handler .accept (indexMetaData .getMappingVersion ()),
135
- errorHandler ));
136
- };
137
- try {
138
- remoteClient (params ).admin ().cluster ().state (clusterStateRequest , ActionListener .wrap (onResponse , errorHandler ));
139
- } catch (Exception e ) {
140
- errorHandler .accept (e );
141
- }
122
+ protected void innerUpdateMapping (long minRequiredMappingVersion , LongConsumer handler , Consumer <Exception > errorHandler ) {
123
+ final Index followerIndex = params .getFollowShardId ().getIndex ();
124
+ getIndexMetadata (minRequiredMappingVersion , 0L , params , ActionListener .wrap (
125
+ indexMetaData -> {
126
+ if (indexMetaData .getMappings ().isEmpty ()) {
127
+ assert indexMetaData .getMappingVersion () == 1 ;
128
+ handler .accept (indexMetaData .getMappingVersion ());
129
+ return ;
130
+ }
131
+ assert indexMetaData .getMappings ().size () == 1 : "expected exactly one mapping, but got [" +
132
+ indexMetaData .getMappings ().size () + "]" ;
133
+ MappingMetaData mappingMetaData = indexMetaData .getMappings ().iterator ().next ().value ;
134
+ PutMappingRequest putMappingRequest = CcrRequests .putMappingRequest (followerIndex .getName (), mappingMetaData );
135
+ followerClient .admin ().indices ().putMapping (putMappingRequest , ActionListener .wrap (
136
+ putMappingResponse -> handler .accept (indexMetaData .getMappingVersion ()),
137
+ errorHandler ));
138
+ },
139
+ errorHandler
140
+ ));
142
141
}
143
142
144
143
@ Override
@@ -257,6 +256,39 @@ private Client remoteClient(ShardFollowTask params) {
257
256
return wrapClient (client .getRemoteClusterClient (params .getRemoteCluster ()), params .getHeaders ());
258
257
}
259
258
259
+ private void getIndexMetadata (long minRequiredMappingVersion , long minRequiredMetadataVersion ,
260
+ ShardFollowTask params , ActionListener <IndexMetaData > listener ) {
261
+ final Index leaderIndex = params .getLeaderShardId ().getIndex ();
262
+ final ClusterStateRequest clusterStateRequest = CcrRequests .metaDataRequest (leaderIndex .getName ());
263
+ if (minRequiredMetadataVersion > 0 ) {
264
+ clusterStateRequest .waitForMetaDataVersion (minRequiredMetadataVersion ).waitForTimeout (waitForMetadataTimeOut );
265
+ }
266
+ try {
267
+ remoteClient (params ).admin ().cluster ().state (clusterStateRequest , ActionListener .wrap (
268
+ r -> {
269
+ // if wait_for_metadata_version timeout, the response is empty
270
+ if (r .getState () == null ) {
271
+ assert minRequiredMetadataVersion > 0 ;
272
+ getIndexMetadata (minRequiredMappingVersion , minRequiredMetadataVersion , params , listener );
273
+ return ;
274
+ }
275
+ final MetaData metaData = r .getState ().metaData ();
276
+ final IndexMetaData indexMetaData = metaData .getIndexSafe (leaderIndex );
277
+ if (indexMetaData .getMappingVersion () < minRequiredMappingVersion ) {
278
+ // ask for the next version.
279
+ getIndexMetadata (minRequiredMappingVersion , metaData .version () + 1 , params , listener );
280
+ } else {
281
+ assert metaData .version () >= minRequiredMetadataVersion : metaData .version () + " < " + minRequiredMetadataVersion ;
282
+ listener .onResponse (indexMetaData );
283
+ }
284
+ },
285
+ listener ::onFailure
286
+ ));
287
+ } catch (Exception e ) {
288
+ listener .onFailure (e );
289
+ }
290
+ }
291
+
260
292
interface FollowerStatsInfoHandler {
261
293
void accept (String followerHistoryUUID , long globalCheckpoint , long maxSeqNo );
262
294
}
0 commit comments