Skip to content

Commit 0d6eec8

Browse files
authored
Fix realtime get of nested fields with synthetic source (#119575) (#120247)
Today, for get-from-translog operations, we only reindex the root document into an in-memory Lucene index, as the _source is stored in the root document and is sufficient. However, synthesizing the source for nested fields requires both the root document and its child documents. As a result, realtime-get operations (as well as update and update-by-query operations) miss nested fields when synthetic source is used. Another issue is that the translog operation is reindexed lazily during get-from-translog operations. As a result, two realtime-get operations can return slightly different outputs: one reading from the translog and the other from Lucene. This change resolves both issues. However, addressing the second issue can degrade the performance of realtime-get and update operations. If slight inconsistencies are acceptable, the translog operation should be reindexed lazily instead. Closes #119553
1 parent 33e69fb commit 0d6eec8

File tree

6 files changed

+209
-26
lines changed

6 files changed

+209
-26
lines changed

docs/changelog/119575.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 119575
2+
summary: Fix realtime get of nested fields with synthetic source
3+
area: Mapping
4+
type: bug
5+
issues:
6+
- 119553

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -410,9 +410,6 @@ tests:
410410
- class: org.elasticsearch.xpack.restart.FullClusterRestartIT
411411
method: testWatcherWithApiKey {cluster=UPGRADED}
412412
issue: https://github.com/elastic/elasticsearch/issues/119396
413-
- class: org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshotTests
414-
method: testSkipNonRootOfNestedDocuments
415-
issue: https://github.com/elastic/elasticsearch/issues/119553
416413
- class: org.elasticsearch.upgrades.SearchStatesIT
417414
method: testCanMatch
418415
issue: https://github.com/elastic/elasticsearch/issues/118718

server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.action.get.MultiGetResponse;
2525
import org.elasticsearch.action.index.IndexRequest;
2626
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
27+
import org.elasticsearch.action.support.WriteRequest;
2728
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
2829
import org.elasticsearch.common.Randomness;
2930
import org.elasticsearch.common.Strings;
@@ -33,8 +34,10 @@
3334
import org.elasticsearch.core.CheckedFunction;
3435
import org.elasticsearch.core.Nullable;
3536
import org.elasticsearch.index.IndexModule;
37+
import org.elasticsearch.index.IndexSettings;
3638
import org.elasticsearch.index.engine.EngineTestCase;
3739
import org.elasticsearch.index.engine.VersionConflictEngineException;
40+
import org.elasticsearch.index.mapper.SourceFieldMapper;
3841
import org.elasticsearch.plugins.Plugin;
3942
import org.elasticsearch.rest.RestStatus;
4043
import org.elasticsearch.test.ESIntegTestCase;
@@ -932,6 +935,102 @@ public void testGetRemoteIndex() {
932935
);
933936
}
934937

938+
public void testRealTimeGetNestedFields() {
939+
String index = "test";
940+
SourceFieldMapper.Mode sourceMode = randomFrom(SourceFieldMapper.Mode.values());
941+
assertAcked(
942+
prepareCreate(index).setMapping("title", "type=keyword", "author", "type=nested")
943+
.setSettings(
944+
indexSettings(1, 0).put("index.refresh_interval", -1)
945+
.put(IndexSettings.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), sourceMode)
946+
)
947+
);
948+
ensureGreen();
949+
String source0 = """
950+
{
951+
"title": "t0",
952+
"author": [
953+
{
954+
"name": "a0"
955+
}
956+
]
957+
}
958+
""";
959+
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("0").setSource(source0, XContentType.JSON).get();
960+
// start tracking translog locations
961+
assertTrue(client().prepareGet(index, "0").setRealtime(true).get().isExists());
962+
String source1 = """
963+
{
964+
"title": ["t1"],
965+
"author": [
966+
{
967+
"name": "a1"
968+
}
969+
]
970+
}
971+
""";
972+
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("1").setSource(source1, XContentType.JSON).get();
973+
String source2 = """
974+
{
975+
"title": ["t1", "t2"],
976+
"author": [
977+
{
978+
"name": "a1"
979+
},
980+
{
981+
"name": "a2"
982+
}
983+
]
984+
}
985+
""";
986+
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("2").setSource(source2, XContentType.JSON).get();
987+
String source3 = """
988+
{
989+
"title": ["t1", "t3", "t2"]
990+
}
991+
""";
992+
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("3").setSource(source3, XContentType.JSON).get();
993+
GetResponse translog1 = client().prepareGet(index, "1").setRealtime(true).get();
994+
GetResponse translog2 = client().prepareGet(index, "2").setRealtime(true).get();
995+
GetResponse translog3 = client().prepareGet(index, "3").setRealtime(true).get();
996+
assertTrue(translog1.isExists());
997+
assertTrue(translog2.isExists());
998+
assertTrue(translog3.isExists());
999+
switch (sourceMode) {
1000+
case STORED -> {
1001+
assertThat(translog1.getSourceAsBytesRef().utf8ToString(), equalTo(source1));
1002+
assertThat(translog2.getSourceAsBytesRef().utf8ToString(), equalTo(source2));
1003+
assertThat(translog3.getSourceAsBytesRef().utf8ToString(), equalTo(source3));
1004+
}
1005+
case SYNTHETIC -> {
1006+
assertThat(translog1.getSourceAsBytesRef().utf8ToString(), equalTo("""
1007+
{"author":{"name":"a1"},"title":"t1"}"""));
1008+
assertThat(translog2.getSourceAsBytesRef().utf8ToString(), equalTo("""
1009+
{"author":[{"name":"a1"},{"name":"a2"}],"title":["t1","t2"]}"""));
1010+
assertThat(translog3.getSourceAsBytesRef().utf8ToString(), equalTo("""
1011+
{"title":["t1","t2","t3"]}"""));
1012+
}
1013+
case DISABLED -> {
1014+
assertNull(translog1.getSourceAsBytesRef());
1015+
assertNull(translog2.getSourceAsBytesRef());
1016+
assertNull(translog3.getSourceAsBytesRef());
1017+
}
1018+
}
1019+
assertFalse(client().prepareGet(index, "1").setRealtime(false).get().isExists());
1020+
assertFalse(client().prepareGet(index, "2").setRealtime(false).get().isExists());
1021+
assertFalse(client().prepareGet(index, "3").setRealtime(false).get().isExists());
1022+
refresh(index);
1023+
GetResponse lucene1 = client().prepareGet(index, "1").setRealtime(randomBoolean()).get();
1024+
GetResponse lucene2 = client().prepareGet(index, "2").setRealtime(randomBoolean()).get();
1025+
GetResponse lucene3 = client().prepareGet(index, "3").setRealtime(randomBoolean()).get();
1026+
assertTrue(lucene1.isExists());
1027+
assertTrue(lucene2.isExists());
1028+
assertTrue(lucene3.isExists());
1029+
assertThat(translog1.getSourceAsBytesRef(), equalTo(lucene1.getSourceAsBytesRef()));
1030+
assertThat(translog2.getSourceAsBytesRef(), equalTo(lucene2.getSourceAsBytesRef()));
1031+
assertThat(translog3.getSourceAsBytesRef(), equalTo(lucene3.getSourceAsBytesRef()));
1032+
}
1033+
9351034
private void assertGetFieldsAlwaysWorks(String index, String docId, String[] fields) {
9361035
assertGetFieldsAlwaysWorks(index, docId, fields, null);
9371036
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,7 @@ private GetResult getFromTranslog(
819819
) throws IOException {
820820
assert get.isReadFromTranslog();
821821
translogGetCount.incrementAndGet();
822-
final TranslogDirectoryReader inMemoryReader = new TranslogDirectoryReader(
822+
final DirectoryReader inMemoryReader = TranslogDirectoryReader.create(
823823
shardId,
824824
index,
825825
mappingLookup,

server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java

Lines changed: 102 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.index.engine;
1111

12+
import org.apache.lucene.codecs.StoredFieldsReader;
1213
import org.apache.lucene.index.BaseTermsEnum;
1314
import org.apache.lucene.index.BinaryDocValues;
1415
import org.apache.lucene.index.ByteVectorValues;
@@ -45,6 +46,9 @@
4546
import org.apache.lucene.util.Bits;
4647
import org.apache.lucene.util.BytesRef;
4748
import org.elasticsearch.common.bytes.BytesReference;
49+
import org.elasticsearch.common.lucene.Lucene;
50+
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
51+
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
4852
import org.elasticsearch.common.xcontent.XContentHelper;
4953
import org.elasticsearch.core.IOUtils;
5054
import org.elasticsearch.index.fieldvisitor.FieldNamesProvidingStoredFieldsVisitor;
@@ -75,21 +79,49 @@
7579
* into an in-memory Lucene segment that is created on-demand.
7680
*/
7781
final class TranslogDirectoryReader extends DirectoryReader {
78-
private final TranslogLeafReader leafReader;
82+
private final LeafReader leafReader;
7983

80-
TranslogDirectoryReader(
84+
static DirectoryReader create(
8185
ShardId shardId,
8286
Translog.Index operation,
8387
MappingLookup mappingLookup,
8488
DocumentParser documentParser,
8589
EngineConfig engineConfig,
8690
Runnable onSegmentCreated
8791
) throws IOException {
88-
this(new TranslogLeafReader(shardId, operation, mappingLookup, documentParser, engineConfig, onSegmentCreated));
92+
final Directory directory = new ByteBuffersDirectory();
93+
boolean success = false;
94+
try {
95+
final LeafReader leafReader;
96+
// When using synthetic source, the translog operation must always be reindexed into an in-memory Lucene to ensure consistent
97+
// output for realtime-get operations. However, this can degrade the performance of realtime-get and update operations.
98+
// If slight inconsistencies in realtime-get operations are acceptable, the translog operation can be reindexed lazily.
99+
if (mappingLookup.isSourceSynthetic()) {
100+
onSegmentCreated.run();
101+
leafReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, false, operation);
102+
} else {
103+
leafReader = new TranslogLeafReader(
104+
shardId,
105+
operation,
106+
mappingLookup,
107+
documentParser,
108+
engineConfig,
109+
directory,
110+
onSegmentCreated
111+
);
112+
}
113+
var directoryReader = ElasticsearchDirectoryReader.wrap(new TranslogDirectoryReader(directory, leafReader), shardId);
114+
success = true;
115+
return directoryReader;
116+
} finally {
117+
if (success == false) {
118+
IOUtils.closeWhileHandlingException(directory);
119+
}
120+
}
89121
}
90122

91-
private TranslogDirectoryReader(TranslogLeafReader leafReader) throws IOException {
92-
super(leafReader.directory, new LeafReader[] { leafReader }, null);
123+
private TranslogDirectoryReader(Directory directory, LeafReader leafReader) throws IOException {
124+
super(directory, new LeafReader[] { leafReader }, null);
93125
this.leafReader = leafReader;
94126
}
95127

@@ -138,12 +170,13 @@ public CacheHelper getReaderCacheHelper() {
138170
return leafReader.getReaderCacheHelper();
139171
}
140172

141-
static DirectoryReader createInMemoryReader(
173+
private static LeafReader createInMemoryReader(
142174
ShardId shardId,
143175
EngineConfig engineConfig,
144176
Directory directory,
145177
DocumentParser documentParser,
146178
MappingLookup mappingLookup,
179+
boolean rootDocOnly,
147180
Translog.Index operation
148181
) {
149182
final ParsedDocument parsedDocs = documentParser.parseDocument(
@@ -158,13 +191,55 @@ static DirectoryReader createInMemoryReader(
158191
IndexWriterConfig.OpenMode.CREATE
159192
).setCodec(engineConfig.getCodec());
160193
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
161-
writer.addDocuments(parsedDocs.docs());
194+
final int numDocs;
195+
if (rootDocOnly) {
196+
numDocs = 1;
197+
writer.addDocument(parsedDocs.rootDoc());
198+
} else {
199+
numDocs = parsedDocs.docs().size();
200+
writer.addDocuments(parsedDocs.docs());
201+
}
162202
final DirectoryReader reader = open(writer);
163-
if (reader.leaves().size() != 1) {
203+
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != numDocs) {
164204
reader.close();
165-
throw new IllegalStateException("Expected a single segment; " + "but got[" + reader.leaves().size() + "] segments");
205+
throw new IllegalStateException(
206+
"Expected a single segment with "
207+
+ numDocs
208+
+ " documents, "
209+
+ "but ["
210+
+ reader.leaves().size()
211+
+ " segments with "
212+
+ reader.leaves().get(0).reader().numDocs()
213+
+ " documents"
214+
);
166215
}
167-
return reader;
216+
LeafReader leafReader = reader.leaves().get(0).reader();
217+
return new SequentialStoredFieldsLeafReader(leafReader) {
218+
@Override
219+
protected void doClose() throws IOException {
220+
IOUtils.close(super::doClose, directory);
221+
}
222+
223+
@Override
224+
public CacheHelper getCoreCacheHelper() {
225+
return leafReader.getCoreCacheHelper();
226+
}
227+
228+
@Override
229+
public CacheHelper getReaderCacheHelper() {
230+
return leafReader.getReaderCacheHelper();
231+
}
232+
233+
@Override
234+
public StoredFieldsReader getSequentialStoredFieldsReader() {
235+
return Lucene.segmentReader(leafReader).getFieldsReader().getMergeInstance();
236+
}
237+
238+
@Override
239+
protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
240+
return reader;
241+
}
242+
};
168243
} catch (IOException e) {
169244
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
170245
}
@@ -248,6 +323,7 @@ private static class TranslogLeafReader extends LeafReader {
248323
MappingLookup mappingLookup,
249324
DocumentParser documentParser,
250325
EngineConfig engineConfig,
326+
Directory directory,
251327
Runnable onSegmentCreated
252328
) {
253329
this.shardId = shardId;
@@ -256,7 +332,7 @@ private static class TranslogLeafReader extends LeafReader {
256332
this.documentParser = documentParser;
257333
this.engineConfig = engineConfig;
258334
this.onSegmentCreated = onSegmentCreated;
259-
this.directory = new ByteBuffersDirectory();
335+
this.directory = directory;
260336
this.uid = Uid.encodeId(operation.id());
261337
}
262338

@@ -268,7 +344,15 @@ private LeafReader getDelegate() {
268344
ensureOpen();
269345
reader = delegate.get();
270346
if (reader == null) {
271-
var indexReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, operation);
347+
var indexReader = createInMemoryReader(
348+
shardId,
349+
engineConfig,
350+
directory,
351+
documentParser,
352+
mappingLookup,
353+
true,
354+
operation
355+
);
272356
reader = indexReader.leaves().get(0).reader();
273357
final LeafReader existing = delegate.getAndSet(reader);
274358
assert existing == null;
@@ -458,7 +542,12 @@ private void readStoredFieldsDirectly(StoredFieldVisitor visitor) throws IOExcep
458542

459543
@Override
460544
protected synchronized void doClose() throws IOException {
461-
IOUtils.close(delegate.get(), directory);
545+
final LeafReader leaf = delegate.get();
546+
if (leaf != null) {
547+
leaf.close();
548+
} else {
549+
directory.close();
550+
}
462551
}
463552
}
464553

server/src/main/java/org/elasticsearch/index/engine/TranslogOperationAsserter.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
package org.elasticsearch.index.engine;
1111

1212
import org.apache.lucene.search.similarities.BM25Similarity;
13-
import org.apache.lucene.store.ByteBuffersDirectory;
14-
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
1513
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
1614
import org.elasticsearch.index.mapper.DocumentParser;
1715
import org.elasticsearch.index.mapper.MappingLookup;
@@ -53,13 +51,7 @@ static Translog.Index synthesizeSource(EngineConfig engineConfig, Translog.Index
5351
final ShardId shardId = engineConfig.getShardId();
5452
final MappingLookup mappingLookup = engineConfig.getMapperService().mappingLookup();
5553
final DocumentParser documentParser = engineConfig.getMapperService().documentParser();
56-
try (
57-
var directory = new ByteBuffersDirectory();
58-
var reader = ElasticsearchDirectoryReader.wrap(
59-
TranslogDirectoryReader.createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, op),
60-
new ShardId("index", "_na_", 0)
61-
)
62-
) {
54+
try (var reader = TranslogDirectoryReader.create(shardId, op, mappingLookup, documentParser, engineConfig, () -> {})) {
6355
final Engine.Searcher searcher = new Engine.Searcher(
6456
"assert_translog",
6557
reader,

0 commit comments

Comments
 (0)