Skip to content

Commit 02cc730

Browse files
authored
Allow shards of closed indices to be replicated as regular shards (#38024)
This commit allows shards of indices in CLOSE state to be replicated as normal shards. It changes the MetaDataIndexStateService so that index routing tables of closed indices are kept in cluster state when the index is closed. Index routing tables are modified so that shard routings are reinitialized with the INDEX_CLOSED unassigned information. The IndicesClusterStateService is modified to remove IndexService instances of closed or reopened indices. In combination with the ShardRouting being in INITIALIZING state, the shards are recreated on the data nodes to reflect the new state. If the index state is closed, the IndexShard instances will be created using the NoOpEngine as the engine implementation. This commit also mutes two tests that rely on the fact that shard locks are released when an index is closed, which is no longer the case with replicated closed indices (the locks are in fact released, but they are reacquired once the shard is reinitialized after being closed). These tests will be adapted in follow-up PRs. Finally, many things will need to be adapted or improved in follow-up PRs (see #33888), but this is the first big step towards replicated closed indices. Relates to #33888
1 parent fd1046c commit 02cc730

File tree

16 files changed

+132
-69
lines changed

16 files changed

+132
-69
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,14 +410,16 @@ static ClusterState closeRoutingTable(final ClusterState currentState,
410410
}
411411

412412
logger.debug("closing index {} succeeded", index);
413-
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID).addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
414413
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
415-
routingTable.remove(index.getName());
414+
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
415+
blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
416+
routingTable.addAsFromOpenToClose(metadata.getSafe(index));
416417
closedIndices.add(index.getName());
417418
} catch (final IndexNotFoundException e) {
418419
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
419420
}
420421
}
422+
421423
logger.info("completed closing of indices {}", closedIndices);
422424
return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
423425
}

server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,13 @@ public Builder initializeAsFromCloseToOpen(IndexMetaData indexMetaData) {
358358
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, null));
359359
}
360360

361+
/**
362+
* Initializes a new empty index, as a result of closing an opened index.
363+
*/
364+
public Builder initializeAsFromOpenToClose(IndexMetaData indexMetaData) {
365+
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CLOSED, null));
366+
}
367+
361368
/**
362369
* Initializes a new empty index, to be restored from a snapshot
363370
*/

server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,13 @@ public Builder addAsFromCloseToOpen(IndexMetaData indexMetaData) {
540540
return this;
541541
}
542542

543+
public Builder addAsFromOpenToClose(IndexMetaData indexMetaData) {
544+
assert indexMetaData.getState() == IndexMetaData.State.CLOSE;
545+
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
546+
.initializeAsFromOpenToClose(indexMetaData);
547+
return add(indexRoutingBuilder);
548+
}
549+
543550
public Builder addAsRestore(IndexMetaData indexMetaData, SnapshotRecoverySource recoverySource) {
544551
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
545552
.initializeAsRestore(indexMetaData, recoverySource);

server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,11 @@ public enum Reason {
118118
/**
119119
* Forced manually to allocate
120120
*/
121-
MANUAL_ALLOCATION
121+
MANUAL_ALLOCATION,
122+
/**
123+
* Unassigned as a result of closing an index.
124+
*/
125+
INDEX_CLOSED
122126
}
123127

124128
/**
@@ -269,6 +273,8 @@ public UnassignedInfo(StreamInput in) throws IOException {
269273
public void writeTo(StreamOutput out) throws IOException {
270274
if (out.getVersion().before(Version.V_6_0_0_beta2) && reason == Reason.MANUAL_ALLOCATION) {
271275
out.writeByte((byte) Reason.ALLOCATION_FAILED.ordinal());
276+
} else if (out.getVersion().before(Version.V_7_0_0) && reason == Reason.INDEX_CLOSED) {
277+
out.writeByte((byte) Reason.REINITIALIZED.ordinal());
272278
} else {
273279
out.writeByte((byte) reason.ordinal());
274280
}

server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@ public NoOpEngine(EngineConfig config) {
4444
protected DirectoryReader open(final IndexCommit commit) throws IOException {
4545
final Directory directory = commit.getDirectory();
4646
final List<IndexCommit> indexCommits = DirectoryReader.listCommits(directory);
47-
assert indexCommits.size() == 1 : "expected only one commit point";
48-
IndexCommit indexCommit = indexCommits.get(indexCommits.size() - 1);
47+
final IndexCommit indexCommit = indexCommits.get(indexCommits.size() - 1);
4948
return new DirectoryReader(directory, new LeafReader[0]) {
5049
@Override
5150
protected DirectoryReader doOpenIfChanged() throws IOException {

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED;
9999
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.FAILURE;
100100
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED;
101+
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.REOPENED;
101102

102103
public class IndicesClusterStateService extends AbstractLifecycleComponent implements ClusterStateApplier {
103104
private static final Logger logger = LogManager.getLogger(IndicesClusterStateService.class);
@@ -240,7 +241,7 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {
240241

241242
deleteIndices(event); // also deletes shards of deleted indices
242243

243-
removeUnallocatedIndices(event); // also removes shards of removed indices
244+
removeIndices(event); // also removes shards of removed indices
244245

245246
failMissingShards(state);
246247

@@ -352,17 +353,18 @@ protected void doRun() throws Exception {
352353
}
353354

354355
/**
355-
* Removes indices that have no shards allocated to this node. This does not delete the shard data as we wait for enough
356-
* shard copies to exist in the cluster before deleting shard data (triggered by {@link org.elasticsearch.indices.store.IndicesStore}).
356+
* Removes indices that have no shards allocated to this node or indices whose state has changed. This does not delete the shard data
357+
* as we wait for enough shard copies to exist in the cluster before deleting shard data (triggered by
358+
* {@link org.elasticsearch.indices.store.IndicesStore}).
357359
*
358360
* @param event the cluster changed event
359361
*/
360-
private void removeUnallocatedIndices(final ClusterChangedEvent event) {
362+
private void removeIndices(final ClusterChangedEvent event) {
361363
final ClusterState state = event.state();
362364
final String localNodeId = state.nodes().getLocalNodeId();
363365
assert localNodeId != null;
364366

365-
Set<Index> indicesWithShards = new HashSet<>();
367+
final Set<Index> indicesWithShards = new HashSet<>();
366368
RoutingNode localRoutingNode = state.getRoutingNodes().node(localNodeId);
367369
if (localRoutingNode != null) { // null e.g. if we are not a data node
368370
for (ShardRouting shardRouting : localRoutingNode) {
@@ -371,20 +373,27 @@ private void removeUnallocatedIndices(final ClusterChangedEvent event) {
371373
}
372374

373375
for (AllocatedIndex<? extends Shard> indexService : indicesService) {
374-
Index index = indexService.index();
375-
if (indicesWithShards.contains(index) == false) {
376+
final Index index = indexService.index();
377+
final IndexMetaData indexMetaData = state.metaData().index(index);
378+
final IndexMetaData existingMetaData = indexService.getIndexSettings().getIndexMetaData();
379+
380+
AllocatedIndices.IndexRemovalReason reason = null;
381+
if (indexMetaData != null && indexMetaData.getState() != existingMetaData.getState()) {
382+
reason = indexMetaData.getState() == IndexMetaData.State.CLOSE ? CLOSED : REOPENED;
383+
} else if (indicesWithShards.contains(index) == false) {
376384
// if the cluster change indicates a brand new cluster, we only want
377385
// to remove the in-memory structures for the index and not delete the
378386
// contents on disk because the index will later be re-imported as a
379387
// dangling index
380-
final IndexMetaData indexMetaData = state.metaData().index(index);
381388
assert indexMetaData != null || event.isNewCluster() :
382389
"index " + index + " does not exist in the cluster state, it should either " +
383390
"have been deleted or the cluster must be new";
384-
final AllocatedIndices.IndexRemovalReason reason =
385-
indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE ? CLOSED : NO_LONGER_ASSIGNED;
386-
logger.debug("{} removing index, [{}]", index, reason);
387-
indicesService.removeIndex(index, reason, "removing index (no shards allocated)");
391+
reason = indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE ? CLOSED : NO_LONGER_ASSIGNED;
392+
}
393+
394+
if (reason != null) {
395+
logger.debug("{} removing index ({})", index, reason);
396+
indicesService.removeIndex(index, reason, "removing index (" + reason + ")");
388397
}
389398
}
390399
}
@@ -595,7 +604,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
595604
ClusterState clusterState) {
596605
final ShardRouting currentRoutingEntry = shard.routingEntry();
597606
assert currentRoutingEntry.isSameAllocation(shardRouting) :
598-
"local shard has a different allocation id but wasn't cleaning by removeShards. "
607+
"local shard has a different allocation id but wasn't cleaned by removeShards. "
599608
+ "cluster state: " + shardRouting + " local: " + currentRoutingEntry;
600609

601610
final long primaryTerm;
@@ -730,7 +739,7 @@ private void failAndRemoveShard(ShardRouting shardRouting, boolean sendShardFail
730739
private void sendFailShard(ShardRouting shardRouting, String message, @Nullable Exception failure, ClusterState state) {
731740
try {
732741
logger.warn(() -> new ParameterizedMessage(
733-
"[{}] marking and sending shard failed due to [{}]", shardRouting.shardId(), message), failure);
742+
"{} marking and sending shard failed due to [{}]", shardRouting.shardId(), message), failure);
734743
failedShardsCache.put(shardRouting.shardId(), shardRouting);
735744
shardStateAction.localShardFailed(shardRouting, message, failure, SHARD_STATE_ACTION_LISTENER, state);
736745
} catch (Exception inner) {
@@ -931,7 +940,7 @@ enum IndexRemovalReason {
931940
DELETED,
932941

933942
/**
934-
* The index have been closed. The index should be removed and all associated resources released. Persistent parts of the index
943+
* The index has been closed. The index should be removed and all associated resources released. Persistent parts of the index
935944
* like the shards files, state and transaction logs are kept around in the case of a disaster recovery.
936945
*/
937946
CLOSED,
@@ -941,7 +950,13 @@ enum IndexRemovalReason {
941950
* Persistent parts of the index like the shards files, state and transaction logs are kept around in the
942951
* case of a disaster recovery.
943952
*/
944-
FAILURE
953+
FAILURE,
954+
955+
/**
956+
* The index has been reopened. The index should be removed and all associated resources released. Persistent parts of the index
957+
* like the shards files, state and transaction logs are kept around in the case of a disaster recovery.
958+
*/
959+
REOPENED,
945960
}
946961
}
947962
}

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
268268
// it's fine to keep the contexts open if the index is still "alive"
269269
// unfortunately we don't have a clear way to signal today why an index is closed.
270270
// to release memory and let references to the filesystem go etc.
271-
if (reason == IndexRemovalReason.DELETED || reason == IndexRemovalReason.CLOSED) {
271+
if (reason == IndexRemovalReason.DELETED || reason == IndexRemovalReason.CLOSED || reason == IndexRemovalReason.REOPENED) {
272272
freeAllContextForIndex(index);
273273
}
274274

server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.cluster.routing.RoutingTable;
3636
import org.elasticsearch.cluster.routing.ShardRouting;
3737
import org.elasticsearch.cluster.routing.ShardRoutingState;
38+
import org.elasticsearch.cluster.routing.UnassignedInfo;
3839
import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
3940
import org.elasticsearch.common.Nullable;
4041
import org.elasticsearch.common.ValidationException;
@@ -210,7 +211,14 @@ public void testAddIndexClosedBlocks() {
210211
for (Index index : indices) {
211212
assertTrue(blockedIndices.containsKey(index));
212213
if (mixedVersions) {
213-
assertIsClosed(index.getName(), updatedState);
214+
assertThat(updatedState.metaData().index(index).getState(), is(IndexMetaData.State.CLOSE));
215+
assertTrue(updatedState.blocks().hasIndexBlock(index.getName(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK));
216+
assertThat("Index " + index + " must have only 1 block with id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID,
217+
updatedState.blocks().indices().getOrDefault(index.getName(), emptySet()).stream().filter(clusterBlock ->
218+
clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));
219+
220+
final IndexRoutingTable indexRoutingTable = updatedState.routingTable().index(index);
221+
assertThat(indexRoutingTable, nullValue());
214222
} else {
215223
assertHasBlock(index.getName(), updatedState, blockedIndices.get(index));
216224
}
@@ -346,19 +354,18 @@ private static ClusterState addIndex(final ClusterState currentState,
346354
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState);
347355
clusterStateBuilder.metaData(MetaData.builder(currentState.metaData()).put(indexMetaData, true));
348356

349-
if (state == IndexMetaData.State.OPEN) {
350-
final IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex());
351-
for (int j = 0; j < indexMetaData.getNumberOfShards(); j++) {
352-
ShardId shardId = new ShardId(indexMetaData.getIndex(), j);
353-
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
354-
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), true, ShardRoutingState.STARTED));
355-
for (int k = 0; k < indexMetaData.getNumberOfReplicas(); k++) {
356-
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), false, ShardRoutingState.STARTED));
357-
}
358-
indexRoutingTable.addIndexShard(indexShardRoutingBuilder.build());
357+
final IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex());
358+
for (int j = 0; j < indexMetaData.getNumberOfShards(); j++) {
359+
ShardId shardId = new ShardId(indexMetaData.getIndex(), j);
360+
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
361+
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), true, ShardRoutingState.STARTED));
362+
for (int k = 0; k < indexMetaData.getNumberOfReplicas(); k++) {
363+
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), false, ShardRoutingState.STARTED));
359364
}
360-
clusterStateBuilder.routingTable(RoutingTable.builder(currentState.routingTable()).add(indexRoutingTable).build());
365+
indexRoutingTable.addIndexShard(indexShardRoutingBuilder.build());
361366
}
367+
clusterStateBuilder.routingTable(RoutingTable.builder(currentState.routingTable()).add(indexRoutingTable).build());
368+
362369
if (block != null) {
363370
clusterStateBuilder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).addIndexBlock(index, block));
364371
}
@@ -372,11 +379,19 @@ private static void assertIsOpened(final String indexName, final ClusterState cl
372379

373380
private static void assertIsClosed(final String indexName, final ClusterState clusterState) {
374381
assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.CLOSE));
375-
assertThat(clusterState.routingTable().index(indexName), nullValue());
376382
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true));
377383
assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]",
378384
clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream()
379385
.filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));
386+
387+
final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
388+
assertThat(indexRoutingTable, notNullValue());
389+
390+
for(IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
391+
assertThat(shardRoutingTable.shards().stream().allMatch(ShardRouting::unassigned), is(true));
392+
assertThat(shardRoutingTable.shards().stream().map(ShardRouting::unassignedInfo).map(UnassignedInfo::getReason)
393+
.allMatch(info -> info == UnassignedInfo.Reason.INDEX_CLOSED), is(true));
394+
}
380395
}
381396

382397
private static void assertHasBlock(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) {

server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.index.Index;
4141
import org.elasticsearch.snapshots.Snapshot;
4242
import org.elasticsearch.snapshots.SnapshotId;
43+
import org.elasticsearch.test.VersionUtils;
4344

4445
import java.io.IOException;
4546
import java.nio.ByteBuffer;
@@ -54,6 +55,7 @@
5455
import static org.hamcrest.Matchers.nullValue;
5556

5657
public class UnassignedInfoTests extends ESAllocationTestCase {
58+
5759
public void testReasonOrdinalOrder() {
5860
UnassignedInfo.Reason[] order = new UnassignedInfo.Reason[]{
5961
UnassignedInfo.Reason.INDEX_CREATED,
@@ -70,7 +72,8 @@ public void testReasonOrdinalOrder() {
7072
UnassignedInfo.Reason.REALLOCATED_REPLICA,
7173
UnassignedInfo.Reason.PRIMARY_FAILED,
7274
UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY,
73-
UnassignedInfo.Reason.MANUAL_ALLOCATION,};
75+
UnassignedInfo.Reason.MANUAL_ALLOCATION,
76+
UnassignedInfo.Reason.INDEX_CLOSED,};
7477
for (int i = 0; i < order.length; i++) {
7578
assertThat(order[i].ordinal(), equalTo(i));
7679
}
@@ -95,6 +98,21 @@ public void testSerialization() throws Exception {
9598
assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations()));
9699
}
97100

101+
public void testBwcSerialization() throws Exception {
102+
final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CLOSED, "message");
103+
BytesStreamOutput out = new BytesStreamOutput();
104+
out.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
105+
unassignedInfo.writeTo(out);
106+
out.close();
107+
108+
UnassignedInfo read = new UnassignedInfo(out.bytes().streamInput());
109+
assertThat(read.getReason(), equalTo(UnassignedInfo.Reason.REINITIALIZED));
110+
assertThat(read.getUnassignedTimeInMillis(), equalTo(unassignedInfo.getUnassignedTimeInMillis()));
111+
assertThat(read.getMessage(), equalTo(unassignedInfo.getMessage()));
112+
assertThat(read.getDetails(), equalTo(unassignedInfo.getDetails()));
113+
assertThat(read.getNumFailedAllocations(), equalTo(unassignedInfo.getNumFailedAllocations()));
114+
}
115+
98116
public void testIndexCreated() {
99117
MetaData metaData = MetaData.builder()
100118
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT))

0 commit comments

Comments
 (0)