Skip to content

Commit 064d9f2

Browse files
Fix Snapshot Repository Corruption in Downgrade Scenarios (#50692)
This PR introduces test infrastructure for downgrading a cluster while interacting with a given repository. It fixes the fact that repository metadata in the new format could be written while there's still older snapshots in the repository that require the old-format metadata to be restorable.
1 parent 71afeec commit 064d9f2

File tree

8 files changed

+503
-9
lines changed

8 files changed

+503
-9
lines changed
+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
import org.elasticsearch.gradle.Version
21+
import org.elasticsearch.gradle.info.BuildParams
22+
import org.elasticsearch.gradle.testclusters.RestTestRunnerTask
23+
24+
apply plugin: 'elasticsearch.testclusters'
25+
apply plugin: 'elasticsearch.standalone-test'
26+
27+
tasks.register("bwcTest") {
28+
description = 'Runs backwards compatibility tests.'
29+
group = 'verification'
30+
}
31+
32+
dependencies {
33+
testCompile project(':client:rest-high-level')
34+
}
35+
36+
for (Version bwcVersion : bwcVersions.indexCompatible) {
37+
String baseName = "v${bwcVersion}"
38+
String oldClusterName = "${baseName}-old"
39+
String newClusterName = "${baseName}-new"
40+
41+
def clusterSettings = { v ->
42+
return {
43+
version = v
44+
numberOfNodes = 2
45+
setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
46+
javaHome = BuildParams.runtimeJavaHome
47+
}
48+
}
49+
50+
testClusters {
51+
"${oldClusterName}" clusterSettings(bwcVersion.toString())
52+
"${newClusterName}" clusterSettings(project.version)
53+
}
54+
55+
tasks.register("${baseName}#Step1OldClusterTest", RestTestRunnerTask) {
56+
useCluster testClusters."${oldClusterName}"
57+
mustRunAfter(precommit)
58+
doFirst {
59+
project.delete("${buildDir}/cluster/shared/repo/${baseName}")
60+
}
61+
systemProperty 'tests.rest.suite', 'step1'
62+
}
63+
64+
tasks.register("${baseName}#Step2NewClusterTest", RestTestRunnerTask) {
65+
useCluster testClusters."${newClusterName}"
66+
dependsOn "${baseName}#Step1OldClusterTest"
67+
systemProperty 'tests.rest.suite', 'step2'
68+
}
69+
70+
tasks.register("${baseName}#Step3OldClusterTest", RestTestRunnerTask) {
71+
useCluster testClusters."${oldClusterName}"
72+
dependsOn "${baseName}#Step2NewClusterTest"
73+
systemProperty 'tests.rest.suite', 'step3'
74+
}
75+
76+
tasks.register("${baseName}#Step4NewClusterTest", RestTestRunnerTask) {
77+
useCluster testClusters."${newClusterName}"
78+
dependsOn "${baseName}#Step3OldClusterTest"
79+
systemProperty 'tests.rest.suite', 'step4'
80+
}
81+
82+
tasks.matching { it.name.startsWith(baseName) && it.name.endsWith("ClusterTest") }.configureEach {
83+
it.systemProperty 'tests.old_cluster_version', bwcVersion.toString().minus("-SNAPSHOT")
84+
it.systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
85+
def clusterName = it.name.contains("Step2") || it.name.contains("Step4") ? "${newClusterName}" : "${oldClusterName}"
86+
it.nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${clusterName}".allHttpSocketURI.join(",")}")
87+
it.nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${clusterName}".getName()}")
88+
}
89+
90+
if (project.bwc_tests_enabled) {
91+
bwcTest.dependsOn(
92+
tasks.register("${baseName}#bwcTest") {
93+
dependsOn tasks.named("${baseName}#Step4NewClusterTest")
94+
}
95+
)
96+
}
97+
}
98+
99+
task bwcTestSnapshots {
100+
if (project.bwc_tests_enabled) {
101+
for (final def version : bwcVersions.unreleasedIndexCompatible) {
102+
dependsOn "v${version}#bwcTest"
103+
}
104+
}
105+
}
106+
107+
check.dependsOn(bwcTestSnapshots)
108+
109+
configurations {
110+
testArtifacts.extendsFrom testRuntime
111+
}
112+
113+
task testJar(type: Jar) {
114+
appendix 'test'
115+
from sourceSets.test.output
116+
}
117+
118+
artifacts {
119+
testArtifacts testJar
120+
}
121+
122+
test.enabled = false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.upgrades;
21+
22+
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
23+
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
24+
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
25+
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
26+
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
27+
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
28+
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
29+
import org.elasticsearch.client.Node;
30+
import org.elasticsearch.client.Request;
31+
import org.elasticsearch.client.RequestOptions;
32+
import org.elasticsearch.client.Response;
33+
import org.elasticsearch.client.RestClient;
34+
import org.elasticsearch.client.RestHighLevelClient;
35+
import org.elasticsearch.common.settings.Settings;
36+
import org.elasticsearch.common.xcontent.DeprecationHandler;
37+
import org.elasticsearch.common.xcontent.XContentParser;
38+
import org.elasticsearch.common.xcontent.json.JsonXContent;
39+
import org.elasticsearch.snapshots.RestoreInfo;
40+
import org.elasticsearch.test.rest.ESRestTestCase;
41+
42+
import java.io.IOException;
43+
import java.io.InputStream;
44+
import java.net.HttpURLConnection;
45+
import java.util.List;
46+
import java.util.Map;
47+
import java.util.stream.Collectors;
48+
49+
import static org.hamcrest.Matchers.equalTo;
50+
import static org.hamcrest.Matchers.hasItem;
51+
import static org.hamcrest.Matchers.hasSize;
52+
import static org.hamcrest.Matchers.is;
53+
54+
/**
55+
* Tests that verify that a snapshot repository is not getting corrupted and continues to function properly when accessed by multiple
56+
* clusters of different versions. Concretely this test suite is simulating the following scenario:
57+
* <ul>
58+
* <li>Start and run against a cluster in an old version: {@link TestStep#STEP1_OLD_CLUSTER}</li>
59+
* <li>Start and run against a cluster running the current version: {@link TestStep#STEP2_NEW_CLUSTER}</li>
60+
* <li>Run against the old version cluster from the first step: {@link TestStep#STEP3_OLD_CLUSTER}</li>
61+
* <li>Run against the current version cluster from the second step: {@link TestStep#STEP4_NEW_CLUSTER}</li>
62+
* </ul>
63+
* TODO: Add two more steps: delete all old version snapshots from the repository, then downgrade again and verify that the repository
64+
* is not being corrupted. This requires first merging the logic for reading the min_version field in RepositoryData back to 7.6.
65+
*/
66+
public class MultiVersionRepositoryAccessIT extends ESRestTestCase {
67+
68+
private enum TestStep {
69+
STEP1_OLD_CLUSTER("step1"),
70+
STEP2_NEW_CLUSTER("step2"),
71+
STEP3_OLD_CLUSTER("step3"),
72+
STEP4_NEW_CLUSTER("step4");
73+
74+
private final String name;
75+
76+
TestStep(String name) {
77+
this.name = name;
78+
}
79+
80+
@Override
81+
public String toString() {
82+
return name;
83+
}
84+
85+
public static TestStep parse(String value) {
86+
switch (value) {
87+
case "step1":
88+
return STEP1_OLD_CLUSTER;
89+
case "step2":
90+
return STEP2_NEW_CLUSTER;
91+
case "step3":
92+
return STEP3_OLD_CLUSTER;
93+
case "step4":
94+
return STEP4_NEW_CLUSTER;
95+
default:
96+
throw new AssertionError("unknown test step: " + value);
97+
}
98+
}
99+
}
100+
101+
protected static final TestStep TEST_STEP = TestStep.parse(System.getProperty("tests.rest.suite"));
102+
103+
@Override
104+
protected boolean preserveSnapshotsUponCompletion() {
105+
return true;
106+
}
107+
108+
@Override
109+
protected boolean preserveReposUponCompletion() {
110+
return true;
111+
}
112+
113+
public void testCreateAndRestoreSnapshot() throws IOException {
114+
final String repoName = getTestName();
115+
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
116+
final int shards = 3;
117+
createIndex(client, "test-index", shards);
118+
createRepository(client, repoName, false);
119+
createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
120+
final String snapshotToDeleteName = "snapshot-to-delete";
121+
// Create a snapshot and delete it right away again to test the impact of each version's cleanup functionality that is run
122+
// as part of the snapshot delete
123+
createSnapshot(client, repoName, snapshotToDeleteName);
124+
final List<Map<String, Object>> snapshotsIncludingToDelete = listSnapshots(repoName);
125+
// Every step creates one snapshot and we have to add one more for the temporary snapshot
126+
assertThat(snapshotsIncludingToDelete, hasSize(TEST_STEP.ordinal() + 1 + 1));
127+
assertThat(snapshotsIncludingToDelete.stream().map(
128+
sn -> (String) sn.get("snapshot")).collect(Collectors.toList()), hasItem(snapshotToDeleteName));
129+
deleteSnapshot(client, repoName, snapshotToDeleteName);
130+
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
131+
assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1));
132+
assertSnapshotStatusSuccessful(client, repoName, snapshots);
133+
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
134+
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
135+
} else if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) {
136+
for (TestStep value : TestStep.values()) {
137+
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + value, shards);
138+
}
139+
}
140+
} finally {
141+
deleteRepository(repoName);
142+
}
143+
}
144+
145+
public void testReadOnlyRepo() throws IOException {
146+
final String repoName = getTestName();
147+
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
148+
final int shards = 3;
149+
final boolean readOnly = TEST_STEP.ordinal() > 1; // only restore from read-only repo in steps 3 and 4
150+
createRepository(client, repoName, readOnly);
151+
if (readOnly == false) {
152+
createIndex(client, "test-index", shards);
153+
createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
154+
}
155+
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
156+
switch (TEST_STEP) {
157+
case STEP1_OLD_CLUSTER:
158+
assertThat(snapshots, hasSize(1));
159+
break;
160+
case STEP2_NEW_CLUSTER:
161+
case STEP4_NEW_CLUSTER:
162+
case STEP3_OLD_CLUSTER:
163+
assertThat(snapshots, hasSize(2));
164+
break;
165+
}
166+
assertSnapshotStatusSuccessful(client, repoName, snapshots);
167+
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
168+
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
169+
} else if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) {
170+
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
171+
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER, shards);
172+
}
173+
}
174+
}
175+
176+
public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
177+
if (TEST_STEP.ordinal() > 1) {
178+
// Only testing the first 2 steps here
179+
return;
180+
}
181+
final String repoName = getTestName();
182+
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
183+
final int shards = 3;
184+
createIndex(client, "test-index", shards);
185+
createRepository(client, repoName, false);
186+
createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
187+
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
188+
// Every step creates one snapshot
189+
assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1));
190+
assertSnapshotStatusSuccessful(client, repoName, snapshots);
191+
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER) {
192+
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
193+
} else {
194+
deleteSnapshot(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER);
195+
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER, shards);
196+
createSnapshot(client, repoName, "snapshot-1");
197+
ensureSnapshotRestoreWorks(client, repoName, "snapshot-1", shards);
198+
deleteSnapshot(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER);
199+
createSnapshot(client, repoName, "snapshot-2");
200+
ensureSnapshotRestoreWorks(client, repoName, "snapshot-2", shards);
201+
}
202+
} finally {
203+
deleteRepository(repoName);
204+
}
205+
}
206+
207+
private static void assertSnapshotStatusSuccessful(RestHighLevelClient client, String repoName,
208+
List<Map<String, Object>> snapshots) throws IOException {
209+
final SnapshotsStatusResponse statusResponse = client.snapshot().status(new SnapshotsStatusRequest(repoName,
210+
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new)), RequestOptions.DEFAULT);
211+
for (SnapshotStatus status : statusResponse.getSnapshots()) {
212+
assertThat(status.getShardsStats().getFailedShards(), is(0));
213+
}
214+
}
215+
216+
private void deleteSnapshot(RestHighLevelClient client, String repoName, String name) throws IOException {
217+
assertThat(client.snapshot().delete(new DeleteSnapshotRequest(repoName, name), RequestOptions.DEFAULT).isAcknowledged(), is(true));
218+
}
219+
220+
@SuppressWarnings("unchecked")
221+
private List<Map<String, Object>> listSnapshots(String repoName) throws IOException {
222+
try (InputStream entity = client().performRequest(
223+
new Request("GET", "/_snapshot/" + repoName + "/_all")).getEntity().getContent();
224+
XContentParser parser = JsonXContent.jsonXContent.createParser(
225+
xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, entity)) {
226+
final Map<String, Object> raw = parser.map();
227+
// Bwc lookup since the format of the snapshots response changed between versions
228+
if (raw.containsKey("snapshots")) {
229+
return (List<Map<String, Object>>) raw.get("snapshots");
230+
} else {
231+
return (List<Map<String, Object>>) ((List<Map<?, ?>>) raw.get("responses")).get(0).get("snapshots");
232+
}
233+
}
234+
}
235+
236+
private static void ensureSnapshotRestoreWorks(RestHighLevelClient client, String repoName, String name,
237+
int shards) throws IOException {
238+
wipeAllIndices();
239+
final RestoreInfo restoreInfo =
240+
client.snapshot().restore(new RestoreSnapshotRequest().repository(repoName).snapshot(name).waitForCompletion(true),
241+
RequestOptions.DEFAULT).getRestoreInfo();
242+
assertThat(restoreInfo.failedShards(), is(0));
243+
assertThat(restoreInfo.successfulShards(), equalTo(shards));
244+
}
245+
246+
private static void createRepository(RestHighLevelClient client, String repoName, boolean readOnly) throws IOException {
247+
assertThat(client.snapshot().createRepository(new PutRepositoryRequest(repoName).type("fs").settings(
248+
Settings.builder().put("location", "./" + repoName).put("readonly", readOnly)), RequestOptions.DEFAULT).isAcknowledged(),
249+
is(true));
250+
}
251+
252+
private static void createSnapshot(RestHighLevelClient client, String repoName, String name) throws IOException {
253+
client.snapshot().create(new CreateSnapshotRequest(repoName, name).waitForCompletion(true), RequestOptions.DEFAULT);
254+
}
255+
256+
private void createIndex(RestHighLevelClient client, String name, int shards) throws IOException {
257+
final Request putIndexRequest = new Request("PUT", "/" + name);
258+
putIndexRequest.setJsonEntity("{\n" +
259+
" \"settings\" : {\n" +
260+
" \"index\" : {\n" +
261+
" \"number_of_shards\" : " + shards + ", \n" +
262+
" \"number_of_replicas\" : 0 \n" +
263+
" }\n" +
264+
" }\n" +
265+
"}");
266+
final Response response = client.getLowLevelClient().performRequest(putIndexRequest);
267+
assertThat(response.getStatusLine().getStatusCode(), is(HttpURLConnection.HTTP_OK));
268+
}
269+
}

0 commit comments

Comments
 (0)