Skip to content

Commit 88aff40

Browse files
author
Ali Beyad
authored
Primary shard allocator observes limits in forcing allocation (#19811)
Primary shard allocation observes limits in forcing allocation. Previously, during primary shard allocation of shards with prior allocation IDs, if all nodes returned a NO decision for allocation (e.g. the settings blocked allocation on that node), we would choose one of those nodes and force the primary shard to be allocated to it. However, this meant that primary shard allocation would not adhere to the decision of the MaxRetryAllocationDecider, which would lead to attempting to allocate a shard that has already failed N times (presumably due to some configuration issue). This commit solves this issue by introducing the notion of force allocating a primary shard to a node; each decider implementation must decide whether this is allowed or not. In the case of MaxRetryAllocationDecider, it just forwards the request to canAllocate. Closes #19446
1 parent eea1bc7 commit 88aff40

File tree

9 files changed

+282
-27
lines changed

9 files changed

+282
-27
lines changed

core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,17 @@ public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
252252
nodes.add(nodeId);
253253
}
254254

255+
/**
256+
* Returns whether the given node id should be ignored from consideration when {@link AllocationDeciders}
257+
* is deciding whether to allocate the specified shard id to that node. The node will be ignored if
258+
* the specified shard failed on that node, triggering the current round of allocation. Since the shard
259+
* just failed on that node, we don't want to try to reassign it there, if the node is still a part
260+
* of the cluster.
261+
*
262+
* @param shardId the shard id to be allocated
263+
* @param nodeId the node id to check against
264+
* @return true if the node id should be ignored in allocation decisions, false otherwise
265+
*/
255266
public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) {
256267
if (ignoredShardToNodes == null) {
257268
return false;

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.routing.RoutingNode;
2424
import org.elasticsearch.cluster.routing.ShardRouting;
2525
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
26+
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
2627
import org.elasticsearch.common.component.AbstractComponent;
2728
import org.elasticsearch.common.settings.Settings;
2829

@@ -98,4 +99,30 @@ public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
9899
public Decision canRebalance(RoutingAllocation allocation) {
99100
return Decision.ALWAYS;
100101
}
102+
103+
/**
104+
* Returns a {@link Decision} whether the given primary shard can be
105+
* forcibly allocated on the given node. This method should only be called
106+
* for unassigned primary shards where the node has a shard copy on disk.
107+
*
108+
* Note: all implementations that override this behavior should take into account
109+
* the results of {@link #canAllocate(ShardRouting, RoutingNode, RoutingAllocation)}
110+
* before making a decision on force allocation, because force allocation should only
111+
* be considered if all deciders return {@link Decision#NO}.
112+
*/
113+
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
114+
assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting;
115+
assert shardRouting.unassigned() : "must not call canForceAllocatePrimary on an assigned shard " + shardRouting;
116+
Decision decision = canAllocate(shardRouting, node, allocation);
117+
if (decision.type() == Type.NO) {
118+
// On a NO decision, by default, we allow force allocating the primary.
119+
return allocation.decision(Decision.YES,
120+
decision.label(),
121+
"primary shard [{}] allowed to force allocate on node [{}]",
122+
shardRouting.shardId(), node.nodeId());
123+
} else {
124+
// On a THROTTLE/YES decision, we use the same decision instead of forcing allocation
125+
return decision;
126+
}
127+
}
101128
}

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,4 +196,32 @@ public Decision canRebalance(RoutingAllocation allocation) {
196196
}
197197
return ret;
198198
}
199+
200+
@Override
201+
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
202+
assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard routing " + shardRouting;
203+
204+
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
205+
return Decision.NO;
206+
}
207+
Decision.Multi ret = new Decision.Multi();
208+
for (AllocationDecider decider : allocations) {
209+
Decision decision = decider.canForceAllocatePrimary(shardRouting, node, allocation);
210+
// short circuit if a NO is returned.
211+
if (decision == Decision.NO) {
212+
if (logger.isTraceEnabled()) {
213+
logger.trace("Shard [{}] can not be forcefully allocated to node [{}] due to [{}].",
214+
shardRouting.shardId(), node.nodeId(), decider.getClass().getSimpleName());
215+
}
216+
if (!allocation.debugDecision()) {
217+
return decision;
218+
} else {
219+
ret.add(decision);
220+
}
221+
} else if (decision != Decision.ALWAYS) {
222+
ret.add(decision);
223+
}
224+
}
225+
return ret;
226+
}
199227
}

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public abstract class Decision implements ToXContent {
5353
* @param explanationParams additional parameters for the decision
5454
* @return new {@link Decision} instance
5555
*/
56-
public static Decision single(Type type, String label, String explanation, Object... explanationParams) {
56+
public static Decision single(Type type, @Nullable String label, @Nullable String explanation, @Nullable Object... explanationParams) {
5757
return new Single(type, label, explanation, explanationParams);
5858
}
5959

@@ -146,6 +146,9 @@ public static void writeTo(Type type, StreamOutput out) throws IOException {
146146
*/
147147
public abstract Type type();
148148

149+
/**
150+
* Get the description label for this decision.
151+
*/
149152
@Nullable
150153
public abstract String label();
151154

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
8080
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
8181
return canAllocate(shardRouting, allocation);
8282
}
83+
84+
@Override
85+
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
86+
assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting;
87+
// check if we have passed the maximum retry threshold through canAllocate,
88+
// if so, we don't want to force the primary allocation here
89+
return canAllocate(shardRouting, node, allocation);
90+
}
8391
}

core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
import org.elasticsearch.cluster.routing.RoutingNode;
2828
import org.elasticsearch.cluster.routing.RoutingNodes;
2929
import org.elasticsearch.cluster.routing.ShardRouting;
30+
import org.elasticsearch.cluster.routing.UnassignedInfo;
3031
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
3132
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
3233
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
3334
import org.elasticsearch.common.component.AbstractComponent;
35+
import org.elasticsearch.common.logging.ESLogger;
3436
import org.elasticsearch.common.settings.Setting;
3537
import org.elasticsearch.common.settings.Setting.Property;
3638
import org.elasticsearch.common.settings.Settings;
@@ -47,8 +49,18 @@
4749
import java.util.stream.Collectors;
4850

4951
/**
50-
* The primary shard allocator allocates primary shard that were not created as
51-
* a result of an API to a node that held them last to be recovered.
52+
* The primary shard allocator allocates unassigned primary shards to nodes that hold
53+
* valid copies of the unassigned primaries. It does this by iterating over all unassigned
54+
* primary shards in the routing table and fetching shard metadata from each node in the cluster
55+
* that holds a copy of the shard. The shard metadata from each node is compared against the
56+
* set of valid allocation IDs and for all valid shard copies (if any), the primary shard allocator
57+
* executes the allocation deciders to choose a copy to assign the primary shard to.
58+
*
59+
* Note that the PrimaryShardAllocator does *not* allocate primaries on index creation
60+
* (see {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator}),
61+
* nor does it allocate primaries when a primary shard failed and there is a valid replica
62+
* copy that can immediately be promoted to primary, as this takes place in
63+
* {@link RoutingNodes#failShard(ESLogger, ShardRouting, UnassignedInfo, IndexMetaData)}.
5264
*/
5365
public abstract class PrimaryShardAllocator extends AbstractComponent {
5466

@@ -154,17 +166,35 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
154166
continue;
155167
}
156168

157-
final NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodeShardsResult.orderedAllocationCandidates);
169+
final NodesToAllocate nodesToAllocate = buildNodesToAllocate(
170+
allocation, nodeShardsResult.orderedAllocationCandidates, shard, false
171+
);
158172
if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
159173
NodeGatewayStartedShards nodeShardState = nodesToAllocate.yesNodeShards.get(0);
160174
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode());
161175
changed = true;
162176
unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
163177
} else if (nodesToAllocate.throttleNodeShards.isEmpty() == true && nodesToAllocate.noNodeShards.isEmpty() == false) {
164-
NodeGatewayStartedShards nodeShardState = nodesToAllocate.noNodeShards.get(0);
165-
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode());
166-
changed = true;
167-
unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
178+
// The deciders returned a NO decision for all nodes with shard copies, so we check if the primary shard
179+
// can be force-allocated to one of the nodes.
180+
final NodesToAllocate nodesToForceAllocate = buildNodesToAllocate(
181+
allocation, nodeShardsResult.orderedAllocationCandidates, shard, true
182+
);
183+
if (nodesToForceAllocate.yesNodeShards.isEmpty() == false) {
184+
NodeGatewayStartedShards nodeShardState = nodesToForceAllocate.yesNodeShards.get(0);
185+
logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
186+
shard.index(), shard.id(), shard, nodeShardState.getNode());
187+
changed = true;
188+
unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(),
189+
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
190+
} else if (nodesToForceAllocate.throttleNodeShards.isEmpty() == false) {
191+
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation",
192+
shard.index(), shard.id(), shard, nodesToForceAllocate.throttleNodeShards);
193+
changed |= unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED);
194+
} else {
195+
logger.debug("[{}][{}]: forced primary allocation denied [{}]", shard.index(), shard.id(), shard);
196+
changed |= unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_NO);
197+
}
168198
} else {
169199
// we are throttling this, but we have enough to allocate to this node, ignore it for now
170200
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodeShards);
@@ -268,7 +298,10 @@ private boolean isEnoughVersionBasedAllocationsFound(IndexMetaData indexMetaData
268298
/**
269299
* Split the list of node shard states into groups yes/no/throttle based on allocation deciders
270300
*/
271-
private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, List<NodeGatewayStartedShards> nodeShardStates) {
301+
private NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
302+
List<NodeGatewayStartedShards> nodeShardStates,
303+
ShardRouting shardRouting,
304+
boolean forceAllocate) {
272305
List<NodeGatewayStartedShards> yesNodeShards = new ArrayList<>();
273306
List<NodeGatewayStartedShards> throttledNodeShards = new ArrayList<>();
274307
List<NodeGatewayStartedShards> noNodeShards = new ArrayList<>();
@@ -278,7 +311,8 @@ private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocati
278311
continue;
279312
}
280313

281-
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
314+
Decision decision = forceAllocate ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) :
315+
allocation.deciders().canAllocate(shardRouting, node, allocation);
282316
if (decision.type() == Decision.Type.THROTTLE) {
283317
throttledNodeShards.add(nodeShardState);
284318
} else if (decision.type() == Decision.Type.NO) {

core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
2525
import org.elasticsearch.action.support.ActiveShardCount;
2626
import org.elasticsearch.cluster.ClusterState;
27+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2728
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
2829
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
2930
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
@@ -219,4 +220,32 @@ public void testNotWaitForQuorumCopies() throws Exception {
219220
ensureYellow("test");
220221
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 1L);
221222
}
223+
224+
/**
225+
* This test ensures that for an unassigned primary shard that has a valid shard copy on at least one node,
226+
* we will force allocate the primary shard to one of those nodes, even if the allocation deciders all return
227+
* a NO decision to allocate.
228+
*/
229+
public void testForceAllocatePrimaryOnNoDecision() throws Exception {
230+
logger.info("--> starting 1 node");
231+
final String node = internalCluster().startNodeAsync().get();
232+
logger.info("--> creating index with 1 primary and 0 replicas");
233+
final String indexName = "test-idx";
234+
assertAcked(client().admin().indices()
235+
.prepareCreate(indexName)
236+
.setSettings(Settings.builder().put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
237+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0))
238+
.get());
239+
logger.info("--> update the settings to prevent allocation to the data node");
240+
assertTrue(client().admin().indices().prepareUpdateSettings(indexName)
241+
.setSettings(Settings.builder().put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + "_name", node))
242+
.get()
243+
.isAcknowledged());
244+
logger.info("--> full cluster restart");
245+
internalCluster().fullRestart();
246+
logger.info("--> checking that the primary shard is force allocated to the data node despite being blocked by the exclude filter");
247+
ensureGreen(indexName);
248+
assertEquals(1, client().admin().cluster().prepareState().get().getState()
249+
.routingTable().index(indexName).shardsWithState(ShardRoutingState.STARTED).size());
250+
}
222251
}

0 commit comments

Comments
 (0)