Skip to content

Commit e2b931e

Browse files
authored
Use Lucene history in primary-replica resync (#33178)
This commit makes primary-replica resyncer use Lucene as the source of history operation instead of translog if soft-deletes is enabled. With this change, we no longer expose translog snapshot directly in IndexShard. Relates #29530
1 parent 5954354 commit e2b931e

File tree

6 files changed

+15
-31
lines changed

6 files changed

+15
-31
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -592,12 +592,6 @@ public enum SearcherScope {
592592
*/
593593
public abstract Closeable acquireRetentionLockForPeerRecovery();
594594

595-
/**
596-
* Creates a new translog snapshot from this engine for reading translog operations whose seq# in the provided range.
597-
* The caller has to close the returned snapshot after finishing the reading.
598-
*/
599-
public abstract Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException;
600-
601595
public abstract TranslogStats getTranslogStats();
602596

603597
/**

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -480,11 +480,6 @@ public void syncTranslog() throws IOException {
480480
revisitIndexDeletionPolicyOnTranslogSynced();
481481
}
482482

483-
@Override
484-
public Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
485-
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
486-
}
487-
488483
/**
489484
* Creates a new history snapshot for reading operations since the provided seqno.
490485
* The returned snapshot can be retrieved from either Lucene index or translog files.

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1635,15 +1635,6 @@ public Closeable acquireRetentionLockForPeerRecovery() {
16351635
return getEngine().acquireRetentionLockForPeerRecovery();
16361636
}
16371637

1638-
/**
1639-
* Creates a new translog snapshot for reading translog operations whose seq# at least the provided seq#.
1640-
* The caller has to close the returned snapshot after finishing the reading.
1641-
*/
1642-
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
1643-
// TODO: Remove this method after primary-replica resync use soft-deletes
1644-
return getEngine().newSnapshotFromMinSeqNo(minSeqNo);
1645-
}
1646-
16471638
/**
16481639
* Returns the estimated number of history operations whose seq# at least the provided seq# in this shard.
16491640
*/

server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
8989
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
9090
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
9191
// Also fail the resync early if the shard is shutting down
92-
// TODO: A follow-up to make resync using soft-deletes
93-
snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
92+
snapshot = indexShard.getHistoryOperations("resync", startingSeqNo);
9493
final Translog.Snapshot originalSnapshot = snapshot;
9594
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
9695
@Override

server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,22 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
106106
.isPresent(),
107107
is(false));
108108
}
109-
110-
assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, resyncTask.getTotalOperations());
111109
if (syncNeeded && globalCheckPoint < numDocs - 1) {
112-
long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included
113-
assertEquals(skippedOps, resyncTask.getSkippedOperations());
114-
assertEquals(numDocs - skippedOps, resyncTask.getResyncedOperations());
110+
if (shard.indexSettings.isSoftDeleteEnabled()) {
111+
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
112+
assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations()));
113+
assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint)));
114+
} else {
115+
int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
116+
assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
117+
assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
118+
assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
119+
}
115120
} else {
116-
assertEquals(0, resyncTask.getSkippedOperations());
117-
assertEquals(0, resyncTask.getResyncedOperations());
121+
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
122+
assertThat(resyncTask.getResyncedOperations(), equalTo(0));
123+
assertThat(resyncTask.getTotalOperations(), equalTo(0));
118124
}
119-
120125
closeShards(shard);
121126
}
122127

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testPrimaryTermFromFollower() throws IOException {
6161
final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
6262
TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger);
6363

64-
try (Translog.Snapshot snapshot = followerPrimary.newTranslogSnapshotFromMinSeqNo(0)) {
64+
try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) {
6565
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
6666
Translog.Operation operation;
6767
while ((operation = snapshot.next()) != null) {

0 commit comments

Comments
 (0)