Skip to content

Reduce Overhead of RepositoryData Cache for Large Repositories #66587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -662,10 +662,10 @@ public void onFailure(Exception e) {
* @param rootBlobs Blobs at the repository root
* @return RepositoryData
*/
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetadata> rootBlobs) throws IOException {
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetadata> rootBlobs) {
final long generation = latestGeneration(rootBlobs.keySet());
final long genToLoad;
final Tuple<Long, BytesReference> cached;
final CachedRepositoryData cached;
if (bestEffortConsistency) {
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
cached = null;
Expand All @@ -684,8 +684,8 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
repositoryStateId + "], actual current generation [" + genToLoad + "]");
}
if (cached != null && cached.v1() == genToLoad) {
return repositoryDataFromCachedEntry(cached);
if (cached != null && cached.generation() == genToLoad && cached.hasData()) {
return cached.repositoryData();
}
return getRepositoryData(genToLoad);
}
Expand Down Expand Up @@ -1272,7 +1272,6 @@ public String startVerification() {
String seed = UUIDs.randomBase64UUID();
byte[] testBytes = Strings.toUTF8Bytes(seed);
BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
BytesArray bytes = new BytesArray(testBytes);
testContainer.writeBlobAtomic("master.dat", new BytesArray(testBytes), true);
return seed;
}
Expand All @@ -1298,20 +1297,62 @@ public void endVerification(String seed) {
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);

// Best-effort cache of the latest known repository data and its generation; the data is cached in serialized form as compressed JSON
private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();
private final AtomicReference<CachedRepositoryData> latestKnownRepositoryData =
new AtomicReference<>(new CachedRepositoryData(RepositoryData.EMPTY_REPO_GEN, null));

/**
 * Cached serialized repository data or placeholder to keep track of the fact that data for a generation was too large to be cached.
 * <p>
 * Each entry pairs a repository generation with the serialized bytes for that generation. The bytes are {@code null} for a
 * placeholder entry; {@link #hasData()} tells callers whether {@link #repositoryData()} can produce a usable result.
 */
private static final class CachedRepositoryData {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured I'd encapsulate this a little (even though it adds a bunch of lines) because the Tuple access is quite hard to read and it's used in a bunch of spots.


// Repository generation that this cache entry was created for
private final long generation;

// Serialized repository data (read back below as compressed JSON), or null when this entry holds no bytes
@Nullable
private final BytesReference repositoryData;

/**
 * @param generation     repository generation the entry belongs to
 * @param repositoryData serialized repository data for that generation or {@code null} for an entry without data
 */
CachedRepositoryData(long generation, @Nullable BytesReference repositoryData) {
this.generation = generation;
this.repositoryData = repositoryData;
}

/**
 * @return the repository generation this entry was created for
 */
long generation() {
return generation;
}

/**
 * @return {@code true} if this entry is for {@link RepositoryData#EMPTY_REPO_GEN} or holds serialized bytes, i.e. if
 *         {@link #repositoryData()} will return a non-null value
 */
boolean hasData() {
return generation == RepositoryData.EMPTY_REPO_GEN || repositoryData != null;
}

/**
 * Materializes the cached entry.
 *
 * @return {@link RepositoryData#EMPTY} for the empty generation, {@code null} if no serialized bytes are held, otherwise the
 *         repository data parsed from the cached bytes
 */
@Nullable
RepositoryData repositoryData() {
// The empty generation needs no stored bytes, it always maps to the EMPTY constant
if (generation == RepositoryData.EMPTY_REPO_GEN) {
return RepositoryData.EMPTY;
}
// Placeholder entry without bytes: nothing to deserialize
if (repositoryData == null) {
return null;
}
// Decompress the cached bytes and parse them as JSON; the stream is closed by try-with-resources
// NOTE(review): the meaning of the trailing false flag is not visible here, confirm against RepositoryData#snapshotsFromXContent
try (InputStream input = CompressorFactory.COMPRESSOR.threadLocalInputStream(repositoryData.streamInput())) {
return RepositoryData.snapshotsFromXContent(
XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, input),
generation, false);
} catch (IOException e) {
// The bytes come from the in-memory cache entry, so an IOException is treated as a programming error
throw new AssertionError("no actual IO happens here", e);
}
}
}

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
listener.onFailure(corruptedStateException(null));
return;
}
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final CachedRepositoryData cached = latestKnownRepositoryData.get();
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) {
if (bestEffortConsistency == false && cached.generation() == latestKnownRepoGen.get() && cached.hasData()) {
try {
listener.onResponse(repositoryDataFromCachedEntry(cached));
listener.onResponse(cached.repositoryData());
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down Expand Up @@ -1341,26 +1382,28 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
}
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
if (genToLoad > generation) {
logger.info("Determined repository generation [" + generation
+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
logger.info("Determined repository generation [{}] from repository contents but correct generation must be at " +
"least [{}]", generation, genToLoad);
}
} else {
// We only rely on the generation tracked in #latestKnownRepoGen which is exclusively updated from the cluster state
genToLoad = latestKnownRepoGen.get();
}
try {
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final CachedRepositoryData cached = latestKnownRepositoryData.get();
final RepositoryData loaded;
// Caching is not used with #bestEffortConsistency; see docs on #cacheRepositoryData for details
if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
loaded = repositoryDataFromCachedEntry(cached);
if (bestEffortConsistency == false && cached.generation() == genToLoad && cached.hasData()) {
loaded = cached.repositoryData();
} else {
loaded = getRepositoryData(genToLoad);
// We can cache serialized in the most recent version here without regard to the actual repository metadata version
// since we're only caching the information that we just wrote and thus won't accidentally cache any information that
// isn't safe
cacheRepositoryData(compressRepoDataForCache(BytesReference.bytes(
loaded.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))), genToLoad);
if (cached == null || cached.generation() < genToLoad) {
// We can cache serialized in the most recent version here without regard to the actual repository metadata version
// since we're only caching the information that we just wrote and thus won't accidentally cache any information
// that isn't safe
cacheRepositoryData(compressRepoDataForCache(BytesReference.bytes(
loaded.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))), genToLoad);
}
}
listener.onResponse(loaded);
return;
Expand Down Expand Up @@ -1400,11 +1443,12 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
* @param generation repository generation of the given repository data
*/
private void cacheRepositoryData(@Nullable BytesReference serialized, long generation) {
assert generation >= 0 : "No need to cache abstract generations but attempted to cache [" + generation + "]";
latestKnownRepositoryData.updateAndGet(known -> {
if (known != null && known.v1() > generation) {
if (known.generation() > generation) {
return known;
}
return serialized == null ? null : new Tuple<>(generation, serialized);
return new CachedRepositoryData(generation, serialized);
});
}

Expand Down Expand Up @@ -1440,14 +1484,6 @@ private BytesReference compressRepoDataForCache(BytesReference uncompressed) {
}
}

/**
 * Deserializes a cached tuple back into {@link RepositoryData}.
 * <p>
 * The second tuple element is read through the compressor input stream and parsed as JSON; the first tuple element is handed
 * to the parser as the generation of the resulting repository data.
 * <p>
 * NOTE(review): the enclosing hunk header reports this region shrinking from 14 to 6 lines and equivalent logic is present in
 * CachedRepositoryData#repositoryData, so this helper appears to be the removed side of the diff. Confirm before relying on it.
 *
 * @param cacheEntry tuple of generation and serialized repository data
 * @return the parsed repository data
 * @throws IOException if reading or parsing the cached bytes fails
 */
private RepositoryData repositoryDataFromCachedEntry(Tuple<Long, BytesReference> cacheEntry) throws IOException {
// try-with-resources closes the decompressing stream once parsing has finished
try (InputStream input = CompressorFactory.COMPRESSOR.threadLocalInputStream(cacheEntry.v2().streamInput())) {
return RepositoryData.snapshotsFromXContent(
XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, input), cacheEntry.v1(), false);
}
}

private RepositoryException corruptedStateException(@Nullable Exception cause) {
return new RepositoryException(metadata.name(),
"Could not read repository data because the contents of the repository do not match its " +
Expand Down