Skip to content

Commit ddedf80

Browse files
authored
Defer reroute when nodes join (#42855)
Today the master eagerly reroutes the cluster as part of processing node joins. However, it is not necessary to do this reroute straight away, and it is sometimes preferable to defer it until later. For instance, when the master wins its election it processes joins and performs a reroute, but it would be better to defer the reroute until after the master has become properly established. This change defers this reroute into a separate task, and batches multiple such tasks together.
1 parent 4b44561 commit ddedf80

File tree

14 files changed

+83
-29
lines changed

14 files changed

+83
-29
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
3434
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
3535
import org.elasticsearch.cluster.routing.DelayedAllocationService;
36-
import org.elasticsearch.cluster.routing.RoutingService;
3736
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3837
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
3938
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
@@ -242,7 +241,6 @@ protected void configure() {
242241
bind(MetaDataUpdateSettingsService.class).asEagerSingleton();
243242
bind(MetaDataIndexTemplateService.class).asEagerSingleton();
244243
bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver);
245-
bind(RoutingService.class).asEagerSingleton();
246244
bind(DelayedAllocationService.class).asEagerSingleton();
247245
bind(ShardStateAction.class).asEagerSingleton();
248246
bind(NodeMappingRefreshAction.class).asEagerSingleton();

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import java.util.Set;
8080
import java.util.concurrent.atomic.AtomicBoolean;
8181
import java.util.function.BiConsumer;
82+
import java.util.function.Consumer;
8283
import java.util.function.Supplier;
8384
import java.util.stream.Collectors;
8485
import java.util.stream.StreamSupport;
@@ -138,18 +139,26 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
138139
private JoinHelper.JoinAccumulator joinAccumulator;
139140
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
140141

142+
/**
143+
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
144+
* @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster.
145+
* @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In
146+
* production code this calls {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String)}.
147+
*/
141148
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
142149
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
143150
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
144-
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random) {
151+
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
152+
Consumer<String> reroute) {
145153
this.settings = settings;
146154
this.transportService = transportService;
147155
this.masterService = masterService;
148156
this.allocationService = allocationService;
149157
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
150158
this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
151159
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
152-
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
160+
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
161+
reroute);
153162
this.persistedStateSupplier = persistedStateSupplier;
154163
this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
155164
this.lastKnownLeader = Optional.empty();

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.Set;
6363
import java.util.concurrent.atomic.AtomicReference;
6464
import java.util.function.BiConsumer;
65+
import java.util.function.Consumer;
6566
import java.util.function.Function;
6667
import java.util.function.LongSupplier;
6768
import java.util.function.Supplier;
@@ -91,11 +92,11 @@ public class JoinHelper {
9192
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
9293
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
9394
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
94-
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
95+
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, Consumer<String> reroute) {
9596
this.masterService = masterService;
9697
this.transportService = transportService;
9798
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
98-
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
99+
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, reroute) {
99100

100101
@Override
101102
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)

server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Collections;
3737
import java.util.List;
3838
import java.util.function.BiConsumer;
39+
import java.util.function.Consumer;
3940

4041
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
4142

@@ -44,6 +45,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
4445
private final AllocationService allocationService;
4546

4647
private final Logger logger;
48+
private final Consumer<String> reroute;
4749

4850
public static class Task {
4951

@@ -80,9 +82,10 @@ public boolean isFinishElectionTask() {
8082
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
8183
}
8284

83-
public JoinTaskExecutor(AllocationService allocationService, Logger logger) {
85+
public JoinTaskExecutor(AllocationService allocationService, Logger logger, Consumer<String> reroute) {
8486
this.allocationService = allocationService;
8587
this.logger = logger;
88+
this.reroute = reroute;
8689
}
8790

8891
@Override
@@ -146,8 +149,8 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
146149
results.success(joinTask);
147150
}
148151
if (nodesChanged) {
149-
newState.nodes(nodesBuilder);
150-
return results.build(allocationService.reroute(newState.build(), "node_join"));
152+
reroute.accept("post-join reroute");
153+
return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build()));
151154
} else {
152155
// we must return a new cluster state instance to force publishing. This is important
153156
// for the joining node to finalize its join and set us as a master

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean rer
240240
* Checks if the are replicas with the auto-expand feature that need to be adapted.
241241
* Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required.
242242
*/
243-
private ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
243+
public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
244244
final Map<Integer, List<String>> autoExpandReplicaChanges =
245245
AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes());
246246
if (autoExpandReplicaChanges.isEmpty()) {

server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.ClusterState;
2525
import org.elasticsearch.cluster.coordination.Coordinator;
2626
import org.elasticsearch.cluster.node.DiscoveryNode;
27+
import org.elasticsearch.cluster.routing.RoutingService;
2728
import org.elasticsearch.cluster.routing.allocation.AllocationService;
2829
import org.elasticsearch.cluster.service.ClusterApplier;
2930
import org.elasticsearch.cluster.service.ClusterApplierService;
@@ -79,7 +80,8 @@ public class DiscoveryModule {
7980
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
8081
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
8182
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
82-
AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState) {
83+
AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState,
84+
RoutingService routingService) {
8385
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
8486
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
8587
hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));
@@ -129,7 +131,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
129131
settings, clusterSettings,
130132
transportService, namedWriteableRegistry, allocationService, masterService,
131133
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
132-
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()));
134+
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), routingService::reroute);
133135
} else {
134136
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
135137
}

server/src/main/java/org/elasticsearch/node/Node.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -495,10 +495,11 @@ protected Node(
495495
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
496496
metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings());
497497

498+
final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService());
498499
final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
499500
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
500501
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
501-
clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState);
502+
clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, routingService);
502503
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
503504
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
504505
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
@@ -573,6 +574,7 @@ protected Node(
573574
b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
574575
b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus);
575576
b.bind(RestoreService.class).toInstance(restoreService);
577+
b.bind(RoutingService.class).toInstance(routingService);
576578
}
577579
);
578580
injector = modules.createInjector();

server/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java

+45-9
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
package org.elasticsearch.cluster;
2121

2222
import org.elasticsearch.action.UnavailableShardsException;
23+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
2324
import org.elasticsearch.action.index.IndexResponse;
2425
import org.elasticsearch.action.support.ActiveShardCount;
2526
import org.elasticsearch.client.Requests;
27+
import org.elasticsearch.cluster.health.ClusterHealthStatus;
28+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2629
import org.elasticsearch.common.Priority;
2730
import org.elasticsearch.common.settings.Settings;
2831
import org.elasticsearch.common.xcontent.XContentType;
@@ -35,13 +38,16 @@
3538
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
3639
import static org.hamcrest.Matchers.equalTo;
3740

38-
@ClusterScope(scope= Scope.TEST, numDataNodes =0)
41+
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
3942
public class SimpleDataNodesIT extends ESIntegTestCase {
40-
public void testDataNodes() throws Exception {
43+
44+
private static final String SOURCE = "{\"type1\":{\"id\":\"1\",\"name\":\"test\"}}";
45+
46+
public void testIndexingBeforeAndAfterDataNodesStart() {
4147
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
4248
client().admin().indices().create(createIndexRequest("test").waitForActiveShards(ActiveShardCount.NONE)).actionGet();
4349
try {
44-
client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"), XContentType.JSON)
50+
client().index(Requests.indexRequest("test").id("1").source(SOURCE, XContentType.JSON)
4551
.timeout(timeValueSeconds(1))).actionGet();
4652
fail("no allocation should happen");
4753
} catch (UnavailableShardsException e) {
@@ -54,7 +60,7 @@ public void testDataNodes() throws Exception {
5460

5561
// still no shard should be allocated
5662
try {
57-
client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"), XContentType.JSON)
63+
client().index(Requests.indexRequest("test").id("1").source(SOURCE, XContentType.JSON)
5864
.timeout(timeValueSeconds(1))).actionGet();
5965
fail("no allocation should happen");
6066
} catch (UnavailableShardsException e) {
@@ -66,13 +72,43 @@ public void testDataNodes() throws Exception {
6672
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3")
6773
.setLocal(true).execute().actionGet().isTimedOut(), equalTo(false));
6874

69-
IndexResponse indexResponse = client().index(Requests.indexRequest("test").type("type1").id("1")
70-
.source(source("1", "test"), XContentType.JSON)).actionGet();
75+
IndexResponse indexResponse = client().index(Requests.indexRequest("test").id("1")
76+
.source(SOURCE, XContentType.JSON)).actionGet();
7177
assertThat(indexResponse.getId(), equalTo("1"));
72-
assertThat(indexResponse.getType(), equalTo("type1"));
7378
}
7479

75-
private String source(String id, String nameValue) {
76-
return "{ \"type1\" : { \"id\" : \"" + id + "\", \"name\" : \"" + nameValue + "\" } }";
80+
public void testShardsAllocatedAfterDataNodesStart() {
81+
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
82+
client().admin().indices().create(createIndexRequest("test")
83+
.settings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).waitForActiveShards(ActiveShardCount.NONE))
84+
.actionGet();
85+
final ClusterHealthResponse healthResponse1 = client().admin().cluster().prepareHealth()
86+
.setWaitForEvents(Priority.LANGUID).execute().actionGet();
87+
assertThat(healthResponse1.isTimedOut(), equalTo(false));
88+
assertThat(healthResponse1.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); // TODO should be RED, see #41073
89+
assertThat(healthResponse1.getActiveShards(), equalTo(0));
90+
91+
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).build());
92+
93+
assertThat(client().admin().cluster().prepareHealth()
94+
.setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet().isTimedOut(),
95+
equalTo(false));
96+
}
97+
98+
public void testAutoExpandReplicasAdjustedWhenDataNodeJoins() {
99+
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
100+
client().admin().indices().create(createIndexRequest("test")
101+
.settings(Settings.builder().put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all"))
102+
.waitForActiveShards(ActiveShardCount.NONE))
103+
.actionGet();
104+
final ClusterHealthResponse healthResponse1 = client().admin().cluster().prepareHealth()
105+
.setWaitForEvents(Priority.LANGUID).execute().actionGet();
106+
assertThat(healthResponse1.isTimedOut(), equalTo(false));
107+
assertThat(healthResponse1.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); // TODO should be RED, see #41073
108+
assertThat(healthResponse1.getActiveShards(), equalTo(0));
109+
110+
internalCluster().startNode();
111+
internalCluster().startNode();
112+
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
77113
}
78114
}

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1923,7 +1923,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
19231923
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
19241924
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
19251925
allocationService, masterService, this::getPersistedState,
1926-
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get());
1926+
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), s -> {});
19271927
masterService.setClusterStatePublisher(coordinator);
19281928
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
19291929
deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);

server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void testJoinDeduplication() {
5757
x -> localNode, null, Collections.emptySet());
5858
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
5959
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
60-
Collections.emptyList());
60+
Collections.emptyList(), s -> {});
6161
transportService.start();
6262

6363
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
@@ -153,7 +153,7 @@ public void testJoinValidationRejectsMismatchedClusterUUID() {
153153
x -> localNode, null, Collections.emptySet());
154154
new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState,
155155
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
156-
Collections.emptyList()); // registers request handler
156+
Collections.emptyList(), s -> {}); // registers request handler
157157
transportService.start();
158158
transportService.acceptIncomingRequests();
159159

server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ transportService, writableRegistry(),
176176
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
177177
new NoOpClusterApplier(),
178178
Collections.emptyList(),
179-
random);
179+
random, s -> {});
180180
transportService.start();
181181
transportService.acceptIncomingRequests();
182182
transport = capturingTransport;

0 commit comments

Comments
 (0)