Create retention leases file during recovery #39359
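
This change makes recovery responsible for writing the shard's retention-leases state file to disk. Each recovery source now declares whether the shard is expected to start with an empty set of retention leases; store recovery and snapshot restore persist an empty leases file before opening the engine, while peer recovery seeds the file only for a fresh shard. An assertion at engine-open time enforces each source's expectation, and new tests cover retried recoveries, force-allocated stale primaries, and snapshot restores.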

--- RecoverySource.java (inferred) ---
@@ -97,6 +97,10 @@ public boolean shouldBootstrapNewHistoryUUID() {
return false;
}

public boolean expectEmptyRetentionLeases() {
return true;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -181,6 +185,11 @@ public Type getType() {
public String toString() {
return "existing store recovery; bootstrap_history_uuid=" + bootstrapNewHistoryUUID;
}

@Override
public boolean expectEmptyRetentionLeases() {
return bootstrapNewHistoryUUID;
}
}

@@ -317,5 +326,10 @@ public Type getType() {
public String toString() {
return "peer recovery";
}

@Override
public boolean expectEmptyRetentionLeases() {
return false;
}
}
}
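
The three hunks above are easier to read side by side. Below is a toy consolidation (hypothetical stand-in classes, not the real org.elasticsearch.cluster.routing.RecoverySource hierarchy) of the policy they add:

// Toy consolidation of the expectEmptyRetentionLeases() policy added above.
// Hypothetical stand-ins, not the real RecoverySource classes.
abstract class RecoverySourcePolicy {
    // Base default: sources that build a fresh shard (e.g. empty store,
    // snapshot) expect to start with no retention leases on disk.
    boolean expectEmptyRetentionLeases() {
        return true;
    }

    static final class ExistingStore extends RecoverySourcePolicy {
        private final boolean bootstrapNewHistoryUUID;

        ExistingStore(boolean bootstrapNewHistoryUUID) {
            this.bootstrapNewHistoryUUID = bootstrapNewHistoryUUID;
        }

        @Override
        boolean expectEmptyRetentionLeases() {
            // A forced stale primary bootstraps a new history and discards its
            // leases; an ordinary restart keeps whatever leases were persisted.
            return bootstrapNewHistoryUUID;
        }
    }

    static final class Peer extends RecoverySourcePolicy {
        @Override
        boolean expectEmptyRetentionLeases() {
            // A peer-recovered replica may receive the primary's leases.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(new ExistingStore(false).expectEmptyRetentionLeases()); // false
        System.out.println(new ExistingStore(true).expectEmptyRetentionLeases());  // true
        System.out.println(new Peer().expectEmptyRetentionLeases());               // false
    }
}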
--- ReplicationTracker.java (inferred) ---
@@ -330,6 +330,9 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
*/
public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);

// TODO after backporting we expect this never to happen in 8.x, so adjust this to throw an exception instead.
assert Version.CURRENT.major <= 8 : "throw an exception instead of returning EMPTY on null";
if (retentionLeases == null) {
return RetentionLeases.EMPTY;
}
@@ -355,6 +358,11 @@ public void persistRetentionLeases(final Path path) throws WriteStateException {
}
}

public boolean assertRetentionLeasesPersisted(final Path path) throws IOException {
assert RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path) != null;
return true;
}

public static class CheckpointState implements Writeable {

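
loadRetentionLeases treats a missing state file as EMPTY (the TODO above notes that this fallback should eventually become an exception), and assertRetentionLeasesPersisted checks only that some state file exists. A standalone toy (plain file I/O, not the real RetentionLeases.FORMAT) of the file-absent vs. file-present-but-empty distinction the rest of the change relies on:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;

// Toy model: after this PR, recovery always writes the (possibly empty)
// retention-leases file, so a later load never sees "no file at all".
final class RetentionLeaseStateFileSketch {
    // Stand-in for RetentionLeases.FORMAT.loadLatestState: null when no file exists.
    static List<String> loadLatestState(Path path) throws IOException {
        return Files.exists(path) ? Files.readAllLines(path) : null;
    }

    // Stand-in for persistRetentionLeases on a shard holding no leases.
    static void writeEmptyLeasesFile(Path path) throws IOException {
        Files.write(path, Collections.<String>emptyList());
    }

    public static void main(String[] args) throws IOException {
        Path state = Files.createTempDirectory("shard-state").resolve("retention-leases");

        // Before recovery has run: no file, so the load falls back to null -> EMPTY.
        assert loadLatestState(state) == null;

        // What recovery now guarantees: the file exists even with zero leases.
        writeEmptyLeasesFile(state);
        List<String> leases = loadLatestState(state);
        assert leases != null && leases.isEmpty();
        System.out.println("leases on disk: " + leases.size());
    }
}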
--- IndexShard.java (inferred) ---
@@ -1434,6 +1434,9 @@ private void innerOpenEngineAndTranslog() throws IOException {
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
updateRetentionLeasesOnReplica(loadRetentionLeases());
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
+ "] but got " + getRetentionLeases();
trimUnsafeCommits();
synchronized (mutex) {
verifyNotClosed();
@@ -2018,6 +2021,10 @@ public void persistRetentionLeases() throws WriteStateException {
replicationTracker.persistRetentionLeases(path.getShardStatePath());
}

public boolean assertRetentionLeasesPersisted() throws IOException {
return replicationTracker.assertRetentionLeasesPersisted(path.getShardStatePath());
}

/**
* Syncs the current retention leases to all replicas.
*/
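
With the change above, the shard checks the recovery source's expectation once, immediately after loading the leases during engine open, and exposes assertRetentionLeasesPersisted so that other recovery code can assert that the in-memory leases have reached disk.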
--- StoreRecovery.java (inferred) ---
@@ -401,9 +401,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
store.bootstrapNewHistory();
writeEmptyRetentionLeasesFile(indexShard);
}
// since we recover from local, just fill the files and size
try {
@@ -420,6 +422,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId,
indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
@@ -432,6 +435,12 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
}
}

private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException {
assert indexShard.getRetentionLeases().leases().isEmpty() : indexShard.getRetentionLeases(); // not loaded yet
indexShard.persistRetentionLeases();
assert indexShard.loadRetentionLeases().leases().isEmpty();
}

private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
@@ -471,6 +480,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
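
Every store-recovery path that installs a fresh translog above also persists the retention-leases file before the engine opens: the empty-store path, the bootstrap-new-history path (used when force-allocating a stale primary), the missing-translog fallback, and snapshot restore. The writeEmptyRetentionLeasesFile helper asserts both that no leases were in memory beforehand and that an empty set round-trips from disk.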
--- RecoveryTarget.java (inferred) ---
@@ -414,6 +414,15 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId,
indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);

if (indexShard.getRetentionLeases().leases().isEmpty()) {
// if empty, may be a fresh IndexShard, so write an empty leases file to disk
indexShard.persistRetentionLeases();
assert indexShard.loadRetentionLeases().leases().isEmpty();
} else {
assert indexShard.assertRetentionLeasesPersisted();
}

} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not been checksummed and they are
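
Peer recovery differs from the store-recovery paths: by the time the copied files are cleaned up, the shard may already hold retention leases in memory, for example after a retried recovery attempt, so the file is only seeded with an empty set for a fresh shard. Otherwise the code just asserts that the existing leases were already persisted.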
--- RetentionLeaseIT.java (inferred) ---
@@ -19,9 +19,11 @@

package org.elasticsearch.index.seqno;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -31,9 +33,12 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.util.ArrayList;
@@ -43,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -51,6 +57,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
@@ -73,7 +80,7 @@ public List<Setting<?>> getSettings() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class))
Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class, MockTransportService.TestPlugin.class))
.collect(Collectors.toList());
}

@@ -355,6 +362,29 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source));
}

// Cause some recoveries to fail to ensure that retention leases are handled properly when retrying a recovery
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
.put(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")));
final Semaphore recoveriesToDisrupt = new Semaphore(scaledRandomIntBetween(0, 4));
final MockTransportService primaryTransportService
= (MockTransportService) internalCluster().getInstance(TransportService.class, primaryShardNodeName);
primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE) && recoveriesToDisrupt.tryAcquire()) {
if (randomBoolean()) {
// return a ConnectTransportException to the START_RECOVERY action
final TransportService replicaTransportService
= internalCluster().getInstance(TransportService.class, connection.getNode().getName());
final DiscoveryNode primaryNode = primaryTransportService.getLocalNode();
replicaTransportService.disconnectFromNode(primaryNode);
replicaTransportService.connectToNode(primaryNode);
} else {
// return an exception to the FINALIZE action
throw new ElasticsearchException("failing recovery for test purposes");
}
}
connection.sendRequest(requestId, action, request, options);
});

// now allow the replicas to be allocated and wait for recovery to finalize
allowNodes("index", 1 + numberOfReplicas);
ensureGreen("index");
--- IndexShardRetentionLeaseTests.java (inferred) ---
@@ -262,6 +262,20 @@ public void testPersistence() throws IOException {
} finally {
closeShards(recoveredShard);
}

// we should not recover retention leases when force-allocating a stale primary
final IndexShard forceRecoveredShard = reinitShard(
indexShard,
ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE));
try {
recoverShardFromStore(forceRecoveredShard);
final RetentionLeases recoveredRetentionLeases = forceRecoveredShard.getEngine().config().retentionLeasesSupplier().get();
assertThat(recoveredRetentionLeases.leases(), empty());
assertThat(recoveredRetentionLeases.version(), equalTo(0L));
} finally {
closeShards(forceRecoveredShard);
}
} finally {
closeShards(indexShard);
}
--- ESBlobStoreRepositoryIntegTestCase.java (inferred) ---
@@ -23,9 +23,15 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@@ -43,6 +49,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@@ -87,8 +94,8 @@ public void testSnapshotAndRestore() throws Exception {
int[] docCounts = new int[indexCount];
String[] indexNames = generateRandomNames(indexCount);
for (int i = 0; i < indexCount; i++) {
docCounts[i] = iterations(10, 1000);
logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
addRandomDocuments(indexNames[i], docCounts[i]);
assertHitCount(client().prepareSearch(indexNames[i]).setSize(0).get(), docCounts[i]);
}
@@ -267,6 +274,58 @@ public void testIndicesDeletedFromRepository() throws Exception {
}
}

public void testRetentionLeasesClearedOnRestore() throws Exception {
final String repoName = randomAsciiName();
logger.info("--> creating repository {}", repoName);
createAndCheckTestRepository(repoName);

final String indexName = randomAsciiName();
final int shardCount = randomIntBetween(1, 5);
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)).get());
final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1));

final int snapshotDocCount = iterations(10, 1000);
logger.info("--> indexing {} docs into {}", snapshotDocCount, indexName);
addRandomDocuments(indexName, snapshotDocCount);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);

final String leaseId = randomAsciiName();
logger.info("--> adding retention lease with id {} to {}", leaseId, shardId);
client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest(
shardId, leaseId, RETAIN_ALL, "test")).actionGet();

final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards())
.filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get();
final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId));

final String snapshotName = randomAsciiName();
logger.info("--> create snapshot {}:{}", repoName, snapshotName);
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).setIndices(indexName));

if (randomBoolean()) {
final int extraDocCount = iterations(10, 1000);
logger.info("--> indexing {} extra docs into {}", extraDocCount, indexName);
addRandomDocuments(indexName, extraDocCount);
}

logger.info("--> close index {}", indexName);
assertAcked(client().admin().indices().prepareClose(indexName));

logger.info("--> restore index {} from snapshot", indexName);
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName).setWaitForCompletion(true));

ensureGreen();
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);

final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get()
.getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get()
.getRetentionLeaseStats().retentionLeases();
assertFalse("expected no lease [" + leaseId + "] in " + restoredRetentionLeases, restoredRetentionLeases.contains(leaseId));
}
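
The restored shard starts a new history, so the lease added before the snapshot must not survive the restore; the test verifies this by taking a lease, snapshotting, closing and restoring the index, and then checking that the lease is gone from the shard's stats.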

protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {