6
6
7
7
package org .elasticsearch .xpack .ccr ;
8
8
9
- import com .carrotsearch .hppc .cursors .ObjectCursor ;
10
9
import org .elasticsearch .ElasticsearchException ;
11
10
import org .elasticsearch .action .ActionListener ;
12
11
import org .elasticsearch .action .admin .cluster .settings .ClusterUpdateSettingsRequest ;
44
43
import org .elasticsearch .plugins .Plugin ;
45
44
import org .elasticsearch .snapshots .RestoreInfo ;
46
45
import org .elasticsearch .snapshots .RestoreService ;
47
- import org .elasticsearch .test .junit .annotations .TestLogging ;
48
46
import org .elasticsearch .test .transport .MockTransportService ;
49
47
import org .elasticsearch .transport .ConnectTransportException ;
50
48
import org .elasticsearch .transport .RemoteTransportException ;
88
86
import static org .hamcrest .Matchers .greaterThan ;
89
87
import static org .hamcrest .Matchers .hasSize ;
90
88
91
- @ TestLogging (value = "org.elasticsearch.xpack.ccr:trace,org.elasticsearch.indices.recovery:trace,org.elasticsearch.index.seqno:debug" )
92
89
public class CcrRetentionLeaseIT extends CcrIntegTestCase {
93
90
94
91
public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugin {
@@ -224,9 +221,9 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {
224
221
225
222
// block the recovery from completing; this ensures the background sync is still running
226
223
final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
227
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
224
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
228
225
final MockTransportService senderTransportService =
229
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
226
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
230
227
senderTransportService .addSendBehavior (
231
228
(connection , requestId , action , request , options ) -> {
232
229
if (ClearCcrRestoreSessionAction .NAME .equals (action )
@@ -248,9 +245,9 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {
248
245
assertRetentionLeaseRenewal (numberOfShards , numberOfReplicas , followerIndex , leaderIndex );
249
246
latch .countDown ();
250
247
} finally {
251
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
248
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
252
249
final MockTransportService senderTransportService =
253
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
250
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
254
251
senderTransportService .clearAllRules ();
255
252
}
256
253
}
@@ -405,9 +402,9 @@ public void testUnfollowRemovesRetentionLeases() throws Exception {
405
402
406
403
final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
407
404
try {
408
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
405
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
409
406
final MockTransportService senderTransportService =
410
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
407
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
411
408
senderTransportService .addSendBehavior (
412
409
(connection , requestId , action , request , options ) -> {
413
410
if (RetentionLeaseActions .Remove .ACTION_NAME .equals (action )
@@ -456,9 +453,9 @@ public void testUnfollowRemovesRetentionLeases() throws Exception {
456
453
assertThat (Strings .toString (shardStats ), shardStats .getRetentionLeaseStats ().retentionLeases ().leases (), empty ());
457
454
}
458
455
} finally {
459
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
456
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
460
457
final MockTransportService senderTransportService =
461
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
458
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
462
459
senderTransportService .clearAllRules ();
463
460
}
464
461
}
@@ -488,9 +485,9 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
488
485
489
486
final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
490
487
try {
491
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
488
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
492
489
final MockTransportService senderTransportService =
493
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
490
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
494
491
senderTransportService .addSendBehavior (
495
492
(connection , requestId , action , request , options ) -> {
496
493
if (RetentionLeaseActions .Remove .ACTION_NAME .equals (action )
@@ -526,9 +523,9 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
526
523
getLeaderCluster ().getClusterName (),
527
524
new Index (leaderIndex , leaderUUID ))));
528
525
} finally {
529
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
526
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
530
527
final MockTransportService senderTransportService =
531
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
528
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
532
529
senderTransportService .clearAllRules ();
533
530
}
534
531
}
@@ -766,35 +763,36 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep
766
763
final CountDownLatch latch = new CountDownLatch (1 );
767
764
768
765
final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
769
- for (final ObjectCursor <DiscoveryNode > senderNode : followerClusterState .getState ().nodes ().getNodes ().values ()) {
770
- final MockTransportService senderTransportService =
771
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value .getName ());
772
- senderTransportService .addSendBehavior (
766
+ try {
767
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
768
+ final MockTransportService senderTransportService =
769
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
770
+ senderTransportService .addSendBehavior (
773
771
(connection , requestId , action , request , options ) -> {
774
772
if (RetentionLeaseActions .Renew .ACTION_NAME .equals (action )
775
- || TransportActionProxy .getProxyAction (RetentionLeaseActions .Renew .ACTION_NAME ).equals (action )) {
773
+ || TransportActionProxy .getProxyAction (RetentionLeaseActions .Renew .ACTION_NAME ).equals (action )) {
776
774
senderTransportService .clearAllRules ();
777
775
final RetentionLeaseActions .RenewRequest renewRequest = (RetentionLeaseActions .RenewRequest ) request ;
778
776
final String primaryShardNodeId =
779
- getLeaderCluster ()
780
- .clusterService ()
781
- .state ()
782
- .routingTable ()
783
- .index (leaderIndex )
784
- .shard (renewRequest .getShardId ().id ())
785
- .primaryShard ()
786
- .currentNodeId ();
777
+ getLeaderCluster ()
778
+ .clusterService ()
779
+ .state ()
780
+ .routingTable ()
781
+ .index (leaderIndex )
782
+ .shard (renewRequest .getShardId ().id ())
783
+ .primaryShard ()
784
+ .currentNodeId ();
787
785
final String primaryShardNodeName =
788
- getLeaderCluster ().clusterService ().state ().nodes ().get (primaryShardNodeId ).getName ();
786
+ getLeaderCluster ().clusterService ().state ().nodes ().get (primaryShardNodeId ).getName ();
789
787
final IndexShard primary =
790
- getLeaderCluster ()
791
- .getInstance (IndicesService .class , primaryShardNodeName )
792
- .getShardOrNull (renewRequest .getShardId ());
788
+ getLeaderCluster ()
789
+ .getInstance (IndicesService .class , primaryShardNodeName )
790
+ .getShardOrNull (renewRequest .getShardId ());
793
791
final CountDownLatch innerLatch = new CountDownLatch (1 );
794
792
// this forces the background renewal from following to face a retention lease not found exception
795
793
primary .removeRetentionLease (
796
- getRetentionLeaseId (followerIndex , leaderIndex ),
797
- ActionListener .wrap (r -> innerLatch .countDown (), e -> fail (e .toString ())));
794
+ getRetentionLeaseId (followerIndex , leaderIndex ),
795
+ ActionListener .wrap (r -> innerLatch .countDown (), e -> fail (e .toString ())));
798
796
799
797
try {
800
798
innerLatch .await ();
@@ -807,11 +805,18 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep
807
805
}
808
806
connection .sendRequest (requestId , action , request , options );
809
807
});
810
- }
808
+ }
811
809
812
- latch .await ();
810
+ latch .await ();
813
811
814
- assertRetentionLeaseRenewal (numberOfShards , numberOfReplicas , followerIndex , leaderIndex );
812
+ assertRetentionLeaseRenewal (numberOfShards , numberOfReplicas , followerIndex , leaderIndex );
813
+ } finally {
814
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
815
+ final MockTransportService senderTransportService =
816
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
817
+ senderTransportService .clearAllRules ();
818
+ }
819
+ }
815
820
}
816
821
817
822
/**
@@ -858,9 +863,9 @@ public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Ex
858
863
final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
859
864
860
865
try {
861
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
866
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
862
867
final MockTransportService senderTransportService =
863
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
868
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
864
869
senderTransportService .addSendBehavior (
865
870
(connection , requestId , action , request , options ) -> {
866
871
if (RetentionLeaseActions .Renew .ACTION_NAME .equals (action )
@@ -914,9 +919,9 @@ public void onResponseReceived(
914
919
assertThat (Strings .toString (shardStats ), shardStats .getRetentionLeaseStats ().retentionLeases ().leases (), empty ());
915
920
}
916
921
} finally {
917
- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
922
+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
918
923
final MockTransportService senderTransportService =
919
- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode . value .getName ());
924
+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
920
925
senderTransportService .clearAllRules ();
921
926
}
922
927
}
0 commit comments