Skip to content

Provide a getFromTranslog method in ShardGetService #95736

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,19 @@ public abstract GetResult get(
Function<Engine.Searcher, Engine.Searcher> searcherWrapper
);

/**
* Similar to {@link Engine#get}, but it only attempts to serve the get from the translog.
* If not found in translog, it returns null, as {@link GetResult#NOT_EXISTS} could mean deletion.
* <p>
* This base implementation does not support the operation and unconditionally throws;
* subclasses that can serve a get from the translog override it.
* The parameters mirror those accepted by {@link Engine#get}.
*
* @param get the get request to serve
* @param mappingLookup the mapping lookup handed through to the implementation
* @param documentParser the document parser handed through to the implementation
* @param searcherWrapper wrapper applied to any {@link Engine.Searcher} the implementation acquires
* @return the result of the get, or {@code null} if the document was not found in the translog
* @throws UnsupportedOperationException always, in this base implementation
*/
public GetResult getFromTranslog(
Get get,
MappingLookup mappingLookup,
DocumentParser documentParser,
Function<Engine.Searcher, Engine.Searcher> searcherWrapper
) {
// Not supported by default; overriding engines provide the real behaviour.
throw new UnsupportedOperationException();
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public class InternalEngine extends Engine {
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
private final LiveVersionMap versionMap;
private final LiveVersionMapArchive liveVersionMapArchive;
// Records the last known generation during which LiveVersionMap was in unsafe mode. This indicates that only after this
// generation it is safe to rely on the LiveVersionMap for a real-time get.
private final AtomicLong lastUnsafeSegmentGenerationForGets = new AtomicLong(-1);

private volatile SegmentInfos lastCommittedSegmentInfos;

Expand Down Expand Up @@ -753,63 +756,95 @@ public GetResult get(
DocumentParser documentParser,
Function<Engine.Searcher, Engine.Searcher> searcherWrapper
) {
assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
assert assertGetUsesIdField(get);
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (get.realtime()) {
final VersionValue versionValue;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All I've done here is break this down into two pieces. An alternative would probably be a flag to shortcut this, but I'm not sure that would make it more readable. Open to suggestions.

try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
var result = realtimeGetUnderLock(get, mappingLookup, documentParser, searcherWrapper);
if (result != null) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The normal get, as before, never returns null.

return result;
}
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
throw new VersionConflictEngineException(
shardId,
"[" + get.id() + "]",
get.versionType().explainConflictForReads(versionValue.version, get.version())
);
}
if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& (get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term)) {
throw new VersionConflictEngineException(
shardId,
get.id(),
get.getIfSeqNo(),
get.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
}
if (get.isReadFromTranslog()) {
// this is only used for updates - API _GET calls will always read from a reader for consistency
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
if (operation != null) {
return getFromTranslog(get, (Translog.Index) operation, mappingLookup, documentParser, searcherWrapper);
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
throw new EngineException(shardId, "failed to read operation from translog", e);
}
} else {
trackTranslogLocation.set(true);
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
} else {
// we expose what has been externally exposed in a point in time snapshot via an explicit refresh
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper), false);
}
}
}

/**
 * Serves a realtime get via {@link #realtimeGetUnderLock} while holding the engine read lock.
 * <p>
 * Unlike the regular {@code get}, no fallback is applied here: whatever
 * {@code realtimeGetUnderLock} produces is handed straight back to the caller.
 * NOTE(review): that helper appears to return {@code null} when the document is not in the
 * translog, so callers should be prepared for a {@code null} result.
 *
 * @param get the get request; must target the id field (checked via assertion)
 * @param mappingLookup passed through to {@code realtimeGetUnderLock}
 * @param documentParser passed through to {@code realtimeGetUnderLock}
 * @param searcherWrapper passed through to {@code realtimeGetUnderLock}
 * @return the result produced by {@code realtimeGetUnderLock}
 */
@Override
public GetResult getFromTranslog(
    Get get,
    MappingLookup mappingLookup,
    DocumentParser documentParser,
    Function<Searcher, Searcher> searcherWrapper
) {
    assert assertGetUsesIdField(get);
    // The read lock is held for the whole lookup; realtimeGetUnderLock asserts that it is held.
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        final GetResult translogResult = realtimeGetUnderLock(get, mappingLookup, documentParser, searcherWrapper);
        return translogResult;
    }
}

protected GetResult realtimeGetUnderLock(
Get get,
MappingLookup mappingLookup,
DocumentParser documentParser,
Function<Engine.Searcher, Engine.Searcher> searcherWrapper
) {
assert readLock.isHeldByCurrentThread();
assert get.realtime();
final VersionValue versionValue;
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
}
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
throw new VersionConflictEngineException(
shardId,
"[" + get.id() + "]",
get.versionType().explainConflictForReads(versionValue.version, get.version())
);
}
if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& (get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term)) {
throw new VersionConflictEngineException(
shardId,
get.id(),
get.getIfSeqNo(),
get.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
}
if (get.isReadFromTranslog()) {
if (versionValue.getLocation() != null) {
try {
final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
if (operation != null) {
return getFromTranslog(get, (Translog.Index) operation, mappingLookup, documentParser, searcherWrapper);
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
throw new EngineException(shardId, "failed to read operation from translog", e);
}
} else {
trackTranslogLocation.set(true);
// We need to start tracking translog locations in the live version map. Refresh and
// serve the get from the internal searcher.
assert versionValue.seqNo >= 0 : versionValue;
refreshIfNeeded("realtime_get", versionValue.seqNo);
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
}
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
} else {
// we expose what has been externally exposed in a point in time snapshot via an explicit refresh
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper), false);
}
assert versionValue.seqNo >= 0 : versionValue;
refreshIfNeeded("realtime_get", versionValue.seqNo);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this refresh needed when the caller is getFromTranslog? I have the impression that we could avoid it but maybe I am missing a point?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is needed since we read from the internal searcher and we need to reload the reader so we can serve the get. The branching is a bit awkward there, I admit. I'll try to improve that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tlrx I've pushed 8a973f9. Maybe that makes this clearer.

}
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be fine to return an Optional.empty() here and in upstream methods; this would maybe help to distinguish the case when the doc is not found in the translog vs when it's found but deleted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional would also be an option! I just wanted to reduce non-essential changes to the Engine interface, since that would change the return type in the upstream methods. The null return value does not affect the normal Engine.get path. I will add some asserts for it, to make sure. (Alternatively, we could change the return value of get and getFromTranslog to an Optional.)

}

/**
Expand Down Expand Up @@ -915,6 +950,7 @@ private VersionValue getVersionFromMap(BytesRef id) {
// but we only need to do this once since the last operation per ID is to add to the version
// map so once we pass this point we can safely lookup from the version map.
if (versionMap.isUnsafe()) {
lastUnsafeSegmentGenerationForGets.set(lastCommittedSegmentInfos.getGeneration() + 1);
refresh("unsafe_version_map", SearcherScope.INTERNAL, true);
}
versionMap.enforceSafeAccess();
Expand Down Expand Up @@ -3174,6 +3210,10 @@ protected void waitForCommitDurability(long generation, ActionListener<Void> lis
}
}

/**
* Returns the current value of {@code lastUnsafeSegmentGenerationForGets}, i.e. the last known
* generation during which the LiveVersionMap was in unsafe mode. The counter starts at {@code -1}
* and is updated when a version-map lookup finds the map in unsafe mode.
*
* @return the last recorded unsafe generation, or {@code -1} if none has been recorded
*/
public long getLastUnsafeSegmentGenerationForGets() {
return lastUnsafeSegmentGenerationForGets.get();
}

protected LiveVersionMapArchive createLiveVersionMapArchive() {
return LiveVersionMapArchive.NOOP_ARCHIVE;
}
Expand All @@ -3186,4 +3226,9 @@ protected LiveVersionMapArchive getLiveVersionMapArchive() {
public LiveVersionMap getLiveVersionMap() {
return versionMap;
}

/**
 * Assertion helper verifying that the given get addresses the document by the id field.
 * Intended to be invoked as {@code assert assertGetUsesIdField(get);} so that the check is
 * skipped entirely when assertions are disabled.
 *
 * @param get the get request to check
 * @return always {@code true}; a mismatch surfaces as an {@link AssertionError} carrying the
 *         offending field name
 */
private static boolean assertGetUsesIdField(Get get) {
    assert IdFieldMapper.NAME.equals(get.uid().field()) : get.uid().field();
    return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public GetResult get(
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
fetchSourceContext,
forceSyntheticSource
forceSyntheticSource,
false
);
}

Expand All @@ -99,7 +100,8 @@ private GetResult get(
long ifSeqNo,
long ifPrimaryTerm,
FetchSourceContext fetchSourceContext,
boolean forceSyntheticSource
boolean forceSyntheticSource,
boolean translogOnly
) throws IOException {
currentMetric.inc();
try {
Expand All @@ -113,10 +115,11 @@ private GetResult get(
ifSeqNo,
ifPrimaryTerm,
fetchSourceContext,
forceSyntheticSource
forceSyntheticSource,
translogOnly
);

if (getResult.isExists()) {
if (getResult != null && getResult.isExists()) {
existsMetric.inc(System.nanoTime() - now);
} else {
missingMetric.inc(System.nanoTime() - now);
Expand All @@ -127,6 +130,29 @@ private GetResult get(
}
}

/**
* Variant of the get operation that sets the {@code translogOnly} flag when delegating to the
* private {@code get} overload. No sequence-number constraint is applied: the unassigned
* sentinels are passed for {@code ifSeqNo} and {@code ifPrimaryTerm}.
* <p>
* NOTE(review): the delegate appears to return {@code null} when the engine does not find the
* document in the translog — callers should handle a {@code null} result; confirm against the
* private {@code get}/{@code innerGet} implementations.
*
* @param id the document id
* @param gFields the fields requested by the caller
* @param realtime whether the get is realtime
* @param version the expected document version
* @param versionType the version type used for the version check
* @param fetchSourceContext controls source fetching for the result
* @param forceSyntheticSource passed through unchanged to the delegate
* @return the get result produced by the delegate
* @throws IOException if the delegate fails with an I/O error
*/
public GetResult getFromTranslog(
String id,
String[] gFields,
boolean realtime,
long version,
VersionType versionType,
FetchSourceContext fetchSourceContext,
boolean forceSyntheticSource
) throws IOException {
return get(
id,
gFields,
realtime,
version,
versionType,
// no compare-and-set style constraint for this path
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
fetchSourceContext,
forceSyntheticSource,
// translogOnly
true
);
}

public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) throws IOException {
return get(
id,
Expand All @@ -137,6 +163,7 @@ public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) throw
ifSeqNo,
ifPrimaryTerm,
FetchSourceContext.FETCH_SOURCE,
false,
false
);
}
Expand Down Expand Up @@ -197,17 +224,18 @@ private GetResult innerGet(
long ifSeqNo,
long ifPrimaryTerm,
FetchSourceContext fetchSourceContext,
boolean forceSyntheticSource
boolean forceSyntheticSource,
boolean translogOnly
) throws IOException {
fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
try (
Engine.GetResult get = indexShard.get(
new Engine.Get(realtime, realtime, id).version(version)
.versionType(versionType)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
)
) {
var engineGet = new Engine.Get(realtime, realtime, id).version(version)
.versionType(versionType)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm);
try (Engine.GetResult get = translogOnly ? indexShard.getFromTranslog(engineGet) : indexShard.get(engineGet)) {
if (get == null) {
return null;
}
if (get.exists() == false) {
return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,15 @@ public static Engine.Delete prepareDelete(
}

/**
* Serves the given get by delegating to {@code innerGet} with {@code translogOnly} set to
* {@code false}.
*
* @param get the get request
* @return the result returned by {@code innerGet}
*/
public Engine.GetResult get(Engine.Get get) {
return innerGet(get, false);
}

/**
* Serves the given get by delegating to {@code innerGet} with {@code translogOnly} set to
* {@code true}. The request is expected to be realtime, which is checked via assertion.
*
* @param get the get request; {@code get.realtime()} must be {@code true}
* @return the result returned by {@code innerGet}
*/
public Engine.GetResult getFromTranslog(Engine.Get get) {
// Only realtime gets are accepted on this path.
assert get.realtime();
return innerGet(get, true);
}

private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly) {
readAllowed();
MappingLookup mappingLookup = mapperService.mappingLookup();
if (mappingLookup.hasMappings() == false) {
Expand All @@ -1210,7 +1219,12 @@ public Engine.GetResult get(Engine.Get get) {
if (indexSettings.getIndexVersionCreated().isLegacyIndexVersion()) {
throw new IllegalStateException("get operations not allowed on a legacy index");
}
return getEngine().get(get, mappingLookup, mapperService.documentParser(), this::wrapSearcher);
if (translogOnly) {
return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), this::wrapSearcher);
}
var result = getEngine().get(get, mappingLookup, mapperService.documentParser(), this::wrapSearcher);
assert result != null : "result cannot be null";
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,12 @@ private static Releasable acquireLock(LiveVersionMap map, BytesRef uid) {
public static BytesRef uid(String id) {
return new Term(IdFieldMapper.NAME, Uid.encodeId(id)).bytes();
}

/**
* Static pass-through to {@code map.isUnsafe()}.
*
* @param map the live version map to query
* @return the value reported by {@code map.isUnsafe()}
*/
public static boolean isUnsafe(LiveVersionMap map) {
return map.isUnsafe();
}

/**
* Static pass-through to {@code map.isSafeAccessRequired()}.
*
* @param map the live version map to query
* @return the value reported by {@code map.isSafeAccessRequired()}
*/
public static boolean isSafeAccessRequired(LiveVersionMap map) {
return map.isSafeAccessRequired();
}
}
Loading