Commit 7d8ce43

Avoid parallel reroutes in DiskThresholdMonitor
Today the `DiskThresholdMonitor` limits the frequency with which it submits reroute tasks, but it might still submit these tasks faster than the master can process them if, for instance, each reroute takes over 60 seconds. This causes a problem since the reroute task runs with priority `IMMEDIATE` and is always scheduled when there is a node over the high watermark, so this can starve any other pending tasks on the master.

This change avoids further updates from the monitor while its last task(s) are still in progress, and it measures the time of each update from the completion time of the reroute task rather than its start time, to allow a larger window for other tasks to run.

Fixes elastic#40174
1 parent 99495aa commit 7d8ce43
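In outline, the fix combines two things: a `checkInProgress` flag that rejects a new check while the previous one is still in flight, and a throttle timestamp that is only advanced when the asynchronous reroute completes rather than when it starts. Below is a minimal, self-contained sketch of that pattern, assuming a plain `CompletableFuture`-based stand-in for the reroute call; the class and method names are illustrative, not the Elasticsearch code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

// Sketch only: a monitor that refuses to start a new check while the previous one is
// still in flight, and that measures the reroute interval from the *completion* time
// of the last reroute rather than its start time.
class ThrottledMonitor {
    private final AtomicBoolean checkInProgress = new AtomicBoolean();
    private final LongSupplier currentTimeMillis;   // injected clock, e.g. System::currentTimeMillis
    private final long rerouteIntervalMillis;
    private volatile long lastRunTimeMillis = Long.MIN_VALUE;

    ThrottledMonitor(LongSupplier currentTimeMillis, long rerouteIntervalMillis) {
        this.currentTimeMillis = currentTimeMillis;
        this.rerouteIntervalMillis = rerouteIntervalMillis;
    }

    void onNewInfo(boolean overHighWatermark) {
        // Only one check may run at a time; later notifications are dropped, not queued.
        if (checkInProgress.compareAndSet(false, true) == false) {
            return;
        }
        long now = currentTimeMillis.getAsLong();
        if (overHighWatermark && lastRunTimeMillis < now - rerouteIntervalMillis) {
            reroute().whenComplete((r, e) -> {
                // The interval is measured from completion, widening the window
                // in which other master tasks can run.
                lastRunTimeMillis = currentTimeMillis.getAsLong();
                checkInProgress.set(false);
            });
        } else {
            checkInProgress.set(false);
        }
    }

    // Stand-in for the asynchronous cluster reroute call.
    protected CompletableFuture<Void> reroute() {
        return CompletableFuture.completedFuture(null);
    }
}
```

Because the timestamp is only updated in the completion callback, a reroute that takes longer than the configured interval still leaves a full interval of quiet time for other master tasks before the next one is submitted.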


6 files changed: +189 -92 lines changed

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 5 additions & 5 deletions
@@ -131,13 +131,13 @@ public void onMaster() {
             logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
         }
 
-        // Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
+        // Submit a job that will reschedule itself after running
         threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
 
         try {
             if (clusterService.state().getNodes().getDataNodes().size() > 1) {
                 // Submit an info update job to be run immediately
-                threadPool.executor(executorName()).execute(() -> maybeRefresh());
+                threadPool.executor(executorName()).execute(this::maybeRefresh);
             }
         } catch (EsRejectedExecutionException ex) {
             logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
@@ -173,7 +173,7 @@ public void clusterChanged(ClusterChangedEvent event) {
             if (logger.isDebugEnabled()) {
                 logger.debug("data node was added, retrieving new cluster info");
             }
-            threadPool.executor(executorName()).execute(() -> maybeRefresh());
+            threadPool.executor(executorName()).execute(this::maybeRefresh);
         }
 
         if (this.isMaster && event.nodesRemoved()) {
@@ -316,7 +316,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
                 ShardStats[] stats = indicesStatsResponse.getShards();
                 ImmutableOpenMap.Builder<String, Long> newShardSizes = ImmutableOpenMap.builder();
                 ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath = ImmutableOpenMap.builder();
-                buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state());
+                buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath);
                 shardSizes = newShardSizes.build();
                 shardRoutingToDataPath = newShardRoutingToDataPath.build();
             }
@@ -365,7 +365,7 @@ public void onFailure(Exception e) {
     }
 
     static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
-                                    ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
+                                    ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
         for (ShardStats s : stats) {
             newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
             long size = s.getStats().getStore().sizeInBytes();

server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 98 additions & 67 deletions
@@ -21,12 +21,16 @@
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import com.carrotsearch.hppc.ObjectLookupContainer;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterState;
@@ -54,11 +58,14 @@ public class DiskThresholdMonitor {
     private final Client client;
     private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
     private final Supplier<ClusterState> clusterStateSupplier;
-    private long lastRunNS;
+    private final LongSupplier currentTimeMillisSupplier;
+    private long lastRunTimeMillis = Long.MIN_VALUE;
+    private final AtomicBoolean checkInProgress = new AtomicBoolean();
 
     public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
-                                Client client) {
+                                Client client, LongSupplier currentTimeMillisSupplier) {
         this.clusterStateSupplier = clusterStateSupplier;
+        this.currentTimeMillisSupplier = currentTimeMillisSupplier;
         this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
         this.client = client;
     }
@@ -92,88 +99,112 @@ private void warnAboutDiskIfNeeded(DiskUsage usage) {
         }
     }
 
+    private void checkFinished() {
+        final boolean checkFinished = checkInProgress.compareAndSet(true, false);
+        assert checkFinished;
+    }
 
     public void onNewInfo(ClusterInfo info) {
-        ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
-        if (usages != null) {
-            boolean reroute = false;
-            String explanation = "";
-
-            // Garbage collect nodes that have been removed from the cluster
-            // from the map that tracks watermark crossing
-            ObjectLookupContainer<String> nodes = usages.keys();
-            for (String node : nodeHasPassedWatermark) {
-                if (nodes.contains(node) == false) {
-                    nodeHasPassedWatermark.remove(node);
-                }
+
+        if (checkInProgress.compareAndSet(false, true) == false) {
+            logger.info("skipping monitor as a check is already in progress");
+            return;
+        }
+
+        final ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
+        if (usages == null) {
+            checkFinished();
+            return;
+        }
+
+        boolean reroute = false;
+        String explanation = "";
+        final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
+
+        // Garbage collect nodes that have been removed from the cluster
+        // from the map that tracks watermark crossing
+        final ObjectLookupContainer<String> nodes = usages.keys();
+        for (String node : nodeHasPassedWatermark) {
+            if (nodes.contains(node) == false) {
+                nodeHasPassedWatermark.remove(node);
            }
-            ClusterState state = clusterStateSupplier.get();
-            Set<String> indicesToMarkReadOnly = new HashSet<>();
-            for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
-                String node = entry.key;
-                DiskUsage usage = entry.value;
-                warnAboutDiskIfNeeded(usage);
-                if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
-                    usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
-                    RoutingNode routingNode = state.getRoutingNodes().node(node);
-                    if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
-                        for (ShardRouting routing : routingNode) {
-                            indicesToMarkReadOnly.add(routing.index().getName());
-                        }
+        }
+        final ClusterState state = clusterStateSupplier.get();
+        final Set<String> indicesToMarkReadOnly = new HashSet<>();
+
+        for (final ObjectObjectCursor<String, DiskUsage> entry : usages) {
+            final String node = entry.key;
+            final DiskUsage usage = entry.value;
+            warnAboutDiskIfNeeded(usage);
+            if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
+                usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
+                final RoutingNode routingNode = state.getRoutingNodes().node(node);
+                if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
+                    for (ShardRouting routing : routingNode) {
+                        indicesToMarkReadOnly.add(routing.index().getName());
                    }
-                } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
-                    usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
-                    if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
-                        lastRunNS = System.nanoTime();
+                }
+            } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
+                usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
+                if (lastRunTimeMillis < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
+                    reroute = true;
+                    explanation = "high disk watermark exceeded on one or more nodes";
+                } else {
+                    logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " +
+                        "in the last [{}], skipping reroute",
+                        node, diskThresholdSettings.getRerouteInterval());
+                }
+                nodeHasPassedWatermark.add(node);
+            } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() ||
+                usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
+                nodeHasPassedWatermark.add(node);
+            } else {
+                if (nodeHasPassedWatermark.contains(node)) {
+                    // The node has previously been over the high or
+                    // low watermark, but is no longer, so we should
+                    // reroute so any unassigned shards can be allocated
+                    // if they are able to be
+                    if (lastRunTimeMillis < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
                        reroute = true;
-                        explanation = "high disk watermark exceeded on one or more nodes";
+                        explanation = "one or more nodes has gone under the high or low watermark";
+                        nodeHasPassedWatermark.remove(node);
                    } else {
-                        logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " +
+                        logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " +
                            "in the last [{}], skipping reroute",
                            node, diskThresholdSettings.getRerouteInterval());
                    }
-                    nodeHasPassedWatermark.add(node);
-                } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() ||
-                    usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
-                    nodeHasPassedWatermark.add(node);
-                } else {
-                    if (nodeHasPassedWatermark.contains(node)) {
-                        // The node has previously been over the high or
-                        // low watermark, but is no longer, so we should
-                        // reroute so any unassigned shards can be allocated
-                        // if they are able to be
-                        if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
-                            lastRunNS = System.nanoTime();
-                            reroute = true;
-                            explanation = "one or more nodes has gone under the high or low watermark";
-                            nodeHasPassedWatermark.remove(node);
-                        } else {
-                            logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " +
-                                "in the last [{}], skipping reroute",
-                                node, diskThresholdSettings.getRerouteInterval());
-                        }
-                    }
                }
            }
-            if (reroute) {
-                logger.info("rerouting shards: [{}]", explanation);
-                reroute();
-            }
-            indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
-            if (indicesToMarkReadOnly.isEmpty() == false) {
-                markIndicesReadOnly(indicesToMarkReadOnly);
-            }
+        }
+
+        final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 2);
+
+        if (reroute) {
+            logger.info("rerouting shards: [{}]", explanation);
+            reroute(ActionListener.wrap(r -> {
+                lastRunTimeMillis = currentTimeMillisSupplier.getAsLong();
+                listener.onResponse(r);
+            }, listener::onFailure));
+        } else {
+            listener.onResponse(null);
+        }
+        indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
+        if (indicesToMarkReadOnly.isEmpty() == false) {
+            markIndicesReadOnly(indicesToMarkReadOnly, listener);
+        } else {
+            listener.onResponse(null);
        }
     }
 
-    protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
+    protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
         // set read-only block but don't block on the response
-        client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)).
-            setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute();
+        client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY))
+            .setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build())
+            .execute(ActionListener.map(listener, r -> null));
     }
 
-    protected void reroute() {
+    protected void reroute(ActionListener<Void> listener) {
         // Execute an empty reroute, but don't block on the response
-        client.admin().cluster().prepareReroute().execute();
+        client.admin().cluster().prepareReroute().execute(ActionListener.map(listener, r -> null));
     }
 }
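The grouped listener at the end of `onNewInfo` is what ties the two asynchronous branches back to the `checkInProgress` flag: `checkFinished()` only runs after both the reroute branch and the read-only-marking branch have reported in, and a branch that has nothing to do completes its slot immediately with `listener.onResponse(null)`. A minimal sketch of that counting idea in plain Java follows; it is illustrative only, since the real change uses Elasticsearch's `GroupedActionListener` as shown above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the grouping idea: the in-progress flag is only released once BOTH
// asynchronous branches have reported back.
class CheckCompletion {
    private final AtomicInteger pending = new AtomicInteger(2); // reroute + read-only marking
    private final Runnable onAllFinished;

    CheckCompletion(Runnable onAllFinished) {
        this.onAllFinished = onAllFinished;
    }

    // Each branch calls this exactly once, whether it did real work or was a no-op.
    void branchFinished() {
        if (pending.decrementAndGet() == 0) {
            onAllFinished.run(); // e.g. checkInProgress.set(false)
        }
    }
}
```

In the diff above the same role is played by `new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 2)`: the reroute branch completes one slot from its response handler (after stamping `lastRunTimeMillis`), the read-only branch completes the other via `ActionListener.map(listener, r -> null)`, and a branch with nothing to do calls `listener.onResponse(null)` straight away.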

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 1 addition & 1 deletion
@@ -368,7 +368,7 @@ protected Node(
             final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
                 scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
             final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
-                clusterService.getClusterSettings(), client);
+                clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis);
             final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
                 listener::onNewInfo);
             final UsageService usageService = new UsageService();
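Wiring the clock in as `threadPool::relativeTimeInMillis` (a `LongSupplier`) rather than reading `System.nanoTime()` inside the monitor makes the throttling deterministic to exercise. A hypothetical driver for the `ThrottledMonitor` sketch above, using a mutable fake clock (illustrative only, not part of this commit):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Drives the ThrottledMonitor sketch with a fake clock and counts reroutes: a second
// over-watermark notification inside the interval should be skipped.
class ThrottledMonitorExample {
    public static void main(String[] args) {
        AtomicLong fakeClock = new AtomicLong(0);
        AtomicLong rerouteCount = new AtomicLong(0);

        ThrottledMonitor monitor = new ThrottledMonitor(fakeClock::get, 60_000) {
            @Override
            protected CompletableFuture<Void> reroute() {
                rerouteCount.incrementAndGet();
                return CompletableFuture.completedFuture(null);
            }
        };

        monitor.onNewInfo(true);        // first check reroutes
        fakeClock.addAndGet(30_000);
        monitor.onNewInfo(true);        // within the interval: skipped
        fakeClock.addAndGet(60_001);
        monitor.onNewInfo(true);        // interval elapsed: reroutes again

        System.out.println("reroutes = " + rerouteCount.get()); // expect 2
    }
}
```

With the interval set to 60,000 ms, the notification at t=30 s falls inside the quiet window and is dropped, so only the first and third calls trigger a reroute.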

server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ public void testFillShardLevelInfo() {
         ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
         ImmutableOpenMap.Builder<ShardRouting, String> routingToPath = ImmutableOpenMap.builder();
         ClusterState state = ClusterState.builder(new ClusterName("blarg")).version(0).build();
-        InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath, state);
+        InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath);
         assertEquals(2, shardSizes.size());
         assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_0)));
         assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_1)));
