Skip to content

Commit bf910e9

Browse files
authored
Fix recovery stage transition with sync_id (#57754)
If the recovery source is on an old node (before 7.2), the recovery target won't have the safe commit after phase1 because the recovery source does not send the global checkpoint in the clean_files step; if the recovery then fails and retries, the recovery stage won't transition properly. Also, if a sync_id is used in peer recovery, the clean_files step won't be executed, so the stage is never moved to TRANSLOG. This issue was addressed in #57187 but was not forward-ported to 8.0. Closes #57708
1 parent cdfb528 commit bf910e9

File tree

7 files changed

+86
-29
lines changed

7 files changed

+86
-29
lines changed

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,9 @@ public void testRelocationWithConcurrentIndexing() throws Exception {
278278
default:
279279
throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
280280
}
281+
if (randomBoolean()) {
282+
syncedFlush(index);
283+
}
281284
}
282285

283286
public void testRecovery() throws Exception {

server/src/internalClusterTest/java/org/elasticsearch/gateway/ReplicaShardAllocatorSyncIdIT.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.index.translog.Translog;
4040
import org.elasticsearch.indices.IndicesService;
4141
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
42+
import org.elasticsearch.indices.recovery.RecoveryCleanFilesRequest;
4243
import org.elasticsearch.indices.recovery.RecoveryState;
4344
import org.elasticsearch.plugins.EnginePlugin;
4445
import org.elasticsearch.plugins.Plugin;
@@ -57,6 +58,7 @@
5758
import java.util.Optional;
5859
import java.util.Set;
5960
import java.util.concurrent.CountDownLatch;
61+
import java.util.concurrent.Semaphore;
6062
import java.util.concurrent.atomic.AtomicBoolean;
6163
import java.util.stream.Collectors;
6264
import java.util.stream.IntStream;
@@ -243,6 +245,54 @@ public void testFullClusterRestartPerformNoopRecovery() throws Exception {
243245
assertNoOpRecoveries(indexName);
244246
}
245247

248+
/**
249+
* If the recovery source is on an old node (before <pre>{@link org.elasticsearch.Version#V_7_2_0}</pre>) then the recovery target
250+
* won't have the safe commit after phase1 because the recovery source does not send the global checkpoint in the clean_files
251+
* step. And if the recovery fails and retries, then the recovery stage might not transition properly. This test simulates
252+
* this behavior by changing the global checkpoint in phase1 to unassigned.
253+
*/
254+
public void testSimulateRecoverySourceOnOldNode() throws Exception {
255+
internalCluster().startMasterOnlyNode();
256+
String source = internalCluster().startDataOnlyNode();
257+
String indexName = "test";
258+
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(
259+
Settings.builder()
260+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
261+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)));
262+
ensureGreen(indexName);
263+
if (randomBoolean()) {
264+
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500))
265+
.mapToObj(n -> client().prepareIndex(indexName).setSource("f", "v")).collect(Collectors.toList()));
266+
}
267+
if (randomBoolean()) {
268+
client().admin().indices().prepareFlush(indexName).get();
269+
}
270+
if (randomBoolean()) {
271+
syncFlush(indexName);
272+
}
273+
internalCluster().startDataOnlyNode();
274+
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, source);
275+
Semaphore failRecovery = new Semaphore(1);
276+
transportService.addSendBehavior((connection, requestId, action, request, options) -> {
277+
if (action.equals(PeerRecoveryTargetService.Actions.CLEAN_FILES)) {
278+
RecoveryCleanFilesRequest cleanFilesRequest = (RecoveryCleanFilesRequest) request;
279+
request = new RecoveryCleanFilesRequest(cleanFilesRequest.recoveryId(),
280+
cleanFilesRequest.requestSeqNo(), cleanFilesRequest.shardId(), cleanFilesRequest.sourceMetaSnapshot(),
281+
cleanFilesRequest.totalTranslogOps(), SequenceNumbers.UNASSIGNED_SEQ_NO);
282+
}
283+
if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE)) {
284+
if (failRecovery.tryAcquire()) {
285+
throw new IllegalStateException("simulated");
286+
}
287+
}
288+
connection.sendRequest(requestId, action, request, options);
289+
});
290+
assertAcked(client().admin().indices().prepareUpdateSettings()
291+
.setIndices(indexName).setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()));
292+
ensureGreen(indexName);
293+
transportService.clearAllRules();
294+
}
295+
246296
private void assertNoOpRecoveries(String indexName) {
247297
for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
248298
if (recovery.getPrimary() == false) {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,7 +1380,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
13801380
if (state != IndexShardState.RECOVERING) {
13811381
throw new IndexShardNotRecoveringException(shardId, state);
13821382
}
1383-
assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
1383+
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
13841384
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
13851385
final Optional<SequenceNumbers.CommitInfo> safeCommit;
13861386
final long globalCheckpoint;
@@ -1395,14 +1395,14 @@ public long recoverLocallyUpToGlobalCheckpoint() {
13951395
logger.debug("skip local recovery as failed to find the safe commit", e);
13961396
return UNASSIGNED_SEQ_NO;
13971397
}
1398-
if (safeCommit.isPresent() == false) {
1399-
logger.trace("skip local recovery as no safe commit found");
1400-
return UNASSIGNED_SEQ_NO;
1401-
}
1402-
assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
14031398
try {
14041399
maybeCheckIndex(); // check index here and won't do it again if ops-based recovery occurs
14051400
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
1401+
if (safeCommit.isPresent() == false) {
1402+
logger.trace("skip local recovery as no safe commit found");
1403+
return UNASSIGNED_SEQ_NO;
1404+
}
1405+
assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
14061406
if (safeCommit.get().localCheckpoint == globalCheckpoint) {
14071407
logger.trace("skip local recovery as the safe commit is up to date; safe commit {} global checkpoint {}",
14081408
safeCommit.get(), globalCheckpoint);
@@ -1561,7 +1561,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
15611561
* Operations from the translog will be replayed to bring lucene up to date.
15621562
**/
15631563
public void openEngineAndRecoverFromTranslog() throws IOException {
1564-
assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
1564+
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
15651565
maybeCheckIndex();
15661566
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
15671567
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
@@ -1582,7 +1582,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
15821582
*/
15831583
public void openEngineAndSkipTranslogRecovery() throws IOException {
15841584
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
1585-
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
1585+
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
15861586
loadGlobalCheckpointToReplicationTracker();
15871587
innerOpenEngineAndTranslog(replicationTracker);
15881588
getEngine().skipTranslogRecovery();
@@ -1617,7 +1617,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
16171617
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
16181618
onSettingsChanged();
16191619
assert assertSequenceNumbersInCommit();
1620-
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
1620+
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
16211621
}
16221622

16231623
private boolean assertSequenceNumbersInCommit() throws IOException {

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class RecoveryCleanFilesRequest extends RecoveryTransportRequest {
3636
private final int totalTranslogOps;
3737
private final long globalCheckpoint;
3838

39-
RecoveryCleanFilesRequest(long recoveryId, long requestSeqNo, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
39+
public RecoveryCleanFilesRequest(long recoveryId, long requestSeqNo, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
4040
int totalTranslogOps, long globalCheckpoint) {
4141
super(requestSeqNo);
4242
this.recoveryId = recoveryId;

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,20 @@ public synchronized Stage getStage() {
170170

171171
private void validateAndSetStage(Stage expected, Stage next) {
172172
if (stage != expected) {
173+
assert false : "can't move recovery to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])";
173174
throw new IllegalStateException("can't move recovery to stage [" + next + "]. current stage: ["
174175
+ stage + "] (expected [" + expected + "])");
175176
}
176177
stage = next;
177178
}
178179

180+
public synchronized void validateCurrentStage(Stage expected) {
181+
if (stage != expected) {
182+
assert false : "expected stage [" + expected + "]; but current stage is [" + stage + "]";
183+
throw new IllegalStateException("expected stage [" + expected + "] but current stage is [" + stage + "]");
184+
}
185+
}
186+
179187
// synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe
180188
public synchronized RecoveryState setStage(Stage stage) {
181189
switch (stage) {

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,10 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
216216

217217
// copy with truncated translog
218218
shard = newStartedShard(false);
219-
globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
219+
SeqNoStats seqNoStats = populateRandomData(shard);
220220
replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
221221
RecoverySource.PeerRecoverySource.INSTANCE));
222+
globalCheckpoint = randomFrom(UNASSIGNED_SEQ_NO, seqNoStats.getMaxSeqNo());
222223
String translogUUID = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), globalCheckpoint,
223224
replica.shardId(), replica.getPendingPrimaryTerm());
224225
replica.store().associateIndexWithNewTranslog(translogUUID);
@@ -232,6 +233,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
232233
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
233234
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
234235
}
236+
assertThat(replica.recoveryState().getStage(), equalTo(RecoveryState.Stage.TRANSLOG));
235237
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
236238
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
237239
closeShards(replica);

server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.cluster.routing.ShardRouting;
2525
import org.elasticsearch.cluster.routing.ShardRoutingState;
2626
import org.elasticsearch.cluster.routing.TestShardRouting;
27-
import org.elasticsearch.common.Strings;
2827
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2928
import org.elasticsearch.common.io.stream.StreamInput;
3029
import org.elasticsearch.common.io.stream.Writeable;
@@ -55,6 +54,7 @@
5554
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5655
import static org.hamcrest.Matchers.lessThan;
5756
import static org.hamcrest.Matchers.lessThanOrEqualTo;
57+
import static org.hamcrest.Matchers.startsWith;
5858

5959
public class RecoveryTargetTests extends ESTestCase {
6060
abstract class Streamer<T extends Writeable> extends Thread {
@@ -336,31 +336,25 @@ Index createObj(StreamInput in) throws IOException {
336336
public void testStageSequenceEnforcement() {
337337
final DiscoveryNode discoveryNode = new DiscoveryNode("1", buildNewFakeTransportAddress(), emptyMap(), emptySet(),
338338
Version.CURRENT);
339-
Stage[] stages = Stage.values();
340-
int i = randomIntBetween(0, stages.length - 1);
341-
int j;
342-
do {
343-
j = randomIntBetween(0, stages.length - 1);
344-
} while (j == i);
345-
Stage t = stages[i];
346-
stages[i] = stages[j];
347-
stages[j] = t;
348-
try {
339+
final AssertionError error = expectThrows(AssertionError.class, () -> {
340+
Stage[] stages = Stage.values();
341+
int i = randomIntBetween(0, stages.length - 1);
342+
int j = randomValueOtherThan(i, () -> randomIntBetween(0, stages.length - 1));
343+
Stage t = stages[i];
344+
stages[i] = stages[j];
345+
stages[j] = t;
349346
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("bla", "_na_", 0), discoveryNode.getId(),
350347
randomBoolean(), ShardRoutingState.INITIALIZING);
351348
RecoveryState state = new RecoveryState(shardRouting, discoveryNode,
352349
shardRouting.recoverySource().getType() == RecoverySource.Type.PEER ? discoveryNode : null);
353350
for (Stage stage : stages) {
354351
state.setStage(stage);
355352
}
356-
fail("succeeded in performing the illegal sequence [" + Strings.arrayToCommaDelimitedString(stages) + "]");
357-
} catch (IllegalStateException e) {
358-
// cool
359-
}
360-
353+
});
354+
assertThat(error.getMessage(), startsWith("can't move recovery to stage"));
361355
// but reset should be always possible.
362-
stages = Stage.values();
363-
i = randomIntBetween(1, stages.length - 1);
356+
Stage[] stages = Stage.values();
357+
int i = randomIntBetween(1, stages.length - 1);
364358
ArrayList<Stage> list = new ArrayList<>(Arrays.asList(Arrays.copyOfRange(stages, 0, i)));
365359
list.addAll(Arrays.asList(stages));
366360
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("bla", "_na_", 0), discoveryNode.getId(),

0 commit comments

Comments
 (0)