Skip to content

Commit 33ae4e6

Browse files
committed
Snapshot/Restore: Ensure that shard failure reasons are correctly stored in CS (#25941)
The failure reason for snapshot shard failures might not be propagated properly if the master node changes after the errors were reported by other data nodes. This commit ensures that the snapshot shard failure reason is preserved properly and adds a workaround for reading old snapshot files where this information might not have been preserved. Closes #25878
1 parent 9abaf49 commit 33ae4e6

9 files changed

+131
-17
lines changed

core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ public ShardSnapshotStatus(String nodeId, State state, String reason) {
253253
this.nodeId = nodeId;
254254
this.state = state;
255255
this.reason = reason;
256+
// If the state is failed we have to have a reason for this failure
257+
assert state.failed() == false || reason != null;
256258
}
257259

258260
public ShardSnapshotStatus(StreamInput in) throws IOException {
@@ -413,9 +415,17 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
413415
int shards = in.readVInt();
414416
for (int j = 0; j < shards; j++) {
415417
ShardId shardId = ShardId.readShardId(in);
416-
String nodeId = in.readOptionalString();
417-
State shardState = State.fromValue(in.readByte());
418-
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
418+
// TODO: Change this to an appropriate version when it's backported
419+
if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
420+
builder.put(shardId, new ShardSnapshotStatus(in));
421+
} else {
422+
String nodeId = in.readOptionalString();
423+
State shardState = State.fromValue(in.readByte());
424+
// Workaround for https://github.com/elastic/elasticsearch/issues/25878
425+
// Some old snapshots might still have null in shard failure reasons
426+
String reason = shardState.failed() ? "" : null;
427+
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState, reason));
428+
}
419429
}
420430
long repositoryStateId = UNDEFINED_REPOSITORY_STATE_ID;
421431
if (in.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) {
@@ -449,8 +459,13 @@ public void writeTo(StreamOutput out) throws IOException {
449459
out.writeVInt(entry.shards().size());
450460
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards()) {
451461
shardEntry.key.writeTo(out);
452-
out.writeOptionalString(shardEntry.value.nodeId());
453-
out.writeByte(shardEntry.value.state().value());
462+
// TODO: Change this to an appropriate version when it's backported
463+
if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
464+
shardEntry.value.writeTo(out);
465+
} else {
466+
out.writeOptionalString(shardEntry.value.nodeId());
467+
out.writeByte(shardEntry.value.state().value());
468+
}
454469
}
455470
if (out.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) {
456471
out.writeLong(entry.repositoryStateId);

core/src/main/java/org/elasticsearch/snapshots/SnapshotShardFailure.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public SnapshotShardFailure(@Nullable String nodeId, ShardId shardId, String rea
6262
this.nodeId = nodeId;
6363
this.shardId = shardId;
6464
this.reason = reason;
65+
assert reason != null;
6566
status = RestStatus.INTERNAL_SERVER_ERROR;
6667
}
6768

@@ -192,7 +193,9 @@ public static SnapshotShardFailure fromXContent(XContentParser parser) throws IO
192193
} else if ("node_id".equals(currentFieldName)) {
193194
snapshotShardFailure.nodeId = parser.text();
194195
} else if ("reason".equals(currentFieldName)) {
195-
snapshotShardFailure.reason = parser.text();
196+
// Workaround for https://github.com/elastic/elasticsearch/issues/25878
197+
// Some old snapshots might still have null in shard failure reasons
198+
snapshotShardFailure.reason = parser.textOrNull();
196199
} else if ("shard_id".equals(currentFieldName)) {
197200
shardId = parser.intValue();
198201
} else if ("status".equals(currentFieldName)) {
@@ -215,6 +218,11 @@ public static SnapshotShardFailure fromXContent(XContentParser parser) throws IO
215218
throw new ElasticsearchParseException("index shard was not set");
216219
}
217220
snapshotShardFailure.shardId = new ShardId(index, index_uuid, shardId);
221+
// Workaround for https://github.com/elastic/elasticsearch/issues/25878
222+
// Some old snapshots might still have null in shard failure reasons
223+
if (snapshotShardFailure.reason == null) {
224+
snapshotShardFailure.reason = "";
225+
}
218226
return snapshotShardFailure;
219227
}
220228

core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1128,7 +1128,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
11281128
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
11291129
ShardSnapshotStatus status = shardEntry.value;
11301130
if (!status.state().completed()) {
1131-
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED));
1131+
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED,
1132+
"aborted by snapshot deletion"));
11321133
} else {
11331134
shardsBuilder.put(shardEntry.key, status);
11341135
}

core/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ public void testWaitingIndices() {
5757
// test more than one waiting shard in an index
5858
shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
5959
shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
60-
shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
60+
shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
6161
// test exactly one waiting shard in an index
6262
shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
63-
shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
63+
shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
6464
// test no waiting shards in an index
65-
shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
65+
shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
6666
Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT,
6767
indices, System.currentTimeMillis(), randomLong(), shards.build());
6868

core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,20 @@ public SnapshotInfo waitForCompletion(String repository, String snapshotName, Ti
128128
return null;
129129
}
130130

131-
public static String blockMasterFromFinalizingSnapshot(final String repositoryName) {
131+
public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) {
132132
final String masterName = internalCluster().getMasterName();
133133
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
134134
.repository(repositoryName)).setBlockOnWriteIndexFile(true);
135135
return masterName;
136136
}
137137

138+
public static String blockMasterFromFinalizingSnapshotOnSnapFile(final String repositoryName) {
139+
final String masterName = internalCluster().getMasterName();
140+
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
141+
.repository(repositoryName)).setBlockAndFailOnWriteSnapFiles(true);
142+
return masterName;
143+
}
144+
138145
public static String blockNodeWithIndex(final String repositoryName, final String indexName) {
139146
for(String node : internalCluster().nodesInclude(indexName)) {
140147
((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repositoryName))

core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,67 @@ public void testMasterShutdownDuringSnapshot() throws Exception {
767767
assertEquals(0, snapshotInfo.failedShards());
768768
}
769769

770+
771+
public void testMasterAndDataShutdownDuringSnapshot() throws Exception {
772+
logger.info("--> starting three master nodes and two data nodes");
773+
internalCluster().startMasterOnlyNodes(3);
774+
internalCluster().startDataOnlyNodes(2);
775+
776+
final Client client = client();
777+
778+
logger.info("--> creating repository");
779+
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
780+
.setType("mock").setSettings(Settings.builder()
781+
.put("location", randomRepoPath())
782+
.put("compress", randomBoolean())
783+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
784+
785+
assertAcked(prepareCreate("test-idx", 0, Settings.builder().put("number_of_shards", between(1, 20))
786+
.put("number_of_replicas", 0)));
787+
ensureGreen();
788+
789+
logger.info("--> indexing some data");
790+
final int numdocs = randomIntBetween(10, 100);
791+
IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
792+
for (int i = 0; i < builders.length; i++) {
793+
builders[i] = client().prepareIndex("test-idx", "type1", Integer.toString(i)).setSource("field1", "bar " + i);
794+
}
795+
indexRandom(true, builders);
796+
flushAndRefresh();
797+
798+
final int numberOfShards = getNumShards("test-idx").numPrimaries;
799+
logger.info("number of shards: {}", numberOfShards);
800+
801+
final String masterNode = blockMasterFromFinalizingSnapshotOnSnapFile("test-repo");
802+
final String dataNode = blockNodeWithIndex("test-repo", "test-idx");
803+
804+
dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
805+
806+
logger.info("--> stopping data node {}", dataNode);
807+
stopNode(dataNode);
808+
logger.info("--> stopping master node {} ", masterNode);
809+
internalCluster().stopCurrentMasterNode();
810+
811+
logger.info("--> wait until the snapshot is done");
812+
813+
assertBusy(() -> {
814+
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get();
815+
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
816+
assertTrue(snapshotInfo.state().completed());
817+
}, 1, TimeUnit.MINUTES);
818+
819+
logger.info("--> verify that snapshot was partial");
820+
821+
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get();
822+
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
823+
assertEquals(SnapshotState.PARTIAL, snapshotInfo.state());
824+
assertNotEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
825+
assertThat(snapshotInfo.failedShards(), greaterThan(0));
826+
for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) {
827+
assertNotNull(failure.reason());
828+
}
829+
}
830+
770831
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25281")
771832
public void testMasterShutdownDuringFailedSnapshot() throws Exception {
772833
logger.info("--> starting two master nodes and two data nodes");
@@ -800,7 +861,7 @@ public void testMasterShutdownDuringFailedSnapshot() throws Exception {
800861
assertEquals(ClusterHealthStatus.RED, client().admin().cluster().prepareHealth().get().getStatus()),
801862
30, TimeUnit.SECONDS);
802863

803-
final String masterNode = blockMasterFromFinalizingSnapshot("test-repo");
864+
final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile("test-repo");
804865

805866
logger.info("--> snapshot");
806867
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")

core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2252,9 +2252,9 @@ public void testDeleteOrphanSnapshot() throws Exception {
22522252
public ClusterState execute(ClusterState currentState) {
22532253
// Simulate orphan snapshot
22542254
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
2255-
shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED));
2256-
shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED));
2257-
shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED));
2255+
shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
2256+
shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
2257+
shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
22582258
List<Entry> entries = new ArrayList<>();
22592259
entries.add(new Entry(new Snapshot(repositoryName,
22602260
createSnapshotResponse.getSnapshotInfo().snapshotId()),

core/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ private Entry randomSnapshot() {
6666
ShardId shardId = new ShardId(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10)), randomIntBetween(0, 10));
6767
String nodeId = randomAlphaOfLength(10);
6868
State shardState = randomFrom(State.values());
69-
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState));
69+
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState,
70+
shardState.failed() ? randomAlphaOfLength(10) : null));
7071
}
7172
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = builder.build();
7273
return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards);

core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ public long getFailureCount() {
104104
* finalization of a snapshot, while permitting other IO operations to proceed unblocked. */
105105
private volatile boolean blockOnWriteIndexFile;
106106

107+
/** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a dead master node */
108+
private volatile boolean blockAndFailOnWriteSnapFile;
109+
107110
private volatile boolean atomicMove;
108111

109112
private volatile boolean blocked = false;
@@ -118,6 +121,7 @@ public MockRepository(RepositoryMetaData metadata, Environment environment,
118121
blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false);
119122
blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false);
120123
blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false);
124+
blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
121125
randomPrefix = metadata.settings().get("random", "default");
122126
waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
123127
atomicMove = metadata.settings().getAsBoolean("atomic_move", true);
@@ -168,13 +172,18 @@ public synchronized void unblock() {
168172
blockOnControlFiles = false;
169173
blockOnInitialization = false;
170174
blockOnWriteIndexFile = false;
175+
blockAndFailOnWriteSnapFile = false;
171176
this.notifyAll();
172177
}
173178

174179
public void blockOnDataFiles(boolean blocked) {
175180
blockOnDataFiles = blocked;
176181
}
177182

183+
public void setBlockAndFailOnWriteSnapFiles(boolean blocked) {
184+
blockAndFailOnWriteSnapFile = blocked;
185+
}
186+
178187
public void setBlockOnWriteIndexFile(boolean blocked) {
179188
blockOnWriteIndexFile = blocked;
180189
}
@@ -187,7 +196,8 @@ private synchronized boolean blockExecution() {
187196
logger.debug("Blocking execution");
188197
boolean wasBlocked = false;
189198
try {
190-
while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile) {
199+
while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile ||
200+
blockAndFailOnWriteSnapFile) {
191201
blocked = true;
192202
this.wait();
193203
wasBlocked = true;
@@ -266,6 +276,8 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException {
266276
throw new IOException("Random IOException");
267277
} else if (blockOnControlFiles) {
268278
blockExecutionAndMaybeWait(blobName);
279+
} else if (blobName.startsWith("snap-") && blockAndFailOnWriteSnapFile) {
280+
blockExecutionAndFail(blobName);
269281
}
270282
}
271283
}
@@ -283,6 +295,15 @@ private void blockExecutionAndMaybeWait(final String blobName) {
283295
}
284296
}
285297

298+
/**
299+
* Blocks an I/O operation on the blob and throws an exception once unblocked
300+
*/
301+
private void blockExecutionAndFail(final String blobName) throws IOException {
302+
logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
303+
blockExecution();
304+
throw new IOException("exception after block");
305+
}
306+
286307
MockBlobContainer(BlobContainer delegate) {
287308
super(delegate);
288309
}

0 commit comments

Comments
 (0)