Skip to content

Track evacuated IDs since the last shard-local refresh in LiveVersionMap #95331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e04e324
first draft
pxsalehi Apr 19, 2023
f83cabe
prioratize tombstone read over archiver read
pxsalehi Apr 19, 2023
e83cff3
renames and an assert
pxsalehi Apr 19, 2023
c41d7ab
comments
pxsalehi Apr 20, 2023
f4fb8da
Merge remote-tracking branch 'upstream/main' into ps230417-LiveVersio…
pxsalehi Apr 20, 2023
f1986b0
cleanup and make a bunch of stuff public
pxsalehi Apr 20, 2023
8629e47
use archiver only for non-assert map
pxsalehi Apr 21, 2023
3d3536b
clear archive if unsafe
pxsalehi Apr 22, 2023
1bab205
translogOnly get
pxsalehi Apr 22, 2023
c372b85
Revert "translogOnly get"
pxsalehi Apr 22, 2023
f8bf812
make stuff public for testing
pxsalehi Apr 22, 2023
b450b6f
translogOnly get
pxsalehi Apr 22, 2023
831dc9f
clear archive when swithing from unsafe to safe
pxsalehi Apr 23, 2023
71fda19
remove clear archive
pxsalehi Apr 24, 2023
b147487
comments
pxsalehi Apr 24, 2023
9448f49
Revert "translogOnly get"
pxsalehi Apr 24, 2023
0cc9ba2
move some unsued stuff back to package private visibility
pxsalehi Apr 24, 2023
1c2aa67
reduce chaos
pxsalehi Apr 25, 2023
638b3db
clean up modifiers and add comments
pxsalehi Apr 26, 2023
d69e55b
spotless
pxsalehi Apr 26, 2023
331dfc6
Merge remote-tracking branch 'upstream/main' into ps230417-LiveVersio…
pxsalehi Apr 27, 2023
d2b9623
clean up and track min delete timestamp
pxsalehi Apr 28, 2023
6e169b2
address review comments
pxsalehi Apr 28, 2023
1848177
add more test
pxsalehi May 2, 2023
344c81d
protected
pxsalehi May 4, 2023
7acd078
Merge remote-tracking branch 'upstream/main' into ps230417-LiveVersio…
pxsalehi May 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

/** Holds a deleted version, which just adds a timestamp to {@link VersionValue} so we know when we can expire the deletion. */

final class DeleteVersionValue extends VersionValue {
public final class DeleteVersionValue extends VersionValue {

private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(DeleteVersionValue.class);

final long time;

DeleteVersionValue(long version, long seqNo, long term, long time) {
public DeleteVersionValue(long version, long seqNo, long term, long time) {
super(version, seqNo, term);
this.time = time;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

import java.util.Objects;

final class IndexVersionValue extends VersionValue {
public final class IndexVersionValue extends VersionValue {

private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexVersionValue.class);

private final Translog.Location translogLocation;

IndexVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) {
public IndexVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) {
super(version, seqNo, term);
this.translogLocation = translogLocation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public class InternalEngine extends Engine {

// A uid (in the form of BytesRef) to the version map
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
private final LiveVersionMap versionMap = new LiveVersionMap();
private final LiveVersionMap versionMap;

private volatile SegmentInfos lastCommittedSegmentInfos;

Expand Down Expand Up @@ -215,6 +215,7 @@ public InternalEngine(EngineConfig engineConfig) {
this.maxDocs = maxDocs;
this.relativeTimeInNanosSupplier = config().getRelativeTimeInNanosSupplier();
this.lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong(); // default to creation timestamp
this.versionMap = createLiveVersionMap();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
store.incRef();
IndexWriter writer = null;
Expand Down Expand Up @@ -3128,4 +3129,13 @@ public ShardLongFieldRange getRawFieldRange(String field) {
public void addFlushListener(Translog.Location location, ActionListener<Long> listener) {
this.flushListener.addOrNotify(location, listener);
}

protected LiveVersionMap createLiveVersionMap() {
return new LiveVersionMap();
}

// Visible for testing purposes only
public LiveVersionMap getLiveVersionMap() {
return versionMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@
import java.util.concurrent.atomic.AtomicLong;

/** Maps _uid value to its version information. */
final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
public final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {

private final KeyedLock<BytesRef> keyedLock = new KeyedLock<>();

private static final class VersionLookup {
private final LiveVersionMapArchiver archiver;

LiveVersionMap() {
this(LiveVersionMapArchiver.NOOP_ARCHIVER);
}

public LiveVersionMap(LiveVersionMapArchiver archiver) {
this.archiver = archiver;
}

public static final class VersionLookup {

/** Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones,
* we only account for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not
Expand All @@ -52,19 +62,24 @@ private static final class VersionLookup {
// the tombstone
private final AtomicLong minDeleteTimestamp = new AtomicLong(Long.MAX_VALUE);

// Modifies the map of this instance by merging with the given VersionLookup
public void merge(VersionLookup versionLookup) {
map.putAll(versionLookup.map);
}

private VersionLookup(Map<BytesRef, VersionValue> map) {
this.map = map;
}

VersionValue get(BytesRef key) {
public VersionValue get(BytesRef key) {
return map.get(key);
}

VersionValue put(BytesRef key, VersionValue value) {
return map.put(key, value);
}

boolean isEmpty() {
public boolean isEmpty() {
return map.isEmpty();
}

Expand Down Expand Up @@ -135,10 +150,18 @@ Maps buildTransitionMap() {
);
}

/**
* similar to `invalidateOldMap` but used only for the `unsafeKeysMap` used for assertions
*/
Maps invalidateOldMapForAssert() {
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
}

/**
* builds a new map that invalidates the old map but maintains the current. This should be called in afterRefresh()
*/
Maps invalidateOldMap() {
Maps invalidateOldMap(LiveVersionMapArchiver archiver) {
archiver.afterRefresh(old);
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
}

Expand Down Expand Up @@ -245,11 +268,15 @@ public void afterRefresh(boolean didRefresh) throws IOException {
// reopen, and so any concurrent indexing requests can still sneak in a few additions to that current map that are in fact
// reflected in the previous reader. We don't touch tombstones here: they expire on their own index.gc_deletes timeframe:

maps = maps.invalidateOldMap();
assert (unsafeKeysMap = unsafeKeysMap.invalidateOldMap()) != null;
maps = maps.invalidateOldMap(archiver);
assert (unsafeKeysMap = unsafeKeysMap.invalidateOldMapForAssert()) != null;

}

public void afterUnpromotablesRefreshed(long generation) {
archiver.afterUnpromotablesRefreshed(generation);
}

/**
* Returns the live version (add or delete) for this uid.
*/
Expand All @@ -270,7 +297,14 @@ private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {
return value;
}

return tombstones.get(uid);
// We first check the tombstone then the archiver since the archiver accumulates ids from the old map, and we
// makes sure in `putDeleteUnderLock` the old map does not hold an entry that is in tombstone, archiver also wouldn't have them.
value = tombstones.get(uid);
if (value != null) {
return value;
}

return archiver.get(uid);
}

VersionValue getVersionForAssert(final BytesRef uid) {
Expand Down Expand Up @@ -459,4 +493,9 @@ boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]";
return true;
}

// visible for testing purposes only
LiveVersionMapArchiver getArchiver() {
return archiver;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.util.BytesRef;

/**
* Keeps track of the old map of a LiveVersionMap that gets evacuated on a refresh
*/
public interface LiveVersionMapArchiver {
/**
* Archive the old map evacuated due to a refresh
*
* @param old is the old map that is evacuated on a refresh
*/
void afterRefresh(LiveVersionMap.VersionLookup old);

/**
* Trigger a cleanup of the archive based on the given generation
*
* @param generation the generation of the commit caused by the flush
*/
void afterUnpromotablesRefreshed(long generation);

/**
* Look up the given uid in the archive
*/
VersionValue get(BytesRef uid);

LiveVersionMapArchiver NOOP_ARCHIVER = new LiveVersionMapArchiver() {
@Override
public void afterRefresh(LiveVersionMap.VersionLookup old) {}

@Override
public void afterUnpromotablesRefreshed(long generation) {}

@Override
public VersionValue get(BytesRef uid) {
return null;
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.translog.Translog;

abstract class VersionValue implements Accountable {
public abstract class VersionValue implements Accountable {

private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.Term;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.Uid;

public class LiveVersionMapTestUtils {

public static VersionValue get(LiveVersionMap map, String id) {
try (Releasable r = acquireLock(map, uid(id))) {
return map.getUnderLock(uid(id));
}
}

public static void putIndex(LiveVersionMap map, String id, IndexVersionValue version) {
try (Releasable r = acquireLock(map, uid(id))) {
map.putIndexUnderLock(uid(id), version);
}
}

public static void putDelete(LiveVersionMap map, String id, DeleteVersionValue version) {
try (Releasable r = acquireLock(map, uid(id))) {
map.putDeleteUnderLock(uid(id), version);
}
}

public static void pruneTombstones(LiveVersionMap map, long maxTimestampToPrune, long maxSeqNoToPrune) {
map.pruneTombstones(maxTimestampToPrune, maxSeqNoToPrune);
}

public static int VersionLookupSize(LiveVersionMap.VersionLookup lookup) {
return lookup.size();
}

private static Releasable acquireLock(LiveVersionMap map, BytesRef uid) {
return map.acquireLock(uid);
}

private static BytesRef uid(String id) {
return new Term(IdFieldMapper.NAME, Uid.encodeId(id)).bytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public void setUp() throws Exception {

assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(engine.getLiveVersionMap().getArchiver(), LiveVersionMapArchiver.NOOP_ARCHIVER);
if (randomBoolean()) {
engine.config().setEnableGcDeletes(false);
}
Expand Down