Skip to content

Commit 907bb55

Browse files
authored
Skip local recovery for closed or frozen indices (#44887)
For closed and frozen indices, we should not recover shard locally up to the global checkpoint before performing peer recovery for that copy might be offline when the index was closed/frozen. Relates #43463 Closes #44855
1 parent 7a247e5 commit 907bb55

File tree

8 files changed

+195
-61
lines changed

8 files changed

+195
-61
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+7
Original file line numberDiff line numberDiff line change
@@ -1400,6 +1400,13 @@ public long recoverLocallyUpToGlobalCheckpoint() {
14001400
recoveryState.getTranslog().totalLocal(0);
14011401
return globalCheckpoint + 1;
14021402
}
1403+
if (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE ||
1404+
IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) {
1405+
logger.trace("skip local recovery as the index was closed or not allowed to write; safe commit {} global checkpoint {}",
1406+
safeCommit.get(), globalCheckpoint);
1407+
recoveryState.getTranslog().totalLocal(0);
1408+
return safeCommit.get().localCheckpoint + 1;
1409+
}
14031410
try {
14041411
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
14051412
recoveryState.getTranslog().totalLocal(snapshot.totalOperations());

server/src/test/java/org/elasticsearch/index/engine/NoOpEngineRecoveryTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public void testRecoverFromNoOp() throws IOException {
4040
indexShard.close("test", true);
4141

4242
final ShardRouting shardRouting = indexShard.routingEntry();
43-
IndexShard primary = reinitShard(indexShard, initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE), NoOpEngine::new);
43+
IndexShard primary = reinitShard(indexShard, initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE),
44+
indexShard.indexSettings().getIndexMetaData(), NoOpEngine::new);
4445
recoverShardFromStore(primary);
4546
assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes());
4647
assertEquals(nbDocs, primary.docStats().getCount());

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -4106,7 +4106,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
41064106
final ShardRouting replicaRouting = shard.routingEntry();
41074107
ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true,
41084108
ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE);
4109-
final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting,
4109+
final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting, shard.indexSettings.getIndexMetaData(),
41104110
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity()) {
41114111
@Override
41124112
protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

+52-20
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,18 @@
2626
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
2727
import org.elasticsearch.action.index.IndexRequest;
2828
import org.elasticsearch.action.support.PlainActionFuture;
29+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2930
import org.elasticsearch.cluster.node.DiscoveryNode;
3031
import org.elasticsearch.cluster.routing.RecoverySource;
3132
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
32-
import org.elasticsearch.common.CheckedFunction;
3333
import org.elasticsearch.common.Randomness;
3434
import org.elasticsearch.common.UUIDs;
3535
import org.elasticsearch.common.bytes.BytesArray;
36+
import org.elasticsearch.common.settings.Settings;
3637
import org.elasticsearch.common.xcontent.XContentType;
38+
import org.elasticsearch.index.engine.NoOpEngine;
3739
import org.elasticsearch.index.mapper.SourceToParse;
40+
import org.elasticsearch.index.seqno.SeqNoStats;
3841
import org.elasticsearch.index.seqno.SequenceNumbers;
3942
import org.elasticsearch.index.shard.IndexShard;
4043
import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -134,23 +137,24 @@ public void testWriteFileChunksConcurrently() throws Exception {
134137
closeShards(sourceShard, targetShard);
135138
}
136139

137-
public void testPrepareIndexForPeerRecovery() throws Exception {
138-
CheckedFunction<IndexShard, Long, Exception> populateData = shard -> {
139-
List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
140-
Randomness.shuffle(seqNos);
141-
for (long seqNo : seqNos) {
142-
shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse(
143-
shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON));
144-
if (randomInt(100) < 5) {
145-
shard.flush(new FlushRequest().waitIfOngoing(true));
146-
}
140+
private SeqNoStats populateRandomData(IndexShard shard) throws IOException {
141+
List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
142+
Randomness.shuffle(seqNos);
143+
for (long seqNo : seqNos) {
144+
shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse(
145+
shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON));
146+
if (randomInt(100) < 5) {
147+
shard.flush(new FlushRequest().waitIfOngoing(true));
147148
}
148-
shard.sync();
149-
long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, shard.getLocalCheckpoint());
150-
shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
151-
shard.sync();
152-
return globalCheckpoint;
153-
};
149+
}
150+
shard.sync();
151+
long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, shard.getLocalCheckpoint());
152+
shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
153+
shard.sync();
154+
return shard.seqNoStats();
155+
}
156+
157+
public void testPrepareIndexForPeerRecovery() throws Exception {
154158
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
155159
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
156160

@@ -166,7 +170,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
166170

167171
// good copy
168172
shard = newStartedShard(false);
169-
long globalCheckpoint = populateData.apply(shard);
173+
long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
170174
Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
171175
assertTrue(safeCommit.isPresent());
172176
int expectedTotalLocal = 0;
@@ -191,7 +195,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
191195
// corrupted copy
192196
shard = newStartedShard(false);
193197
if (randomBoolean()) {
194-
populateData.apply(shard);
198+
populateRandomData(shard);
195199
}
196200
shard.store().markStoreCorrupted(new IOException("test"));
197201
replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
@@ -206,7 +210,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
206210

207211
// copy with truncated translog
208212
shard = newStartedShard(false);
209-
globalCheckpoint = populateData.apply(shard);
213+
globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
210214
replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
211215
RecoverySource.PeerRecoverySource.INSTANCE));
212216
String translogUUID = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), globalCheckpoint,
@@ -226,4 +230,32 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
226230
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
227231
closeShards(replica);
228232
}
233+
234+
public void testClosedIndexSkipsLocalRecovery() throws Exception {
235+
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
236+
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
237+
IndexShard shard = newStartedShard(false);
238+
long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
239+
Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
240+
assertTrue(safeCommit.isPresent());
241+
final IndexMetaData indexMetaData;
242+
if (randomBoolean()) {
243+
indexMetaData = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
244+
.settings(shard.indexSettings().getSettings())
245+
.state(IndexMetaData.State.CLOSE).build();
246+
} else {
247+
indexMetaData = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
248+
.settings(Settings.builder().put(shard.indexSettings().getSettings())
249+
.put(IndexMetaData.SETTING_BLOCKS_WRITE, true)).build();
250+
}
251+
IndexShard replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
252+
RecoverySource.PeerRecoverySource.INSTANCE), indexMetaData, NoOpEngine::new);
253+
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
254+
replica.prepareForIndexRecovery();
255+
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1));
256+
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0));
257+
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
258+
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
259+
closeShards(replica);
260+
}
229261
}

server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ protected int maximumNumberOfShards() {
8686
return 3;
8787
}
8888

89-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/44855")
9089
public void testCloseWhileRelocatingShards() throws Exception {
9190
final String[] indices = new String[randomIntBetween(3, 5)];
9291
final Map<String, Long> docsPerIndex = new HashMap<>();

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -427,23 +427,24 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener..
427427
* @param listeners new listerns to use for the newly created shard
428428
*/
429429
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
430-
return reinitShard(current, routing, current.engineFactory, listeners);
430+
return reinitShard(current, routing, current.indexSettings.getIndexMetaData(), current.engineFactory, listeners);
431431
}
432432

433433
/**
434434
* Takes an existing shard, closes it and starts a new initialing shard at the same location
435435
*
436-
* @param routing the shard routing to use for the newly created shard.
437-
* @param listeners new listerns to use for the newly created shard
436+
* @param routing the shard routing to use for the newly created shard.
437+
* @param listeners new listerns to use for the newly created shard
438+
* @param indexMetaData the index metadata to use for the newly created shard
438439
* @param engineFactory the engine factory for the new shard
439440
*/
440-
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, EngineFactory engineFactory,
441+
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexMetaData indexMetaData, EngineFactory engineFactory,
441442
IndexingOperationListener... listeners) throws IOException {
442443
closeShards(current);
443444
return newShard(
444445
routing,
445446
current.shardPath(),
446-
current.indexSettings().getIndexMetaData(),
447+
indexMetaData,
447448
null,
448449
null,
449450
engineFactory,

x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexRecoveryTests.java

+84-33
Original file line numberDiff line numberDiff line change
@@ -3,41 +3,92 @@
33
* or more contributor license agreements. Licensed under the Elastic License;
44
* you may not use this file except in compliance with the Elastic License.
55
*/
6+
67
package org.elasticsearch.index.engine;
78

8-
import org.elasticsearch.cluster.routing.RecoverySource;
9-
import org.elasticsearch.cluster.routing.ShardRouting;
10-
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
9+
import org.elasticsearch.client.Client;
10+
import org.elasticsearch.cluster.metadata.IndexMetaData;
11+
import org.elasticsearch.cluster.node.DiscoveryNode;
1112
import org.elasticsearch.common.settings.Settings;
12-
import org.elasticsearch.index.shard.IndexShard;
13-
import org.elasticsearch.index.shard.IndexShardTestCase;
14-
15-
import java.io.IOException;
16-
17-
import static org.hamcrest.Matchers.equalTo;
18-
19-
public class FrozenIndexRecoveryTests extends IndexShardTestCase {
20-
21-
/**
22-
* Make sure we can recover from a frozen engine
23-
*/
24-
public void testRecoverFromFrozenPrimary() throws IOException {
25-
IndexShard indexShard = newStartedShard(true);
26-
indexDoc(indexShard, "_doc", "1");
27-
indexDoc(indexShard, "_doc", "2");
28-
indexDoc(indexShard, "_doc", "3");
29-
indexShard.close("test", true);
30-
final ShardRouting shardRouting = indexShard.routingEntry();
31-
IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
32-
shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
33-
), FrozenEngine::new);
34-
recoverShardFromStore(frozenShard);
35-
assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
36-
assertDocCount(frozenShard, 3);
37-
38-
IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
39-
recoverReplica(replica, frozenShard, true);
40-
assertDocCount(replica, 3);
41-
closeShards(frozenShard, replica);
13+
import org.elasticsearch.common.util.set.Sets;
14+
import org.elasticsearch.indices.recovery.RecoveryState;
15+
import org.elasticsearch.plugins.Plugin;
16+
import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
17+
import org.elasticsearch.test.ESIntegTestCase;
18+
import org.elasticsearch.test.InternalTestCluster;
19+
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
20+
import org.elasticsearch.xpack.frozen.FrozenIndices;
21+
22+
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.List;
25+
import java.util.stream.Collectors;
26+
import java.util.stream.IntStream;
27+
28+
import static java.util.stream.Collectors.toList;
29+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
30+
import static org.hamcrest.Matchers.empty;
31+
import static org.hamcrest.Matchers.not;
32+
33+
public class FrozenIndexRecoveryTests extends ESIntegTestCase {
34+
35+
@Override
36+
protected boolean addMockInternalEngine() {
37+
return false;
38+
}
39+
40+
@Override
41+
protected Collection<Class<? extends Plugin>> nodePlugins() {
42+
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
43+
plugins.add(FrozenIndices.class);
44+
return plugins;
45+
}
46+
47+
public void testRecoverExistingReplica() throws Exception {
48+
final String indexName = "test-recover-existing-replica";
49+
internalCluster().ensureAtLeastNumDataNodes(2);
50+
List<String> dataNodes = randomSubsetOf(2, Sets.newHashSet(
51+
clusterService().state().nodes().getDataNodes().valuesIt()).stream().map(DiscoveryNode::getName).collect(Collectors.toSet()));
52+
createIndex(indexName, Settings.builder()
53+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
54+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
55+
.put("index.routing.allocation.include._name", String.join(",", dataNodes))
56+
.build());
57+
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
58+
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
59+
ensureGreen(indexName);
60+
if (randomBoolean()) {
61+
client().admin().indices().prepareFlush(indexName).get();
62+
} else {
63+
client().admin().indices().prepareSyncedFlush(indexName).get();
64+
}
65+
// index more documents while one shard copy is offline
66+
internalCluster().restartNode(dataNodes.get(1), new InternalTestCluster.RestartCallback() {
67+
@Override
68+
public Settings onNodeStopped(String nodeName) throws Exception {
69+
Client client = client(dataNodes.get(0));
70+
int moreDocs = randomIntBetween(1, 50);
71+
for (int i = 0; i < moreDocs; i++) {
72+
client.prepareIndex(indexName, "_doc").setSource("num", i).get();
73+
}
74+
assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest(indexName)).actionGet());
75+
return super.onNodeStopped(nodeName);
76+
}
77+
});
78+
ensureGreen(indexName);
79+
internalCluster().assertSameDocIdsOnShards();
80+
for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
81+
if (recovery.getPrimary() == false) {
82+
assertThat(recovery.getIndex().fileDetails(), not(empty()));
83+
}
84+
}
85+
internalCluster().fullRestart();
86+
ensureGreen(indexName);
87+
internalCluster().assertSameDocIdsOnShards();
88+
for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
89+
if (recovery.getPrimary() == false) {
90+
assertThat(recovery.getIndex().fileDetails(), empty());
91+
}
92+
}
4293
}
4394
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.index.engine;
7+
8+
import org.elasticsearch.cluster.routing.RecoverySource;
9+
import org.elasticsearch.cluster.routing.ShardRouting;
10+
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.index.shard.IndexShard;
13+
import org.elasticsearch.index.shard.IndexShardTestCase;
14+
15+
import java.io.IOException;
16+
17+
import static org.hamcrest.Matchers.equalTo;
18+
19+
public class FrozenIndexShardTests extends IndexShardTestCase {
20+
21+
/**
22+
* Make sure we can recover from a frozen engine
23+
*/
24+
public void testRecoverFromFrozenPrimary() throws IOException {
25+
IndexShard indexShard = newStartedShard(true);
26+
indexDoc(indexShard, "_doc", "1");
27+
indexDoc(indexShard, "_doc", "2");
28+
indexDoc(indexShard, "_doc", "3");
29+
indexShard.close("test", true);
30+
final ShardRouting shardRouting = indexShard.routingEntry();
31+
IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
32+
shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
33+
), indexShard.indexSettings().getIndexMetaData(), FrozenEngine::new);
34+
recoverShardFromStore(frozenShard);
35+
assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
36+
assertDocCount(frozenShard, 3);
37+
38+
IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
39+
recoverReplica(replica, frozenShard, true);
40+
assertDocCount(replica, 3);
41+
closeShards(frozenShard, replica);
42+
}
43+
}

0 commit comments

Comments
 (0)