
Commit 389c625

Remove PRRLs before performing file-based recovery (#43928)
If the primary performs a file-based recovery to a node that has (or recently had) a copy of the shard, then it is possible that the persisted global checkpoint of the new copy is behind that of the old copy, since file-based recoveries are somewhat destructive operations.

Today we leave that node's PRRL in place during the recovery with the expectation that it can be used by the new copy. However, this isn't the case if the new copy needs more history to be retained, because retention leases may only advance and never retreat.

This commit addresses this by removing any existing PRRL during a file-based recovery: since we are performing a file-based recovery we have already determined that there isn't enough history for an ops-based recovery, so there is little point in keeping the old lease in place.

Caught by [a failure of `RecoveryWhileUnderLoadIT.testRecoverWhileRelocating`](https://scans.gradle.com/s/wxccfrtfgjj3g/console-log?task=:server:integTest#L14)

Relates #41536
1 parent 291ff8d commit 389c625
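
The reasoning above hinges on one invariant: a peer-recovery retention lease (PRRL) may only advance and never retreat. The stand-alone sketch below models that invariant with invented names (it is not Elasticsearch code) to show why a stale lease on the target node has to be removed and recreated, rather than renewed, when a file-based recovery re-establishes a copy whose global checkpoint is behind the old one.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of per-node peer-recovery retention leases, assuming only the
// "retaining sequence numbers never move backwards" rule from the commit message.
class LeaseModel {
    private final Map<String, Long> retainingSeqNoByNode = new HashMap<>();

    // Renewal may only move the retained sequence number forwards.
    void renew(String nodeId, long retainingSeqNo) {
        final Long current = retainingSeqNoByNode.get(nodeId);
        if (current != null && retainingSeqNo < current) {
            throw new IllegalArgumentException("leases may only advance: " + current + " -> " + retainingSeqNo);
        }
        retainingSeqNoByNode.put(nodeId, retainingSeqNo);
    }

    void remove(String nodeId) {
        retainingSeqNoByNode.remove(nodeId);
    }

    void add(String nodeId, long globalCheckpoint) {
        // mirrors ReplicationTracker#addPeerRecoveryRetentionLease: retain ops from globalCheckpoint + 1
        retainingSeqNoByNode.put(nodeId, globalCheckpoint + 1);
    }

    public static void main(String[] args) {
        LeaseModel leases = new LeaseModel();
        leases.add("node-1", 99); // the old copy's lease retains ops from seq# 100

        // file-based recovery to node-1: the new copy's global checkpoint is older (say 79),
        // so renewing the existing lease down to 80 is not allowed...
        try {
            leases.renew("node-1", 80);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // ...instead the recovery removes the stale lease and creates a fresh one (this commit's fix).
        leases.remove("node-1");
        leases.add("node-1", 79);
        System.out.println("new lease established retaining ops from seq# 80");
    }
}
```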

File tree

4 files changed: +95 -6 lines changed


server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+16 -3

@@ -437,6 +437,10 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
         addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
     }
 
+    public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
+        removeRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), listener);
+    }
+
     /**
      * Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
      */
@@ -498,9 +502,18 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
                    final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
                    if (retentionLease != null) {
                        final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
-                       renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting),
-                           Math.max(0L, checkpointState.globalCheckpoint + 1L),
-                           PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                       final long newRetainedSequenceNumber = Math.max(0L, checkpointState.globalCheckpoint + 1L);
+                       if (retentionLease.retainingSequenceNumber() <= newRetainedSequenceNumber) {
+                           renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), newRetainedSequenceNumber,
+                               PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                       } else {
+                           // the retention lease is tied to the node, not the shard copy, so it's possible a copy was removed and now
+                           // we are in the process of recovering it again. The recovery process will fix the lease before initiating
+                           // tracking on this copy:
+                           assert checkpointState.tracked == false
+                               && checkpointState.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO :
+                               "cannot renew " + retentionLease + " according to " + checkpointState + " for " + shardRouting;
+                       }
                    }
                }
            }
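
The guard in the second hunk exists because the lease is keyed by node id rather than by shard copy: while a copy on that node is being re-recovered, the periodic renewal may see a checkpoint below the lease's current retaining sequence number and must then leave the lease for the recovery to replace. A minimal stand-alone sketch of just that check (helper names invented; the real logic lives in renewPeerRecoveryRetentionLeases above):

```java
// Stand-alone sketch of the renewal guard; mayRenew is an invented helper, and -2 mirrors
// SequenceNumbers.UNASSIGNED_SEQ_NO for a copy whose checkpoint is not yet known.
final class RenewalGuardSketch {
    static boolean mayRenew(long currentRetainingSeqNo, long globalCheckpoint) {
        final long newRetainedSeqNo = Math.max(0L, globalCheckpoint + 1L);
        return currentRetainingSeqNo <= newRetainedSeqNo; // leases may only move forwards
    }

    public static void main(String[] args) {
        System.out.println(mayRenew(100L, 150L)); // true: the lease advances from 100 to 151
        System.out.println(mayRenew(100L, -2L));  // false: a re-recovering copy must not drag the lease back to 0
    }
}
```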

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+5 -0

@@ -2503,6 +2503,11 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
         replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
     }
 
+    public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
+        assert assertPrimaryMode();
+        replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener);
+    }
+
     class ShardEventListener implements Engine.EventListener {
         private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

+25 -1

@@ -53,6 +53,7 @@
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
+import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
@@ -196,7 +197,30 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
                        logger.warn("releasing snapshot caused exception", ex);
                    }
                });
-               phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
+
+               final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
+               if (shard.indexSettings().isSoftDeleteEnabled()
+                   && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
+                   runUnderPrimaryPermit(() -> {
+                           try {
+                               // If the target previously had a copy of this shard then a file-based recovery might move its global
+                               // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
+                               // new one later on in the recovery.
+                               shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), deleteRetentionLeaseStep);
+                           } catch (RetentionLeaseNotFoundException e) {
+                               logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
+                               deleteRetentionLeaseStep.onResponse(null);
+                           }
+                       }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",
+                       shard, cancellableThreads, logger);
+               } else {
+                   deleteRetentionLeaseStep.onResponse(null);
+               }
+
+               deleteRetentionLeaseStep.whenComplete(ignored -> {
+                   phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
+               }, onFailure);
+
            } catch (final Exception e) {
                throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
            }
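
With this change the source handler sequences an explicit lease-removal step ahead of phase1 via a StepListener, so the file copy only begins once the old lease is gone (or once the step completes trivially for indices where no lease applies). The toy below reproduces that sequencing with plain CompletableFuture instead of Elasticsearch's StepListener; all names are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Toy sequencing sketch: a "delete retention lease" step completes first (or immediately when
// there is nothing to delete), and only then does phase1 start copying files.
final class RecoverySequencingSketch {
    public static void main(String[] args) {
        final boolean leaseRemovalNeeded = true; // stands in for the soft-deletes / index-not-closed check

        final CompletableFuture<Void> deleteRetentionLeaseStep = new CompletableFuture<>();
        if (leaseRemovalNeeded) {
            // in the real handler this is shard.removePeerRecoveryRetentionLease(...) run under a primary permit
            System.out.println("removing any stale peer-recovery retention lease for the target node");
            deleteRetentionLeaseStep.complete(null);
        } else {
            deleteRetentionLeaseStep.complete(null); // nothing to remove, complete the step immediately
        }

        deleteRetentionLeaseStep
            .thenRun(() -> System.out.println("phase1: sending segment files to the target"))
            .exceptionally(e -> {
                System.err.println("recovery step failed: " + e);
                return null;
            });
    }
}
```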

server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

+49 -2

@@ -49,6 +49,7 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
 import org.elasticsearch.index.analysis.TokenFilterFactory;
 import org.elasticsearch.index.mapper.MapperParsingException;
@@ -70,6 +71,7 @@
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
+import org.elasticsearch.test.InternalSettingsPlugin;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.store.MockFSIndexStore;
@@ -127,8 +129,12 @@ public class IndexRecoveryIT extends ESIntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(MockTransportService.TestPlugin.class, MockFSIndexStore.TestPlugin.class,
-            RecoverySettingsChunkSizePlugin.class, TestAnalysisPlugin.class);
+        return Arrays.asList(
+            MockTransportService.TestPlugin.class,
+            MockFSIndexStore.TestPlugin.class,
+            RecoverySettingsChunkSizePlugin.class,
+            TestAnalysisPlugin.class,
+            InternalSettingsPlugin.class);
     }
 
     @After
@@ -1015,4 +1021,45 @@ public TokenStream create(TokenStream tokenStream) {
            });
        }
    }
+
+    public void testRepeatedRecovery() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+
+        // Ensures that you can remove a replica and then add it back again without any ill effects, even if it's allocated back to the
+        // node that held it previously, in case that node hasn't completely cleared it up.
+
+        final String indexName = "test-index";
+        createIndex(indexName, Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 6))
+            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")
+            .build());
+        indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 10))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+
+        assertThat(client().admin().indices().prepareFlush(indexName).get().getFailedShards(), equalTo(0));
+
+        assertBusy(() -> {
+            final ShardStats[] shardsStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards();
+            for (final ShardStats shardStats : shardsStats) {
+                final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
+                assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream()
+                    .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1));
+            }
+        });
+
+        logger.info("--> remove replicas");
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put("index.number_of_replicas", 0)));
+        ensureGreen(indexName);
+
+        logger.info("--> index more documents");
+        indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 10))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+
+        logger.info("--> add replicas again");
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put("index.number_of_replicas", 1)));
+        ensureGreen(indexName);
+    }
 }
