Skip to content

Commit 33ae034

Browse files
authored
Create retention leases file during recovery (#39359) (#40082)
Today we load the shard history retention leases from disk whenever opening the engine, and treat a missing file as an empty set of leases. However in some cases this is inappropriate: we might be restoring from a snapshot (if the target index already exists then there may be leases on disk) or force-allocating a stale primary, and in neither case does it make sense to restore the retention leases from disk. With this change we write an empty retention leases file during recovery, except for the following cases: - During peer recovery the on-disk leases may be accurate and could be needed if the recovery target is made into a primary. - During recovery from an existing store, as long as we are not force-allocating a stale primary. Relates #37165
1 parent 479502e commit 33ae034

File tree

8 files changed

+152
-1
lines changed

8 files changed

+152
-1
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

+14
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ public boolean shouldBootstrapNewHistoryUUID() {
9797
return false;
9898
}
9999

100+
public boolean expectEmptyRetentionLeases() {
101+
return true;
102+
}
103+
100104
@Override
101105
public boolean equals(Object o) {
102106
if (this == o) return true;
@@ -181,6 +185,11 @@ public Type getType() {
181185
public String toString() {
182186
return "existing store recovery; bootstrap_history_uuid=" + bootstrapNewHistoryUUID;
183187
}
188+
189+
@Override
190+
public boolean expectEmptyRetentionLeases() {
191+
return bootstrapNewHistoryUUID;
192+
}
184193
}
185194

186195
/**
@@ -317,5 +326,10 @@ public Type getType() {
317326
public String toString() {
318327
return "peer recovery";
319328
}
329+
330+
@Override
331+
public boolean expectEmptyRetentionLeases() {
332+
return false;
333+
}
320334
}
321335
}

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+8
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,9 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
329329
*/
330330
public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
331331
final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
332+
333+
// TODO after backporting we expect this never to happen in 8.x, so adjust this to throw an exception instead.
334+
assert Version.CURRENT.major <= 8 : "throw an exception instead of returning EMPTY on null";
332335
if (retentionLeases == null) {
333336
return RetentionLeases.EMPTY;
334337
}
@@ -354,6 +357,11 @@ public void persistRetentionLeases(final Path path) throws IOException {
354357
}
355358
}
356359

360+
public boolean assertRetentionLeasesPersisted(final Path path) throws IOException {
361+
assert RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path) != null;
362+
return true;
363+
}
364+
357365
public static class CheckpointState implements Writeable {
358366

359367
/**

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+7
Original file line numberDiff line numberDiff line change
@@ -1481,6 +1481,9 @@ private void innerOpenEngineAndTranslog() throws IOException {
14811481
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
14821482
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
14831483
updateRetentionLeasesOnReplica(loadRetentionLeases());
1484+
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
1485+
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
1486+
+ "] but got " + getRetentionLeases();
14841487
trimUnsafeCommits();
14851488
synchronized (mutex) {
14861489
verifyNotClosed();
@@ -2080,6 +2083,10 @@ public void persistRetentionLeases() throws IOException {
20802083
replicationTracker.persistRetentionLeases(path.getShardStatePath());
20812084
}
20822085

2086+
public boolean assertRetentionLeasesPersisted() throws IOException {
2087+
return replicationTracker.assertRetentionLeasesPersisted(path.getShardStatePath());
2088+
}
2089+
20832090
/**
20842091
* Syncs the current retention leases to all replicas.
20852092
*/

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

+10
Original file line numberDiff line numberDiff line change
@@ -403,9 +403,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
403403
final String translogUUID = Translog.createEmptyTranslog(
404404
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
405405
store.associateIndexWithNewTranslog(translogUUID);
406+
writeEmptyRetentionLeasesFile(indexShard);
406407
} else if (indexShouldExists) {
407408
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
408409
store.bootstrapNewHistory();
410+
writeEmptyRetentionLeasesFile(indexShard);
409411
}
410412
if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
411413
if (store.ensureIndexHas6xCommitTags()) {
@@ -427,6 +429,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
427429
indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId,
428430
indexShard.getPendingPrimaryTerm());
429431
store.associateIndexWithNewTranslog(translogUUID);
432+
writeEmptyRetentionLeasesFile(indexShard);
430433
}
431434
indexShard.openEngineAndRecoverFromTranslog();
432435
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
@@ -439,6 +442,12 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
439442
}
440443
}
441444

445+
private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException {
446+
assert indexShard.getRetentionLeases().leases().isEmpty() : indexShard.getRetentionLeases(); // not loaded yet
447+
indexShard.persistRetentionLeases();
448+
assert indexShard.loadRetentionLeases().leases().isEmpty();
449+
}
450+
442451
private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
443452
final Directory directory = store.directory();
444453
for (String name : Lucene.files(si)) {
@@ -478,6 +487,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
478487
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
479488
store.associateIndexWithNewTranslog(translogUUID);
480489
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
490+
writeEmptyRetentionLeasesFile(indexShard);
481491
indexShard.openEngineAndRecoverFromTranslog();
482492
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
483493
indexShard.finalizeRecovery();

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

+9
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,15 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
414414
indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId,
415415
indexShard.getPendingPrimaryTerm());
416416
store.associateIndexWithNewTranslog(translogUUID);
417+
418+
if (indexShard.getRetentionLeases().leases().isEmpty()) {
419+
// if empty, may be a fresh IndexShard, so write an empty leases file to disk
420+
indexShard.persistRetentionLeases();
421+
assert indexShard.loadRetentionLeases().leases().isEmpty();
422+
} else {
423+
assert indexShard.assertRetentionLeasesPersisted();
424+
}
425+
417426
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
418427
// this is a fatal exception at this stage.
419428
// this means we transferred files from the remote that have not be checksummed and they are

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java

+28
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
package org.elasticsearch.index.seqno;
2121

22+
import org.elasticsearch.ElasticsearchException;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2425
import org.elasticsearch.action.support.replication.ReplicationResponse;
26+
import org.elasticsearch.cluster.node.DiscoveryNode;
2527
import org.elasticsearch.cluster.routing.ShardRouting;
2628
import org.elasticsearch.common.settings.Setting;
2729
import org.elasticsearch.common.settings.Settings;
@@ -31,6 +33,7 @@
3133
import org.elasticsearch.index.shard.IndexShard;
3234
import org.elasticsearch.index.shard.ShardId;
3335
import org.elasticsearch.indices.IndicesService;
36+
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
3437
import org.elasticsearch.plugins.Plugin;
3538
import org.elasticsearch.test.ESIntegTestCase;
3639
import org.elasticsearch.test.transport.MockTransportService;
@@ -45,6 +48,7 @@
4548
import java.util.List;
4649
import java.util.Map;
4750
import java.util.concurrent.CountDownLatch;
51+
import java.util.concurrent.Semaphore;
4852
import java.util.concurrent.TimeUnit;
4953
import java.util.concurrent.atomic.AtomicBoolean;
5054
import java.util.concurrent.atomic.AtomicReference;
@@ -53,6 +57,7 @@
5357
import java.util.stream.Collectors;
5458
import java.util.stream.Stream;
5559

60+
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING;
5661
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5762
import static org.hamcrest.Matchers.anyOf;
5863
import static org.hamcrest.Matchers.contains;
@@ -391,6 +396,29 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
391396
currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source));
392397
}
393398

399+
// Cause some recoveries to fail to ensure that retention leases are handled properly when retrying a recovery
400+
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
401+
.put(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")));
402+
final Semaphore recoveriesToDisrupt = new Semaphore(scaledRandomIntBetween(0, 4));
403+
final MockTransportService primaryTransportService
404+
= (MockTransportService) internalCluster().getInstance(TransportService.class, primaryShardNodeName);
405+
primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
406+
if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE) && recoveriesToDisrupt.tryAcquire()) {
407+
if (randomBoolean()) {
408+
// return a ConnectTransportException to the START_RECOVERY action
409+
final TransportService replicaTransportService
410+
= internalCluster().getInstance(TransportService.class, connection.getNode().getName());
411+
final DiscoveryNode primaryNode = primaryTransportService.getLocalNode();
412+
replicaTransportService.disconnectFromNode(primaryNode);
413+
replicaTransportService.connectToNode(primaryNode);
414+
} else {
415+
// return an exception to the FINALIZE action
416+
throw new ElasticsearchException("failing recovery for test purposes");
417+
}
418+
}
419+
connection.sendRequest(requestId, action, request, options);
420+
});
421+
394422
// now allow the replicas to be allocated and wait for recovery to finalize
395423
allowNodes("index", 1 + numberOfReplicas);
396424
ensureGreen("index");

server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java

+14
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,20 @@ public void testPersistence() throws IOException {
265265
} finally {
266266
closeShards(recoveredShard);
267267
}
268+
269+
// we should not recover retention leases when force-allocating a stale primary
270+
final IndexShard forceRecoveredShard = reinitShard(
271+
indexShard,
272+
ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
273+
RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE));
274+
try {
275+
recoverShardFromStore(forceRecoveredShard);
276+
final RetentionLeases recoveredRetentionLeases = forceRecoveredShard.getEngine().config().retentionLeasesSupplier().get();
277+
assertThat(recoveredRetentionLeases.leases(), empty());
278+
assertThat(recoveredRetentionLeases.version(), equalTo(0L));
279+
} finally {
280+
closeShards(forceRecoveredShard);
281+
}
268282
} finally {
269283
closeShards(indexShard);
270284
}

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java

+62-1
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,16 @@
2323
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
2424
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
2525
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
26+
import org.elasticsearch.action.admin.indices.stats.ShardStats;
2627
import org.elasticsearch.action.index.IndexRequestBuilder;
2728
import org.elasticsearch.client.Client;
29+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2830
import org.elasticsearch.common.blobstore.BlobContainer;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.index.IndexSettings;
33+
import org.elasticsearch.index.seqno.RetentionLeaseActions;
34+
import org.elasticsearch.index.seqno.RetentionLeases;
35+
import org.elasticsearch.index.shard.ShardId;
2936
import org.elasticsearch.repositories.IndexId;
3037
import org.elasticsearch.repositories.RepositoriesService;
3138
import org.elasticsearch.repositories.Repository;
@@ -43,6 +50,7 @@
4350
import java.util.concurrent.CountDownLatch;
4451
import java.util.concurrent.ExecutionException;
4552

53+
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
4654
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4755
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
4856
import static org.hamcrest.Matchers.equalTo;
@@ -87,8 +95,8 @@ public void testSnapshotAndRestore() throws Exception {
8795
int[] docCounts = new int[indexCount];
8896
String[] indexNames = generateRandomNames(indexCount);
8997
for (int i = 0; i < indexCount; i++) {
90-
logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
9198
docCounts[i] = iterations(10, 1000);
99+
logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
92100
addRandomDocuments(indexNames[i], docCounts[i]);
93101
assertHitCount(client().prepareSearch(indexNames[i]).setSize(0).get(), docCounts[i]);
94102
}
@@ -267,6 +275,59 @@ public void testIndicesDeletedFromRepository() throws Exception {
267275
}
268276
}
269277

278+
public void testRetentionLeasesClearedOnRestore() throws Exception {
279+
final String repoName = randomAsciiName();
280+
logger.info("--> creating repository {}", repoName);
281+
createAndCheckTestRepository(repoName);
282+
283+
final String indexName = randomAsciiName();
284+
final int shardCount = randomIntBetween(1, 5);
285+
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
286+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)
287+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)).get());
288+
final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1));
289+
290+
final int snapshotDocCount = iterations(10, 1000);
291+
logger.info("--> indexing {} docs into {}", snapshotDocCount, indexName);
292+
addRandomDocuments(indexName, snapshotDocCount);
293+
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
294+
295+
final String leaseId = randomAsciiName();
296+
logger.info("--> adding retention lease with id {} to {}", leaseId, shardId);
297+
client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest(
298+
shardId, leaseId, RETAIN_ALL, "test")).actionGet();
299+
300+
final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards())
301+
.filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get();
302+
final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
303+
assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId));
304+
305+
final String snapshotName = randomAsciiName();
306+
logger.info("--> create snapshot {}:{}", repoName, snapshotName);
307+
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
308+
.setWaitForCompletion(true).setIndices(indexName));
309+
310+
if (randomBoolean()) {
311+
final int extraDocCount = iterations(10, 1000);
312+
logger.info("--> indexing {} extra docs into {}", extraDocCount, indexName);
313+
addRandomDocuments(indexName, extraDocCount);
314+
}
315+
316+
logger.info("--> close index {}", indexName);
317+
assertAcked(client().admin().indices().prepareClose(indexName));
318+
319+
logger.info("--> restore index {} from snapshot", indexName);
320+
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName).setWaitForCompletion(true));
321+
322+
ensureGreen();
323+
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
324+
325+
final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get()
326+
.getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get()
327+
.getRetentionLeaseStats().retentionLeases();
328+
assertFalse(restoredRetentionLeases.toString() + " has no " + leaseId, restoredRetentionLeases.contains(leaseId));
329+
}
330+
270331
protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
271332
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
272333
for (int i = 0; i < numDocs; i++) {

0 commit comments

Comments
 (0)