|
55 | 55 | import java.util.List;
|
56 | 56 | import java.util.Map;
|
57 | 57 | import java.util.concurrent.TimeUnit;
|
| 58 | +import java.util.stream.StreamSupport; |
58 | 59 |
|
59 | 60 | import static java.util.Collections.emptyMap;
|
60 | 61 | import static java.util.Collections.emptySet;
|
61 | 62 | import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
|
62 | 63 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
63 | 64 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
| 65 | +import static org.hamcrest.Matchers.arrayWithSize; |
64 | 66 | import static org.hamcrest.Matchers.equalTo;
|
65 | 67 | import static org.hamcrest.Matchers.hasItem;
|
66 | 68 | import static org.hamcrest.Matchers.hasSize;
|
@@ -142,7 +144,13 @@ private <Req extends ActionRequest, Res extends ActionResponse> ActionFuture<Res
|
142 | 144 | // Wait for no publication in progress to not accidentally cancel a publication different from the one triggered by the given
|
143 | 145 | // request.
|
144 | 146 | assertBusy(
|
145 |
| - () -> assertFalse(((Coordinator) internalCluster().getCurrentMasterNodeInstance(Discovery.class)).publicationInProgress())); |
| 147 | + () -> { |
| 148 | + assertFalse(((Coordinator) internalCluster().getCurrentMasterNodeInstance(Discovery.class)).publicationInProgress()); |
| 149 | + assertThat(StreamSupport.stream( |
| 150 | + internalCluster().getInstances(Discovery.class).spliterator(), false) |
| 151 | + .map(coordinator -> ((Coordinator) coordinator).getLastAcceptedState().version()) |
| 152 | + .distinct().toArray(), arrayWithSize(1)); |
| 153 | + }); |
146 | 154 | ActionFuture<Res> future = req.execute();
|
147 | 155 | assertBusy(
|
148 | 156 | () -> assertTrue(((Coordinator)internalCluster().getCurrentMasterNodeInstance(Discovery.class)).cancelCommittedPublication()));
|
@@ -268,11 +276,9 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
|
268 | 276 |
|
269 | 277 | // Now make sure the indexing request finishes successfully
|
270 | 278 | disruption.stopDisrupting();
|
271 |
| - assertBusy(() -> { |
272 |
| - assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged()); |
273 |
| - assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class)); |
274 |
| - assertEquals(1, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); |
275 |
| - }); |
| 279 | + assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged()); |
| 280 | + assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class)); |
| 281 | + assertEquals(1, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); |
276 | 282 | }
|
277 | 283 |
|
278 | 284 | public void testDelayedMappingPropagationOnReplica() throws Exception {
|
@@ -373,11 +379,9 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
|
373 | 379 |
|
374 | 380 | // Now make sure the indexing request finishes successfully
|
375 | 381 | disruption.stopDisrupting();
|
376 |
| - assertBusy(() -> { |
377 |
| - assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged()); |
378 |
| - assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class)); |
379 |
| - assertEquals(2, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); // both shards should have succeeded |
380 |
| - }); |
| 382 | + assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged()); |
| 383 | + assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class)); |
| 384 | + assertEquals(2, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); // both shards should have succeeded |
381 | 385 |
|
382 | 386 | assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
|
383 | 387 | }
|
|
0 commit comments