diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 85c9c77bfb7a5..8901b1ded7d38 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -634,6 +634,19 @@ public abstract GetResult get( Function searcherWrapper ); + /** + * Similar to {@link Engine#get}, but it only attempts to serve the get from the translog. + * If not found in translog, it returns null, as {@link GetResult#NOT_EXISTS} could mean deletion. + */ + public GetResult getFromTranslog( + Get get, + MappingLookup mappingLookup, + DocumentParser documentParser, + Function searcherWrapper + ) { + throw new UnsupportedOperationException(); + } + /** * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1b351e810af80..7914d27d3fd07 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -143,6 +143,10 @@ public class InternalEngine extends Engine { // we use the hashed variant since we iterate over it and check removal and additions on existing keys private final LiveVersionMap versionMap; private final LiveVersionMapArchive liveVersionMapArchive; + // Records the last known generation during which LiveVersionMap was in unsafe mode. This indicates that only after this + // generation it is safe to rely on the LiveVersionMap for a real-time get. + // TODO: can we move this entirely into the stateless plugin? 
+ private final AtomicLong lastUnsafeSegmentGenerationForGets = new AtomicLong(-1); private volatile SegmentInfos lastCommittedSegmentInfos; @@ -753,58 +757,13 @@ public GetResult get( DocumentParser documentParser, Function searcherWrapper ) { - assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field(); + assert assertGetUsesIdField(get); try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); if (get.realtime()) { - final VersionValue versionValue; - try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) { - // we need to lock here to access the version map to do this truly in RT - versionValue = getVersionFromMap(get.uid().bytes()); - } - if (versionValue != null) { - if (versionValue.isDelete()) { - return GetResult.NOT_EXISTS; - } - if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) { - throw new VersionConflictEngineException( - shardId, - "[" + get.id() + "]", - get.versionType().explainConflictForReads(versionValue.version, get.version()) - ); - } - if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO - && (get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term)) { - throw new VersionConflictEngineException( - shardId, - get.id(), - get.getIfSeqNo(), - get.getIfPrimaryTerm(), - versionValue.seqNo, - versionValue.term - ); - } - if (get.isReadFromTranslog()) { - // this is only used for updates - API _GET calls will always read form a reader for consistency - // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0 - if (versionValue.getLocation() != null) { - try { - final Translog.Operation operation = translog.readOperation(versionValue.getLocation()); - if (operation != null) { - return getFromTranslog(get, (Translog.Index) operation, mappingLookup, documentParser, searcherWrapper); - } - } catch (IOException e) { - maybeFailEngine("realtime_get", e); // lets check if the translog has 
failed with a tragic event - throw new EngineException(shardId, "failed to read operation from translog", e); - } - } else { - trackTranslogLocation.set(true); - } - } - assert versionValue.seqNo >= 0 : versionValue; - refreshIfNeeded("realtime_get", versionValue.seqNo); - } - return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false); + var result = realtimeGetUnderLock(get, mappingLookup, documentParser, searcherWrapper, true); + assert result != null : "real-time get result must not be null"; + return result; } else { // we expose what has been externally expose in a point in time snapshot via an explicit refresh return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper), false); @@ -812,6 +771,89 @@ public GetResult get( } } + @Override + public GetResult getFromTranslog( + Get get, + MappingLookup mappingLookup, + DocumentParser documentParser, + Function searcherWrapper + ) { + assert assertGetUsesIdField(get); + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + return realtimeGetUnderLock(get, mappingLookup, documentParser, searcherWrapper, false); + } + } + + /** + * @param getFromSearcher indicates whether we also try the internal searcher if not found in translog. In the case where + * we just started tracking locations in the translog, we always use the internal searcher. 
+ */ + protected GetResult realtimeGetUnderLock( + Get get, + MappingLookup mappingLookup, + DocumentParser documentParser, + Function searcherWrapper, + boolean getFromSearcher + ) { + assert readLock.isHeldByCurrentThread(); + assert get.realtime(); + final VersionValue versionValue; + try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) { + // we need to lock here to access the version map to do this truly in RT + versionValue = getVersionFromMap(get.uid().bytes()); + } + boolean getFromSearcherIfNotInTranslog = getFromSearcher; + if (versionValue != null) { + if (versionValue.isDelete()) { + return GetResult.NOT_EXISTS; + } + if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) { + throw new VersionConflictEngineException( + shardId, + "[" + get.id() + "]", + get.versionType().explainConflictForReads(versionValue.version, get.version()) + ); + } + if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + && (get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term)) { + throw new VersionConflictEngineException( + shardId, + get.id(), + get.getIfSeqNo(), + get.getIfPrimaryTerm(), + versionValue.seqNo, + versionValue.term + ); + } + if (get.isReadFromTranslog()) { + if (versionValue.getLocation() != null) { + try { + final Translog.Operation operation = translog.readOperation(versionValue.getLocation()); + if (operation != null) { + return getFromTranslog(get, (Translog.Index) operation, mappingLookup, documentParser, searcherWrapper); + } + } catch (IOException e) { + maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event + throw new EngineException(shardId, "failed to read operation from translog", e); + } + } else { + trackTranslogLocation.set(true); + // We need to start tracking translog locations in the live version map. 
Refresh and + // serve all the real-time gets with a missing translog location from the internal searcher + // (until a flush happens) even if we're supposed to only get from translog. + getFromSearcherIfNotInTranslog = true; + } + } + assert versionValue.seqNo >= 0 : versionValue; + refreshIfNeeded("realtime_get", versionValue.seqNo); + } + if (getFromSearcherIfNotInTranslog) { + return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false); + } + return null; + } + /** * the status of the current doc version in lucene, compared to the version in an incoming * operation @@ -915,6 +957,7 @@ private VersionValue getVersionFromMap(BytesRef id) { // but we only need to do this once since the last operation per ID is to add to the version // map so once we pass this point we can safely lookup from the version map. if (versionMap.isUnsafe()) { + lastUnsafeSegmentGenerationForGets.set(lastCommittedSegmentInfos.getGeneration() + 1); refresh("unsafe_version_map", SearcherScope.INTERNAL, true); } versionMap.enforceSafeAccess(); @@ -3174,6 +3217,10 @@ protected void waitForCommitDurability(long generation, ActionListener lis } } + public long getLastUnsafeSegmentGenerationForGets() { + return lastUnsafeSegmentGenerationForGets.get(); + } + protected LiveVersionMapArchive createLiveVersionMapArchive() { return LiveVersionMapArchive.NOOP_ARCHIVE; } @@ -3186,4 +3233,9 @@ protected LiveVersionMapArchive getLiveVersionMapArchive() { public LiveVersionMap getLiveVersionMap() { return versionMap; } + + private static boolean assertGetUsesIdField(Get get) { + assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field(); + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 95a0669d6ab90..0b7a2f9bfb47b 100644 --- a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ 
b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -86,7 +86,8 @@ public GetResult get( UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext, - forceSyntheticSource + forceSyntheticSource, + false ); } @@ -99,7 +100,8 @@ private GetResult get( long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, - boolean forceSyntheticSource + boolean forceSyntheticSource, + boolean translogOnly ) throws IOException { currentMetric.inc(); try { @@ -113,10 +115,11 @@ private GetResult get( ifSeqNo, ifPrimaryTerm, fetchSourceContext, - forceSyntheticSource + forceSyntheticSource, + translogOnly ); - if (getResult.isExists()) { + if (getResult != null && getResult.isExists()) { existsMetric.inc(System.nanoTime() - now); } else { missingMetric.inc(System.nanoTime() - now); @@ -127,6 +130,29 @@ private GetResult get( } } + public GetResult getFromTranslog( + String id, + String[] gFields, + boolean realtime, + long version, + VersionType versionType, + FetchSourceContext fetchSourceContext, + boolean forceSyntheticSource + ) throws IOException { + return get( + id, + gFields, + realtime, + version, + versionType, + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + fetchSourceContext, + forceSyntheticSource, + true + ); + } + public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) throws IOException { return get( id, @@ -137,6 +163,7 @@ public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) throw ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE, + false, false ); } @@ -197,17 +224,18 @@ private GetResult innerGet( long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, - boolean forceSyntheticSource + boolean forceSyntheticSource, + boolean translogOnly ) throws IOException { fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields); - try ( - Engine.GetResult get = indexShard.get( - new Engine.Get(realtime, realtime, id).version(version) - 
.versionType(versionType) - .setIfSeqNo(ifSeqNo) - .setIfPrimaryTerm(ifPrimaryTerm) - ) - ) { + var engineGet = new Engine.Get(realtime, realtime, id).version(version) + .versionType(versionType) + .setIfSeqNo(ifSeqNo) + .setIfPrimaryTerm(ifPrimaryTerm); + try (Engine.GetResult get = translogOnly ? indexShard.getFromTranslog(engineGet) : indexShard.get(engineGet)) { + if (get == null) { + return null; + } if (get.exists() == false) { return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 93157ea9c52bd..e902999939f70 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1202,6 +1202,15 @@ public static Engine.Delete prepareDelete( } public Engine.GetResult get(Engine.Get get) { + return innerGet(get, false); + } + + public Engine.GetResult getFromTranslog(Engine.Get get) { + assert get.realtime(); + return innerGet(get, true); + } + + private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly) { readAllowed(); MappingLookup mappingLookup = mapperService.mappingLookup(); if (mappingLookup.hasMappings() == false) { @@ -1210,6 +1219,9 @@ public Engine.GetResult get(Engine.Get get) { if (indexSettings.getIndexVersionCreated().isLegacyIndexVersion()) { throw new IllegalStateException("get operations not allowed on a legacy index"); } + if (translogOnly) { + return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), this::wrapSearcher); + } return getEngine().get(get, mappingLookup, mapperService.documentParser(), this::wrapSearcher); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTestUtils.java b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTestUtils.java 
index c234978938025..b9bf305c8ecce 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTestUtils.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTestUtils.java @@ -78,4 +78,12 @@ private static Releasable acquireLock(LiveVersionMap map, BytesRef uid) { public static BytesRef uid(String id) { return new Term(IdFieldMapper.NAME, Uid.encodeId(id)).bytes(); } + + public static boolean isUnsafe(LiveVersionMap map) { + return map.isUnsafe(); + } + + public static boolean isSafeAccessRequired(LiveVersionMap map) { + return map.isSafeAccessRequired(); + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index bc4a345084e49..9b13e0c55d0eb 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.index.shard; import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; @@ -15,6 +16,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.LiveVersionMapTestUtils; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.RoutingFieldMapper; @@ -28,6 +30,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class 
ShardGetServiceTests extends IndexShardTestCase { @@ -209,4 +212,96 @@ public void testTypelessGetForUpdate() throws IOException { closeShards(shard); } + + public void testGetFromTranslog() throws IOException { + Settings settings = indexSettings(Version.CURRENT, 1, 1).build(); + IndexMetadata metadata = IndexMetadata.builder("test").putMapping(""" + { "properties": { "foo": { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); + recoverShardFromStore(primary); + InternalEngine engine = (InternalEngine) primary.getEngineOrNull(); + + // Initially there haven't been any switches from unsafe to safe maps in the live version map + assertEquals(engine.getLastUnsafeSegmentGenerationForGets(), -1); + var map = engine.getLiveVersionMap(); + assertFalse(LiveVersionMapTestUtils.isSafeAccessRequired(map)); + assertFalse(LiveVersionMapTestUtils.isUnsafe(map)); + + // Make the map unsafe by indexing a doc that will be indexed in the append-only mode + var indexResult = indexDoc(primary, null, "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + assertFalse(LiveVersionMapTestUtils.isSafeAccessRequired(map)); + assertTrue(LiveVersionMapTestUtils.isUnsafe(map)); + + // Issue a get that would enforce safe access mode and switch the maps from unsafe to safe + var getResult = primary.getService() + .getFromTranslog("2", new String[] { "foo" }, true, 1, VersionType.INTERNAL, FetchSourceContext.FETCH_SOURCE, false); + assertNull(getResult); + var lastUnsafeGeneration = engine.getLastUnsafeSegmentGenerationForGets(); + assertThat(lastUnsafeGeneration, greaterThan(0L)); + assertTrue(LiveVersionMapTestUtils.isSafeAccessRequired(map)); + assertFalse(LiveVersionMapTestUtils.isUnsafe(map)); + + // A flush shouldn't change the recorded last unsafe generation for gets + PlainActionFuture flushFuture = PlainActionFuture.newFuture(); + engine.flush(true, true, flushFuture); + 
var flushResult = flushFuture.actionGet(); + assertTrue(flushResult.flushPerformed()); + assertThat(flushResult.generation(), equalTo(lastUnsafeGeneration)); + assertThat(engine.getLastUnsafeSegmentGenerationForGets(), equalTo(lastUnsafeGeneration)); + // No longer in translog + getResult = primary.getService() + .getFromTranslog( + indexResult.getId(), + new String[] { "foo" }, + true, + 1, + VersionType.INTERNAL, + FetchSourceContext.FETCH_SOURCE, + false + ); + assertNull(getResult); + // But normal get would still work! + getResult = primary.getService() + .get(indexResult.getId(), new String[] { "foo" }, true, 1, VersionType.INTERNAL, FetchSourceContext.FETCH_SOURCE, false); + assertNotNull(getResult); + assertTrue(getResult.isExists()); + assertEquals(engine.getLastUnsafeSegmentGenerationForGets(), lastUnsafeGeneration); + + // As long as the map stays in safe mode, the last unsafe generation stays the same + assertTrue(LiveVersionMapTestUtils.isSafeAccessRequired(map)); + assertFalse(LiveVersionMapTestUtils.isUnsafe(map)); + indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + // The first get in safe mode would trigger a refresh, since we need to start tracking translog locations in the live version map + getResult = primary.getService() + .getFromTranslog("1", new String[] { "foo" }, true, 1, VersionType.INTERNAL, FetchSourceContext.FETCH_SOURCE, false); + assertTrue(getResult.isExists()); + assertEquals(engine.getLastUnsafeSegmentGenerationForGets(), lastUnsafeGeneration); + getResult = primary.getService() + .getFromTranslog("2", new String[] { "foo" }, true, 1, VersionType.INTERNAL, FetchSourceContext.FETCH_SOURCE, false); + assertNull(getResult); + assertEquals(engine.getLastUnsafeSegmentGenerationForGets(), lastUnsafeGeneration); + + // After two refreshes (one for tracking translog locations, i.e., source="realtime_get", and the following one) + // with no safe access needed, it should switch to append-only. 
(see https://github.com/elastic/elasticsearch/pull/27752) + assertTrue(LiveVersionMapTestUtils.isSafeAccessRequired(map)); + assertFalse(LiveVersionMapTestUtils.isUnsafe(map)); + indexDoc(primary, null, "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + engine.refresh("test"); + assertFalse(LiveVersionMapTestUtils.isSafeAccessRequired(map)); + assertFalse(LiveVersionMapTestUtils.isUnsafe(map)); + + // Redo the same: make the map unsafe and see that the recorded last unsafe generation gets updated, upon a get. + indexDoc(primary, null, "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + assertFalse(LiveVersionMapTestUtils.isSafeAccessRequired(map)); + assertTrue(LiveVersionMapTestUtils.isUnsafe(map)); + getResult = primary.getService() + .getFromTranslog("2", new String[] { "foo" }, true, 1, VersionType.INTERNAL, FetchSourceContext.FETCH_SOURCE, false); + assertNull(getResult); + var lastUnsafeGeneration2 = engine.getLastUnsafeSegmentGenerationForGets(); + assertTrue(lastUnsafeGeneration2 > lastUnsafeGeneration); + assertTrue(LiveVersionMapTestUtils.isSafeAccessRequired(map)); + assertFalse(LiveVersionMapTestUtils.isUnsafe(map)); + + closeShards(primary); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index af33af75cda7d..72411d1451c7e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; @@ -912,8 +913,14 @@ protected 
Engine.IndexResult indexDoc(IndexShard shard, String type, String id, return indexDoc(shard, id, source, XContentType.JSON, null); } + // Uses an auto-generated ID if `id` is null/empty protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source, XContentType xContentType, String routing) throws IOException { + long autoGeneratedTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + if (Strings.isEmpty(id)) { + id = UUIDs.base64UUID(); + autoGeneratedTimestamp = System.currentTimeMillis(); + } SourceToParse sourceToParse = new SourceToParse(id, new BytesArray(source), xContentType, routing, Map.of()); Engine.IndexResult result; if (shard.routingEntry().primary()) { @@ -923,7 +930,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, + autoGeneratedTimestamp, false ); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { @@ -939,7 +946,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, + autoGeneratedTimestamp, false ); } @@ -952,7 +959,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source seqNo, shard.getOperationPrimaryTerm(), 0, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, + autoGeneratedTimestamp, false, sourceToParse );