62
62
import java .util .concurrent .atomic .AtomicReferenceArray ;
63
63
import java .util .stream .Collectors ;
64
64
65
- public class FollowIndexAction extends Action <FollowIndexAction . Response > {
65
+ public class FollowIndexAction extends Action <AcknowledgedResponse > {
66
66
67
67
public static final FollowIndexAction INSTANCE = new FollowIndexAction ();
68
68
public static final String NAME = "cluster:admin/xpack/ccr/follow_index" ;
@@ -72,8 +72,8 @@ private FollowIndexAction() {
72
72
}
73
73
74
74
@ Override
75
- public Response newResponse () {
76
- return new Response ();
75
+ public AcknowledgedResponse newResponse () {
76
+ return new AcknowledgedResponse ();
77
77
}
78
78
79
79
public static class Request extends ActionRequest implements ToXContentObject {
@@ -129,9 +129,17 @@ public static Request fromXContent(XContentParser parser, String followerIndex)
129
129
private TimeValue retryTimeout ;
130
130
private TimeValue idleShardRetryDelay ;
131
131
132
- public Request (String leaderIndex , String followerIndex , Integer maxBatchOperationCount , Integer maxConcurrentReadBatches ,
133
- Long maxOperationSizeInBytes , Integer maxConcurrentWriteBatches , Integer maxWriteBufferSize ,
134
- TimeValue retryTimeout , TimeValue idleShardRetryDelay ) {
132
+ public Request (
133
+ String leaderIndex ,
134
+ String followerIndex ,
135
+ Integer maxBatchOperationCount ,
136
+ Integer maxConcurrentReadBatches ,
137
+ Long maxOperationSizeInBytes ,
138
+ Integer maxConcurrentWriteBatches ,
139
+ Integer maxWriteBufferSize ,
140
+ TimeValue retryTimeout ,
141
+ TimeValue idleShardRetryDelay ) {
142
+
135
143
if (leaderIndex == null ) {
136
144
throw new IllegalArgumentException ("leader_index is missing" );
137
145
}
@@ -271,22 +279,21 @@ public boolean equals(Object o) {
271
279
272
280
@ Override
273
281
public int hashCode () {
274
- return Objects .hash (leaderIndex , followerIndex , maxBatchOperationCount , maxConcurrentReadBatches , maxOperationSizeInBytes ,
275
- maxConcurrentWriteBatches , maxWriteBufferSize , retryTimeout , idleShardRetryDelay );
276
- }
277
- }
278
-
279
- public static class Response extends AcknowledgedResponse {
280
-
281
- Response () {
282
- }
283
-
284
- Response (boolean acknowledged ) {
285
- super (acknowledged );
282
+ return Objects .hash (
283
+ leaderIndex ,
284
+ followerIndex ,
285
+ maxBatchOperationCount ,
286
+ maxConcurrentReadBatches ,
287
+ maxOperationSizeInBytes ,
288
+ maxConcurrentWriteBatches ,
289
+ maxWriteBufferSize ,
290
+ retryTimeout ,
291
+ idleShardRetryDelay
292
+ );
286
293
}
287
294
}
288
295
289
- public static class TransportAction extends HandledTransportAction <Request , Response > {
296
+ public static class TransportAction extends HandledTransportAction <Request , AcknowledgedResponse > {
290
297
291
298
private final Client client ;
292
299
private final ThreadPool threadPool ;
@@ -318,7 +325,9 @@ public TransportAction(
318
325
}
319
326
320
327
@ Override
321
- protected void doExecute (final Task task , final Request request , final ActionListener <Response > listener ) {
328
+ protected void doExecute (final Task task ,
329
+ final Request request ,
330
+ final ActionListener <AcknowledgedResponse > listener ) {
322
331
if (ccrLicenseChecker .isCcrAllowed ()) {
323
332
final String [] indices = new String []{request .leaderIndex };
324
333
final Map <String , List <String >> remoteClusterIndices = remoteClusterService .groupClusterIndices (indices , s -> false );
@@ -337,7 +346,8 @@ protected void doExecute(final Task task, final Request request, final ActionLis
337
346
}
338
347
}
339
348
340
- private void followLocalIndex (final Request request , final ActionListener <Response > listener ) {
349
+ private void followLocalIndex (final Request request ,
350
+ final ActionListener <AcknowledgedResponse > listener ) {
341
351
final ClusterState state = clusterService .state ();
342
352
final IndexMetaData followerIndexMetadata = state .getMetaData ().index (request .getFollowerIndex ());
343
353
// following an index in local cluster, so use local cluster state to fetch leader index metadata
@@ -353,7 +363,7 @@ private void followRemoteIndex(
353
363
final Request request ,
354
364
final String clusterAlias ,
355
365
final String leaderIndex ,
356
- final ActionListener <Response > listener ) {
366
+ final ActionListener <AcknowledgedResponse > listener ) {
357
367
final ClusterState state = clusterService .state ();
358
368
final IndexMetaData followerIndexMetadata = state .getMetaData ().index (request .getFollowerIndex ());
359
369
ccrLicenseChecker .checkRemoteClusterLicenseAndFetchLeaderIndexMetadata (
@@ -380,8 +390,13 @@ private void followRemoteIndex(
380
390
* <li>The leader index and follow index need to have the same number of primary shards</li>
381
391
* </ul>
382
392
*/
383
- void start (Request request , String clusterNameAlias , IndexMetaData leaderIndexMetadata , IndexMetaData followIndexMetadata ,
384
- ActionListener <Response > handler ) throws IOException {
393
+ void start (
394
+ Request request ,
395
+ String clusterNameAlias ,
396
+ IndexMetaData leaderIndexMetadata ,
397
+ IndexMetaData followIndexMetadata ,
398
+ ActionListener <AcknowledgedResponse > handler ) throws IOException {
399
+
385
400
MapperService mapperService = followIndexMetadata != null ? indicesService .createIndexMapperService (followIndexMetadata ) : null ;
386
401
validate (request , leaderIndexMetadata , followIndexMetadata , mapperService );
387
402
final int numShards = followIndexMetadata .getNumberOfShards ();
@@ -429,7 +444,7 @@ void finalizeResponse() {
429
444
430
445
if (error == null ) {
431
446
// include task ids?
432
- handler .onResponse (new Response (true ));
447
+ handler .onResponse (new AcknowledgedResponse (true ));
433
448
} else {
434
449
// TODO: cancel all started tasks
435
450
handler .onFailure (error );
@@ -493,7 +508,9 @@ void finalizeResponse() {
493
508
WHITELISTED_SETTINGS = Collections .unmodifiableSet (whiteListedSettings );
494
509
}
495
510
496
- static void validate (Request request , IndexMetaData leaderIndex , IndexMetaData followIndex , MapperService followerMapperService ) {
511
+ static void validate (Request request ,
512
+ IndexMetaData leaderIndex ,
513
+ IndexMetaData followIndex , MapperService followerMapperService ) {
497
514
if (leaderIndex == null ) {
498
515
throw new IllegalArgumentException ("leader index [" + request .leaderIndex + "] does not exist" );
499
516
}
0 commit comments