23
23
import org .elasticsearch .common .settings .Settings ;
24
24
import org .elasticsearch .common .unit .ByteSizeUnit ;
25
25
import org .elasticsearch .common .unit .TimeValue ;
26
+ import org .elasticsearch .index .snapshots .IndexShardSnapshotStatus ;
26
27
import org .elasticsearch .plugins .Plugin ;
27
28
import org .elasticsearch .snapshots .mockstore .MockRepository ;
28
29
import org .elasticsearch .test .ESIntegTestCase ;
31
32
32
33
import java .util .Arrays ;
33
34
import java .util .Collection ;
35
+ import java .util .List ;
34
36
import java .util .concurrent .TimeUnit ;
37
+ import java .util .stream .Collectors ;
35
38
36
39
import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
37
40
import static org .hamcrest .Matchers .equalTo ;
41
+ import static org .hamcrest .Matchers .everyItem ;
42
+ import static org .hamcrest .Matchers .hasSize ;
38
43
39
44
@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 , transportClientRatio = 0 )
40
45
public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase {
@@ -71,6 +76,9 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception {
71
76
.get ();
72
77
waitForBlock (blockedNode , "test-repo" , TimeValue .timeValueSeconds (60 ));
73
78
79
+ final SnapshotId snapshotId = client ().admin ().cluster ().prepareGetSnapshots ("test-repo" ).setSnapshots ("test-snap" )
80
+ .get ().getSnapshots ().get (0 ).snapshotId ();
81
+
74
82
logger .info ("--> start disrupting cluster" );
75
83
final NetworkDisruption networkDisruption = new NetworkDisruption (new NetworkDisruption .TwoPartitions (masterNode , dataNode ),
76
84
NetworkDisruption .NetworkDelay .random (random ()));
@@ -79,16 +87,27 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception {
79
87
80
88
logger .info ("--> unblocking repository" );
81
89
unblockNode ("test-repo" , blockedNode );
82
- Thread .sleep (200 );
90
+
91
+ // Retrieve snapshot status from the data node.
92
+ SnapshotShardsService snapshotShardsService = internalCluster ().getInstance (SnapshotShardsService .class , blockedNode );
93
+ assertBusy (() -> {
94
+ final Snapshot snapshot = new Snapshot ("test-repo" , snapshotId );
95
+ List <IndexShardSnapshotStatus .Stage > stages = snapshotShardsService .currentSnapshotShards (snapshot )
96
+ .values ().stream ().map (IndexShardSnapshotStatus ::stage ).collect (Collectors .toList ());
97
+ assertThat (stages , hasSize (shards ));
98
+ assertThat (stages , everyItem (equalTo (IndexShardSnapshotStatus .Stage .DONE )));
99
+ });
100
+
83
101
logger .info ("--> stop disrupting cluster" );
102
+ networkDisruption .stopDisrupting ();
84
103
internalCluster ().clearDisruptionScheme (true );
85
104
86
105
assertBusy (() -> {
87
106
GetSnapshotsResponse snapshotsStatusResponse = client ().admin ().cluster ()
88
107
.prepareGetSnapshots ("test-repo" )
89
108
.setSnapshots ("test-snap" ).get ();
90
- logger .info ("Status size [{}]" , snapshotsStatusResponse .getSnapshots ().get (0 ).successfulShards ());
91
109
SnapshotInfo snapshotInfo = snapshotsStatusResponse .getSnapshots ().get (0 );
110
+ logger .info ("Snapshot status [{}], successfulShards [{}]" , snapshotInfo .state (), snapshotInfo .successfulShards ());
92
111
assertThat (snapshotInfo .state (), equalTo (SnapshotState .SUCCESS ));
93
112
assertThat (snapshotInfo .successfulShards (), equalTo (shards ));
94
113
}, 10 , TimeUnit .SECONDS );
0 commit comments