Skip to content

Commit fca7a19

Browse files
committed
Avoid parallel reroutes in DiskThresholdMonitor (#43381)
Today the `DiskThresholdMonitor` limits the frequency with which it submits reroute tasks, but it might still submit these tasks faster than the master can process them if, for instance, each reroute takes over 60 seconds. This causes a problem since the reroute task runs with priority `IMMEDIATE` and is always scheduled when there is a node over the high watermark, so this can starve any other pending tasks on the master. This change avoids further updates from the monitor while its last task(s) are still in progress, and it measures the time of each update from the completion time of the reroute task rather than its start time, to allow a larger window for other tasks to run. It also now makes use of the `RoutingService` to submit the reroute task, in order to batch this task with any other pending reroutes. It enhances the `RoutingService` to notify its listeners on completion. Fixes #40174. Relates to #42559.
1 parent 55b3ec8 commit fca7a19

File tree

22 files changed

+502
-151
lines changed

22 files changed

+502
-151
lines changed

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,13 @@ public void onMaster() {
131131
logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
132132
}
133133

134-
// Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
134+
// Submit a job that will reschedule itself after running
135135
threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
136136

137137
try {
138138
if (clusterService.state().getNodes().getDataNodes().size() > 1) {
139139
// Submit an info update job to be run immediately
140-
threadPool.executor(executorName()).execute(() -> maybeRefresh());
140+
threadPool.executor(executorName()).execute(this::maybeRefresh);
141141
}
142142
} catch (EsRejectedExecutionException ex) {
143143
logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
@@ -173,7 +173,7 @@ public void clusterChanged(ClusterChangedEvent event) {
173173
if (logger.isDebugEnabled()) {
174174
logger.debug("data node was added, retrieving new cluster info");
175175
}
176-
threadPool.executor(executorName()).execute(() -> maybeRefresh());
176+
threadPool.executor(executorName()).execute(this::maybeRefresh);
177177
}
178178

179179
if (this.isMaster && event.nodesRemoved()) {
@@ -316,7 +316,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
316316
ShardStats[] stats = indicesStatsResponse.getShards();
317317
ImmutableOpenMap.Builder<String, Long> newShardSizes = ImmutableOpenMap.builder();
318318
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath = ImmutableOpenMap.builder();
319-
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state());
319+
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath);
320320
shardSizes = newShardSizes.build();
321321
shardRoutingToDataPath = newShardRoutingToDataPath.build();
322322
}
@@ -365,7 +365,7 @@ public void onFailure(Exception e) {
365365
}
366366

367367
static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
368-
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
368+
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
369369
for (ShardStats s : stats) {
370370
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
371371
long size = s.getStats().getStore().sizeInBytes();

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,9 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
382382
if (logger.isTraceEnabled()) {
383383
logger.trace("{}, scheduling a reroute", reason);
384384
}
385-
routingService.reroute(reason);
385+
routingService.reroute(reason, ActionListener.wrap(
386+
r -> logger.trace("{}, reroute completed", reason),
387+
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
386388
}
387389
}
388390
}

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@
8383
import java.util.Set;
8484
import java.util.concurrent.atomic.AtomicBoolean;
8585
import java.util.function.BiConsumer;
86-
import java.util.function.Consumer;
8786
import java.util.function.Supplier;
8887
import java.util.stream.Collectors;
8988
import java.util.stream.Stream;
@@ -152,13 +151,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
152151
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
153152
* @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster.
154153
* @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In
155-
* production code this calls {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String)}.
154+
* production code this calls
155+
* {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String, ActionListener)}.
156156
*/
157157
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
158158
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
159159
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
160160
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
161-
Consumer<String> reroute, ElectionStrategy electionStrategy) {
161+
BiConsumer<String, ActionListener<Void>> reroute, ElectionStrategy electionStrategy) {
162162
this.settings = settings;
163163
this.transportService = transportService;
164164
this.masterService = masterService;

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import java.util.Set;
6565
import java.util.concurrent.atomic.AtomicReference;
6666
import java.util.function.BiConsumer;
67-
import java.util.function.Consumer;
6867
import java.util.function.Function;
6968
import java.util.function.LongSupplier;
7069
import java.util.function.Supplier;
@@ -91,10 +90,10 @@ public class JoinHelper {
9190

9291
private AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();
9392

94-
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
95-
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
96-
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
97-
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, Consumer<String> reroute) {
93+
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
94+
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
95+
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
96+
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, BiConsumer<String, ActionListener<Void>> reroute) {
9897
this.masterService = masterService;
9998
this.transportService = transportService;
10099
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);

server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.logging.log4j.Logger;
2222
import org.elasticsearch.Version;
23+
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.cluster.ClusterState;
2425
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2526
import org.elasticsearch.cluster.NotMasterException;
@@ -38,7 +39,6 @@
3839
import java.util.Collections;
3940
import java.util.List;
4041
import java.util.function.BiConsumer;
41-
import java.util.function.Consumer;
4242

4343
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
4444

@@ -47,7 +47,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
4747
private final AllocationService allocationService;
4848

4949
private final Logger logger;
50-
private final Consumer<String> reroute;
50+
private final BiConsumer<String, ActionListener<Void>> reroute;
5151

5252
private final int minimumMasterNodesOnLocalNode;
5353

@@ -86,7 +86,8 @@ public boolean isFinishElectionTask() {
8686
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
8787
}
8888

89-
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger, Consumer<String> reroute) {
89+
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger,
90+
BiConsumer<String, ActionListener<Void>> reroute) {
9091
this.allocationService = allocationService;
9192
this.logger = logger;
9293
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
@@ -154,7 +155,10 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
154155
results.success(joinTask);
155156
}
156157
if (nodesChanged) {
157-
reroute.accept("post-join reroute");
158+
reroute.accept("post-join reroute", ActionListener.wrap(
159+
r -> logger.trace("post-join reroute completed"),
160+
e -> logger.debug("post-join reroute failed", e)));
161+
158162
return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build()));
159163
} else {
160164
// we must return a new cluster state instance to force publishing. This is important

server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,20 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
25+
import org.elasticsearch.ElasticsearchException;
26+
import org.elasticsearch.action.ActionListener;
27+
import org.elasticsearch.action.support.PlainListenableActionFuture;
2528
import org.elasticsearch.cluster.ClusterChangedEvent;
2629
import org.elasticsearch.cluster.ClusterState;
2730
import org.elasticsearch.cluster.ClusterStateUpdateTask;
28-
import org.elasticsearch.cluster.routing.allocation.AllocationService;
31+
import org.elasticsearch.cluster.NotMasterException;
2932
import org.elasticsearch.cluster.service.ClusterService;
33+
import org.elasticsearch.common.Nullable;
3034
import org.elasticsearch.common.Priority;
3135
import org.elasticsearch.common.component.AbstractLifecycleComponent;
3236
import org.elasticsearch.common.inject.Inject;
3337

34-
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.function.BiFunction;
3539

3640
/**
3741
* A {@link RoutingService} listens to clusters state. When this service
@@ -51,14 +55,16 @@ public class RoutingService extends AbstractLifecycleComponent {
5155
private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
5256

5357
private final ClusterService clusterService;
54-
private final AllocationService allocationService;
58+
private final BiFunction<ClusterState, String, ClusterState> reroute;
5559

56-
private AtomicBoolean rerouting = new AtomicBoolean();
60+
private final Object mutex = new Object();
61+
@Nullable // null if no reroute is currently pending
62+
private PlainListenableActionFuture<Void> pendingRerouteListeners;
5763

5864
@Inject
59-
public RoutingService(ClusterService clusterService, AllocationService allocationService) {
65+
public RoutingService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
6066
this.clusterService = clusterService;
61-
this.allocationService = allocationService;
67+
this.reroute = reroute;
6268
}
6369

6470
@Override
@@ -76,47 +82,78 @@ protected void doClose() {
7682
/**
7783
* Initiates a reroute.
7884
*/
79-
public final void reroute(String reason) {
80-
try {
81-
if (lifecycle.stopped()) {
82-
return;
83-
}
84-
if (rerouting.compareAndSet(false, true) == false) {
85-
logger.trace("already has pending reroute, ignoring {}", reason);
85+
public final void reroute(String reason, ActionListener<Void> listener) {
86+
if (lifecycle.started() == false) {
87+
listener.onFailure(new IllegalStateException(
88+
"rejecting delayed reroute [" + reason + "] in state [" + lifecycleState() + "]"));
89+
return;
90+
}
91+
final PlainListenableActionFuture<Void> currentListeners;
92+
synchronized (mutex) {
93+
if (pendingRerouteListeners != null) {
94+
logger.trace("already has pending reroute, adding [{}] to batch", reason);
95+
pendingRerouteListeners.addListener(listener);
8696
return;
8797
}
88-
logger.trace("rerouting {}", reason);
98+
currentListeners = PlainListenableActionFuture.newListenableFuture();
99+
currentListeners.addListener(listener);
100+
pendingRerouteListeners = currentListeners;
101+
}
102+
logger.trace("rerouting [{}]", reason);
103+
try {
89104
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
90105
new ClusterStateUpdateTask(Priority.HIGH) {
91106
@Override
92107
public ClusterState execute(ClusterState currentState) {
93-
rerouting.set(false);
94-
return allocationService.reroute(currentState, reason);
108+
synchronized (mutex) {
109+
assert pendingRerouteListeners == currentListeners;
110+
pendingRerouteListeners = null;
111+
}
112+
return reroute.apply(currentState, reason);
95113
}
96114

97115
@Override
98116
public void onNoLongerMaster(String source) {
99-
rerouting.set(false);
100-
// no biggie
117+
synchronized (mutex) {
118+
if (pendingRerouteListeners == currentListeners) {
119+
pendingRerouteListeners = null;
120+
}
121+
}
122+
currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled"));
123+
// no big deal, the new master will reroute again
101124
}
102125

103126
@Override
104127
public void onFailure(String source, Exception e) {
105-
rerouting.set(false);
106-
ClusterState state = clusterService.state();
128+
synchronized (mutex) {
129+
if (pendingRerouteListeners == currentListeners) {
130+
pendingRerouteListeners = null;
131+
}
132+
}
133+
final ClusterState state = clusterService.state();
107134
if (logger.isTraceEnabled()) {
108135
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",
109136
source, state), e);
110137
} else {
111138
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
112139
source, state.version()), e);
113140
}
141+
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e));
142+
}
143+
144+
@Override
145+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
146+
currentListeners.onResponse(null);
114147
}
115148
});
116149
} catch (Exception e) {
117-
rerouting.set(false);
150+
synchronized (mutex) {
151+
assert pendingRerouteListeners == currentListeners;
152+
pendingRerouteListeners = null;
153+
}
118154
ClusterState state = clusterService.state();
119155
logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
156+
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e));
120157
}
121158
}
122159
}

0 commit comments

Comments
 (0)