Fix realtime get of nested fields with synthetic source (#119575) #120247

Merged: 1 commit, Jan 16, 2025
6 changes: 6 additions & 0 deletions docs/changelog/119575.yaml
@@ -0,0 +1,6 @@
pr: 119575
summary: Fix realtime get of nested fields with synthetic source
area: Mapping
type: bug
issues:
- 119553
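
The scenario behind this fix, as a minimal hedged sketch (assuming an ESIntegTestCase context; it mirrors the calls used by the new GetActionIT test below, with an illustrative index name and values): with synthetic source enabled, a realtime get served from the translog must render nested fields consistently with a get served from Lucene after a refresh.

// Sketch only, not the committed test; see testRealTimeGetNestedFields below.
assertAcked(
    prepareCreate("test").setMapping("title", "type=keyword", "author", "type=nested")
        .setSettings(
            indexSettings(1, 0).put("index.refresh_interval", -1) // keep documents unrefreshed, in the translog
                .put(IndexSettings.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), SourceFieldMapper.Mode.SYNTHETIC)
        )
);
prepareIndex("test").setId("0").setSource("""
    {"title": "t0", "author": [{"name": "a0"}]}""", XContentType.JSON).get();
// Before this fix, a realtime get like this could return an inconsistent
// synthetic-source rendering of the nested "author" field (issue 119553).
GetResponse fromTranslog = client().prepareGet("test", "0").setRealtime(true).get();
assertTrue(fromTranslog.isExists());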
3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -410,9 +410,6 @@ tests:
- class: org.elasticsearch.xpack.restart.FullClusterRestartIT
method: testWatcherWithApiKey {cluster=UPGRADED}
issue: https://github.com/elastic/elasticsearch/issues/119396
- class: org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshotTests
method: testSkipNonRootOfNestedDocuments
issue: https://github.com/elastic/elasticsearch/issues/119553
- class: org.elasticsearch.upgrades.SearchStatesIT
method: testCanMatch
issue: https://github.com/elastic/elasticsearch/issues/118718
@@ -24,6 +24,7 @@
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
@@ -33,8 +34,10 @@
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
@@ -932,6 +935,102 @@ public void testGetRemoteIndex() {
);
}

public void testRealTimeGetNestedFields() {
String index = "test";
SourceFieldMapper.Mode sourceMode = randomFrom(SourceFieldMapper.Mode.values());
assertAcked(
prepareCreate(index).setMapping("title", "type=keyword", "author", "type=nested")
.setSettings(
indexSettings(1, 0).put("index.refresh_interval", -1)
.put(IndexSettings.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), sourceMode)
)
);
ensureGreen();
String source0 = """
{
"title": "t0",
"author": [
{
"name": "a0"
}
]
}
""";
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("0").setSource(source0, XContentType.JSON).get();
// start tracking translog locations
assertTrue(client().prepareGet(index, "0").setRealtime(true).get().isExists());
String source1 = """
{
"title": ["t1"],
"author": [
{
"name": "a1"
}
]
}
""";
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("1").setSource(source1, XContentType.JSON).get();
String source2 = """
{
"title": ["t1", "t2"],
"author": [
{
"name": "a1"
},
{
"name": "a2"
}
]
}
""";
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("2").setSource(source2, XContentType.JSON).get();
String source3 = """
{
"title": ["t1", "t3", "t2"]
}
""";
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("3").setSource(source3, XContentType.JSON).get();
GetResponse translog1 = client().prepareGet(index, "1").setRealtime(true).get();
GetResponse translog2 = client().prepareGet(index, "2").setRealtime(true).get();
GetResponse translog3 = client().prepareGet(index, "3").setRealtime(true).get();
assertTrue(translog1.isExists());
assertTrue(translog2.isExists());
assertTrue(translog3.isExists());
switch (sourceMode) {
case STORED -> {
assertThat(translog1.getSourceAsBytesRef().utf8ToString(), equalTo(source1));
assertThat(translog2.getSourceAsBytesRef().utf8ToString(), equalTo(source2));
assertThat(translog3.getSourceAsBytesRef().utf8ToString(), equalTo(source3));
}
case SYNTHETIC -> {
assertThat(translog1.getSourceAsBytesRef().utf8ToString(), equalTo("""
{"author":{"name":"a1"},"title":"t1"}"""));
assertThat(translog2.getSourceAsBytesRef().utf8ToString(), equalTo("""
{"author":[{"name":"a1"},{"name":"a2"}],"title":["t1","t2"]}"""));
assertThat(translog3.getSourceAsBytesRef().utf8ToString(), equalTo("""
{"title":["t1","t2","t3"]}"""));
}
case DISABLED -> {
assertNull(translog1.getSourceAsBytesRef());
assertNull(translog2.getSourceAsBytesRef());
assertNull(translog3.getSourceAsBytesRef());
}
}
assertFalse(client().prepareGet(index, "1").setRealtime(false).get().isExists());
assertFalse(client().prepareGet(index, "2").setRealtime(false).get().isExists());
assertFalse(client().prepareGet(index, "3").setRealtime(false).get().isExists());
refresh(index);
GetResponse lucene1 = client().prepareGet(index, "1").setRealtime(randomBoolean()).get();
GetResponse lucene2 = client().prepareGet(index, "2").setRealtime(randomBoolean()).get();
GetResponse lucene3 = client().prepareGet(index, "3").setRealtime(randomBoolean()).get();
assertTrue(lucene1.isExists());
assertTrue(lucene2.isExists());
assertTrue(lucene3.isExists());
assertThat(translog1.getSourceAsBytesRef(), equalTo(lucene1.getSourceAsBytesRef()));
assertThat(translog2.getSourceAsBytesRef(), equalTo(lucene2.getSourceAsBytesRef()));
assertThat(translog3.getSourceAsBytesRef(), equalTo(lucene3.getSourceAsBytesRef()));
}
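
The assertions above pin down the contract for all three source modes: STORED returns the indexed bytes verbatim, SYNTHETIC returns the normalized rendering (single-element arrays unwrapped, array values sorted, fields reordered), and DISABLED returns no source; in every mode the translog-served response must equal the Lucene-served response obtained after the refresh.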

private void assertGetFieldsAlwaysWorks(String index, String docId, String[] fields) {
assertGetFieldsAlwaysWorks(index, docId, fields, null);
}
@@ -819,7 +819,7 @@ private GetResult getFromTranslog(
) throws IOException {
assert get.isReadFromTranslog();
translogGetCount.incrementAndGet();
final TranslogDirectoryReader inMemoryReader = new TranslogDirectoryReader(
final DirectoryReader inMemoryReader = TranslogDirectoryReader.create(
shardId,
index,
mappingLookup,
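The call site now goes through the TranslogDirectoryReader.create(...) factory instead of the constructor, so the declared type widens to DirectoryReader: as the factory below shows, create(...) picks eager in-memory reindexing for synthetic source or the lazy TranslogLeafReader otherwise, and wraps the result in an ElasticsearchDirectoryReader.
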
@@ -9,6 +9,7 @@

package org.elasticsearch.index.engine;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.BaseTermsEnum;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.ByteVectorValues;
@@ -45,6 +46,9 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.fieldvisitor.FieldNamesProvidingStoredFieldsVisitor;
@@ -75,21 +79,49 @@
* into an in-memory Lucene segment that is created on-demand.
*/
final class TranslogDirectoryReader extends DirectoryReader {
private final TranslogLeafReader leafReader;
private final LeafReader leafReader;

TranslogDirectoryReader(
static DirectoryReader create(
ShardId shardId,
Translog.Index operation,
MappingLookup mappingLookup,
DocumentParser documentParser,
EngineConfig engineConfig,
Runnable onSegmentCreated
) throws IOException {
this(new TranslogLeafReader(shardId, operation, mappingLookup, documentParser, engineConfig, onSegmentCreated));
final Directory directory = new ByteBuffersDirectory();
boolean success = false;
try {
final LeafReader leafReader;
// When using synthetic source, the translog operation must always be reindexed into an in-memory Lucene segment to ensure consistent
// output for realtime-get operations. However, this can degrade the performance of realtime-get and update operations.
// If slight inconsistencies in realtime-get operations are acceptable, the translog operation can be reindexed lazily.
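// For example (values borrowed from testRealTimeGetNestedFields above): an indexed
// source of {"title": ["t1", "t3", "t2"]} is rendered by synthetic source as
// {"title":["t1","t2","t3"]}, so the raw translog bytes cannot be returned as-is.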
if (mappingLookup.isSourceSynthetic()) {
onSegmentCreated.run();
leafReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, false, operation);
} else {
leafReader = new TranslogLeafReader(
shardId,
operation,
mappingLookup,
documentParser,
engineConfig,
directory,
onSegmentCreated
);
}
var directoryReader = ElasticsearchDirectoryReader.wrap(new TranslogDirectoryReader(directory, leafReader), shardId);
success = true;
return directoryReader;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(directory);
}
}
}

private TranslogDirectoryReader(TranslogLeafReader leafReader) throws IOException {
super(leafReader.directory, new LeafReader[] { leafReader }, null);
private TranslogDirectoryReader(Directory directory, LeafReader leafReader) throws IOException {
super(directory, new LeafReader[] { leafReader }, null);
this.leafReader = leafReader;
}

@@ -138,12 +170,13 @@ public CacheHelper getReaderCacheHelper() {
return leafReader.getReaderCacheHelper();
}

static DirectoryReader createInMemoryReader(
private static LeafReader createInMemoryReader(
ShardId shardId,
EngineConfig engineConfig,
Directory directory,
DocumentParser documentParser,
MappingLookup mappingLookup,
boolean rootDocOnly,
Translog.Index operation
) {
final ParsedDocument parsedDocs = documentParser.parseDocument(
@@ -158,13 +191,55 @@ static DirectoryReader createInMemoryReader(
IndexWriterConfig.OpenMode.CREATE
).setCodec(engineConfig.getCodec());
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
writer.addDocuments(parsedDocs.docs());
final int numDocs;
if (rootDocOnly) {
numDocs = 1;
writer.addDocument(parsedDocs.rootDoc());
} else {
numDocs = parsedDocs.docs().size();
writer.addDocuments(parsedDocs.docs());
}
final DirectoryReader reader = open(writer);
if (reader.leaves().size() != 1) {
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != numDocs) {
reader.close();
throw new IllegalStateException("Expected a single segment; " + "but got[" + reader.leaves().size() + "] segments");
throw new IllegalStateException(
"Expected a single segment with "
+ numDocs
+ " documents, but got ["
+ reader.leaves().size()
+ "] segments with ["
+ reader.leaves().get(0).reader().numDocs()
+ "] documents"
);
}
return reader;
LeafReader leafReader = reader.leaves().get(0).reader();
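// Wrap the leaf so the realtime-get path can obtain a sequential (merge-instance)
// stored-fields reader, and so closing the returned reader also closes the
// backing in-memory directory.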
return new SequentialStoredFieldsLeafReader(leafReader) {
@Override
protected void doClose() throws IOException {
IOUtils.close(super::doClose, directory);
}

@Override
public CacheHelper getCoreCacheHelper() {
return leafReader.getCoreCacheHelper();
}

@Override
public CacheHelper getReaderCacheHelper() {
return leafReader.getReaderCacheHelper();
}

@Override
public StoredFieldsReader getSequentialStoredFieldsReader() {
return Lucene.segmentReader(leafReader).getFieldsReader().getMergeInstance();
}

@Override
protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
return reader;
}
};
} catch (IOException e) {
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
}
@@ -248,6 +323,7 @@ private static class TranslogLeafReader extends LeafReader {
MappingLookup mappingLookup,
DocumentParser documentParser,
EngineConfig engineConfig,
Directory directory,
Runnable onSegmentCreated
) {
this.shardId = shardId;
@@ -256,7 +332,7 @@ private static class TranslogLeafReader extends LeafReader {
this.documentParser = documentParser;
this.engineConfig = engineConfig;
this.onSegmentCreated = onSegmentCreated;
this.directory = new ByteBuffersDirectory();
this.directory = directory;
this.uid = Uid.encodeId(operation.id());
}

@@ -268,7 +344,15 @@ private LeafReader getDelegate() {
ensureOpen();
reader = delegate.get();
if (reader == null) {
var indexReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, operation);
var indexReader = createInMemoryReader(
shardId,
engineConfig,
directory,
documentParser,
mappingLookup,
true,
operation
);
reader = indexReader.leaves().get(0).reader();
final LeafReader existing = delegate.getAndSet(reader);
assert existing == null;
@@ -458,7 +542,12 @@ private void readStoredFieldsDirectly(StoredFieldVisitor visitor) throws IOException

@Override
protected synchronized void doClose() throws IOException {
IOUtils.close(delegate.get(), directory);
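// Once the lazy in-memory reader exists it owns the directory (the wrapper installed
// by createInMemoryReader closes both in its doClose); otherwise close the directory directly.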
final LeafReader leaf = delegate.get();
if (leaf != null) {
leaf.close();
} else {
directory.close();
}
}
}

@@ -10,8 +10,6 @@
package org.elasticsearch.index.engine;

import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
import org.elasticsearch.index.mapper.DocumentParser;
import org.elasticsearch.index.mapper.MappingLookup;
@@ -53,13 +51,7 @@ static Translog.Index synthesizeSource(EngineConfig engineConfig, Translog.Index
final ShardId shardId = engineConfig.getShardId();
final MappingLookup mappingLookup = engineConfig.getMapperService().mappingLookup();
final DocumentParser documentParser = engineConfig.getMapperService().documentParser();
try (
var directory = new ByteBuffersDirectory();
var reader = ElasticsearchDirectoryReader.wrap(
TranslogDirectoryReader.createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, op),
new ShardId("index", "_na_", 0)
)
) {
try (var reader = TranslogDirectoryReader.create(shardId, op, mappingLookup, documentParser, engineConfig, () -> {})) {
final Engine.Searcher searcher = new Engine.Searcher(
"assert_translog",
reader,
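
With the factory in place, synthesizeSource no longer creates its own ByteBuffersDirectory or wraps the reader in an ElasticsearchDirectoryReader itself; TranslogDirectoryReader.create(...) handles both, and a no-op onSegmentCreated callback is passed since there is nothing to track here.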