65
65
import java .util .stream .Collectors ;
66
66
67
67
public class FollowIndexAction extends Action <FollowIndexAction .Request ,
68
- FollowIndexAction . Response , FollowIndexAction .RequestBuilder > {
68
+ AcknowledgedResponse , FollowIndexAction .RequestBuilder > {
69
69
70
70
public static final FollowIndexAction INSTANCE = new FollowIndexAction ();
71
71
public static final String NAME = "cluster:admin/xpack/ccr/follow_index" ;
@@ -80,8 +80,8 @@ public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
80
80
}
81
81
82
82
@ Override
83
- public Response newResponse () {
84
- return new Response ();
83
+ public AcknowledgedResponse newResponse () {
84
+ return new AcknowledgedResponse ();
85
85
}
86
86
87
87
public static class Request extends ActionRequest implements ToXContentObject {
@@ -137,9 +137,17 @@ public static Request fromXContent(XContentParser parser, String followerIndex)
137
137
private TimeValue retryTimeout ;
138
138
private TimeValue idleShardRetryDelay ;
139
139
140
- public Request (String leaderIndex , String followerIndex , Integer maxBatchOperationCount , Integer maxConcurrentReadBatches ,
141
- Long maxOperationSizeInBytes , Integer maxConcurrentWriteBatches , Integer maxWriteBufferSize ,
142
- TimeValue retryTimeout , TimeValue idleShardRetryDelay ) {
140
+ public Request (
141
+ String leaderIndex ,
142
+ String followerIndex ,
143
+ Integer maxBatchOperationCount ,
144
+ Integer maxConcurrentReadBatches ,
145
+ Long maxOperationSizeInBytes ,
146
+ Integer maxConcurrentWriteBatches ,
147
+ Integer maxWriteBufferSize ,
148
+ TimeValue retryTimeout ,
149
+ TimeValue idleShardRetryDelay ) {
150
+
143
151
if (leaderIndex == null ) {
144
152
throw new IllegalArgumentException ("leader_index is missing" );
145
153
}
@@ -279,8 +287,17 @@ public boolean equals(Object o) {
279
287
280
288
@ Override
281
289
public int hashCode () {
282
- return Objects .hash (leaderIndex , followerIndex , maxBatchOperationCount , maxConcurrentReadBatches , maxOperationSizeInBytes ,
283
- maxConcurrentWriteBatches , maxWriteBufferSize , retryTimeout , idleShardRetryDelay );
290
+ return Objects .hash (
291
+ leaderIndex ,
292
+ followerIndex ,
293
+ maxBatchOperationCount ,
294
+ maxConcurrentReadBatches ,
295
+ maxOperationSizeInBytes ,
296
+ maxConcurrentWriteBatches ,
297
+ maxWriteBufferSize ,
298
+ retryTimeout ,
299
+ idleShardRetryDelay
300
+ );
284
301
}
285
302
}
286
303
@@ -291,17 +308,7 @@ public static class RequestBuilder extends ActionRequestBuilder<Request, Respons
291
308
}
292
309
}
293
310
294
- public static class Response extends AcknowledgedResponse {
295
-
296
- Response () {
297
- }
298
-
299
- Response (boolean acknowledged ) {
300
- super (acknowledged );
301
- }
302
- }
303
-
304
- public static class TransportAction extends HandledTransportAction <Request , Response > {
311
+ public static class TransportAction extends HandledTransportAction <Request , AcknowledgedResponse > {
305
312
306
313
private final Client client ;
307
314
private final ClusterService clusterService ;
@@ -332,7 +339,7 @@ public TransportAction(
332
339
}
333
340
334
341
@ Override
335
- protected void doExecute (final Request request , final ActionListener <Response > listener ) {
342
+ protected void doExecute (final Request request , final ActionListener <AcknowledgedResponse > listener ) {
336
343
if (ccrLicenseChecker .isCcrAllowed ()) {
337
344
final String [] indices = new String []{request .leaderIndex };
338
345
final Map <String , List <String >> remoteClusterIndices = remoteClusterService .groupClusterIndices (indices , s -> false );
@@ -351,7 +358,8 @@ protected void doExecute(final Request request, final ActionListener<Response> l
351
358
}
352
359
}
353
360
354
- private void followLocalIndex (final Request request , final ActionListener <Response > listener ) {
361
+ private void followLocalIndex (final Request request ,
362
+ final ActionListener <AcknowledgedResponse > listener ) {
355
363
final ClusterState state = clusterService .state ();
356
364
final IndexMetaData followerIndexMetadata = state .getMetaData ().index (request .getFollowerIndex ());
357
365
// following an index in local cluster, so use local cluster state to fetch leader index metadata
@@ -367,7 +375,7 @@ private void followRemoteIndex(
367
375
final Request request ,
368
376
final String clusterAlias ,
369
377
final String leaderIndex ,
370
- final ActionListener <Response > listener ) {
378
+ final ActionListener <AcknowledgedResponse > listener ) {
371
379
final ClusterState state = clusterService .state ();
372
380
final IndexMetaData followerIndexMetadata = state .getMetaData ().index (request .getFollowerIndex ());
373
381
ccrLicenseChecker .checkRemoteClusterLicenseAndFetchLeaderIndexMetadata (
@@ -394,8 +402,13 @@ private void followRemoteIndex(
394
402
* <li>The leader index and follow index need to have the same number of primary shards</li>
395
403
* </ul>
396
404
*/
397
- void start (Request request , String clusterNameAlias , IndexMetaData leaderIndexMetadata , IndexMetaData followIndexMetadata ,
398
- ActionListener <Response > handler ) throws IOException {
405
+ void start (
406
+ Request request ,
407
+ String clusterNameAlias ,
408
+ IndexMetaData leaderIndexMetadata ,
409
+ IndexMetaData followIndexMetadata ,
410
+ ActionListener <AcknowledgedResponse > handler ) throws IOException {
411
+
399
412
MapperService mapperService = followIndexMetadata != null ? indicesService .createIndexMapperService (followIndexMetadata ) : null ;
400
413
validate (request , leaderIndexMetadata , followIndexMetadata , mapperService );
401
414
final int numShards = followIndexMetadata .getNumberOfShards ();
@@ -443,7 +456,7 @@ void finalizeResponse() {
443
456
444
457
if (error == null ) {
445
458
// include task ids?
446
- handler .onResponse (new Response (true ));
459
+ handler .onResponse (new AcknowledgedResponse (true ));
447
460
} else {
448
461
// TODO: cancel all started tasks
449
462
handler .onFailure (error );
@@ -506,7 +519,9 @@ void finalizeResponse() {
506
519
WHITELISTED_SETTINGS = Collections .unmodifiableSet (whiteListedSettings );
507
520
}
508
521
509
- static void validate (Request request , IndexMetaData leaderIndex , IndexMetaData followIndex , MapperService followerMapperService ) {
522
+ static void validate (Request request ,
523
+ IndexMetaData leaderIndex ,
524
+ IndexMetaData followIndex , MapperService followerMapperService ) {
510
525
if (leaderIndex == null ) {
511
526
throw new IllegalArgumentException ("leader index [" + request .leaderIndex + "] does not exist" );
512
527
}
0 commit comments