Skip to content

Commit dfe8463

Browse files
committed
Recover closed indices after a full cluster restart (elastic#39249)
Closing an index is a process that can be broken down into several steps: 1. first, the state of the cluster is updated to add a write block on the index to be closed 2. then, a transport replication action is executed on all shards of the index. This action checks that the maximum sequence number and the global checkpoint have identical values, indicating that all in-flight write operations have been completed on the shard. 3. finally, and if the previous steps were successful, the cluster state is updated again to change the state of the index from `OPEN` to `CLOSE`. During the last step, the master node retrieves the minimum node version among all the nodes that compose the cluster: * If a node is on a pre-8.0 version, the index is closed and the index routing table is removed from the cluster state. This is the "old" way of closing indices, and closed indices with no routing table are not replicated. * If all nodes are on version 8.0 or higher, the index is closed and its routing table is reinitialized in the cluster state. This is the new way of closing indices, and such closed indices will be replicated in the cluster. But routing tables are not persisted in the cluster state, so after a full cluster restart there is no way to make the distinction between an index closed in 7.x and an index closed and replicated on 8.0. This commit introduces a new private index setting named `index.verified_before_close` that is added to closed indices that are replicated at closing time. This setting serves as a marker to indicate that the index has been closed using the new Close Index API on a cluster that supports replication of closed indices. This way, after a full cluster restart, the Gateway service can automatically recover those closed indices as if they were open indices.
Closed indices that don't have this setting (because they were closed on a pre-8.0 cluster, or on a mixed-version cluster) won't be recovered and will need to be reopened and closed again on an 8.0 cluster. Note that reopening the index removes the private setting. Relates to elastic#33888
1 parent 95d9e9a commit dfe8463

File tree

13 files changed

+419
-32
lines changed

13 files changed

+419
-32
lines changed

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.client.RestClient;
2828
import org.elasticsearch.client.WarningFailureException;
2929
import org.elasticsearch.cluster.metadata.IndexMetaData;
30+
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
3031
import org.elasticsearch.common.Booleans;
3132
import org.elasticsearch.common.CheckedFunction;
3233
import org.elasticsearch.common.Strings;
@@ -47,6 +48,7 @@
4748
import java.io.IOException;
4849
import java.util.ArrayList;
4950
import java.util.Base64;
51+
import java.util.Collection;
5052
import java.util.HashMap;
5153
import java.util.HashSet;
5254
import java.util.List;
@@ -65,8 +67,11 @@
6567
import static org.hamcrest.Matchers.containsString;
6668
import static org.hamcrest.Matchers.equalTo;
6769
import static org.hamcrest.Matchers.greaterThan;
70+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
6871
import static org.hamcrest.Matchers.hasSize;
72+
import static org.hamcrest.Matchers.is;
6973
import static org.hamcrest.Matchers.notNullValue;
74+
import static org.hamcrest.Matchers.nullValue;
7075
import static org.hamcrest.Matchers.startsWith;
7176

7277
/**
@@ -1022,8 +1027,98 @@ public void testSoftDeletes() throws Exception {
10221027
}
10231028
}
10241029

1025-
private void checkSnapshot(final String snapshotName, final int count, final Version tookOnVersion)
1026-
throws IOException {
1030+
/**
1031+
* This test creates an index in the old cluster and then closes it. When the cluster is fully restarted in a newer version,
1032+
* it verifies that the index exists and is replicated if the old version supports replication.
1033+
*/
1034+
public void testClosedIndices() throws Exception {
1035+
if (isRunningAgainstOldCluster()) {
1036+
createIndex(index, Settings.builder()
1037+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
1038+
.build());
1039+
ensureGreen(index);
1040+
1041+
int numDocs = 0;
1042+
if (randomBoolean()) {
1043+
numDocs = between(1, 100);
1044+
for (int i = 0; i < numDocs; i++) {
1045+
final Request request = new Request("POST", "/" + index + "/_doc/" + i);
1046+
request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("field", "v1").endObject()));
1047+
assertOK(client().performRequest(request));
1048+
if (rarely()) {
1049+
refresh();
1050+
}
1051+
}
1052+
refresh();
1053+
}
1054+
1055+
assertTotalHits(numDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
1056+
saveInfoDocument(index + "_doc_count", Integer.toString(numDocs));
1057+
closeIndex(index);
1058+
}
1059+
1060+
if (getOldClusterVersion().onOrAfter(Version.V_8_0_0)) {
1061+
ensureGreenLongWait(index);
1062+
assertClosedIndex(index, true);
1063+
} else {
1064+
assertClosedIndex(index, false);
1065+
}
1066+
1067+
if (isRunningAgainstOldCluster() == false) {
1068+
openIndex(index);
1069+
ensureGreen(index);
1070+
1071+
final int expectedNumDocs = Integer.parseInt(loadInfoDocument(index + "_doc_count"));
1072+
assertTotalHits(expectedNumDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
1073+
}
1074+
}
1075+
1076+
/**
1077+
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
1078+
* that the index has started shards.
1079+
*/
1080+
@SuppressWarnings("unchecked")
1081+
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
1082+
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));
1083+
1084+
final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
1085+
assertThat(metadata, notNullValue());
1086+
assertThat(metadata.get("state"), equalTo("close"));
1087+
1088+
final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
1089+
assertThat(blocks, notNullValue());
1090+
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));
1091+
1092+
final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
1093+
assertThat(settings, notNullValue());
1094+
1095+
final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
1096+
if (checkRoutingTable) {
1097+
assertThat(routingTable, notNullValue());
1098+
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));
1099+
final String numberOfShards = (String) XContentMapValues.extractValue("index.number_of_shards", settings);
1100+
assertThat(numberOfShards, notNullValue());
1101+
final int nbShards = Integer.parseInt(numberOfShards);
1102+
assertThat(nbShards, greaterThanOrEqualTo(1));
1103+
1104+
for (int i = 0; i < nbShards; i++) {
1105+
final Collection<Map<String, ?>> shards =
1106+
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
1107+
assertThat(shards, notNullValue());
1108+
assertThat(shards.size(), equalTo(2));
1109+
for (Map<String, ?> shard : shards) {
1110+
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
1111+
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
1112+
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
1113+
}
1114+
}
1115+
} else {
1116+
assertThat(routingTable, nullValue());
1117+
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
1118+
}
1119+
}
1120+
1121+
private void checkSnapshot(final String snapshotName, final int count, final Version tookOnVersion) throws IOException {
10271122
// Check the snapshot metadata, especially the version
10281123
Request listSnapshotRequest = new Request("GET", "/_snapshot/repo/" + snapshotName);
10291124
Map<String, Object> listSnapshotResponse = entityAsMap(client().performRequest(listSnapshotRequest));

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,20 @@
2424
import org.elasticsearch.client.Response;
2525
import org.elasticsearch.client.ResponseException;
2626
import org.elasticsearch.cluster.metadata.IndexMetaData;
27+
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
28+
import org.elasticsearch.common.Booleans;
2729
import org.elasticsearch.common.settings.Settings;
2830
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
31+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
2932
import org.elasticsearch.index.IndexSettings;
3033
import org.elasticsearch.rest.action.document.RestIndexAction;
3134
import org.elasticsearch.test.rest.yaml.ObjectPath;
3235

3336
import java.io.IOException;
3437
import java.util.ArrayList;
38+
import java.util.Collection;
3539
import java.util.List;
40+
import java.util.Locale;
3641
import java.util.Map;
3742
import java.util.concurrent.Future;
3843
import java.util.function.Predicate;
@@ -43,7 +48,9 @@
4348
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
4449
import static org.hamcrest.Matchers.equalTo;
4550
import static org.hamcrest.Matchers.hasSize;
51+
import static org.hamcrest.Matchers.is;
4652
import static org.hamcrest.Matchers.notNullValue;
53+
import static org.hamcrest.Matchers.nullValue;
4754

4855
/**
4956
* In depth testing of the recovery mechanism during a rolling restart.
@@ -310,4 +317,144 @@ public void testRecoveryWithSoftDeletes() throws Exception {
310317
}
311318
ensureGreen(index);
312319
}
320+
321+
/**
322+
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
323+
* is effectively closed and potentially replicated (if the version the index was created on supports
324+
* the replication of closed indices) during the rolling upgrade.
325+
*/
326+
public void testRecoveryClosedIndex() throws Exception {
327+
final String indexName = "closed_index_created_on_old";
328+
if (CLUSTER_TYPE == ClusterType.OLD) {
329+
createIndex(indexName, Settings.builder()
330+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
331+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
332+
// if the node with the replica is the first to be restarted, while a replica is still recovering
333+
// then delayed allocation will kick in. When the node comes back, the master will search for a copy
334+
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
335+
// before timing out
336+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
337+
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
338+
.build());
339+
ensureGreen(indexName);
340+
closeIndex(indexName);
341+
}
342+
343+
final Version indexVersionCreated = indexVersionCreated(indexName);
344+
if (indexVersionCreated.onOrAfter(Version.V_8_0_0)) {
345+
// index was created on a version that supports the replication of closed indices,
346+
// so we expect the index to be closed and replicated
347+
ensureGreen(indexName);
348+
assertClosedIndex(indexName, true);
349+
} else {
350+
assertClosedIndex(indexName, false);
351+
}
352+
}
353+
354+
/**
355+
* This test creates and closes a new index at every stage of the rolling upgrade. It then checks that the index
356+
* is effectively closed and potentially replicated if the cluster supports replication of closed indices at the
357+
* time the index was closed.
358+
*/
359+
public void testCloseIndexDuringRollingUpgrade() throws Exception {
360+
final Version minimumNodeVersion = minimumNodeVersion();
361+
final String indexName =
362+
String.join("_", "index", CLUSTER_TYPE.toString(), Integer.toString(minimumNodeVersion.id)).toLowerCase(Locale.ROOT);
363+
364+
if (indexExists(indexName) == false) {
365+
createIndex(indexName, Settings.builder()
366+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
367+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
368+
.build());
369+
ensureGreen(indexName);
370+
closeIndex(indexName);
371+
}
372+
373+
if (minimumNodeVersion.onOrAfter(Version.V_8_0_0)) {
374+
// index is created on a version that supports the replication of closed indices,
375+
// so we expect the index to be closed and replicated
376+
ensureGreen(indexName);
377+
assertClosedIndex(indexName, true);
378+
} else {
379+
assertClosedIndex(indexName, false);
380+
}
381+
}
382+
383+
/**
384+
* Returns the version in which the given index has been created
385+
*/
386+
private static Version indexVersionCreated(final String indexName) throws IOException {
387+
final Request request = new Request("GET", "/" + indexName + "/_settings");
388+
final String versionCreatedSetting = indexName + ".settings.index.version.created";
389+
request.addParameter("filter_path", versionCreatedSetting);
390+
391+
final Response response = client().performRequest(request);
392+
return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting)));
393+
}
394+
395+
/**
396+
* Returns the minimum node version among all nodes of the cluster
397+
*/
398+
private static Version minimumNodeVersion() throws IOException {
399+
final Request request = new Request("GET", "_nodes");
400+
request.addParameter("filter_path", "nodes.*.version");
401+
402+
final Response response = client().performRequest(request);
403+
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");
404+
405+
Version minVersion = null;
406+
for (Map.Entry<String, Object> node : nodes.entrySet()) {
407+
@SuppressWarnings("unchecked")
408+
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
409+
if (minVersion == null || minVersion.after(nodeVersion)) {
410+
minVersion = nodeVersion;
411+
}
412+
}
413+
assertNotNull(minVersion);
414+
return minVersion;
415+
}
416+
417+
/**
418+
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
419+
* that the index has started shards.
420+
*/
421+
@SuppressWarnings("unchecked")
422+
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
423+
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));
424+
425+
final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
426+
assertThat(metadata, notNullValue());
427+
assertThat(metadata.get("state"), equalTo("close"));
428+
429+
final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
430+
assertThat(blocks, notNullValue());
431+
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));
432+
433+
final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
434+
assertThat(settings, notNullValue());
435+
436+
final int numberOfShards = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_shards", settings));
437+
final int numberOfReplicas = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_replicas", settings));
438+
439+
final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
440+
if (checkRoutingTable) {
441+
assertThat(routingTable, notNullValue());
442+
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));
443+
444+
for (int i = 0; i < numberOfShards; i++) {
445+
final Collection<Map<String, ?>> shards =
446+
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
447+
assertThat(shards, notNullValue());
448+
assertThat(shards.size(), equalTo(numberOfReplicas + 1));
449+
for (Map<String, ?> shard : shards) {
450+
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
451+
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
452+
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
453+
}
454+
}
455+
} else {
456+
assertThat(routingTable, nullValue());
457+
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
458+
}
459+
}
313460
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import org.elasticsearch.common.ValidationException;
5353
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
5454
import org.elasticsearch.common.inject.Inject;
55+
import org.elasticsearch.common.settings.Setting;
56+
import org.elasticsearch.common.settings.Settings;
5557
import org.elasticsearch.common.unit.TimeValue;
5658
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5759
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -90,6 +92,8 @@ public class MetaDataIndexStateService {
9092
public static final int INDEX_CLOSED_BLOCK_ID = 4;
9193
public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false,
9294
false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);
95+
public static final Setting<Boolean> VERIFIED_BEFORE_CLOSE_SETTING =
96+
Setting.boolSetting("index.verified_before_close", false, Setting.Property.IndexScope, Setting.Property.PrivateIndex);
9397

9498
private final ClusterService clusterService;
9599
private final AllocationService allocationService;
@@ -402,15 +406,22 @@ static ClusterState closeRoutingTable(final ClusterState currentState,
402406
continue;
403407
}
404408

405-
logger.debug("closing index {} succeeded", index);
406-
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
407409
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
408410
blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
411+
final IndexMetaData.Builder updatedMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE);
409412
if (removeRoutingTable) {
413+
metadata.put(updatedMetaData);
410414
routingTable.remove(index.getName());
411415
} else {
416+
metadata.put(updatedMetaData
417+
.settingsVersion(indexMetaData.getSettingsVersion() + 1)
418+
.settings(Settings.builder()
419+
.put(indexMetaData.getSettings())
420+
.put(VERIFIED_BEFORE_CLOSE_SETTING.getKey(), true)));
412421
routingTable.addAsFromOpenToClose(metadata.getSafe(index));
413422
}
423+
424+
logger.debug("closing index {} succeeded", index);
414425
closedIndices.add(index.getName());
415426
} catch (final IndexNotFoundException e) {
416427
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
@@ -490,7 +501,15 @@ ClusterState openIndices(final Index[] indices, final ClusterState currentState)
490501
for (IndexMetaData indexMetaData : indicesToOpen) {
491502
final Index index = indexMetaData.getIndex();
492503
if (indexMetaData.getState() != IndexMetaData.State.OPEN) {
493-
IndexMetaData updatedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.OPEN).build();
504+
final Settings.Builder updatedSettings = Settings.builder().put(indexMetaData.getSettings());
505+
updatedSettings.remove(VERIFIED_BEFORE_CLOSE_SETTING.getKey());
506+
507+
IndexMetaData updatedIndexMetaData = IndexMetaData.builder(indexMetaData)
508+
.state(IndexMetaData.State.OPEN)
509+
.settingsVersion(indexMetaData.getSettingsVersion() + 1)
510+
.settings(updatedSettings)
511+
.build();
512+
494513
// The index might be closed because we couldn't import it due to old incompatible version
495514
// We need to check that this index can be upgraded to the current version
496515
updatedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(updatedIndexMetaData, minIndexCompatibilityVersion);
@@ -554,4 +573,9 @@ public static ClusterBlock createIndexClosingBlock() {
554573
EnumSet.of(ClusterBlockLevel.WRITE));
555574
}
556575

576+
public static boolean isIndexVerifiedBeforeClosed(final IndexMetaData indexMetaData) {
577+
return indexMetaData.getState() == IndexMetaData.State.CLOSE
578+
&& VERIFIED_BEFORE_CLOSE_SETTING.exists(indexMetaData.getSettings())
579+
&& VERIFIED_BEFORE_CLOSE_SETTING.get(indexMetaData.getSettings());
580+
}
557581
}

0 commit comments

Comments
 (0)