Skip to content

Commit e244d65

Browse files
Fix Snapshot Corruption in Edge Case (#47552)
This fixes missing to marking shard snapshots as failures when multiple data-nodes are lost during the snapshot process or shard snapshot failures have occured before a node left the cluster. The problem was that we were simply not adding any shard entries for completed shards on node-left events. This has no effect for a successful shard, but for a failed shard would lead to that shard not being marked as failed during snapshot finalization. Fixed by corectly keeping track of all previous completed shard states as well in this case. Also, added an assertion that without this fix would trip on almost every run of the resiliency tests and adjusted the serialization of SnapshotsInProgress.Entry so we have a proper assertion message. Closes #47550
1 parent d683b20 commit e244d65

File tree

2 files changed

+56
-43
lines changed

2 files changed

+56
-43
lines changed

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

+47-39
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.Version;
2626
import org.elasticsearch.cluster.ClusterState.Custom;
2727
import org.elasticsearch.common.Nullable;
28+
import org.elasticsearch.common.Strings;
2829
import org.elasticsearch.common.collect.ImmutableOpenMap;
2930
import org.elasticsearch.common.io.stream.StreamInput;
3031
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -77,7 +78,7 @@ public String toString() {
7778
return builder.append("]").toString();
7879
}
7980

80-
public static class Entry {
81+
public static class Entry implements ToXContent {
8182
private final State state;
8283
private final Snapshot snapshot;
8384
private final boolean includeGlobalState;
@@ -211,7 +212,50 @@ public int hashCode() {
211212

212213
@Override
213214
public String toString() {
214-
return snapshot.toString();
215+
return Strings.toString(this);
216+
}
217+
218+
@Override
219+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
220+
builder.startObject();
221+
builder.field(REPOSITORY, snapshot.getRepository());
222+
builder.field(SNAPSHOT, snapshot.getSnapshotId().getName());
223+
builder.field(UUID, snapshot.getSnapshotId().getUUID());
224+
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState());
225+
builder.field(PARTIAL, partial);
226+
builder.field(STATE, state);
227+
builder.startArray(INDICES);
228+
{
229+
for (IndexId index : indices) {
230+
index.toXContent(builder, params);
231+
}
232+
}
233+
builder.endArray();
234+
builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(startTime));
235+
builder.field(REPOSITORY_STATE_ID, repositoryStateId);
236+
builder.startArray(SHARDS);
237+
{
238+
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : shards) {
239+
ShardId shardId = shardEntry.key;
240+
ShardSnapshotStatus status = shardEntry.value;
241+
builder.startObject();
242+
{
243+
builder.field(INDEX, shardId.getIndex());
244+
builder.field(SHARD, shardId.getId());
245+
builder.field(STATE, status.state());
246+
builder.field(NODE, status.nodeId());
247+
}
248+
builder.endObject();
249+
}
250+
}
251+
builder.endArray();
252+
builder.endObject();
253+
return builder;
254+
}
255+
256+
@Override
257+
public boolean isFragment() {
258+
return false;
215259
}
216260

217261
private ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
@@ -506,48 +550,12 @@ public void writeTo(StreamOutput out) throws IOException {
506550
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
507551
builder.startArray(SNAPSHOTS);
508552
for (Entry entry : entries) {
509-
toXContent(entry, builder, params);
553+
entry.toXContent(builder, params);
510554
}
511555
builder.endArray();
512556
return builder;
513557
}
514558

515-
public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException {
516-
builder.startObject();
517-
builder.field(REPOSITORY, entry.snapshot().getRepository());
518-
builder.field(SNAPSHOT, entry.snapshot().getSnapshotId().getName());
519-
builder.field(UUID, entry.snapshot().getSnapshotId().getUUID());
520-
builder.field(INCLUDE_GLOBAL_STATE, entry.includeGlobalState());
521-
builder.field(PARTIAL, entry.partial());
522-
builder.field(STATE, entry.state());
523-
builder.startArray(INDICES);
524-
{
525-
for (IndexId index : entry.indices()) {
526-
index.toXContent(builder, params);
527-
}
528-
}
529-
builder.endArray();
530-
builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(entry.startTime()));
531-
builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId());
532-
builder.startArray(SHARDS);
533-
{
534-
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards) {
535-
ShardId shardId = shardEntry.key;
536-
ShardSnapshotStatus status = shardEntry.value;
537-
builder.startObject();
538-
{
539-
builder.field(INDEX, shardId.getIndex());
540-
builder.field(SHARD, shardId.getId());
541-
builder.field(STATE, status.state());
542-
builder.field(NODE, status.nodeId());
543-
}
544-
builder.endObject();
545-
}
546-
}
547-
builder.endArray();
548-
builder.endObject();
549-
}
550-
551559
public enum ShardState {
552560
INIT((byte) 0, false, false),
553561
SUCCESS((byte) 2, true, false),

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -785,19 +785,22 @@ public ClusterState execute(ClusterState currentState) {
785785
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
786786
boolean snapshotChanged = false;
787787
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards()) {
788-
ShardSnapshotStatus shardStatus = shardEntry.value;
788+
final ShardSnapshotStatus shardStatus = shardEntry.value;
789+
final ShardId shardId = shardEntry.key;
789790
if (!shardStatus.state().completed() && shardStatus.nodeId() != null) {
790791
if (nodes.nodeExists(shardStatus.nodeId())) {
791-
shards.put(shardEntry.key, shardEntry.value);
792+
shards.put(shardId, shardStatus);
792793
} else {
793794
// TODO: Restart snapshot on another node?
794795
snapshotChanged = true;
795796
logger.warn("failing snapshot of shard [{}] on closed node [{}]",
796-
shardEntry.key, shardStatus.nodeId());
797-
shards.put(shardEntry.key,
797+
shardId, shardStatus.nodeId());
798+
shards.put(shardId,
798799
new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown",
799800
shardStatus.generation()));
800801
}
802+
} else {
803+
shards.put(shardId, shardStatus);
801804
}
802805
}
803806
if (snapshotChanged) {
@@ -829,6 +832,8 @@ public void onFailure(Exception e) {
829832
}
830833
}, updatedSnapshot.getRepositoryStateId(), false);
831834
}
835+
assert updatedSnapshot.shards().size() == snapshot.shards().size()
836+
: "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]";
832837
}
833838
if (changed) {
834839
return ClusterState.builder(currentState)

0 commit comments

Comments
 (0)