Skip to content

Commit 448acea

Browse files
authored
Avoid parallel reroutes in DiskThresholdMonitor (#43381)
Today the `DiskThresholdMonitor` limits the frequency with which it submits reroute tasks, but it might still submit these tasks faster than the master can process them if, for instance, each reroute takes over 60 seconds. This causes a problem because the reroute task runs with priority `IMMEDIATE` and is always scheduled when there is a node over the high watermark, so it can starve any other pending tasks on the master.

This change avoids further updates from the monitor while its last task(s) are still in progress, and it measures the time of each update from the completion time of the reroute task rather than its start time, to allow a larger window for other tasks to run.

It also now makes use of the `RoutingService` to submit the reroute task, in order to batch this task with any other pending reroutes. It enhances the `RoutingService` to notify its listeners on completion.

Fixes #40174
Relates #42559
1 parent b76d314 commit 448acea

File tree

18 files changed

+492
-143
lines changed

18 files changed

+492
-143
lines changed

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,13 @@ public void onMaster() {
131131
logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
132132
}
133133

134-
// Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
134+
// Submit a job that will reschedule itself after running
135135
threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
136136

137137
try {
138138
if (clusterService.state().getNodes().getDataNodes().size() > 1) {
139139
// Submit an info update job to be run immediately
140-
threadPool.executor(executorName()).execute(() -> maybeRefresh());
140+
threadPool.executor(executorName()).execute(this::maybeRefresh);
141141
}
142142
} catch (EsRejectedExecutionException ex) {
143143
logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
@@ -173,7 +173,7 @@ public void clusterChanged(ClusterChangedEvent event) {
173173
if (logger.isDebugEnabled()) {
174174
logger.debug("data node was added, retrieving new cluster info");
175175
}
176-
threadPool.executor(executorName()).execute(() -> maybeRefresh());
176+
threadPool.executor(executorName()).execute(this::maybeRefresh);
177177
}
178178

179179
if (this.isMaster && event.nodesRemoved()) {
@@ -316,7 +316,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
316316
ShardStats[] stats = indicesStatsResponse.getShards();
317317
ImmutableOpenMap.Builder<String, Long> newShardSizes = ImmutableOpenMap.builder();
318318
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath = ImmutableOpenMap.builder();
319-
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state());
319+
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath);
320320
shardSizes = newShardSizes.build();
321321
shardRoutingToDataPath = newShardRoutingToDataPath.build();
322322
}
@@ -365,7 +365,7 @@ public void onFailure(Exception e) {
365365
}
366366

367367
static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
368-
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
368+
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
369369
for (ShardStats s : stats) {
370370
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
371371
long size = s.getStats().getStore().sizeInBytes();

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,9 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
379379
if (logger.isTraceEnabled()) {
380380
logger.trace("{}, scheduling a reroute", reason);
381381
}
382-
routingService.reroute(reason);
382+
routingService.reroute(reason, ActionListener.wrap(
383+
r -> logger.trace("{}, reroute completed", reason),
384+
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
383385
}
384386
}
385387
}

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import java.util.Set;
8181
import java.util.concurrent.atomic.AtomicBoolean;
8282
import java.util.function.BiConsumer;
83-
import java.util.function.Consumer;
8483
import java.util.function.Supplier;
8584
import java.util.stream.Collectors;
8685
import java.util.stream.Stream;
@@ -146,13 +145,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
146145
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
147146
* @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster.
148147
* @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In
149-
* production code this calls {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String)}.
148+
* production code this calls
149+
* {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String, ActionListener)}.
150150
*/
151151
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
152152
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
153153
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
154154
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
155-
Consumer<String> reroute, ElectionStrategy electionStrategy) {
155+
BiConsumer<String, ActionListener<Void>> reroute, ElectionStrategy electionStrategy) {
156156
this.settings = settings;
157157
this.transportService = transportService;
158158
this.masterService = masterService;

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import java.util.Set;
6363
import java.util.concurrent.atomic.AtomicReference;
6464
import java.util.function.BiConsumer;
65-
import java.util.function.Consumer;
6665
import java.util.function.Function;
6766
import java.util.function.LongSupplier;
6867
import java.util.function.Supplier;
@@ -92,7 +91,7 @@ public class JoinHelper {
9291
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
9392
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
9493
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
95-
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, Consumer<String> reroute) {
94+
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, BiConsumer<String, ActionListener<Void>> reroute) {
9695
this.masterService = masterService;
9796
this.transportService = transportService;
9897
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);

server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.logging.log4j.Logger;
2222
import org.elasticsearch.Version;
23+
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.cluster.ClusterState;
2425
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2526
import org.elasticsearch.cluster.NotMasterException;
@@ -36,7 +37,6 @@
3637
import java.util.Collections;
3738
import java.util.List;
3839
import java.util.function.BiConsumer;
39-
import java.util.function.Consumer;
4040

4141
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
4242

@@ -45,7 +45,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
4545
private final AllocationService allocationService;
4646

4747
private final Logger logger;
48-
private final Consumer<String> reroute;
48+
private final BiConsumer<String, ActionListener<Void>> reroute;
4949

5050
public static class Task {
5151

@@ -82,7 +82,7 @@ public boolean isFinishElectionTask() {
8282
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
8383
}
8484

85-
public JoinTaskExecutor(AllocationService allocationService, Logger logger, Consumer<String> reroute) {
85+
public JoinTaskExecutor(AllocationService allocationService, Logger logger, BiConsumer<String, ActionListener<Void>> reroute) {
8686
this.allocationService = allocationService;
8787
this.logger = logger;
8888
this.reroute = reroute;
@@ -149,7 +149,10 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
149149
results.success(joinTask);
150150
}
151151
if (nodesChanged) {
152-
reroute.accept("post-join reroute");
152+
reroute.accept("post-join reroute", ActionListener.wrap(
153+
r -> logger.trace("post-join reroute completed"),
154+
e -> logger.debug("post-join reroute failed", e)));
155+
153156
return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build()));
154157
} else {
155158
// we must return a new cluster state instance to force publishing. This is important

server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,20 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
25+
import org.elasticsearch.ElasticsearchException;
26+
import org.elasticsearch.action.ActionListener;
27+
import org.elasticsearch.action.support.PlainListenableActionFuture;
2528
import org.elasticsearch.cluster.ClusterChangedEvent;
2629
import org.elasticsearch.cluster.ClusterState;
2730
import org.elasticsearch.cluster.ClusterStateUpdateTask;
28-
import org.elasticsearch.cluster.routing.allocation.AllocationService;
31+
import org.elasticsearch.cluster.NotMasterException;
2932
import org.elasticsearch.cluster.service.ClusterService;
33+
import org.elasticsearch.common.Nullable;
3034
import org.elasticsearch.common.Priority;
3135
import org.elasticsearch.common.component.AbstractLifecycleComponent;
3236
import org.elasticsearch.common.inject.Inject;
3337

34-
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.function.BiFunction;
3539

3640
/**
3741
* A {@link RoutingService} listens to clusters state. When this service
@@ -51,14 +55,16 @@ public class RoutingService extends AbstractLifecycleComponent {
5155
private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
5256

5357
private final ClusterService clusterService;
54-
private final AllocationService allocationService;
58+
private final BiFunction<ClusterState, String, ClusterState> reroute;
5559

56-
private AtomicBoolean rerouting = new AtomicBoolean();
60+
private final Object mutex = new Object();
61+
@Nullable // null if no reroute is currently pending
62+
private PlainListenableActionFuture<Void> pendingRerouteListeners;
5763

5864
@Inject
59-
public RoutingService(ClusterService clusterService, AllocationService allocationService) {
65+
public RoutingService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
6066
this.clusterService = clusterService;
61-
this.allocationService = allocationService;
67+
this.reroute = reroute;
6268
}
6369

6470
@Override
@@ -76,47 +82,78 @@ protected void doClose() {
7682
/**
7783
* Initiates a reroute.
7884
*/
79-
public final void reroute(String reason) {
80-
try {
81-
if (lifecycle.stopped()) {
82-
return;
83-
}
84-
if (rerouting.compareAndSet(false, true) == false) {
85-
logger.trace("already has pending reroute, ignoring {}", reason);
85+
public final void reroute(String reason, ActionListener<Void> listener) {
86+
if (lifecycle.started() == false) {
87+
listener.onFailure(new IllegalStateException(
88+
"rejecting delayed reroute [" + reason + "] in state [" + lifecycleState() + "]"));
89+
return;
90+
}
91+
final PlainListenableActionFuture<Void> currentListeners;
92+
synchronized (mutex) {
93+
if (pendingRerouteListeners != null) {
94+
logger.trace("already has pending reroute, adding [{}] to batch", reason);
95+
pendingRerouteListeners.addListener(listener);
8696
return;
8797
}
88-
logger.trace("rerouting {}", reason);
98+
currentListeners = PlainListenableActionFuture.newListenableFuture();
99+
currentListeners.addListener(listener);
100+
pendingRerouteListeners = currentListeners;
101+
}
102+
logger.trace("rerouting [{}]", reason);
103+
try {
89104
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
90105
new ClusterStateUpdateTask(Priority.HIGH) {
91106
@Override
92107
public ClusterState execute(ClusterState currentState) {
93-
rerouting.set(false);
94-
return allocationService.reroute(currentState, reason);
108+
synchronized (mutex) {
109+
assert pendingRerouteListeners == currentListeners;
110+
pendingRerouteListeners = null;
111+
}
112+
return reroute.apply(currentState, reason);
95113
}
96114

97115
@Override
98116
public void onNoLongerMaster(String source) {
99-
rerouting.set(false);
100-
// no biggie
117+
synchronized (mutex) {
118+
if (pendingRerouteListeners == currentListeners) {
119+
pendingRerouteListeners = null;
120+
}
121+
}
122+
currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled"));
123+
// no big deal, the new master will reroute again
101124
}
102125

103126
@Override
104127
public void onFailure(String source, Exception e) {
105-
rerouting.set(false);
106-
ClusterState state = clusterService.state();
128+
synchronized (mutex) {
129+
if (pendingRerouteListeners == currentListeners) {
130+
pendingRerouteListeners = null;
131+
}
132+
}
133+
final ClusterState state = clusterService.state();
107134
if (logger.isTraceEnabled()) {
108135
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",
109136
source, state), e);
110137
} else {
111138
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
112139
source, state.version()), e);
113140
}
141+
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e));
142+
}
143+
144+
@Override
145+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
146+
currentListeners.onResponse(null);
114147
}
115148
});
116149
} catch (Exception e) {
117-
rerouting.set(false);
150+
synchronized (mutex) {
151+
assert pendingRerouteListeners == currentListeners;
152+
pendingRerouteListeners = null;
153+
}
118154
ClusterState state = clusterService.state();
119155
logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
156+
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e));
120157
}
121158
}
122159
}

0 commit comments

Comments
 (0)