Reduce overhead in blob cache service get #96399

Merged

Changes from 3 commits
docs/changelog/96399.yaml (5 additions, 0 deletions)

@@ -0,0 +1,5 @@
+pr: 96399
+summary: Reduce overhead in blob cache service get
+area: Snapshot/Restore
+type: enhancement
+issues: []
SharedBlobCacheService.java

@@ -26,7 +26,6 @@
import org.elasticsearch.common.unit.RelativeByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Releasable;
@@ -242,8 +241,6 @@ public void validate(ByteSizeValue value, Map<Setting<?>, Object> settings, boolean isPresent) {
    private final ConcurrentHashMap<RegionKey<KeyType>, Entry<CacheFileRegion>> keyMapping;
    private final ThreadPool threadPool;

-    private final KeyedLock<KeyType> keyedLock = new KeyedLock<>();
-
    private final SharedBytes sharedBytes;
    private final long cacheSize;
    private final long regionSize;
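
The removed keyedLock serialized every get() for the same cache key, including pure cache hits. As a rough sketch of what per-key locking costs (a hypothetical NaiveKeyedLock, not the actual org.elasticsearch.common.util.concurrent.KeyedLock, which also manages the lifecycle of the per-key locks):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: every caller for the same key executes one at a time,
// even when the cached entry already exists and nothing needs mutating.
public class NaiveKeyedLock<K> {
    private final ConcurrentHashMap<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    public void runLocked(K key, Runnable action) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            action.run(); // all work for this key is serialized here
        } finally {
            lock.unlock();
        }
    }
}

Even an uncontended hit pays a map lookup plus lock/unlock on every call; the change below replaces that with a single volatile read on the hit path.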
@@ -380,57 +377,76 @@ private long getRegionSize(long fileLength, int region) {

    public CacheFileRegion get(KeyType cacheKey, long fileLength, int region) {
        final long effectiveRegionSize = getRegionSize(fileLength, region);
-        try (Releasable ignore = keyedLock.acquire(cacheKey)) {
-            final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
-            final long now = threadPool.relativeTimeInMillis();
-            final Entry<CacheFileRegion> entry = keyMapping.computeIfAbsent(
-                regionKey,
-                key -> new Entry<>(new CacheFileRegion(key, effectiveRegionSize), now)
-            );
-            if (entry.chunk.sharedBytesPos == -1) {
-                // new item
-                assert entry.freq == 0;
-                assert entry.prev == null;
-                assert entry.next == null;
-                final Integer freeSlot = freeRegions.poll();
-                if (freeSlot != null) {
-                    // no need to evict an item, just add
-                    entry.chunk.sharedBytesPos = freeSlot;
-                    assert regionOwners[freeSlot].compareAndSet(null, entry.chunk);
-                    synchronized (this) {
-                        pushEntryToBack(entry);
-                    }
-                } else {
-                    // need to evict something
-                    synchronized (this) {
-                        maybeEvict();
-                    }
-                    final Integer freeSlotRetry = freeRegions.poll();
-                    if (freeSlotRetry != null) {
-                        entry.chunk.sharedBytesPos = freeSlotRetry;
-                        assert regionOwners[freeSlotRetry].compareAndSet(null, entry.chunk);
-                        synchronized (this) {
-                            pushEntryToBack(entry);
-                        }
-                    } else {
-                        boolean removed = keyMapping.remove(regionKey, entry);
-                        assert removed;
-                        throw new AlreadyClosedException("no free region found");
-                    }
-                }
-            } else {
-                // check if we need to promote item
-                synchronized (this) {
-                    if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq) {
-                        unlink(entry);
-                        entry.freq++;
-                        entry.lastAccessed = now;
-                        pushEntryToBack(entry);
-                    }
-                }
-            }
-            return entry.chunk;
-        }
-    }
+        final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
+        final long now = threadPool.relativeTimeInMillis();
+        final Entry<CacheFileRegion> entry = keyMapping.computeIfAbsent(
+            regionKey,
+            key -> new Entry<>(new CacheFileRegion(key, effectiveRegionSize), now)
+        );
+        // sharedBytesPos is volatile, double locking is fine, as long as we assign it last.
+        if (entry.chunk.sharedBytesPos == -1) {
+            synchronized (entry.chunk) {
+                if (entry.chunk.sharedBytesPos == -1) {
+                    if (keyMapping.get(regionKey) != entry) {
+                        throw new AlreadyClosedException("no free region found (contender)");
+                    }
+                    // new item
+                    assert entry.freq == 0;
+                    assert entry.prev == null;
+                    assert entry.next == null;
+                    final Integer freeSlot = freeRegions.poll();
+                    if (freeSlot != null) {
+                        // no need to evict an item, just add
+                        assert regionOwners[freeSlot].compareAndSet(null, entry.chunk);
+                        synchronized (this) {
+                            pushEntryToBack(entry);
+                            // assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
+                            entry.chunk.sharedBytesPos = freeSlot;
+                        }
+                    } else {
+                        // need to evict something
+                        synchronized (this) {
+                            maybeEvict();
+                        }
+                        final Integer freeSlotRetry = freeRegions.poll();
+                        if (freeSlotRetry != null) {
+                            assert regionOwners[freeSlotRetry].compareAndSet(null, entry.chunk);
+                            synchronized (this) {
+                                pushEntryToBack(entry);
+                                // assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
+                                entry.chunk.sharedBytesPos = freeSlotRetry;
+                            }
+                        } else {
+                            boolean removed = keyMapping.remove(regionKey, entry);
+                            assert removed;
+                            throw new AlreadyClosedException("no free region found");
+                        }
+                    }
+                }
+
+                return entry.chunk;
+            }
+        }
+        if (Assertions.ENABLED) {
+            synchronized (this) {
+                // assert linked (or evicted)
+                assert entry.prev != null || entry.chunk.isEvicted();
+            }
+        }
+        assert regionOwners[entry.chunk.sharedBytesPos].get() == entry.chunk || entry.chunk.isEvicted();
+
+        // existing item, check if we need to promote item
+        synchronized (this) {
+            if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq) {
+                unlink(entry);
+                entry.freq++;
+                entry.lastAccessed = now;
+                pushEntryToBack(entry);
+            }
+        }
+
+        return entry.chunk;
+    }

    public void onClose(CacheFileRegion chunk) {

Review comment (Contributor Author), on the "no free region found (contender)" branch:
    We can also assign -2 to sharedBytesPos in this case instead. I think it does not really matter, we expect near no contention here.

Review comment (@original-brownbear, Contributor, May 30, 2023), on the Assertions.ENABLED block:
    NIT: maybe move this kind of assertion to a separate method (that can just include the next assertion below) to save a little on method size here? (probably not too relevant but also makes the method easier to read IMO)
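
The change hinges on double-checked locking over the volatile sharedBytesPos field: check without the lock, lock, re-check, and assign the field only after initialization completes. A minimal self-contained sketch of that pattern (illustrative names, not Elasticsearch code):

import java.util.concurrent.ArrayBlockingQueue;

// Sketch: a slot that is lazily allocated at most once. `slot` is volatile and
// is only assigned after all initialization is done ("assign it last"), so a
// reader that observes slot != -1 on the lock-free fast path also observes a
// fully initialized slot.
class LazySlot {
    private static final int UNASSIGNED = -1;
    private volatile int slot = UNASSIGNED;
    private final ArrayBlockingQueue<Integer> freeSlots;

    LazySlot(ArrayBlockingQueue<Integer> freeSlots) {
        this.freeSlots = freeSlots;
    }

    int getOrAllocate() {
        if (slot == UNASSIGNED) {              // fast path: a single volatile read
            synchronized (this) {
                if (slot == UNASSIGNED) {      // re-check: another thread may have won
                    Integer free = freeSlots.poll();
                    if (free == null) {
                        throw new IllegalStateException("no free slot");
                    }
                    // ... prepare the slot's backing resources here ...
                    slot = free;               // publish last
                }
            }
        }
        return slot;
    }
}

The real get() layers two extra safeguards on this skeleton: the re-check also verifies the entry is still the one registered in keyMapping (the "contender" branch), and the final assignment happens under the cache-wide monitor so a concurrent tryEvict cannot observe a half-registered region.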
SharedBlobCacheServiceTests.java

@@ -7,6 +7,7 @@

package org.elasticsearch.blobcache.shared;

+import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -24,13 +25,17 @@
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;

import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
@@ -212,6 +217,74 @@ public void testDecay() throws IOException {
        }
    }

+    /**
+     * Exercise SharedBlobCacheService#get in multiple threads to trigger any assertion errors.
+     * @throws IOException
+     */
+    public void testGetMultiThreaded() throws IOException {
+        int threads = between(2, 10);
+        Settings settings = Settings.builder()
+            .put(NODE_NAME_SETTING.getKey(), "node")
+            .put(
+                SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(),
+                ByteSizeValue.ofBytes(size(between(1, 20) * 100L)).getStringRep()
+            )
+            .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
+            .put("path.home", createTempDir())
+            .build();
+        long fileLength = size(500);
+        ThreadPool threadPool = new TestThreadPool("testGetMultiThreaded");
+        Set<String> files = randomSet(1, 10, () -> randomAlphaOfLength(5));
+        try (
+            NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
+            var cacheService = new SharedBlobCacheService<String>(environment, settings, threadPool)
+        ) {
+            CyclicBarrier ready = new CyclicBarrier(threads);
+            List<Thread> threadList = IntStream.range(0, threads).mapToObj(no -> {
+                int iterations = between(100, 500);
+                String[] cacheKeys = IntStream.range(0, iterations).mapToObj(ignore -> randomFrom(files)).toArray(String[]::new);
+                int[] regions = IntStream.range(0, iterations).map(ignore -> between(0, 4)).toArray();
+                int[] yield = IntStream.range(0, iterations).map(ignore -> between(0, 9)).toArray();
+                return new Thread(() -> {
+                    try {
+                        ready.await();
+                        for (int i = 0; i < iterations; ++i) {
+                            try {
+                                SharedBlobCacheService<String>.CacheFileRegion cacheFileRegion = cacheService.get(
+                                    cacheKeys[i],
+                                    fileLength,
+                                    regions[i]
+                                );
+                                if (cacheFileRegion.tryIncRef()) {
+                                    if (yield[i] == 0) {
+                                        Thread.yield();
+                                    }
+                                    cacheFileRegion.decRef();
+                                }
+                            } catch (AlreadyClosedException e) {
+                                // ignore
+                            }
+                        }
+                    } catch (InterruptedException | BrokenBarrierException e) {
+                        assert false;
+                        throw new RuntimeException(e);
+                    }
+                });
+            }).toList();
+            threadList.forEach(Thread::start);
+            threadList.forEach(thread -> {
+                try {
+                    thread.join();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            });
+        } finally {
+            threadPool.shutdownNow();
+        }
+    }
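
The CyclicBarrier above is the standard "start gate" for contention tests: every worker blocks on await() until all have arrived, then they race into get() together. Stripped to its essentials, the pattern looks like this sketch (hypothetical StartGateDemo, not project code):

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Demo: N workers all block on ready.await() until the last one arrives, then
// enter the critical section at (nearly) the same instant, maximizing the
// chance of exercising the lock-free and double-checked paths concurrently.
public class StartGateDemo {
    public static void main(String[] args) throws InterruptedException {
        final int workers = 4;
        final CyclicBarrier ready = new CyclicBarrier(workers);
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    ready.await(); // start gate: released once all workers arrive
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
                // ... hammer the shared structure under test here ...
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }
}

The occasional Thread.yield() in the test serves a similar purpose, presumably widening the window in which another thread can hit the contended double-checked path.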

    public void testCacheSizeRejectedOnNonFrozenNodes() {
        String cacheSize = randomBoolean()
            ? ByteSizeValue.ofBytes(size(500)).getStringRep()