Skip to content

Commit 51a1dcf

Browse files
Fix Source Only Snapshot Permanently Broken on Broken _snapshot Directory (#71459)
Best-effort fix: if syncing the snapshot into the local `_snapshot` staging directory fails (as happens, for example, when dangling files are left over from a previously aborted sync), the directory is pruned and the sync is retried.
1 parent c50fd8f commit 51a1dcf

File tree

2 files changed

+58
-2
lines changed

2 files changed

+58
-2
lines changed

x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.index.engine.EngineFactory;
3131
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
3232
import org.elasticsearch.index.query.QueryBuilders;
33+
import org.elasticsearch.indices.IndicesService;
3334
import org.elasticsearch.indices.recovery.RecoverySettings;
3435
import org.elasticsearch.plugins.EnginePlugin;
3536
import org.elasticsearch.plugins.Plugin;
@@ -43,6 +44,9 @@
4344
import org.hamcrest.Matchers;
4445

4546
import java.io.IOException;
47+
import java.nio.file.DirectoryStream;
48+
import java.nio.file.Files;
49+
import java.nio.file.Path;
4650
import java.util.ArrayList;
4751
import java.util.Arrays;
4852
import java.util.Collection;
@@ -147,6 +151,41 @@ public void testSnapshotAndRestoreWithNested() throws Exception {
147151
assertHits(sourceIdx, builders.length, true);
148152
}
149153

154+
public void testSnapshotWithDanglingLocalSegment() throws IOException {
155+
logger.info("--> starting a master node and a data node");
156+
internalCluster().startMasterOnlyNode();
157+
final String dataNode = internalCluster().startDataOnlyNode();
158+
159+
final String repo = "test-repo";
160+
logger.info("--> creating repository");
161+
assertAcked(client().admin().cluster().preparePutRepository(repo).setType("source")
162+
.setSettings(Settings.builder().put("location", randomRepoPath()).put("delegate_type", "fs")
163+
.put("compress", randomBoolean())));
164+
165+
final String indexName = "test-idx";
166+
createIndex(indexName);
167+
client().prepareIndex(indexName).setSource("foo", "bar").get();
168+
client().admin().cluster().prepareCreateSnapshot(repo, "snapshot-1").setWaitForCompletion(true).get();
169+
170+
client().prepareIndex(indexName).setSource("foo", "baz").get();
171+
client().admin().cluster().prepareCreateSnapshot(repo, "snapshot-2").setWaitForCompletion(true).get();
172+
173+
logger.info("--> randomly deleting files from the local _snapshot path to simulate corruption");
174+
Path snapshotShardPath = internalCluster().getInstance(IndicesService.class, dataNode).indexService(
175+
clusterService().state().metadata().index(indexName).getIndex()).getShard(0).shardPath().getDataPath()
176+
.resolve("_snapshot");
177+
try (DirectoryStream<Path> localFiles = Files.newDirectoryStream(snapshotShardPath)) {
178+
for (Path localFile : localFiles) {
179+
if (randomBoolean()) {
180+
Files.delete(localFile);
181+
}
182+
}
183+
}
184+
185+
assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot(repo, "snapshot-3")
186+
.setWaitForCompletion(true).get().getSnapshotInfo().state());
187+
}
188+
150189
private void assertMappings(String sourceIdx, boolean requireRouting, boolean useNested) throws IOException {
151190
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings(sourceIdx).get();
152191
MappingMetadata mapping = getMappingsResponse.getMappings().get(sourceIdx);

x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
*/
77
package org.elasticsearch.snapshots;
88

9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.apache.logging.log4j.message.ParameterizedMessage;
12+
import org.apache.lucene.index.CorruptIndexException;
913
import org.apache.lucene.index.DirectoryReader;
1014
import org.apache.lucene.index.IndexCommit;
1115
import org.apache.lucene.index.SegmentInfos;
@@ -22,6 +26,7 @@
2226
import org.elasticsearch.cluster.metadata.Metadata;
2327
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
2428
import org.elasticsearch.common.Strings;
29+
import org.elasticsearch.common.lucene.Lucene;
2530
import org.elasticsearch.common.lucene.search.Queries;
2631
import org.elasticsearch.common.settings.Setting;
2732
import org.elasticsearch.common.settings.Settings;
@@ -43,6 +48,8 @@
4348
import java.io.Closeable;
4449
import java.io.IOException;
4550
import java.io.UncheckedIOException;
51+
import java.nio.file.FileAlreadyExistsException;
52+
import java.nio.file.NoSuchFileException;
4653
import java.nio.file.Path;
4754
import java.util.ArrayList;
4855
import java.util.Collection;
@@ -73,6 +80,8 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
7380
public static final Setting<Boolean> SOURCE_ONLY = Setting.boolSetting("index.source_only", false, Setting
7481
.Property.IndexScope, Setting.Property.Final, Setting.Property.PrivateIndex);
7582

83+
private static final Logger logger = LogManager.getLogger(SourceOnlySnapshotRepository.class);
84+
7685
private static final String SNAPSHOT_DIR_NAME = "_snapshot";
7786

7887
SourceOnlySnapshotRepository(Repository in) {
@@ -146,8 +155,16 @@ protected void closeInternal() {
146155
}, Store.OnClose.EMPTY);
147156
Supplier<Query> querySupplier = mapperService.hasNested() ? Queries::newNestedFilter : null;
148157
// SourceOnlySnapshot will take care of soft- and hard-deletes no special casing needed here
149-
SourceOnlySnapshot snapshot = new SourceOnlySnapshot(overlayDir, querySupplier);
150-
snapshot.syncSnapshot(snapshotIndexCommit);
158+
SourceOnlySnapshot snapshot;
159+
snapshot = new SourceOnlySnapshot(overlayDir, querySupplier);
160+
try {
161+
snapshot.syncSnapshot(snapshotIndexCommit);
162+
} catch (NoSuchFileException | CorruptIndexException | FileAlreadyExistsException e) {
163+
logger.warn(() -> new ParameterizedMessage(
164+
"Existing staging directory [{}] appears corrupted and will be pruned and recreated.", snapPath), e);
165+
Lucene.cleanLuceneIndex(overlayDir);
166+
snapshot.syncSnapshot(snapshotIndexCommit);
167+
}
151168
// we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
152169
SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
153170
final long maxDoc = segmentInfos.totalMaxDoc();

0 commit comments

Comments
 (0)