From 674ddf2e06cd543c726140a83dc3204448017b24 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 25 Jul 2019 12:06:03 +0100 Subject: [PATCH 01/14] Recover peers using history from Lucene Thanks to peer recovery retention leases we now retain the history needed to perform peer recoveries from the index instead of from the translog. This commit adjusts the peer recovery process to do so, and also adjusts it to use the existence of a retention lease to decide whether or not to attempt an operations-based recovery. Reverts #38904 and #42211 Relates #41536 --- .../elasticsearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 30 ++-- .../recovery/RecoverySourceHandler.java | 97 ++++++++---- .../gateway/RecoveryFromGatewayIT.java | 14 +- .../RecoveryDuringReplicationTests.java | 11 ++ .../index/shard/IndexShardTests.java | 18 ++- .../shard/PrimaryReplicaSyncerTests.java | 14 +- .../indices/recovery/IndexRecoveryIT.java | 141 ++++++++++++++++++ .../recovery/RecoverySourceHandlerTests.java | 14 ++ .../indices/recovery/RecoveryTests.java | 9 +- 10 files changed, 294 insertions(+), 56 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index bcab18ba33ea3..4aa49929a252c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -747,7 +747,7 @@ public abstract int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException; /** - * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog + * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog) */ public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index af0adfdedcf45..d3806b736b629 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -514,10 +514,15 @@ public void syncTranslog() throws IOException { } /** - * Creates a new history snapshot for reading operations since the provided seqno from the translog. + * Creates a new history snapshot for reading operations since the provided seqno. + * The returned snapshot can be retrieved from either Lucene index or translog files. */ @Override public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); + } + return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); } @@ -525,7 +530,14 @@ public Translog.Snapshot readHistoryOperations(String source, MapperService mapp * Returns the estimated number of history operations whose seq# at least the provided seq# in this engine. */ @Override - public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) { + public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), + Long.MAX_VALUE, false)) { + return snapshot.totalOperations(); + } + } + return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo); } @@ -2568,6 +2580,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS @Override public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + return getMinRetainedSeqNo() <= startingSeqNo; + } + final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); // avoid scanning translog if not necessary if (startingSeqNo > currentLocalCheckpoint) { @@ -2597,15 +2613,7 @@ public final long getMinRetainedSeqNo() { @Override public Closeable acquireRetentionLock() { if (softDeleteEnabled) { - final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock(); - final Closeable translogRetentionLock; - try { - translogRetentionLock = translog.acquireRetentionLock(); - } catch (Exception e) { - softDeletesRetentionLock.close(); - throw e; - } - return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock); + return softDeletesPolicy.acquireRetentionLock(); } else { return translog.acquireRetentionLock(); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d812dedbc5cf9..31ef193434be9 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -29,6 +29,7 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; @@ -52,7 +53,8 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; -import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -149,6 +151,10 @@ public void recoverToTarget(ActionListener listener) { IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e)); }; + final boolean useRetentionLeases = shard.indexSettings().isSoftDeleteEnabled() + && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE; + final SetOnce retentionLeaseRef = new SetOnce<>(); + runUnderPrimaryPermit(() -> { final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); @@ -158,13 +164,32 @@ public void recoverToTarget(ActionListener listener) { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; + retentionLeaseRef.set(useRetentionLeases ? shard.getRetentionLeases().get( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null); }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); - final Closeable retentionLock = shard.acquireRetentionLock(); - resources.add(retentionLock); + final long startingSeqNo; - final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && - isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()); + final boolean isSequenceNumberBasedRecovery + = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + && isTargetSameHistory() + && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) + && (useRetentionLeases == false + || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); + + final Closeable retentionLock; + if (isSequenceNumberBasedRecovery && useRetentionLeases) { + // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock + retentionLock = () -> {}; + logger.trace("history is retained by {}", retentionLeaseRef.get()); + } else { + // temporarily prevent any history from being discarded, and do this before acquiring the safe commit so that we can + // be certain that all operations after the safe commit's local checkpoint will be retained for the duration of this + // recovery. + retentionLock = shard.acquireRetentionLock(); + resources.add(retentionLock); + logger.trace("history is retained by retention lock"); + } final StepListener sendFileStep = new StepListener<>(); final StepListener establishRetentionLeaseStep = new StepListener<>(); @@ -184,9 +209,22 @@ public void recoverToTarget(ActionListener listener) { } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } - // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will - // still filter out legacy operations without seqNo. - startingSeqNo = 0; + + // Try and copy enough operations to the recovering peer so that if it is promoted to primary then it has a chance of being + // able to recover other replicas using operations-based recoveries. If we are not using retention leases then we + // conservatively copy all available operations. If we are using retention leases then "enough operations" is just the + // operations from the local checkpoint of the safe commit onwards, because when using soft deletes the safe commit retains + // at least as much history as anything else. The safe commit will often contain all the history retained by the current set + // of retention leases, but this is not guaranteed: an earlier peer recovery from a different primary might have created a + // retention lease for some history that this primary already discarded, since we discard history when the global checkpoint + // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can + // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled + // down. + startingSeqNo = useRetentionLeases + ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L + : 0; + logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); + try { final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); shard.store().incRef(); @@ -201,8 +239,7 @@ public void recoverToTarget(ActionListener listener) { }); final StepListener deleteRetentionLeaseStep = new StepListener<>(); - if (shard.indexSettings().isSoftDeleteEnabled() - && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) { + if (useRetentionLeases) { runUnderPrimaryPermit(() -> { try { // If the target previously had a copy of this shard then a file-based recovery might move its global @@ -233,20 +270,19 @@ public void recoverToTarget(ActionListener listener) { assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; sendFileStep.whenComplete(r -> { - if (shard.indexSettings().isSoftDeleteEnabled() - && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) { + if (useRetentionLeases && isSequenceNumberBasedRecovery == false) { + // We can in general use retention leases for peer recovery, but there is no lease for the target node right now. runUnderPrimaryPermit(() -> { - try { - // conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate - final long globalCheckpoint = startingSeqNo - 1; - // blindly create the lease. TODO integrate this with the recovery process - shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), globalCheckpoint, + // Start the lease off retaining all the history needed from the local checkpoint of the safe commit that we've + // just established on the replica. This primary is certainly retaining such history, but other replicas might + // not be. No big deal if this recovery succeeds, but if this primary fails then the new primary might have to + // repeat phase 1 to recover this replica. + // TODO TBD maybe do this earlier? + final long localCheckpointOfSafeCommit = startingSeqNo - 1; + logger.trace("creating new retention lease at [{}]", localCheckpointOfSafeCommit); + shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), localCheckpointOfSafeCommit, new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false)); - } catch (RetentionLeaseAlreadyExistsException e) { - logger.debug("peer-recovery retention lease already exists", e); - establishRetentionLeaseStep.onResponse(null); - } }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger); } else { @@ -255,6 +291,11 @@ public void recoverToTarget(ActionListener listener) { }, onFailure); establishRetentionLeaseStep.whenComplete(r -> { + if (useRetentionLeases) { + // all the history we need is now retained by a retention lease so we can discard the retention lock + retentionLock.close(); + } + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); // For a sequence based recovery, the target can keep its local translog prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, @@ -273,14 +314,16 @@ public void recoverToTarget(ActionListener listener) { shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - if (logger.isTraceEnabled()) { - logger.trace("snapshot translog for recovery; current size is [{}]", - shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); - } + logger.trace("snapshot translog for recovery; current size is [{}]", + shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); resources.add(phase2Snapshot); - // we can release the retention lock here because the snapshot itself will retain the required operations. - retentionLock.close(); + + if (useRetentionLeases == false) { + // we can release the retention lock here because the snapshot itself will retain the required operations. + retentionLock.close(); + } + // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values // are at least as high as the corresponding values on the primary when any of these operations were executed on it. final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 962788f09d23b..4656c2da54155 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.engine.Engine; @@ -427,8 +428,12 @@ public void testReuseInFileBasedPeerRecovery() throws Exception { .setSettings(Settings.builder() .put("number_of_shards", 1) .put("number_of_replicas", 1) + // disable merges to keep segments the same - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false") + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + + // expire retention leases quickly + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") ).get(); logger.info("--> indexing docs"); @@ -472,10 +477,13 @@ public Settings onNodeStopped(String nodeName) throws Exception { .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), "0s") ).get(); - client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get(); + assertBusy(() -> assertThat(client().admin().indices().prepareStats("test").get().getShards()[0] + .getRetentionLeaseStats().retentionLeases().leases().size(), equalTo(1))); + client().admin().indices().prepareFlush("test").setForce(true).get(); if (softDeleteEnabled) { // We need an extra flush to advance the min_retained_seqno of the SoftDeletesPolicy - client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get(); + client().admin().indices().prepareFlush("test").setForce(true).get(); } return super.onNodeStopped(nodeName); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 9c6340459f5f0..f05ddce567a8a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -79,6 +80,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isIn; @@ -290,6 +292,15 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { // We need an extra flush to advance the min_retained_seqno on the new primary so ops-based won't happen. // The min_retained_seqno only advances when a merge asks for the retention query. newPrimary.flush(new FlushRequest().force(true)); + + // We also need to make sure that there is no retention lease holding on to any history. The lease for the old primary + // expires since there are no unassigned shards in this replication group). + assertBusy(() -> { + newPrimary.syncRetentionLeases(); + //noinspection OptionalGetWithoutIsPresent since there must be at least one lease + assertThat(newPrimary.getRetentionLeases().leases().stream().mapToLong(RetentionLease::retainingSequenceNumber) + .min().getAsLong(), greaterThan(newPrimary.seqNoStats().getMaxSeqNo())); + }); } uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); totalDocs += uncommittedOpsOnPrimary; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index ee919250bb838..bfd3f222488dd 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1556,14 +1556,18 @@ public String[] listAll() throws IOException { public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); - assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery + // refresh on: finalize and end of recovery + // finalizing a replica involves two refreshes with soft deletes because of estimateNumberOfHistoryOperations() + final long initialRefreshes = shard.routingEntry().primary() || shard.indexSettings().isSoftDeleteEnabled() == false ? 2L : 3L; + logger.info("--> checking refresh stats for [{}], expecting [{}]", shard.routingEntry(), initialRefreshes); + assertThat(shard.refreshStats().getTotal(), equalTo(initialRefreshes)); long initialTotalTime = shard.refreshStats().getTotalTimeInMillis(); // check time advances for (int i = 1; shard.refreshStats().getTotalTimeInMillis() == initialTotalTime; i++) { indexDoc(shard, "_doc", "test"); - assertThat(shard.refreshStats().getTotal(), equalTo(2L + i - 1)); + assertThat(shard.refreshStats().getTotal(), equalTo(initialRefreshes + i - 1)); shard.refresh("test"); - assertThat(shard.refreshStats().getTotal(), equalTo(2L + i)); + assertThat(shard.refreshStats().getTotal(), equalTo(initialRefreshes + i)); assertThat(shard.refreshStats().getTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime)); } long refreshCount = shard.refreshStats().getTotal(); @@ -1590,18 +1594,18 @@ public void testExternalRefreshMetric() throws IOException { assertThat(shard.refreshStats().getExternalTotal(), equalTo(2L + i)); assertThat(shard.refreshStats().getExternalTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime)); } - long externalRefreshCount = shard.refreshStats().getExternalTotal(); - + final long externalRefreshCount = shard.refreshStats().getExternalTotal(); + final long extraInternalRefreshes = shard.routingEntry().primary() || shard.indexSettings().isSoftDeleteEnabled() == false ? 0 : 1; indexDoc(shard, "_doc", "test"); try (Engine.GetResult ignored = shard.get(new Engine.Get(true, false, "_doc", "test", new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) { assertThat(shard.refreshStats().getExternalTotal(), equalTo(externalRefreshCount)); - assertThat(shard.refreshStats().getExternalTotal(), equalTo(shard.refreshStats().getTotal() - 1)); + assertThat(shard.refreshStats().getExternalTotal(), equalTo(shard.refreshStats().getTotal() - 1 - extraInternalRefreshes)); } indexDoc(shard, "_doc", "test"); shard.writeIndexingBuffer(); assertThat(shard.refreshStats().getExternalTotal(), equalTo(externalRefreshCount)); - assertThat(shard.refreshStats().getExternalTotal(), equalTo(shard.refreshStats().getTotal() - 2)); + assertThat(shard.refreshStats().getExternalTotal(), equalTo(shard.refreshStats().getTotal() - 2 - extraInternalRefreshes)); closeShards(shard); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 481aaa233caed..617fffa6d1b16 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -124,10 +124,16 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp())); } if (syncNeeded && globalCheckPoint < numDocs - 1) { - int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included - assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); - assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); - assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs)); + if (shard.indexSettings.isSoftDeleteEnabled()) { + assertThat(resyncTask.getSkippedOperations(), equalTo(0)); + assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations())); + assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint))); + } else { + int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included + assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); + assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); + assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs)); + } } else { assertThat(resyncTask.getSkippedOperations(), equalTo(0)); assertThat(resyncTask.getResyncedOperations(), equalTo(0)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 60659b78d98cf..a91aee2ea53f0 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -36,15 +36,21 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -52,12 +58,14 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.analysis.AbstractTokenFilterFactory; import org.elasticsearch.index.analysis.TokenFilterFactory; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.recovery.RecoveryStats; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -1065,6 +1073,139 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception { } } + public void testUsesFileBasedRecoveryIfRetentionLeaseMissing() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + + String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "12h") + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + final DiscoveryNodes discoveryNodes = clusterService().state().nodes(); + final IndexShardRoutingTable indexShardRoutingTable = clusterService().state().routingTable().shardRoutingTable(shardId); + + final IndexShard primary = internalCluster().getInstance(IndicesService.class, + discoveryNodes.get(indexShardRoutingTable.primaryShard().currentNodeId()).getName()).getShardOrNull(shardId); + + final ShardRouting replicaShardRouting = indexShardRoutingTable.replicaShards().get(0); + internalCluster().restartNode(discoveryNodes.get(replicaShardRouting.currentNodeId()).getName(), + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertFalse(client().admin().cluster().prepareHealth() + .setWaitForNodes(Integer.toString(discoveryNodes.getSize() - 1)) + .setWaitForEvents(Priority.LANGUID).get().isTimedOut()); + + final PlainActionFuture future = new PlainActionFuture<>(); + primary.removeRetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting), future); + future.get(); + + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(indexName); + + //noinspection OptionalGetWithoutIsPresent because it fails the test if absent + final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get() + .shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get(); + assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0)); + } + + public void testUsesFileBasedRecoveryIfRetentionLeaseAheadOfGlobalCheckpoint() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + + String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "12h") + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + final DiscoveryNodes discoveryNodes = clusterService().state().nodes(); + final IndexShardRoutingTable indexShardRoutingTable = clusterService().state().routingTable().shardRoutingTable(shardId); + + final IndexShard primary = internalCluster().getInstance(IndicesService.class, + discoveryNodes.get(indexShardRoutingTable.primaryShard().currentNodeId()).getName()).getShardOrNull(shardId); + + final ShardRouting replicaShardRouting = indexShardRoutingTable.replicaShards().get(0); + internalCluster().restartNode(discoveryNodes.get(replicaShardRouting.currentNodeId()).getName(), + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertFalse(client().admin().cluster().prepareHealth() + .setWaitForNodes(Integer.toString(discoveryNodes.getSize() - 1)) + .setWaitForEvents(Priority.LANGUID).get().isTimedOut()); + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(1, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + // We do not guarantee that the replica can recover locally all the way to its own global checkpoint before starting + // to recover from the primary, so we must be careful not to perform an operations-based recovery if this would require + // some operations that are not being retained. Emulate this by advancing the lease ahead of the replica's GCP: + primary.renewRetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting), + primary.seqNoStats().getMaxSeqNo() + 1, ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE); + + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(indexName); + + //noinspection OptionalGetWithoutIsPresent because it fails the test if absent + final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get() + .shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get(); + assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0)); + } + + public void testDoesNotCopyOperationsInSafeCommit() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + + String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + final DiscoveryNodes discoveryNodes = clusterService().state().nodes(); + final IndexShardRoutingTable indexShardRoutingTable = clusterService().state().routingTable().shardRoutingTable(shardId); + + final IndexShard primary = internalCluster().getInstance(IndicesService.class, + discoveryNodes.get(indexShardRoutingTable.primaryShard().currentNodeId()).getName()).getShardOrNull(shardId); + final long maxSeqNoBeforeRecovery = primary.seqNoStats().getMaxSeqNo(); + assertBusy(() -> assertThat(primary.getLastSyncedGlobalCheckpoint(), equalTo(maxSeqNoBeforeRecovery))); + assertThat(client().admin().indices().prepareFlush(indexName).get().getFailedShards(), is(0)); // makes a safe commit + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))); + ensureGreen(indexName); + final long maxSeqNoAfterRecovery = primary.seqNoStats().getMaxSeqNo(); + + //noinspection OptionalGetWithoutIsPresent because it fails the test if absent + final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get() + .shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get(); + assertThat((long)recoveryState.getTranslog().recoveredOperations(), + lessThanOrEqualTo(maxSeqNoAfterRecovery - maxSeqNoBeforeRecovery)); + } + public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin { final AtomicBoolean throwParsingError = new AtomicBoolean(); @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index f6e1de0233bf7..af67937f2eeba 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -55,6 +55,7 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.SegmentsStats; @@ -78,6 +79,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -441,6 +443,18 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); return null; }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); + + final IndexMetaData.Builder indexMetaData = IndexMetaData.builder("test").settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, between(0,5)) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, between(1,5)) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) + .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random())) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))); + if (randomBoolean()) { + indexMetaData.state(IndexMetaData.State.CLOSE); + } + when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetaData.build(), Settings.EMPTY)); + final AtomicBoolean phase1Called = new AtomicBoolean(); final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); final AtomicBoolean phase2Called = new AtomicBoolean(); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 28e84c1210a29..c031f36f33601 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -70,7 +70,8 @@ public void testTranslogHistoryTransferred() throws Exception { shards.addReplica(); shards.startAll(); final IndexShard replica = shards.getReplicas().get(0); - assertThat(getTranslog(replica).totalOperations(), equalTo(docs + moreDocs)); + boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); + assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs)); shards.assertAllEqual(docs + moreDocs); } } @@ -285,7 +286,8 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { shards.recoverReplica(newReplica); // file based recovery should be made assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs)); + boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); + assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs)); // history uuid was restored assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID)); @@ -401,7 +403,8 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { shards.recoverReplica(replica); // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false) assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false))); - assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs)); + boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); + assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs)); shards.assertAllEqual(numDocs); } } From 4d53ec9eb3246a4eae1fbd0858c20ba834a71c8e Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 25 Jul 2019 13:15:03 +0100 Subject: [PATCH 02/14] Remove unnecessary extra logging --- .../test/java/org/elasticsearch/index/shard/IndexShardTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index bfd3f222488dd..2c847cf2ce558 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1559,7 +1559,6 @@ public void testRefreshMetric() throws IOException { // refresh on: finalize and end of recovery // finalizing a replica involves two refreshes with soft deletes because of estimateNumberOfHistoryOperations() final long initialRefreshes = shard.routingEntry().primary() || shard.indexSettings().isSoftDeleteEnabled() == false ? 2L : 3L; - logger.info("--> checking refresh stats for [{}], expecting [{}]", shard.routingEntry(), initialRefreshes); assertThat(shard.refreshStats().getTotal(), equalTo(initialRefreshes)); long initialTotalTime = shard.refreshStats().getTotalTimeInMillis(); // check time advances From 56c24668165e0c02f2bc48ad40a34c885f832632 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 25 Jul 2019 13:25:32 +0100 Subject: [PATCH 03/14] Precommit --- .../indices/recovery/RecoverySourceHandlerTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index af67937f2eeba..38d4b6220dd3d 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -55,7 +55,6 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.SegmentsStats; From 190649d59bcf651b08b16f81833d7b140f8a8330 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 27 Jul 2019 06:55:26 +0100 Subject: [PATCH 04/14] Obtain retention lock earlier --- .../recovery/RecoverySourceHandler.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 31ef193434be9..cf92ec812c4b0 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -168,7 +168,8 @@ public void recoverToTarget(ActionListener listener) { ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null); }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); - + final Closeable retentionLock = shard.acquireRetentionLock(); + resources.add(retentionLock); final long startingSeqNo; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO @@ -177,17 +178,14 @@ && isTargetSameHistory() && (useRetentionLeases == false || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); - final Closeable retentionLock; if (isSequenceNumberBasedRecovery && useRetentionLeases) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock - retentionLock = () -> {}; + retentionLock.close(); logger.trace("history is retained by {}", retentionLeaseRef.get()); } else { - // temporarily prevent any history from being discarded, and do this before acquiring the safe commit so that we can - // be certain that all operations after the safe commit's local checkpoint will be retained for the duration of this - // recovery. - retentionLock = shard.acquireRetentionLock(); - resources.add(retentionLock); + // all the history we need is retained by the retention lock, obtained before calling shard.hasCompleteHistoryOperations() + // and before acquiring the safe commit we'll be using, so we can be certain that all operations after the safe commit's + // local checkpoint will be retained for the duration of this recovery. logger.trace("history is retained by retention lock"); } @@ -291,8 +289,8 @@ && isTargetSameHistory() }, onFailure); establishRetentionLeaseStep.whenComplete(r -> { - if (useRetentionLeases) { - // all the history we need is now retained by a retention lease so we can discard the retention lock + if (useRetentionLeases && isSequenceNumberBasedRecovery == false) { + // all the history we need is now retained by a retention lease so we can close the retention lock retentionLock.close(); } From 7222857d4cf744f3338396088f6c31b0186e3931 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 27 Jul 2019 07:38:30 +0100 Subject: [PATCH 05/14] TBD discussed --- .../elasticsearch/indices/recovery/RecoverySourceHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index cf92ec812c4b0..4af451e241e74 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -275,7 +275,6 @@ && isTargetSameHistory() // just established on the replica. This primary is certainly retaining such history, but other replicas might // not be. No big deal if this recovery succeeds, but if this primary fails then the new primary might have to // repeat phase 1 to recover this replica. - // TODO TBD maybe do this earlier? final long localCheckpointOfSafeCommit = startingSeqNo - 1; logger.trace("creating new retention lease at [{}]", localCheckpointOfSafeCommit); shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), localCheckpointOfSafeCommit, From 94dd36b4284d48da07c86af85acbfdefeee0cdc5 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 29 Jul 2019 08:49:31 +0100 Subject: [PATCH 06/14] Close retention lock while establishing lease --- .../indices/recovery/RecoverySourceHandler.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 4af451e241e74..03018e5da087d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -282,17 +282,14 @@ && isTargetSameHistory() ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false)); }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger); + // all the history we need is now retained by a retention lease so we can close the retention lock + retentionLock.close(); } else { establishRetentionLeaseStep.onResponse(null); } }, onFailure); establishRetentionLeaseStep.whenComplete(r -> { - if (useRetentionLeases && isSequenceNumberBasedRecovery == false) { - // all the history we need is now retained by a retention lease so we can close the retention lock - retentionLock.close(); - } - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); // For a sequence based recovery, the target can keep its local translog prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, From 5fb8bda58b7ee62a8865ece9b1f1160142fd65c9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 29 Jul 2019 10:57:53 +0100 Subject: [PATCH 07/14] Create new PRRL using global checkpoint By cloning the primary's lease we need not worry about creating a lease that retains history which has already been discarded. --- .../index/seqno/ReplicationTracker.java | 35 +++++++ .../elasticsearch/index/shard/IndexShard.java | 8 ++ .../recovery/RecoverySourceHandler.java | 43 +++++--- ...ReplicationTrackerRetentionLeaseTests.java | 99 +++++++++++++++++++ 4 files changed, 173 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 715b25a5175fa..f42b112d2cf93 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -290,6 +290,35 @@ public RetentionLease addRetentionLease( return retentionLease; } + /** + * Atomically clones an existing retention lease to a new ID. + * + * @param sourceLeaseId the identifier of the source retention lease + * @param targetLeaseId the identifier of the retention lease to create + * @param listener the callback when the retention lease is successfully added and synced to replicas + * @return the new retention lease + * @throws RetentionLeaseNotFoundException if the specified source retention lease does not exist + * @throws RetentionLeaseAlreadyExistsException if the specified target retention lease already exists + */ + RetentionLease cloneRetentionLease(String sourceLeaseId, String targetLeaseId, ActionListener listener) { + Objects.requireNonNull(listener); + final RetentionLease retentionLease; + final RetentionLeases currentRetentionLeases; + synchronized (this) { + if (getRetentionLeases().contains(sourceLeaseId) == false) { + throw new RetentionLeaseNotFoundException(sourceLeaseId); + } + final RetentionLease sourceLease = getRetentionLeases().get(sourceLeaseId); + retentionLease = innerAddRetentionLease(targetLeaseId, sourceLease.retainingSequenceNumber(), sourceLease.source()); + currentRetentionLeases = retentionLeases; + } + + // Syncing here may not be strictly necessary, because this new lease isn't retaining any extra history that wasn't previously + // retained by the source lease; however we prefer to sync anyway since we expect to do so whenever creating a new lease. + onSyncRetentionLeases.accept(currentRetentionLeases, listener); + return retentionLease; + } + /** * Adds a new retention lease, but does not synchronise it with the rest of the replication group. * @@ -446,6 +475,12 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener); } + public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener listener) { + return cloneRetentionLease( + getPeerRecoveryRetentionLeaseId(getPeerRecoveryRetentionLeaseId(routingTable.primaryShard())), + getPeerRecoveryRetentionLeaseId(nodeId), listener); + } + public void removePeerRecoveryRetentionLease(String nodeId, ActionListener listener) { removeRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), listener); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3ed38cb7aae9f..b5dfef9b175b5 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -40,6 +40,7 @@ import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; @@ -2591,9 +2592,16 @@ public boolean isRelocatedPrimary() { public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener listener) { assert assertPrimaryMode(); + // only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs: + assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0); replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); } + public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener listener) { + assert assertPrimaryMode(); + return replicationTracker.cloneLocalPeerRecoveryRetentionLease(nodeId, listener); + } + public void removePeerRecoveryRetentionLease(String nodeId, ActionListener listener) { assert assertPrimaryMode(); replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 03018e5da087d..23350cf2b1411 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -31,6 +31,7 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.ThreadedActionListener; @@ -177,6 +178,11 @@ && isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) && (useRetentionLeases == false || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); + // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, + // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's + // possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold. + // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery + // without having a complete history. if (isSequenceNumberBasedRecovery && useRetentionLeases) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock @@ -271,19 +277,32 @@ && isTargetSameHistory() if (useRetentionLeases && isSequenceNumberBasedRecovery == false) { // We can in general use retention leases for peer recovery, but there is no lease for the target node right now. runUnderPrimaryPermit(() -> { - // Start the lease off retaining all the history needed from the local checkpoint of the safe commit that we've - // just established on the replica. This primary is certainly retaining such history, but other replicas might - // not be. No big deal if this recovery succeeds, but if this primary fails then the new primary might have to - // repeat phase 1 to recover this replica. - final long localCheckpointOfSafeCommit = startingSeqNo - 1; - logger.trace("creating new retention lease at [{}]", localCheckpointOfSafeCommit); - shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), localCheckpointOfSafeCommit, - new ThreadedActionListener<>(logger, shard.getThreadPool(), - ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false)); + // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the + // the local checkpoint of the safe commit we're creating and this lease's retained seqno with the retention + // lock, and by cloning an existing lease we (approximately) know that all our peers are also retaining history + // as requested by the cloned lease. If the recovery now fails before copying enough history over then a + // subsequent attempt will find this lease, determine it is not enough, and fall back to a file-based recovery. + // + // (approximately) because we do not guarantee to be able to satisfy every lease on every peer. + logger.trace("cloning primary's retention lease"); + try { + final RetentionLease clonedLease = shard.cloneLocalPeerRecoveryRetentionLease(request.targetNode().getId(), + new ThreadedActionListener<>(logger, shard.getThreadPool(), + ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false)); + logger.trace("cloned primary's retention lease as [{}]", clonedLease); + } catch (RetentionLeaseNotFoundException e) { + // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a + // version before 7.4, and in that case we just create a lease using the local checkpoint of the safe commit + // which we're using for recovery as a conservative estimate for the global checkpoint. + assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0); + final long estimatedGlobalCheckpoint = startingSeqNo - 1; + shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), + estimatedGlobalCheckpoint, new ThreadedActionListener<>(logger, shard.getThreadPool(), + ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false)); + logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); + } }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger); - // all the history we need is now retained by a retention lease so we can close the retention lock - retentionLock.close(); } else { establishRetentionLeaseStep.onResponse(null); } @@ -313,7 +332,7 @@ && isTargetSameHistory() final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); resources.add(phase2Snapshot); - if (useRetentionLeases == false) { + if (useRetentionLeases == false || isSequenceNumberBasedRecovery == false) { // we can release the retention lock here because the snapshot itself will retain the required operations. retentionLock.close(); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 393ff44ef5c66..7611fad5a7e43 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; @@ -247,6 +249,103 @@ public void testRemoveRetentionLease() { } } + public void testCloneRetentionLease() { + final AllocationId allocationId = AllocationId.newInitializing(); + final AtomicReference replicationTrackerRef = new AtomicReference<>(); + final AtomicLong timeReference = new AtomicLong(); + final AtomicBoolean synced = new AtomicBoolean(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomLongBetween(1, Long.MAX_VALUE), + UNASSIGNED_SEQ_NO, + value -> {}, + timeReference::get, + (leases, listener) -> { + assertFalse(Thread.holdsLock(replicationTrackerRef.get())); + assertTrue(synced.compareAndSet(false, true)); + listener.onResponse(new ReplicationResponse()); + }); + replicationTrackerRef.set(replicationTracker); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId)); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + final long addTime = randomLongBetween(timeReference.get(), Long.MAX_VALUE); + timeReference.set(addTime); + final long minimumRetainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + final PlainActionFuture addFuture = new PlainActionFuture<>(); + replicationTracker.addRetentionLease("source", minimumRetainingSequenceNumber, "test-source", addFuture); + addFuture.actionGet(); + assertTrue(synced.get()); + synced.set(false); + + final long cloneTime = randomLongBetween(timeReference.get(), Long.MAX_VALUE); + timeReference.set(cloneTime); + final PlainActionFuture cloneFuture = new PlainActionFuture<>(); + final RetentionLease clonedLease = replicationTracker.cloneRetentionLease("source", "target", cloneFuture); + cloneFuture.actionGet(); + assertTrue(synced.get()); + synced.set(false); + + assertThat(clonedLease.id(), equalTo("target")); + assertThat(clonedLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumber)); + assertThat(clonedLease.timestamp(), equalTo(cloneTime)); + assertThat(clonedLease.source(), equalTo("test-source")); + + assertThat(replicationTracker.getRetentionLeases().get("target"), equalTo(clonedLease)); + } + + public void testCloneNonexistentRetentionLease() { + final AllocationId allocationId = AllocationId.newInitializing(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomLongBetween(1, Long.MAX_VALUE), + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, listener) -> { }); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId)); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + assertThat(expectThrows(RetentionLeaseNotFoundException.class, + () -> replicationTracker.cloneRetentionLease("nonexistent-lease-id", "target", ActionListener.wrap(() -> {}))).getMessage(), + equalTo("retention lease with ID [nonexistent-lease-id] not found")); + } + + public void testCloneDuplicateRetentionLease() { + final AllocationId allocationId = AllocationId.newInitializing(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomLongBetween(1, Long.MAX_VALUE), + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, listener) -> { }); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId)); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + replicationTracker.addRetentionLease("source", randomLongBetween(0L, Long.MAX_VALUE), "test-source", ActionListener.wrap(() -> {})); + replicationTracker.addRetentionLease("exists", randomLongBetween(0L, Long.MAX_VALUE), "test-source", ActionListener.wrap(() -> {})); + + assertThat(expectThrows(RetentionLeaseAlreadyExistsException.class, + () -> replicationTracker.cloneRetentionLease("source", "exists", ActionListener.wrap(() -> {}))).getMessage(), + equalTo("retention lease with ID [exists] already exists")); + } + public void testRemoveNotFound() { final AllocationId allocationId = AllocationId.newInitializing(); long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); From 9e1017159c202fc01c16931dbede3dee717abfae Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 29 Jul 2019 18:33:57 +0100 Subject: [PATCH 08/14] orly --- .../java/org/elasticsearch/index/seqno/ReplicationTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index f42b112d2cf93..411421c371a9f 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -477,7 +477,7 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener listener) { return cloneRetentionLease( - getPeerRecoveryRetentionLeaseId(getPeerRecoveryRetentionLeaseId(routingTable.primaryShard())), + getPeerRecoveryRetentionLeaseId(routingTable.primaryShard()), getPeerRecoveryRetentionLeaseId(nodeId), listener); } From bc5149a634dc37ae7d3b9ec5aa43d4373aca5494 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 30 Jul 2019 15:29:18 +0100 Subject: [PATCH 09/14] Assert primary mode --- .../java/org/elasticsearch/index/seqno/ReplicationTracker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 411421c371a9f..91d00396db3bb 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -305,6 +305,7 @@ RetentionLease cloneRetentionLease(String sourceLeaseId, String targetLeaseId, A final RetentionLease retentionLease; final RetentionLeases currentRetentionLeases; synchronized (this) { + assert primaryMode; if (getRetentionLeases().contains(sourceLeaseId) == false) { throw new RetentionLeaseNotFoundException(sourceLeaseId); } From 14fde1c2a755978f8da764aa5f50585575baef46 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 31 Jul 2019 08:51:37 +0100 Subject: [PATCH 10/14] Base initial GCP on the cloned retention lease --- .../recovery/RecoverySourceHandler.java | 95 +++++++++++-------- .../recovery/RecoverySourceHandlerTests.java | 10 +- 2 files changed, 63 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 23350cf2b1411..d0faa80cde66a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -78,6 +78,7 @@ import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -196,7 +197,6 @@ && isTargetSameHistory() } final StepListener sendFileStep = new StepListener<>(); - final StepListener establishRetentionLeaseStep = new StepListener<>(); final StepListener prepareEngineStep = new StepListener<>(); final StepListener sendSnapshotStep = new StepListener<>(); final StepListener finalizeStep = new StepListener<>(); @@ -264,7 +264,16 @@ && isTargetSameHistory() deleteRetentionLeaseStep.whenComplete(ignored -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); - phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep); + + final Consumer> getGlobalCheckpoint; + if (useRetentionLeases) { + getGlobalCheckpoint = l -> createRetentionLease(startingSeqNo, l); + } else { + final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); + getGlobalCheckpoint = l -> l.onResponse(globalCheckpoint); + } + + phase1(safeCommitRef.getIndexCommit(), getGlobalCheckpoint, () -> estimateNumOps, sendFileStep); }, onFailure); } catch (final Exception e) { @@ -274,41 +283,6 @@ && isTargetSameHistory() assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; sendFileStep.whenComplete(r -> { - if (useRetentionLeases && isSequenceNumberBasedRecovery == false) { - // We can in general use retention leases for peer recovery, but there is no lease for the target node right now. - runUnderPrimaryPermit(() -> { - // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the - // the local checkpoint of the safe commit we're creating and this lease's retained seqno with the retention - // lock, and by cloning an existing lease we (approximately) know that all our peers are also retaining history - // as requested by the cloned lease. If the recovery now fails before copying enough history over then a - // subsequent attempt will find this lease, determine it is not enough, and fall back to a file-based recovery. - // - // (approximately) because we do not guarantee to be able to satisfy every lease on every peer. - logger.trace("cloning primary's retention lease"); - try { - final RetentionLease clonedLease = shard.cloneLocalPeerRecoveryRetentionLease(request.targetNode().getId(), - new ThreadedActionListener<>(logger, shard.getThreadPool(), - ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false)); - logger.trace("cloned primary's retention lease as [{}]", clonedLease); - } catch (RetentionLeaseNotFoundException e) { - // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a - // version before 7.4, and in that case we just create a lease using the local checkpoint of the safe commit - // which we're using for recovery as a conservative estimate for the global checkpoint. - assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0); - final long estimatedGlobalCheckpoint = startingSeqNo - 1; - shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), - estimatedGlobalCheckpoint, new ThreadedActionListener<>(logger, shard.getThreadPool(), - ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false)); - logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); - } - }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", - shard, cancellableThreads, logger); - } else { - establishRetentionLeaseStep.onResponse(null); - } - }, onFailure); - - establishRetentionLeaseStep.whenComplete(r -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); // For a sequence based recovery, the target can keep its local translog prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, @@ -455,7 +429,8 @@ static final class SendFileResult { * segments that are missing. Only segments that have the same size and * checksum can be reused */ - void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener listener) { + void phase1(IndexCommit snapshot, Consumer> getGlobalCheckpoint, + IntSupplier translogOps, ActionListener listener) { cancellableThreads.checkForCancel(); // Total size of segment files that are recovered long totalSizeInBytes = 0; @@ -518,6 +493,7 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes)); final StepListener sendFileInfoStep = new StepListener<>(); final StepListener sendFilesStep = new StepListener<>(); + final StepListener getGlobalCheckpointStep = new StepListener<>(); final StepListener cleanFilesStep = new StepListener<>(); cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, @@ -526,7 +502,9 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps sendFileInfoStep.whenComplete(r -> sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure); - sendFilesStep.whenComplete(r -> + sendFilesStep.whenComplete(r -> getGlobalCheckpoint.accept(getGlobalCheckpointStep), listener::onFailure); + + getGlobalCheckpointStep.whenComplete(globalCheckpoint -> cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep), listener::onFailure); final long totalSize = totalSizeInBytes; @@ -550,6 +528,45 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps } } + private void createRetentionLease(final long startingSeqNo, ActionListener listener) { + runUnderPrimaryPermit(() -> { + // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the the local + // checkpoint of the safe commit we're creating and this lease's retained seqno with the retention lock, and by cloning an + // existing lease we (approximately) know that all our peers are also retaining history as requested by the cloned lease. If + // the recovery now fails before copying enough history over then a subsequent attempt will find this lease, determine it is + // not enough, and fall back to a file-based recovery. + // + // (approximately) because we do not guarantee to be able to satisfy every lease on every peer. + logger.trace("cloning primary's retention lease"); + try { + final StepListener cloneRetentionLeaseStep = new StepListener<>(); + final RetentionLease clonedLease + = shard.cloneLocalPeerRecoveryRetentionLease(request.targetNode().getId(), + new ThreadedActionListener<>(logger, shard.getThreadPool(), + ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false)); + logger.trace("cloned primary's retention lease as [{}]", clonedLease); + cloneRetentionLeaseStep.whenComplete( + rr -> listener.onResponse(clonedLease.retainingSequenceNumber() - 1), + listener::onFailure); + } catch (RetentionLeaseNotFoundException e) { + // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before + // 7.4, and in that case we just create a lease using the local checkpoint of the safe commit which we're using for + // recovery as a conservative estimate for the global checkpoint. + assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0); + final StepListener addRetentionLeaseStep = new StepListener<>(); + final long estimatedGlobalCheckpoint = startingSeqNo - 1; + shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), + estimatedGlobalCheckpoint, new ThreadedActionListener<>(logger, shard.getThreadPool(), + ThreadPool.Names.GENERIC, addRetentionLeaseStep, false)); + addRetentionLeaseStep.whenComplete( + rr -> listener.onResponse(estimatedGlobalCheckpoint), + listener::onFailure); + logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); + } + }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", + shard, cancellableThreads, logger); + } + boolean canSkipPhase1(Store.MetadataSnapshot source, Store.MetadataSnapshot target) { if (source.getSyncId() == null || source.getSyncId().equals(target.getSyncId()) == false) { return false; diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 38d4b6220dd3d..40a8d122aa0f5 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -101,6 +101,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.zip.CRC32; @@ -466,9 +467,10 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE between(1, 8)) { @Override - void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener listener) { + void phase1(IndexCommit snapshot, Consumer> getGlobalCheckpoint, + IntSupplier translogOps, ActionListener listener) { phase1Called.set(true); - super.phase1(snapshot, globalCheckpoint, translogOps, listener); + super.phase1(snapshot, getGlobalCheckpoint, translogOps, listener); } @Override @@ -683,7 +685,9 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada final StepListener phase1Listener = new StepListener<>(); try { final CountDownLatch latch = new CountDownLatch(1); - handler.phase1(DirectoryReader.listCommits(dir).get(0), randomNonNegativeLong(), () -> 0, + handler.phase1(DirectoryReader.listCommits(dir).get(0), + l -> recoveryExecutor.execute(() -> l.onResponse(randomNonNegativeLong())), + () -> 0, new LatchedActionListener<>(phase1Listener, latch)); latch.await(); phase1Listener.result(); From 825fc8f0c0286ba31e8cc8735a99f9cb2e1cf0d2 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 31 Jul 2019 10:08:52 +0100 Subject: [PATCH 11/14] Imports --- .../elasticsearch/indices/recovery/RecoverySourceHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d0faa80cde66a..f2366d848d94f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -78,7 +78,6 @@ import java.util.Comparator; import java.util.List; import java.util.Locale; -import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; From 24b5806e5e58133d1485b938966213f9d9b4d67c Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 31 Jul 2019 10:25:08 +0100 Subject: [PATCH 12/14] Ensure that commit remains safe --- .../recovery/RecoverySourceHandler.java | 32 ++++++++++--------- .../recovery/RecoverySourceHandlerTests.java | 4 +-- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index f2366d848d94f..e2f2f516acefa 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -264,15 +264,14 @@ && isTargetSameHistory() deleteRetentionLeaseStep.whenComplete(ignored -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); - final Consumer> getGlobalCheckpoint; + final Consumer> getInitialGlobalCheckpoint; if (useRetentionLeases) { - getGlobalCheckpoint = l -> createRetentionLease(startingSeqNo, l); + getInitialGlobalCheckpoint = l -> createRetentionLease(startingSeqNo, l); } else { - final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); - getGlobalCheckpoint = l -> l.onResponse(globalCheckpoint); + getInitialGlobalCheckpoint = l -> l.onResponse(SequenceNumbers.UNASSIGNED_SEQ_NO); } - phase1(safeCommitRef.getIndexCommit(), getGlobalCheckpoint, () -> estimateNumOps, sendFileStep); + phase1(safeCommitRef.getIndexCommit(), getInitialGlobalCheckpoint, () -> estimateNumOps, sendFileStep); }, onFailure); } catch (final Exception e) { @@ -428,7 +427,7 @@ static final class SendFileResult { * segments that are missing. Only segments that have the same size and * checksum can be reused */ - void phase1(IndexCommit snapshot, Consumer> getGlobalCheckpoint, + void phase1(IndexCommit snapshot, Consumer> getInitialGlobalCheckpoint, IntSupplier translogOps, ActionListener listener) { cancellableThreads.checkForCancel(); // Total size of segment files that are recovered @@ -492,7 +491,7 @@ void phase1(IndexCommit snapshot, Consumer> getGlobalCheckp phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes)); final StepListener sendFileInfoStep = new StepListener<>(); final StepListener sendFilesStep = new StepListener<>(); - final StepListener getGlobalCheckpointStep = new StepListener<>(); + final StepListener getInitialGlobalCheckpointStep = new StepListener<>(); final StepListener cleanFilesStep = new StepListener<>(); cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, @@ -501,10 +500,13 @@ void phase1(IndexCommit snapshot, Consumer> getGlobalCheckp sendFileInfoStep.whenComplete(r -> sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure); - sendFilesStep.whenComplete(r -> getGlobalCheckpoint.accept(getGlobalCheckpointStep), listener::onFailure); + sendFilesStep.whenComplete(r -> getInitialGlobalCheckpoint.accept(getInitialGlobalCheckpointStep), listener::onFailure); - getGlobalCheckpointStep.whenComplete(globalCheckpoint -> - cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep), listener::onFailure); + getInitialGlobalCheckpointStep.whenComplete(initialGlobalCheckpoint -> + cleanFiles(store, recoverySourceMetadata, translogOps, + Math.max(initialGlobalCheckpoint, Long.parseLong(snapshot.getUserData().get(SequenceNumbers.MAX_SEQ_NO))), + cleanFilesStep), + listener::onFailure); final long totalSize = totalSizeInBytes; final long existingTotalSize = existingTotalSizeInBytes; @@ -527,7 +529,7 @@ void phase1(IndexCommit snapshot, Consumer> getGlobalCheckp } } - private void createRetentionLease(final long startingSeqNo, ActionListener listener) { + private void createRetentionLease(final long startingSeqNo, ActionListener initialGlobalCheckpointListener) { runUnderPrimaryPermit(() -> { // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the the local // checkpoint of the safe commit we're creating and this lease's retained seqno with the retention lock, and by cloning an @@ -545,8 +547,8 @@ private void createRetentionLease(final long startingSeqNo, ActionListener ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false)); logger.trace("cloned primary's retention lease as [{}]", clonedLease); cloneRetentionLeaseStep.whenComplete( - rr -> listener.onResponse(clonedLease.retainingSequenceNumber() - 1), - listener::onFailure); + rr -> initialGlobalCheckpointListener.onResponse(clonedLease.retainingSequenceNumber() - 1), + initialGlobalCheckpointListener::onFailure); } catch (RetentionLeaseNotFoundException e) { // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before // 7.4, and in that case we just create a lease using the local checkpoint of the safe commit which we're using for @@ -558,8 +560,8 @@ private void createRetentionLease(final long startingSeqNo, ActionListener estimatedGlobalCheckpoint, new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, addRetentionLeaseStep, false)); addRetentionLeaseStep.whenComplete( - rr -> listener.onResponse(estimatedGlobalCheckpoint), - listener::onFailure); + rr -> initialGlobalCheckpointListener.onResponse(estimatedGlobalCheckpoint), + initialGlobalCheckpointListener::onFailure); logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); } }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 40a8d122aa0f5..82deeaaea8b26 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -467,10 +467,10 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE between(1, 8)) { @Override - void phase1(IndexCommit snapshot, Consumer> getGlobalCheckpoint, + void phase1(IndexCommit snapshot, Consumer> getInitialGlobalCheckpoint, IntSupplier translogOps, ActionListener listener) { phase1Called.set(true); - super.phase1(snapshot, getGlobalCheckpoint, translogOps, listener); + super.phase1(snapshot, getInitialGlobalCheckpoint, translogOps, listener); } @Override From d63e77774b37fd6bb20e5c5b2e2c3ffbbd1852ca Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 31 Jul 2019 12:18:54 +0100 Subject: [PATCH 13/14] Sample the GCP later and merely assert that it's ahead of the leased GCP --- .../index/seqno/ReplicationTracker.java | 6 ++- .../elasticsearch/index/shard/IndexShard.java | 5 ++- .../recovery/RecoverySourceHandler.java | 43 ++++++++++--------- .../recovery/RecoverySourceHandlerTests.java | 7 +-- 4 files changed, 34 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 91d00396db3bb..7185fd4319af4 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -472,8 +472,10 @@ public boolean assertRetentionLeasesPersisted(final Path path) throws IOExceptio * containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations * with sequence numbers strictly greater than the given global checkpoint. */ - public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener listener) { - addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener); + public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, + ActionListener listener) { + return addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, + PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener); } public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener listener) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c8c1143f2185f..dc3795f160677 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2598,11 +2598,12 @@ public boolean isRelocatedPrimary() { return replicationTracker.isRelocated(); } - public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener listener) { + public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, + ActionListener listener) { assert assertPrimaryMode(); // only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs: assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0); - replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); + return replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); } public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener listener) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index e2f2f516acefa..34bccd097fd85 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -264,14 +264,14 @@ && isTargetSameHistory() deleteRetentionLeaseStep.whenComplete(ignored -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); - final Consumer> getInitialGlobalCheckpoint; + final Consumer> createRetentionLeaseAsync; if (useRetentionLeases) { - getInitialGlobalCheckpoint = l -> createRetentionLease(startingSeqNo, l); + createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l); } else { - getInitialGlobalCheckpoint = l -> l.onResponse(SequenceNumbers.UNASSIGNED_SEQ_NO); + createRetentionLeaseAsync = l -> l.onResponse(null); } - phase1(safeCommitRef.getIndexCommit(), getInitialGlobalCheckpoint, () -> estimateNumOps, sendFileStep); + phase1(safeCommitRef.getIndexCommit(), createRetentionLeaseAsync, () -> estimateNumOps, sendFileStep); }, onFailure); } catch (final Exception e) { @@ -427,7 +427,7 @@ static final class SendFileResult { * segments that are missing. Only segments that have the same size and * checksum can be reused */ - void phase1(IndexCommit snapshot, Consumer> getInitialGlobalCheckpoint, + void phase1(IndexCommit snapshot, Consumer> createRetentionLease, IntSupplier translogOps, ActionListener listener) { cancellableThreads.checkForCancel(); // Total size of segment files that are recovered @@ -491,7 +491,7 @@ void phase1(IndexCommit snapshot, Consumer> getInitialGloba phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes)); final StepListener sendFileInfoStep = new StepListener<>(); final StepListener sendFilesStep = new StepListener<>(); - final StepListener getInitialGlobalCheckpointStep = new StepListener<>(); + final StepListener createRetentionLeaseStep = new StepListener<>(); final StepListener cleanFilesStep = new StepListener<>(); cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, @@ -500,12 +500,19 @@ void phase1(IndexCommit snapshot, Consumer> getInitialGloba sendFileInfoStep.whenComplete(r -> sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure); - sendFilesStep.whenComplete(r -> getInitialGlobalCheckpoint.accept(getInitialGlobalCheckpointStep), listener::onFailure); - - getInitialGlobalCheckpointStep.whenComplete(initialGlobalCheckpoint -> - cleanFiles(store, recoverySourceMetadata, translogOps, - Math.max(initialGlobalCheckpoint, Long.parseLong(snapshot.getUserData().get(SequenceNumbers.MAX_SEQ_NO))), - cleanFilesStep), + sendFilesStep.whenComplete(r -> createRetentionLease.accept(createRetentionLeaseStep), listener::onFailure); + + createRetentionLeaseStep.whenComplete(retentionLease -> + { + final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint(); + assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint + : retentionLease + " vs " + lastKnownGlobalCheckpoint; + // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want + // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica + // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on + // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint. + cleanFiles(store, recoverySourceMetadata, translogOps, lastKnownGlobalCheckpoint, cleanFilesStep); + }, listener::onFailure); final long totalSize = totalSizeInBytes; @@ -529,7 +536,7 @@ void phase1(IndexCommit snapshot, Consumer> getInitialGloba } } - private void createRetentionLease(final long startingSeqNo, ActionListener initialGlobalCheckpointListener) { + private void createRetentionLease(final long startingSeqNo, ActionListener listener) { runUnderPrimaryPermit(() -> { // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the the local // checkpoint of the safe commit we're creating and this lease's retained seqno with the retention lock, and by cloning an @@ -546,9 +553,7 @@ private void createRetentionLease(final long startingSeqNo, ActionListener new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false)); logger.trace("cloned primary's retention lease as [{}]", clonedLease); - cloneRetentionLeaseStep.whenComplete( - rr -> initialGlobalCheckpointListener.onResponse(clonedLease.retainingSequenceNumber() - 1), - initialGlobalCheckpointListener::onFailure); + cloneRetentionLeaseStep.whenComplete(rr -> listener.onResponse(clonedLease), listener::onFailure); } catch (RetentionLeaseNotFoundException e) { // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before // 7.4, and in that case we just create a lease using the local checkpoint of the safe commit which we're using for @@ -556,12 +561,10 @@ private void createRetentionLease(final long startingSeqNo, ActionListener assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0); final StepListener addRetentionLeaseStep = new StepListener<>(); final long estimatedGlobalCheckpoint = startingSeqNo - 1; - shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), + final RetentionLease newLease = shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), estimatedGlobalCheckpoint, new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, addRetentionLeaseStep, false)); - addRetentionLeaseStep.whenComplete( - rr -> initialGlobalCheckpointListener.onResponse(estimatedGlobalCheckpoint), - initialGlobalCheckpointListener::onFailure); + addRetentionLeaseStep.whenComplete(rr -> listener.onResponse(newLease), listener::onFailure); logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); } }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 82deeaaea8b26..a175699ad2cf6 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -64,6 +64,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -467,10 +468,10 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE between(1, 8)) { @Override - void phase1(IndexCommit snapshot, Consumer> getInitialGlobalCheckpoint, + void phase1(IndexCommit snapshot, Consumer> createRetentionLease, IntSupplier translogOps, ActionListener listener) { phase1Called.set(true); - super.phase1(snapshot, getInitialGlobalCheckpoint, translogOps, listener); + super.phase1(snapshot, createRetentionLease, translogOps, listener); } @Override @@ -686,7 +687,7 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada try { final CountDownLatch latch = new CountDownLatch(1); handler.phase1(DirectoryReader.listCommits(dir).get(0), - l -> recoveryExecutor.execute(() -> l.onResponse(randomNonNegativeLong())), + l -> recoveryExecutor.execute(() -> l.onResponse(null)), () -> 0, new LatchedActionListener<>(phase1Listener, latch)); latch.await(); From f47e56eaf8f2cf69ad8008f7cd0bc1dcd7437897 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 31 Jul 2019 15:31:29 +0100 Subject: [PATCH 14/14] Handle the synced-flush case which skips most of phase 1 --- .../recovery/RecoverySourceHandler.java | 38 +++++++++++-------- .../indices/recovery/IndexRecoveryIT.java | 36 ++++++++++++++++++ 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 34bccd097fd85..77c86c6e02946 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -430,14 +430,6 @@ static final class SendFileResult { void phase1(IndexCommit snapshot, Consumer> createRetentionLease, IntSupplier translogOps, ActionListener listener) { cancellableThreads.checkForCancel(); - // Total size of segment files that are recovered - long totalSizeInBytes = 0; - // Total size of segment files that were able to be re-used - long existingTotalSizeInBytes = 0; - final List phase1FileNames = new ArrayList<>(); - final List phase1FileSizes = new ArrayList<>(); - final List phase1ExistingFileNames = new ArrayList<>(); - final List phase1ExistingFileSizes = new ArrayList<>(); final Store store = shard.store(); try { StopWatch stopWatch = new StopWatch().start(); @@ -457,6 +449,16 @@ void phase1(IndexCommit snapshot, Consumer> creat } } if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) { + final List phase1FileNames = new ArrayList<>(); + final List phase1FileSizes = new ArrayList<>(); + final List phase1ExistingFileNames = new ArrayList<>(); + final List phase1ExistingFileSizes = new ArrayList<>(); + + // Total size of segment files that are recovered + long totalSizeInBytes = 0; + // Total size of segment files that were able to be re-used + long existingTotalSizeInBytes = 0; + // Generate a "diff" of all the identical, different, and missing // segment files on the target node, using the existing files on // the source node @@ -524,15 +526,21 @@ void phase1(IndexCommit snapshot, Consumer> creat phase1ExistingFileSizes, existingTotalSize, took)); }, listener::onFailure); } else { - logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target", - recoverySourceMetadata.getSyncId()); - final TimeValue took = stopWatch.totalTime(); - logger.trace("recovery [phase1]: took [{}]", took); - listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSizeInBytes, phase1ExistingFileNames, - phase1ExistingFileSizes, existingTotalSizeInBytes, took)); + logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId()); + + // but we must still create a retention lease + final StepListener createRetentionLeaseStep = new StepListener<>(); + createRetentionLease.accept(createRetentionLeaseStep); + createRetentionLeaseStep.whenComplete(retentionLease -> { + final TimeValue took = stopWatch.totalTime(); + logger.trace("recovery [phase1]: took [{}]", took); + listener.onResponse(new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(), + Collections.emptyList(), 0L, took)); + }, listener::onFailure); + } } catch (Exception e) { - throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes), e); + throw new RecoverFilesRecoveryException(request.shardId(), 0, new ByteSizeValue(0L), e); } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index a91aee2ea53f0..9ab56df76d5c3 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -876,6 +877,7 @@ public void testHistoryRetention() throws Exception { internalCluster().stopRandomNode(InternalTestCluster.nameFilter(secondNodeToStop)); final long desyncNanoTime = System.nanoTime(); + //noinspection StatementWithEmptyBody while (System.nanoTime() <= desyncNanoTime) { // time passes } @@ -1015,6 +1017,40 @@ public void testRecoveryFlushReplica() throws Exception { assertThat(syncIds, hasSize(1)); } + public void testRecoveryUsingSyncedFlushWithoutRetentionLease() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "24h") // do not reallocate the lost shard + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), "100ms") // expire leases quickly + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") // sync frequently + .build()); + int numDocs = randomIntBetween(0, 10); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, numDocs) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + assertThat(SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId).successfulShards(), equalTo(2)); + + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + final ShardRouting shardToResync = randomFrom(clusterState.routingTable().shardRoutingTable(shardId).activeShards()); + internalCluster().restartNode(clusterState.nodes().get(shardToResync.currentNodeId()).getName(), + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertBusy(() -> assertFalse(client().admin().indices().prepareStats(indexName).get() + .getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardToResync)))); + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(indexName); + } + public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); List nodes = randomSubsetOf(2, StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false)