-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Fix Snapshot Repository Corruption in Downgrade Scenarios #50692
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
edbff0f
81c2228
a7306db
78907a1
ea223c9
e9832b0
74ce345
b349787
b61b8e9
3dbe0c4
f924981
93c5776
13ce331
04702eb
8de5cad
15bbc04
3e7fef0
4c04999
cb50970
5004dda
33a3acd
f77ccd2
f7e65fc
72cfb1b
bbb2b3d
1321106
2637126
cfe7254
5df6100
38de2a7
cecf2d0
cc6740e
fd55b40
f1cda9d
c1436d3
d4a18d8
ba5f826
7d5d256
2ee45d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
import org.elasticsearch.gradle.Version | ||
import org.elasticsearch.gradle.info.BuildParams | ||
import org.elasticsearch.gradle.testclusters.RestTestRunnerTask | ||
|
||
apply plugin: 'elasticsearch.testclusters' | ||
apply plugin: 'elasticsearch.standalone-test' | ||
|
||
tasks.register("bwcTest") { | ||
description = 'Runs backwards compatibility tests.' | ||
group = 'verification' | ||
} | ||
|
||
dependencies { | ||
testCompile project(':client:rest-high-level') | ||
} | ||
|
||
for (Version bwcVersion : bwcVersions.indexCompatible) { | ||
String baseName = "v${bwcVersion}" | ||
String oldClusterName = "${baseName}-old" | ||
String newClusterName = "${baseName}-new" | ||
|
||
def clusterSettings = { v -> | ||
return { | ||
version = v | ||
numberOfNodes = 2 | ||
setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" | ||
javaHome = BuildParams.runtimeJavaHome | ||
} | ||
} | ||
|
||
testClusters { | ||
"${oldClusterName}" clusterSettings(bwcVersion.toString()) | ||
"${newClusterName}" clusterSettings(project.version) | ||
} | ||
|
||
tasks.register("${baseName}#Step1OldClusterTest", RestTestRunnerTask) { | ||
useCluster testClusters."${oldClusterName}" | ||
mustRunAfter(precommit) | ||
doFirst { | ||
project.delete("${buildDir}/cluster/shared/repo/${baseName}") | ||
} | ||
systemProperty 'tests.rest.suite', 'step1' | ||
} | ||
|
||
tasks.register("${baseName}#Step2NewClusterTest", RestTestRunnerTask) { | ||
useCluster testClusters."${newClusterName}" | ||
dependsOn "${baseName}#Step1OldClusterTest" | ||
systemProperty 'tests.rest.suite', 'step2' | ||
} | ||
|
||
tasks.register("${baseName}#Step3OldClusterTest", RestTestRunnerTask) { | ||
useCluster testClusters."${oldClusterName}" | ||
dependsOn "${baseName}#Step2NewClusterTest" | ||
systemProperty 'tests.rest.suite', 'step3' | ||
} | ||
|
||
tasks.register("${baseName}#Step4NewClusterTest", RestTestRunnerTask) { | ||
useCluster testClusters."${newClusterName}" | ||
dependsOn "${baseName}#Step3OldClusterTest" | ||
systemProperty 'tests.rest.suite', 'step4' | ||
} | ||
|
||
tasks.matching { it.name.startsWith(baseName) && it.name.endsWith("ClusterTest") }.configureEach { | ||
it.systemProperty 'tests.old_cluster_version', bwcVersion.toString().minus("-SNAPSHOT") | ||
it.systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${baseName}" | ||
def clusterName = it.name.contains("Step2") || it.name.contains("Step4") ? "${newClusterName}" : "${oldClusterName}" | ||
it.nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${clusterName}".allHttpSocketURI.join(",")}") | ||
it.nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${clusterName}".getName()}") | ||
} | ||
|
||
if (project.bwc_tests_enabled) { | ||
bwcTest.dependsOn( | ||
tasks.register("${baseName}#bwcTest") { | ||
dependsOn tasks.named("${baseName}#Step4NewClusterTest") | ||
} | ||
) | ||
} | ||
} | ||
|
||
task bwcTestSnapshots { | ||
if (project.bwc_tests_enabled) { | ||
for (final def version : bwcVersions.unreleasedIndexCompatible) { | ||
dependsOn "v${version}#bwcTest" | ||
} | ||
} | ||
} | ||
|
||
check.dependsOn(bwcTestSnapshots) | ||
|
||
configurations { | ||
testArtifacts.extendsFrom testRuntime | ||
} | ||
|
||
task testJar(type: Jar) { | ||
appendix 'test' | ||
from sourceSets.test.output | ||
} | ||
|
||
artifacts { | ||
testArtifacts testJar | ||
} | ||
|
||
test.enabled = false |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,269 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.upgrades; | ||
|
||
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; | ||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; | ||
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; | ||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; | ||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; | ||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest; | ||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; | ||
import org.elasticsearch.client.Node; | ||
import org.elasticsearch.client.Request; | ||
import org.elasticsearch.client.RequestOptions; | ||
import org.elasticsearch.client.Response; | ||
import org.elasticsearch.client.RestClient; | ||
import org.elasticsearch.client.RestHighLevelClient; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.xcontent.DeprecationHandler; | ||
import org.elasticsearch.common.xcontent.XContentParser; | ||
import org.elasticsearch.common.xcontent.json.JsonXContent; | ||
import org.elasticsearch.snapshots.RestoreInfo; | ||
import org.elasticsearch.test.rest.ESRestTestCase; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.net.HttpURLConnection; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.hasItem; | ||
import static org.hamcrest.Matchers.hasSize; | ||
import static org.hamcrest.Matchers.is; | ||
|
||
/** | ||
* Tests that verify that a snapshot repository is not getting corrupted and continues to function properly when accessed by multiple | ||
* clusters of different versions. Concretely this test suite is simulating the following scenario: | ||
* <ul> | ||
* <li>Start and run against a cluster in an old version: {@link TestStep#STEP1_OLD_CLUSTER}</li> | ||
* <li>Start and run against a cluster running the current version: {@link TestStep#STEP2_NEW_CLUSTER}</li> | ||
* <li>Run against the old version cluster from the first step: {@link TestStep#STEP3_OLD_CLUSTER}</li> | ||
* <li>Run against the current version cluster from the second step: {@link TestStep#STEP4_NEW_CLUSTER}</li> | ||
* </ul> | ||
* TODO: Add two more steps: delete all old version snapshots from the repository, then downgrade again and verify that the repository | ||
* is not being corrupted. This requires first merging the logic for reading the min_version field in RepositoryData back to 7.6. | ||
*/ | ||
public class MultiVersionRepositoryAccessIT extends ESRestTestCase { | ||
|
||
private enum TestStep { | ||
STEP1_OLD_CLUSTER("step1"), | ||
STEP2_NEW_CLUSTER("step2"), | ||
STEP3_OLD_CLUSTER("step3"), | ||
STEP4_NEW_CLUSTER("step4"); | ||
|
||
private final String name; | ||
|
||
TestStep(String name) { | ||
this.name = name; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return name; | ||
} | ||
|
||
public static TestStep parse(String value) { | ||
switch (value) { | ||
case "step1": | ||
return STEP1_OLD_CLUSTER; | ||
case "step2": | ||
return STEP2_NEW_CLUSTER; | ||
case "step3": | ||
return STEP3_OLD_CLUSTER; | ||
case "step4": | ||
return STEP4_NEW_CLUSTER; | ||
default: | ||
throw new AssertionError("unknown test step: " + value); | ||
} | ||
} | ||
} | ||
|
||
protected static final TestStep TEST_STEP = TestStep.parse(System.getProperty("tests.rest.suite")); | ||
|
||
@Override | ||
protected boolean preserveSnapshotsUponCompletion() { | ||
return true; | ||
} | ||
|
||
@Override | ||
protected boolean preserveReposUponCompletion() { | ||
return true; | ||
} | ||
|
||
public void testCreateAndRestoreSnapshot() throws IOException { | ||
final String repoName = getTestName(); | ||
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is it always taking the first node here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's taking all nodes? It's just creating the empty array for the conversion to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Java 🗡 |
||
final int shards = 3; | ||
createIndex(client, "test-index", shards); | ||
createRepository(client, repoName, false); | ||
createSnapshot(client, repoName, "snapshot-" + TEST_STEP); | ||
final String snapshotToDeleteName = "snapshot-to-delete"; | ||
// Create a snapshot and delete it right away again to test the impact of each version's cleanup functionality that is run | ||
// as part of the snapshot delete | ||
createSnapshot(client, repoName, snapshotToDeleteName); | ||
final List<Map<String, Object>> snapshotsIncludingToDelete = listSnapshots(repoName); | ||
// Every step creates one snapshot and we have to add one more for the temporary snapshot | ||
assertThat(snapshotsIncludingToDelete, hasSize(TEST_STEP.ordinal() + 1 + 1)); | ||
assertThat(snapshotsIncludingToDelete.stream().map( | ||
sn -> (String) sn.get("snapshot")).collect(Collectors.toList()), hasItem(snapshotToDeleteName)); | ||
deleteSnapshot(client, repoName, snapshotToDeleteName); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. check that the snapshot can be listed before deletion? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure :) |
||
final List<Map<String, Object>> snapshots = listSnapshots(repoName); | ||
assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1)); | ||
assertSnapshotStatusSuccessful(client, repoName, snapshots); | ||
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) { | ||
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards); | ||
} else if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) { | ||
for (TestStep value : TestStep.values()) { | ||
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + value, shards); | ||
} | ||
} | ||
} finally { | ||
deleteRepository(repoName); | ||
} | ||
} | ||
|
||
public void testReadOnlyRepo() throws IOException { | ||
final String repoName = getTestName(); | ||
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) { | ||
final int shards = 3; | ||
final boolean readOnly = TEST_STEP.ordinal() > 1; // only restore from read-only repo in steps 3 and 4 | ||
createRepository(client, repoName, readOnly); | ||
if (readOnly == false) { | ||
createIndex(client, "test-index", shards); | ||
createSnapshot(client, repoName, "snapshot-" + TEST_STEP); | ||
} | ||
final List<Map<String, Object>> snapshots = listSnapshots(repoName); | ||
switch (TEST_STEP) { | ||
case STEP1_OLD_CLUSTER: | ||
assertThat(snapshots, hasSize(1)); | ||
break; | ||
case STEP2_NEW_CLUSTER: | ||
case STEP4_NEW_CLUSTER: | ||
case STEP3_OLD_CLUSTER: | ||
assertThat(snapshots, hasSize(2)); | ||
break; | ||
} | ||
assertSnapshotStatusSuccessful(client, repoName, snapshots); | ||
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) { | ||
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards); | ||
} else if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) { | ||
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards); | ||
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER, shards); | ||
} | ||
} | ||
} | ||
|
||
public void testUpgradeMovesRepoToNewMetaVersion() throws IOException { | ||
if (TEST_STEP.ordinal() > 1) { | ||
// Only testing the first 2 steps here | ||
return; | ||
} | ||
final String repoName = getTestName(); | ||
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) { | ||
final int shards = 3; | ||
createIndex(client, "test-index", shards); | ||
createRepository(client, repoName, false); | ||
createSnapshot(client, repoName, "snapshot-" + TEST_STEP); | ||
final List<Map<String, Object>> snapshots = listSnapshots(repoName); | ||
// Every step creates one snapshot | ||
assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1)); | ||
assertSnapshotStatusSuccessful(client, repoName, snapshots); | ||
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER) { | ||
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards); | ||
} else { | ||
deleteSnapshot(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER); | ||
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER, shards); | ||
createSnapshot(client, repoName, "snapshot-1"); | ||
ensureSnapshotRestoreWorks(client, repoName, "snapshot-1", shards); | ||
deleteSnapshot(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER); | ||
createSnapshot(client, repoName, "snapshot-2"); | ||
ensureSnapshotRestoreWorks(client, repoName, "snapshot-2", shards); | ||
} | ||
} finally { | ||
deleteRepository(repoName); | ||
} | ||
} | ||
|
||
private static void assertSnapshotStatusSuccessful(RestHighLevelClient client, String repoName, | ||
List<Map<String, Object>> snapshots) throws IOException { | ||
final SnapshotsStatusResponse statusResponse = client.snapshot().status(new SnapshotsStatusRequest(repoName, | ||
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new)), RequestOptions.DEFAULT); | ||
for (SnapshotStatus status : statusResponse.getSnapshots()) { | ||
assertThat(status.getShardsStats().getFailedShards(), is(0)); | ||
} | ||
} | ||
|
||
private void deleteSnapshot(RestHighLevelClient client, String repoName, String name) throws IOException { | ||
assertThat(client.snapshot().delete(new DeleteSnapshotRequest(repoName, name), RequestOptions.DEFAULT).isAcknowledged(), is(true)); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private List<Map<String, Object>> listSnapshots(String repoName) throws IOException { | ||
try (InputStream entity = client().performRequest( | ||
new Request("GET", "/_snapshot/" + repoName + "/_all")).getEntity().getContent(); | ||
XContentParser parser = JsonXContent.jsonXContent.createParser( | ||
xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, entity)) { | ||
final Map<String, Object> raw = parser.map(); | ||
// Bwc lookup since the format of the snapshots response changed between versions | ||
if (raw.containsKey("snapshots")) { | ||
return (List<Map<String, Object>>) raw.get("snapshots"); | ||
} else { | ||
return (List<Map<String, Object>>) ((List<Map<?, ?>>) raw.get("responses")).get(0).get("snapshots"); | ||
} | ||
} | ||
} | ||
|
||
private static void ensureSnapshotRestoreWorks(RestHighLevelClient client, String repoName, String name, | ||
int shards) throws IOException { | ||
wipeAllIndices(); | ||
final RestoreInfo restoreInfo = | ||
client.snapshot().restore(new RestoreSnapshotRequest().repository(repoName).snapshot(name).waitForCompletion(true), | ||
RequestOptions.DEFAULT).getRestoreInfo(); | ||
assertThat(restoreInfo.failedShards(), is(0)); | ||
assertThat(restoreInfo.successfulShards(), equalTo(shards)); | ||
} | ||
|
||
private static void createRepository(RestHighLevelClient client, String repoName, boolean readOnly) throws IOException { | ||
assertThat(client.snapshot().createRepository(new PutRepositoryRequest(repoName).type("fs").settings( | ||
Settings.builder().put("location", "./" + repoName).put("readonly", readOnly)), RequestOptions.DEFAULT).isAcknowledged(), | ||
is(true)); | ||
} | ||
|
||
private static void createSnapshot(RestHighLevelClient client, String repoName, String name) throws IOException { | ||
client.snapshot().create(new CreateSnapshotRequest(repoName, name).waitForCompletion(true), RequestOptions.DEFAULT); | ||
} | ||
|
||
private void createIndex(RestHighLevelClient client, String name, int shards) throws IOException { | ||
final Request putIndexRequest = new Request("PUT", "/" + name); | ||
putIndexRequest.setJsonEntity("{\n" + | ||
" \"settings\" : {\n" + | ||
" \"index\" : {\n" + | ||
" \"number_of_shards\" : " + shards + ", \n" + | ||
" \"number_of_replicas\" : 0 \n" + | ||
" }\n" + | ||
" }\n" + | ||
"}"); | ||
final Response response = client.getLowLevelClient().performRequest(putIndexRequest); | ||
assertThat(response.getStatusLine().getStatusCode(), is(HttpURLConnection.HTTP_OK)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why 2 nodes? Isn't one node sufficient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably pointless paranoia but I figured if I add all these tests, why not also cover the over the wire serialization in this scenario as well (I can't see how it would be important but who knows ... could be we're tripping some assertion in a
Streamable
constructor or so ... we have beefy CI machines so why not? :)).