24
24
import org .elasticsearch .common .io .stream .StreamInput ;
25
25
import org .elasticsearch .common .io .stream .StreamOutput ;
26
26
import org .elasticsearch .common .settings .Settings ;
27
+ import org .elasticsearch .index .IndexSettings ;
27
28
import org .elasticsearch .index .shard .ShardId ;
28
29
import org .elasticsearch .persistent .PersistentTasksCustomMetaData ;
29
30
import org .elasticsearch .persistent .PersistentTasksService ;
@@ -224,29 +225,13 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
224
225
*/
225
226
void start (Request request , String clusterNameAlias , IndexMetaData leaderIndexMetadata , IndexMetaData followIndexMetadata ,
226
227
ActionListener <Response > handler ) {
227
- if (leaderIndexMetadata == null ) {
228
- handler .onFailure (new IllegalArgumentException ("leader index [" + request .leaderIndex + "] does not exist" ));
229
- return ;
230
- }
231
-
232
- if (followIndexMetadata == null ) {
233
- handler .onFailure (new IllegalArgumentException ("follow index [" + request .followIndex + "] does not exist" ));
234
- return ;
235
- }
236
-
237
- if (leaderIndexMetadata .getNumberOfShards () != followIndexMetadata .getNumberOfShards ()) {
238
- handler .onFailure (new IllegalArgumentException ("leader index primary shards [" +
239
- leaderIndexMetadata .getNumberOfShards () + "] does not match with the number of " +
240
- "shards of the follow index [" + followIndexMetadata .getNumberOfShards () + "]" ));
241
- // TODO: other validation checks
242
- } else {
228
+ validate (leaderIndexMetadata ,followIndexMetadata , request );
243
229
final int numShards = followIndexMetadata .getNumberOfShards ();
244
230
final AtomicInteger counter = new AtomicInteger (numShards );
245
231
final AtomicReferenceArray <Object > responses = new AtomicReferenceArray <>(followIndexMetadata .getNumberOfShards ());
246
232
Map <String , String > filteredHeaders = threadPool .getThreadContext ().getHeaders ().entrySet ().stream ()
247
233
.filter (e -> ShardFollowTask .HEADER_FILTERS .contains (e .getKey ()))
248
- .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
249
- for (int i = 0 ; i < numShards ; i ++) {
234
+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));for (int i = 0 ; i < numShards ; i ++) {
250
235
final int shardId = i ;
251
236
String taskId = followIndexMetadata .getIndexUUID () + "-" + shardId ;
252
237
ShardFollowTask shardFollowTask = new ShardFollowTask (clusterNameAlias ,
@@ -261,39 +246,59 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowT
261
246
finalizeResponse ();
262
247
}
263
248
264
- @ Override
265
- public void onFailure (Exception e ) {
266
- responses .set (shardId , e );
267
- finalizeResponse ();
268
- }
249
+ @ Override
250
+ public void onFailure (Exception e ) {
251
+ responses .set (shardId , e );
252
+ finalizeResponse ();
253
+ }
269
254
270
- void finalizeResponse () {
271
- Exception error = null ;
272
- if (counter .decrementAndGet () == 0 ) {
273
- for (int j = 0 ; j < responses .length (); j ++) {
274
- Object response = responses .get (j );
275
- if (response instanceof Exception ) {
276
- if (error == null ) {
277
- error = (Exception ) response ;
278
- } else {
279
- error .addSuppressed ((Throwable ) response );
280
- }
255
+ void finalizeResponse () {
256
+ Exception error = null ;
257
+ if (counter .decrementAndGet () == 0 ) {
258
+ for (int j = 0 ; j < responses .length (); j ++) {
259
+ Object response = responses .get (j );
260
+ if (response instanceof Exception ) {
261
+ if (error == null ) {
262
+ error = (Exception ) response ;
263
+ } else {
264
+ error .addSuppressed ((Throwable ) response );
281
265
}
282
266
}
267
+ }
283
268
284
- if (error == null ) {
285
- // include task ids?
286
- handler .onResponse (new Response (true ));
287
- } else {
288
- // TODO: cancel all started tasks
289
- handler .onFailure (error );
290
- }
269
+ if (error == null ) {
270
+ // include task ids?
271
+ handler .onResponse (new Response (true ));
272
+ } else {
273
+ // TODO: cancel all started tasks
274
+ handler .onFailure (error );
291
275
}
292
276
}
293
277
}
294
- );
295
- }
278
+ }
279
+ );
296
280
}
297
281
}
298
282
}
283
+
284
+
285
+ static void validate (IndexMetaData leaderIndex , IndexMetaData followIndex , Request request ) {
286
+ if (leaderIndex == null ) {
287
+ throw new IllegalArgumentException ("leader index [" + request .leaderIndex + "] does not exist" );
288
+ }
289
+
290
+ if (followIndex == null ) {
291
+ throw new IllegalArgumentException ("follow index [" + request .followIndex + "] does not exist" );
292
+ }
293
+ if (leaderIndex .getSettings ().getAsBoolean (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), false ) == false ) {
294
+ throw new IllegalArgumentException ("leader index [" + request .leaderIndex + "] does not have soft deletes enabled" );
295
+ }
296
+
297
+ if (leaderIndex .getNumberOfShards () != followIndex .getNumberOfShards ()) {
298
+ throw new IllegalArgumentException ("leader index primary shards [" + leaderIndex .getNumberOfShards () +
299
+ "] does not match with the number of shards of the follow index [" + followIndex .getNumberOfShards () + "]" );
300
+ }
301
+ // TODO: other validation checks
302
+ }
303
+
299
304
}
0 commit comments