Skip to content

Commit 8786286

Browse files
committed
Allow realtime get to read from translog (#48843)
The realtime GET API currently has erratic performance in case where a document is accessed that has just been indexed but not refreshed yet, as the implementation will currently force an internal refresh in that case. Refreshing can be an expensive operation, and also will block the thread that executes the GET operation, blocking other GETs to be processed. In case of frequent access of recently indexed documents, this can lead to a refresh storm and terrible GET performance. While older versions of Elasticsearch (2.x and older) did not trigger refreshes and instead opted to read from the translog in case of realtime GET API or update API, this was removed in 5.0 (#20102) to avoid inconsistencies between values that were returned from the translog and those returned by the index. This was partially reverted in 6.3 (#29264) to allow _update and upsert to read from the translog again as it was easier to guarantee consistency for these, and also brought back more predictable performance characteristics of this API. Calls to the realtime GET API, however, would still always do a refresh if necessary to return consistent results. This means that users that were calling realtime GET APIs to coordinate updates on client side (realtime GET + CAS for conditional index of updated doc) would still see very erratic performance. This PR (together with #48707) resolves the inconsistencies between reading from translog and index. In particular it fixes the inconsistencies that happen when requesting stored fields, which were not available when reading from translog. In case where stored fields are requested, this PR will reparse the _source from the translog and derive the stored fields to be returned. With this, it changes the realtime GET API to allow reading from the translog again, avoid refresh storms and blocking the GET threadpool, and provide overall much better and predictable performance for this API.
1 parent ff6c121 commit 8786286

File tree

7 files changed

+181
-43
lines changed

7 files changed

+181
-43
lines changed

docs/reference/docs/get.asciidoc

+5-5
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ that it exists.
3535
===== Realtime
3636

3737
By default, the get API is realtime, and is not affected by the refresh
38-
rate of the index (when data will become visible for search). If a document
39-
has been updated but is not yet refreshed, the get API will issue a refresh
40-
call in-place to make the document visible. This will also make other documents
41-
changed since the last refresh visible. In order to disable realtime GET,
42-
one can set the `realtime` parameter to `false`.
38+
rate of the index (when data will become visible for search). In case where
39+
stored fields are requested (see `stored_fields` parameter) and the document
40+
has been updated but is not yet refreshed, the get API will have to parse
41+
and analyze the source to extract the stored fields. In order to disable
42+
realtime GET, the `realtime` parameter can be set to `false`.
4343

4444
[float]
4545
[[get-source-filtering]]

server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.lucene.index.Terms;
3636
import org.apache.lucene.util.Bits;
3737
import org.apache.lucene.util.BytesRef;
38+
import org.elasticsearch.common.util.set.Sets;
3839
import org.elasticsearch.index.mapper.IdFieldMapper;
3940
import org.elasticsearch.index.mapper.RoutingFieldMapper;
4041
import org.elasticsearch.index.mapper.SourceFieldMapper;
@@ -44,11 +45,12 @@
4445
import java.io.IOException;
4546
import java.nio.charset.StandardCharsets;
4647
import java.util.Collections;
48+
import java.util.Set;
4749

4850
/**
4951
* Internal class that mocks a single doc read from the transaction log as a leaf reader.
5052
*/
51-
final class TranslogLeafReader extends LeafReader {
53+
public final class TranslogLeafReader extends LeafReader {
5254

5355
private final Translog.Index operation;
5456
private static final FieldInfo FAKE_SOURCE_FIELD
@@ -60,6 +62,7 @@ final class TranslogLeafReader extends LeafReader {
6062
private static final FieldInfo FAKE_ID_FIELD
6163
= new FieldInfo(IdFieldMapper.NAME, 3, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(),
6264
0, 0, 0, false);
65+
public static Set<String> ALL_FIELD_NAMES = Sets.newHashSet(FAKE_SOURCE_FIELD.name, FAKE_ROUTING_FIELD.name, FAKE_ID_FIELD.name);
6366

6467
TranslogLeafReader(Translog.Index operation) {
6568
this.operation = operation;
@@ -161,7 +164,7 @@ public void document(int docID, StoredFieldVisitor visitor) throws IOException {
161164
BytesRef bytesRef = Uid.encodeId(operation.id());
162165
final byte[] id = new byte[bytesRef.length];
163166
System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length);
164-
visitor.stringField(FAKE_ID_FIELD, id);
167+
visitor.binaryField(FAKE_ID_FIELD, id);
165168
}
166169
}
167170

server/src/main/java/org/elasticsearch/index/fieldvisitor/CustomFieldsVisitor.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.lucene.index.FieldInfo;
2222

23-
import java.io.IOException;
2423
import java.util.Set;
2524

2625
/**
@@ -39,7 +38,7 @@ public CustomFieldsVisitor(Set<String> fields, boolean loadSource) {
3938
}
4039

4140
@Override
42-
public Status needsField(FieldInfo fieldInfo) throws IOException {
41+
public Status needsField(FieldInfo fieldInfo) {
4342
if (super.needsField(fieldInfo) == Status.YES) {
4443
return Status.YES;
4544
}

server/src/main/java/org/elasticsearch/index/fieldvisitor/FieldsVisitor.java

+21-10
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.index.mapper.SourceFieldMapper;
3333
import org.elasticsearch.index.mapper.Uid;
3434

35-
import java.io.IOException;
3635
import java.nio.charset.StandardCharsets;
3736
import java.util.ArrayList;
3837
import java.util.HashMap;
@@ -72,7 +71,7 @@ public FieldsVisitor(boolean loadSource, String sourceFieldName) {
7271
}
7372

7473
@Override
75-
public Status needsField(FieldInfo fieldInfo) throws IOException {
74+
public Status needsField(FieldInfo fieldInfo) {
7675
if (requiredFields.remove(fieldInfo.name)) {
7776
return Status.YES;
7877
}
@@ -108,42 +107,54 @@ public void postProcess(MapperService mapperService) {
108107
}
109108

110109
@Override
111-
public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException {
110+
public void binaryField(FieldInfo fieldInfo, byte[] value) {
111+
binaryField(fieldInfo, new BytesRef(value));
112+
}
113+
114+
public void binaryField(FieldInfo fieldInfo, BytesRef value) {
112115
if (sourceFieldName.equals(fieldInfo.name)) {
113116
source = new BytesArray(value);
114117
} else if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
115-
id = Uid.decodeId(value);
118+
id = Uid.decodeId(value.bytes, value.offset, value.length);
116119
} else {
117-
addValue(fieldInfo.name, new BytesRef(value));
120+
addValue(fieldInfo.name, value);
118121
}
119122
}
120123

121124
@Override
122-
public void stringField(FieldInfo fieldInfo, byte[] bytes) throws IOException {
125+
public void stringField(FieldInfo fieldInfo, byte[] bytes) {
126+
assert IdFieldMapper.NAME.equals(fieldInfo.name) == false : "_id field must go through binaryField";
127+
assert sourceFieldName.equals(fieldInfo.name) == false : "source field must go through binaryField";
123128
final String value = new String(bytes, StandardCharsets.UTF_8);
124129
addValue(fieldInfo.name, value);
125130
}
126131

127132
@Override
128-
public void intField(FieldInfo fieldInfo, int value) throws IOException {
133+
public void intField(FieldInfo fieldInfo, int value) {
129134
addValue(fieldInfo.name, value);
130135
}
131136

132137
@Override
133-
public void longField(FieldInfo fieldInfo, long value) throws IOException {
138+
public void longField(FieldInfo fieldInfo, long value) {
134139
addValue(fieldInfo.name, value);
135140
}
136141

137142
@Override
138-
public void floatField(FieldInfo fieldInfo, float value) throws IOException {
143+
public void floatField(FieldInfo fieldInfo, float value) {
139144
addValue(fieldInfo.name, value);
140145
}
141146

142147
@Override
143-
public void doubleField(FieldInfo fieldInfo, double value) throws IOException {
148+
public void doubleField(FieldInfo fieldInfo, double value) {
144149
addValue(fieldInfo.name, value);
145150
}
146151

152+
public void objectField(FieldInfo fieldInfo, Object object) {
153+
assert IdFieldMapper.NAME.equals(fieldInfo.name) == false : "_id field must go through binaryField";
154+
assert sourceFieldName.equals(fieldInfo.name) == false : "source field must go through binaryField";
155+
addValue(fieldInfo.name, object);
156+
}
157+
147158
public BytesReference source() {
148159
return source;
149160
}

server/src/main/java/org/elasticsearch/index/get/ShardGetService.java

+106-21
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919

2020
package org.elasticsearch.index.get;
2121

22+
import org.apache.lucene.index.DocValuesType;
23+
import org.apache.lucene.index.FieldInfo;
24+
import org.apache.lucene.index.IndexOptions;
25+
import org.apache.lucene.index.IndexableField;
26+
import org.apache.lucene.index.IndexableFieldType;
27+
import org.apache.lucene.index.StoredFieldVisitor;
2228
import org.apache.lucene.index.Term;
2329
import org.elasticsearch.ElasticsearchException;
2430
import org.elasticsearch.common.Nullable;
@@ -37,24 +43,29 @@
3743
import org.elasticsearch.index.IndexSettings;
3844
import org.elasticsearch.index.VersionType;
3945
import org.elasticsearch.index.engine.Engine;
46+
import org.elasticsearch.index.engine.TranslogLeafReader;
4047
import org.elasticsearch.index.fieldvisitor.CustomFieldsVisitor;
4148
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
4249
import org.elasticsearch.index.mapper.DocumentMapper;
4350
import org.elasticsearch.index.mapper.IdFieldMapper;
4451
import org.elasticsearch.index.mapper.Mapper;
4552
import org.elasticsearch.index.mapper.MapperService;
53+
import org.elasticsearch.index.mapper.ParsedDocument;
4654
import org.elasticsearch.index.mapper.RoutingFieldMapper;
4755
import org.elasticsearch.index.mapper.SourceFieldMapper;
56+
import org.elasticsearch.index.mapper.SourceToParse;
4857
import org.elasticsearch.index.mapper.Uid;
4958
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
5059
import org.elasticsearch.index.shard.IndexShard;
5160
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
5261

5362
import java.io.IOException;
63+
import java.util.Collections;
5464
import java.util.HashMap;
5565
import java.util.List;
5666
import java.util.Map;
5767
import java.util.concurrent.TimeUnit;
68+
import java.util.stream.Stream;
5869

5970
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
6071
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -81,16 +92,16 @@ public GetStats stats() {
8192
public GetResult get(String type, String id, String[] gFields, boolean realtime, long version,
8293
VersionType versionType, FetchSourceContext fetchSourceContext) {
8394
return
84-
get(type, id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext, false);
95+
get(type, id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext);
8596
}
8697

8798
private GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
88-
long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
99+
long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
89100
currentMetric.inc();
90101
try {
91102
long now = System.nanoTime();
92103
GetResult getResult =
93-
innerGet(type, id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext, readFromTranslog);
104+
innerGet(type, id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);
94105

95106
if (getResult.isExists()) {
96107
existsMetric.inc(System.nanoTime() - now);
@@ -105,7 +116,7 @@ private GetResult get(String type, String id, String[] gFields, boolean realtime
105116

106117
public GetResult getForUpdate(String type, String id, long ifSeqNo, long ifPrimaryTerm) {
107118
return get(type, id, new String[]{RoutingFieldMapper.NAME}, true,
108-
Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE, true);
119+
Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE);
109120
}
110121

111122
/**
@@ -156,7 +167,7 @@ private FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSourceCont
156167
}
157168

158169
private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
159-
long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
170+
long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
160171
fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
161172
if (type == null || type.equals("_all")) {
162173
DocumentMapper mapper = mapperService.documentMapper();
@@ -166,9 +177,9 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea
166177
Engine.GetResult get = null;
167178
if (type != null) {
168179
Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
169-
get = indexShard.get(new Engine.Get(realtime, readFromTranslog, type, id, uidTerm)
170-
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
171-
assert get.isFromTranslog() == false || readFromTranslog : "should only read from translog if explicitly enabled";
180+
get = indexShard.get(new Engine.Get(realtime, realtime, type, id, uidTerm)
181+
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
182+
assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
172183
if (get.exists() == false) {
173184
get.close();
174185
}
@@ -186,13 +197,33 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea
186197
}
187198
}
188199

189-
private GetResult innerGetLoadFromStoredFields(String type, String id, String[] gFields, FetchSourceContext fetchSourceContext,
190-
Engine.GetResult get, MapperService mapperService) {
200+
private GetResult innerGetLoadFromStoredFields(String type, String id, String[] storedFields, FetchSourceContext fetchSourceContext,
201+
Engine.GetResult get, MapperService mapperService) {
202+
assert get.exists() : "method should only be called if document could be retrieved";
203+
204+
// check first if stored fields to be loaded don't contain an object field
205+
DocumentMapper docMapper = mapperService.documentMapper();
206+
if (storedFields != null) {
207+
for (String field : storedFields) {
208+
Mapper fieldMapper = docMapper.mappers().getMapper(field);
209+
if (fieldMapper == null) {
210+
if (docMapper.objectMappers().get(field) != null) {
211+
// Only fail if we know it is a object field, missing paths / fields shouldn't fail.
212+
throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
213+
}
214+
}
215+
}
216+
}
217+
191218
Map<String, DocumentField> documentFields = null;
192219
Map<String, DocumentField> metaDataFields = null;
193220
BytesReference source = null;
194221
DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
195-
FieldsVisitor fieldVisitor = buildFieldsVisitors(gFields, fetchSourceContext);
222+
// force fetching source if we read from translog and need to recreate stored fields
223+
boolean forceSourceForComputingTranslogStoredFields = get.isFromTranslog() && storedFields != null &&
224+
Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false);
225+
FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields,
226+
forceSourceForComputingTranslogStoredFields ? FetchSourceContext.FETCH_SOURCE : fetchSourceContext);
196227
if (fieldVisitor != null) {
197228
try {
198229
docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
@@ -201,6 +232,54 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[]
201232
}
202233
source = fieldVisitor.source();
203234

235+
// in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields
236+
if (get.isFromTranslog()) {
237+
// Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader,
238+
// just make source consistent by reapplying source filters from mapping (possibly also nulling the source)
239+
if (forceSourceForComputingTranslogStoredFields == false) {
240+
try {
241+
source = indexShard.mapperService().documentMapper().sourceMapper().applyFilters(source, null);
242+
} catch (IOException e) {
243+
throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
244+
}
245+
} else {
246+
// Slow path: recreate stored fields from original source
247+
assert source != null : "original source in translog must exist";
248+
SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), type, id, source,
249+
XContentHelper.xContentType(source), fieldVisitor.routing());
250+
ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
251+
assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
252+
// update special fields
253+
doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
254+
doc.version().setLongValue(docIdAndVersion.version);
255+
256+
// retrieve stored fields from parsed doc
257+
fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext);
258+
for (IndexableField indexableField : doc.rootDoc().getFields()) {
259+
IndexableFieldType fieldType = indexableField.fieldType();
260+
if (fieldType.stored()) {
261+
FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,
262+
DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);
263+
StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);
264+
if (status == StoredFieldVisitor.Status.YES) {
265+
if (indexableField.binaryValue() != null) {
266+
fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());
267+
} else if (indexableField.stringValue() != null) {
268+
fieldVisitor.objectField(fieldInfo, indexableField.stringValue());
269+
} else if (indexableField.numericValue() != null) {
270+
fieldVisitor.objectField(fieldInfo, indexableField.numericValue());
271+
}
272+
} else if (status == StoredFieldVisitor.Status.STOP) {
273+
break;
274+
}
275+
}
276+
}
277+
// retrieve source (with possible transformations, e.g. source filters
278+
source = fieldVisitor.source();
279+
}
280+
}
281+
282+
// put stored fields into result objects
204283
if (!fieldVisitor.fields().isEmpty()) {
205284
fieldVisitor.postProcess(mapperService);
206285
documentFields = new HashMap<>();
@@ -215,16 +294,22 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[]
215294
}
216295
}
217296

218-
DocumentMapper docMapper = mapperService.documentMapper();
219-
220-
if (gFields != null && gFields.length > 0) {
221-
for (String field : gFields) {
222-
Mapper fieldMapper = docMapper.mappers().getMapper(field);
223-
if (fieldMapper == null) {
224-
if (docMapper.objectMappers().get(field) != null) {
225-
// Only fail if we know it is a object field, missing paths / fields shouldn't fail.
226-
throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
227-
}
297+
if (source != null) {
298+
// apply request-level source filtering
299+
if (fetchSourceContext.fetchSource() == false) {
300+
source = null;
301+
} else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
302+
Map<String, Object> sourceAsMap;
303+
// TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different.
304+
// Do we care?
305+
Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
306+
XContentType sourceContentType = typeMapTuple.v1();
307+
sourceAsMap = typeMapTuple.v2();
308+
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
309+
try {
310+
source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
311+
} catch (IOException e) {
312+
throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
228313
}
229314
}
230315
}

0 commit comments

Comments
 (0)