|
54 | 54 | import java.util.List;
|
55 | 55 | import java.util.Map;
|
56 | 56 | import java.util.concurrent.TimeUnit;
|
| 57 | +import java.util.stream.StreamSupport; |
57 | 58 |
|
58 | 59 | import static java.util.Collections.emptyMap;
|
59 | 60 | import static java.util.Collections.emptySet;
|
60 | 61 | import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
|
61 | 62 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
62 | 63 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
| 64 | +import static org.hamcrest.Matchers.arrayWithSize; |
63 | 65 | import static org.hamcrest.Matchers.equalTo;
|
64 | 66 | import static org.hamcrest.Matchers.hasItem;
|
65 | 67 | import static org.hamcrest.Matchers.hasSize;
|
@@ -141,7 +143,13 @@ private <Req extends ActionRequest, Res extends ActionResponse> ActionFuture<Res
|
141 | 143 | // Wait for no publication in progress to not accidentally cancel a publication different from the one triggered by the given
|
142 | 144 | // request.
|
143 | 145 | assertBusy(
|
144 |
| - () -> assertFalse(((Coordinator) internalCluster().getCurrentMasterNodeInstance(Discovery.class)).publicationInProgress())); |
| 146 | + () -> { |
| 147 | + assertFalse(((Coordinator) internalCluster().getCurrentMasterNodeInstance(Discovery.class)).publicationInProgress()); |
| 148 | + assertThat(StreamSupport.stream( |
| 149 | + internalCluster().getInstances(Discovery.class).spliterator(), false) |
| 150 | + .map(coordinator -> ((Coordinator) coordinator).getLastAcceptedState().version()) |
| 151 | + .distinct().toArray(), arrayWithSize(1)); |
| 152 | + }); |
145 | 153 | ActionFuture<Res> future = req.execute();
|
146 | 154 | assertBusy(
|
147 | 155 | () -> assertTrue(((Coordinator)internalCluster().getCurrentMasterNodeInstance(Discovery.class)).cancelCommittedPublication()));
|
@@ -265,11 +273,9 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
|
265 | 273 |
|
266 | 274 | // Now make sure the indexing request finishes successfully
|
267 | 275 | disruption.stopDisrupting();
|
268 |
| - assertBusy(() -> { |
269 |
| - assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged()); |
270 |
| - assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class)); |
271 |
| - assertEquals(1, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); |
272 |
| - }); |
| 276 | + assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged()); |
| 277 | + assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class)); |
| 278 | + assertEquals(1, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); |
273 | 279 | }
|
274 | 280 |
|
275 | 281 | public void testDelayedMappingPropagationOnReplica() throws Exception {
|
@@ -370,11 +376,9 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
|
370 | 376 |
|
371 | 377 | // Now make sure the indexing request finishes successfully
|
372 | 378 | disruption.stopDisrupting();
|
373 |
| - assertBusy(() -> { |
374 |
| - assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged()); |
375 |
| - assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class)); |
376 |
| - assertEquals(2, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); // both shards should have succeeded |
377 |
| - }); |
| 379 | + assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged()); |
| 380 | + assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class)); |
| 381 | + assertEquals(2, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); // both shards should have succeeded |
378 | 382 |
|
379 | 383 | assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
|
380 | 384 | }
|
|
0 commit comments