Skip to content

Commit d456f78

Browse files
Deduplicate Index Metadata in BlobStore (#50278) (#59514)
This PR introduces two new fields in to `RepositoryData` (index-N) to track the blob name of `IndexMetaData` blobs and their content via setting generations and uuids. This is used to deduplicate the `IndexMetaData` blobs (`meta-{uuid}.dat` in the indices folders under `/indices` so that new metadata for an index is only written to the repository during a snapshot if that same metadata can't be found in another snapshot. This saves one write per index in the common case of unchanged metadata thus saving cost and making snapshot finalization drastically faster if many indices are being snapshotted at the same time. The implementation is mostly analogous to that for shard generations in #46250 and piggy backs on the BwC mechanism introduced in that PR (which means this PR needs adjustments if it doesn't go into `7.6`). Relates to #45736 as it improves the efficiency of snapshotting unchanged indices Relates to #49800 as it has the potential of loading the index metadata for multiple snapshots of the same index concurrently much more efficient speeding up future concurrent snapshot delete
1 parent 0d2ea1b commit d456f78

File tree

23 files changed

+644
-139
lines changed

23 files changed

+644
-139
lines changed

plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ public void testEnforcedCooldownPeriod() throws IOException {
160160
final RepositoryData repositoryData = getRepositoryData(repository);
161161
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
162162
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
163-
final BytesReference serialized =
164-
BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false));
163+
final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(),
164+
SnapshotsService.OLD_SNAPSHOT_FORMAT));
165165
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
166166
try (InputStream stream = serialized.streamInput()) {
167167
repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(

qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
218218
ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards);
219219
}
220220
} else {
221-
if (SnapshotsService.useShardGenerations(minimumNodeVersion()) == false) {
221+
if (SnapshotsService.useIndexGenerations(minimumNodeVersion()) == false) {
222222
assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER));
223223
final List<Class<? extends Exception>> expectedExceptions =
224224
Arrays.asList(ResponseException.class, ElasticsearchStatusException.class);

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.unit.ByteSizeUnit;
3434
import org.elasticsearch.common.xcontent.XContentFactory;
3535
import org.elasticsearch.repositories.IndexId;
36+
import org.elasticsearch.repositories.IndexMetaDataGenerations;
3637
import org.elasticsearch.repositories.RepositoriesService;
3738
import org.elasticsearch.repositories.Repository;
3839
import org.elasticsearch.repositories.RepositoryData;
@@ -273,11 +274,12 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
273274
SnapshotId::getUUID, Function.identity())),
274275
repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(
275276
SnapshotId::getUUID, repositoryData::getSnapshotState)),
276-
Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
277+
Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);
277278

278279
Files.write(repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()),
279-
BytesReference.toBytes(BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(),
280-
true))), StandardOpenOption.TRUNCATE_EXISTING);
280+
BytesReference.toBytes(BytesReference.bytes(
281+
withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))),
282+
StandardOpenOption.TRUNCATE_EXISTING);
281283

282284
logger.info("--> verify that repo is assumed in old metadata format");
283285
final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class);
@@ -403,11 +405,12 @@ public void testRepairBrokenShardGenerations() throws IOException {
403405
Collectors.toMap(SnapshotId::getUUID, repositoryData1::getVersion)),
404406
repositoryData1.getIndices().values().stream().collect(
405407
Collectors.toMap(Function.identity(), repositoryData1::getSnapshots)
406-
), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build()
408+
), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build(),
409+
repositoryData1.indexMetaDataGenerations()
407410
);
408411
Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData1.getGenId()),
409412
BytesReference.toBytes(BytesReference.bytes(
410-
brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), true))),
413+
brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))),
411414
StandardOpenOption.TRUNCATE_EXISTING);
412415

413416
logger.info("--> recreating repository to clear caches");

server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

+83
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,11 @@
7373
import org.elasticsearch.node.Node;
7474
import org.elasticsearch.plugins.Plugin;
7575
import org.elasticsearch.repositories.RepositoryMissingException;
76+
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
7677
import org.elasticsearch.rest.AbstractRestChannel;
7778
import org.elasticsearch.rest.RestRequest;
7879
import org.elasticsearch.rest.RestResponse;
80+
import org.elasticsearch.rest.RestStatus;
7981
import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction;
8082
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
8183
import org.elasticsearch.snapshots.mockstore.MockRepository;
@@ -996,6 +998,8 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
996998

997999
SnapshotStats stats = snapshots.get(0).getStats();
9981000

1001+
final List<Path> snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath);
1002+
assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index
9991003
assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount));
10001004
assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize));
10011005

@@ -1023,6 +1027,10 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
10231027
.get();
10241028

10251029
final List<Path> snapshot1Files = scanSnapshotFolder(repoPath);
1030+
final List<Path> snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath);
1031+
1032+
// The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository
1033+
assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles));
10261034

10271035
final int snapshot1FileCount = snapshot1Files.size();
10281036
final long snapshot1FileSize = calculateTotalFilesSize(snapshot1Files);
@@ -1047,6 +1055,65 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
10471055
assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize));
10481056
}
10491057

1058+
public void testDeduplicateIndexMetadata() throws Exception {
1059+
final String indexName = "test-blocks-1";
1060+
final String repositoryName = "repo-" + indexName;
1061+
final String snapshot0 = "snapshot-0";
1062+
final String snapshot1 = "snapshot-1";
1063+
final String snapshot2 = "snapshot-2";
1064+
1065+
createIndex(indexName);
1066+
1067+
int docs = between(10, 100);
1068+
for (int i = 0; i < docs; i++) {
1069+
client().prepareIndex(indexName, "_doc").setSource("test", "init").execute().actionGet();
1070+
}
1071+
1072+
final Path repoPath = randomRepoPath();
1073+
createRepository(repositoryName, "fs", repoPath);
1074+
1075+
logger.info("--> create a snapshot");
1076+
client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0)
1077+
.setIncludeGlobalState(true)
1078+
.setWaitForCompletion(true)
1079+
.get();
1080+
1081+
final List<Path> snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath);
1082+
assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index
1083+
1084+
docs = between(1, 5);
1085+
for (int i = 0; i < docs; i++) {
1086+
client().prepareIndex(indexName, "_doc").setSource("test", "test" + i).execute().actionGet();
1087+
}
1088+
1089+
logger.info("--> restart random data node and add new data node to change index allocation");
1090+
internalCluster().restartRandomDataNode();
1091+
internalCluster().startDataOnlyNode();
1092+
ensureGreen(indexName);
1093+
1094+
assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot1).setWaitForCompletion(true).get().status(),
1095+
equalTo(RestStatus.OK));
1096+
1097+
final List<Path> snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath);
1098+
1099+
// The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository
1100+
assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles));
1101+
1102+
// index to some other field to trigger a change in index metadata
1103+
for (int i = 0; i < docs; i++) {
1104+
client().prepareIndex(indexName, "_doc").setSource("new_field", "test" + i).execute().actionGet();
1105+
}
1106+
assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2).setWaitForCompletion(true).get().status(),
1107+
equalTo(RestStatus.OK));
1108+
1109+
final List<Path> snapshot2IndexMetaFiles = findRepoMetaBlobs(repoPath);
1110+
assertThat(snapshot2IndexMetaFiles, hasSize(2)); // should have created one new metadata blob
1111+
1112+
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get());
1113+
final List<Path> snapshot3IndexMetaFiles = findRepoMetaBlobs(repoPath);
1114+
assertThat(snapshot3IndexMetaFiles, hasSize(1)); // should have deleted the metadata blob referenced by the first two snapshots
1115+
}
1116+
10501117
public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
10511118
logger.info("--> starting a master node and two data nodes");
10521119
internalCluster().startMasterOnlyNode();
@@ -1256,6 +1323,22 @@ private long calculateTotalFilesSize(List<Path> files) {
12561323
}).sum();
12571324
}
12581325

1326+
private static List<Path> findRepoMetaBlobs(Path repoPath) throws IOException {
1327+
List<Path> files = new ArrayList<>();
1328+
Files.walkFileTree(repoPath.resolve("indices"), new SimpleFileVisitor<Path>() {
1329+
@Override
1330+
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
1331+
final String fileName = file.getFileName().toString();
1332+
if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) {
1333+
files.add(file);
1334+
}
1335+
return super.visitFile(file, attrs);
1336+
}
1337+
}
1338+
);
1339+
return files;
1340+
}
1341+
12591342
private List<Path> scanSnapshotFolder(Path repoPath) throws IOException {
12601343
List<Path> files = new ArrayList<>();
12611344
Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>(){

server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
2424
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
2525
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
26+
import org.elasticsearch.action.support.PlainActionFuture;
2627
import org.elasticsearch.cluster.metadata.IndexMetadata;
2728
import org.elasticsearch.cluster.metadata.Metadata;
2829
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -34,6 +35,7 @@
3435
import org.elasticsearch.repositories.IndexId;
3536
import org.elasticsearch.repositories.RepositoriesService;
3637
import org.elasticsearch.repositories.Repository;
38+
import org.elasticsearch.repositories.RepositoryData;
3739
import org.elasticsearch.rest.RestStatus;
3840
import org.elasticsearch.snapshots.mockstore.MockRepository;
3941

@@ -198,9 +200,10 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
198200
}
199201

200202
@Override
201-
public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId indexId) throws IOException {
203+
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId,
204+
IndexId indexId) throws IOException {
202205
indicesMetadata.computeIfAbsent(key(snapshotId.getName(), indexId.getName()), (s) -> new AtomicInteger(0)).incrementAndGet();
203-
return super.getSnapshotIndexMetadata(snapshotId, indexId);
206+
return super.getSnapshotIndexMetaData(PlainActionFuture.get(this::getRepositoryData), snapshotId, indexId);
204207
}
205208
}
206209

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -2546,7 +2546,8 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception {
25462546
final IndexId corruptedIndex = randomFrom(indexIds.values());
25472547
final Path indexMetadataPath = repo.resolve("indices")
25482548
.resolve(corruptedIndex.getId())
2549-
.resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat");
2549+
.resolve(
2550+
"meta-" + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotInfo.snapshotId(), corruptedIndex) + ".dat");
25502551

25512552
// Truncate the index metadata file
25522553
try(SeekableByteChannel outChan = Files.newByteChannel(indexMetadataPath, StandardOpenOption.WRITE)) {

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(final String repos
334334
final Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
335335
for (String index : snapshotInfo.indices()) {
336336
IndexId indexId = repositoryData.resolveIndexId(index);
337-
IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), indexId);
337+
IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId);
338338
if (indexMetadata != null) {
339339
int numberOfShards = indexMetadata.getNumberOfShards();
340340
for (int i = 0; i < numberOfShards; i++) {

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
7070
}
7171

7272
@Override
73-
public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException {
74-
return in.getSnapshotIndexMetadata(snapshotId, index);
73+
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
74+
return in.getSnapshotIndexMetaData(repositoryData, snapshotId, index);
7575
}
7676

7777
@Override

0 commit comments

Comments
 (0)