Skip to content

Commit 578514e

Browse files
committed
Recover peers from translog, ignoring soft deletes (#38904)
Today if soft deletes are enabled then we read the operations needed for peer recovery from Lucene. However we do not currently make any attempt to retain history in Lucene specifically for peer recoveries so we may discard it and fall back to a more expensive file-based recovery. Yet we still retain sufficient history in the translog to perform an operations-based peer recovery. In the long run we would like to fix this by retaining more history in Lucene, possibly using shard history retention leases (#37165). For now, however, this commit reverts to performing peer recoveries using the history retained in the translog regardless of whether soft deletes are enabled or not.
1 parent a211e51 commit 578514e

File tree

7 files changed

+87
-41
lines changed

7 files changed

+87
-41
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,7 @@ public abstract int estimateNumberOfHistoryOperations(String source,
767767
MapperService mapperService, long startingSeqNo) throws IOException;
768768

769769
/**
770-
* Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its history (either Lucene or translog)
770+
* Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its translog
771771
*/
772772
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
773773

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+19-20
Original file line numberDiff line numberDiff line change
@@ -502,16 +502,11 @@ public void syncTranslog() throws IOException {
502502
}
503503

504504
/**
505-
* Creates a new history snapshot for reading operations since the provided seqno.
506-
* The returned snapshot can be retrieved from either Lucene index or translog files.
505+
* Creates a new history snapshot for reading operations since the provided seqno from the translog.
507506
*/
508507
@Override
509508
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
510-
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
511-
return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
512-
} else {
513-
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
514-
}
509+
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
515510
}
516511

517512
/**
@@ -2546,21 +2541,17 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
25462541

25472542
@Override
25482543
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
2549-
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
2550-
return getMinRetainedSeqNo() <= startingSeqNo;
2551-
} else {
2552-
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
2553-
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
2554-
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
2555-
Translog.Operation operation;
2556-
while ((operation = snapshot.next()) != null) {
2557-
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
2558-
tracker.markSeqNoAsCompleted(operation.seqNo());
2559-
}
2544+
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
2545+
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
2546+
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
2547+
Translog.Operation operation;
2548+
while ((operation = snapshot.next()) != null) {
2549+
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
2550+
tracker.markSeqNoAsCompleted(operation.seqNo());
25602551
}
25612552
}
2562-
return tracker.getCheckpoint() >= currentLocalCheckpoint;
25632553
}
2554+
return tracker.getCheckpoint() >= currentLocalCheckpoint;
25642555
}
25652556

25662557
/**
@@ -2575,7 +2566,15 @@ public final long getMinRetainedSeqNo() {
25752566
@Override
25762567
public Closeable acquireRetentionLock() {
25772568
if (softDeleteEnabled) {
2578-
return softDeletesPolicy.acquireRetentionLock();
2569+
final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
2570+
final Closeable translogRetentionLock;
2571+
try {
2572+
translogRetentionLock = translog.acquireRetentionLock();
2573+
} catch (Exception e) {
2574+
softDeletesRetentionLock.close();
2575+
throw e;
2576+
}
2577+
return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
25792578
} else {
25802579
return translog.acquireRetentionLock();
25812580
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
177177
// We must have everything above the local checkpoint in the commit
178178
requiredSeqNoRangeStart =
179179
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
180-
// If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have
181-
// the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly
182-
// according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo.
183-
startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0;
180+
// We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will
181+
// still filter out legacy operations without seqNo.
182+
startingSeqNo = 0;
184183
try {
185184
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
186185
sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java

+4
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,10 @@ public synchronized void reset() {
417417
stopTime = 0;
418418
}
419419

420+
// for tests
421+
public long getStartNanoTime() {
422+
return startNanoTime;
423+
}
420424
}
421425

422426
public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable {

server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

+4-10
Original file line numberDiff line numberDiff line change
@@ -115,16 +115,10 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
115115
assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
116116
}
117117
if (syncNeeded && globalCheckPoint < numDocs - 1) {
118-
if (shard.indexSettings.isSoftDeleteEnabled()) {
119-
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
120-
assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations()));
121-
assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint)));
122-
} else {
123-
int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
124-
assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
125-
assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
126-
assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
127-
}
118+
int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
119+
assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
120+
assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
121+
assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
128122
} else {
129123
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
130124
assertThat(resyncTask.getResyncedOperations(), equalTo(0));

server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

+53
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
2727
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
2828
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
29+
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
2930
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
3031
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
3132
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
3233
import org.elasticsearch.action.index.IndexRequestBuilder;
3334
import org.elasticsearch.action.search.SearchResponse;
35+
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
3436
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3537
import org.elasticsearch.cluster.metadata.IndexMetaData;
3638
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -786,4 +788,55 @@ public void sendRequest(Transport.Connection connection, long requestId, String
786788
assertHitCount(client().prepareSearch(indexName).get(), numDocs);
787789
}
788790
}
791+
792+
@TestLogging("org.elasticsearch.indices.recovery:TRACE")
793+
public void testHistoryRetention() throws Exception {
794+
internalCluster().startNodes(3);
795+
796+
final String indexName = "test";
797+
client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
798+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
799+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)).get();
800+
ensureGreen(indexName);
801+
802+
// Perform some replicated operations so the replica isn't simply empty, because ops-based recovery isn't better in that case
803+
final List<IndexRequestBuilder> requests = new ArrayList<>();
804+
final int replicatedDocCount = scaledRandomIntBetween(25, 250);
805+
while (requests.size() < replicatedDocCount) {
806+
requests.add(client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON));
807+
}
808+
indexRandom(true, requests);
809+
if (randomBoolean()) {
810+
flush(indexName);
811+
}
812+
813+
internalCluster().stopRandomNode(s -> true);
814+
internalCluster().stopRandomNode(s -> true);
815+
816+
final long desyncNanoTime = System.nanoTime();
817+
while (System.nanoTime() <= desyncNanoTime) {
818+
// time passes
819+
}
820+
821+
final int numNewDocs = scaledRandomIntBetween(25, 250);
822+
for (int i = 0; i < numNewDocs; i++) {
823+
client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON).setRefreshPolicy(RefreshPolicy.IMMEDIATE).get();
824+
}
825+
// Flush twice to update the safe commit's local checkpoint
826+
assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0));
827+
assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0));
828+
829+
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
830+
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)));
831+
internalCluster().startNode();
832+
ensureGreen(indexName);
833+
834+
final RecoveryResponse recoveryResponse = client().admin().indices().recoveries(new RecoveryRequest(indexName)).get();
835+
final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(indexName);
836+
recoveryStates.removeIf(r -> r.getTimer().getStartNanoTime() <= desyncNanoTime);
837+
838+
assertThat(recoveryStates, hasSize(1));
839+
assertThat(recoveryStates.get(0).getIndex().totalFileCount(), is(0));
840+
assertThat(recoveryStates.get(0).getTranslog().recoveredOperations(), greaterThan(0));
841+
}
789842
}

server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ public void testTranslogHistoryTransferred() throws Exception {
6868
shards.addReplica();
6969
shards.startAll();
7070
final IndexShard replica = shards.getReplicas().get(0);
71-
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
72-
assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs));
71+
assertThat(getTranslog(replica).totalOperations(), equalTo(docs + moreDocs));
7372
shards.assertAllEqual(docs + moreDocs);
7473
}
7574
}
@@ -282,8 +281,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
282281
shards.recoverReplica(newReplica);
283282
// file based recovery should be made
284283
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
285-
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
286-
assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs));
284+
assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs));
287285

288286
// history uuid was restored
289287
assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
@@ -387,8 +385,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception {
387385
shards.recoverReplica(replica);
388386
// Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
389387
assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
390-
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
391-
assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
388+
assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs));
392389
shards.assertAllEqual(numDocs);
393390
}
394391
}

0 commit comments

Comments
 (0)