
Commit b08ca07

Delete shard store files before restoring a snapshot
Pull request elastic#20220 added a change where store files that have the same name as, but differ from, the files in the snapshot are deleted before the snapshot is restored. This logic was based on the `Store.RecoveryDiff.different` set of files, which is computed by diffing an existing store against a snapshot. This works well when the files on the filesystem form a valid shard store, i.e. there is a `segments` file and the store files are not corrupted. Otherwise, the existing store's snapshot metadata cannot be read (using Store#snapshotStoreMetadata()) and an exception is thrown (CorruptIndexException, IndexFormatTooOldException, etc.), which is caught at the beginning of the restore process (see RestoreContext#restore()) and translated into empty store metadata (Store.MetadataSnapshot.EMPTY).

This makes the deletion of different files introduced in elastic#20220 useless, as the set of files to delete is always empty even when store files exist on the filesystem. And if some files are present in the store directory, restoring a snapshot containing files with the same names fails with a FileAlreadyExistsException. This is part of the elastic#26865 issue.

There are various cases where some files could exist in the store directory before a snapshot is restored. One that Igor identified is a restore attempt that failed on a node after only the first files were restored; the shard is then allocated again to the same node and the restore starts again, but fails because of the existing files. Another is when some files of a closed index are corrupted or deleted and the index is then restored.

This commit adds a test that uses the infrastructure provided by IndexShardTestCase in order to verify that restoring a shard succeeds even when files with the same names exist on the filesystem. Related to elastic#26865
1 parent fadbe0d commit b08ca07
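
For readers skimming the diffs below, the gist of the fix is: instead of deleting only the files in `Store.RecoveryDiff.different` (a set that ends up empty whenever the target store's metadata cannot be read), the restore now lists every file physically present in the store directory, keeps the ones that are identical to the snapshot's files, and deletes any other file that would clash with a file about to be restored. The following is a minimal, self-contained sketch of that idea only; the `RestoreSketch` class and the helpers `listExistingFiles` and `restoreFromSnapshot` are hypothetical stand-ins for the real Store/BlobStoreRepository plumbing shown in the actual diff.

// Minimal sketch (not the actual Elasticsearch code) of the "delete clashing files
// before restore" idea from this commit. listExistingFiles() and restoreFromSnapshot()
// are hypothetical placeholders for the repository plumbing in BlobStoreRepository.
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RestoreSketch {

    void restoreShard(Path shardDir, List<String> snapshotFiles, Set<String> identicalFiles) throws IOException {
        // all files currently in the shard directory, minus those already identical to
        // the snapshot (identical files are reused instead of being copied again)
        Set<String> deleteIfExists = new HashSet<>(listExistingFiles(shardDir));
        deleteIfExists.removeAll(identicalFiles);

        for (String fileName : snapshotFiles) {
            if (identicalFiles.contains(fileName)) {
                continue; // identical file already on disk, nothing to restore
            }
            // a stale or partially restored file with the same name would otherwise make
            // the restore fail with a FileAlreadyExistsException, so delete it first
            if (deleteIfExists.contains(fileName)) {
                Files.deleteIfExists(shardDir.resolve(fileName));
            }
            restoreFromSnapshot(shardDir, fileName);
        }
    }

    private Set<String> listExistingFiles(Path shardDir) throws IOException {
        // names of all files currently present in the shard's store directory
        Set<String> names = new HashSet<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(shardDir)) {
            for (Path file : stream) {
                names.add(file.getFileName().toString());
            }
        }
        return names;
    }

    private void restoreFromSnapshot(Path shardDir, String fileName) throws IOException {
        // placeholder: the real code streams the file's contents from the snapshot repository
        Files.createFile(shardDir.resolve(fileName));
    }
}

Leftover store files that the snapshot does not contain are not handled in this sketch; in the real change they are cleaned up later by RecoveryTarget.cleanFiles(), as the added comment in BlobStoreRepository notes.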

File tree

4 files changed: +310 -14 lines changed


core/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 1 addition & 1 deletion
@@ -731,7 +731,7 @@ public String toString() {
 
     /**
      * Represents a snapshot of the current directory build from the latest Lucene commit.
-     * Only files that are part of the last commit are considered in this datastrucutre.
+     * Only files that are part of the last commit are considered in this datastructure.
      * For backwards compatibility the snapshot might include legacy checksums that
      * are derived from a dedicated checksum file written by older elasticsearch version pre 1.3
      * <p>

core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 31 additions & 11 deletions
@@ -35,7 +35,6 @@
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
-import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
@@ -110,9 +109,11 @@
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -1451,6 +1452,9 @@ public void restore() throws IOException {
             SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
             Store.MetadataSnapshot recoveryTargetMetadata;
             try {
+                // this will throw an IOException if the store has no segments infos file. The
+                // store can still have existing files but they will be deleted just before being
+                // restored.
                 recoveryTargetMetadata = targetShard.snapshotStoreMetadata();
             } catch (IndexNotFoundException e) {
                 // happens when restore to an empty shard, not a big deal
@@ -1478,7 +1482,14 @@ public void restore() throws IOException {
                 snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata());
                 fileInfos.put(fileInfo.metadata().name(), fileInfo);
             }
+
             final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0);
+
+            final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile();
+            if (restoredSegmentsFile == null) {
+                throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
+            }
+
             final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
             for (StoreFileMetaData md : diff.identical) {
                 BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
@@ -1510,24 +1521,33 @@ public void restore() throws IOException {
                     index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount()));
             }
             try {
-                // first, delete pre-existing files in the store that have the same name but are
-                // different (i.e. different length/checksum) from those being restored in the snapshot
-                for (final StoreFileMetaData storeFileMetaData : diff.different) {
-                    IOUtils.deleteFiles(store.directory(), storeFileMetaData.name());
-                }
+                // list of all existing store files without the identical ones
+                final Set<String> deleteIfExistFiles = Sets.difference(
+                    new HashSet<>(Arrays.asList(store.directory().listAll())),
+                    diff.identical.stream().map(StoreFileMetaData::name).collect(Collectors.toSet())
+                );
+
                 // restore the files from the snapshot to the Lucene store
                 for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
+                    // if a file with a same physical name already exist in the store we need to delete it
+                    // before restoring it from the snapshot. We could be lenient and try to reuse the existing
+                    // store files (and compare their names/length/checksum again with the snapshot files) but to
+                    // avoid extra complexity we simply delete them and restore them again like StoreRecovery
+                    // does with dangling indices. Any existing store file that is not restored from the snapshot
+                    // will be clean up by RecoveryTarget.cleanFiles().
+                    final String physicalName = fileToRecover.physicalName();
+                    if (deleteIfExistFiles.contains(physicalName)) {
+                        logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName);
+                        store.directory().deleteFile(physicalName);
+                    }
+
                     logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
                     restoreFile(fileToRecover, store);
                 }
             } catch (IOException ex) {
                 throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
             }
-            final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile();
-            if (recoveryTargetMetadata == null) {
-                throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
-            }
-            assert restoredSegmentsFile != null;
+
             // read the snapshot data persisted
             final SegmentInfos segmentCommitInfos;
             try {

core/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 1 addition & 2 deletions
@@ -76,7 +76,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -189,7 +188,7 @@ public void restoreSnapshot(final RestoreRequest request, final ActionListener<R
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
         final Snapshot snapshot = new Snapshot(request.repositoryName, snapshotId);
         List<String> filteredIndices = SnapshotUtils.filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
-        MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices));
+        final MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices));
 
         // Make sure that we can restore from this snapshot
         validateSnapshotRestorable(request.repositoryName, snapshotInfo);

core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

Lines changed: 277 additions & 0 deletions
@@ -0,0 +1,277 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories.blobstore;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.TestUtil;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotId;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE;
+import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
+
+/**
+ * This class tests the behavior of {@link BlobStoreRepository} when it
+ * restores a shard from a snapshot but some files with same names already
+ * exist on disc.
+ */
+public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
+
+    /**
+     * Restoring a snapshot that contains multiple files must succeed even when
+     * some files already exist in the shard's store.
+     */
+    public void testRestoreSnapshotWithExistingFiles() throws IOException {
+        final IndexId indexId = new IndexId(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
+        final ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), 0);
+
+        IndexShard shard = newShard(shardId, true);
+        try {
+            // index documents in the shards
+            final int numDocs = scaledRandomIntBetween(1, 500);
+            recoverShardFromStore(shard);
+            for (int i = 0; i < numDocs; i++) {
+                indexDoc(shard, "doc", Integer.toString(i));
+                if (rarely()) {
+                    flushShard(shard, false);
+                }
+            }
+            assertDocCount(shard, numDocs);
+
+            // snapshot the shard
+            final BlobStoreRepository repository = createRepository();
+            final Snapshot snapshot = new Snapshot(repository.getMetadata().name(), new SnapshotId(randomAlphaOfLength(10), "_uuid"));
+            snapshotShard(shard, snapshot, repository);
+
+            // capture current store files
+            final Store.MetadataSnapshot storeFiles = shard.snapshotStoreMetadata();
+            assertFalse(storeFiles.asMap().isEmpty());
+
+            // close the shard
+            closeShards(shard);
+
+            // delete some random files in the store
+            List<String> deletedFiles = randomSubsetOf(randomIntBetween(1, storeFiles.size() - 1), storeFiles.asMap().keySet());
+            for (String deletedFile : deletedFiles) {
+                Files.delete(shard.shardPath().resolveIndex().resolve(deletedFile));
+            }
+
+            // build a new shard using the same store directory as the closed shard
+            ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(shard.routingEntry(), EXISTING_STORE_INSTANCE);
+            shard = newShard(shardRouting, shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, null, () -> {});
+
+            // restore the shard
+            recoverShardFromSnapshot(shard, snapshot, repository);
+
+            // check that the shard is not corrupted
+            TestUtil.checkIndex(shard.store().directory());
+
+            // check that all files have been restored
+            final Directory directory = shard.store().directory();
+            final List<String> directoryFiles = Arrays.asList(directory.listAll());
+
+            for (StoreFileMetaData storeFile : storeFiles) {
+                String fileName = storeFile.name();
+                assertTrue("File [" + fileName + "] does not exist in store directory", directoryFiles.contains(fileName));
+                assertEquals(storeFile.length(), shard.store().directory().fileLength(fileName));
+            }
+        } finally {
+            if (shard != null && shard.state() != IndexShardState.CLOSED) {
+                try {
+                    shard.close("test", false);
+                } finally {
+                    IOUtils.close(shard.store());
+                }
+            }
+        }
+    }
+
+    /** Recover a shard from a snapshot using a given repository **/
+    private void recoverShardFromSnapshot(final IndexShard shard,
+                                          final Snapshot snapshot,
+                                          final BlobStoreRepository repository) throws IOException {
+        final Version version = Version.CURRENT;
+        final ShardId shardId = shard.shardId();
+        final String index = shard.shardId().getIndexName();
+        final IndexId indexId = new IndexId(index, UUIDs.randomBase64UUID());
+        final DiscoveryNode node = new DiscoveryNode(randomAlphaOfLength(25), buildNewFakeTransportAddress(), version);
+        final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource(snapshot, version, index);
+        final ShardRouting shardRouting = newShardRouting(shardId, node.getId(), true, recoverySource, ShardRoutingState.INITIALIZING);
+
+        shard.markAsRecovering("from snapshot", new RecoveryState(shardRouting, node, null));
+        repository.restoreShard(shard, snapshot.getSnapshotId(), version, indexId, shard.shardId(), shard.recoveryState());
+    }
+
+    /** Snapshot a shard using a given repository **/
+    private void snapshotShard(final IndexShard shard,
+                               final Snapshot snapshot,
+                               final BlobStoreRepository repository) throws IOException {
+        final IndexShardSnapshotStatus snapshotStatus = new IndexShardSnapshotStatus();
+        try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(true)) {
+            Index index = shard.shardId().getIndex();
+            IndexId indexId = new IndexId(index.getName(), index.getUUID());
+
+            repository.snapshotShard(shard, snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus);
+        }
+        assertEquals(IndexShardSnapshotStatus.Stage.DONE, snapshotStatus.stage());
+        assertEquals(shard.snapshotStoreMetadata().size(), snapshotStatus.numberOfFiles());
+        assertNull(snapshotStatus.failure());
+    }
+
+
+    /**
+     * A {@link BlobStoreRepository} implementation that works in memory.
+     *
+     * It implements only the methods required by the tests and is not thread safe.
+     */
+    class MemoryBlobStoreRepository extends BlobStoreRepository {
+
+        private final Map<String, byte[]> files = new HashMap<>();
+
+        MemoryBlobStoreRepository(final RepositoryMetaData metadata, final Settings settings, final NamedXContentRegistry registry) {
+            super(metadata, settings, registry);
+        }
+
+        @Override
+        protected BlobStore blobStore() {
+            return new BlobStore() {
+                @Override
+                public BlobContainer blobContainer(BlobPath path) {
+                    return new BlobContainer() {
+                        @Override
+                        public BlobPath path() {
+                            return new BlobPath();
+                        }
+
+                        @Override
+                        public boolean blobExists(String blobName) {
+                            return files.containsKey(blobName);
+                        }
+
+                        @Override
+                        public InputStream readBlob(String blobName) throws IOException {
+                            if (blobExists(blobName) == false) {
+                                throw new FileNotFoundException(blobName);
+                            }
+                            return new ByteArrayInputStream(files.get(blobName));
+                        }
+
+                        @Override
+                        public void writeBlob(String blobName, InputStream in, long blobSize) throws IOException {
+                            try (ByteArrayOutputStream out = new ByteArrayOutputStream((int) blobSize)) {
+                                Streams.copy(in, out);
+                                files.put(blobName, out.toByteArray());
+                            }
+                        }
+
+                        @Override
+                        public void deleteBlob(String blobName) throws IOException {
+                            files.remove(blobName);
+                        }
+
+                        @Override
+                        public Map<String, BlobMetaData> listBlobs() throws IOException {
+                            final Map<String, BlobMetaData> blobs = new HashMap<>(files.size());
+                            files.forEach((key, value) -> blobs.put(key, new PlainBlobMetaData(key, value.length)));
+                            return blobs;
+                        }
+
+                        @Override
+                        public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
+                            return listBlobs().entrySet().stream()
+                                .filter(e -> e.getKey().startsWith(blobNamePrefix))
+                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                        }
+
+                        @Override
+                        public void move(String sourceBlobName, String targetBlobName) throws IOException {
+                            byte[] bytes = files.remove(sourceBlobName);
+                            if (bytes == null) {
+                                throw new FileNotFoundException(sourceBlobName);
+                            }
+                            files.put(targetBlobName, bytes);
+                        }
+                    };
+                }
+
+                @Override
+                public void delete(BlobPath path) throws IOException {
+                    throw new UnsupportedOperationException("MemoryBlobStoreRepository does not support this method");
+                }
+
+                @Override
+                public void close() throws IOException {
+                    files.clear();
+                }
+            };
+        }
+
+        @Override
+        protected BlobPath basePath() {
+            return new BlobPath();
+        }
+    }
+
+    /** Create a {@link BlobStoreRepository} with a random name **/
+    private BlobStoreRepository createRepository() {
+        String name = randomAlphaOfLength(10);
+        return new MemoryBlobStoreRepository(new RepositoryMetaData(name, "in-memory", Settings.EMPTY), Settings.EMPTY, xContentRegistry());
+    }
+}
