@@ -209,23 +209,17 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {
209
209
for (final ObjectCursor <DiscoveryNode > senderNode : followerClusterState .getState ().nodes ().getNodes ().values ()) {
210
210
final MockTransportService senderTransportService =
211
211
(MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value .getName ());
212
- final ClusterStateResponse leaderClusterState = leaderClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
213
- for (final ObjectCursor <DiscoveryNode > receiverNode : leaderClusterState .getState ().nodes ().getNodes ().values ()) {
214
- final MockTransportService receiverTransportService =
215
- (MockTransportService ) getLeaderCluster ().getInstance (TransportService .class , receiverNode .value .getName ());
216
- senderTransportService .addSendBehavior (receiverTransportService ,
217
- (connection , requestId , action , request , options ) -> {
218
- if (ClearCcrRestoreSessionAction .NAME .equals (action )) {
219
- try {
220
- latch .await ();
221
- } catch (final InterruptedException e ) {
222
- fail (e .toString ());
223
- }
212
+ senderTransportService .addSendBehavior (
213
+ (connection , requestId , action , request , options ) -> {
214
+ if (ClearCcrRestoreSessionAction .NAME .equals (action )) {
215
+ try {
216
+ latch .await ();
217
+ } catch (final InterruptedException e ) {
218
+ fail (e .toString ());
224
219
}
225
- connection .sendRequest (requestId , action , request , options );
226
- });
227
- }
228
-
220
+ }
221
+ connection .sendRequest (requestId , action , request , options );
222
+ });
229
223
}
230
224
231
225
final PlainActionFuture <RestoreInfo > future = PlainActionFuture .newFuture ();
@@ -419,50 +413,45 @@ public void testUnfollowRemovesRetentionLeases() throws Exception {
419
413
for (final ObjectCursor <DiscoveryNode > senderNode : followerClusterState .getState ().nodes ().getNodes ().values ()) {
420
414
final MockTransportService senderTransportService =
421
415
(MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value .getName ());
422
- final ClusterStateResponse leaderClusterState =
423
- leaderClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
424
- for (final ObjectCursor <DiscoveryNode > receiverNode : leaderClusterState .getState ().nodes ().getNodes ().values ()) {
425
- final MockTransportService receiverTransportService =
426
- (MockTransportService ) getLeaderCluster ().getInstance (TransportService .class , receiverNode .value .getName ());
427
- senderTransportService .addSendBehavior (receiverTransportService ,
428
- (connection , requestId , action , request , options ) -> {
429
- if (RetentionLeaseActions .Remove .ACTION_NAME .equals (action )) {
430
- final RetentionLeaseActions .RemoveRequest removeRequest = (RetentionLeaseActions .RemoveRequest ) request ;
431
- if (shardIds .contains (removeRequest .getShardId ().id ())) {
432
- final String primaryShardNodeId =
433
- getLeaderCluster ()
434
- .clusterService ()
435
- .state ()
436
- .routingTable ()
437
- .index (leaderIndex )
438
- .shard (removeRequest .getShardId ().id ())
439
- .primaryShard ()
440
- .currentNodeId ();
441
- final String primaryShardNodeName =
442
- getLeaderCluster ().clusterService ().state ().nodes ().get (primaryShardNodeId ).getName ();
443
- final IndexShard primary =
444
- getLeaderCluster ()
445
- .getInstance (IndicesService .class , primaryShardNodeName )
446
- .getShardOrNull (removeRequest .getShardId ());
447
- final CountDownLatch latch = new CountDownLatch (1 );
448
- primary .removeRetentionLease (
449
- retentionLeaseId ,
450
- ActionListener .wrap (r -> latch .countDown (), e -> fail (e .toString ())));
451
- try {
452
- latch .await ();
453
- } catch (final InterruptedException e ) {
454
- Thread .currentThread ().interrupt ();
455
- fail (e .toString ());
456
- }
416
+ senderTransportService .addSendBehavior (
417
+ (connection , requestId , action , request , options ) -> {
418
+ if (RetentionLeaseActions .Remove .ACTION_NAME .equals (action )) {
419
+ final RetentionLeaseActions .RemoveRequest removeRequest = (RetentionLeaseActions .RemoveRequest ) request ;
420
+ if (shardIds .contains (removeRequest .getShardId ().id ())) {
421
+ final String primaryShardNodeId =
422
+ getLeaderCluster ()
423
+ .clusterService ()
424
+ .state ()
425
+ .routingTable ()
426
+ .index (leaderIndex )
427
+ .shard (removeRequest .getShardId ().id ())
428
+ .primaryShard ()
429
+ .currentNodeId ();
430
+ final String primaryShardNodeName =
431
+ getLeaderCluster ().clusterService ().state ().nodes ().get (primaryShardNodeId ).getName ();
432
+ final IndexShard primary =
433
+ getLeaderCluster ()
434
+ .getInstance (IndicesService .class , primaryShardNodeName )
435
+ .getShardOrNull (removeRequest .getShardId ());
436
+ final CountDownLatch latch = new CountDownLatch (1 );
437
+ primary .removeRetentionLease (
438
+ retentionLeaseId ,
439
+ ActionListener .wrap (r -> latch .countDown (), e -> fail (e .toString ())));
440
+ try {
441
+ latch .await ();
442
+ } catch (final InterruptedException e ) {
443
+ Thread .currentThread ().interrupt ();
444
+ fail (e .toString ());
457
445
}
458
446
}
459
- connection .sendRequest (requestId , action , request , options );
460
- });
461
- }
462
-
447
+ }
448
+ connection .sendRequest (requestId , action , request , options );
449
+ });
463
450
}
464
451
465
452
453
+
454
+
466
455
pauseFollow (followerIndex );
467
456
followerClient ().admin ().indices ().close (new CloseIndexRequest (followerIndex )).actionGet ();
468
457
assertAcked (followerClient ().execute (UnfollowAction .INSTANCE , new UnfollowAction .Request (followerIndex )).actionGet ());
@@ -508,25 +497,18 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
508
497
for (final ObjectCursor <DiscoveryNode > senderNode : followerClusterState .getState ().nodes ().getNodes ().values ()) {
509
498
final MockTransportService senderTransportService =
510
499
(MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value .getName ());
511
- final ClusterStateResponse leaderClusterState =
512
- leaderClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
513
- for (final ObjectCursor <DiscoveryNode > receiverNode : leaderClusterState .getState ().nodes ().getNodes ().values ()) {
514
- final MockTransportService receiverTransportService =
515
- (MockTransportService ) getLeaderCluster ().getInstance (TransportService .class , receiverNode .value .getName ());
516
- senderTransportService .addSendBehavior (receiverTransportService ,
517
- (connection , requestId , action , request , options ) -> {
518
- if (RetentionLeaseActions .Remove .ACTION_NAME .equals (action )) {
519
- final RetentionLeaseActions .RemoveRequest removeRequest = (RetentionLeaseActions .RemoveRequest ) request ;
520
- if (shardIds .contains (removeRequest .getShardId ().id ())) {
521
- throw randomBoolean ()
522
- ? new ConnectTransportException (receiverNode .value , "connection failed" )
523
- : new IndexShardClosedException (removeRequest .getShardId ());
524
- }
500
+ senderTransportService .addSendBehavior (
501
+ (connection , requestId , action , request , options ) -> {
502
+ if (RetentionLeaseActions .Remove .ACTION_NAME .equals (action )) {
503
+ final RetentionLeaseActions .RemoveRequest removeRequest = (RetentionLeaseActions .RemoveRequest ) request ;
504
+ if (shardIds .contains (removeRequest .getShardId ().id ())) {
505
+ throw randomBoolean ()
506
+ ? new ConnectTransportException (connection .getNode (), "connection failed" )
507
+ : new IndexShardClosedException (removeRequest .getShardId ());
525
508
}
526
- connection .sendRequest (requestId , action , request , options );
527
- });
528
- }
529
-
509
+ }
510
+ connection .sendRequest (requestId , action , request , options );
511
+ });
530
512
}
531
513
532
514
final ElasticsearchException e = expectThrows (
0 commit comments