Skip to content

Commit dc16cb7

Browse files
authored
Disable stored fields access optimization in recovery (#69385)
We can't enable the sequential access optimization for stored fields of changes snapshots used in peer recoveries because they are accessed by multiple threads. Relates to #68961
1 parent 683fb11 commit dc16cb7

File tree

3 files changed

+62
-29
lines changed

3 files changed

+62
-29
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ public Translog.Snapshot readHistoryOperations(String reason, HistorySource hist
556556
MapperService mapperService, long startingSeqNo) throws IOException {
557557
if (historySource == HistorySource.INDEX) {
558558
ensureSoftDeletesEnabled();
559-
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
559+
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false, false);
560560
} else {
561561
return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE);
562562
}
@@ -570,8 +570,8 @@ public int estimateNumberOfHistoryOperations(String reason, HistorySource histor
570570
MapperService mapperService, long startingSeqNo) throws IOException {
571571
if (historySource == HistorySource.INDEX) {
572572
ensureSoftDeletesEnabled();
573-
try (Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo),
574-
Long.MAX_VALUE, false)) {
573+
try (Translog.Snapshot snapshot =
574+
newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false, false)) {
575575
return snapshot.totalOperations();
576576
}
577577
} else {
@@ -2684,16 +2684,15 @@ private void ensureSoftDeletesEnabled() {
26842684
}
26852685
}
26862686

2687-
@Override
2688-
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
2689-
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
2687+
Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo,
2688+
boolean requiredFullRange, boolean singleConsumer) throws IOException {
26902689
ensureSoftDeletesEnabled();
26912690
ensureOpen();
26922691
refreshIfNeeded(source, toSeqNo);
26932692
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
26942693
try {
26952694
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
2696-
searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, toSeqNo, requiredFullRange);
2695+
searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer);
26972696
searcher = null;
26982697
return snapshot;
26992698
} catch (Exception e) {
@@ -2708,6 +2707,12 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
27082707
}
27092708
}
27102709

2710+
@Override
2711+
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
2712+
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
2713+
return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange, true);
2714+
}
2715+
27112716
@Override
27122717
public boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
27132718
MapperService mapperService, long startingSeqNo) throws IOException {

server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
5353
private long lastSeenSeqNo;
5454
private int skippedOperations;
5555
private final boolean requiredFullRange;
56+
private final boolean singleConsumer;
5657

5758
private final IndexSearcher indexSearcher;
5859
private final MapperService mapperService;
@@ -65,6 +66,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
6566
private int storedFieldsReaderOrd = -1;
6667
private StoredFieldsReader storedFieldsReader = null;
6768

69+
private final Thread creationThread; // for assertion
70+
6871
/**
6972
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range.
7073
*
@@ -74,9 +77,10 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
7477
* @param fromSeqNo the min requesting seq# - inclusive
7578
* @param toSeqNo the maximum requesting seq# - inclusive
7679
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
80+
* @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot
7781
*/
7882
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService, int searchBatchSize,
79-
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
83+
long fromSeqNo, long toSeqNo, boolean requiredFullRange, boolean singleConsumer) throws IOException {
8084
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
8185
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
8286
}
@@ -91,11 +95,13 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
9195
};
9296
this.mapperService = mapperService;
9397
final long requestingSize = (toSeqNo - fromSeqNo) == Long.MAX_VALUE ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L);
98+
this.creationThread = Thread.currentThread();
9499
this.searchBatchSize = requestingSize < searchBatchSize ? Math.toIntExact(requestingSize) : searchBatchSize;
95100
this.fromSeqNo = fromSeqNo;
96101
this.toSeqNo = toSeqNo;
97102
this.lastSeenSeqNo = fromSeqNo - 1;
98103
this.requiredFullRange = requiredFullRange;
104+
this.singleConsumer = singleConsumer;
99105
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
100106
this.indexSearcher.setQueryCache(null);
101107
this.parallelArray = new ParallelArray(this.searchBatchSize);
@@ -107,21 +113,25 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
107113

108114
@Override
109115
public void close() throws IOException {
116+
assert assertAccessingThread();
110117
onClose.close();
111118
}
112119

113120
@Override
114121
public int totalOperations() {
122+
assert assertAccessingThread();
115123
return totalHits;
116124
}
117125

118126
@Override
119127
public int skippedOperations() {
128+
assert assertAccessingThread();
120129
return skippedOperations;
121130
}
122131

123132
@Override
124133
public Translog.Operation next() throws IOException {
134+
assert assertAccessingThread();
125135
Translog.Operation op = null;
126136
for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) {
127137
op = readDocAsOp(idx);
@@ -138,6 +148,12 @@ public Translog.Operation next() throws IOException {
138148
return op;
139149
}
140150

151+
private boolean assertAccessingThread() {
152+
assert singleConsumer == false || creationThread == Thread.currentThread() :
153+
"created by [" + creationThread + "] != current thread [" + Thread.currentThread() + "]";
154+
return true;
155+
}
156+
141157
private void rangeCheck(Translog.Operation op) {
142158
if (op == null) {
143159
if (lastSeenSeqNo < toSeqNo) {
@@ -174,7 +190,7 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
174190
for (int i = 0; i < scoreDocs.length; i++) {
175191
scoreDocs[i].shardIndex = i;
176192
}
177-
parallelArray.useSequentialStoredFieldsReader = scoreDocs.length >= 10 && hasSequentialAccess(scoreDocs);
193+
parallelArray.useSequentialStoredFieldsReader = singleConsumer && scoreDocs.length >= 10 && hasSequentialAccess(scoreDocs);
178194
if (parallelArray.useSequentialStoredFieldsReader == false) {
179195
storedFieldsReaderOrd = -1;
180196
storedFieldsReader = null;
@@ -262,6 +278,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
262278
}
263279
}
264280
if (storedFieldsReader != null) {
281+
assert singleConsumer : "Sequential access optimization must not be enabled for multiple consumers";
265282
assert parallelArray.useSequentialStoredFieldsReader;
266283
assert storedFieldsReaderOrd == leaf.ord : storedFieldsReaderOrd + " != " + leaf.ord;
267284
storedFieldsReader.visitDocument(segmentDocID, fields);

server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,12 @@ public void testBasics() throws Exception {
5151
long fromSeqNo = randomNonNegativeLong();
5252
long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE);
5353
// Empty engine
54-
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
54+
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true, randomBoolean())) {
5555
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
5656
assertThat(error.getMessage(),
5757
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found"));
5858
}
59-
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, false)) {
59+
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, false, randomBoolean())) {
6060
assertThat(snapshot, SnapshotMatchers.size(0));
6161
}
6262
int numOps = between(1, 100);
@@ -83,17 +83,17 @@ public void testBasics() throws Exception {
8383
toSeqNo = randomLongBetween(fromSeqNo, numOps * 2);
8484

8585
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
86-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
87-
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) {
86+
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService,
87+
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false, randomBoolean())) {
8888
searcher = null;
8989
assertThat(snapshot, SnapshotMatchers.size(0));
9090
} finally {
9191
IOUtils.close(searcher);
9292
}
9393

9494
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
95-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
96-
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
95+
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService,
96+
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true, randomBoolean())) {
9797
searcher = null;
9898
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
9999
assertThat(error.getMessage(),
@@ -105,16 +105,16 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
105105
fromSeqNo = randomLongBetween(0, refreshedSeqNo);
106106
toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2);
107107
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
108-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
109-
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) {
108+
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService,
109+
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false, randomBoolean())) {
110110
searcher = null;
111111
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo));
112112
} finally {
113113
IOUtils.close(searcher);
114114
}
115115
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
116-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
117-
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
116+
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService,
117+
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true, randomBoolean())) {
118118
searcher = null;
119119
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
120120
assertThat(error.getMessage(),
@@ -124,8 +124,8 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
124124
}
125125
toSeqNo = randomLongBetween(fromSeqNo, refreshedSeqNo);
126126
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
127-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
128-
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
127+
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService,
128+
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true, randomBoolean())) {
129129
searcher = null;
130130
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
131131
} finally {
@@ -135,7 +135,8 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
135135
// Get snapshot via engine will auto refresh
136136
fromSeqNo = randomLongBetween(0, numOps - 1);
137137
toSeqNo = randomLongBetween(fromSeqNo, numOps - 1);
138-
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, randomBoolean())) {
138+
try (Translog.Snapshot snapshot =
139+
engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, randomBoolean(), randomBoolean())) {
139140
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
140141
}
141142
}
@@ -166,7 +167,8 @@ public void testSkipNonRootOfNestedDocuments() throws Exception {
166167
long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
167168
engine.refresh("test");
168169
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
169-
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false)) {
170+
try (Translog.Snapshot snapshot =
171+
new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false, randomBoolean())) {
170172
assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size()));
171173
Translog.Operation op;
172174
while ((op = snapshot.next()) != null) {
@@ -219,7 +221,7 @@ public void testUpdateAndReadChangesConcurrently() throws Exception {
219221

220222
public void testAccessStoredFieldsSequentially() throws Exception {
221223
try (Store store = createStore();
222-
Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
224+
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
223225
int smallBatch = between(5, 9);
224226
long seqNo = 0;
225227
for (int i = 0; i < smallBatch; i++) {
@@ -236,28 +238,36 @@ public void testAccessStoredFieldsSequentially() throws Exception {
236238
// disable optimization for a small batch
237239
Translog.Operation op;
238240
try (LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot(
239-
"test", createMapperService("test"), 0L, between(1, smallBatch), false)) {
241+
"test", createMapperService("test"), 0L, between(1, smallBatch), false, randomBoolean())) {
240242
while ((op = snapshot.next()) != null) {
241243
assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader());
242244
}
243245
assertFalse(snapshot.useSequentialStoredFieldsReader());
244246
}
245247
// disable optimization for non-sequential accesses
246248
try (LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot(
247-
"test", createMapperService("test"), between(1, 3), between(20, 100), false)) {
249+
"test", createMapperService("test"), between(1, 3), between(20, 100), false, randomBoolean())) {
248250
while ((op = snapshot.next()) != null) {
249251
assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader());
250252
}
251253
assertFalse(snapshot.useSequentialStoredFieldsReader());
252254
}
253255
// enable optimization for sequential access of 10+ docs
254256
try (LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot(
255-
"test", createMapperService("test"), 11, between(21, 100), false)) {
257+
"test", createMapperService("test"), 11, between(21, 100), false, true)) {
256258
while ((op = snapshot.next()) != null) {
257259
assertTrue(op.toString(), snapshot.useSequentialStoredFieldsReader());
258260
}
259261
assertTrue(snapshot.useSequentialStoredFieldsReader());
260262
}
263+
// disable optimization if snapshot is accessed by multiple consumers
264+
try (LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot(
265+
"test", createMapperService("test"), 11, between(21, 100), false, false)) {
266+
while ((op = snapshot.next()) != null) {
267+
assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader());
268+
}
269+
assertFalse(snapshot.useSequentialStoredFieldsReader());
270+
}
261271
}
262272
}
263273

@@ -284,7 +294,8 @@ void pullOperations(InternalEngine follower) throws IOException {
284294
long fromSeqNo = followerCheckpoint + 1;
285295
long batchSize = randomLongBetween(0, 100);
286296
long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint);
287-
try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
297+
try (Translog.Snapshot snapshot =
298+
leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true, randomBoolean())) {
288299
translogHandler.run(follower, snapshot);
289300
}
290301
}
@@ -330,7 +341,7 @@ private List<Translog.Operation> drainAll(Translog.Snapshot snapshot) throws IOE
330341
public void testOverFlow() throws Exception {
331342
long fromSeqNo = randomLongBetween(0, 5);
332343
long toSeqNo = randomLongBetween(Long.MAX_VALUE - 5, Long.MAX_VALUE);
333-
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
344+
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true, randomBoolean())) {
334345
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
335346
assertThat(error.getMessage(),
336347
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found"));

0 commit comments

Comments
 (0)