Add Caching for RepositoryData in BlobStoreRepository #52341

Merged
@@ -96,6 +96,8 @@ protected Settings repositorySettings() {
.put(super.repositorySettings())
.put(S3Repository.BUCKET_SETTING.getKey(), "bucket")
.put(S3Repository.CLIENT_NAME.getKey(), "test")
// Don't cache repository data because some tests manually modify the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.build();
}

@@ -64,9 +64,11 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.metrics.CounterMetric;
@@ -77,6 +79,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
@@ -129,6 +132,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -205,8 +209,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
public static final Setting<Boolean> ALLOW_CONCURRENT_MODIFICATION =
Setting.boolSetting("allow_concurrent_modifications", false, Setting.Property.Deprecated);

/**
* Setting to disable caching of the latest repository data.
*/
public static final Setting<Boolean> CACHE_REPOSITORY_DATA =
Setting.boolSetting("cache_repository_data", true, Setting.Property.Deprecated);

private final boolean compress;

private final boolean cacheRepositoryData;

private final RateLimiter snapshotRateLimiter;

private final RateLimiter restoreRateLimiter;
@@ -282,6 +294,7 @@ protected BlobStoreRepository(
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
readOnly = metadata.settings().getAsBoolean("readonly", false);
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
this.basePath = basePath;

indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
@@ -510,13 +523,16 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Versio
* @param rootBlobs Blobs at the repository root
* @return RepositoryData
*/
-    private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
+    private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) throws IOException {
final long generation = latestGeneration(rootBlobs.keySet());
final long genToLoad;
final Tuple<Long, BytesReference> cached;
if (bestEffortConsistency) {
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
cached = null;
} else {
genToLoad = latestKnownRepoGen.get();
cached = latestKnownRepositoryData.get();
}
if (genToLoad > generation) {
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
@@ -529,6 +545,9 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
repositoryStateId + "], actual current generation [" + genToLoad + "]");
}
if (cached != null && cached.v1() == genToLoad) {
return repositoryDataFromCachedEntry(cached);
}
return getRepositoryData(genToLoad);
}

@@ -1057,6 +1076,9 @@ public void endVerification(String seed) {
// and concurrent modifications.
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);

// Best effort cache of the latest known repository data and its generation, cached serialized as compressed json
private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
@@ -1090,7 +1112,16 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
genToLoad = latestKnownRepoGen.get();
}
try {
-                listener.onResponse(getRepositoryData(genToLoad));
+                final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
+                final RepositoryData loaded;
+                // Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
+                if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
+                    loaded = repositoryDataFromCachedEntry(cached);
+                } else {
+                    loaded = getRepositoryData(genToLoad);
+                    cacheRepositoryData(loaded);
+                }
+                listener.onResponse(loaded);
return;
} catch (RepositoryException e) {
// If the generation to load changed concurrently and we didn't just try loading the same generation before we retry
@@ -1116,6 +1147,59 @@
}
}

/**
* Puts the given {@link RepositoryData} into the cache if it is of a newer generation and only if the repository is not using
* {@link #bestEffortConsistency}. When using {@link #bestEffortConsistency} the repository is using listing to find the latest
* {@code index-N} blob and there are no hard guarantees that a given repository generation won't be reused since an external
     * modification can lead to moving from a higher {@code N} to a lower {@code N} value which means we can't safely assume that a given
* generation will always contain the same {@link RepositoryData}.
*
* @param updated RepositoryData to cache if newer than the cache contents
*/
private void cacheRepositoryData(RepositoryData updated) {
if (cacheRepositoryData && bestEffortConsistency == false) {
final BytesReference serialized;
BytesStreamOutput out = new BytesStreamOutput();
try {
try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out);
XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) {
updated.snapshotsToXContent(builder, true);
}
serialized = out.bytes();
final int len = serialized.length();
if (len > ByteSizeUnit.KB.toBytes(500)) {
Review comment from original-brownbear (Contributor, Author):
For context:

On heap, 1k shards in a snapshot means about 25kb of shard generations, which themselves make up ~50% of the size of RepositoryData on heap with the interning changes I added.
The number of snapshots doesn't matter much by comparison. So in the Cloud case of 100 snapshots, this would allow for caching snapshots of up to ~15k shards, which should work fine for most users, I'm assuming.
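
A rough sketch of that arithmetic, treating the figures in the comment above as assumptions rather than measured constants:

// Back-of-envelope for the 500 KB threshold applied here.
public class CacheBudgetEstimate {
    public static void main(String[] args) {
        long limitBytes = 500 * 1024;          // the 500 KB cap on serialized size
        long bytesPer1kShards = 25 * 1024;     // assumed ~25 KB of shard generations per 1,000 shards
        long shards = limitBytes / bytesPer1kShards * 1_000;
        System.out.println("shard-generation budget alone allows ~" + shards + " shards"); // ~20,000
        // Snapshot-level metadata shares the same budget, so with ~100 snapshots
        // the practical ceiling lands nearer the ~15k shards quoted above.
    }
}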

logger.debug("Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in" +
" serialized size", len, metadata.name());
if (len > ByteSizeUnit.MB.toBytes(5)) {
logger.warn("Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh" +
" repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable" +
" repository behavior going forward.", metadata.name());
}
// Set empty repository data to not waste heap for an outdated cached value
latestKnownRepositoryData.set(null);
return;
}
} catch (IOException e) {
assert false : new AssertionError("Impossible, no IO happens here", e);
logger.warn("Failed to serialize repository data", e);
return;
}
latestKnownRepositoryData.updateAndGet(known -> {
if (known != null && known.v1() > updated.getGenId()) {
return known;
}
return new Tuple<>(updated.getGenId(), serialized);
});
}
}

private RepositoryData repositoryDataFromCachedEntry(Tuple<Long, BytesReference> cacheEntry) throws IOException {
return RepositoryData.snapshotsFromXContent(
XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
CompressorFactory.COMPRESSOR.streamInput(cacheEntry.v2().streamInput())), cacheEntry.v1());
}
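
The cache round-trip above is simply compress-on-write, decompress-on-read. An equivalent standalone sketch using plain JDK streams, with DeflaterOutputStream standing in for the ES-internal CompressorFactory.COMPRESSOR (any deflate-style codec illustrates the shape; names here are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

final class JsonBlobCodec {
    // write path: serialize to JSON, then compress into the cached byte blob
    static byte[] compress(String json) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream deflate = new DeflaterOutputStream(out)) {
            deflate.write(json.getBytes(StandardCharsets.UTF_8));
        }
        return out.toByteArray();
    }

    // read path: decompress the cached blob and hand the JSON back to a parser
    static String decompress(byte[] blob) throws IOException {
        try (InflaterInputStream inflate = new InflaterInputStream(new ByteArrayInputStream(blob))) {
            return new String(inflate.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}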

private RepositoryException corruptedStateException(@Nullable Exception cause) {
return new RepositoryException(metadata.name(),
"Could not read repository data because the contents of the repository do not match its " +
@@ -1362,6 +1446,7 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
cacheRepositoryData(filteredRepositoryData.withGenId(newGen));
Review comment from original-brownbear (Contributor, Author):
This is a little dirty, and we talked about it in other PRs... the generation here is -1 due to the weird way the repository data is loaded initially, and we have to eventually set it. I'll clean that up in a follow-up; for now, just setting it here where it matters (it doesn't matter in the above code when serializing filteredRepositoryData to the repo) seemed the driest.

Reply from a reviewer (Member):
Maybe add a TODO then?

Reply from original-brownbear (Contributor, Author):
Not sure what to even put in it though. In the end, this is simply the way RepositoryData works for now. We have the same kind of code for the cluster state version as well, I guess. I don't have a good idea for a better abstraction yet :(

threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
// Delete all now outdated index files up to 1000 blobs back from the new generation.
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
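
Taken together, the BlobStoreRepository changes amount to a single-entry cache keyed by repository generation: keep only the newest serialized repository data, and serve it only on an exact generation match. A minimal standalone sketch of that pattern (class and member names are illustrative, not the PR's API):

import java.util.concurrent.atomic.AtomicReference;

final class GenerationKeyedCache {
    private static final class Entry {
        final long generation;
        final byte[] serialized;
        Entry(long generation, byte[] serialized) {
            this.generation = generation;
            this.serialized = serialized;
        }
    }

    private final AtomicReference<Entry> latest = new AtomicReference<>();

    // Cache hit only on an exact generation match, mirroring the checks above.
    byte[] get(long generation) {
        final Entry e = latest.get();
        return e != null && e.generation == generation ? e.serialized : null;
    }

    void put(long generation, byte[] serialized) {
        // updateAndGet prevents a stale writer from overwriting a newer entry.
        latest.updateAndGet(known ->
            known != null && known.generation > generation ? known : new Entry(generation, serialized));
    }
}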
@@ -66,6 +66,8 @@ public void testConcurrentlyChangeRepositoryContents() throws Exception {
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

createIndex("test-idx-1", "test-idx-2");
@@ -250,6 +252,8 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

final String snapshotPrefix = "test-snap-";
@@ -315,6 +319,8 @@ public void testMountCorruptedRepositoryData() throws Exception {
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("compress", false)));

final String snapshot = "test-snap";
@@ -36,7 +36,7 @@
*/
public class MockSecureSettings implements SecureSettings {

-    private Map<String, SecureString> secureStrings = new HashMap<>();
+    private Map<String, String> secureStrings = new HashMap<>();
Review comment from original-brownbear (Contributor, Author), Feb 14, 2020:
Just fixing a bug here that prevented node restarts in tests for nodes that used secure settings. If we always return the same SecureString from getString, then if a node ever closes that SecureString, it can't get the setting again.
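
A minimal illustration of that failure mode (hypothetical test method; the setting key and assertion are placeholders, not code from this PR):

public void testReadAfterClose() {
    MockSecureSettings settings = new MockSecureSettings();
    settings.setString("secure.setting", "secret-value"); // placeholder key
    // A node reads the value and closes it at some point during its lifetime.
    try (SecureString first = settings.getString("secure.setting")) {
        // ... use the secret ...
    }
    // Before the fix, the same now-closed instance came back here and reading it
    // threw, which is what broke simulated node restarts. After the fix, each
    // call builds a fresh SecureString from the stored plain String.
    SecureString second = settings.getString("secure.setting");
    assert "secret-value".contentEquals(second);
}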

private Map<String, byte[]> files = new HashMap<>();
private Map<String, byte[]> sha256Digests = new HashMap<>();
private Set<String> settingNames = new HashSet<>();
@@ -65,7 +65,11 @@ public Set<String> getSettingNames() {
@Override
public SecureString getString(String setting) {
ensureOpen();
-        return secureStrings.get(setting);
+        final String s = secureStrings.get(setting);
+        if (s == null) {
+            return null;
+        }
+        return new SecureString(s.toCharArray());
}

@Override
@@ -81,7 +85,7 @@ public byte[] getSHA256Digest(String setting) {

public void setString(String setting, String value) {
ensureOpen();
-        secureStrings.put(setting, new SecureString(value.toCharArray()));
+        secureStrings.put(setting, value);
sha256Digests.put(setting, MessageDigests.sha256().digest(value.getBytes(StandardCharsets.UTF_8)));
settingNames.add(setting);
}