Skip to content

Commit c499427

Browse files
authored
Use _refresh instead of reading from Translog in the RT GET case (#20102)
Today we do a lot of accounting inside the engine to maintain the locations of documents inside the transaction log. This is only needed to ensure we can return the document's source from the engine if it hasn't been refreshed. Aside from the added complexity of reading from the translog that is currently being written and of maintaining pointers into the translog, this also caused inconsistencies, such as different values of the `_ttl` field depending on whether it was read from the translog or not. Term vectors are also entirely different if the document is fetched from the translog, since copy fields are ignored, etc. This change will simply call `refresh` if the document's latest version is not in the index. This streamlines the semantics of the `_get` API and allows for more optimizations inside the engine and on the transaction log. Note: `_refresh` is only called if and only if the requested document has not been refreshed yet but has recently been updated or added. Relates to #19787
1 parent abc025e commit c499427

File tree

22 files changed

+79
-303
lines changed

22 files changed

+79
-303
lines changed

core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.index.engine;
2121

2222
import org.apache.lucene.util.RamUsageEstimator;
23-
import org.elasticsearch.index.translog.Translog;
2423

2524
/** Holds a deleted version, which just adds a timestamp to {@link VersionValue} so we know when we can expire the deletion. */
2625

@@ -30,8 +29,8 @@ class DeleteVersionValue extends VersionValue {
3029

3130
private final long time;
3231

33-
public DeleteVersionValue(long version, long time, Translog.Location translogLocation) {
34-
super(version, translogLocation);
32+
public DeleteVersionValue(long version, long time) {
33+
super(version);
3534
this.time = time;
3635
}
3736

@@ -47,7 +46,6 @@ public boolean delete() {
4746

4847
@Override
4948
public long ramBytesUsed() {
50-
Translog.Location translogLocation = translogLocation();
51-
return BASE_RAM_BYTES_USED + (translogLocation != null ? translogLocation.ramBytesUsed() : 0);
49+
return BASE_RAM_BYTES_USED;
5250
}
5351
}

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,32 +1000,23 @@ public Get versionType(VersionType versionType) {
10001000
public static class GetResult implements Releasable {
10011001
private final boolean exists;
10021002
private final long version;
1003-
private final Translog.Source source;
10041003
private final Versions.DocIdAndVersion docIdAndVersion;
10051004
private final Searcher searcher;
10061005

1007-
public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null);
1006+
public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null);
10081007

1009-
/**
1010-
* Build a realtime get result from the translog.
1011-
*/
1012-
public GetResult(boolean exists, long version, @Nullable Translog.Source source) {
1013-
this.source = source;
1008+
private GetResult(boolean exists, long version, Versions.DocIdAndVersion docIdAndVersion, Searcher searcher) {
10141009
this.exists = exists;
10151010
this.version = version;
1016-
this.docIdAndVersion = null;
1017-
this.searcher = null;
1011+
this.docIdAndVersion = docIdAndVersion;
1012+
this.searcher = searcher;
10181013
}
10191014

10201015
/**
10211016
* Build a non-realtime get result from the searcher.
10221017
*/
10231018
public GetResult(Searcher searcher, Versions.DocIdAndVersion docIdAndVersion) {
1024-
this.exists = true;
1025-
this.source = null;
1026-
this.version = docIdAndVersion.version;
1027-
this.docIdAndVersion = docIdAndVersion;
1028-
this.searcher = searcher;
1019+
this(true, docIdAndVersion.version, docIdAndVersion, searcher);
10291020
}
10301021

10311022
public boolean exists() {
@@ -1036,11 +1027,6 @@ public long version() {
10361027
return this.version;
10371028
}
10381029

1039-
@Nullable
1040-
public Translog.Source source() {
1041-
return source;
1042-
}
1043-
10441030
public Searcher searcher() {
10451031
return this.searcher;
10461032
}
@@ -1055,9 +1041,7 @@ public void close() {
10551041
}
10561042

10571043
public void release() {
1058-
if (searcher != null) {
1059-
searcher.close();
1060-
}
1044+
Releasables.close(searcher);
10611045
}
10621046
}
10631047

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -328,10 +328,7 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
328328
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
329329
get.versionType().explainConflictForReads(versionValue.version(), get.version()));
330330
}
331-
Translog.Operation op = translog.read(versionValue.translogLocation());
332-
if (op != null) {
333-
return new GetResult(true, versionValue.version(), op.getSource());
334-
}
331+
refresh("realtime_get");
335332
}
336333
}
337334

@@ -368,11 +365,11 @@ private long checkDeletedAndGCed(VersionValue versionValue) {
368365
return currentVersion;
369366
}
370367

371-
private static VersionValueSupplier NEW_VERSION_VALUE = (u, t, l) -> new VersionValue(u, l);
368+
private static VersionValueSupplier NEW_VERSION_VALUE = (u, t) -> new VersionValue(u);
372369

373370
@FunctionalInterface
374371
private interface VersionValueSupplier {
375-
VersionValue apply(long updatedVersion, long time, Translog.Location location);
372+
VersionValue apply(long updatedVersion, long time);
376373
}
377374

378375
private <T extends Engine.Operation> void maybeAddToTranslog(
@@ -383,14 +380,9 @@ private <T extends Engine.Operation> void maybeAddToTranslog(
383380
if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
384381
final Translog.Location translogLocation = translog.add(toTranslogOp.apply(op));
385382
op.setTranslogLocation(translogLocation);
386-
versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), op.getTranslogLocation()));
387-
} else {
388-
// we do not replay in to the translog, so there is no
389-
// translog location; that is okay because real-time
390-
// gets are not possible during recovery and we will
391-
// flush when the recovery is complete
392-
versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), null));
393383
}
384+
versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
385+
394386
}
395387

396388
@Override

core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ private static class Maps {
4242

4343
// Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
4444
final Map<BytesRef,VersionValue> old;
45-
45+
4646
public Maps(Map<BytesRef,VersionValue> current, Map<BytesRef,VersionValue> old) {
4747
this.current = current;
4848
this.old = old;
@@ -256,7 +256,7 @@ public long ramBytesUsed() {
256256
return ramBytesUsedCurrent.get() + ramBytesUsedTombstones.get();
257257
}
258258

259-
/** Returns how much RAM would be freed up by refreshing. This is {@link ramBytesUsed} except does not include tombstones because they
259+
/** Returns how much RAM would be freed up by refreshing. This is {@link #ramBytesUsed} except does not include tombstones because they
260260
* don't clear on refresh. */
261261
long ramBytesUsedForRefresh() {
262262
return ramBytesUsedCurrent.get();

core/src/main/java/org/elasticsearch/index/engine/VersionValue.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.lucene.util.Accountable;
2323
import org.apache.lucene.util.RamUsageEstimator;
24-
import org.elasticsearch.index.translog.Translog;
2524

2625
import java.util.Collection;
2726
import java.util.Collections;
@@ -31,11 +30,9 @@ class VersionValue implements Accountable {
3130
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class);
3231

3332
private final long version;
34-
private final Translog.Location translogLocation;
3533

36-
public VersionValue(long version, Translog.Location translogLocation) {
34+
public VersionValue(long version) {
3735
this.version = version;
38-
this.translogLocation = translogLocation;
3936
}
4037

4138
public long time() {
@@ -50,13 +47,10 @@ public boolean delete() {
5047
return false;
5148
}
5249

53-
public Translog.Location translogLocation() {
54-
return this.translogLocation;
55-
}
5650

5751
@Override
5852
public long ramBytesUsed() {
59-
return BASE_RAM_BYTES_USED + (translogLocation != null ? translogLocation.ramBytesUsed() : 0);
53+
return BASE_RAM_BYTES_USED;
6054
}
6155

6256
@Override

core/src/main/java/org/elasticsearch/index/get/ShardGetService.java

Lines changed: 1 addition & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ public GetStats stats() {
8585
return new GetStats(existsMetric.count(), TimeUnit.NANOSECONDS.toMillis(existsMetric.sum()), missingMetric.count(), TimeUnit.NANOSECONDS.toMillis(missingMetric.sum()), currentMetric.count());
8686
}
8787

88-
8988
public GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext, boolean ignoreErrorsOnGeneratedFields) {
9089
currentMetric.inc();
9190
try {
@@ -182,140 +181,12 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea
182181

183182
try {
184183
// break between having loaded it from translog (so we only have _source), and having a document to load
185-
if (get.docIdAndVersion() != null) {
186-
return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
187-
} else {
188-
Translog.Source source = get.source();
189-
190-
Map<String, GetField> fields = null;
191-
SearchLookup searchLookup = null;
192-
193-
// we can only load scripts that can run against the source
194-
Set<String> neededFields = new HashSet<>();
195-
// add meta fields
196-
neededFields.add(RoutingFieldMapper.NAME);
197-
DocumentMapper docMapper = mapperService.documentMapper(type);
198-
if (docMapper.parentFieldMapper().active()) {
199-
neededFields.add(ParentFieldMapper.NAME);
200-
}
201-
if (docMapper.timestampFieldMapper().enabled()) {
202-
neededFields.add(TimestampFieldMapper.NAME);
203-
}
204-
if (docMapper.TTLFieldMapper().enabled()) {
205-
neededFields.add(TTLFieldMapper.NAME);
206-
}
207-
// add requested fields
208-
if (gFields != null) {
209-
neededFields.addAll(Arrays.asList(gFields));
210-
}
211-
for (String field : neededFields) {
212-
if (SourceFieldMapper.NAME.equals(field)) {
213-
// dealt with when normalizing fetchSourceContext.
214-
continue;
215-
}
216-
Object value = null;
217-
if (field.equals(RoutingFieldMapper.NAME)) {
218-
value = source.routing;
219-
} else if (field.equals(ParentFieldMapper.NAME) && docMapper.parentFieldMapper().active()) {
220-
value = source.parent;
221-
} else if (field.equals(TimestampFieldMapper.NAME) && docMapper.timestampFieldMapper().enabled()) {
222-
value = source.timestamp;
223-
} else if (field.equals(TTLFieldMapper.NAME) && docMapper.TTLFieldMapper().enabled()) {
224-
// Call value for search with timestamp + ttl here to display the live remaining ttl value and be consistent with the search result display
225-
if (source.ttl > 0) {
226-
value = docMapper.TTLFieldMapper().valueForSearch(source.timestamp + source.ttl);
227-
}
228-
} else {
229-
if (searchLookup == null) {
230-
searchLookup = new SearchLookup(mapperService, null, new String[]{type});
231-
searchLookup.source().setSource(source.source);
232-
}
233-
234-
FieldMapper fieldMapper = docMapper.mappers().smartNameFieldMapper(field);
235-
if (fieldMapper == null) {
236-
if (docMapper.objectMappers().get(field) != null) {
237-
// Only fail if we know it is a object field, missing paths / fields shouldn't fail.
238-
throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
239-
}
240-
} else if (shouldGetFromSource(ignoreErrorsOnGeneratedFields, docMapper, fieldMapper)) {
241-
List<Object> values = searchLookup.source().extractRawValues(field);
242-
if (!values.isEmpty()) {
243-
value = values;
244-
}
245-
246-
}
247-
}
248-
if (value != null) {
249-
if (fields == null) {
250-
fields = new HashMap<>(2);
251-
}
252-
if (value instanceof List) {
253-
fields.put(field, new GetField(field, (List) value));
254-
} else {
255-
fields.put(field, new GetField(field, Collections.singletonList(value)));
256-
}
257-
}
258-
}
259-
260-
// deal with source, but only if it's enabled (we always have it from the translog)
261-
BytesReference sourceToBeReturned = null;
262-
SourceFieldMapper sourceFieldMapper = docMapper.sourceMapper();
263-
if (fetchSourceContext.fetchSource() && sourceFieldMapper.enabled()) {
264-
265-
sourceToBeReturned = source.source;
266-
267-
// Cater for source excludes/includes at the cost of performance
268-
// We must first apply the field mapper filtering to make sure we get correct results
269-
// in the case that the fetchSourceContext white lists something that's not included by the field mapper
270-
271-
boolean sourceFieldFiltering = sourceFieldMapper.includes().length > 0 || sourceFieldMapper.excludes().length > 0;
272-
boolean sourceFetchFiltering = fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0;
273-
if (sourceFieldFiltering || sourceFetchFiltering) {
274-
// TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
275-
Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source.source, true);
276-
XContentType sourceContentType = typeMapTuple.v1();
277-
Map<String, Object> sourceAsMap = typeMapTuple.v2();
278-
if (sourceFieldFiltering) {
279-
sourceAsMap = XContentMapValues.filter(sourceAsMap, sourceFieldMapper.includes(), sourceFieldMapper.excludes());
280-
}
281-
if (sourceFetchFiltering) {
282-
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
283-
}
284-
try {
285-
sourceToBeReturned = XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap).bytes();
286-
} catch (IOException e) {
287-
throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "] with includes/excludes set", e);
288-
}
289-
}
290-
}
291-
292-
return new GetResult(shardId.getIndexName(), type, id, get.version(), get.exists(), sourceToBeReturned, fields);
293-
}
184+
return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
294185
} finally {
295186
get.release();
296187
}
297188
}
298189

299-
protected boolean shouldGetFromSource(boolean ignoreErrorsOnGeneratedFields, DocumentMapper docMapper, FieldMapper fieldMapper) {
300-
if (!fieldMapper.isGenerated()) {
301-
//if the field is always there we check if either source mapper is enabled, in which case we get the field
302-
// from source, or, if the field is stored, in which case we have to get if from source here also (we are in the translog phase, doc not indexed yet, we annot access the stored fields)
303-
return docMapper.sourceMapper().enabled() || fieldMapper.fieldType().stored();
304-
} else {
305-
if (!fieldMapper.fieldType().stored()) {
306-
//if it is not stored, user will not get the generated field back
307-
return false;
308-
} else {
309-
if (ignoreErrorsOnGeneratedFields) {
310-
return false;
311-
} else {
312-
throw new ElasticsearchException("Cannot access field " + fieldMapper.name() + " from transaction log. You can only get this field after refresh() has been called.");
313-
}
314-
}
315-
316-
}
317-
}
318-
319190
private GetResult innerGetLoadFromStoredFields(String type, String id, String[] gFields, FetchSourceContext fetchSourceContext, Engine.GetResult get, MapperService mapperService) {
320191
Map<String, GetField> fields = null;
321192
BytesReference source = null;

core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,9 @@ static TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequ
8282
Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), uidTerm).version(request.version()).versionType(request.versionType()));
8383

8484
Fields termVectorsByField = null;
85-
boolean docFromTranslog = get.source() != null;
8685
AggregatedDfs dfs = null;
8786
TermVectorsFilter termVectorsFilter = null;
8887

89-
/* fetched from translog is treated as an artificial document */
90-
if (docFromTranslog) {
91-
request.doc(get.source().source, false);
92-
termVectorsResponse.setDocVersion(get.version());
93-
}
94-
9588
/* handle potential wildcards in fields */
9689
if (request.selectedFields() != null) {
9790
handleFieldWildcards(indexShard, request);
@@ -103,12 +96,12 @@ static TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequ
10396
Versions.DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
10497
/* from an artificial document */
10598
if (request.doc() != null) {
106-
termVectorsByField = generateTermVectorsFromDoc(indexShard, request, !docFromTranslog);
99+
termVectorsByField = generateTermVectorsFromDoc(indexShard, request);
107100
// if no document indexed in shard, take the queried document itself for stats
108101
if (topLevelFields == null) {
109102
topLevelFields = termVectorsByField;
110103
}
111-
termVectorsResponse.setArtificial(!docFromTranslog);
104+
termVectorsResponse.setArtificial(true);
112105
termVectorsResponse.setExists(true);
113106
}
114107
/* or from an existing document */
@@ -252,7 +245,7 @@ private static Fields generateTermVectors(IndexShard indexShard, Collection<GetF
252245
return MultiFields.getFields(index.createSearcher().getIndexReader());
253246
}
254247

255-
private static Fields generateTermVectorsFromDoc(IndexShard indexShard, TermVectorsRequest request, boolean doAllFields) throws IOException {
248+
private static Fields generateTermVectorsFromDoc(IndexShard indexShard, TermVectorsRequest request) throws IOException {
256249
// parse the document, at the moment we do update the mapping, just like percolate
257250
ParsedDocument parsedDocument = parseDocument(indexShard, indexShard.shardId().getIndexName(), request.type(), request.doc());
258251

@@ -265,9 +258,6 @@ private static Fields generateTermVectorsFromDoc(IndexShard indexShard, TermVect
265258
if (!isValidField(fieldType)) {
266259
continue;
267260
}
268-
if (request.selectedFields() == null && !doAllFields && !fieldType.storeTermVectors()) {
269-
continue;
270-
}
271261
if (request.selectedFields() != null && !request.selectedFields().contains(field.name())) {
272262
continue;
273263
}

0 commit comments

Comments
 (0)