6
6
7
7
package org .elasticsearch .xpack .ccr ;
8
8
9
- import com .carrotsearch .hppc .cursors .ObjectCursor ;
10
9
import org .elasticsearch .ElasticsearchException ;
11
10
import org .elasticsearch .action .ActionListener ;
12
11
import org .elasticsearch .action .admin .cluster .settings .ClusterUpdateSettingsRequest ;
@@ -222,9 +221,9 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {
222
221
223
222
// block the recovery from completing; this ensures the background sync is still running
224
223
final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
225
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
224
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
226
225
final MockTransportService senderTransportService =
227
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
226
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
228
227
senderTransportService .addSendBehavior (
229
228
(connection , requestId , action , request , options ) -> {
230
229
if (ClearCcrRestoreSessionAction .NAME .equals (action )
@@ -246,9 +245,9 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {
246
245
assertRetentionLeaseRenewal (numberOfShards , numberOfReplicas , followerIndex , leaderIndex );
247
246
latch .countDown ();
248
247
} finally {
249
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
248
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
250
249
final MockTransportService senderTransportService =
251
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
250
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
252
251
senderTransportService .clearAllRules ();
253
252
}
254
253
}
@@ -403,9 +402,9 @@ public void testUnfollowRemovesRetentionLeases() throws Exception {
403
402
404
403
final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
405
404
try {
406
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
405
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
407
406
final MockTransportService senderTransportService =
408
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
407
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
409
408
senderTransportService .addSendBehavior (
410
409
(connection , requestId , action , request , options ) -> {
411
410
if (RetentionLeaseActions .Remove .ACTION_NAME .equals (action )
@@ -454,9 +453,9 @@ public void testUnfollowRemovesRetentionLeases() throws Exception {
454
453
assertThat (shardStats .getRetentionLeaseStats ().retentionLeases ().leases (), empty ());
455
454
}
456
455
} finally {
457
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
456
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
458
457
final MockTransportService senderTransportService =
459
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
458
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
460
459
senderTransportService .clearAllRules ();
461
460
}
462
461
}
@@ -486,9 +485,9 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
486
485
487
486
final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
488
487
try {
489
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
488
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
490
489
final MockTransportService senderTransportService =
491
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
490
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
492
491
senderTransportService .addSendBehavior (
493
492
(connection , requestId , action , request , options ) -> {
494
493
if (RetentionLeaseActions .Remove .ACTION_NAME .equals (action )
@@ -524,9 +523,9 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
524
523
getLeaderCluster ().getClusterName (),
525
524
new Index (leaderIndex , leaderUUID ))));
526
525
} finally {
527
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
526
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
528
527
final MockTransportService senderTransportService =
529
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
528
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
530
529
senderTransportService .clearAllRules ();
531
530
}
532
531
}
@@ -614,7 +613,6 @@ public void testRetentionLeaseAdvancesWhileFollowing() throws Exception {
614
613
});
615
614
}
616
615
617
- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/39509" )
618
616
public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused () throws Exception {
619
617
final String leaderIndex = "leader" ;
620
618
final String followerIndex = "follower" ;
@@ -765,35 +763,36 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep
765
763
final CountDownLatch latch = new CountDownLatch (1 );
766
764
767
765
final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
768
- for (final ObjectCursor <DiscoveryNode > senderNode : followerClusterState .getState ().nodes ().getNodes ().values ()) {
769
- final MockTransportService senderTransportService =
770
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value .getName ());
771
- senderTransportService .addSendBehavior (
766
+ try {
767
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
768
+ final MockTransportService senderTransportService =
769
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
770
+ senderTransportService .addSendBehavior (
772
771
(connection , requestId , action , request , options ) -> {
773
772
if (RetentionLeaseActions .Renew .ACTION_NAME .equals (action )
774
- || TransportActionProxy .getProxyAction (RetentionLeaseActions .Renew .ACTION_NAME ).equals (action )) {
773
+ || TransportActionProxy .getProxyAction (RetentionLeaseActions .Renew .ACTION_NAME ).equals (action )) {
775
774
senderTransportService .clearAllRules ();
776
775
final RetentionLeaseActions .RenewRequest renewRequest = (RetentionLeaseActions .RenewRequest ) request ;
777
776
final String primaryShardNodeId =
778
- getLeaderCluster ()
779
- .clusterService ()
780
- .state ()
781
- .routingTable ()
782
- .index (leaderIndex )
783
- .shard (renewRequest .getShardId ().id ())
784
- .primaryShard ()
785
- .currentNodeId ();
777
+ getLeaderCluster ()
778
+ .clusterService ()
779
+ .state ()
780
+ .routingTable ()
781
+ .index (leaderIndex )
782
+ .shard (renewRequest .getShardId ().id ())
783
+ .primaryShard ()
784
+ .currentNodeId ();
786
785
final String primaryShardNodeName =
787
- getLeaderCluster ().clusterService ().state ().nodes ().get (primaryShardNodeId ).getName ();
786
+ getLeaderCluster ().clusterService ().state ().nodes ().get (primaryShardNodeId ).getName ();
788
787
final IndexShard primary =
789
- getLeaderCluster ()
790
- .getInstance (IndicesService .class , primaryShardNodeName )
791
- .getShardOrNull (renewRequest .getShardId ());
788
+ getLeaderCluster ()
789
+ .getInstance (IndicesService .class , primaryShardNodeName )
790
+ .getShardOrNull (renewRequest .getShardId ());
792
791
final CountDownLatch innerLatch = new CountDownLatch (1 );
793
792
// this forces the background renewal from following to face a retention lease not found exception
794
793
primary .removeRetentionLease (
795
- getRetentionLeaseId (followerIndex , leaderIndex ),
796
- ActionListener .wrap (r -> innerLatch .countDown (), e -> fail (e .toString ())));
794
+ getRetentionLeaseId (followerIndex , leaderIndex ),
795
+ ActionListener .wrap (r -> innerLatch .countDown (), e -> fail (e .toString ())));
797
796
798
797
try {
799
798
innerLatch .await ();
@@ -806,11 +805,18 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep
806
805
}
807
806
connection .sendRequest (requestId , action , request , options );
808
807
});
809
- }
808
+ }
810
809
811
- latch .await ();
810
+ latch .await ();
812
811
813
- assertRetentionLeaseRenewal (numberOfShards , numberOfReplicas , followerIndex , leaderIndex );
812
+ assertRetentionLeaseRenewal (numberOfShards , numberOfReplicas , followerIndex , leaderIndex );
813
+ } finally {
814
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
815
+ final MockTransportService senderTransportService =
816
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
817
+ senderTransportService .clearAllRules ();
818
+ }
819
+ }
814
820
}
815
821
816
822
/**
@@ -857,9 +863,9 @@ public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Ex
857
863
final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
858
864
859
865
try {
860
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
866
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
861
867
final MockTransportService senderTransportService =
862
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
868
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
863
869
senderTransportService .addSendBehavior (
864
870
(connection , requestId , action , request , options ) -> {
865
871
if (RetentionLeaseActions .Renew .ACTION_NAME .equals (action )
@@ -913,9 +919,9 @@ public void onResponseReceived(
913
919
assertThat (shardStats .getRetentionLeaseStats ().retentionLeases ().leases (), empty ());
914
920
}
915
921
} finally {
916
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
922
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
917
923
final MockTransportService senderTransportService =
918
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode . value .getName ());
924
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
919
925
senderTransportService .clearAllRules ();
920
926
}
921
927
}
0 commit comments