Skip to content

Commit 2ddd39a

Browse files
Introduce ShardState Enum + Slight Cleanup SnapshotsInProgress (elastic#41940)
* Added separate enum for the state of each shard, it was really confusing that we used the same enum for the state of the snapshot overall and the state of each individual shard * relates elastic#40943 (comment) * Shortened some obvious spots in equals method and saved a few lines via `computeIfAbsent` to make up for adding 50 new lines to this class
1 parent 464f769 commit 2ddd39a

File tree

7 files changed

+95
-58
lines changed

7 files changed

+95
-58
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li
174174
break;
175175
case INIT:
176176
case WAITING:
177-
case STARTED:
178177
stage = SnapshotIndexShardStage.STARTED;
179178
break;
180179
case SUCCESS:

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

+64-34
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.HashMap;
4343
import java.util.List;
4444
import java.util.Map;
45+
import java.util.Objects;
4546

4647
/**
4748
* Meta data about snapshots that are currently executing
@@ -53,12 +54,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
5354
public boolean equals(Object o) {
5455
if (this == o) return true;
5556
if (o == null || getClass() != o.getClass()) return false;
56-
57-
SnapshotsInProgress that = (SnapshotsInProgress) o;
58-
59-
if (!entries.equals(that.entries)) return false;
60-
61-
return true;
57+
return entries.equals(((SnapshotsInProgress) o).entries);
6258
}
6359

6460
@Override
@@ -208,18 +204,11 @@ public String toString() {
208204
return snapshot.toString();
209205
}
210206

211-
// package private for testing
212-
ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
207+
private ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
213208
Map<String, List<ShardId>> waitingIndicesMap = new HashMap<>();
214209
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> entry : shards) {
215-
if (entry.value.state() == State.WAITING) {
216-
final String indexName = entry.key.getIndexName();
217-
List<ShardId> waitingShards = waitingIndicesMap.get(indexName);
218-
if (waitingShards == null) {
219-
waitingShards = new ArrayList<>();
220-
waitingIndicesMap.put(indexName, waitingShards);
221-
}
222-
waitingShards.add(entry.key);
210+
if (entry.value.state() == ShardState.WAITING) {
211+
waitingIndicesMap.computeIfAbsent(entry.key.getIndexName(), k -> new ArrayList<>()).add(entry.key);
223212
}
224213
}
225214
if (waitingIndicesMap.isEmpty()) {
@@ -241,28 +230,27 @@ ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<Shar
241230
*/
242231
public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
243232
for (ObjectCursor<ShardSnapshotStatus> status : shards) {
244-
if (status.value.state().completed() == false) {
233+
if (status.value.state().completed == false) {
245234
return false;
246235
}
247236
}
248237
return true;
249238
}
250239

251-
252240
public static class ShardSnapshotStatus {
253-
private final State state;
241+
private final ShardState state;
254242
private final String nodeId;
255243
private final String reason;
256244

257245
public ShardSnapshotStatus(String nodeId) {
258-
this(nodeId, State.INIT);
246+
this(nodeId, ShardState.INIT);
259247
}
260248

261-
public ShardSnapshotStatus(String nodeId, State state) {
249+
public ShardSnapshotStatus(String nodeId, ShardState state) {
262250
this(nodeId, state, null);
263251
}
264252

265-
public ShardSnapshotStatus(String nodeId, State state, String reason) {
253+
public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
266254
this.nodeId = nodeId;
267255
this.state = state;
268256
this.reason = reason;
@@ -272,11 +260,11 @@ public ShardSnapshotStatus(String nodeId, State state, String reason) {
272260

273261
public ShardSnapshotStatus(StreamInput in) throws IOException {
274262
nodeId = in.readOptionalString();
275-
state = State.fromValue(in.readByte());
263+
state = ShardState.fromValue(in.readByte());
276264
reason = in.readOptionalString();
277265
}
278266

279-
public State state() {
267+
public ShardState state() {
280268
return state;
281269
}
282270

@@ -298,14 +286,9 @@ public void writeTo(StreamOutput out) throws IOException {
298286
public boolean equals(Object o) {
299287
if (this == o) return true;
300288
if (o == null || getClass() != o.getClass()) return false;
301-
302289
ShardSnapshotStatus status = (ShardSnapshotStatus) o;
290+
return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;
303291

304-
if (nodeId != null ? !nodeId.equals(status.nodeId) : status.nodeId != null) return false;
305-
if (reason != null ? !reason.equals(status.reason) : status.reason != null) return false;
306-
if (state != status.state) return false;
307-
308-
return true;
309292
}
310293

311294
@Override
@@ -331,11 +314,11 @@ public enum State {
331314
MISSING((byte) 5, true, true),
332315
WAITING((byte) 6, false, false);
333316

334-
private byte value;
317+
private final byte value;
335318

336-
private boolean completed;
319+
private final boolean completed;
337320

338-
private boolean failed;
321+
private final boolean failed;
339322

340323
State(byte value, boolean completed, boolean failed) {
341324
this.value = value;
@@ -379,7 +362,6 @@ public static State fromValue(byte value) {
379362

380363
private final List<Entry> entries;
381364

382-
383365
public SnapshotsInProgress(List<Entry> entries) {
384366
this.entries = entries;
385367
}
@@ -534,4 +516,52 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p
534516
builder.endArray();
535517
builder.endObject();
536518
}
519+
520+
public enum ShardState {
521+
INIT((byte) 0, false, false),
522+
SUCCESS((byte) 2, true, false),
523+
FAILED((byte) 3, true, true),
524+
ABORTED((byte) 4, false, true),
525+
MISSING((byte) 5, true, true),
526+
WAITING((byte) 6, false, false);
527+
528+
private final byte value;
529+
530+
private final boolean completed;
531+
532+
private final boolean failed;
533+
534+
ShardState(byte value, boolean completed, boolean failed) {
535+
this.value = value;
536+
this.completed = completed;
537+
this.failed = failed;
538+
}
539+
540+
public boolean completed() {
541+
return completed;
542+
}
543+
544+
public boolean failed() {
545+
return failed;
546+
}
547+
548+
public static ShardState fromValue(byte value) {
549+
switch (value) {
550+
case 0:
551+
return INIT;
552+
case 2:
553+
return SUCCESS;
554+
case 3:
555+
return FAILED;
556+
case 4:
557+
return ABORTED;
558+
case 5:
559+
return MISSING;
560+
case 6:
561+
return WAITING;
562+
default:
563+
throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]");
564+
}
565+
}
566+
}
537567
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.cluster.ClusterStateTaskListener;
4040
import org.elasticsearch.cluster.SnapshotsInProgress;
4141
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
42+
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
4243
import org.elasticsearch.cluster.SnapshotsInProgress.State;
4344
import org.elasticsearch.cluster.block.ClusterBlockException;
4445
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -248,7 +249,8 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
248249
// Add all new shards to start processing on
249250
final ShardId shardId = shard.key;
250251
final ShardSnapshotStatus shardSnapshotStatus = shard.value;
251-
if (localNodeId.equals(shardSnapshotStatus.nodeId()) && shardSnapshotStatus.state() == State.INIT
252+
if (localNodeId.equals(shardSnapshotStatus.nodeId())
253+
&& shardSnapshotStatus.state() == ShardState.INIT
252254
&& snapshotShards.containsKey(shardId) == false) {
253255
logger.trace("[{}] - Adding shard to the queue", shardId);
254256
if (startedShards == null) {
@@ -286,7 +288,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
286288
} else {
287289
// due to CS batching we might have missed the INIT state and straight went into ABORTED
288290
// notify master that abort has completed by moving to FAILED
289-
if (shard.value.state() == State.ABORTED) {
291+
if (shard.value.state() == ShardState.ABORTED) {
290292
notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason());
291293
}
292294
}
@@ -480,12 +482,14 @@ public String toString() {
480482

481483
/** Notify the master node that the given shard has been successfully snapshotted **/
482484
private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) {
483-
sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.SUCCESS));
485+
sendSnapshotShardUpdate(snapshot, shardId,
486+
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS));
484487
}
485488

486489
/** Notify the master node that the given shard failed to be snapshotted **/
487490
private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) {
488-
sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.FAILED, failure));
491+
sendSnapshotShardUpdate(snapshot, shardId,
492+
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure));
489493
}
490494

491495
/** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
3838
import org.elasticsearch.cluster.SnapshotsInProgress;
3939
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
40+
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
4041
import org.elasticsearch.cluster.SnapshotsInProgress.State;
4142
import org.elasticsearch.cluster.metadata.IndexMetaData;
4243
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -776,7 +777,7 @@ public ClusterState execute(ClusterState currentState) {
776777
logger.warn("failing snapshot of shard [{}] on closed node [{}]",
777778
shardEntry.key, shardStatus.nodeId());
778779
shards.put(shardEntry.key,
779-
new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
780+
new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown"));
780781
}
781782
}
782783
}
@@ -872,7 +873,7 @@ private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShar
872873
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotShards) {
873874
ShardSnapshotStatus shardStatus = shardEntry.value;
874875
ShardId shardId = shardEntry.key;
875-
if (shardStatus.state() == State.WAITING) {
876+
if (shardStatus.state() == ShardState.WAITING) {
876877
IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex());
877878
if (indexShardRoutingTable != null) {
878879
IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id());
@@ -893,7 +894,7 @@ private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShar
893894
// Shard that we were waiting for went into unassigned state or disappeared - giving up
894895
snapshotChanged = true;
895896
logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId());
896-
shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "shard is unassigned"));
897+
shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned"));
897898
} else {
898899
shards.put(shardId, shardStatus);
899900
}
@@ -943,7 +944,7 @@ private static Tuple<Set<String>, Set<String>> indicesWithMissingShards(
943944
Set<String> missing = new HashSet<>();
944945
Set<String> closed = new HashSet<>();
945946
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards) {
946-
if (entry.value.state() == State.MISSING) {
947+
if (entry.value.state() == ShardState.MISSING) {
947948
if (metaData.hasIndex(entry.key.getIndex().getName()) &&
948949
metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) {
949950
closed.add(entry.key.getIndex().getName());
@@ -1195,7 +1196,7 @@ public ClusterState execute(ClusterState currentState) {
11951196
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
11961197
ShardSnapshotStatus status = shardEntry.value;
11971198
if (status.state().completed() == false) {
1198-
status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion");
1199+
status = new ShardSnapshotStatus(status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion");
11991200
}
12001201
shardsBuilder.put(shardEntry.key, status);
12011202
}
@@ -1385,7 +1386,7 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
13851386
if (indexMetaData == null) {
13861387
// The index was deleted before we managed to start the snapshot - mark it as missing.
13871388
builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0),
1388-
new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
1389+
new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing index"));
13891390
} else {
13901391
IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName);
13911392
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
@@ -1394,18 +1395,18 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
13941395
ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
13951396
if (primary == null || !primary.assignedToNode()) {
13961397
builder.put(shardId,
1397-
new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
1398+
new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "primary shard is not allocated"));
13981399
} else if (primary.relocating() || primary.initializing()) {
1399-
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.WAITING));
1400+
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING));
14001401
} else if (!primary.started()) {
14011402
builder.put(shardId,
1402-
new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING,
1403+
new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
14031404
"primary shard hasn't been started yet"));
14041405
} else {
14051406
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId()));
14061407
}
14071408
} else {
1408-
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING,
1409+
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING,
14091410
"missing routing table"));
14101411
}
14111412
}

server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
2323
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
24+
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
2425
import org.elasticsearch.cluster.SnapshotsInProgress.State;
2526
import org.elasticsearch.common.collect.ImmutableOpenMap;
2627
import org.elasticsearch.index.shard.ShardId;
@@ -55,11 +56,11 @@ public void testWaitingIndices() {
5556
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
5657

5758
// test more than one waiting shard in an index
58-
shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
59-
shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
59+
shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
60+
shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
6061
shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
6162
// test exactly one waiting shard in an index
62-
shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
63+
shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
6364
shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
6465
// test no waiting shards in an index
6566
shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
@@ -72,7 +73,7 @@ public void testWaitingIndices() {
7273
assertFalse(waitingIndices.containsKey(idx3Name));
7374
}
7475

75-
private State randomNonWaitingState() {
76-
return randomFrom(Arrays.stream(State.values()).filter(s -> s != State.WAITING).collect(Collectors.toSet()));
76+
private ShardState randomNonWaitingState() {
77+
return randomFrom(Arrays.stream(ShardState.values()).filter(s -> s != ShardState.WAITING).collect(Collectors.toSet()));
7778
}
7879
}

0 commit comments

Comments
 (0)