diff --git a/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index 5b20b848f0b04..18c1ea41e95b9 100644 --- a/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -112,13 +112,13 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId if (uidTerm == null) { return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false); } - result = context.indexShard().get(new Engine.Get(false, request.type(), request.id(), uidTerm)); + result = context.indexShard().get(new Engine.Get(false, false, request.type(), request.id(), uidTerm)); if (!result.exists()) { return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false); } context.parsedQuery(context.getQueryShardContext().toQuery(request.query())); context.preProcess(true); - int topLevelDocId = result.docIdAndVersion().docId + result.docIdAndVersion().context.docBase; + int topLevelDocId = result.docIdAndVersion().docId + result.docIdAndVersion().docBase; Explanation explanation = context.searcher().explain(context.query(), topLevelDocId); for (RescoreContext ctx : context.rescore()) { Rescorer rescorer = ctx.rescorer(); diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 4ee49f2407b5d..ab10aa710cce6 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -47,7 +47,6 @@ import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.lookup.SourceLookup; import java.io.IOException; @@ -71,9 +70,8 @@ public UpdateHelper(Settings settings, ScriptService scriptService) { * Prepares an update request by converting it into an index or delete request or an update response (no action). */ public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) { - final GetResult getResult = indexShard.getService().get(request.type(), request.id(), - new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME}, - true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE); + final GetResult getResult = indexShard.getService().getForUpdate(request.type(), request.id(), request.version(), + request.versionType()); return prepare(indexShard.shardId(), request, getResult, nowInMillis); } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index f8ccd827019a4..38fcdfe5f1b62 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -100,7 +100,7 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context) if (versions.advanceExact(docID) == false) { throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field"); } - return new DocIdAndVersion(docID, versions.longValue(), context); + return new DocIdAndVersion(docID, versions.longValue(), context.reader(), context.docBase); } else { return null; } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 126e4dee51cc2..9db7e3716d51a 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.lucene.uid; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; @@ -97,12 +98,14 @@ private VersionsAndSeqNoResolver() { public static class DocIdAndVersion { public final int docId; public final long version; - public final LeafReaderContext context; + public final LeafReader reader; + public final int docBase; - DocIdAndVersion(int docId, long version, LeafReaderContext context) { + public DocIdAndVersion(int docId, long version, LeafReader reader, int docBase) { this.docId = docId; this.version = version; - this.context = context; + this.reader = reader; + this.docBase = docBase; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 1ca4468539da1..6cc8c4197dcd5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1232,14 +1232,16 @@ public static class Get { private final boolean realtime; private final Term uid; private final String type, id; + private final boolean readFromTranslog; private long version = Versions.MATCH_ANY; private VersionType versionType = VersionType.INTERNAL; - public Get(boolean realtime, String type, String id, Term uid) { + public Get(boolean realtime, boolean readFromTranslog, String type, String id, Term uid) { this.realtime = realtime; this.type = type; this.id = id; this.uid = uid; + this.readFromTranslog = readFromTranslog; } public boolean realtime() { @@ -1275,6 +1277,10 @@ public Get versionType(VersionType versionType) { this.versionType = versionType; return this; } + + public boolean isReadFromTranslog() { + return readFromTranslog; + } } public static class GetResult implements Releasable { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0fda2f04ac5a4..864385667f5fe 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -78,6 +78,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -145,6 +146,7 @@ public class InternalEngine extends Engine { * being indexed/deleted. */ private final AtomicLong writingBytes = new AtomicLong(); + private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); @Nullable private final String historyUUID; @@ -558,6 +560,27 @@ public GetResult get(Get get, BiFunction search throw new VersionConflictEngineException(shardId, get.type(), get.id(), get.versionType().explainConflictForReads(versionValue.version, get.version())); } + if (get.isReadFromTranslog()) { + // this is only used for updates - API _GET calls will always read form a reader for consistency + // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0 + if (versionValue.getLocation() != null) { + try { + Translog.Operation operation = translog.readOperation(versionValue.getLocation()); + if (operation != null) { + // in the case of a already pruned translog generation we might get null here - yet very unlikely + TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig + .getIndexSettings().getIndexVersionCreated()); + return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)), + new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0)); + } + } catch (IOException e) { + maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event + throw new EngineException(shardId, "failed to read operation from translog", e); + } + } else { + trackTranslogLocation.set(true); + } + } refresh("realtime_get", SearcherScope.INTERNAL); } scope = SearcherScope.INTERNAL; @@ -790,6 +813,10 @@ public IndexResult index(Index index) throws IOException { } indexResult.setTranslogLocation(location); } + if (plan.indexIntoLucene && indexResult.hasFailure() == false) { + versionMap.maybePutUnderLock(index.uid().bytes(), + getVersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm(), indexResult.getTranslogLocation())); + } if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo()); } @@ -916,8 +943,6 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); index(index.docs(), indexWriter); } - versionMap.maybePutUnderLock(index.uid().bytes(), - new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } catch (Exception ex) { if (indexWriter.getTragicException() == null) { @@ -941,6 +966,13 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) } } + private VersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) { + if (location != null && trackTranslogLocation.get()) { + return new TranslogVersionValue(location, version, seqNo, term); + } + return new VersionValue(version, seqNo, term); + } + /** * returns true if the indexing operation may have already be processed by this engine. * Note that it is OK to rarely return true even if this is not the case. However a `false` diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java b/server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java new file mode 100644 index 0000000000000..628bfd4826935 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java @@ -0,0 +1,237 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.LeafMetaData; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.StoredFieldVisitor; +import org.apache.lucene.index.Terms; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; +import org.elasticsearch.index.fielddata.AbstractSortedDocValues; +import org.elasticsearch.index.fielddata.AbstractSortedSetDocValues; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.ParentFieldMapper; +import org.elasticsearch.index.mapper.RoutingFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.UidFieldMapper; +import org.elasticsearch.index.translog.Translog; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +/** + * Internal class that mocks a single doc read from the transaction log as a leaf reader. + */ +final class TranslogLeafReader extends LeafReader { + + private final Translog.Index operation; + private static final FieldInfo FAKE_SOURCE_FIELD + = new FieldInfo(SourceFieldMapper.NAME, 1, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), + 0,0); + private static final FieldInfo FAKE_ROUTING_FIELD + = new FieldInfo(RoutingFieldMapper.NAME, 2, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), + 0,0); + private static final FieldInfo FAKE_ID_FIELD + = new FieldInfo(IdFieldMapper.NAME, 3, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), + 0,0); + private static final FieldInfo FAKE_UID_FIELD + = new FieldInfo(UidFieldMapper.NAME, 4, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), + 0,0); + private final Version indexVersionCreated; + + TranslogLeafReader(Translog.Index operation, Version indexVersionCreated) { + this.operation = operation; + this.indexVersionCreated = indexVersionCreated; + } + @Override + public CacheHelper getCoreCacheHelper() { + throw new UnsupportedOperationException(); + } + + @Override + public Terms terms(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public NumericDocValues getNumericDocValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public BinaryDocValues getBinaryDocValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public SortedDocValues getSortedDocValues(String field) { + // TODO this can be removed in 7.0 and upwards we don't support the parent field anymore + if (field.startsWith(ParentFieldMapper.NAME + "#") && operation.parent() != null) { + return new AbstractSortedDocValues() { + @Override + public int docID() { + return 0; + } + + private final BytesRef term = new BytesRef(operation.parent()); + private int ord; + @Override + public boolean advanceExact(int docID) { + if (docID != 0) { + throw new IndexOutOfBoundsException("do such doc ID: " + docID); + } + ord = 0; + return true; + } + + @Override + public int ordValue() { + return ord; + } + + @Override + public BytesRef lookupOrd(int ord) { + if (ord == 0) { + return term; + } + return null; + } + + @Override + public int getValueCount() { + return 1; + } + }; + } + if (operation.parent() == null) { + return null; + } + assert false : "unexpected field: " + field; + return null; + } + + @Override + public SortedNumericDocValues getSortedNumericDocValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public SortedSetDocValues getSortedSetDocValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public NumericDocValues getNormValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public FieldInfos getFieldInfos() { + throw new UnsupportedOperationException(); + } + + @Override + public Bits getLiveDocs() { + throw new UnsupportedOperationException(); + } + + @Override + public PointValues getPointValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public void checkIntegrity() { + + } + + @Override + public LeafMetaData getMetaData() { + throw new UnsupportedOperationException(); + } + + @Override + public Fields getTermVectors(int docID) { + throw new UnsupportedOperationException(); + } + + @Override + public int numDocs() { + return 1; + } + + @Override + public int maxDoc() { + return 1; + } + + @Override + public void document(int docID, StoredFieldVisitor visitor) throws IOException { + if (docID != 0) { + throw new IllegalArgumentException("no such doc ID " + docID); + } + if (visitor.needsField(FAKE_SOURCE_FIELD) == StoredFieldVisitor.Status.YES) { + assert operation.source().toBytesRef().offset == 0; + assert operation.source().toBytesRef().length == operation.source().toBytesRef().bytes.length; + visitor.binaryField(FAKE_SOURCE_FIELD, operation.source().toBytesRef().bytes); + } + if (operation.routing() != null && visitor.needsField(FAKE_ROUTING_FIELD) == StoredFieldVisitor.Status.YES) { + visitor.stringField(FAKE_ROUTING_FIELD, operation.routing().getBytes(StandardCharsets.UTF_8)); + } + if (visitor.needsField(FAKE_ID_FIELD) == StoredFieldVisitor.Status.YES) { + final byte[] id; + if (indexVersionCreated.onOrAfter(Version.V_6_0_0)) { + BytesRef bytesRef = Uid.encodeId(operation.id()); + id = new byte[bytesRef.length]; + System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length); + } else { // TODO this can go away in 7.0 after backport + id = operation.id().getBytes(StandardCharsets.UTF_8); + } + visitor.stringField(FAKE_ID_FIELD, id); + } + if (visitor.needsField(FAKE_UID_FIELD) == StoredFieldVisitor.Status.YES) { + visitor.stringField(FAKE_UID_FIELD, Uid.createUid(operation.type(), operation.id()).getBytes(StandardCharsets.UTF_8)); + } + } + + @Override + protected void doClose() { + + } + + @Override + public CacheHelper getReaderCacheHelper() { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java b/server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java new file mode 100644 index 0000000000000..67415ea6139a6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java @@ -0,0 +1,71 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.index.translog.Translog; + +import java.util.Objects; + +final class TranslogVersionValue extends VersionValue { + + private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(TranslogVersionValue.class); + + private final Translog.Location translogLocation; + + TranslogVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) { + super(version, seqNo, term); + this.translogLocation = translogLocation; + } + + @Override + public long ramBytesUsed() { + return RAM_BYTES_USED; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + TranslogVersionValue that = (TranslogVersionValue) o; + return Objects.equals(translogLocation, that.translogLocation); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), translogLocation); + } + + @Override + public String toString() { + return "TranslogVersionValue{" + + "version=" + version + + ", seqNo=" + seqNo + + ", term=" + term + + ", location=" + translogLocation + + '}'; + } + + @Override + public Translog.Location getLocation() { + return translogLocation; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java b/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java index e2a2614d6c102..d63306486732e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java +++ b/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java @@ -21,6 +21,8 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.index.translog.Translog; import java.util.Collection; import java.util.Collections; @@ -81,9 +83,16 @@ public int hashCode() { public String toString() { return "VersionValue{" + "version=" + version + - ", seqNo=" + seqNo + ", term=" + term + '}'; } + + /** + * Returns the translog location for this version value or null. This is optional and might not be tracked all the time. + */ + @Nullable + public Translog.Location getLocation() { + return null; + } } diff --git a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java index dcd18c8f313f9..a6c8dbf53b395 100644 --- a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -42,6 +42,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParentFieldMapper; +import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; @@ -75,10 +76,15 @@ public GetStats stats() { } public GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext) { + return get(type, id, gFields, realtime, version, versionType, fetchSourceContext, false); + } + + private GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, + FetchSourceContext fetchSourceContext, boolean readFromTranslog) { currentMetric.inc(); try { long now = System.nanoTime(); - GetResult getResult = innerGet(type, id, gFields, realtime, version, versionType, fetchSourceContext); + GetResult getResult = innerGet(type, id, gFields, realtime, version, versionType, fetchSourceContext, readFromTranslog); if (getResult.isExists()) { existsMetric.inc(System.nanoTime() - now); @@ -91,6 +97,11 @@ public GetResult get(String type, String id, String[] gFields, boolean realtime, } } + public GetResult getForUpdate(String type, String id, long version, VersionType versionType) { + return get(type, id, new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME}, true, version, versionType, + FetchSourceContext.FETCH_SOURCE, true); + } + /** * Returns {@link GetResult} based on the specified {@link org.elasticsearch.index.engine.Engine.GetResult} argument. * This method basically loads specified fields for the associated document in the engineGetResult. @@ -137,7 +148,8 @@ private FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSourceCont return FetchSourceContext.DO_NOT_FETCH_SOURCE; } - private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext) { + private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, + FetchSourceContext fetchSourceContext, boolean readFromTranslog) { fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields); final Collection types; if (type == null || type.equals("_all")) { @@ -150,7 +162,7 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea for (String typeX : types) { Term uidTerm = mapperService.createUidTerm(typeX, id); if (uidTerm != null) { - get = indexShard.get(new Engine.Get(realtime, typeX, id, uidTerm) + get = indexShard.get(new Engine.Get(realtime, readFromTranslog, typeX, id, uidTerm) .version(version).versionType(versionType)); if (get.exists()) { type = typeX; @@ -180,7 +192,7 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[] FieldsVisitor fieldVisitor = buildFieldsVisitors(gFields, fetchSourceContext); if (fieldVisitor != null) { try { - docIdAndVersion.context.reader().document(docIdAndVersion.docId, fieldVisitor); + docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor); } catch (IOException e) { throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "]", e); } @@ -197,7 +209,7 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[] DocumentMapper docMapper = mapperService.documentMapper(type); if (docMapper.parentFieldMapper().active()) { - String parentId = ParentFieldSubFetchPhase.getParentId(docMapper.parentFieldMapper(), docIdAndVersion.context.reader(), docIdAndVersion.docId); + String parentId = ParentFieldSubFetchPhase.getParentId(docMapper.parentFieldMapper(), docIdAndVersion.reader, docIdAndVersion.docId); if (fields == null) { fields = new HashMap<>(1); } diff --git a/server/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java b/server/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java index d527fa83501b3..573e75d78060a 100644 --- a/server/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java +++ b/server/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java @@ -85,7 +85,7 @@ static TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequ termVectorsResponse.setExists(false); return termVectorsResponse; } - Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), request.type(), request.id(), uidTerm) + Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), false, request.type(), request.id(), uidTerm) .version(request.version()).versionType(request.versionType())); Fields termVectorsByField = null; @@ -114,7 +114,7 @@ static TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequ /* or from an existing document */ else if (docIdAndVersion != null) { // fields with stored term vectors - termVectorsByField = docIdAndVersion.context.reader().getTermVectors(docIdAndVersion.docId); + termVectorsByField = docIdAndVersion.reader.getTermVectors(docIdAndVersion.docId); Set selectedFields = request.selectedFields(); // generate tvs for fields where analyzer is overridden if (selectedFields == null && request.perFieldAnalyzer() != null) { diff --git a/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index d86c4491b63e9..14ee8ecb9b3c0 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -126,4 +126,13 @@ public Path path() { public long getLastModifiedTime() throws IOException { return Files.getLastModifiedTime(path).toMillis(); } + + /** + * Reads a single opertation from the given location. + */ + Translog.Operation read(Translog.Location location) throws IOException { + assert location.generation == this.generation : "generation mismatch expected: " + generation + " got: " + location.generation; + ByteBuffer buffer = ByteBuffer.allocate(location.size); + return read(checksummedStream(buffer, location.translogLocation, location.size, null)); + } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 0043472b72f7c..62e47d08ded54 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -571,6 +571,33 @@ public Snapshot newSnapshotFromGen(long minGeneration) throws IOException { } } + /** + * Reads and returns the operation from the given location if the generation it references is still available. Otherwise + * this method will return null. + */ + public Operation readOperation(Location location) throws IOException { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + if (location.generation < getMinFileGeneration()) { + return null; + } + if (current.generation == location.generation) { + // no need to fsync here the read operation will ensure that buffers are written to disk + // if they are still in RAM and we are reading onto that position + return current.read(location); + } else { + // read backwards - it's likely we need to read on that is recent + for (int i = readers.size() - 1; i >= 0; i--) { + TranslogReader translogReader = readers.get(i); + if (translogReader.generation == location.generation) { + return translogReader.read(location); + } + } + } + } + return null; + } + public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index 656772fa8169d..5f6d14e192eb8 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -104,5 +104,4 @@ public String toString() { ", reusableBuffer=" + reusableBuffer + '}'; } - } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 1ecb1829234ab..bba05401d4155 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1238,7 +1238,7 @@ public void testVersionedUpdate() throws IOException { Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()), searcherFactory)) { assertEquals(1, get.version()); } @@ -1246,7 +1246,7 @@ public void testVersionedUpdate() throws IOException { Engine.IndexResult update_1_result = engine.index(update_1); assertThat(update_1_result.getVersion(), equalTo(2L)); - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()), searcherFactory)) { assertEquals(2, get.version()); } @@ -1254,7 +1254,7 @@ public void testVersionedUpdate() throws IOException { Engine.IndexResult update_2_result = engine.index(update_2); assertThat(update_2_result.getVersion(), equalTo(3L)); - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()), searcherFactory)) { assertEquals(3, get.version()); } @@ -1765,7 +1765,7 @@ public void testVersioningPromotedReplica() throws IOException { assertOpsOnReplica(replicaOps, replicaEngine, true); final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine); final long currentSeqNo = getSequenceID(replicaEngine, - new Engine.Get(false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1(); + new Engine.Get(false, false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1(); try (Searcher searcher = engine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); searcher.searcher().search(new MatchAllDocsQuery(), collector); @@ -1830,9 +1830,9 @@ class OpAndVersion { throw new AssertionError(e); } for (int op = 0; op < opsPerThread; op++) { - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), uidTerm), searcherFactory)) { FieldsVisitor visitor = new FieldsVisitor(true); - get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor); + get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor); List values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString())); String removed = op % 3 == 0 && values.size() > 0 ? values.remove(0) : null; String added = "v_" + idGenerator.incrementAndGet(); @@ -1872,9 +1872,9 @@ class OpAndVersion { assertTrue(op.added + " should not exist", exists); } - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), uidTerm), searcherFactory)) { FieldsVisitor visitor = new FieldsVisitor(true); - get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor); + get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor); List values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString())); assertThat(currentValues, equalTo(new HashSet<>(values))); } @@ -2275,7 +2275,7 @@ public void testEnableGcDeletes() throws Exception { engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); // Get should not find the document (we never indexed uid=2): - getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory); + getResult = engine.get(new Engine.Get(true, false, "type", "2", newUid("2")), searcherFactory); assertThat(getResult.exists(), equalTo(false)); // Try to index uid=1 with a too-old version, should fail: @@ -3450,7 +3450,7 @@ public void afterRefresh(boolean didRefresh) throws IOException { } public void testSequenceIDs() throws Exception { - Tuple seqID = getSequenceID(engine, new Engine.Get(false, "type", "2", newUid("1"))); + Tuple seqID = getSequenceID(engine, new Engine.Get(false, false, "type", "2", newUid("1"))); // Non-existent doc returns no seqnum and no primary term assertThat(seqID.v1(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); assertThat(seqID.v2(), equalTo(0L)); @@ -3665,7 +3665,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio } assertThat(engine.getLocalCheckpointTracker().getCheckpoint(), equalTo(expectedLocalCheckpoint)); - try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory)) { + try (Engine.GetResult result = engine.get(new Engine.Get(true, false, "type", "2", uid), searcherFactory)) { assertThat(result.exists(), equalTo(exists)); } } @@ -4454,14 +4454,14 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup CountDownLatch awaitStarted = new CountDownLatch(1); Thread thread = new Thread(() -> { awaitStarted.countDown(); - try (Engine.GetResult getResult = engine.get(new Engine.Get(true, doc3.type(), doc3.id(), doc3.uid()), + try (Engine.GetResult getResult = engine.get(new Engine.Get(true, false, doc3.type(), doc3.id(), doc3.uid()), engine::acquireSearcher)) { assertTrue(getResult.exists()); } }); thread.start(); awaitStarted.await(); - try (Engine.GetResult getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), doc.uid()), + try (Engine.GetResult getResult = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), doc.uid()), engine::acquireSearcher)) { assertFalse(getResult.exists()); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 822294a9c19f7..febf67f5b0c0a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1185,7 +1185,7 @@ public void testRefreshMetric() throws IOException { } long refreshCount = shard.refreshStats().getTotal(); indexDoc(shard, "test", "test"); - try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test", + try (Engine.GetResult ignored = shard.get(new Engine.Get(true, false, "test", "test", new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) { assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount+1)); } @@ -1832,7 +1832,7 @@ public void testSearcherWrapperIsUsed() throws IOException { indexDoc(shard, "test", "1", "{\"foobar\" : \"bar\"}"); shard.refresh("test"); - Engine.GetResult getResult = shard.get(new Engine.Get(false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1")))); + Engine.GetResult getResult = shard.get(new Engine.Get(false, false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1")))); assertTrue(getResult.exists()); assertNotNull(getResult.searcher()); getResult.release(); @@ -1866,7 +1866,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10); assertEquals(search.totalHits, 1); } - getResult = newShard.get(new Engine.Get(false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1")))); + getResult = newShard.get(new Engine.Get(false, false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1")))); assertTrue(getResult.exists()); assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 0609477dda8e5..1bd98cd1c9e69 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -323,12 +323,12 @@ public void testLotsOfThreads() throws Exception { } listener.assertNoError(); - Engine.Get get = new Engine.Get(false, "test", threadId, new Term(IdFieldMapper.NAME, threadId)); + Engine.Get get = new Engine.Get(false, false, "test", threadId, new Term(IdFieldMapper.NAME, threadId)); try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher)) { assertTrue("document not found", getResult.exists()); assertEquals(iteration, getResult.version()); SingleFieldsVisitor visitor = new SingleFieldsVisitor("test"); - getResult.docIdAndVersion().context.reader().document(getResult.docIdAndVersion().docId, visitor); + getResult.docIdAndVersion().reader.document(getResult.docIdAndVersion().docId, visitor); assertEquals(Arrays.asList(testFieldValue), visitor.fields().get("test")); } } catch (Exception t) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java new file mode 100644 index 0000000000000..c626f2d18522c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.ParentFieldMapper; +import org.elasticsearch.index.mapper.RoutingFieldMapper; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class ShardGetServiceTests extends IndexShardTestCase { + + public void testGetForUpdate() throws IOException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + GetResult testGet = primary.getService().getForUpdate("test", "0", test.getVersion(), VersionType.INTERNAL); + assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME)); + assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}"); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 1); // we refreshed + } + + Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar", null); + assertTrue(primary.getEngine().refreshNeeded()); + GetResult testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); + assertFalse(testGet1.getFields().containsKey(ParentFieldMapper.NAME)); + assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 1); // we read from the translog + } + primary.getEngine().refresh("test"); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 2); + } + + // now again from the reader + test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar", null); + assertTrue(primary.getEngine().refreshNeeded()); + testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); + assertFalse(testGet1.getFields().containsKey(ParentFieldMapper.NAME)); + assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); + + closeShards(primary); + } + + public void testGetForUpdateWithParentField() throws IOException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put("index.version.created", Version.V_5_6_0) // for parent field mapper + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("parent", "{ \"properties\": {}}") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}, \"_parent\": { \"type\": \"parent\"}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + GetResult testGet = primary.getService().getForUpdate("test", "0", test.getVersion(), VersionType.INTERNAL); + assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME)); + assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}"); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 1); // we refreshed + } + + Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, null, "foobar"); + assertTrue(primary.getEngine().refreshNeeded()); + GetResult testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + assertTrue(testGet1.getFields().containsKey(ParentFieldMapper.NAME)); + assertFalse(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); + assertEquals("foobar", testGet1.getFields().get(ParentFieldMapper.NAME).getValue()); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 1); // we read from the translog + } + primary.getEngine().refresh("test"); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 2); + } + + // now again from the reader + test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, null, "foobar"); + assertTrue(primary.getEngine().refreshNeeded()); + testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + assertTrue(testGet1.getFields().containsKey(ParentFieldMapper.NAME)); + assertFalse(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); + assertEquals("foobar", testGet1.getFields().get(ParentFieldMapper.NAME).getValue()); + + closeShards(primary); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 2317d8fb0d8bf..61e5cdcfd953a 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -235,9 +235,9 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); } - private void addToTranslogAndList(Translog translog, List list, Translog.Operation op) throws IOException { + private Location addToTranslogAndList(Translog translog, List list, Translog.Operation op) throws IOException { list.add(op); - translog.add(op); + return translog.add(op); } public void testIdParsingFromFile() { @@ -579,6 +579,19 @@ public void testSnapshot() throws IOException { } } + public void testReadLocation() throws IOException { + ArrayList ops = new ArrayList<>(); + ArrayList locs = new ArrayList<>(); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1}))); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, new byte[]{1}))); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{1}))); + int i = 0; + for (Translog.Operation op : ops) { + assertEquals(op, translog.readOperation(locs.get(i++))); + } + assertNull(translog.readOperation(new Location(100, 0, 0))); + } + public void testSnapshotWithNewTranslog() throws IOException { List toClose = new ArrayList<>(); try { @@ -689,6 +702,9 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable { Translog.Operation op = snapshot.next(); assertNotNull(op); Translog.Operation expectedOp = locationOperation.operation; + if (randomBoolean()) { + assertEquals(expectedOp, translog.readOperation(locationOperation.location)); + } assertEquals(expectedOp.opType(), op.opType()); switch (op.opType()) { case INDEX: @@ -1643,6 +1659,9 @@ public void run() { Translog.Location loc = add(op); writtenOperations.add(new LocationOperation(op, loc)); + if (rarely()) { // lets verify we can concurrently read this + assertEquals(op, translog.readOperation(loc)); + } afterAdd(); } } catch (Exception t) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 8a9ad3d2a76e1..667adf9d990cc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -471,7 +471,7 @@ protected Term newUid(ParsedDocument doc) { } protected Engine.Get newGet(boolean realtime, ParsedDocument doc) { - return new Engine.Get(realtime, doc.type(), doc.id(), newUid(doc)); + return new Engine.Get(realtime, false, doc.type(), doc.id(), newUid(doc)); } protected Engine.Index indexForDoc(ParsedDocument doc) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 6d6cc36d78b1b..2656855b9fd15 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -548,12 +548,15 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id) } protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source) throws IOException { - return indexDoc(shard, type, id, source, XContentType.JSON); + return indexDoc(shard, type, id, source, XContentType.JSON, null, null); } - protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source, XContentType xContentType) + protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source, XContentType xContentType, + String routing, String parentId) throws IOException { SourceToParse sourceToParse = SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType); + sourceToParse.routing(routing); + sourceToParse.parent(parentId); if (shard.routingEntry().primary()) { final Engine.IndexResult result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, getMappingUpdater(shard, type));