|
15 | 15 | import org.elasticsearch.client.ResponseException;
|
16 | 16 | import org.elasticsearch.client.RestClient;
|
17 | 17 | import org.elasticsearch.cluster.metadata.IndexMetaData;
|
| 18 | +import org.elasticsearch.common.CheckedRunnable; |
18 | 19 | import org.elasticsearch.common.Nullable;
|
19 | 20 | import org.elasticsearch.common.Strings;
|
20 | 21 | import org.elasticsearch.common.settings.Settings;
|
@@ -329,64 +330,78 @@ public void testAllocateActionOnlyReplicas() throws Exception {
|
329 | 330 | });
|
330 | 331 | }
|
331 | 332 |
|
332 |
| - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/50781") |
333 | 333 | public void testWaitForSnapshot() throws Exception {
|
334 | 334 | createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
335 | 335 | .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
336 |
| - String smlPolicy = randomAlphaOfLengthBetween(4, 10); |
337 |
| - createNewSingletonPolicy("delete", new WaitForSnapshotAction(smlPolicy)); |
| 336 | + String slmPolicy = randomAlphaOfLengthBetween(4, 10); |
| 337 | + createNewSingletonPolicy("delete", new WaitForSnapshotAction(slmPolicy)); |
338 | 338 | updatePolicy(index, policy);
|
339 |
| - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("wait_for_snapshot"))); |
340 |
| - assertBusy(() -> assertThat(getFailedStepForIndex(index), equalTo("wait-for-snapshot"))); |
| 339 | + assertBusy( () -> { |
| 340 | + Map<String, Object> indexILMState = explainIndex(index); |
| 341 | + assertThat(indexILMState.get("action"), is("wait_for_snapshot")); |
| 342 | + assertThat(indexILMState.get("failed_step"), is("wait-for-snapshot")); |
| 343 | + }, slmPolicy); |
341 | 344 |
|
342 | 345 | String repo = createSnapshotRepo();
|
343 |
| - createSlmPolicy(smlPolicy, repo); |
| 346 | + createSlmPolicy(slmPolicy, repo); |
344 | 347 |
|
345 |
| - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("wait_for_snapshot"))); |
| 348 | + assertBusy( () -> { |
| 349 | + Map<String, Object> indexILMState = explainIndex(index); |
| 350 | + //wait for step to notice that the slm policy is created and to get out of error |
| 351 | + assertThat(indexILMState.get("failed_step"), nullValue()); |
| 352 | + assertThat(indexILMState.get("action"), is("wait_for_snapshot")); |
| 353 | + assertThat(indexILMState.get("step"), is("wait-for-snapshot")); |
| 354 | + }, slmPolicy); |
346 | 355 |
|
347 |
| - Request request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute"); |
| 356 | + Request request = new Request("PUT", "/_slm/policy/" + slmPolicy + "/_execute"); |
348 | 357 | assertOK(client().performRequest(request));
|
349 | 358 |
|
350 |
| - |
351 |
| - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("completed")), 2, TimeUnit.MINUTES); |
352 |
| - |
353 |
| - request = new Request("DELETE", "/_slm/policy/" + smlPolicy); |
354 |
| - assertOK(client().performRequest(request)); |
355 |
| - |
356 |
| - request = new Request("DELETE", "/_snapshot/" + repo); |
357 |
| - assertOK(client().performRequest(request)); |
| 359 | + assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("complete")), slmPolicy); |
358 | 360 | }
|
359 | 361 |
|
360 |
| - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/50781") |
361 | 362 | public void testWaitForSnapshotSlmExecutedBefore() throws Exception {
|
362 | 363 | createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
363 | 364 | .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
364 |
| - String smlPolicy = randomAlphaOfLengthBetween(4, 10); |
365 |
| - createNewSingletonPolicy("delete", new WaitForSnapshotAction(smlPolicy)); |
| 365 | + String slmPolicy = randomAlphaOfLengthBetween(4, 10); |
| 366 | + createNewSingletonPolicy("delete", new WaitForSnapshotAction(slmPolicy)); |
366 | 367 |
|
367 | 368 | String repo = createSnapshotRepo();
|
368 |
| - createSlmPolicy(smlPolicy, repo); |
| 369 | + createSlmPolicy(slmPolicy, repo); |
369 | 370 |
|
370 |
| - Request request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute"); |
| 371 | + Request request = new Request("PUT", "/_slm/policy/" + slmPolicy + "/_execute"); |
371 | 372 | assertOK(client().performRequest(request));
|
372 | 373 |
|
| 374 | + //wait for slm to finish execution |
| 375 | + assertBusy(() -> { |
| 376 | + Response response = client().performRequest(new Request("GET", "/_slm/policy/" + slmPolicy)); |
| 377 | + try (InputStream is = response.getEntity().getContent()) { |
| 378 | + Map<String, Object> responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); |
| 379 | + assertEquals(1, ((Map<?, ?>) ((Map<?, ?>) responseMap.get(slmPolicy)).get("stats")).get("snapshots_taken")); |
| 380 | + } |
| 381 | + }, slmPolicy); |
| 382 | + |
373 | 383 | updatePolicy(index, policy);
|
374 |
| - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("wait_for_snapshot"))); |
375 |
| - assertBusy(() -> assertThat(getStepKeyForIndex(index).getName(), equalTo("wait-for-snapshot"))); |
376 | 384 |
|
377 |
| - request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute"); |
378 |
| - assertOK(client().performRequest(request)); |
| 385 | + assertBusy( () -> { |
| 386 | + Map<String, Object> indexILMState = explainIndex(index); |
| 387 | + assertThat(indexILMState.get("failed_step"), nullValue()); |
| 388 | + assertThat(indexILMState.get("action"), is("wait_for_snapshot")); |
| 389 | + assertThat(indexILMState.get("step"), is("wait-for-snapshot")); |
| 390 | + }, slmPolicy); |
379 | 391 |
|
380 |
| - request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute"); |
| 392 | + request = new Request("PUT", "/_slm/policy/" + slmPolicy + "/_execute"); |
381 | 393 | assertOK(client().performRequest(request));
|
382 | 394 |
|
383 |
| - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("completed")), 2, TimeUnit.MINUTES); |
384 |
| - |
385 |
| - request = new Request("DELETE", "/_slm/policy/" + smlPolicy); |
386 |
| - assertOK(client().performRequest(request)); |
| 395 | + //wait for slm to finish execution |
| 396 | + assertBusy(() -> { |
| 397 | + Response response = client().performRequest(new Request("GET", "/_slm/policy/" + slmPolicy)); |
| 398 | + try (InputStream is = response.getEntity().getContent()) { |
| 399 | + Map<String, Object> responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); |
| 400 | + assertEquals(2, ((Map<?, ?>) ((Map<?, ?>) responseMap.get(slmPolicy)).get("stats")).get("snapshots_taken")); |
| 401 | + } |
| 402 | + }, slmPolicy); |
387 | 403 |
|
388 |
| - request = new Request("DELETE", "/_snapshot/" + repo); |
389 |
| - assertOK(client().performRequest(request)); |
| 404 | + assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("complete")), slmPolicy); |
390 | 405 | }
|
391 | 406 |
|
392 | 407 | public void testDelete() throws Exception {
|
@@ -1811,11 +1826,29 @@ private String createSnapshotRepo() throws IOException {
|
1811 | 1826 | .field("type", "fs")
|
1812 | 1827 | .startObject("settings")
|
1813 | 1828 | .field("compress", randomBoolean())
|
1814 |
| - .field("location", System.getProperty("tests.path.repo")) |
1815 |
| - .field("max_snapshot_bytes_per_sec", "256b") |
| 1829 | + //random location to avoid clash with other snapshots |
| 1830 | + .field("location", System.getProperty("tests.path.repo")+ "/" + randomAlphaOfLengthBetween(4, 10)) |
| 1831 | + .field("max_snapshot_bytes_per_sec", "100m") |
1816 | 1832 | .endObject()
|
1817 | 1833 | .endObject()));
|
1818 | 1834 | assertOK(client().performRequest(request));
|
1819 | 1835 | return repo;
|
1820 | 1836 | }
|
| 1837 | + |
| 1838 | + //adds debug information for waitForSnapshot tests |
| 1839 | + private void assertBusy(CheckedRunnable<Exception> runnable, String slmPolicy) throws Exception { |
| 1840 | + assertBusy(() -> { |
| 1841 | + try { |
| 1842 | + runnable.run(); |
| 1843 | + } catch (AssertionError e) { |
| 1844 | + Map<String, Object> slm; |
| 1845 | + try (InputStream is = client().performRequest(new Request("GET", "/_slm/policy/" + slmPolicy)).getEntity().getContent()) { |
| 1846 | + slm = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, false); |
| 1847 | + } catch (Exception ignored) { |
| 1848 | + slm = new HashMap<>(); |
| 1849 | + } |
| 1850 | + throw new AssertionError("Index:" + explainIndex(index) + "\nSLM:" + slm, e); |
| 1851 | + } |
| 1852 | + }); |
| 1853 | + } |
1821 | 1854 | }
|
0 commit comments