Skip to content

Commit e70020d

Browse files
authored
Allow tracking evacuated IDs since the last refresh in LiveVersionMap (#95331)
This PR adds adds an interface and noop implementation for LiveVersionMapArchive. It receives old maps from the LiveVersionMap upon a refresh, and is also informed when unpromotable shards have been refreshed (one way of doing this is implemented in the matching Serverless PR). relates ES-5728
1 parent 9bbea47 commit e70020d

File tree

8 files changed

+276
-24
lines changed

8 files changed

+276
-24
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ public class InternalEngine extends Engine {
141141

142142
// A uid (in the form of BytesRef) to the version map
143143
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
144-
private final LiveVersionMap versionMap = new LiveVersionMap();
144+
private final LiveVersionMap versionMap;
145+
private final LiveVersionMapArchive liveVersionMapArchive;
145146

146147
private volatile SegmentInfos lastCommittedSegmentInfos;
147148

@@ -216,6 +217,8 @@ public InternalEngine(EngineConfig engineConfig) {
216217
this.maxDocs = maxDocs;
217218
this.relativeTimeInNanosSupplier = config().getRelativeTimeInNanosSupplier();
218219
this.lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong(); // default to creation timestamp
220+
this.liveVersionMapArchive = createLiveVersionMapArchive();
221+
this.versionMap = new LiveVersionMap(liveVersionMapArchive);
219222
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
220223
store.incRef();
221224
IndexWriter writer = null;
@@ -3170,4 +3173,17 @@ protected void waitForCommitDurability(long generation, ActionListener<Void> lis
31703173
listener.onResponse(null);
31713174
}
31723175
}
3176+
3177+
protected LiveVersionMapArchive createLiveVersionMapArchive() {
3178+
return LiveVersionMapArchive.NOOP_ARCHIVE;
3179+
}
3180+
3181+
protected LiveVersionMapArchive getLiveVersionMapArchive() {
3182+
return liveVersionMapArchive;
3183+
}
3184+
3185+
// Visible for testing purposes only
3186+
public LiveVersionMap getLiveVersionMap() {
3187+
return versionMap;
3188+
}
31733189
}

server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,21 @@
2222
import java.util.concurrent.atomic.AtomicLong;
2323

2424
/** Maps _uid value to its version information. */
25-
final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
25+
public final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
2626

2727
private final KeyedLock<BytesRef> keyedLock = new KeyedLock<>();
2828

29-
private static final class VersionLookup {
29+
private final LiveVersionMapArchive archiver;
30+
31+
LiveVersionMap() {
32+
this(LiveVersionMapArchive.NOOP_ARCHIVE);
33+
}
34+
35+
LiveVersionMap(LiveVersionMapArchive archiver) {
36+
this.archiver = archiver;
37+
}
38+
39+
public static final class VersionLookup {
3040

3141
/** Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones,
3242
* we only account for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not
@@ -52,19 +62,25 @@ private static final class VersionLookup {
5262
// the tombstone
5363
private final AtomicLong minDeleteTimestamp = new AtomicLong(Long.MAX_VALUE);
5464

65+
// Modifies the map of this instance by merging with the given VersionLookup
66+
public void merge(VersionLookup versionLookup) {
67+
map.putAll(versionLookup.map);
68+
minDeleteTimestamp.accumulateAndGet(versionLookup.minDeleteTimestamp(), Math::min);
69+
}
70+
5571
private VersionLookup(Map<BytesRef, VersionValue> map) {
5672
this.map = map;
5773
}
5874

59-
VersionValue get(BytesRef key) {
75+
public VersionValue get(BytesRef key) {
6076
return map.get(key);
6177
}
6278

6379
VersionValue put(BytesRef key, VersionValue value) {
6480
return map.put(key, value);
6581
}
6682

67-
boolean isEmpty() {
83+
public boolean isEmpty() {
6884
return map.isEmpty();
6985
}
7086

@@ -88,6 +104,9 @@ public void updateMinDeletedTimestamp(DeleteVersionValue delete) {
88104
minDeleteTimestamp.accumulateAndGet(delete.time, Math::min);
89105
}
90106

107+
public long minDeleteTimestamp() {
108+
return minDeleteTimestamp.get();
109+
}
91110
}
92111

93112
private static final class Maps {
@@ -98,7 +117,7 @@ private static final class Maps {
98117
// Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
99118
final VersionLookup old;
100119

101-
// this is not volatile since we don't need to maintain a happens before relation ship across doc IDs so it's enough to
120+
// this is not volatile since we don't need to maintain a happens before relationship across doc IDs so it's enough to
102121
// have the volatile read of the Maps reference to make it visible even across threads.
103122
boolean needsSafeAccess;
104123
final boolean previousMapsNeededSafeAccess;
@@ -135,10 +154,18 @@ Maps buildTransitionMap() {
135154
);
136155
}
137156

157+
/**
158+
* similar to `invalidateOldMap` but used only for the `unsafeKeysMap` used for assertions
159+
*/
160+
Maps invalidateOldMapForAssert() {
161+
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
162+
}
163+
138164
/**
139165
* builds a new map that invalidates the old map but maintains the current. This should be called in afterRefresh()
140166
*/
141-
Maps invalidateOldMap() {
167+
Maps invalidateOldMap(LiveVersionMapArchive archiver) {
168+
archiver.afterRefresh(old);
142169
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
143170
}
144171

@@ -245,8 +272,8 @@ public void afterRefresh(boolean didRefresh) throws IOException {
245272
// reopen, and so any concurrent indexing requests can still sneak in a few additions to that current map that are in fact
246273
// reflected in the previous reader. We don't touch tombstones here: they expire on their own index.gc_deletes timeframe:
247274

248-
maps = maps.invalidateOldMap();
249-
assert (unsafeKeysMap = unsafeKeysMap.invalidateOldMap()) != null;
275+
maps = maps.invalidateOldMap(archiver);
276+
assert (unsafeKeysMap = unsafeKeysMap.invalidateOldMapForAssert()) != null;
250277

251278
}
252279

@@ -270,7 +297,14 @@ private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {
270297
return value;
271298
}
272299

273-
return tombstones.get(uid);
300+
// We first check the tombstone then the archiver since the archiver accumulates ids from the old map, and we
301+
// makes sure in `putDeleteUnderLock` the old map does not hold an entry that is in tombstone, archiver also wouldn't have them.
302+
value = tombstones.get(uid);
303+
if (value != null) {
304+
return value;
305+
}
306+
307+
return archiver.get(uid);
274308
}
275309

276310
VersionValue getVersionForAssert(final BytesRef uid) {
@@ -368,7 +402,8 @@ private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrun
368402
// version value can't be removed it's
369403
// not yet flushed to lucene ie. it's part of this current maps object
370404
final boolean isNotTrackedByCurrentMaps = versionValue.time < maps.getMinDeleteTimestamp();
371-
return isTooOld && isSafeToPrune && isNotTrackedByCurrentMaps;
405+
final boolean isNotTrackedByArchiver = versionValue.time < archiver.getMinDeleteTimestamp();
406+
return isTooOld && isSafeToPrune && isNotTrackedByCurrentMaps & isNotTrackedByArchiver;
372407
}
373408

374409
/**
@@ -459,4 +494,9 @@ boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
459494
assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]";
460495
return true;
461496
}
497+
498+
// visible for testing purposes only
499+
LiveVersionMapArchive getArchiver() {
500+
return archiver;
501+
}
462502
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.index.engine;
10+
11+
import org.apache.lucene.util.BytesRef;
12+
13+
/**
14+
* Keeps track of the old map of a LiveVersionMap that gets evacuated on a refresh
15+
*/
16+
public interface LiveVersionMapArchive {
17+
/**
18+
* Archive the old map evacuated due to a refresh
19+
*
20+
* @param old is the old map that is evacuated on a refresh
21+
*/
22+
void afterRefresh(LiveVersionMap.VersionLookup old);
23+
24+
/**
25+
* Look up the given uid in the archive
26+
*/
27+
VersionValue get(BytesRef uid);
28+
29+
/**
30+
* Returns the min delete timestamp across all archived maps.
31+
*/
32+
long getMinDeleteTimestamp();
33+
34+
LiveVersionMapArchive NOOP_ARCHIVE = new LiveVersionMapArchive() {
35+
@Override
36+
public void afterRefresh(LiveVersionMap.VersionLookup old) {}
37+
38+
@Override
39+
public VersionValue get(BytesRef uid) {
40+
return null;
41+
}
42+
43+
@Override
44+
public long getMinDeleteTimestamp() {
45+
return Long.MAX_VALUE;
46+
}
47+
};
48+
}

server/src/main/java/org/elasticsearch/index/engine/VersionValue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.elasticsearch.core.Nullable;
1414
import org.elasticsearch.index.translog.Translog;
1515

16-
abstract class VersionValue implements Accountable {
16+
public abstract class VersionValue implements Accountable {
1717

1818
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class);
1919

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.index.engine;
10+
11+
import org.apache.lucene.util.BytesRef;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.index.shard.IndexShard;
14+
import org.elasticsearch.index.shard.IndexShardTestCase;
15+
import org.hamcrest.CoreMatchers;
16+
import org.hamcrest.core.IsInstanceOf;
17+
18+
import static org.elasticsearch.index.engine.LiveVersionMapTestUtils.putIndex;
19+
import static org.elasticsearch.index.engine.LiveVersionMapTestUtils.randomIndexVersionValue;
20+
21+
public class LiveVersionMapArchiveTests extends IndexShardTestCase {
22+
private class TestArchive implements LiveVersionMapArchive {
23+
24+
boolean afterRefreshCalled = false;
25+
LiveVersionMap.VersionLookup archivedMap = null;
26+
27+
@Override
28+
public void afterRefresh(LiveVersionMap.VersionLookup old) {
29+
afterRefreshCalled = true;
30+
archivedMap = old;
31+
}
32+
33+
@Override
34+
public VersionValue get(BytesRef uid) {
35+
return null;
36+
}
37+
38+
@Override
39+
public long getMinDeleteTimestamp() {
40+
return Long.MAX_VALUE;
41+
}
42+
}
43+
44+
private class TestEngine extends InternalEngine {
45+
TestEngine(EngineConfig engineConfig) {
46+
super(engineConfig);
47+
}
48+
49+
@Override
50+
public LiveVersionMapArchive createLiveVersionMapArchive() {
51+
return new TestArchive();
52+
}
53+
}
54+
55+
public void testLiveVersionMapArchiveCreation() throws Exception {
56+
final IndexShard shard = newStartedShard(false, Settings.EMPTY, TestEngine::new);
57+
assertThat(shard.getEngineOrNull(), CoreMatchers.instanceOf(InternalEngine.class));
58+
var engine = (InternalEngine) shard.getEngineOrNull();
59+
assertThat(engine.getLiveVersionMapArchive(), IsInstanceOf.instanceOf(TestArchive.class));
60+
closeShards(shard);
61+
}
62+
63+
public void testLiveVersionMapArchiveBasic() throws Exception {
64+
TestArchive archive = new TestArchive();
65+
LiveVersionMap map = new LiveVersionMap(archive);
66+
putIndex(map, "1", randomIndexVersionValue());
67+
map.beforeRefresh();
68+
assertFalse(archive.afterRefreshCalled);
69+
assertNull(archive.archivedMap);
70+
map.afterRefresh(randomBoolean());
71+
assertTrue(archive.afterRefreshCalled);
72+
var archived = archive.archivedMap;
73+
assertNotNull(archived);
74+
assertEquals(archived.size(), 1);
75+
}
76+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.index.engine;
10+
11+
import org.apache.lucene.index.Term;
12+
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.core.Releasable;
14+
import org.elasticsearch.index.mapper.IdFieldMapper;
15+
import org.elasticsearch.index.mapper.Uid;
16+
import org.elasticsearch.index.translog.Translog;
17+
18+
import static org.elasticsearch.test.ESTestCase.randomBoolean;
19+
import static org.elasticsearch.test.ESTestCase.randomInt;
20+
import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong;
21+
22+
public class LiveVersionMapTestUtils {
23+
24+
public static LiveVersionMap newLiveVersionMap(LiveVersionMapArchive archiver) {
25+
return new LiveVersionMap(archiver);
26+
}
27+
28+
public static DeleteVersionValue newDeleteVersionValue(long version, long seqNo, long term, long time) {
29+
return new DeleteVersionValue(version, seqNo, term, time);
30+
}
31+
32+
public static IndexVersionValue newIndexVersionValue(Translog.Location location, long version, long seqNo, long term) {
33+
return new IndexVersionValue(location, version, seqNo, term);
34+
}
35+
36+
public static VersionValue get(LiveVersionMap map, String id) {
37+
try (Releasable r = acquireLock(map, uid(id))) {
38+
return map.getUnderLock(uid(id));
39+
}
40+
}
41+
42+
public static void putIndex(LiveVersionMap map, String id, IndexVersionValue version) {
43+
try (Releasable r = acquireLock(map, uid(id))) {
44+
map.putIndexUnderLock(uid(id), version);
45+
}
46+
}
47+
48+
public static void putDelete(LiveVersionMap map, String id, DeleteVersionValue version) {
49+
try (Releasable r = acquireLock(map, uid(id))) {
50+
map.putDeleteUnderLock(uid(id), version);
51+
}
52+
}
53+
54+
public static void pruneTombstones(LiveVersionMap map, long maxTimestampToPrune, long maxSeqNoToPrune) {
55+
map.pruneTombstones(maxTimestampToPrune, maxSeqNoToPrune);
56+
}
57+
58+
static IndexVersionValue randomIndexVersionValue() {
59+
return new IndexVersionValue(randomTranslogLocation(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
60+
}
61+
62+
static Translog.Location randomTranslogLocation() {
63+
if (randomBoolean()) {
64+
return null;
65+
} else {
66+
return new Translog.Location(randomNonNegativeLong(), randomNonNegativeLong(), randomInt());
67+
}
68+
}
69+
70+
public static int VersionLookupSize(LiveVersionMap.VersionLookup lookup) {
71+
return lookup.size();
72+
}
73+
74+
private static Releasable acquireLock(LiveVersionMap map, BytesRef uid) {
75+
return map.acquireLock(uid);
76+
}
77+
78+
public static BytesRef uid(String id) {
79+
return new Term(IdFieldMapper.NAME, Uid.encodeId(id)).bytes();
80+
}
81+
}

0 commit comments

Comments
 (0)