Skip to content

Commit 2e7b1ab

Browse files
Use ClusterState as Consistency Source for Snapshot Repositories (#49060) (#50267)
Follow up to #49729 This change removes falling back to listing out the repository contents to find the latest `index-N` in write-mounted blob store repositories. This saves 2-3 list operations on each snapshot create and delete operation. Also it makes all the snapshot status APIs cheaper (and faster) by saving one list operation there as well in many cases. This removes the resiliency to concurrent modifications of the repository as a result and puts a repository in a `corrupted` state in case loading `RepositoryData` failed from the assumed generation.
1 parent ce294e1 commit 2e7b1ab

File tree

8 files changed

+457
-50
lines changed

8 files changed

+457
-50
lines changed

server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

+5
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ public final class RepositoryData {
5858
*/
5959
public static final long UNKNOWN_REPO_GEN = -2L;
6060

61+
/**
62+
* The generation value indicating that the repository generation could not be determined.
63+
*/
64+
public static final long CORRUPTED_REPO_GEN = -3L;
65+
6166
/**
6267
* An instance initialized for an empty repository.
6368
*/

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+180-43
Large diffs are not rendered by default.

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.cluster.routing.RecoverySource;
2828
import org.elasticsearch.cluster.routing.ShardRouting;
2929
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
30+
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.UUIDs;
3132
import org.elasticsearch.common.settings.Settings;
3233
import org.elasticsearch.core.internal.io.IOUtils;
@@ -193,13 +194,16 @@ public void testSnapshotWithConflictingName() throws IOException {
193194
private Repository createRepository() {
194195
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
195196
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
196-
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
197-
BlobStoreTestUtil.mockClusterService(repositoryMetaData)) {
197+
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
198+
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService) {
198199
@Override
199200
protected void assertSnapshotOrGenericThread() {
200201
// eliminate thread name check as we create repo manually
201202
}
202203
};
204+
clusterService.addStateApplier(event -> repository.updateState(event.state()));
205+
// Apply state once to initialize repo properly like RepositoriesService would
206+
repository.updateState(clusterService.state());
203207
repository.start();
204208
return repository;
205209
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.snapshots;
20+
21+
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
22+
import org.elasticsearch.action.support.PlainActionFuture;
23+
import org.elasticsearch.client.Client;
24+
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
26+
import org.elasticsearch.cluster.metadata.MetaData;
27+
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
28+
import org.elasticsearch.cluster.service.ClusterService;
29+
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.common.unit.ByteSizeUnit;
31+
import org.elasticsearch.repositories.RepositoriesService;
32+
import org.elasticsearch.repositories.Repository;
33+
import org.elasticsearch.repositories.RepositoryData;
34+
import org.elasticsearch.repositories.RepositoryException;
35+
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
36+
37+
import java.nio.file.Files;
38+
import java.nio.file.Path;
39+
40+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
41+
import static org.hamcrest.Matchers.containsString;
42+
import static org.hamcrest.Matchers.equalTo;
43+
import static org.hamcrest.Matchers.greaterThan;
44+
import static org.hamcrest.Matchers.is;
45+
46+
public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {
47+
48+
public void testConcurrentlyChangeRepositoryContents() throws Exception {
49+
Client client = client();
50+
51+
Path repo = randomRepoPath();
52+
final String repoName = "test-repo";
53+
logger.info("--> creating repository at {}", repo.toAbsolutePath());
54+
assertAcked(client.admin().cluster().preparePutRepository(repoName)
55+
.setType("fs").setSettings(Settings.builder()
56+
.put("location", repo)
57+
.put("compress", false)
58+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
59+
60+
createIndex("test-idx-1", "test-idx-2");
61+
logger.info("--> indexing some data");
62+
indexRandom(true,
63+
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
64+
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));
65+
66+
final String snapshot = "test-snap";
67+
68+
logger.info("--> creating snapshot");
69+
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot)
70+
.setWaitForCompletion(true).setIndices("test-idx-*").get();
71+
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
72+
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
73+
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
74+
75+
logger.info("--> move index-N blob to next generation");
76+
final RepositoryData repositoryData =
77+
getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName));
78+
Files.move(repo.resolve("index-" + repositoryData.getGenId()), repo.resolve("index-" + (repositoryData.getGenId() + 1)));
79+
80+
assertRepositoryBlocked(client, repoName, snapshot);
81+
82+
if (randomBoolean()) {
83+
logger.info("--> move index-N blob back to initial generation");
84+
Files.move(repo.resolve("index-" + (repositoryData.getGenId() + 1)), repo.resolve("index-" + repositoryData.getGenId()));
85+
86+
logger.info("--> verify repository remains blocked");
87+
assertRepositoryBlocked(client, repoName, snapshot);
88+
}
89+
90+
logger.info("--> remove repository");
91+
assertAcked(client.admin().cluster().prepareDeleteRepository(repoName));
92+
93+
logger.info("--> recreate repository");
94+
assertAcked(client.admin().cluster().preparePutRepository(repoName)
95+
.setType("fs").setSettings(Settings.builder()
96+
.put("location", repo)
97+
.put("compress", false)
98+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
99+
100+
logger.info("--> delete snapshot");
101+
client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
102+
103+
logger.info("--> make sure snapshot doesn't exist");
104+
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
105+
.addSnapshots(snapshot).get());
106+
}
107+
108+
public void testConcurrentlyChangeRepositoryContentsInBwCMode() throws Exception {
109+
Client client = client();
110+
111+
Path repo = randomRepoPath();
112+
final String repoName = "test-repo";
113+
logger.info("--> creating repository at {}", repo.toAbsolutePath());
114+
assertAcked(client.admin().cluster().preparePutRepository(repoName)
115+
.setType("fs").setSettings(Settings.builder()
116+
.put("location", repo)
117+
.put("compress", false)
118+
.put(BlobStoreRepository.ALLOW_CONCURRENT_MODIFICATION.getKey(), true)
119+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
120+
121+
createIndex("test-idx-1", "test-idx-2");
122+
logger.info("--> indexing some data");
123+
indexRandom(true,
124+
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
125+
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));
126+
127+
final String snapshot = "test-snap";
128+
129+
logger.info("--> creating snapshot");
130+
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot)
131+
.setWaitForCompletion(true).setIndices("test-idx-*").get();
132+
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
133+
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
134+
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
135+
136+
final Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName);
137+
138+
logger.info("--> move index-N blob to next generation");
139+
final RepositoryData repositoryData = getRepositoryData(repository);
140+
final long beforeMoveGen = repositoryData.getGenId();
141+
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));
142+
143+
logger.info("--> verify index-N blob is found at the new location");
144+
assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 1));
145+
146+
logger.info("--> delete snapshot");
147+
client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
148+
149+
logger.info("--> verify index-N blob is found at the expected location");
150+
assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 2));
151+
152+
logger.info("--> make sure snapshot doesn't exist");
153+
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
154+
.addSnapshots(snapshot).get());
155+
}
156+
157+
public void testFindDanglingLatestGeneration() throws Exception {
158+
Path repo = randomRepoPath();
159+
final String repoName = "test-repo";
160+
logger.info("--> creating repository at {}", repo.toAbsolutePath());
161+
assertAcked(client().admin().cluster().preparePutRepository(repoName)
162+
.setType("fs").setSettings(Settings.builder()
163+
.put("location", repo)
164+
.put("compress", false)
165+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
166+
167+
createIndex("test-idx-1", "test-idx-2");
168+
logger.info("--> indexing some data");
169+
indexRandom(true,
170+
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
171+
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));
172+
173+
final String snapshot = "test-snap";
174+
175+
logger.info("--> creating snapshot");
176+
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshot)
177+
.setWaitForCompletion(true).setIndices("test-idx-*").get();
178+
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
179+
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
180+
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
181+
182+
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
183+
184+
logger.info("--> move index-N blob to next generation");
185+
final RepositoryData repositoryData = getRepositoryData(repository);
186+
final long beforeMoveGen = repositoryData.getGenId();
187+
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));
188+
189+
logger.info("--> set next generation as pending in the cluster state");
190+
final PlainActionFuture<Void> csUpdateFuture = PlainActionFuture.newFuture();
191+
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).submitStateUpdateTask("set pending generation",
192+
new ClusterStateUpdateTask() {
193+
@Override
194+
public ClusterState execute(ClusterState currentState) {
195+
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
196+
.putCustom(RepositoriesMetaData.TYPE,
197+
currentState.metaData().<RepositoriesMetaData>custom(RepositoriesMetaData.TYPE).withUpdatedGeneration(
198+
repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build();
199+
}
200+
201+
@Override
202+
public void onFailure(String source, Exception e) {
203+
csUpdateFuture.onFailure(e);
204+
}
205+
206+
@Override
207+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
208+
csUpdateFuture.onResponse(null);
209+
}
210+
}
211+
);
212+
csUpdateFuture.get();
213+
214+
logger.info("--> full cluster restart");
215+
internalCluster().fullRestart();
216+
ensureGreen();
217+
218+
Repository repositoryAfterRestart = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
219+
220+
logger.info("--> verify index-N blob is found at the new location");
221+
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 1));
222+
223+
logger.info("--> delete snapshot");
224+
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
225+
226+
logger.info("--> verify index-N blob is found at the expected location");
227+
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 2));
228+
229+
logger.info("--> make sure snapshot doesn't exist");
230+
expectThrows(SnapshotMissingException.class, () -> client().admin().cluster().prepareGetSnapshots(repoName)
231+
.addSnapshots(snapshot).get());
232+
}
233+
234+
private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
235+
logger.info("--> try to delete snapshot");
236+
final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,
237+
() -> client.admin().cluster().prepareDeleteSnapshot(repo, existingSnapshot).execute().actionGet());
238+
assertThat(repositoryException3.getMessage(),
239+
containsString("Could not read repository data because the contents of the repository do not match its expected state."));
240+
241+
logger.info("--> try to create snapshot");
242+
final RepositoryException repositoryException4 = expectThrows(RepositoryException.class,
243+
() -> client.admin().cluster().prepareCreateSnapshot(repo, existingSnapshot).execute().actionGet());
244+
assertThat(repositoryException4.getMessage(),
245+
containsString("Could not read repository data because the contents of the repository do not match its expected state."));
246+
}
247+
}

server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ public void testOverwriteSnapshotInfoBlob() {
140140
try (BlobStoreRepository repository =
141141
new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random())) {
142142
clusterService.addStateApplier(event -> repository.updateState(event.state()));
143+
// Apply state once to initialize repo properly like RepositoriesService would
144+
repository.updateState(clusterService.state());
143145
repository.start();
144146

145147
// We create a snap- blob for snapshot "foo" in the first generation

test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public void setUp() throws Exception {
7575
@Override
7676
public void tearDown() throws Exception {
7777
deleteAndAssertEmpty(getRepository().basePath());
78+
client().admin().cluster().prepareDeleteRepository("test-repo").get();
7879
super.tearDown();
7980
}
8081

@@ -168,8 +169,6 @@ protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, Blo
168169
}
169170

170171
public void testCleanup() throws Exception {
171-
createRepository("test-repo");
172-
173172
createIndex("test-idx-1");
174173
createIndex("test-idx-2");
175174
createIndex("test-idx-3");

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.elasticsearch.repositories.blobstore;
2020

2121
import org.apache.lucene.util.SameThreadExecutorService;
22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionRunnable;
2324
import org.elasticsearch.action.support.PlainActionFuture;
2425
import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -29,6 +30,8 @@
2930
import org.elasticsearch.cluster.metadata.MetaData;
3031
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
3132
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
33+
import org.elasticsearch.cluster.node.DiscoveryNode;
34+
import org.elasticsearch.cluster.node.DiscoveryNodes;
3235
import org.elasticsearch.cluster.service.ClusterApplierService;
3336
import org.elasticsearch.cluster.service.ClusterService;
3437
import org.elasticsearch.common.Strings;
@@ -69,6 +72,7 @@
6972
import java.util.concurrent.atomic.AtomicReference;
7073
import java.util.stream.Collectors;
7174

75+
import static org.elasticsearch.test.ESTestCase.buildNewFakeTransportAddress;
7276
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
7377
import static org.hamcrest.Matchers.containsInAnyOrder;
7478
import static org.hamcrest.Matchers.empty;
@@ -326,7 +330,11 @@ private static ClusterService mockClusterService(ClusterState initialState) {
326330
final ClusterService clusterService = mock(ClusterService.class);
327331
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
328332
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
329-
final AtomicReference<ClusterState> currentState = new AtomicReference<>(initialState);
333+
// Setting local node as master so it may update the repository metadata in the cluster state
334+
final DiscoveryNode localNode = new DiscoveryNode("", buildNewFakeTransportAddress(), Version.CURRENT);
335+
final AtomicReference<ClusterState> currentState = new AtomicReference<>(
336+
ClusterState.builder(initialState).nodes(
337+
DiscoveryNodes.builder().add(localNode).masterNodeId(localNode.getId()).localNodeId(localNode.getId()).build()).build());
330338
when(clusterService.state()).then(invocationOnMock -> currentState.get());
331339
final List<ClusterStateApplier> appliers = new CopyOnWriteArrayList<>();
332340
doAnswer(invocation -> {

x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.cluster.routing.ShardRouting;
3333
import org.elasticsearch.cluster.routing.ShardRoutingState;
3434
import org.elasticsearch.cluster.routing.TestShardRouting;
35+
import org.elasticsearch.cluster.service.ClusterService;
3536
import org.elasticsearch.common.UUIDs;
3637
import org.elasticsearch.common.bytes.BytesReference;
3738
import org.elasticsearch.common.lucene.uid.Versions;
@@ -353,8 +354,12 @@ private Environment createEnvironment() {
353354
private Repository createRepository() {
354355
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
355356
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
356-
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
357-
BlobStoreTestUtil.mockClusterService(repositoryMetaData));
357+
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
358+
final Repository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService);
359+
clusterService.addStateApplier(e -> repository.updateState(e.state()));
360+
// Apply state once to initialize repo properly like RepositoriesService would
361+
repository.updateState(clusterService.state());
362+
return repository;
358363
}
359364

360365
private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {

0 commit comments

Comments
 (0)