15
15
import org .elasticsearch .cluster .routing .ShardRouting ;
16
16
import org .elasticsearch .cluster .routing .allocation .RoutingAllocation ;
17
17
18
+ /**
19
+ * An allocation decider that ensures that all the shards allocated to the node scheduled for removal are relocated to the replacement node.
20
+ * It also ensures that auto-expands replicas are expanded to only the replacement source or target (not both at the same time)
21
+ * and only of the shards that were already present on the source node.
22
+ */
18
23
public class NodeReplacementAllocationDecider extends AllocationDecider {
19
24
20
25
public static final String NAME = "node_replacement" ;
@@ -38,8 +43,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
38
43
Decision .YES ,
39
44
NAME ,
40
45
"node [%s] is replacing node [%s], and may receive shards from it" ,
41
- shardRouting . currentNodeId (),
42
- node . nodeId ()
46
+ node . nodeId (),
47
+ shardRouting . currentNodeId ()
43
48
);
44
49
} else if (isReplacementSource (allocation , shardRouting .currentNodeId ())) {
45
50
if (allocation .isReconciling ()) {
@@ -110,27 +115,64 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
110
115
return YES__NO_REPLACEMENTS ;
111
116
} else if (isReplacementTargetName (allocation , node .getName ())) {
112
117
final SingleNodeShutdownMetadata shutdown = allocation .replacementTargetShutdowns ().get (node .getName ());
113
- return allocation .decision (
114
- Decision .NO ,
115
- NAME ,
116
- "node [%s] is a node replacement target for node [%s], "
117
- + "shards cannot auto expand to be on it until the replacement is complete" ,
118
- node .getId (),
119
- shutdown == null ? null : shutdown .getNodeId ()
120
- );
118
+ final String sourceNodeId = shutdown != null ? shutdown .getNodeId () : null ;
119
+ final boolean hasShardsAllocatedOnSourceOrTarget = hasShardOnNode (indexMetadata , node .getId (), allocation )
120
+ || (sourceNodeId != null && hasShardOnNode (indexMetadata , sourceNodeId , allocation ));
121
+
122
+ if (hasShardsAllocatedOnSourceOrTarget ) {
123
+ return allocation .decision (
124
+ Decision .YES ,
125
+ NAME ,
126
+ "node [%s] is a node replacement target for node [%s], "
127
+ + "shard can auto expand to it as it was already present on the source node" ,
128
+ node .getId (),
129
+ sourceNodeId
130
+ );
131
+ } else {
132
+ return allocation .decision (
133
+ Decision .NO ,
134
+ NAME ,
135
+ "node [%s] is a node replacement target for node [%s], "
136
+ + "shards cannot auto expand to be on it until the replacement is complete" ,
137
+ node .getId (),
138
+ sourceNodeId
139
+ );
140
+ }
121
141
} else if (isReplacementSource (allocation , node .getId ())) {
122
- return allocation .decision (
123
- Decision .NO ,
124
- NAME ,
125
- "node [%s] is being replaced by [%s], shards cannot auto expand to be on it" ,
126
- node .getId (),
127
- getReplacementName (allocation , node .getId ())
128
- );
142
+ final SingleNodeShutdownMetadata shutdown = allocation .getClusterState ().metadata ().nodeShutdowns ().get (node .getId ());
143
+ final String replacementNodeName = shutdown != null ? shutdown .getTargetNodeName () : null ;
144
+ final boolean hasShardOnSource = hasShardOnNode (indexMetadata , node .getId (), allocation )
145
+ && shutdown != null
146
+ && allocation .getClusterState ().getNodes ().hasByName (replacementNodeName ) == false ;
147
+
148
+ if (hasShardOnSource ) {
149
+ return allocation .decision (
150
+ Decision .YES ,
151
+ NAME ,
152
+ "node [%s] is being replaced by [%s], shards can auto expand to be on it "
153
+ + "while replacement node has not joined the cluster" ,
154
+ node .getId (),
155
+ replacementNodeName
156
+ );
157
+ } else {
158
+ return allocation .decision (
159
+ Decision .NO ,
160
+ NAME ,
161
+ "node [%s] is being replaced by [%s], shards cannot auto expand to be on it" ,
162
+ node .getId (),
163
+ replacementNodeName
164
+ );
165
+ }
129
166
} else {
130
167
return YES__NO_APPLICABLE_REPLACEMENTS ;
131
168
}
132
169
}
133
170
171
+ private static boolean hasShardOnNode (IndexMetadata indexMetadata , String nodeId , RoutingAllocation allocation ) {
172
+ RoutingNode node = allocation .routingNodes ().node (nodeId );
173
+ return node != null && node .numberOfOwningShardsForIndex (indexMetadata .getIndex ()) >= 1 ;
174
+ }
175
+
134
176
@ Override
135
177
public Decision canForceAllocateDuringReplace (ShardRouting shardRouting , RoutingNode node , RoutingAllocation allocation ) {
136
178
if (replacementFromSourceToTarget (allocation , shardRouting .currentNodeId (), node .node ().getName ())) {
0 commit comments