|
22 | 22 | import org.apache.http.entity.StringEntity;
|
23 | 23 | import org.elasticsearch.Version;
|
24 | 24 | import org.elasticsearch.action.support.PlainActionFuture;
|
| 25 | +import org.elasticsearch.client.Request; |
25 | 26 | import org.elasticsearch.client.Response;
|
| 27 | +import org.elasticsearch.client.ResponseException; |
26 | 28 | import org.elasticsearch.cluster.metadata.IndexMetaData;
|
27 | 29 | import org.elasticsearch.common.settings.Settings;
|
28 | 30 | import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
@@ -239,4 +241,34 @@ public void testRelocationWithConcurrentIndexing() throws Exception {
|
239 | 241 | }
|
240 | 242 | }
|
241 | 243 |
|
| 244 | + public void testRecoverSyncedFlushIndex() throws Exception { |
| 245 | + final String index = "recover_synced_flush_index"; |
| 246 | + if (CLUSTER_TYPE == ClusterType.OLD) { |
| 247 | + Settings.Builder settings = Settings.builder() |
| 248 | + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) |
| 249 | + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) |
| 250 | + // if the node with the replica is the first to be restarted, while a replica is still recovering |
| 251 | + // then delayed allocation will kick in. When the node comes back, the master will search for a copy |
| 252 | + // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN |
| 253 | + // before timing out |
| 254 | + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") |
| 255 | + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster |
| 256 | + createIndex(index, settings.build()); |
| 257 | + indexDocs(index, 0, randomInt(5)); |
| 258 | + // We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation. |
| 259 | + // A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit. |
| 260 | + assertBusy(() -> { |
| 261 | + try { |
| 262 | + Response resp = client().performRequest(new Request("POST", index + "/_flush/synced")); |
| 263 | + Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards"); |
| 264 | + assertThat(result.get("successful"), equalTo(result.get("total"))); |
| 265 | + assertThat(result.get("failed"), equalTo(0)); |
| 266 | + } catch (ResponseException ex) { |
| 267 | + throw new AssertionError(ex); // cause assert busy to retry |
| 268 | + } |
| 269 | + }); |
| 270 | + } |
| 271 | + ensureGreen(index); |
| 272 | + } |
| 273 | + |
242 | 274 | }
|
0 commit comments