Skip to content

Commit e39689a

Browse files
authored
Send only ops after checkpoint in file-based recovery with soft-deletes (#33190)
Today a file-based recovery will replay all existing translog operations from the primary on a replica so that the replica has the same full translog history as the primary. However, with soft-deletes enabled, we should not do this because: 1. All operations before the local checkpoint of the safe commit already exist in the commit. 2. The number of operations before the local checkpoint may be considerable and would require a significant amount of time to replay on a replica. Relates #30522 Relates #29530
1 parent e2b931e commit e39689a

File tree

4 files changed

+84
-33
lines changed

4 files changed

+84
-33
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,13 @@ public RecoveryResponse recoverToTarget() throws IOException {
162162
} catch (final Exception e) {
163163
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
164164
}
165-
// we set this to 0 to create a translog roughly according to the retention policy
166-
// on the target. Note that it will still filter out legacy operations with no sequence numbers
167-
startingSeqNo = 0; //TODO: A follow-up to send only ops above the local checkpoint if soft-deletes enabled.
168-
// but we must have everything above the local checkpoint in the commit
165+
// We must have everything above the local checkpoint in the commit
169166
requiredSeqNoRangeStart =
170167
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
168+
// If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have
169+
// the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly
170+
// according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo.
171+
startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0;
171172
try {
172173
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
173174
phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);

server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -482,9 +482,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
482482
assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused));
483483
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered));
484484
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
485-
// both cases will be zero once we start sending only ops after local checkpoint of the safe commit
486-
int expectedTranslogOps = softDeleteEnabled ? numDocs + moreDocs : 0;
487-
assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(expectedTranslogOps));
485+
assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0));
488486
}
489487
}
490488
}

server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception {
219219

220220
@TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE")
221221
public void testRecoveryAfterPrimaryPromotion() throws Exception {
222-
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
223-
try (ReplicationGroup shards = createGroup(2, settings)) {
222+
try (ReplicationGroup shards = createGroup(2)) {
224223
shards.startAll();
225224
int totalDocs = shards.indexDocs(randomInt(10));
226225
int committedDocs = 0;
@@ -232,7 +231,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
232231
final IndexShard oldPrimary = shards.getPrimary();
233232
final IndexShard newPrimary = shards.getReplicas().get(0);
234233
final IndexShard replica = shards.getReplicas().get(1);
235-
boolean softDeleteEnabled = replica.indexSettings().isSoftDeleteEnabled();
236234
if (randomBoolean()) {
237235
// simulate docs that were inflight when primary failed, these will be rolled back
238236
final int rollbackDocs = randomIntBetween(1, 5);
@@ -280,12 +278,13 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
280278
assertThat(newPrimary.getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo()));
281279
});
282280
newPrimary.flush(new FlushRequest().force(true));
283-
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
284-
totalDocs += uncommittedOpsOnPrimary;
285-
// we need an extra flush or refresh to advance the min_retained_seqno on the new primary so that ops-based won't happen
286-
if (softDeleteEnabled) {
281+
if (replica.indexSettings().isSoftDeleteEnabled()) {
282+
// We need an extra flush to advance the min_retained_seqno on the new primary so ops-based won't happen.
283+
// The min_retained_seqno only advances when a merge asks for the retention query.
287284
newPrimary.flush(new FlushRequest().force(true));
288285
}
286+
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
287+
totalDocs += uncommittedOpsOnPrimary;
289288
}
290289

291290
if (randomBoolean()) {
@@ -305,8 +304,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
305304
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
306305
} else {
307306
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
308-
int expectOps = softDeleteEnabled ? totalDocs : uncommittedOpsOnPrimary;
309-
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectOps));
307+
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
310308
}
311309

312310
// roll back the extra ops in the replica

server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,13 @@ public void testTranslogHistoryTransferred() throws Exception {
6464
int docs = shards.indexDocs(10);
6565
getTranslog(shards.getPrimary()).rollGeneration();
6666
shards.flush();
67-
if (randomBoolean()) {
68-
docs += shards.indexDocs(10);
69-
}
67+
int moreDocs = shards.indexDocs(randomInt(10));
7068
shards.addReplica();
7169
shards.startAll();
7270
final IndexShard replica = shards.getReplicas().get(0);
73-
assertThat(getTranslog(replica).totalOperations(), equalTo(docs));
71+
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
72+
assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs));
73+
shards.assertAllEqual(docs + moreDocs);
7474
}
7575
}
7676

@@ -107,7 +107,7 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception {
107107
}
108108
}
109109

110-
public void testRecoveryWithOutOfOrderDelete() throws Exception {
110+
public void testRecoveryWithOutOfOrderDeleteWithTranslog() throws Exception {
111111
/*
112112
* The flow of this test:
113113
* - delete #1
@@ -117,12 +117,9 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {
117117
* - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained)
118118
* - index #2
119119
* - index #5
120-
* - If flush and the translog/lucene retention disabled, delete #1 will be removed while index #0 is still retained and replayed.
120+
* - If flush and the translog retention disabled, delete #1 will be removed while index #0 is still retained and replayed.
121121
*/
122-
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10)
123-
// If soft-deletes is enabled, delete#1 will be reclaimed because its segment (segment_1) is fully deleted
124-
// index#0 will be retained if merge is disabled; otherwise it will be reclaimed because gcp=3 and retained_ops=0
125-
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build();
122+
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
126123
try (ReplicationGroup shards = createGroup(1, settings)) {
127124
shards.startAll();
128125
// create out of order delete and index op on replica
@@ -131,7 +128,7 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {
131128

132129
// delete #1
133130
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id");
134-
orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment
131+
getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation
135132
// index #0
136133
orgReplica.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
137134
SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON));
@@ -151,17 +148,16 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {
151148
final int translogOps;
152149
if (randomBoolean()) {
153150
if (randomBoolean()) {
154-
logger.info("--> flushing shard (translog/soft-deletes will be trimmed)");
151+
logger.info("--> flushing shard (translog will be trimmed)");
155152
IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
156153
builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
157154
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
158-
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
159-
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0));
155+
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1"));
160156
orgReplica.indexSettings().updateIndexMetaData(builder.build());
161157
orgReplica.onSettingsChanged();
162158
translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed).
163159
} else {
164-
logger.info("--> flushing shard (translog/soft-deletes will be retained)");
160+
logger.info("--> flushing shard (translog will be retained)");
165161
translogOps = 6; // 5 ops + seqno gaps
166162
}
167163
flushShard(orgReplica);
@@ -180,6 +176,62 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {
180176
}
181177
}
182178

179+
public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
180+
Settings settings = Settings.builder()
181+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
182+
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10)
183+
// If soft-deletes is enabled, delete#1 will be reclaimed because its segment (segment_1) is fully deleted
184+
// index#0 will be retained if merge is disabled; otherwise it will be reclaimed because gcp=3 and retained_ops=0
185+
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build();
186+
try (ReplicationGroup shards = createGroup(1, settings)) {
187+
shards.startAll();
188+
// create out of order delete and index op on replica
189+
final IndexShard orgReplica = shards.getReplicas().get(0);
190+
final String indexName = orgReplica.shardId().getIndexName();
191+
192+
// delete #1
193+
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id");
194+
orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment
195+
// index #0
196+
orgReplica.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
197+
SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON));
198+
// index #3
199+
orgReplica.applyIndexOperationOnReplica(3, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
200+
SourceToParse.source(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON));
201+
// Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1.
202+
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
203+
// index #2
204+
orgReplica.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
205+
SourceToParse.source(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON));
206+
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
207+
// index #5 -> force NoOp #4.
208+
orgReplica.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
209+
SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON));
210+
211+
if (randomBoolean()) {
212+
if (randomBoolean()) {
213+
logger.info("--> flushing shard (translog/soft-deletes will be trimmed)");
214+
IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
215+
builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
216+
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0));
217+
orgReplica.indexSettings().updateIndexMetaData(builder.build());
218+
orgReplica.onSettingsChanged();
219+
}
220+
flushShard(orgReplica);
221+
}
222+
223+
final IndexShard orgPrimary = shards.getPrimary();
224+
shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed.
225+
226+
IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
227+
shards.recoverReplica(newReplica);
228+
shards.assertAllEqual(3);
229+
try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", 0)) {
230+
assertThat(snapshot, SnapshotMatchers.size(6));
231+
}
232+
}
233+
}
234+
183235
public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
184236
try (ReplicationGroup shards = createGroup(1)) {
185237
shards.startAll();
@@ -228,7 +280,8 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
228280
shards.recoverReplica(newReplica);
229281
// file based recovery should be made
230282
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
231-
assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs));
283+
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
284+
assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs));
232285

233286
// history uuid was restored
234287
assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
@@ -332,7 +385,8 @@ public void testShouldFlushAfterPeerRecovery() throws Exception {
332385
shards.recoverReplica(replica);
333386
// Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
334387
assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
335-
assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs));
388+
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
389+
assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
336390
shards.assertAllEqual(numDocs);
337391
}
338392
}

0 commit comments

Comments
 (0)