Skip to content

Commit 421b097

Browse files
committed
Never block on key in LiveVersionMap#pruneTombstones (#28736)
Pruning tombstones is best effort and should not block if a key is currently locked. This can cause a deadlock in rare situations if we switch of append only optimization while heavily updating the same key in the engine while the LiveVersionMap is locked. This is very rare since this code patch only executed every 15 seconds by default since that is the interval we try to prune the deletes in the version map. Closes #28714
1 parent 1e2f86c commit 421b097

File tree

5 files changed

+189
-28
lines changed

5 files changed

+189
-28
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,20 +63,52 @@ public Releasable acquire(T key) {
6363
while (true) {
6464
KeyLock perNodeLock = map.get(key);
6565
if (perNodeLock == null) {
66-
KeyLock newLock = new KeyLock(fair);
67-
perNodeLock = map.putIfAbsent(key, newLock);
68-
if (perNodeLock == null) {
69-
newLock.lock();
70-
return new ReleasableLock(key, newLock);
66+
ReleasableLock newLock = tryCreateNewLock(key);
67+
if (newLock != null) {
68+
return newLock;
69+
}
70+
} else {
71+
assert perNodeLock != null;
72+
int i = perNodeLock.count.get();
73+
if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
74+
perNodeLock.lock();
75+
return new ReleasableLock(key, perNodeLock);
7176
}
7277
}
73-
assert perNodeLock != null;
74-
int i = perNodeLock.count.get();
75-
if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
76-
perNodeLock.lock();
77-
return new ReleasableLock(key, perNodeLock);
78+
}
79+
}
80+
81+
/**
82+
* Tries to acquire the lock for the given key and returns it. If the lock can't be acquired null is returned.
83+
*/
84+
public Releasable tryAcquire(T key) {
85+
final KeyLock perNodeLock = map.get(key);
86+
if (perNodeLock == null) {
87+
return tryCreateNewLock(key);
88+
}
89+
if (perNodeLock.tryLock()) { // ok we got it - make sure we increment it accordingly otherwise release it again
90+
int i;
91+
while ((i = perNodeLock.count.get()) > 0) {
92+
// we have to do this in a loop here since even if the count is > 0
93+
// there could be a concurrent blocking acquire that changes the count and then this CAS fails. Since we already got
94+
// the lock we should retry and see if we can still get it or if the count is 0. If that is the case and we give up.
95+
if (perNodeLock.count.compareAndSet(i, i + 1)) {
96+
return new ReleasableLock(key, perNodeLock);
97+
}
7898
}
99+
perNodeLock.unlock(); // make sure we unlock and don't leave the lock in a locked state
100+
}
101+
return null;
102+
}
103+
104+
private ReleasableLock tryCreateNewLock(T key) {
105+
KeyLock newLock = new KeyLock(fair);
106+
newLock.lock();
107+
KeyLock keyLock = map.putIfAbsent(key, newLock);
108+
if (keyLock == null) {
109+
return new ReleasableLock(key, newLock);
79110
}
111+
return null;
80112
}
81113

82114
/**
@@ -92,11 +124,12 @@ public boolean isHeldByCurrentThread(T key) {
92124

93125
private void release(T key, KeyLock lock) {
94126
assert lock == map.get(key);
127+
final int decrementAndGet = lock.count.decrementAndGet();
95128
lock.unlock();
96-
int decrementAndGet = lock.count.decrementAndGet();
97129
if (decrementAndGet == 0) {
98130
map.remove(key, lock);
99131
}
132+
assert decrementAndGet >= 0 : decrementAndGet + " must be >= 0 but wasn't";
100133
}
101134

102135

server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.Collections;
3333
import java.util.Map;
3434
import java.util.concurrent.atomic.AtomicLong;
35-
import java.util.function.Function;
3635

3736
/** Maps _uid value to its version information. */
3837
final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
@@ -378,20 +377,25 @@ void removeTombstoneUnderLock(BytesRef uid) {
378377

379378
void pruneTombstones(long currentTime, long pruneInterval) {
380379
for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) {
381-
BytesRef uid = entry.getKey();
382-
try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get
383-
// the lock once per set?
384-
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
385-
DeleteVersionValue versionValue = tombstones.get(uid);
386-
if (versionValue != null) {
387-
// check if the value is old enough to be removed
388-
final boolean isTooOld = currentTime - versionValue.time > pruneInterval;
389-
if (isTooOld) {
390-
// version value can't be removed it's
391-
// not yet flushed to lucene ie. it's part of this current maps object
392-
final boolean isNotTrackedByCurrentMaps = versionValue.time < maps.getMinDeleteTimestamp();
393-
if (isNotTrackedByCurrentMaps) {
394-
removeTombstoneUnderLock(uid);
380+
final BytesRef uid = entry.getKey();
381+
try (Releasable lock = keyedLock.tryAcquire(uid)) {
382+
// we use tryAcquire here since this is a best effort and we try to be least disruptive
383+
// this method is also called under lock in the engine under certain situations such that this can lead to deadlocks
384+
// if we do use a blocking acquire. see #28714
385+
if (lock != null) { // did we get the lock?
386+
// can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
387+
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
388+
final DeleteVersionValue versionValue = tombstones.get(uid);
389+
if (versionValue != null) {
390+
// check if the value is old enough to be removed
391+
final boolean isTooOld = currentTime - versionValue.time > pruneInterval;
392+
if (isTooOld) {
393+
// version value can't be removed it's
394+
// not yet flushed to lucene ie. it's part of this current maps object
395+
final boolean isNotTrackedByCurrentMaps = versionValue.time < maps.getMinDeleteTimestamp();
396+
if (isNotTrackedByCurrentMaps) {
397+
removeTombstoneUnderLock(uid);
398+
}
395399
}
396400
}
397401
}

server/src/test/java/org/elasticsearch/common/util/concurrent/KeyedLockTests.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.common.util.concurrent;
2121

2222
import org.elasticsearch.common.lease.Releasable;
23-
import org.elasticsearch.common.util.concurrent.KeyedLock;
2423
import org.elasticsearch.test.ESTestCase;
2524
import org.hamcrest.Matchers;
2625

@@ -31,6 +30,7 @@
3130
import java.util.Set;
3231
import java.util.concurrent.ConcurrentHashMap;
3332
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3434
import java.util.concurrent.atomic.AtomicInteger;
3535

3636
import static org.hamcrest.Matchers.equalTo;
@@ -79,6 +79,34 @@ public void testHasLockedKeys() {
7979
assertFalse(lock.hasLockedKeys());
8080
}
8181

82+
public void testTryAcquire() throws InterruptedException {
83+
KeyedLock<String> lock = new KeyedLock<>();
84+
Releasable foo = lock.tryAcquire("foo");
85+
Releasable second = lock.tryAcquire("foo");
86+
assertTrue(lock.hasLockedKeys());
87+
foo.close();
88+
assertTrue(lock.hasLockedKeys());
89+
second.close();
90+
assertFalse(lock.hasLockedKeys());
91+
// lock again
92+
Releasable acquire = lock.tryAcquire("foo");
93+
assertNotNull(acquire);
94+
final AtomicBoolean check = new AtomicBoolean(false);
95+
CountDownLatch latch = new CountDownLatch(1);
96+
Thread thread = new Thread(() -> {
97+
latch.countDown();
98+
try (Releasable ignore = lock.acquire("foo")) {
99+
assertTrue(check.get());
100+
}
101+
});
102+
thread.start();
103+
latch.await();
104+
check.set(true);
105+
acquire.close();
106+
foo.close();
107+
thread.join();
108+
}
109+
82110
public void testLockIsReentrant() throws InterruptedException {
83111
KeyedLock<String> lock = new KeyedLock<>();
84112
Releasable foo = lock.acquire("foo");
@@ -137,7 +165,24 @@ public void run() {
137165
for (int i = 0; i < numRuns; i++) {
138166
String curName = names[randomInt(names.length - 1)];
139167
assert connectionLock.isHeldByCurrentThread(curName) == false;
140-
try (Releasable ignored = connectionLock.acquire(curName)) {
168+
Releasable lock;
169+
if (randomIntBetween(0, 10) < 4) {
170+
int tries = 0;
171+
boolean stepOut = false;
172+
while ((lock = connectionLock.tryAcquire(curName)) == null) {
173+
assertFalse(connectionLock.isHeldByCurrentThread(curName));
174+
if (tries++ == 10) {
175+
stepOut = true;
176+
break;
177+
}
178+
}
179+
if (stepOut) {
180+
break;
181+
}
182+
} else {
183+
lock = connectionLock.acquire(curName);
184+
}
185+
try (Releasable ignore = lock) {
141186
assert connectionLock.isHeldByCurrentThread(curName);
142187
assert connectionLock.isHeldByCurrentThread(curName + "bla") == false;
143188
if (randomBoolean()) {

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4535,4 +4535,60 @@ public void testShouldPeriodicallyFlush() throws Exception {
45354535
assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
45364536
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
45374537
}
4538+
4539+
4540+
public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException {
4541+
final int iters = randomIntBetween(1, 15);
4542+
for (int i = 0; i < iters; i++) {
4543+
// this is a reproduction of https://github.com/elastic/elasticsearch/issues/28714
4544+
try (Store store = createStore(); InternalEngine engine = createEngine(store, createTempDir())) {
4545+
final IndexSettings indexSettings = engine.config().getIndexSettings();
4546+
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
4547+
.settings(Settings.builder().put(indexSettings.getSettings())
4548+
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(1))).build();
4549+
engine.engineConfig.getIndexSettings().updateIndexMetaData(indexMetaData);
4550+
engine.onSettingsChanged();
4551+
ParsedDocument document = testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
4552+
final Engine.Index doc = new Engine.Index(newUid(document), document, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
4553+
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false);
4554+
// first index an append only document and then delete it. such that we have it in the tombstones
4555+
engine.index(doc);
4556+
engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid()));
4557+
4558+
// now index more append only docs and refresh so we re-enabel the optimization for unsafe version map
4559+
ParsedDocument document1 = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
4560+
engine.index(new Engine.Index(newUid(document1), document1, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
4561+
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false));
4562+
engine.refresh("test");
4563+
ParsedDocument document2 = testParsedDocument(Integer.toString(2), null, testDocumentWithTextField(), SOURCE, null);
4564+
engine.index(new Engine.Index(newUid(document2), document2, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
4565+
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false));
4566+
engine.refresh("test");
4567+
ParsedDocument document3 = testParsedDocument(Integer.toString(3), null, testDocumentWithTextField(), SOURCE, null);
4568+
final Engine.Index doc3 = new Engine.Index(newUid(document3), document3, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
4569+
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false);
4570+
engine.index(doc3);
4571+
engine.engineConfig.setEnableGcDeletes(true);
4572+
// once we are here the version map is unsafe again and we need to do a refresh inside the get calls to ensure we
4573+
// de-optimize. We also enabled GCDeletes which now causes pruning tombstones inside that refresh that is done internally
4574+
// to ensure we de-optimize. One get call will purne and the other will try to lock the version map concurrently while
4575+
// holding the lock that pruneTombstones needs and we have a deadlock
4576+
CountDownLatch awaitStarted = new CountDownLatch(1);
4577+
Thread thread = new Thread(() -> {
4578+
awaitStarted.countDown();
4579+
try (Engine.GetResult getResult = engine.get(new Engine.Get(true, doc3.type(), doc3.id(), doc3.uid()),
4580+
engine::acquireSearcher)) {
4581+
assertTrue(getResult.exists());
4582+
}
4583+
});
4584+
thread.start();
4585+
awaitStarted.await();
4586+
try (Engine.GetResult getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), doc.uid()),
4587+
engine::acquireSearcher)) {
4588+
assertFalse(getResult.exists());
4589+
}
4590+
thread.join();
4591+
}
4592+
}
4593+
}
45384594
}

server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,4 +348,27 @@ public void testAddAndDeleteRefreshConcurrently() throws IOException, Interrupte
348348
}
349349
}
350350
}
351+
352+
public void testPruneTombstonesWhileLocked() throws InterruptedException, IOException {
353+
LiveVersionMap map = new LiveVersionMap();
354+
BytesRef uid = uid("1");
355+
;
356+
try (Releasable ignore = map.acquireLock(uid)) {
357+
map.putUnderLock(uid, new DeleteVersionValue(0, 0, 0, 0));
358+
map.beforeRefresh(); // refresh otherwise we won't prune since it's tracked by the current map
359+
map.afterRefresh(false);
360+
Thread thread = new Thread(() -> {
361+
map.pruneTombstones(Long.MAX_VALUE, 0);
362+
});
363+
thread.start();
364+
thread.join();
365+
assertEquals(1, map.getAllTombstones().size());
366+
}
367+
Thread thread = new Thread(() -> {
368+
map.pruneTombstones(Long.MAX_VALUE, 0);
369+
});
370+
thread.start();
371+
thread.join();
372+
assertEquals(0, map.getAllTombstones().size());
373+
}
351374
}

0 commit comments

Comments
 (0)