Skip to content

Commit fe8901b

Browse files
committed
Return consistent source in updates (#48707)
1 parent 5bea389 commit fe8901b

File tree

5 files changed

+83
-30
lines changed

5 files changed

+83
-30
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherSc
628628
if (docIdAndVersion != null) {
629629
// don't release the searcher on this path, it is the
630630
// responsibility of the caller to call GetResult.release
631-
return new GetResult(searcher, docIdAndVersion);
631+
return new GetResult(searcher, docIdAndVersion, false);
632632
} else {
633633
Releasables.close(searcher);
634634
return GetResult.NOT_EXISTS;
@@ -1650,21 +1650,20 @@ public static class GetResult implements Releasable {
16501650
private final long version;
16511651
private final DocIdAndVersion docIdAndVersion;
16521652
private final Engine.Searcher searcher;
1653+
private final boolean fromTranslog;
16531654

1654-
public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null);
1655+
public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null, false);
16551656

1656-
private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher) {
1657+
private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher, boolean fromTranslog) {
16571658
this.exists = exists;
16581659
this.version = version;
16591660
this.docIdAndVersion = docIdAndVersion;
16601661
this.searcher = searcher;
1662+
this.fromTranslog = fromTranslog;
16611663
}
16621664

1663-
/**
1664-
* Build a non-realtime get result from the searcher.
1665-
*/
1666-
public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion) {
1667-
this(true, docIdAndVersion.version, docIdAndVersion, searcher);
1665+
public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) {
1666+
this(true, docIdAndVersion.version, docIdAndVersion, searcher, fromTranslog);
16681667
}
16691668

16701669
public boolean exists() {
@@ -1675,6 +1674,10 @@ public long version() {
16751674
return this.version;
16761675
}
16771676

1677+
public boolean isFromTranslog() {
1678+
return fromTranslog;
1679+
}
1680+
16781681
public Engine.Searcher searcher() {
16791682
return this.searcher;
16801683
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher>
680680
return new GetResult(new Engine.Searcher("realtime_get", reader,
681681
IndexSearcher.getDefaultSimilarity(), null, IndexSearcher.getDefaultQueryCachingPolicy(), reader),
682682
new VersionsAndSeqNoResolver.DocIdAndVersion(0, index.version(), index.seqNo(), index.primaryTerm(),
683-
reader, 0));
683+
reader, 0), true);
684684
}
685685
} catch (IOException e) {
686686
maybeFailEngine("realtime_get", e); // let's check if the translog has failed with a tragic event

server/src/main/java/org/elasticsearch/index/get/ShardGetService.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea
168168
Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
169169
get = indexShard.get(new Engine.Get(realtime, readFromTranslog, type, id, uidTerm)
170170
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
171+
assert get.isFromTranslog() == false || readFromTranslog : "should only read from translog if explicitly enabled";
171172
if (get.exists() == false) {
172173
get.close();
173174
}
@@ -206,7 +207,7 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[]
206207
metaDataFields = new HashMap<>();
207208
for (Map.Entry<String, List<Object>> entry : fieldVisitor.fields().entrySet()) {
208209
if (MapperService.isMetadataField(entry.getKey())) {
209-
metaDataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
210+
metaDataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
210211
} else {
211212
documentFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
212213
}
@@ -230,12 +231,22 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[]
230231

231232
if (!fetchSourceContext.fetchSource()) {
232233
source = null;
233-
} else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
234+
}
235+
236+
if (source != null && get.isFromTranslog()) {
237+
// reapply source filters from mapping (possibly also nulling the source)
238+
try {
239+
source = docMapper.sourceMapper().applyFilters(source, null);
240+
} catch (IOException e) {
241+
throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
242+
}
243+
}
244+
245+
if (source != null && (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0)) {
234246
Map<String, Object> sourceAsMap;
235-
XContentType sourceContentType = null;
236247
// TODO: The source might already be parsed and available in the sourceLookup, but that one uses unordered maps, so the result would differ. Do we care?
237248
Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
238-
sourceContentType = typeMapTuple.v1();
249+
XContentType sourceContentType = typeMapTuple.v1();
239250
sourceAsMap = typeMapTuple.v2();
240251
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
241252
try {

server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.lucene.index.IndexableField;
2626
import org.apache.lucene.search.Query;
2727
import org.apache.lucene.util.BytesRef;
28+
import org.elasticsearch.common.Nullable;
2829
import org.elasticsearch.common.Strings;
2930
import org.elasticsearch.common.bytes.BytesReference;
3031
import org.elasticsearch.common.collect.Tuple;
@@ -227,33 +228,43 @@ public void parse(ParseContext context) throws IOException {
227228
@Override
228229
protected void parseCreateField(ParseContext context, List<IndexableField> fields) throws IOException {
229230
BytesReference originalSource = context.sourceToParse().source();
230-
BytesReference source = originalSource;
231-
if (enabled && fieldType().stored() && source != null) {
231+
XContentType contentType = context.sourceToParse().getXContentType();
232+
final BytesReference adaptedSource = applyFilters(originalSource, contentType);
233+
234+
if (adaptedSource != null) {
235+
final BytesRef ref = adaptedSource.toBytesRef();
236+
fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
237+
}
238+
239+
if (originalSource != null && adaptedSource != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
240+
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
241+
BytesRef ref = originalSource.toBytesRef();
242+
fields.add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
243+
fields.add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
244+
}
245+
}
246+
247+
@Nullable
248+
public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable XContentType contentType) throws IOException {
249+
if (enabled && fieldType().stored() && originalSource != null) {
232250
// Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
233251
if (filter != null) {
234252
// we don't update the context source if we filter, we want to keep it as is...
235253
Tuple<XContentType, Map<String, Object>> mapTuple =
236-
XContentHelper.convertToMap(source, true, context.sourceToParse().getXContentType());
254+
XContentHelper.convertToMap(originalSource, true, contentType);
237255
Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
238256
BytesStreamOutput bStream = new BytesStreamOutput();
239-
XContentType contentType = mapTuple.v1();
240-
XContentBuilder builder = XContentFactory.contentBuilder(contentType, bStream).map(filteredSource);
257+
XContentType actualContentType = mapTuple.v1();
258+
XContentBuilder builder = XContentFactory.contentBuilder(actualContentType, bStream).map(filteredSource);
241259
builder.close();
242-
source = bStream.bytes();
260+
return bStream.bytes();
261+
} else {
262+
return originalSource;
243263
}
244-
BytesRef ref = source.toBytesRef();
245-
fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
246264
} else {
247-
source = null;
265+
return null;
248266
}
249-
250-
if (originalSource != null && source != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
251-
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
252-
BytesRef ref = originalSource.toBytesRef();
253-
fields.add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
254-
fields.add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
255-
}
256-
}
267+
}
257268

258269
@Override
259270
protected String contentType() {

server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,34 @@ public void testGetForUpdate() throws IOException {
8989
closeShards(primary);
9090
}
9191

92+
public void testGetFromTranslogWithSourceMappingOptions() throws IOException {
93+
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
94+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
95+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
96+
.build();
97+
String docToIndex = "{\"foo\" : \"foo\", \"bar\" : \"bar\"}";
98+
boolean noSource = randomBoolean();
99+
String sourceOptions = noSource ? "\"enabled\": false" : randomBoolean() ? "\"excludes\": [\"fo*\"]" : "\"includes\": [\"ba*\"]";
100+
String expectedResult = noSource ? "" : "{\"bar\":\"bar\"}";
101+
IndexMetaData metaData = IndexMetaData.builder("test")
102+
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}, \"bar\": { \"type\": \"text\"}}, \"_source\": { "
103+
+ sourceOptions + "}}}")
104+
.settings(settings)
105+
.primaryTerm(0, 1).build();
106+
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
107+
recoverShardFromStore(primary);
108+
Engine.IndexResult test = indexDoc(primary, "test", "0", docToIndex);
109+
assertTrue(primary.getEngine().refreshNeeded());
110+
GetResult testGet = primary.getService().getForUpdate("test", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
111+
assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));
112+
assertEquals(new String(testGet.source() == null ? new byte[0] : testGet.source(), StandardCharsets.UTF_8), expectedResult);
113+
try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
114+
assertEquals(searcher.getIndexReader().maxDoc(), 1); // we refreshed
115+
}
116+
117+
closeShards(primary);
118+
}
119+
92120
public void testTypelessGetForUpdate() throws IOException {
93121
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
94122
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)

0 commit comments

Comments
 (0)