14
14
import org .elasticsearch .cluster .metadata .MetaData ;
15
15
import org .elasticsearch .cluster .service .ClusterService ;
16
16
import org .elasticsearch .common .bytes .BytesReference ;
17
+ import org .elasticsearch .common .settings .Settings ;
17
18
import org .elasticsearch .common .xcontent .ToXContent ;
18
19
import org .elasticsearch .common .xcontent .XContentBuilder ;
19
20
import org .elasticsearch .common .xcontent .json .JsonXContent ;
20
21
import org .elasticsearch .mock .orig .Mockito ;
21
22
import org .elasticsearch .search .SearchHit ;
22
23
import org .elasticsearch .search .SearchHits ;
23
24
import org .elasticsearch .test .ESTestCase ;
25
+ import org .elasticsearch .threadpool .FixedExecutorBuilder ;
26
+ import org .elasticsearch .threadpool .ThreadPool ;
24
27
import org .elasticsearch .xpack .core .ml .MLMetadataField ;
25
28
import org .elasticsearch .xpack .core .ml .MlMetadata ;
26
29
import org .elasticsearch .xpack .core .ml .action .DeleteModelSnapshotAction ;
27
30
import org .elasticsearch .xpack .core .ml .job .config .Job ;
28
31
import org .elasticsearch .xpack .core .ml .job .config .JobTests ;
29
32
import org .elasticsearch .xpack .core .ml .job .persistence .AnomalyDetectorsIndex ;
30
33
import org .elasticsearch .xpack .core .ml .job .process .autodetect .state .ModelSnapshot ;
34
+ import org .elasticsearch .xpack .ml .MachineLearning ;
35
+ import org .junit .After ;
31
36
import org .junit .Before ;
32
37
import org .mockito .invocation .InvocationOnMock ;
33
38
import org .mockito .stubbing .Answer ;
38
43
import java .util .HashMap ;
39
44
import java .util .List ;
40
45
import java .util .Map ;
46
+ import java .util .concurrent .CountDownLatch ;
47
+ import java .util .concurrent .TimeUnit ;
41
48
42
49
import static org .hamcrest .Matchers .equalTo ;
50
+ import static org .hamcrest .Matchers .is ;
43
51
import static org .mockito .Matchers .any ;
44
52
import static org .mockito .Matchers .same ;
45
53
import static org .mockito .Mockito .doAnswer ;
46
54
import static org .mockito .Mockito .mock ;
47
- import static org .mockito .Mockito .verify ;
48
55
import static org .mockito .Mockito .when ;
49
56
50
57
public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
51
58
52
59
private Client client ;
60
+ private ThreadPool threadPool ;
53
61
private ClusterService clusterService ;
54
62
private ClusterState clusterState ;
55
63
private List <SearchRequest > capturedSearchRequests ;
56
64
private List <DeleteModelSnapshotAction .Request > capturedDeleteModelSnapshotRequests ;
57
65
private List <SearchResponse > searchResponsesPerCall ;
58
- private ActionListener < Boolean > listener ;
66
+ private TestListener listener ;
59
67
60
68
@ Before
61
69
public void setUpTests () {
@@ -66,7 +74,19 @@ public void setUpTests() {
66
74
clusterState = mock (ClusterState .class );
67
75
when (clusterService .state ()).thenReturn (clusterState );
68
76
client = mock (Client .class );
69
- listener = mock (ActionListener .class );
77
+ listener = new TestListener ();
78
+
79
+ // Init thread pool
80
+ Settings settings = Settings .builder ()
81
+ .put ("node.name" , "expired_model_snapshots_remover_test" )
82
+ .build ();
83
+ threadPool = new ThreadPool (settings ,
84
+ new FixedExecutorBuilder (settings , MachineLearning .UTILITY_THREAD_POOL_NAME , 1 , 1000 , "" ));
85
+ }
86
+
87
+ @ After
88
+ public void shutdownThreadPool () throws InterruptedException {
89
+ terminate (threadPool );
70
90
}
71
91
72
92
public void testRemove_GivenJobsWithoutRetentionPolicy () {
@@ -78,7 +98,8 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() {
78
98
79
99
createExpiredModelSnapshotsRemover ().remove (listener );
80
100
81
- verify (listener ).onResponse (true );
101
+ listener .waitToCompletion ();
102
+ assertThat (listener .success , is (true ));
82
103
Mockito .verifyNoMoreInteractions (client );
83
104
}
84
105
@@ -88,7 +109,8 @@ public void testRemove_GivenJobWithoutActiveSnapshot() {
88
109
89
110
createExpiredModelSnapshotsRemover ().remove (listener );
90
111
91
- verify (listener ).onResponse (true );
112
+ listener .waitToCompletion ();
113
+ assertThat (listener .success , is (true ));
92
114
Mockito .verifyNoMoreInteractions (client );
93
115
}
94
116
@@ -108,6 +130,9 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException
108
130
109
131
createExpiredModelSnapshotsRemover ().remove (listener );
110
132
133
+ listener .waitToCompletion ();
134
+ assertThat (listener .success , is (true ));
135
+
111
136
assertThat (capturedSearchRequests .size (), equalTo (2 ));
112
137
SearchRequest searchRequest = capturedSearchRequests .get (0 );
113
138
assertThat (searchRequest .indices (), equalTo (new String [] {AnomalyDetectorsIndex .jobResultsAliasedName ("snapshots-1" )}));
@@ -124,8 +149,6 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException
124
149
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests .get (2 );
125
150
assertThat (deleteSnapshotRequest .getJobId (), equalTo ("snapshots-2" ));
126
151
assertThat (deleteSnapshotRequest .getSnapshotId (), equalTo ("snapshots-2_1" ));
127
-
128
- verify (listener ).onResponse (true );
129
152
}
130
153
131
154
public void testRemove_GivenClientSearchRequestsFail () throws IOException {
@@ -144,13 +167,14 @@ public void testRemove_GivenClientSearchRequestsFail() throws IOException {
144
167
145
168
createExpiredModelSnapshotsRemover ().remove (listener );
146
169
170
+ listener .waitToCompletion ();
171
+ assertThat (listener .success , is (false ));
172
+
147
173
assertThat (capturedSearchRequests .size (), equalTo (1 ));
148
174
SearchRequest searchRequest = capturedSearchRequests .get (0 );
149
175
assertThat (searchRequest .indices (), equalTo (new String [] {AnomalyDetectorsIndex .jobResultsAliasedName ("snapshots-1" )}));
150
176
151
177
assertThat (capturedDeleteModelSnapshotRequests .size (), equalTo (0 ));
152
-
153
- verify (listener ).onFailure (any ());
154
178
}
155
179
156
180
public void testRemove_GivenClientDeleteSnapshotRequestsFail () throws IOException {
@@ -169,6 +193,9 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
169
193
170
194
createExpiredModelSnapshotsRemover ().remove (listener );
171
195
196
+ listener .waitToCompletion ();
197
+ assertThat (listener .success , is (false ));
198
+
172
199
assertThat (capturedSearchRequests .size (), equalTo (1 ));
173
200
SearchRequest searchRequest = capturedSearchRequests .get (0 );
174
201
assertThat (searchRequest .indices (), equalTo (new String [] {AnomalyDetectorsIndex .jobResultsAliasedName ("snapshots-1" )}));
@@ -177,8 +204,6 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
177
204
DeleteModelSnapshotAction .Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests .get (0 );
178
205
assertThat (deleteSnapshotRequest .getJobId (), equalTo ("snapshots-1" ));
179
206
assertThat (deleteSnapshotRequest .getSnapshotId (), equalTo ("snapshots-1_1" ));
180
-
181
- verify (listener ).onFailure (any ());
182
207
}
183
208
184
209
private void givenJobs (List <Job > jobs ) {
@@ -192,7 +217,7 @@ private void givenJobs(List<Job> jobs) {
192
217
}
193
218
194
219
private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover () {
195
- return new ExpiredModelSnapshotsRemover (client , clusterService );
220
+ return new ExpiredModelSnapshotsRemover (client , threadPool , clusterService );
196
221
}
197
222
198
223
private static ModelSnapshot createModelSnapshot (String jobId , String snapshotId ) {
@@ -230,7 +255,7 @@ private void givenClientRequests(boolean shouldSearchRequestsSucceed, boolean sh
230
255
int callCount = 0 ;
231
256
232
257
@ Override
233
- public Void answer (InvocationOnMock invocationOnMock ) throws Throwable {
258
+ public Void answer (InvocationOnMock invocationOnMock ) {
234
259
SearchRequest searchRequest = (SearchRequest ) invocationOnMock .getArguments ()[1 ];
235
260
capturedSearchRequests .add (searchRequest );
236
261
ActionListener <SearchResponse > listener = (ActionListener <SearchResponse >) invocationOnMock .getArguments ()[2 ];
@@ -244,7 +269,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
244
269
}).when (client ).execute (same (SearchAction .INSTANCE ), any (), any ());
245
270
doAnswer (new Answer <Void >() {
246
271
@ Override
247
- public Void answer (InvocationOnMock invocationOnMock ) throws Throwable {
272
+ public Void answer (InvocationOnMock invocationOnMock ) {
248
273
capturedDeleteModelSnapshotRequests .add ((DeleteModelSnapshotAction .Request ) invocationOnMock .getArguments ()[1 ]);
249
274
ActionListener <DeleteModelSnapshotAction .Response > listener =
250
275
(ActionListener <DeleteModelSnapshotAction .Response >) invocationOnMock .getArguments ()[2 ];
@@ -257,4 +282,30 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
257
282
}
258
283
}).when (client ).execute (same (DeleteModelSnapshotAction .INSTANCE ), any (), any ());
259
284
}
285
+
286
+ private class TestListener implements ActionListener <Boolean > {
287
+
288
+ private boolean success ;
289
+ private final CountDownLatch latch = new CountDownLatch (1 );
290
+
291
+ @ Override
292
+ public void onResponse (Boolean aBoolean ) {
293
+ success = aBoolean ;
294
+ latch .countDown ();
295
+ }
296
+
297
+ @ Override
298
+ public void onFailure (Exception e ) {
299
+ latch .countDown ();
300
+ }
301
+
302
+ public void waitToCompletion () {
303
+ try {
304
+ latch .await (10 , TimeUnit .SECONDS );
305
+ } catch (InterruptedException e ) {
306
+ fail ("listener timed out before completing" );
307
+ }
308
+ }
309
+ }
310
+
260
311
}
0 commit comments