Skip to content

Commit b8ec81e

Browse files
committed
Add tombstone document into Lucene for Noop (#30226)
This commit adds a tombstone document into Lucene for every No-op. With this change, the Lucene index is expected to have a complete history of operations, like the Translog. In fact, this guarantee is subject to the soft-deletes retention merge policy. Relates #29530
1 parent d2da7a3 commit b8ec81e

File tree

17 files changed

+328
-66
lines changed

17 files changed

+328
-66
lines changed

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,16 @@ public LongSupplier getPrimaryTermSupplier() {
372372
* A supplier supplies tombstone documents which will be used in soft-update methods.
373373
* The returned document consists of only the _uid, _seqno, _term and _version fields; other metadata fields are excluded.
374374
*/
375-
@FunctionalInterface
376375
public interface TombstoneDocSupplier {
377-
ParsedDocument newTombstoneDoc(String type, String id);
376+
/**
377+
* Creates a tombstone document for a delete operation.
378+
*/
379+
ParsedDocument newDeleteTombstoneDoc(String type, String id);
380+
381+
/**
382+
* Creates a tombstone document for a noop operation.
383+
*/
384+
ParsedDocument newNoopTombstoneDoc();
378385
}
379386

380387
public TombstoneDocSupplier getTombstoneDocSupplier() {

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.elasticsearch.index.mapper.ParseContext;
7070
import org.elasticsearch.index.mapper.ParsedDocument;
7171
import org.elasticsearch.index.mapper.UidFieldMapper;
72+
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
7273
import org.elasticsearch.index.merge.MergeStats;
7374
import org.elasticsearch.index.merge.OnGoingMerge;
7475
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -823,7 +824,9 @@ public IndexResult index(Index index) throws IOException {
823824
location = translog.add(new Translog.Index(index, indexResult));
824825
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
825826
// if we have document failure, record it as a no-op in the translog with the generated seq_no
826-
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
827+
final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
828+
index.startTime(), indexResult.getFailure().getMessage());
829+
location = innerNoOp(noOp).getTranslogLocation();
827830
} else {
828831
location = null;
829832
}
@@ -1283,11 +1286,13 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
12831286
throws IOException {
12841287
try {
12851288
if (softDeleteEnabled) {
1286-
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id());
1289+
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
12871290
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
12881291
tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm());
12891292
tombstone.version().setLongValue(plan.versionOfDeletion);
12901293
final ParseContext.Document doc = tombstone.docs().get(0);
1294+
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null :
1295+
"Delete tombstone document but _tombstone field is not set [" + doc + " ]";
12911296
doc.add(softDeleteField);
12921297
if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
12931298
indexWriter.addDocument(doc);
@@ -1391,7 +1396,25 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
13911396
assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
13921397
final long seqNo = noOp.seqNo();
13931398
try {
1394-
final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
1399+
Exception failure = null;
1400+
if (softDeleteEnabled) {
1401+
try {
1402+
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc();
1403+
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
1404+
assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
1405+
final ParseContext.Document doc = tombstone.docs().get(0);
1406+
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
1407+
: "Noop tombstone document but _tombstone field is not set [" + doc + " ]";
1408+
doc.add(softDeleteField);
1409+
indexWriter.addDocument(doc);
1410+
} catch (Exception ex) {
1411+
if (maybeFailEngine("noop", ex)) {
1412+
throw ex;
1413+
}
1414+
failure = ex;
1415+
}
1416+
}
1417+
final NoOpResult noOpResult = failure != null ? new NoOpResult(noOp.seqNo(), failure) : new NoOpResult(noOp.seqNo());
13951418
if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
13961419
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
13971420
noOpResult.setTranslogLocation(location);

server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ public DocumentMapper build(MapperService mapperService) {
124124
private final Map<String, ObjectMapper> objectMappers;
125125

126126
private final boolean hasNestedObjects;
127-
private final MetadataFieldMapper[] tombstoneMetadataFieldMappers;
127+
private final MetadataFieldMapper[] deleteTombstoneMetadataFieldMappers;
128+
private final MetadataFieldMapper[] noopTombstoneMetadataFieldMappers;
128129

129130
public DocumentMapper(MapperService mapperService, Mapping mapping) {
130131
this.mapperService = mapperService;
@@ -133,10 +134,6 @@ public DocumentMapper(MapperService mapperService, Mapping mapping) {
133134
final IndexSettings indexSettings = mapperService.getIndexSettings();
134135
this.mapping = mapping;
135136
this.documentParser = new DocumentParser(indexSettings, mapperService.documentMapperParser(), this);
136-
final Collection<String> tombstoneFields =
137-
Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, VersionFieldMapper.NAME, IdFieldMapper.NAME);
138-
this.tombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
139-
.filter(field -> tombstoneFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
140137

141138
if (metadataMapper(ParentFieldMapper.class).active()) {
142139
// mark the routing field mapper as required
@@ -181,6 +178,15 @@ public DocumentMapper(MapperService mapperService, Mapping mapping) {
181178
} catch (Exception e) {
182179
throw new ElasticsearchGenerationException("failed to serialize source for type [" + type + "]", e);
183180
}
181+
182+
final Collection<String> deleteTombstoneMetadataFields = Arrays.asList(VersionFieldMapper.NAME, IdFieldMapper.NAME,
183+
TypeFieldMapper.NAME, SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME);
184+
this.deleteTombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
185+
.filter(field -> deleteTombstoneMetadataFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
186+
final Collection<String> noopTombstoneMetadataFields =
187+
Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME);
188+
this.noopTombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
189+
.filter(field -> noopTombstoneMetadataFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
184190
}
185191

186192
public Mapping mapping() {
@@ -268,9 +274,15 @@ public ParsedDocument parse(SourceToParse source) throws MapperParsingException
268274
return documentParser.parseDocument(source, mapping.metadataMappers);
269275
}
270276

271-
public ParsedDocument createTombstoneDoc(String index, String type, String id) throws MapperParsingException {
277+
public ParsedDocument createDeleteTombstoneDoc(String index, String type, String id) throws MapperParsingException {
278+
final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
279+
return documentParser.parseDocument(emptySource, deleteTombstoneMetadataFieldMappers).toTombstone();
280+
}
281+
282+
public ParsedDocument createNoopTombstoneDoc(String index) throws MapperParsingException {
283+
final String id = ""; // _id won't be used.
272284
final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
273-
return documentParser.parseDocument(emptySource, tombstoneMetadataFieldMappers);
285+
return documentParser.parseDocument(emptySource, noopTombstoneMetadataFieldMappers).toTombstone();
274286
}
275287

276288
/**

server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,17 @@ public void updateSeqID(long sequenceNumber, long primaryTerm) {
8585
this.seqID.primaryTerm.setLongValue(primaryTerm);
8686
}
8787

88+
/**
89+
* Marks the document being processed as a tombstone document rather than a regular document.
90+
* Tombstone documents are stored in Lucene index to represent delete operations or Noops.
91+
*/
92+
ParsedDocument toTombstone() {
93+
assert docs().size() == 1 : "Tombstone should have a single doc [" + docs() + "]";
94+
this.seqID.tombstoneField.setLongValue(1);
95+
rootDoc().add(this.seqID.tombstoneField);
96+
return this;
97+
}
98+
8899
public String routing() {
89100
return this.routing;
90101
}

server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,26 +69,29 @@ public static class SequenceIDFields {
6969
public final Field seqNo;
7070
public final Field seqNoDocValue;
7171
public final Field primaryTerm;
72+
public final Field tombstoneField;
7273

73-
public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm) {
74+
public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm, Field tombstoneField) {
7475
Objects.requireNonNull(seqNo, "sequence number field cannot be null");
7576
Objects.requireNonNull(seqNoDocValue, "sequence number dv field cannot be null");
7677
Objects.requireNonNull(primaryTerm, "primary term field cannot be null");
7778
this.seqNo = seqNo;
7879
this.seqNoDocValue = seqNoDocValue;
7980
this.primaryTerm = primaryTerm;
81+
this.tombstoneField = tombstoneField;
8082
}
8183

8284
public static SequenceIDFields emptySeqID() {
8385
return new SequenceIDFields(new LongPoint(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
8486
new NumericDocValuesField(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
85-
new NumericDocValuesField(PRIMARY_TERM_NAME, 0));
87+
new NumericDocValuesField(PRIMARY_TERM_NAME, 0), new NumericDocValuesField(TOMBSTONE_NAME, 0));
8688
}
8789
}
8890

8991
public static final String NAME = "_seq_no";
9092
public static final String CONTENT_TYPE = "_seq_no";
9193
public static final String PRIMARY_TERM_NAME = "_primary_term";
94+
public static final String TOMBSTONE_NAME = "_tombstone";
9295

9396
public static class SeqNoDefaults {
9497
public static final String NAME = SeqNoFieldMapper.NAME;

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,14 @@
9292
import org.elasticsearch.index.flush.FlushStats;
9393
import org.elasticsearch.index.get.GetStats;
9494
import org.elasticsearch.index.get.ShardGetService;
95+
import org.elasticsearch.index.mapper.DocumentMapper;
9596
import org.elasticsearch.index.mapper.DocumentMapperForType;
9697
import org.elasticsearch.index.mapper.IdFieldMapper;
9798
import org.elasticsearch.index.mapper.MapperParsingException;
9899
import org.elasticsearch.index.mapper.MapperService;
99100
import org.elasticsearch.index.mapper.Mapping;
100101
import org.elasticsearch.index.mapper.ParsedDocument;
102+
import org.elasticsearch.index.mapper.RootObjectMapper;
101103
import org.elasticsearch.index.mapper.SourceToParse;
102104
import org.elasticsearch.index.mapper.Uid;
103105
import org.elasticsearch.index.mapper.UidFieldMapper;
@@ -2197,8 +2199,7 @@ private EngineConfig newEngineConfig() {
21972199
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
21982200
Collections.singletonList(refreshListeners),
21992201
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
2200-
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm,
2201-
this::createTombstoneDoc);
2202+
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm, tombstoneDocSupplier());
22022203
}
22032204

22042205
/**
@@ -2562,7 +2563,18 @@ public void afterRefresh(boolean didRefresh) throws IOException {
25622563
}
25632564
}
25642565

2565-
private ParsedDocument createTombstoneDoc(String type, String id) {
2566-
return docMapper(type).getDocumentMapper().createTombstoneDoc(shardId.getIndexName(), type, id);
2566+
private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
2567+
final RootObjectMapper.Builder noopRootMapper = new RootObjectMapper.Builder("__noop");
2568+
final DocumentMapper noopDocumentMapper = new DocumentMapper.Builder(noopRootMapper, mapperService).build(mapperService);
2569+
return new EngineConfig.TombstoneDocSupplier() {
2570+
@Override
2571+
public ParsedDocument newDeleteTombstoneDoc(String type, String id) {
2572+
return docMapper(type).getDocumentMapper().createDeleteTombstoneDoc(shardId.getIndexName(), type, id);
2573+
}
2574+
@Override
2575+
public ParsedDocument newNoopTombstoneDoc() {
2576+
return noopDocumentMapper.createNoopTombstoneDoc(shardId.getIndexName());
2577+
}
2578+
};
25672579
}
25682580
}

server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ public void testPrimaryReplicaResyncFailed() throws Exception {
391391
assertThat(shard.getLocalCheckpoint(), equalTo(numDocs + moreDocs));
392392
}
393393
}, 30, TimeUnit.SECONDS);
394+
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
394395
}
395396

396397
}

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
import org.elasticsearch.index.mapper.ContentPath;
104104
import org.elasticsearch.index.mapper.IdFieldMapper;
105105
import org.elasticsearch.index.mapper.Mapper.BuilderContext;
106+
import org.elasticsearch.index.mapper.MapperService;
106107
import org.elasticsearch.index.mapper.Mapping;
107108
import org.elasticsearch.index.mapper.MetadataFieldMapper;
108109
import org.elasticsearch.index.mapper.ParseContext;
@@ -175,6 +176,7 @@
175176
import static org.hamcrest.Matchers.everyItem;
176177
import static org.hamcrest.Matchers.greaterThan;
177178
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
179+
import static org.hamcrest.Matchers.hasItem;
178180
import static org.hamcrest.Matchers.hasKey;
179181
import static org.hamcrest.Matchers.hasSize;
180182
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -2679,7 +2681,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
26792681
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
26802682
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
26812683
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
2682-
new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, EngineTestCase::createTombstoneDoc);
2684+
new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier());
26832685
try {
26842686
InternalEngine internalEngine = new InternalEngine(brokenConfig);
26852687
fail("translog belongs to a different engine");
@@ -3122,7 +3124,8 @@ public void testDoubleDeliveryReplica() throws IOException {
31223124
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
31233125
assertEquals(1, topDocs.totalHits);
31243126
}
3125-
assertThat(getOperationSeqNoInLucene(engine), contains(20L));
3127+
List<Translog.Operation> ops = readAllOperationsInLucene(engine, createMapperService("test"));
3128+
assertThat(ops.stream().map(o -> o.seqNo()).collect(Collectors.toList()), hasItem(20L));
31263129
}
31273130

31283131
public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {
@@ -3681,7 +3684,9 @@ public void testNoOps() throws IOException {
36813684
maxSeqNo,
36823685
localCheckpoint);
36833686
trimUnsafeCommits(engine.config());
3684-
noOpEngine = new InternalEngine(engine.config(), supplier) {
3687+
EngineConfig noopEngineConfig = copy(engine.config(), new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD,
3688+
() -> new MatchAllDocsQuery(), engine.config().getMergePolicy()));
3689+
noOpEngine = new InternalEngine(noopEngineConfig, supplier) {
36853690
@Override
36863691
protected long doGenerateSeqNoForOperation(Operation operation) {
36873692
throw new UnsupportedOperationException();
@@ -3711,6 +3716,13 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
37113716
assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 2)));
37123717
assertThat(noOp.primaryTerm(), equalTo(primaryTerm.get()));
37133718
assertThat(noOp.reason(), equalTo(reason));
3719+
MapperService mapperService = createMapperService("test");
3720+
List<Translog.Operation> operationsFromLucene = readAllOperationsInLucene(noOpEngine, mapperService);
3721+
assertThat(operationsFromLucene, hasSize(maxSeqNo + 2 - localCheckpoint)); // fills the seq-no gaps plus the 2 manual noops.
3722+
for (int i = 0; i < operationsFromLucene.size(); i++) {
3723+
assertThat(operationsFromLucene.get(i), equalTo(new Translog.NoOp(localCheckpoint + 1 + i, primaryTerm.get(), "")));
3724+
}
3725+
assertConsistentHistoryBetweenTranslogAndLuceneIndex(noOpEngine, mapperService);
37143726
} finally {
37153727
IOUtils.close(noOpEngine);
37163728
}
@@ -4678,7 +4690,10 @@ private void assertOperationHistoryInLucene(List<Engine.Operation> operations) t
46784690
engine.forceMerge(true);
46794691
}
46804692
}
4681-
assertThat(getOperationSeqNoInLucene(engine), containsInAnyOrder(expectedSeqNos.toArray()));
4693+
MapperService mapperService = createMapperService("test");
4694+
List<Translog.Operation> actualOps = readAllOperationsInLucene(engine, mapperService);
4695+
assertThat(actualOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
4696+
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
46824697
}
46834698
}
46844699

server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,11 @@ public void testDocumentFailureReplication() throws Exception {
245245
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
246246
@Override
247247
protected EngineFactory getEngineFactory(ShardRouting routing) {
248-
return throwingDocumentFailureEngineFactory;
248+
if (routing.primary()){
249+
return throwingDocumentFailureEngineFactory; // Simulate exception only on the primary.
250+
}else {
251+
return InternalEngine::new;
252+
}
249253
}}) {
250254

251255
// test only primary

0 commit comments

Comments
 (0)