Skip to content

Commit c1b3afc

Browse files
committed
Introduce soft-deletes retention policy based on global checkpoint (#30335)
This commit introduces a soft-deletes retention merge policy based on the global checkpoint. Some notes on this simple retention policy: - This policy keeps all operations whose seq# is greater than the persisted global checkpoint, plus a configurable number of extra operations at or below the global checkpoint. This is good enough for querying the history of changes. - This policy is not watertight for peer-recovery. We send the safe-commit in peer-recovery, thus we also need to send all operations after the local checkpoint of that commit. This is analogous to the min translog generation for recovery. - This policy is too simple to support rollback. Relates #29530
1 parent b8ec81e commit c1b3afc

File tree

7 files changed

+180
-18
lines changed

7 files changed

+180
-18
lines changed

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
131131
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
132132
IndexSettings.INDEX_GC_DELETES_SETTING,
133133
IndexSettings.INDEX_SOFT_DELETES_SETTING,
134+
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
134135
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
135136
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
136137
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,

server/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,14 @@ public final class IndexSettings {
249249
public static final Setting<Boolean> INDEX_SOFT_DELETES_SETTING =
250250
Setting.boolSetting("index.soft_deletes.enabled", true, Property.IndexScope);
251251

252+
/**
253+
* Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted
254+
* documents increases the chance of operation-based recoveries and allows querying a longer history of documents.
255+
* If soft-deletes is enabled, an engine by default retains every operation above the persisted global checkpoint, plus this many extra operations at or below it.
256+
**/
257+
public static final Setting<Long> INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING =
258+
Setting.longSetting("index.soft_deletes.retention.operations", 0, 0, Property.IndexScope, Property.Dynamic);
259+
252260
/**
253261
* The maximum number of refresh listeners allowed on this shard.
254262
*/
@@ -302,6 +310,7 @@ public final class IndexSettings {
302310
private final IndexScopedSettings scopedSettings;
303311
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
304312
private final boolean softDeleteEnabled;
313+
private volatile long softDeleteRetentionOperations;
305314
private volatile boolean warmerEnabled;
306315
private volatile int maxResultWindow;
307316
private volatile int maxInnerResultWindow;
@@ -410,6 +419,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
410419
mergeSchedulerConfig = new MergeSchedulerConfig(this);
411420
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
412421
softDeleteEnabled = version.onOrAfter(Version.V_6_4_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
422+
softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
413423
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
414424
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING);
415425
maxInnerResultWindow = scopedSettings.get(MAX_INNER_RESULT_WINDOW_SETTING);
@@ -466,6 +476,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
466476
scopedSettings.addSettingsUpdateConsumer(MAX_TERMS_COUNT_SETTING, this::setMaxTermsCount);
467477
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
468478
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
479+
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
469480
}
470481

471482
private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
@@ -822,4 +833,15 @@ public IndexSortConfig getIndexSortConfig() {
822833
public boolean isSoftDeleteEnabled() {
823834
return softDeleteEnabled;
824835
}
836+
837+
private void setSoftDeleteRetentionOperations(long ops) {
838+
this.softDeleteRetentionOperations = ops;
839+
}
840+
841+
/**
842+
* Returns the number of extra operations (i.e. soft-deleted documents) to be kept for recoveries and history purposes.
843+
*/
844+
public long getSoftDeleteRetentionOperations() {
845+
return this.softDeleteRetentionOperations;
846+
}
825847
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
2424
import org.apache.lucene.document.Field;
25+
import org.apache.lucene.document.LongPoint;
2526
import org.apache.lucene.document.NumericDocValuesField;
2627
import org.apache.lucene.index.DirectoryReader;
2728
import org.apache.lucene.index.IndexCommit;
@@ -34,8 +35,10 @@
3435
import org.apache.lucene.index.MergePolicy;
3536
import org.apache.lucene.index.SegmentCommitInfo;
3637
import org.apache.lucene.index.SegmentInfos;
38+
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
3739
import org.apache.lucene.index.Term;
3840
import org.apache.lucene.search.IndexSearcher;
41+
import org.apache.lucene.search.Query;
3942
import org.apache.lucene.search.ReferenceManager;
4043
import org.apache.lucene.search.SearcherFactory;
4144
import org.apache.lucene.search.SearcherManager;
@@ -2059,8 +2062,8 @@ private IndexWriterConfig getIndexWriterConfig() {
20592062
// background merges
20602063
MergePolicy mergePolicy = config().getMergePolicy();
20612064
if (softDeleteEnabled) {
2062-
// TODO: soft-delete retention policy
20632065
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
2066+
mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy);
20642067
}
20652068
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
20662069
iwc.setSimilarity(engineConfig.getSimilarity());
@@ -2073,6 +2076,20 @@ private IndexWriterConfig getIndexWriterConfig() {
20732076
return iwc;
20742077
}
20752078

2079+
/**
2080+
* Soft-deleted documents (including tombstones) that match this query are retained and won't be cleaned up by merges.
2081+
*/
2082+
private Query softDeletesRetentionQuery() {
2083+
ensureOpen();
2084+
// TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit.
2085+
final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations();
2086+
// Prefer the global checkpoint that is persisted on disk over the in-memory value.
2087+
// If we failed to fsync the checkpoint but had already used a higher global checkpoint value to clean up soft-deleted ops,
2088+
// then after a restart we may not have all required operations whose seq# is greater than the global checkpoint.
2089+
final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
2090+
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE);
2091+
}
2092+
20762093
/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
20772094
static final class SearchFactory extends EngineSearcherFactory {
20782095
private final Engine.Warmer warmer;

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 79 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@
179179
import static org.hamcrest.Matchers.hasItem;
180180
import static org.hamcrest.Matchers.hasKey;
181181
import static org.hamcrest.Matchers.hasSize;
182+
import static org.hamcrest.Matchers.isIn;
182183
import static org.hamcrest.Matchers.lessThanOrEqualTo;
183184
import static org.hamcrest.Matchers.not;
184185
import static org.hamcrest.Matchers.notNullValue;
@@ -252,8 +253,9 @@ public void testVersionMapAfterAutoIDDocument() throws IOException {
252253
}
253254

254255
public void testSegments() throws Exception {
256+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
255257
try (Store store = createStore();
256-
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
258+
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get))) {
257259
List<Segment> segments = engine.segments(false);
258260
assertThat(segments.isEmpty(), equalTo(true));
259261
assertThat(engine.segmentsStats(false).getCount(), equalTo(0L));
@@ -325,6 +327,8 @@ public void testSegments() throws Exception {
325327

326328

327329
engine.delete(new Engine.Delete("test", "1", newUid(doc), primaryTerm.get()));
330+
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
331+
engine.getTranslog().sync();
328332
engine.refresh("test");
329333

330334
segments = engine.segments(false);
@@ -1280,9 +1284,13 @@ public void testVersioningNewIndex() throws IOException {
12801284
assertThat(indexResult.getVersion(), equalTo(1L));
12811285
}
12821286

1283-
public void testForceMerge() throws IOException {
1287+
public void testForceMergeWithoutSoftDeletes() throws IOException {
1288+
Settings settings = Settings.builder()
1289+
.put(defaultSettings.getSettings())
1290+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
1291+
IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
12841292
try (Store store = createStore();
1285-
Engine engine = createEngine(config(defaultSettings, store, createTempDir(),
1293+
Engine engine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(),
12861294
new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP
12871295
int numDocs = randomIntBetween(10, 100);
12881296
for (int i = 0; i < numDocs; i++) {
@@ -1323,6 +1331,66 @@ public void testForceMerge() throws IOException {
13231331
}
13241332
}
13251333

1334+
public void testForceMergeWithSoftDeletesRetention() throws Exception {
1335+
final long retainedExtraOps = randomLongBetween(0, 10);
1336+
Settings.Builder settings = Settings.builder()
1337+
.put(defaultSettings.getSettings())
1338+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
1339+
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), retainedExtraOps);
1340+
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
1341+
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
1342+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
1343+
final MapperService mapperService = createMapperService("test");
1344+
final Set<String> liveDocs = new HashSet<>();
1345+
try (Store store = createStore();
1346+
Engine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) {
1347+
int numDocs = scaledRandomIntBetween(10, 100);
1348+
for (int i = 0; i < numDocs; i++) {
1349+
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
1350+
engine.index(indexForDoc(doc));
1351+
liveDocs.add(doc.id());
1352+
}
1353+
for (int i = 0; i < numDocs; i++) {
1354+
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
1355+
if (randomBoolean()) {
1356+
engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
1357+
liveDocs.remove(doc.id());
1358+
}
1359+
if (randomBoolean()) {
1360+
engine.index(indexForDoc(doc));
1361+
liveDocs.add(doc.id());
1362+
}
1363+
}
1364+
long localCheckpoint = engine.getLocalCheckpointTracker().getCheckpoint();
1365+
globalCheckpoint.set(randomLongBetween(0, localCheckpoint));
1366+
engine.getTranslog().sync();
1367+
engine.forceMerge(true, 1, false, false, false);
1368+
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
1369+
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
1370+
.stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
1371+
for (long seqno = 0; seqno <= localCheckpoint; seqno++) {
1372+
long keptIndex = globalCheckpoint.get() + 1 - retainedExtraOps;
1373+
String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]";
1374+
if (seqno < keptIndex) {
1375+
Translog.Operation op = ops.get(seqno);
1376+
if (op != null) {
1377+
assertThat(op, instanceOf(Translog.Index.class));
1378+
assertThat(msg, ((Translog.Index) op).id(), isIn(liveDocs));
1379+
}
1380+
} else {
1381+
assertThat(msg, ops.get(seqno), notNullValue());
1382+
}
1383+
}
1384+
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
1385+
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
1386+
globalCheckpoint.set(localCheckpoint);
1387+
engine.getTranslog().sync();
1388+
engine.forceMerge(true, 1, false, false, false);
1389+
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
1390+
assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size()));
1391+
}
1392+
}
1393+
13261394
public void testForceMergeAndClose() throws IOException, InterruptedException {
13271395
int numIters = randomIntBetween(2, 10);
13281396
for (int j = 0; j < numIters; j++) {
@@ -2544,14 +2612,16 @@ public void testSkipTranslogReplay() throws IOException {
25442612
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
25452613
assertThat(indexResult.getVersion(), equalTo(1L));
25462614
}
2615+
EngineConfig config = engine.config();
25472616
assertVisibleCount(engine, numDocs);
25482617
engine.close();
2549-
trimUnsafeCommits(engine.config());
2550-
engine = new InternalEngine(engine.config());
2551-
engine.skipTranslogRecovery();
2552-
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
2553-
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
2554-
assertThat(topDocs.totalHits, equalTo(0L));
2618+
trimUnsafeCommits(config);
2619+
try (InternalEngine engine = new InternalEngine(config)) {
2620+
engine.skipTranslogRecovery();
2621+
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
2622+
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
2623+
assertThat(topDocs.totalHits, equalTo(0L));
2624+
}
25552625
}
25562626
}
25572627

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2347,7 +2347,16 @@ public void testDocStats() throws IOException {
23472347
deleteDoc(indexShard, "_doc", id);
23482348
indexDoc(indexShard, "_doc", id);
23492349
}
2350-
2350+
// Need to update and sync the global checkpoint as the soft-deletes retention MergePolicy depends on it.
2351+
if (indexShard.indexSettings.isSoftDeleteEnabled()) {
2352+
if (indexShard.routingEntry().primary()) {
2353+
indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
2354+
indexShard.getLocalCheckpoint());
2355+
} else {
2356+
indexShard.updateGlobalCheckpointOnReplica(indexShard.getLocalCheckpoint(), "test");
2357+
}
2358+
indexShard.sync();
2359+
}
23512360
// flush the buffered deletes
23522361
final FlushRequest flushRequest = new FlushRequest();
23532362
flushRequest.force(false);
@@ -2764,6 +2773,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception {
27642773

27652774
// Deleting a doc causes its memory to be freed from the breaker
27662775
deleteDoc(primary, "_doc", "0");
2776+
primary.sync(); // need to sync global checkpoint as the soft-deletes retention MergePolicy depends on it.
27672777
primary.refresh("force refresh");
27682778

27692779
ss = primary.segmentStats(randomBoolean());

server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,15 @@
4343
import org.elasticsearch.common.settings.Settings;
4444
import org.elasticsearch.common.xcontent.XContentType;
4545
import org.elasticsearch.index.IndexModule;
46+
import org.elasticsearch.index.IndexService;
4647
import org.elasticsearch.index.IndexSettings;
4748
import org.elasticsearch.index.MergePolicyConfig;
4849
import org.elasticsearch.index.MergeSchedulerConfig;
4950
import org.elasticsearch.index.VersionType;
5051
import org.elasticsearch.index.cache.query.QueryCacheStats;
5152
import org.elasticsearch.index.engine.VersionConflictEngineException;
5253
import org.elasticsearch.index.query.QueryBuilders;
54+
import org.elasticsearch.index.shard.IndexShard;
5355
import org.elasticsearch.index.translog.Translog;
5456
import org.elasticsearch.indices.IndicesQueryCache;
5557
import org.elasticsearch.indices.IndicesRequestCache;
@@ -69,6 +71,7 @@
6971
import java.util.EnumSet;
7072
import java.util.List;
7173
import java.util.Random;
74+
import java.util.Set;
7275
import java.util.concurrent.BrokenBarrierException;
7376
import java.util.concurrent.CopyOnWriteArrayList;
7477
import java.util.concurrent.CountDownLatch;
@@ -1028,10 +1031,15 @@ private void assertCumulativeQueryCacheStats(IndicesStatsResponse response) {
10281031
}
10291032

10301033
public void testFilterCacheStats() throws Exception {
1031-
assertAcked(prepareCreate("index").setSettings(Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build()).get());
1032-
indexRandom(true,
1034+
Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build();
1035+
assertAcked(prepareCreate("index").setSettings(settings).get());
1036+
indexRandom(false, true,
10331037
client().prepareIndex("index", "type", "1").setSource("foo", "bar"),
10341038
client().prepareIndex("index", "type", "2").setSource("foo", "baz"));
1039+
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
1040+
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
1041+
}
1042+
refresh();
10351043
ensureGreen();
10361044

10371045
IndicesStatsResponse response = client().admin().indices().prepareStats("index").setQueryCache(true).get();
@@ -1062,6 +1070,9 @@ public void testFilterCacheStats() throws Exception {
10621070

10631071
assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "1").get().getResult());
10641072
assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "2").get().getResult());
1073+
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
1074+
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
1075+
}
10651076
refresh();
10661077
response = client().admin().indices().prepareStats("index").setQueryCache(true).get();
10671078
assertCumulativeQueryCacheStats(response);
@@ -1195,4 +1206,21 @@ public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierExcepti
11951206
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
11961207
}
11971208

1209+
1210+
/**
1211+
* Persists the global checkpoint to disk on every shard hosted by the nodes that hold the given index.
1212+
* This makes sure that the persisted global checkpoint on those shards equals the in-memory value.
1213+
*/
1214+
private void persistGlobalCheckpoint(String index) throws Exception {
1215+
final Set<String> nodes = internalCluster().nodesInclude(index);
1216+
for (String node : nodes) {
1217+
final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node);
1218+
for (IndexService indexService : indexServices) {
1219+
for (IndexShard indexShard : indexService) {
1220+
indexShard.sync();
1221+
assertThat(indexShard.getLastSyncedGlobalCheckpoint(), equalTo(indexShard.getGlobalCheckpoint()));
1222+
}
1223+
}
1224+
}
1225+
}
11981226
}

0 commit comments

Comments
 (0)