diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
index 25a605088ef66..91229c93ca854 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
@@ -97,6 +97,10 @@ public boolean shouldBootstrapNewHistoryUUID() {
         return false;
     }
 
+    public boolean expectEmptyRetentionLeases() {
+        return true;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -181,6 +185,11 @@ public Type getType() {
         public String toString() {
             return "existing store recovery; bootstrap_history_uuid=" + bootstrapNewHistoryUUID;
         }
+
+        @Override
+        public boolean expectEmptyRetentionLeases() {
+            return bootstrapNewHistoryUUID;
+        }
     }
 
     /**
@@ -317,5 +326,10 @@ public Type getType() {
         public String toString() {
             return "peer recovery";
         }
+
+        @Override
+        public boolean expectEmptyRetentionLeases() {
+            return false;
+        }
     }
 }
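Note on the hunk above: expectEmptyRetentionLeases() is a template-method predicate consumed by a new assertion in IndexShard (further down in this patch). The base class answers conservatively, and only the sources that preserve or copy existing history weaken that expectation. A standalone sketch of the shape, using simplified stand-in classes rather than the real RecoverySource hierarchy:

    abstract class RecoverySourceSketch {
        // Conservative default: a shard built from scratch, restored from a snapshot,
        // or given a freshly bootstrapped history must begin with no retention leases.
        public boolean expectEmptyRetentionLeases() {
            return true;
        }
    }

    class ExistingStoreRecoverySketch extends RecoverySourceSketch {
        private final boolean bootstrapNewHistoryUUID;

        ExistingStoreRecoverySketch(boolean bootstrapNewHistoryUUID) {
            this.bootstrapNewHistoryUUID = bootstrapNewHistoryUUID;
        }

        @Override
        public boolean expectEmptyRetentionLeases() {
            // A force-allocated stale primary bootstraps a new history UUID and drops
            // its old leases; a plain restart keeps whatever was persisted.
            return bootstrapNewHistoryUUID;
        }
    }

    class PeerRecoverySketch extends RecoverySourceSketch {
        @Override
        public boolean expectEmptyRetentionLeases() {
            return false; // peer recovery copies the primary's leases, so they may be non-empty
        }
    }

    public class RecoverySourceSketchDemo {
        public static void main(String[] args) {
            System.out.println(new ExistingStoreRecoverySketch(true).expectEmptyRetentionLeases());  // true
            System.out.println(new ExistingStoreRecoverySketch(false).expectEmptyRetentionLeases()); // false
            System.out.println(new PeerRecoverySketch().expectEmptyRetentionLeases());               // false
        }
    }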
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
index 566a81b3af4b0..745f6beb7343e 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -330,6 +330,9 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
      */
    public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
        final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
+
+        // TODO after backporting we expect this never to happen in 8.x, so adjust this to throw an exception instead.
+        assert Version.CURRENT.major <= 8 : "throw an exception instead of returning EMPTY on null";
        if (retentionLeases == null) {
            return RetentionLeases.EMPTY;
        }
@@ -355,6 +358,11 @@ public void persistRetentionLeases(final Path path) throws WriteStateException {
        }
    }
 
+    public boolean assertRetentionLeasesPersisted(final Path path) throws IOException {
+        assert RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path) != null;
+        return true;
+    }
+
    public static class CheckpointState implements Writeable {
 
        /**
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 12308488406a5..7f8efe6b5fa2d 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1434,6 +1434,9 @@ private void innerOpenEngineAndTranslog() throws IOException {
        final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
        replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
        updateRetentionLeasesOnReplica(loadRetentionLeases());
+        assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
+            : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
+            + "] but got " + getRetentionLeases();
        trimUnsafeCommits();
        synchronized (mutex) {
            verifyNotClosed();
@@ -2018,6 +2021,10 @@ public void persistRetentionLeases() throws WriteStateException {
        replicationTracker.persistRetentionLeases(path.getShardStatePath());
    }
 
+    public boolean assertRetentionLeasesPersisted() throws IOException {
+        return replicationTracker.assertRetentionLeasesPersisted(path.getShardStatePath());
+    }
+
    /**
     * Syncs the current retention leases to all replicas.
     */
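assertRetentionLeasesPersisted() follows the common "assertion method" idiom: it unconditionally returns true and is itself called under assert, so the disk read it performs is elided entirely on JVMs running without -ea. A self-contained sketch of the idiom; the file name and helper below are illustrative, not the real API:

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class AssertionMethodIdiom {

        // Always returns true; the assert inside carries the real check. Because
        // callers invoke this as `assert assertStateFilePresent(...)`, the
        // (potentially costly) filesystem probe is skipped when assertions are off.
        static boolean assertStateFilePresent(Path stateFile) {
            assert Files.exists(stateFile) : "expected a persisted state file at " + stateFile;
            return true;
        }

        public static void main(String[] args) {
            Path stateFile = Paths.get("retention-leases-0.st"); // hypothetical file name
            assert assertStateFilePresent(stateFile); // no-op unless run with -ea
            System.out.println("assertions " +
                (AssertionMethodIdiom.class.desiredAssertionStatus() ? "enabled" : "disabled"));
        }
    }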
diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
index d15de54c54e99..c97c19eb0f3ec 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -401,9 +401,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
                final String translogUUID = Translog.createEmptyTranslog(
                    indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
                store.associateIndexWithNewTranslog(translogUUID);
+                writeEmptyRetentionLeasesFile(indexShard);
            } else if (indexShouldExists) {
                if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
                    store.bootstrapNewHistory();
+                    writeEmptyRetentionLeasesFile(indexShard);
                }
                // since we recover from local, just fill the files and size
                try {
@@ -420,6 +422,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
                    indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId,
                    indexShard.getPendingPrimaryTerm());
                store.associateIndexWithNewTranslog(translogUUID);
+                writeEmptyRetentionLeasesFile(indexShard);
            }
            indexShard.openEngineAndRecoverFromTranslog();
            indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
@@ -432,6 +435,12 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
        }
    }
 
+    private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException {
+        assert indexShard.getRetentionLeases().leases().isEmpty() : indexShard.getRetentionLeases(); // not loaded yet
+        indexShard.persistRetentionLeases();
+        assert indexShard.loadRetentionLeases().leases().isEmpty();
+    }
+
    private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
        final Directory directory = store.directory();
        for (String name : Lucene.files(si)) {
@@ -471,6 +480,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
                indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
            store.associateIndexWithNewTranslog(translogUUID);
            assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
+            writeEmptyRetentionLeasesFile(indexShard);
            indexShard.openEngineAndRecoverFromTranslog();
            indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
            indexShard.finalizeRecovery();
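The helper added here establishes an invariant: every store-recovery path that fabricates fresh history (empty shard, forced stale primary, local translog reset, snapshot restore) also leaves an empty-but-present retention-lease state file on disk. A compact, runnable sketch of that contract, with a stand-in interface instead of the real IndexShard:

    import java.io.IOException;

    public class EmptyLeasesOnBootstrapSketch {

        // Minimal stand-in for the three IndexShard calls the helper relies on.
        interface ShardLeaseState {
            boolean inMemoryLeasesAreEmpty();
            void persistLeases() throws IOException;        // writes the on-disk state file
            boolean persistedLeasesAreEmpty() throws IOException;
        }

        // Mirrors writeEmptyRetentionLeasesFile(): only called on paths that have just
        // fabricated fresh history, where nothing can have been loaded yet, so
        // persisting leaves an empty state file for any later recovery to find.
        static void writeEmptyRetentionLeasesFile(ShardLeaseState shard) throws IOException {
            assert shard.inMemoryLeasesAreEmpty();
            shard.persistLeases();
            assert shard.persistedLeasesAreEmpty();
        }

        public static void main(String[] args) throws IOException {
            final boolean[] fileWritten = {false};
            writeEmptyRetentionLeasesFile(new ShardLeaseState() {
                public boolean inMemoryLeasesAreEmpty() { return true; }
                public void persistLeases() { fileWritten[0] = true; }
                public boolean persistedLeasesAreEmpty() { return fileWritten[0]; }
            });
            System.out.println("empty leases file written: " + fileWritten[0]);
        }
    }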
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index b75cb23e9e656..a97208561962e 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -414,6 +414,15 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
                indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId,
                indexShard.getPendingPrimaryTerm());
            store.associateIndexWithNewTranslog(translogUUID);
+
+            if (indexShard.getRetentionLeases().leases().isEmpty()) {
+                // if empty, may be a fresh IndexShard, so write an empty leases file to disk
+                indexShard.persistRetentionLeases();
+                assert indexShard.loadRetentionLeases().leases().isEmpty();
+            } else {
+                assert indexShard.assertRetentionLeasesPersisted();
+            }
+
        } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
            // this is a fatal exception at this stage.
            // this means we transferred files from the remote that have not be checksummed and they are
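The branch above has to be retry-aware: on a first-time peer recovery the target shard holds no leases yet and writes the empty file, while a retried recovery may already hold leases synced during the previous failed attempt, in which case they must already be on disk. A toy, runnable model of that control flow (all types are simplified stand-ins):

    import java.util.ArrayList;
    import java.util.List;

    public class CleanFilesRetrySketch {

        private final List<String> inMemoryLeases = new ArrayList<>();
        private List<String> persistedLeases = null; // null = no state file on disk yet

        // Mirrors the branch added to cleanFiles(): a fresh recovery target writes the
        // empty file; a retried recovery arrives with leases that were synced (and
        // persisted) during the earlier failed attempt and only verifies the file.
        void persistOrVerifyLeases() {
            if (inMemoryLeases.isEmpty()) {
                persistedLeases = new ArrayList<>();
            } else {
                assert persistedLeases != null : "retried recovery must already have a persisted file";
            }
        }

        public static void main(String[] args) {
            CleanFilesRetrySketch shard = new CleanFilesRetrySketch();
            shard.persistOrVerifyLeases();               // attempt 1: writes the empty file
            shard.inMemoryLeases.add("peer_recovery/1"); // lease arrives via retention lease sync
            shard.persistedLeases = new ArrayList<>(shard.inMemoryLeases);
            shard.persistOrVerifyLeases();               // retry: only verifies the file exists
            System.out.println("persisted: " + shard.persistedLeases);
        }
    }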
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
index ecfe2c15a08a5..1b8a4f7b552e2 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
@@ -19,9 +19,11 @@
 
 package org.elasticsearch.index.seqno;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -31,9 +33,12 @@
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
 
 import java.io.Closeable;
 import java.util.ArrayList;
@@ -43,6 +48,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -51,6 +57,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.contains;
@@ -73,7 +80,7 @@ public List<Setting<?>> getSettings() {
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return Stream.concat(
                 super.nodePlugins().stream(),
-                Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class))
+                Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class, MockTransportService.TestPlugin.class))
                 .collect(Collectors.toList());
     }
 
@@ -355,6 +362,29 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
             currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source));
         }
 
+        // Cause some recoveries to fail to ensure that retention leases are handled properly when retrying a recovery
+        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
+            .put(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")));
+        final Semaphore recoveriesToDisrupt = new Semaphore(scaledRandomIntBetween(0, 4));
+        final MockTransportService primaryTransportService
+            = (MockTransportService) internalCluster().getInstance(TransportService.class, primaryShardNodeName);
+        primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE) && recoveriesToDisrupt.tryAcquire()) {
+                if (randomBoolean()) {
+                    // return a ConnectTransportException to the START_RECOVERY action
+                    final TransportService replicaTransportService
+                        = internalCluster().getInstance(TransportService.class, connection.getNode().getName());
+                    final DiscoveryNode primaryNode = primaryTransportService.getLocalNode();
+                    replicaTransportService.disconnectFromNode(primaryNode);
+                    replicaTransportService.connectToNode(primaryNode);
+                } else {
+                    // return an exception to the FINALIZE action
+                    throw new ElasticsearchException("failing recovery for test purposes");
+                }
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+
         // now allow the replicas to be allocated and wait for recovery to finalize
         allowNodes("index", 1 + numberOfReplicas);
         ensureGreen("index");
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
index 566d1feaf007d..45cb82ca4b1d3 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
@@ -262,6 +262,20 @@ public void testPersistence() throws IOException {
             } finally {
                 closeShards(recoveredShard);
             }
+
+            // we should not recover retention leases when force-allocating a stale primary
+            final IndexShard forceRecoveredShard = reinitShard(
+                indexShard,
+                ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
+                    RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE));
+            try {
+                recoverShardFromStore(forceRecoveredShard);
+                final RetentionLeases recoveredRetentionLeases = forceRecoveredShard.getEngine().config().retentionLeasesSupplier().get();
+                assertThat(recoveredRetentionLeases.leases(), empty());
+                assertThat(recoveredRetentionLeases.version(), equalTo(0L));
+            } finally {
+                closeShards(forceRecoveredShard);
+            }
         } finally {
             closeShards(indexShard);
         }
     }
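The disruption in testRetentionLeasesSyncOnRecovery() is bounded by a Semaphore so that at most a few FINALIZE requests are disrupted and the recovery still completes, letting ensureGreen() return. The same fail-budget idiom, reduced to a runnable standalone sketch with the transport machinery replaced by a Runnable:

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.ThreadLocalRandom;

    public class BoundedFaultInjection {
        public static void main(String[] args) {
            // Hand out a bounded number of "disrupt" permits (0..4), mirroring
            // recoveriesToDisrupt in the test above.
            Semaphore faultsToInject = new Semaphore(ThreadLocalRandom.current().nextInt(5));
            for (int i = 0; i < 10; i++) {
                final int attempt = i;
                Runnable send = () -> System.out.println("request " + attempt + " delivered");
                if (faultsToInject.tryAcquire()) {
                    // tryAcquire() never blocks: once the permits run out, every
                    // remaining request passes through and the run can finish.
                    System.out.println("request " + attempt + " disrupted");
                } else {
                    send.run();
                }
            }
        }
    }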
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
index 2b8fba34c2f3e..8187a46fa7425 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
@@ -23,9 +23,15 @@
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.seqno.RetentionLeaseActions;
+import org.elasticsearch.index.seqno.RetentionLeases;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
@@ -43,6 +49,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
+import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.equalTo;
@@ -87,8 +94,8 @@ public void testSnapshotAndRestore() throws Exception {
         int[] docCounts = new int[indexCount];
         String[] indexNames = generateRandomNames(indexCount);
         for (int i = 0; i < indexCount; i++) {
-            logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
             docCounts[i] = iterations(10, 1000);
+            logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
             addRandomDocuments(indexNames[i], docCounts[i]);
             assertHitCount(client().prepareSearch(indexNames[i]).setSize(0).get(), docCounts[i]);
         }
@@ -267,6 +274,58 @@ public void testIndicesDeletedFromRepository() throws Exception {
         }
     }
 
+    public void testRetentionLeasesClearedOnRestore() throws Exception {
+        final String repoName = randomAsciiName();
+        logger.info("--> creating repository {}", repoName);
+        createAndCheckTestRepository(repoName);
+
+        final String indexName = randomAsciiName();
+        final int shardCount = randomIntBetween(1, 5);
+        assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(
+            Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)).get());
+        final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1));
+
+        final int snapshotDocCount = iterations(10, 1000);
+        logger.info("--> indexing {} docs into {}", snapshotDocCount, indexName);
+        addRandomDocuments(indexName, snapshotDocCount);
+        assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
+
+        final String leaseId = randomAsciiName();
+        logger.info("--> adding retention lease with id {} to {}", leaseId, shardId);
+        client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest(
+            shardId, leaseId, RETAIN_ALL, "test")).actionGet();
+
+        final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards())
+            .filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get();
+        final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
+        assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId));
+
+        final String snapshotName = randomAsciiName();
+        logger.info("--> create snapshot {}:{}", repoName, snapshotName);
+        assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+            .setWaitForCompletion(true).setIndices(indexName));
+
+        if (randomBoolean()) {
+            final int extraDocCount = iterations(10, 1000);
+            logger.info("--> indexing {} extra docs into {}", extraDocCount, indexName);
+            addRandomDocuments(indexName, extraDocCount);
+        }
+
+        logger.info("--> close index {}", indexName);
+        assertAcked(client().admin().indices().prepareClose(indexName));
+
+        logger.info("--> restore index {} from snapshot", indexName);
+        assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName).setWaitForCompletion(true));
+
+        ensureGreen();
+        assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
+
+        final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get()
+            .getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get()
+            .getRetentionLeaseStats().retentionLeases();
+        assertFalse(restoredRetentionLeases.toString() + " should not contain " + leaseId, restoredRetentionLeases.contains(leaseId));
+    }
+
     protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
         IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
         for (int i = 0; i < numDocs; i++) {
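One small observation on the test above: it runs the same stream pipeline twice to find one shard's stats. If the test grows further, the lookup could be factored into a helper like the sketch below, which uses only APIs already imported by this patch (the helper class and method name are ours, not part of the test framework):

    import java.util.Arrays;

    import org.elasticsearch.action.admin.indices.stats.ShardStats;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.index.seqno.RetentionLeases;
    import org.elasticsearch.index.shard.ShardId;

    final class RetentionLeaseStatsLookup {
        // Returns the RetentionLeases currently reported for one shard via the
        // indices stats API; mirrors the pipeline used twice in the test above.
        static RetentionLeases leasesFor(Client client, String index, ShardId shardId) {
            ShardStats stats = Arrays.stream(client.admin().indices().prepareStats(index).get().getShards())
                .filter(s -> s.getShardRouting().shardId().equals(shardId))
                .findFirst()
                .orElseThrow(() -> new AssertionError("no stats for " + shardId));
            return stats.getRetentionLeaseStats().retentionLeases();
        }
    }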