Skip to content

Commit a323132

Browse files
committed
Create retention leases file during recovery (#39359)
Today we load the shard history retention leases from disk whenever opening the engine, and treat a missing file as an empty set of leases. However, in some cases this is inappropriate: we might be restoring from a snapshot (if the target index already exists then there may be leases on disk) or force-allocating a stale primary, and in neither case does it make sense to restore the retention leases from disk.

With this change we write an empty retention leases file during recovery, except in the following cases:

- During peer recovery, because the on-disk leases may be accurate and could be needed if the recovery target is made into a primary.
- During recovery from an existing store, as long as we are not force-allocating a stale primary.

Relates #37165
1 parent 8d2184b commit a323132

File tree

8 files changed

+150
-1
lines changed

8 files changed

+150
-1
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

+14
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ public boolean shouldBootstrapNewHistoryUUID() {
9797
return false;
9898
}
9999

100+
public boolean expectEmptyRetentionLeases() {
101+
return true;
102+
}
103+
100104
@Override
101105
public boolean equals(Object o) {
102106
if (this == o) return true;
@@ -181,6 +185,11 @@ public Type getType() {
181185
public String toString() {
182186
return "existing store recovery; bootstrap_history_uuid=" + bootstrapNewHistoryUUID;
183187
}
188+
189+
@Override
190+
public boolean expectEmptyRetentionLeases() {
191+
return bootstrapNewHistoryUUID;
192+
}
184193
}
185194

186195
/**
@@ -317,5 +326,10 @@ public Type getType() {
317326
public String toString() {
318327
return "peer recovery";
319328
}
329+
330+
@Override
331+
public boolean expectEmptyRetentionLeases() {
332+
return false;
333+
}
320334
}
321335
}

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+8
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,9 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
330330
*/
331331
public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
332332
final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
333+
334+
// TODO after backporting we expect this never to happen in 8.x, so adjust this to throw an exception instead.
335+
assert Version.CURRENT.major <= 8 : "throw an exception instead of returning EMPTY on null";
333336
if (retentionLeases == null) {
334337
return RetentionLeases.EMPTY;
335338
}
@@ -355,6 +358,11 @@ public void persistRetentionLeases(final Path path) throws WriteStateException {
355358
}
356359
}
357360

361+
public boolean assertRetentionLeasesPersisted(final Path path) throws IOException {
362+
assert RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path) != null;
363+
return true;
364+
}
365+
358366
public static class CheckpointState implements Writeable {
359367

360368
/**

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+7
Original file line numberDiff line numberDiff line change
@@ -1434,6 +1434,9 @@ private void innerOpenEngineAndTranslog() throws IOException {
14341434
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
14351435
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
14361436
updateRetentionLeasesOnReplica(loadRetentionLeases());
1437+
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
1438+
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
1439+
+ "] but got " + getRetentionLeases();
14371440
trimUnsafeCommits();
14381441
synchronized (mutex) {
14391442
verifyNotClosed();
@@ -2029,6 +2032,10 @@ public void persistRetentionLeases() throws WriteStateException {
20292032
replicationTracker.persistRetentionLeases(path.getShardStatePath());
20302033
}
20312034

2035+
public boolean assertRetentionLeasesPersisted() throws IOException {
2036+
return replicationTracker.assertRetentionLeasesPersisted(path.getShardStatePath());
2037+
}
2038+
20322039
/**
20332040
* Syncs the current retention leases to all replicas.
20342041
*/

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

+10
Original file line numberDiff line numberDiff line change
@@ -401,9 +401,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
401401
final String translogUUID = Translog.createEmptyTranslog(
402402
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
403403
store.associateIndexWithNewTranslog(translogUUID);
404+
writeEmptyRetentionLeasesFile(indexShard);
404405
} else if (indexShouldExists) {
405406
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
406407
store.bootstrapNewHistory();
408+
writeEmptyRetentionLeasesFile(indexShard);
407409
}
408410
// since we recover from local, just fill the files and size
409411
try {
@@ -420,6 +422,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
420422
indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId,
421423
indexShard.getPendingPrimaryTerm());
422424
store.associateIndexWithNewTranslog(translogUUID);
425+
writeEmptyRetentionLeasesFile(indexShard);
423426
}
424427
indexShard.openEngineAndRecoverFromTranslog();
425428
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
@@ -432,6 +435,12 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
432435
}
433436
}
434437

438+
private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException {
439+
assert indexShard.getRetentionLeases().leases().isEmpty() : indexShard.getRetentionLeases(); // not loaded yet
440+
indexShard.persistRetentionLeases();
441+
assert indexShard.loadRetentionLeases().leases().isEmpty();
442+
}
443+
435444
private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
436445
final Directory directory = store.directory();
437446
for (String name : Lucene.files(si)) {
@@ -471,6 +480,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
471480
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
472481
store.associateIndexWithNewTranslog(translogUUID);
473482
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
483+
writeEmptyRetentionLeasesFile(indexShard);
474484
indexShard.openEngineAndRecoverFromTranslog();
475485
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
476486
indexShard.finalizeRecovery();

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

+9
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,15 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
414414
indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId,
415415
indexShard.getPendingPrimaryTerm());
416416
store.associateIndexWithNewTranslog(translogUUID);
417+
418+
if (indexShard.getRetentionLeases().leases().isEmpty()) {
419+
// if empty, may be a fresh IndexShard, so write an empty leases file to disk
420+
indexShard.persistRetentionLeases();
421+
assert indexShard.loadRetentionLeases().leases().isEmpty();
422+
} else {
423+
assert indexShard.assertRetentionLeasesPersisted();
424+
}
425+
417426
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
418427
// this is a fatal exception at this stage.
419428
// this means we transferred files from the remote that have not been checksummed and they are

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java

+28
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
package org.elasticsearch.index.seqno;
2121

22+
import org.elasticsearch.ElasticsearchException;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2425
import org.elasticsearch.action.support.replication.ReplicationResponse;
26+
import org.elasticsearch.cluster.node.DiscoveryNode;
2527
import org.elasticsearch.cluster.routing.ShardRouting;
2628
import org.elasticsearch.common.settings.Setting;
2729
import org.elasticsearch.common.settings.Settings;
@@ -31,6 +33,7 @@
3133
import org.elasticsearch.index.shard.IndexShard;
3234
import org.elasticsearch.index.shard.ShardId;
3335
import org.elasticsearch.indices.IndicesService;
36+
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
3437
import org.elasticsearch.plugins.Plugin;
3538
import org.elasticsearch.test.ESIntegTestCase;
3639
import org.elasticsearch.test.transport.MockTransportService;
@@ -45,6 +48,7 @@
4548
import java.util.List;
4649
import java.util.Map;
4750
import java.util.concurrent.CountDownLatch;
51+
import java.util.concurrent.Semaphore;
4852
import java.util.concurrent.TimeUnit;
4953
import java.util.concurrent.atomic.AtomicBoolean;
5054
import java.util.concurrent.atomic.AtomicReference;
@@ -53,6 +57,7 @@
5357
import java.util.stream.Collectors;
5458
import java.util.stream.Stream;
5559

60+
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING;
5661
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5762
import static org.hamcrest.Matchers.anyOf;
5863
import static org.hamcrest.Matchers.contains;
@@ -388,6 +393,29 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
388393
currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source));
389394
}
390395

396+
// Cause some recoveries to fail to ensure that retention leases are handled properly when retrying a recovery
397+
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
398+
.put(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")));
399+
final Semaphore recoveriesToDisrupt = new Semaphore(scaledRandomIntBetween(0, 4));
400+
final MockTransportService primaryTransportService
401+
= (MockTransportService) internalCluster().getInstance(TransportService.class, primaryShardNodeName);
402+
primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
403+
if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE) && recoveriesToDisrupt.tryAcquire()) {
404+
if (randomBoolean()) {
405+
// return a ConnectTransportException to the START_RECOVERY action
406+
final TransportService replicaTransportService
407+
= internalCluster().getInstance(TransportService.class, connection.getNode().getName());
408+
final DiscoveryNode primaryNode = primaryTransportService.getLocalNode();
409+
replicaTransportService.disconnectFromNode(primaryNode);
410+
replicaTransportService.connectToNode(primaryNode);
411+
} else {
412+
// return an exception to the FINALIZE action
413+
throw new ElasticsearchException("failing recovery for test purposes");
414+
}
415+
}
416+
connection.sendRequest(requestId, action, request, options);
417+
});
418+
391419
// now allow the replicas to be allocated and wait for recovery to finalize
392420
allowNodes("index", 1 + numberOfReplicas);
393421
ensureGreen("index");

server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java

+14
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,20 @@ public void testPersistence() throws IOException {
265265
} finally {
266266
closeShards(recoveredShard);
267267
}
268+
269+
// we should not recover retention leases when force-allocating a stale primary
270+
final IndexShard forceRecoveredShard = reinitShard(
271+
indexShard,
272+
ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
273+
RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE));
274+
try {
275+
recoverShardFromStore(forceRecoveredShard);
276+
final RetentionLeases recoveredRetentionLeases = forceRecoveredShard.getEngine().config().retentionLeasesSupplier().get();
277+
assertThat(recoveredRetentionLeases.leases(), empty());
278+
assertThat(recoveredRetentionLeases.version(), equalTo(0L));
279+
} finally {
280+
closeShards(forceRecoveredShard);
281+
}
268282
} finally {
269283
closeShards(indexShard);
270284
}

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java

+60-1
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,15 @@
2323
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
2424
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
2525
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
26+
import org.elasticsearch.action.admin.indices.stats.ShardStats;
2627
import org.elasticsearch.action.index.IndexRequestBuilder;
2728
import org.elasticsearch.client.Client;
29+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2830
import org.elasticsearch.common.blobstore.BlobContainer;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.index.seqno.RetentionLeaseActions;
33+
import org.elasticsearch.index.seqno.RetentionLeases;
34+
import org.elasticsearch.index.shard.ShardId;
2935
import org.elasticsearch.repositories.IndexId;
3036
import org.elasticsearch.repositories.RepositoriesService;
3137
import org.elasticsearch.repositories.Repository;
@@ -43,6 +49,7 @@
4349
import java.util.concurrent.CountDownLatch;
4450
import java.util.concurrent.ExecutionException;
4551

52+
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
4653
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4754
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
4855
import static org.hamcrest.Matchers.equalTo;
@@ -87,8 +94,8 @@ public void testSnapshotAndRestore() throws Exception {
8794
int[] docCounts = new int[indexCount];
8895
String[] indexNames = generateRandomNames(indexCount);
8996
for (int i = 0; i < indexCount; i++) {
90-
logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
9197
docCounts[i] = iterations(10, 1000);
98+
logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
9299
addRandomDocuments(indexNames[i], docCounts[i]);
93100
assertHitCount(client().prepareSearch(indexNames[i]).setSize(0).get(), docCounts[i]);
94101
}
@@ -267,6 +274,58 @@ public void testIndicesDeletedFromRepository() throws Exception {
267274
}
268275
}
269276

277+
public void testRetentionLeasesClearedOnRestore() throws Exception {
278+
final String repoName = randomAsciiName();
279+
logger.info("--> creating repository {}", repoName);
280+
createAndCheckTestRepository(repoName);
281+
282+
final String indexName = randomAsciiName();
283+
final int shardCount = randomIntBetween(1, 5);
284+
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(
285+
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)).get());
286+
final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1));
287+
288+
final int snapshotDocCount = iterations(10, 1000);
289+
logger.info("--> indexing {} docs into {}", snapshotDocCount, indexName);
290+
addRandomDocuments(indexName, snapshotDocCount);
291+
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
292+
293+
final String leaseId = randomAsciiName();
294+
logger.info("--> adding retention lease with id {} to {}", leaseId, shardId);
295+
client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest(
296+
shardId, leaseId, RETAIN_ALL, "test")).actionGet();
297+
298+
final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards())
299+
.filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get();
300+
final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
301+
assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId));
302+
303+
final String snapshotName = randomAsciiName();
304+
logger.info("--> create snapshot {}:{}", repoName, snapshotName);
305+
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
306+
.setWaitForCompletion(true).setIndices(indexName));
307+
308+
if (randomBoolean()) {
309+
final int extraDocCount = iterations(10, 1000);
310+
logger.info("--> indexing {} extra docs into {}", extraDocCount, indexName);
311+
addRandomDocuments(indexName, extraDocCount);
312+
}
313+
314+
logger.info("--> close index {}", indexName);
315+
assertAcked(client().admin().indices().prepareClose(indexName));
316+
317+
logger.info("--> restore index {} from snapshot", indexName);
318+
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName).setWaitForCompletion(true));
319+
320+
ensureGreen();
321+
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
322+
323+
final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get()
324+
.getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get()
325+
.getRetentionLeaseStats().retentionLeases();
326+
assertFalse(restoredRetentionLeases.toString() + " has no " + leaseId, restoredRetentionLeases.contains(leaseId));
327+
}
328+
270329
protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
271330
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
272331
for (int i = 0; i < numDocs; i++) {

0 commit comments

Comments
 (0)