7
7
*/
8
8
package org .elasticsearch .repositories .blobstore ;
9
9
10
+ import org .elasticsearch .ExceptionsHelper ;
11
+ import org .elasticsearch .action .ActionFuture ;
10
12
import org .elasticsearch .action .ActionRunnable ;
13
+ import org .elasticsearch .action .admin .cluster .repositories .cleanup .CleanupRepositoryResponse ;
11
14
import org .elasticsearch .action .admin .cluster .snapshots .create .CreateSnapshotResponse ;
12
15
import org .elasticsearch .action .support .PlainActionFuture ;
13
16
import org .elasticsearch .cluster .RepositoryCleanupInProgress ;
17
20
import org .elasticsearch .snapshots .SnapshotState ;
18
21
import org .elasticsearch .test .ESIntegTestCase ;
19
22
23
+ import java .io .IOException ;
20
24
import java .util .concurrent .ExecutionException ;
21
25
22
26
import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertFutureThrows ;
27
+ import static org .hamcrest .Matchers .instanceOf ;
23
28
import static org .hamcrest .Matchers .is ;
24
29
25
30
@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 )
26
31
public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase {
27
32
28
33
public void testMasterFailoverDuringCleanup () throws Exception {
29
- startBlockedCleanup ("test-repo" );
34
+ final ActionFuture < CleanupRepositoryResponse > cleanupFuture = startBlockedCleanup ("test-repo" );
30
35
31
36
final int nodeCount = internalCluster ().numDataAndMasterNodes ();
32
37
logger .info ("--> stopping master node" );
@@ -37,10 +42,12 @@ public void testMasterFailoverDuringCleanup() throws Exception {
37
42
logger .info ("--> wait for cleanup to finish and disappear from cluster state" );
38
43
awaitClusterState (state ->
39
44
state .custom (RepositoryCleanupInProgress .TYPE , RepositoryCleanupInProgress .EMPTY ).hasCleanupInProgress () == false );
45
+
46
+ cleanupFuture .get ();
40
47
}
41
48
42
49
public void testRepeatCleanupsDontRemove () throws Exception {
43
- final String masterNode = startBlockedCleanup ("test-repo" );
50
+ final ActionFuture < CleanupRepositoryResponse > cleanupFuture = startBlockedCleanup ("test-repo" );
44
51
45
52
logger .info ("--> sending another cleanup" );
46
53
assertFutureThrows (client ().admin ().cluster ().prepareCleanupRepository ("test-repo" ).execute (), IllegalStateException .class );
@@ -51,14 +58,19 @@ public void testRepeatCleanupsDontRemove() throws Exception {
51
58
assertTrue (cleanup .hasCleanupInProgress ());
52
59
53
60
logger .info ("--> unblocking master node" );
54
- unblockNode ("test-repo" , masterNode );
61
+ unblockNode ("test-repo" , internalCluster (). getMasterName () );
55
62
56
63
logger .info ("--> wait for cleanup to finish and disappear from cluster state" );
57
64
awaitClusterState (state ->
58
65
state .custom (RepositoryCleanupInProgress .TYPE , RepositoryCleanupInProgress .EMPTY ).hasCleanupInProgress () == false );
66
+
67
+ final ExecutionException e = expectThrows (ExecutionException .class , cleanupFuture ::get );
68
+ final Throwable ioe = ExceptionsHelper .unwrap (e , IOException .class );
69
+ assertThat (ioe , instanceOf (IOException .class ));
70
+ assertThat (ioe .getMessage (), is ("exception after block" ));
59
71
}
60
72
61
- private String startBlockedCleanup (String repoName ) throws Exception {
73
+ private ActionFuture < CleanupRepositoryResponse > startBlockedCleanup (String repoName ) throws Exception {
62
74
logger .info ("--> starting two master nodes and one data node" );
63
75
internalCluster ().startMasterOnlyNodes (2 );
64
76
internalCluster ().startDataOnlyNodes (1 );
@@ -80,13 +92,16 @@ private String startBlockedCleanup(String repoName) throws Exception {
80
92
blockMasterFromFinalizingSnapshotOnIndexFile (repoName );
81
93
82
94
logger .info ("--> starting repository cleanup" );
83
- client ().admin ().cluster ().prepareCleanupRepository (repoName ).execute ();
95
+ // running from a non-master client because shutting down a master while a request to it is pending might result in the future
96
+ // never completing
97
+ final ActionFuture <CleanupRepositoryResponse > future =
98
+ internalCluster ().nonMasterClient ().admin ().cluster ().prepareCleanupRepository (repoName ).execute ();
84
99
85
100
final String masterNode = internalCluster ().getMasterName ();
86
101
waitForBlock (masterNode , repoName );
87
102
awaitClusterState (state ->
88
103
state .custom (RepositoryCleanupInProgress .TYPE , RepositoryCleanupInProgress .EMPTY ).hasCleanupInProgress ());
89
- return masterNode ;
104
+ return future ;
90
105
}
91
106
92
107
public void testCleanupOldIndexN () throws ExecutionException , InterruptedException {
0 commit comments