Skip to content

Commit 8e2f2be

Browse files
authored
Track Lucene operations in engine explicitly (#29357)
Today we reply on `IndexWriter#hasDeletions` to check if an index contains "update" operations. However, this check considers both deletes and updates. This commit replaces that check by tracking and checking Lucene operations explicitly. This would provide us stronger assertions.
1 parent 7c6d5cb commit 8e2f2be

File tree

2 files changed

+68
-28
lines changed

2 files changed

+68
-28
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+31-8
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@ public class InternalEngine extends Engine {
136136
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
137137
private final CounterMetric numVersionLookups = new CounterMetric();
138138
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
139+
// Lucene operations since this engine was opened - not include operations from existing segments.
140+
private final CounterMetric numDocDeletes = new CounterMetric();
141+
private final CounterMetric numDocAppends = new CounterMetric();
142+
private final CounterMetric numDocUpdates = new CounterMetric();
143+
139144
/**
140145
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
141146
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
@@ -907,11 +912,11 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
907912
index.parsedDoc().version().setLongValue(plan.versionForIndexing);
908913
try {
909914
if (plan.useLuceneUpdateDocument) {
910-
update(index.uid(), index.docs(), indexWriter);
915+
updateDocs(index.uid(), index.docs(), indexWriter);
911916
} else {
912917
// document does not exists, we can optimize for create, but double check if assertions are running
913918
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
914-
index(index.docs(), indexWriter);
919+
addDocs(index.docs(), indexWriter);
915920
}
916921
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
917922
} catch (Exception ex) {
@@ -968,12 +973,13 @@ long getMaxSeqNoOfNonAppendOnlyOperations() {
968973
return maxSeqNoOfNonAppendOnlyOperations.get();
969974
}
970975

971-
private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
976+
private void addDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
972977
if (docs.size() > 1) {
973978
indexWriter.addDocuments(docs);
974979
} else {
975980
indexWriter.addDocument(docs.get(0));
976981
}
982+
numDocAppends.inc(docs.size());
977983
}
978984

979985
private static final class IndexingStrategy {
@@ -1054,12 +1060,13 @@ private boolean assertDocDoesNotExist(final Index index, final boolean allowDele
10541060
return true;
10551061
}
10561062

1057-
private static void update(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
1063+
private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
10581064
if (docs.size() > 1) {
10591065
indexWriter.updateDocuments(uid, docs);
10601066
} else {
10611067
indexWriter.updateDocument(uid, docs.get(0));
10621068
}
1069+
numDocUpdates.inc(docs.size());
10631070
}
10641071

10651072
@Override
@@ -1188,6 +1195,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
11881195
// any exception that comes from this is a either an ACE or a fatal exception there
11891196
// can't be any document failures coming from this
11901197
indexWriter.deleteDocuments(delete.uid());
1198+
numDocDeletes.inc();
11911199
}
11921200
versionMap.putUnderLock(delete.uid().bytes(),
11931201
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
@@ -2205,13 +2213,28 @@ boolean isSafeAccessRequired() {
22052213
return versionMap.isSafeAccessRequired();
22062214
}
22072215

2216+
/**
2217+
* Returns the number of documents have been deleted since this engine was opened.
2218+
* This count does not include the deletions from the existing segments before opening engine.
2219+
*/
2220+
long getNumDocDeletes() {
2221+
return numDocDeletes.count();
2222+
}
2223+
2224+
/**
2225+
* Returns the number of documents have been appended since this engine was opened.
2226+
* This count does not include the appends from the existing segments before opening engine.
2227+
*/
2228+
long getNumDocAppends() {
2229+
return numDocAppends.count();
2230+
}
22082231

22092232
/**
2210-
* Returns <code>true</code> iff the index writer has any deletions either buffered in memory or
2211-
* in the index.
2233+
* Returns the number of documents have been updated since this engine was opened.
2234+
* This count does not include the updates from the existing segments before opening engine.
22122235
*/
2213-
boolean indexWriterHasDeletions() {
2214-
return indexWriter.hasDeletions();
2236+
long getNumDocUpdates() {
2237+
return numDocUpdates.count();
22152238
}
22162239

22172240
@Override

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

+37-20
Original file line numberDiff line numberDiff line change
@@ -2939,21 +2939,21 @@ public void testDoubleDeliveryPrimary() throws IOException {
29392939
Engine.Index retry = appendOnlyPrimary(doc, true, 1);
29402940
if (randomBoolean()) {
29412941
Engine.IndexResult indexResult = engine.index(operation);
2942-
assertFalse(engine.indexWriterHasDeletions());
2942+
assertLuceneOperations(engine, 1, 0, 0);
29432943
assertEquals(0, engine.getNumVersionLookups());
29442944
assertNotNull(indexResult.getTranslogLocation());
29452945
Engine.IndexResult retryResult = engine.index(retry);
2946-
assertTrue(engine.indexWriterHasDeletions());
2946+
assertLuceneOperations(engine, 1, 1, 0);
29472947
assertEquals(0, engine.getNumVersionLookups());
29482948
assertNotNull(retryResult.getTranslogLocation());
29492949
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
29502950
} else {
29512951
Engine.IndexResult retryResult = engine.index(retry);
2952-
assertTrue(engine.indexWriterHasDeletions());
2952+
assertLuceneOperations(engine, 0, 1, 0);
29532953
assertEquals(0, engine.getNumVersionLookups());
29542954
assertNotNull(retryResult.getTranslogLocation());
29552955
Engine.IndexResult indexResult = engine.index(operation);
2956-
assertTrue(engine.indexWriterHasDeletions());
2956+
assertLuceneOperations(engine, 0, 2, 0);
29572957
assertEquals(0, engine.getNumVersionLookups());
29582958
assertNotNull(retryResult.getTranslogLocation());
29592959
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
@@ -3000,23 +3000,23 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException
30003000
final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0;
30013001
if (randomBoolean()) {
30023002
Engine.IndexResult indexResult = engine.index(operation);
3003-
assertFalse(engine.indexWriterHasDeletions());
3003+
assertLuceneOperations(engine, 1, 0, 0);
30043004
assertEquals(0, engine.getNumVersionLookups());
30053005
assertNotNull(indexResult.getTranslogLocation());
30063006
engine.delete(delete);
30073007
assertEquals(1, engine.getNumVersionLookups());
3008-
assertTrue(engine.indexWriterHasDeletions());
3008+
assertLuceneOperations(engine, 1, 0, 1);
30093009
Engine.IndexResult retryResult = engine.index(retry);
30103010
assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups());
30113011
assertNotNull(retryResult.getTranslogLocation());
30123012
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
30133013
} else {
30143014
Engine.IndexResult retryResult = engine.index(retry);
3015-
assertFalse(engine.indexWriterHasDeletions());
3015+
assertLuceneOperations(engine, 1, 0, 0);
30163016
assertEquals(1, engine.getNumVersionLookups());
30173017
assertNotNull(retryResult.getTranslogLocation());
30183018
engine.delete(delete);
3019-
assertTrue(engine.indexWriterHasDeletions());
3019+
assertLuceneOperations(engine, 1, 0, 1);
30203020
assertEquals(2, engine.getNumVersionLookups());
30213021
Engine.IndexResult indexResult = engine.index(operation);
30223022
assertEquals(belowLckp ? 2 : 3, engine.getNumVersionLookups());
@@ -3041,21 +3041,29 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
30413041
final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0;
30423042
if (randomBoolean()) {
30433043
Engine.IndexResult indexResult = engine.index(operation);
3044-
assertFalse(engine.indexWriterHasDeletions());
3044+
assertLuceneOperations(engine, 1, 0, 0);
30453045
assertEquals(0, engine.getNumVersionLookups());
30463046
assertNotNull(indexResult.getTranslogLocation());
30473047
Engine.IndexResult retryResult = engine.index(retry);
3048-
assertEquals(retry.seqNo() > operation.seqNo(), engine.indexWriterHasDeletions());
3048+
if (retry.seqNo() > operation.seqNo()) {
3049+
assertLuceneOperations(engine, 1, 1, 0);
3050+
} else {
3051+
assertLuceneOperations(engine, 1, 0, 0);
3052+
}
30493053
assertEquals(belowLckp ? 0 : 1, engine.getNumVersionLookups());
30503054
assertNotNull(retryResult.getTranslogLocation());
30513055
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
30523056
} else {
30533057
Engine.IndexResult retryResult = engine.index(retry);
3054-
assertFalse(engine.indexWriterHasDeletions());
3058+
assertLuceneOperations(engine, 1, 0, 0);
30553059
assertEquals(1, engine.getNumVersionLookups());
30563060
assertNotNull(retryResult.getTranslogLocation());
30573061
Engine.IndexResult indexResult = engine.index(operation);
3058-
assertEquals(operation.seqNo() > retry.seqNo(), engine.indexWriterHasDeletions());
3062+
if (operation.seqNo() > retry.seqNo()) {
3063+
assertLuceneOperations(engine, 1, 1, 0);
3064+
} else {
3065+
assertLuceneOperations(engine, 1, 0, 0);
3066+
}
30593067
assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups());
30603068
assertNotNull(retryResult.getTranslogLocation());
30613069
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
@@ -3096,27 +3104,27 @@ public void testDoubleDeliveryReplica() throws IOException {
30963104
Engine.Index duplicate = replicaIndexForDoc(doc, 1, 20, true);
30973105
if (randomBoolean()) {
30983106
Engine.IndexResult indexResult = engine.index(operation);
3099-
assertFalse(engine.indexWriterHasDeletions());
3107+
assertLuceneOperations(engine, 1, 0, 0);
31003108
assertEquals(1, engine.getNumVersionLookups());
31013109
assertNotNull(indexResult.getTranslogLocation());
31023110
if (randomBoolean()) {
31033111
engine.refresh("test");
31043112
}
31053113
Engine.IndexResult retryResult = engine.index(duplicate);
3106-
assertFalse(engine.indexWriterHasDeletions());
3114+
assertLuceneOperations(engine, 1, 0, 0);
31073115
assertEquals(2, engine.getNumVersionLookups());
31083116
assertNotNull(retryResult.getTranslogLocation());
31093117
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
31103118
} else {
31113119
Engine.IndexResult retryResult = engine.index(duplicate);
3112-
assertFalse(engine.indexWriterHasDeletions());
3120+
assertLuceneOperations(engine, 1, 0, 0);
31133121
assertEquals(1, engine.getNumVersionLookups());
31143122
assertNotNull(retryResult.getTranslogLocation());
31153123
if (randomBoolean()) {
31163124
engine.refresh("test");
31173125
}
31183126
Engine.IndexResult indexResult = engine.index(operation);
3119-
assertFalse(engine.indexWriterHasDeletions());
3127+
assertLuceneOperations(engine, 1, 0, 0);
31203128
assertEquals(2, engine.getNumVersionLookups());
31213129
assertNotNull(retryResult.getTranslogLocation());
31223130
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
@@ -3278,10 +3286,11 @@ public void testRetryConcurrently() throws InterruptedException, IOException {
32783286
}
32793287
if (primary) {
32803288
// primaries rely on lucene dedup and may index the same document twice
3281-
assertTrue(engine.indexWriterHasDeletions());
3289+
assertThat(engine.getNumDocUpdates(), greaterThanOrEqualTo((long) numDocs));
3290+
assertThat(engine.getNumDocAppends() + engine.getNumDocUpdates(), equalTo(numDocs * 2L));
32823291
} else {
32833292
// replicas rely on seq# based dedup and in this setup (same seq#) should never rely on lucene
3284-
assertFalse(engine.indexWriterHasDeletions());
3293+
assertLuceneOperations(engine, numDocs, 0, 0);
32853294
}
32863295
}
32873296

@@ -3377,8 +3386,7 @@ public void run() {
33773386
}
33783387
assertEquals(0, engine.getNumVersionLookups());
33793388
assertEquals(0, engine.getNumIndexVersionsLookups());
3380-
assertFalse(engine.indexWriterHasDeletions());
3381-
3389+
assertLuceneOperations(engine, numDocs, 0, 0);
33823390
}
33833391

33843392
public static long getNumVersionLookups(InternalEngine engine) { // for other tests to access this
@@ -4659,4 +4667,13 @@ private static void trimUnsafeCommits(EngineConfig config) throws IOException {
46594667
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
46604668
}
46614669

4670+
void assertLuceneOperations(InternalEngine engine, long expectedAppends, long expectedUpdates, long expectedDeletes) {
4671+
String message = "Lucene operations mismatched;" +
4672+
" appends [actual:" + engine.getNumDocAppends() + ", expected:" + expectedAppends + "]," +
4673+
" updates [actual:" + engine.getNumDocUpdates() + ", expected:" + expectedUpdates + "]," +
4674+
" deletes [actual:" + engine.getNumDocDeletes() + ", expected:" + expectedDeletes + "]";
4675+
assertThat(message, engine.getNumDocAppends(), equalTo(expectedAppends));
4676+
assertThat(message, engine.getNumDocUpdates(), equalTo(expectedUpdates));
4677+
assertThat(message, engine.getNumDocDeletes(), equalTo(expectedDeletes));
4678+
}
46624679
}

0 commit comments

Comments
 (0)