Skip to content

Commit 58f66cf

Browse files
authored
Delay shard reassignment from nodes which are known to be restarting (elastic#75606)
This PR makes the delayed allocation infrastructure aware of registered node shutdowns, so that reallocation of shards will be further delayed for nodes which are known to be restarting. To make this more configurable, the Node Shutdown APIs now support a `allocation_delay` parameter, which defaults to 5 minutes. For example: ``` PUT /_nodes/USpTGYaBSIKbgSUJR2Z9lg/shutdown { "type": "restart", "reason": "Demonstrating how the node shutdown API works", "allocation_delay": "20m" } ``` Will cause reallocation of shards assigned to that node to another node to be delayed by 20 minutes. Note that this delay will only be used if it's *longer* than the index-level allocation delay, set via `index.unassigned.node_left.delayed_timeout`. The `allocation_delay` parameter is only valid for `restart`-type shutdown registrations, and the request will be rejected if it's used with another shutdown type.
1 parent 189f650 commit 58f66cf

File tree

27 files changed

+935
-154
lines changed

27 files changed

+935
-154
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.cluster;
1010

1111
import com.carrotsearch.hppc.cursors.ObjectCursor;
12+
1213
import org.elasticsearch.cluster.block.ClusterBlock;
1314
import org.elasticsearch.cluster.block.ClusterBlocks;
1415
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
@@ -58,7 +59,7 @@
5859
import static java.util.Collections.emptySet;
5960
import static org.elasticsearch.cluster.metadata.AliasMetadata.newAliasMetadataBuilder;
6061
import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomChange;
61-
import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomReason;
62+
import static org.elasticsearch.cluster.routing.UnassignedInfoTests.randomUnassignedInfo;
6263
import static org.elasticsearch.test.VersionUtils.randomVersion;
6364
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
6465
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
@@ -270,7 +271,7 @@ private IndexRoutingTable randomIndexRoutingTable(String index, String[] nodeIds
270271
for (int j = 0; j < replicaCount; j++) {
271272
UnassignedInfo unassignedInfo = null;
272273
if (randomInt(5) == 1) {
273-
unassignedInfo = new UnassignedInfo(randomReason(), randomAlphaOfLength(10));
274+
unassignedInfo = randomUnassignedInfo(randomAlphaOfLength(10));
274275
}
275276
if (availableNodeIds.isEmpty()) {
276277
break;

server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,12 @@ public Map<String, DataStreamAlias> dataStreamAliases() {
719719
.orElse(Collections.emptyMap());
720720
}
721721

722+
public Map<String, SingleNodeShutdownMetadata> nodeShutdowns() {
723+
return Optional.ofNullable((NodesShutdownMetadata) this.custom(NodesShutdownMetadata.TYPE))
724+
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
725+
.orElse(Collections.emptyMap());
726+
}
727+
722728
public ImmutableOpenMap<String, Custom> customs() {
723729
return this.customs;
724730
}

server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
import org.elasticsearch.cluster.Diff;
1515
import org.elasticsearch.cluster.DiffableUtils;
1616
import org.elasticsearch.cluster.NamedDiff;
17-
import org.elasticsearch.common.xcontent.ParseField;
1817
import org.elasticsearch.common.io.stream.StreamInput;
1918
import org.elasticsearch.common.io.stream.StreamOutput;
2019
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
20+
import org.elasticsearch.common.xcontent.ParseField;
2121
import org.elasticsearch.common.xcontent.XContentBuilder;
2222
import org.elasticsearch.common.xcontent.XContentParser;
2323

server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@
1010

1111
import org.elasticsearch.cluster.AbstractDiffable;
1212
import org.elasticsearch.cluster.Diffable;
13-
import org.elasticsearch.common.xcontent.ParseField;
1413
import org.elasticsearch.common.io.stream.StreamInput;
1514
import org.elasticsearch.common.io.stream.StreamOutput;
1615
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
16+
import org.elasticsearch.common.xcontent.ObjectParser;
17+
import org.elasticsearch.common.xcontent.ParseField;
1718
import org.elasticsearch.common.xcontent.ToXContentObject;
1819
import org.elasticsearch.common.xcontent.XContentBuilder;
1920
import org.elasticsearch.common.xcontent.XContentParser;
21+
import org.elasticsearch.core.Nullable;
22+
import org.elasticsearch.core.TimeValue;
2023

2124
import java.io.IOException;
2225
import java.util.Locale;
@@ -35,14 +38,16 @@ public class SingleNodeShutdownMetadata extends AbstractDiffable<SingleNodeShutd
3538
public static final ParseField REASON_FIELD = new ParseField("reason");
3639
public static final String STARTED_AT_READABLE_FIELD = "shutdown_started";
3740
public static final ParseField STARTED_AT_MILLIS_FIELD = new ParseField(STARTED_AT_READABLE_FIELD + "millis");
41+
public static final ParseField ALLOCATION_DELAY_FIELD = new ParseField("allocation_delay");
3842

3943
public static final ConstructingObjectParser<SingleNodeShutdownMetadata, Void> PARSER = new ConstructingObjectParser<>(
4044
"node_shutdown_info",
4145
a -> new SingleNodeShutdownMetadata(
4246
(String) a[0],
4347
Type.valueOf((String) a[1]),
4448
(String) a[2],
45-
(long) a[3]
49+
(long) a[3],
50+
(TimeValue) a[4]
4651
)
4752
);
4853

@@ -51,16 +56,24 @@ public class SingleNodeShutdownMetadata extends AbstractDiffable<SingleNodeShutd
5156
PARSER.declareString(ConstructingObjectParser.constructorArg(), TYPE_FIELD);
5257
PARSER.declareString(ConstructingObjectParser.constructorArg(), REASON_FIELD);
5358
PARSER.declareLong(ConstructingObjectParser.constructorArg(), STARTED_AT_MILLIS_FIELD);
59+
PARSER.declareField(
60+
ConstructingObjectParser.optionalConstructorArg(),
61+
(p, c) -> TimeValue.parseTimeValue(p.textOrNull(), ALLOCATION_DELAY_FIELD.getPreferredName()), ALLOCATION_DELAY_FIELD,
62+
ObjectParser.ValueType.STRING_OR_NULL
63+
);
5464
}
5565

5666
public static SingleNodeShutdownMetadata parse(XContentParser parser) {
5767
return PARSER.apply(parser, null);
5868
}
5969

70+
public static final TimeValue DEFAULT_RESTART_SHARD_ALLOCATION_DELAY = TimeValue.timeValueMinutes(5);
71+
6072
private final String nodeId;
6173
private final Type type;
6274
private final String reason;
6375
private final long startedAtMillis;
76+
@Nullable private final TimeValue allocationDelay;
6477

6578
/**
6679
* @param nodeId The node ID that this shutdown metadata refers to.
@@ -72,19 +85,25 @@ private SingleNodeShutdownMetadata(
7285
String nodeId,
7386
Type type,
7487
String reason,
75-
long startedAtMillis
88+
long startedAtMillis,
89+
@Nullable TimeValue allocationDelay
7690
) {
7791
this.nodeId = Objects.requireNonNull(nodeId, "node ID must not be null");
7892
this.type = Objects.requireNonNull(type, "shutdown type must not be null");
7993
this.reason = Objects.requireNonNull(reason, "shutdown reason must not be null");
8094
this.startedAtMillis = startedAtMillis;
95+
if (allocationDelay != null && Type.RESTART.equals(type) == false) {
96+
throw new IllegalArgumentException("shard allocation delay is only valid for RESTART-type shutdowns");
97+
}
98+
this.allocationDelay = allocationDelay;
8199
}
82100

83101
public SingleNodeShutdownMetadata(StreamInput in) throws IOException {
84102
this.nodeId = in.readString();
85103
this.type = in.readEnum(Type.class);
86104
this.reason = in.readString();
87105
this.startedAtMillis = in.readVLong();
106+
this.allocationDelay = in.readOptionalTimeValue();
88107
}
89108

90109
/**
@@ -115,12 +134,27 @@ public long getStartedAtMillis() {
115134
return startedAtMillis;
116135
}
117136

137+
/**
138+
* @return The amount of time shard reallocation should be delayed for shards on this node, so that they will not be automatically
139+
* reassigned while the node is restarting. Will be {@code null} for non-restart shutdowns.
140+
*/
141+
@Nullable
142+
public TimeValue getAllocationDelay() {
143+
if (allocationDelay != null) {
144+
return allocationDelay;
145+
} else if (Type.RESTART.equals(type)) {
146+
return DEFAULT_RESTART_SHARD_ALLOCATION_DELAY;
147+
}
148+
return null;
149+
}
150+
118151
@Override
119152
public void writeTo(StreamOutput out) throws IOException {
120153
out.writeString(nodeId);
121154
out.writeEnum(type);
122155
out.writeString(reason);
123156
out.writeVLong(startedAtMillis);
157+
out.writeOptionalTimeValue(allocationDelay);
124158
}
125159

126160
@Override
@@ -131,6 +165,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
131165
builder.field(TYPE_FIELD.getPreferredName(), type);
132166
builder.field(REASON_FIELD.getPreferredName(), reason);
133167
builder.timeField(STARTED_AT_MILLIS_FIELD.getPreferredName(), STARTED_AT_READABLE_FIELD, startedAtMillis);
168+
if (allocationDelay != null) {
169+
builder.field(ALLOCATION_DELAY_FIELD.getPreferredName(), allocationDelay.getStringRep());
170+
}
134171
}
135172
builder.endObject();
136173

@@ -145,7 +182,8 @@ public boolean equals(Object o) {
145182
return getStartedAtMillis() == that.getStartedAtMillis()
146183
&& getNodeId().equals(that.getNodeId())
147184
&& getType() == that.getType()
148-
&& getReason().equals(that.getReason());
185+
&& getReason().equals(that.getReason())
186+
&& Objects.equals(allocationDelay, that.allocationDelay);
149187
}
150188

151189
@Override
@@ -154,7 +192,8 @@ public int hashCode() {
154192
getNodeId(),
155193
getType(),
156194
getReason(),
157-
getStartedAtMillis()
195+
getStartedAtMillis(),
196+
allocationDelay
158197
);
159198
}
160199

@@ -178,6 +217,7 @@ public static class Builder {
178217
private Type type;
179218
private String reason;
180219
private long startedAtMillis = -1;
220+
private TimeValue allocationDelay;
181221

182222
private Builder() {}
183223

@@ -217,15 +257,25 @@ public Builder setStartedAtMillis(long startedAtMillis) {
217257
return this;
218258
}
219259

260+
/**
261+
* @param allocationDelay The amount of time shard reallocation should be delayed while this node is offline.
262+
* @return This builder.
263+
*/
264+
public Builder setAllocationDelay(TimeValue allocationDelay) {
265+
this.allocationDelay = allocationDelay;
266+
return this;
267+
}
268+
220269
public SingleNodeShutdownMetadata build() {
221270
if (startedAtMillis == -1) {
222271
throw new IllegalArgumentException("start timestamp must be set");
223272
}
273+
224274
return new SingleNodeShutdownMetadata(
225275
nodeId,
226276
type,
227277
reason,
228-
startedAtMillis
278+
startedAtMillis, allocationDelay
229279
);
230280
}
231281
}

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,20 @@
99
package org.elasticsearch.cluster.routing;
1010

1111
import com.carrotsearch.hppc.cursors.ObjectCursor;
12+
1213
import org.apache.logging.log4j.Logger;
1314
import org.apache.lucene.util.CollectionUtil;
1415
import org.elasticsearch.Assertions;
1516
import org.elasticsearch.cluster.ClusterState;
1617
import org.elasticsearch.cluster.metadata.IndexMetadata;
1718
import org.elasticsearch.cluster.metadata.Metadata;
19+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
1820
import org.elasticsearch.cluster.node.DiscoveryNode;
1921
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
2022
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
2123
import org.elasticsearch.cluster.service.MasterService;
22-
import org.elasticsearch.core.Nullable;
2324
import org.elasticsearch.common.Randomness;
25+
import org.elasticsearch.core.Nullable;
2426
import org.elasticsearch.core.Tuple;
2527
import org.elasticsearch.index.Index;
2628
import org.elasticsearch.index.shard.ShardId;
@@ -65,6 +67,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
6567

6668
private final Map<ShardId, List<ShardRouting>> assignedShards = new HashMap<>();
6769

70+
private final Map<String, SingleNodeShutdownMetadata> nodeShutdowns;
71+
6872
private final boolean readOnly;
6973

7074
private int inactivePrimaryCount = 0;
@@ -83,6 +87,7 @@ public RoutingNodes(ClusterState clusterState) {
8387
public RoutingNodes(ClusterState clusterState, boolean readOnly) {
8488
this.readOnly = readOnly;
8589
final RoutingTable routingTable = clusterState.routingTable();
90+
nodeShutdowns = clusterState.metadata().nodeShutdowns();
8691

8792
Map<String, LinkedHashMap<ShardId, ShardRouting>> nodesToShards = new HashMap<>();
8893
// fill in the nodeToShards with the "live" nodes
@@ -533,9 +538,17 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
533538
// re-resolve replica as earlier iteration could have changed source/target of replica relocation
534539
ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId());
535540
assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
536-
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
537-
"primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
538-
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet());
541+
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(
542+
UnassignedInfo.Reason.PRIMARY_FAILED,
543+
"primary failed while replica initializing",
544+
null,
545+
0,
546+
unassignedInfo.getUnassignedTimeInNanos(),
547+
unassignedInfo.getUnassignedTimeInMillis(),
548+
false,
549+
AllocationStatus.NO_ATTEMPT,
550+
Collections.emptySet(),
551+
routing.currentNodeId());
539552
failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetadata, routingChangesObserver);
540553
}
541554
}
@@ -858,10 +871,17 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R
858871
UnassignedInfo currInfo = shard.unassignedInfo();
859872
assert currInfo != null;
860873
if (allocationStatus.equals(currInfo.getLastAllocationStatus()) == false) {
861-
UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(),
862-
currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(),
863-
currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(),
864-
allocationStatus, currInfo.getFailedNodeIds());
874+
UnassignedInfo newInfo = new UnassignedInfo(
875+
currInfo.getReason(),
876+
currInfo.getMessage(),
877+
currInfo.getFailure(),
878+
currInfo.getNumFailedAllocations(),
879+
currInfo.getUnassignedTimeInNanos(),
880+
currInfo.getUnassignedTimeInMillis(),
881+
currInfo.isDelayed(),
882+
allocationStatus,
883+
currInfo.getFailedNodeIds(),
884+
currInfo.getLastAllocatedNodeId());
865885
ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource());
866886
changes.unassignedInfoUpdated(shard, newInfo);
867887
shard = updatedShard;

0 commit comments

Comments
 (0)