Skip to content

Commit c04b32e

Browse files
Fix blob cache races/assertion errors (#96458)
In racy evict cases, assertions in blob cache did not hold, adapted test and added fixes. Relates #96399
1 parent 8363e8c commit c04b32e

File tree

3 files changed

+57
-24
lines changed

3 files changed

+57
-24
lines changed

docs/changelog/96458.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 96458
2+
summary: Fix blob cache races/assertion errors
3+
area: Snapshot/Restore
4+
type: bug
5+
issues: []

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -397,25 +397,15 @@ public CacheFileRegion get(KeyType cacheKey, long fileLength, int region) {
397397
final Integer freeSlot = freeRegions.poll();
398398
if (freeSlot != null) {
399399
// no need to evict an item, just add
400-
assert regionOwners[freeSlot].compareAndSet(null, entry.chunk);
401-
synchronized (this) {
402-
pushEntryToBack(entry);
403-
// assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
404-
entry.chunk.sharedBytesPos = freeSlot;
405-
}
400+
assignToSlot(entry, freeSlot);
406401
} else {
407402
// need to evict something
408403
synchronized (this) {
409404
maybeEvict();
410405
}
411406
final Integer freeSlotRetry = freeRegions.poll();
412407
if (freeSlotRetry != null) {
413-
assert regionOwners[freeSlotRetry].compareAndSet(null, entry.chunk);
414-
synchronized (this) {
415-
pushEntryToBack(entry);
416-
// assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
417-
entry.chunk.sharedBytesPos = freeSlotRetry;
418-
}
408+
assignToSlot(entry, freeSlotRetry);
419409
} else {
420410
boolean removed = keyMapping.remove(regionKey, entry);
421411
assert removed;
@@ -431,7 +421,7 @@ public CacheFileRegion get(KeyType cacheKey, long fileLength, int region) {
431421

432422
// existing item, check if we need to promote item
433423
synchronized (this) {
434-
if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq) {
424+
if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq && entry.chunk.isEvicted() == false) {
435425
unlink(entry);
436426
entry.freq++;
437427
entry.lastAccessed = now;
@@ -442,6 +432,21 @@ public CacheFileRegion get(KeyType cacheKey, long fileLength, int region) {
442432
return entry.chunk;
443433
}
444434

435+
private void assignToSlot(Entry<CacheFileRegion> entry, int freeSlot) {
436+
assert regionOwners[freeSlot].compareAndSet(null, entry.chunk);
437+
synchronized (this) {
438+
if (entry.chunk.isEvicted()) {
439+
assert regionOwners[freeSlot].compareAndSet(entry.chunk, null);
440+
freeRegions.add(freeSlot);
441+
keyMapping.remove(entry.chunk.regionKey, entry);
442+
throw new AlreadyClosedException("evicted during free region allocation");
443+
}
444+
pushEntryToBack(entry);
445+
// assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
446+
entry.chunk.sharedBytesPos = freeSlot;
447+
}
448+
}
449+
445450
private void assertChunkActiveOrEvicted(Entry<CacheFileRegion> entry) {
446451
if (Assertions.ENABLED) {
447452
synchronized (this) {
@@ -454,8 +459,11 @@ private void assertChunkActiveOrEvicted(Entry<CacheFileRegion> entry) {
454459
}
455460

456461
public void onClose(CacheFileRegion chunk) {
457-
assert regionOwners[chunk.sharedBytesPos].compareAndSet(chunk, null);
458-
freeRegions.add(chunk.sharedBytesPos);
462+
// we held the "this" lock when this was evicted, hence if sharedBytesPos is not filled in, chunk will never be registered.
463+
if (chunk.sharedBytesPos != -1) {
464+
assert regionOwners[chunk.sharedBytesPos].compareAndSet(chunk, null);
465+
freeRegions.add(chunk.sharedBytesPos);
466+
}
459467
}
460468

461469
// used by tests
@@ -510,7 +518,7 @@ private void maybeEvict() {
510518
for (int i = 0; i < maxFreq; i++) {
511519
for (Entry<CacheFileRegion> entry = freqs[i]; entry != null; entry = entry.next) {
512520
boolean evicted = entry.chunk.tryEvict();
513-
if (evicted) {
521+
if (evicted && entry.chunk.sharedBytesPos != -1) {
514522
unlink(entry);
515523
keyMapping.remove(entry.chunk.regionKey, entry);
516524
return;
@@ -603,7 +611,7 @@ public void forceEvict(Predicate<KeyType> cacheKeyPredicate) {
603611
synchronized (this) {
604612
for (Entry<CacheFileRegion> entry : matchingEntries) {
605613
boolean evicted = entry.chunk.forceEvict();
606-
if (evicted) {
614+
if (evicted && entry.chunk.sharedBytesPos != -1) {
607615
unlink(entry);
608616
keyMapping.remove(entry.chunk.regionKey, entry);
609617
}
@@ -693,7 +701,9 @@ public long physicalEndOffset() {
693701
private final AtomicBoolean evicted = new AtomicBoolean(false);
694702

695703
// tries to evict this chunk if noone is holding onto its resources anymore
696-
public boolean tryEvict() {
704+
// visible for tests.
705+
boolean tryEvict() {
706+
assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
697707
if (refCount() <= 1 && evicted.compareAndSet(false, true)) {
698708
logger.trace("evicted {} with channel offset {}", regionKey, physicalStartOffset());
699709
evictCount.increment();
@@ -704,6 +714,7 @@ public boolean tryEvict() {
704714
}
705715

706716
public boolean forceEvict() {
717+
assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
707718
if (evicted.compareAndSet(false, true)) {
708719
logger.trace("force evicted {} with channel offset {}", regionKey, physicalStartOffset());
709720
evictCount.increment();

x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,13 @@ public void testBasicEviction() throws IOException {
7373
assertEquals(size(50), region2.tracker.getLength());
7474
assertEquals(2, cacheService.freeRegionCount());
7575

76-
assertTrue(region1.tryEvict());
76+
synchronized (cacheService) {
77+
assertTrue(region1.tryEvict());
78+
}
7779
assertEquals(3, cacheService.freeRegionCount());
78-
assertFalse(region1.tryEvict());
80+
synchronized (cacheService) {
81+
assertFalse(region1.tryEvict());
82+
}
7983
assertEquals(3, cacheService.freeRegionCount());
8084
final var bytesReadFuture = new PlainActionFuture<Integer>();
8185
region0.populateAndRead(
@@ -86,13 +90,19 @@ public void testBasicEviction() throws IOException {
8690
taskQueue.getThreadPool().executor(ThreadPool.Names.GENERIC),
8791
bytesReadFuture
8892
);
89-
assertFalse(region0.tryEvict());
93+
synchronized (cacheService) {
94+
assertFalse(region0.tryEvict());
95+
}
9096
assertEquals(3, cacheService.freeRegionCount());
9197
assertFalse(bytesReadFuture.isDone());
9298
taskQueue.runAllRunnableTasks();
93-
assertTrue(region0.tryEvict());
99+
synchronized (cacheService) {
100+
assertTrue(region0.tryEvict());
101+
}
94102
assertEquals(4, cacheService.freeRegionCount());
95-
assertTrue(region2.tryEvict());
103+
synchronized (cacheService) {
104+
assertTrue(region2.tryEvict());
105+
}
96106
assertEquals(5, cacheService.freeRegionCount());
97107
assertTrue(bytesReadFuture.isDone());
98108
assertEquals(Integer.valueOf(1), bytesReadFuture.actionGet());
@@ -130,7 +140,9 @@ public void testAutoEviction() throws IOException {
130140
assertFalse(region1.isEvicted());
131141

132142
// explicitly evict region 1
133-
assertTrue(region1.tryEvict());
143+
synchronized (cacheService) {
144+
assertTrue(region1.tryEvict());
145+
}
134146
assertEquals(1, cacheService.freeRegionCount());
135147
}
136148
}
@@ -230,6 +242,7 @@ public void testGetMultiThreaded() throws IOException {
230242
ByteSizeValue.ofBytes(size(between(1, 20) * 100L)).getStringRep()
231243
)
232244
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
245+
.put(SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING.getKey(), randomFrom("0", "1ms", "10s"))
233246
.put("path.home", createTempDir())
234247
.build();
235248
long fileLength = size(500);
@@ -245,6 +258,7 @@ public void testGetMultiThreaded() throws IOException {
245258
String[] cacheKeys = IntStream.range(0, iterations).mapToObj(ignore -> randomFrom(files)).toArray(String[]::new);
246259
int[] regions = IntStream.range(0, iterations).map(ignore -> between(0, 4)).toArray();
247260
int[] yield = IntStream.range(0, iterations).map(ignore -> between(0, 9)).toArray();
261+
int[] evict = IntStream.range(0, iterations).map(ignore -> between(0, 99)).toArray();
248262
return new Thread(() -> {
249263
try {
250264
ready.await();
@@ -261,6 +275,9 @@ public void testGetMultiThreaded() throws IOException {
261275
}
262276
cacheFileRegion.decRef();
263277
}
278+
if (evict[i] == 0) {
279+
cacheService.forceEvict(x -> true);
280+
}
264281
} catch (AlreadyClosedException e) {
265282
// ignore
266283
}

0 commit comments

Comments
 (0)