Skip to content

Commit 6c9687b

Browse files
Fix Snapshot Corruption in Edge Case (#47552) (#47621)
This fixes missing to marking shard snapshots as failures when multiple data-nodes are lost during the snapshot process or shard snapshot failures have occured before a node left the cluster. The problem was that we were simply not adding any shard entries for completed shards on node-left events. This has no effect for a successful shard, but for a failed shard would lead to that shard not being marked as failed during snapshot finalization. Fixed by corectly keeping track of all previous completed shard states as well in this case. Also, added an assertion that without this fix would trip on almost every run of the resiliency tests and adjusted the serialization of SnapshotsInProgress.Entry so we have a proper assertion message. Closes #47550
1 parent 29c1424 commit 6c9687b

File tree

2 files changed

+56
-43
lines changed

2 files changed

+56
-43
lines changed

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

+47-39
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.Version;
2626
import org.elasticsearch.cluster.ClusterState.Custom;
2727
import org.elasticsearch.common.Nullable;
28+
import org.elasticsearch.common.Strings;
2829
import org.elasticsearch.common.collect.ImmutableOpenMap;
2930
import org.elasticsearch.common.io.stream.StreamInput;
3031
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -76,7 +77,7 @@ public String toString() {
7677
return builder.append("]").toString();
7778
}
7879

79-
public static class Entry {
80+
public static class Entry implements ToXContent {
8081
private final State state;
8182
private final Snapshot snapshot;
8283
private final boolean includeGlobalState;
@@ -210,7 +211,50 @@ public int hashCode() {
210211

211212
@Override
212213
public String toString() {
213-
return snapshot.toString();
214+
return Strings.toString(this);
215+
}
216+
217+
@Override
218+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
219+
builder.startObject();
220+
builder.field(REPOSITORY, snapshot.getRepository());
221+
builder.field(SNAPSHOT, snapshot.getSnapshotId().getName());
222+
builder.field(UUID, snapshot.getSnapshotId().getUUID());
223+
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState());
224+
builder.field(PARTIAL, partial);
225+
builder.field(STATE, state);
226+
builder.startArray(INDICES);
227+
{
228+
for (IndexId index : indices) {
229+
index.toXContent(builder, params);
230+
}
231+
}
232+
builder.endArray();
233+
builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(startTime));
234+
builder.field(REPOSITORY_STATE_ID, repositoryStateId);
235+
builder.startArray(SHARDS);
236+
{
237+
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : shards) {
238+
ShardId shardId = shardEntry.key;
239+
ShardSnapshotStatus status = shardEntry.value;
240+
builder.startObject();
241+
{
242+
builder.field(INDEX, shardId.getIndex());
243+
builder.field(SHARD, shardId.getId());
244+
builder.field(STATE, status.state());
245+
builder.field(NODE, status.nodeId());
246+
}
247+
builder.endObject();
248+
}
249+
}
250+
builder.endArray();
251+
builder.endObject();
252+
return builder;
253+
}
254+
255+
@Override
256+
public boolean isFragment() {
257+
return false;
214258
}
215259

216260
private ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
@@ -507,48 +551,12 @@ public void writeTo(StreamOutput out) throws IOException {
507551
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
508552
builder.startArray(SNAPSHOTS);
509553
for (Entry entry : entries) {
510-
toXContent(entry, builder, params);
554+
entry.toXContent(builder, params);
511555
}
512556
builder.endArray();
513557
return builder;
514558
}
515559

516-
public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException {
517-
builder.startObject();
518-
builder.field(REPOSITORY, entry.snapshot().getRepository());
519-
builder.field(SNAPSHOT, entry.snapshot().getSnapshotId().getName());
520-
builder.field(UUID, entry.snapshot().getSnapshotId().getUUID());
521-
builder.field(INCLUDE_GLOBAL_STATE, entry.includeGlobalState());
522-
builder.field(PARTIAL, entry.partial());
523-
builder.field(STATE, entry.state());
524-
builder.startArray(INDICES);
525-
{
526-
for (IndexId index : entry.indices()) {
527-
index.toXContent(builder, params);
528-
}
529-
}
530-
builder.endArray();
531-
builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(entry.startTime()));
532-
builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId());
533-
builder.startArray(SHARDS);
534-
{
535-
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards) {
536-
ShardId shardId = shardEntry.key;
537-
ShardSnapshotStatus status = shardEntry.value;
538-
builder.startObject();
539-
{
540-
builder.field(INDEX, shardId.getIndex());
541-
builder.field(SHARD, shardId.getId());
542-
builder.field(STATE, status.state());
543-
builder.field(NODE, status.nodeId());
544-
}
545-
builder.endObject();
546-
}
547-
}
548-
builder.endArray();
549-
builder.endObject();
550-
}
551-
552560
public enum ShardState {
553561
INIT((byte) 0, false, false),
554562
SUCCESS((byte) 2, true, false),

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -764,18 +764,21 @@ public ClusterState execute(ClusterState currentState) {
764764
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
765765
boolean snapshotChanged = false;
766766
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards()) {
767-
ShardSnapshotStatus shardStatus = shardEntry.value;
767+
final ShardSnapshotStatus shardStatus = shardEntry.value;
768+
final ShardId shardId = shardEntry.key;
768769
if (!shardStatus.state().completed() && shardStatus.nodeId() != null) {
769770
if (nodes.nodeExists(shardStatus.nodeId())) {
770-
shards.put(shardEntry.key, shardEntry.value);
771+
shards.put(shardId, shardStatus);
771772
} else {
772773
// TODO: Restart snapshot on another node?
773774
snapshotChanged = true;
774775
logger.warn("failing snapshot of shard [{}] on closed node [{}]",
775-
shardEntry.key, shardStatus.nodeId());
776-
shards.put(shardEntry.key,
776+
shardId, shardStatus.nodeId());
777+
shards.put(shardId,
777778
new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown"));
778779
}
780+
} else {
781+
shards.put(shardId, shardStatus);
779782
}
780783
}
781784
if (snapshotChanged) {
@@ -807,6 +810,8 @@ public void onFailure(Exception e) {
807810
}
808811
}, updatedSnapshot.getRepositoryStateId(), false);
809812
}
813+
assert updatedSnapshot.shards().size() == snapshot.shards().size()
814+
: "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]";
810815
}
811816
if (changed) {
812817
return ClusterState.builder(currentState)

0 commit comments

Comments
 (0)