Skip to content

Commit d340530

Browse files
committed
Avoid overshooting watermarks during relocation (#46079)
Today the `DiskThresholdDecider` attempts to account for already-relocating shards when deciding how to allocate or relocate a shard. Its goal is to stop relocating shards onto a node before that node exceeds the low watermark, and to stop relocating shards away from a node as soon as the node drops below the high watermark. The decider handles multiple data paths by only accounting for relocating shards that affect the appropriate data path. However, this mechanism does not correctly account for _new_ relocating shards, which are unwittingly ignored. This means that we may evict far too many shards from a node above the high watermark, and may relocate far too many shards onto a node, causing it to blow right past the low watermark and potentially other watermarks too. There are in fact two distinct issues that this PR fixes. New incoming shards have an unknown data path until the `ClusterInfoService` refreshes its statistics. New outgoing shards have a known data path, but we fail to account for the change of the corresponding `ShardRouting` from `STARTED` to `RELOCATING`, which means we fail to look up the correct data path and therefore treat it as unknown here too. This PR also reworks the `MockDiskUsagesIT` test to avoid using fake data paths for all shards. With the changes here, the data paths are handled in tests as they are in production, except that their sizes are fake. Fixes #45177
1 parent b526309 commit d340530

File tree

6 files changed

+420
-211
lines changed

6 files changed

+420
-211
lines changed

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,8 @@
4747
import org.elasticsearch.threadpool.ThreadPool;
4848
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
4949

50-
import java.util.ArrayList;
51-
import java.util.Collections;
5250
import java.util.List;
51+
import java.util.concurrent.CopyOnWriteArrayList;
5352
import java.util.concurrent.CountDownLatch;
5453
import java.util.concurrent.TimeUnit;
5554
import java.util.function.Consumer;
@@ -88,7 +87,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
8887
private final ClusterService clusterService;
8988
private final ThreadPool threadPool;
9089
private final NodeClient client;
91-
private final List<Consumer<ClusterInfo>> listeners = Collections.synchronizedList(new ArrayList<>(1));
90+
private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();
9291

9392
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
9493
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
@@ -275,6 +274,11 @@ private void maybeRefresh() {
275274
}
276275
}
277276

277+
// allow tests to adjust the node stats on receipt
278+
List<NodeStats> adjustNodesStats(List<NodeStats> nodeStats) {
279+
return nodeStats;
280+
}
281+
278282
/**
279283
* Refreshes the ClusterInfo in a blocking fashion
280284
*/
@@ -284,12 +288,13 @@ public final ClusterInfo refresh() {
284288
}
285289
final CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
286290
@Override
287-
public void onResponse(NodesStatsResponse nodeStatses) {
288-
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages = ImmutableOpenMap.builder();
289-
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages = ImmutableOpenMap.builder();
290-
fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages);
291-
leastAvailableSpaceUsages = newLeastAvaiableUsages.build();
292-
mostAvailableSpaceUsages = newMostAvaiableUsages.build();
291+
public void onResponse(NodesStatsResponse nodesStatsResponse) {
292+
ImmutableOpenMap.Builder<String, DiskUsage> leastAvailableUsagesBuilder = ImmutableOpenMap.builder();
293+
ImmutableOpenMap.Builder<String, DiskUsage> mostAvailableUsagesBuilder = ImmutableOpenMap.builder();
294+
fillDiskUsagePerNode(logger, adjustNodesStats(nodesStatsResponse.getNodes()),
295+
leastAvailableUsagesBuilder, mostAvailableUsagesBuilder);
296+
leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build();
297+
mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build();
293298
}
294299

295300
@Override
@@ -402,7 +407,7 @@ static void fillDiskUsagePerNode(Logger logger, List<NodeStats> nodeStatsArray,
402407
if (leastAvailablePath == null) {
403408
assert mostAvailablePath == null;
404409
mostAvailablePath = leastAvailablePath = info;
405-
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()){
410+
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) {
406411
leastAvailablePath = info;
407412
} else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) {
408413
mostAvailablePath = info;

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

+26-6
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,36 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio
9090
boolean subtractShardsMovingAway, String dataPath) {
9191
ClusterInfo clusterInfo = allocation.clusterInfo();
9292
long totalSize = 0;
93-
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
94-
String actualPath = clusterInfo.getDataPath(routing);
95-
if (dataPath.equals(actualPath)) {
96-
if (routing.initializing() && routing.relocatingNodeId() != null) {
97-
totalSize += getExpectedShardSize(routing, allocation, 0);
98-
} else if (subtractShardsMovingAway && routing.relocating()) {
93+
94+
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
95+
if (routing.relocatingNodeId() == null) {
96+
// in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created
97+
// by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking
98+
// any additional space and can be ignored here
99+
continue;
100+
}
101+
102+
final String actualPath = clusterInfo.getDataPath(routing);
103+
// if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least
104+
// free space
105+
if (actualPath == null || actualPath.equals(dataPath)) {
106+
totalSize += getExpectedShardSize(routing, allocation, 0);
107+
}
108+
}
109+
110+
if (subtractShardsMovingAway) {
111+
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING)) {
112+
String actualPath = clusterInfo.getDataPath(routing);
113+
if (actualPath == null) {
114+
// we might know the path of this shard from before when it was relocating
115+
actualPath = clusterInfo.getDataPath(routing.cancelRelocation());
116+
}
117+
if (dataPath.equals(actualPath)) {
99118
totalSize -= getExpectedShardSize(routing, allocation, 0);
100119
}
101120
}
102121
}
122+
103123
return totalSize;
104124
}
105125

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.cluster.ClusterState;
2727
import org.elasticsearch.cluster.DiskUsage;
2828
import org.elasticsearch.cluster.ESAllocationTestCase;
29-
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
3029
import org.elasticsearch.cluster.metadata.IndexMetaData;
3130
import org.elasticsearch.cluster.metadata.MetaData;
3231
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -1002,4 +1001,20 @@ public void logShardStates(ClusterState state) {
10021001
rn.shardsWithState(RELOCATING),
10031002
rn.shardsWithState(STARTED));
10041003
}
1004+
1005+
/**
1006+
* ClusterInfo that always reports /dev/null for the shards' data paths.
1007+
*/
1008+
static class DevNullClusterInfo extends ClusterInfo {
1009+
DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
1010+
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage,
1011+
ImmutableOpenMap<String, Long> shardSizes) {
1012+
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null);
1013+
}
1014+
1015+
@Override
1016+
public String getDataPath(ShardRouting shardRouting) {
1017+
return "/dev/null";
1018+
}
1019+
}
10051020
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.elasticsearch.cluster.ClusterState;
2626
import org.elasticsearch.cluster.DiskUsage;
2727
import org.elasticsearch.cluster.ESAllocationTestCase;
28-
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
28+
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDeciderTests.DevNullClusterInfo;
2929
import org.elasticsearch.cluster.metadata.IndexMetaData;
3030
import org.elasticsearch.cluster.metadata.MetaData;
3131
import org.elasticsearch.cluster.node.DiscoveryNode;

0 commit comments

Comments (0)