Skip to content

Commit a50934e

Browse files
committed
Resiliency: Master election should demotes nodes which try to join the cluster for the first time
With the change in #7493, we introduced a pinging round when a master nodes goes down. That pinging round helps validating the current state of the cluster and takes, by default, 3 seconds. It may be that during that window, a new node tries to join the cluster and starts pinging (this is typical when you quickly restart the current master). If this node gets elected as the new master it will force recovery from the gateway (it has no in memory cluster state), which in turn will cause a full cluster shard synchronisation. While this is not a problem on it's own, it's a shame. This commit demotes "new" nodes during master election so the will only be elected if really needed. Closes #7558
1 parent 4ed5207 commit a50934e

File tree

10 files changed

+233
-86
lines changed

10 files changed

+233
-86
lines changed

src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

+48-14
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
5353
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
5454
import org.elasticsearch.discovery.zen.membership.MembershipAction;
55+
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
5556
import org.elasticsearch.discovery.zen.ping.ZenPing;
5657
import org.elasticsearch.discovery.zen.ping.ZenPingService;
5758
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
@@ -69,14 +70,15 @@
6970
import java.util.concurrent.CopyOnWriteArrayList;
7071
import java.util.concurrent.atomic.AtomicBoolean;
7172
import java.util.concurrent.atomic.AtomicInteger;
73+
import java.util.concurrent.atomic.AtomicLong;
7274

7375
import static com.google.common.collect.Lists.newArrayList;
7476
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
7577

7678
/**
7779
*
7880
*/
79-
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
81+
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, PingContextProvider {
8082

8183
public final static String SETTING_REJOIN_ON_MASTER_GONE = "discovery.zen.rejoin_on_master_gone";
8284
public final static String SETTING_PING_TIMEOUT = "discovery.zen.ping.timeout";
@@ -139,6 +141,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
139141

140142
private volatile boolean rejoinOnMasterGone;
141143

144+
/** counts the time this node has joined the cluster or have elected it self as master */
145+
private final AtomicLong clusterJoinsCounter = new AtomicLong();
146+
142147
@Nullable
143148
private NodeService nodeService;
144149

@@ -194,7 +199,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
194199
this.nodesFD.addListener(new NodeFaultDetectionListener());
195200

196201
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName);
197-
this.pingService.setNodesProvider(this);
202+
this.pingService.setPingContextProvider(this);
198203
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());
199204

200205
transportService.registerHandler(DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequestHandler());
@@ -290,6 +295,7 @@ public String nodeDescription() {
290295
return clusterName.value() + "/" + localNode.id();
291296
}
292297

298+
/** start of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */
293299
@Override
294300
public DiscoveryNodes nodes() {
295301
DiscoveryNodes latestNodes = this.latestDiscoNodes;
@@ -305,6 +311,14 @@ public NodeService nodeService() {
305311
return this.nodeService;
306312
}
307313

314+
@Override
315+
public boolean nodeHasJoinedClusterOnce() {
316+
return clusterJoinsCounter.get() > 0;
317+
}
318+
319+
/** end of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */
320+
321+
308322
@Override
309323
public void publish(ClusterState clusterState, AckListener ackListener) {
310324
if (!master) {
@@ -387,6 +401,8 @@ public void onFailure(String source, Throwable t) {
387401
@Override
388402
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
389403
sendInitialStateEventIfNeeded();
404+
long count = clusterJoinsCounter.incrementAndGet();
405+
logger.trace("cluster joins counter set to [{}] (elected as master)", count);
390406
}
391407
});
392408
} else {
@@ -404,8 +420,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
404420
}
405421

406422
masterFD.start(masterNode, "initial_join");
407-
// no need to submit the received cluster state, we will get it from the master when it publishes
408-
// the fact that we joined
423+
long count = clusterJoinsCounter.incrementAndGet();
424+
logger.trace("cluster joins counter set to [{}] (joined master)", count);
409425
}
410426
}
411427
}
@@ -922,7 +938,7 @@ private DiscoveryNode findMaster() {
922938
sb.append(" {none}");
923939
} else {
924940
for (ZenPing.PingResponse pingResponse : fullPingResponses) {
925-
sb.append("\n\t--> ").append("target [").append(pingResponse.target()).append("], master [").append(pingResponse.master()).append("]");
941+
sb.append("\n\t--> ").append(pingResponse);
926942
}
927943
}
928944
logger.trace(sb.toString());
@@ -931,7 +947,7 @@ private DiscoveryNode findMaster() {
931947
// filter responses
932948
List<ZenPing.PingResponse> pingResponses = Lists.newArrayList();
933949
for (ZenPing.PingResponse pingResponse : fullPingResponses) {
934-
DiscoveryNode node = pingResponse.target();
950+
DiscoveryNode node = pingResponse.node();
935951
if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
936952
// filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
937953
} else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
@@ -947,7 +963,7 @@ private DiscoveryNode findMaster() {
947963
sb.append(" {none}");
948964
} else {
949965
for (ZenPing.PingResponse pingResponse : pingResponses) {
950-
sb.append("\n\t--> ").append("target [").append(pingResponse.target()).append("], master [").append(pingResponse.master()).append("]");
966+
sb.append("\n\t--> ").append(pingResponse);
951967
}
952968
}
953969
logger.debug(sb.toString());
@@ -963,20 +979,38 @@ private DiscoveryNode findMaster() {
963979
}
964980
}
965981

966-
Set<DiscoveryNode> possibleMasterNodes = Sets.newHashSet();
982+
// nodes discovered during pinging
983+
Set<DiscoveryNode> activeNodes = Sets.newHashSet();
984+
// nodes discovered who has previously been part of the cluster and do not ping for the very first time
985+
Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();
967986
if (localNode.masterNode()) {
968-
possibleMasterNodes.add(localNode);
987+
activeNodes.add(localNode);
988+
long joinsCounter = clusterJoinsCounter.get();
989+
if (joinsCounter > 0) {
990+
logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
991+
joinedOnceActiveNodes.add(localNode);
992+
}
969993
}
970994
for (ZenPing.PingResponse pingResponse : pingResponses) {
971-
possibleMasterNodes.add(pingResponse.target());
995+
activeNodes.add(pingResponse.node());
996+
if (pingResponse.hasJoinedOnce()) {
997+
joinedOnceActiveNodes.add(pingResponse.node());
998+
}
972999
}
9731000

9741001
if (pingMasters.isEmpty()) {
975-
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
976-
if (electMaster.hasEnoughMasterNodes(possibleMasterNodes)) {
977-
return electMaster.electMaster(possibleMasterNodes);
1002+
if (electMaster.hasEnoughMasterNodes(activeNodes)) {
1003+
// we give preference to nodes who have previously already joined the cluster. Those will
1004+
// have a cluster state in memory, including an up to date routing table (which is not persistent to disk
1005+
// by the gateway)
1006+
DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);
1007+
if (master != null) {
1008+
return master;
1009+
}
1010+
return electMaster.electMaster(activeNodes);
9781011
} else {
979-
logger.trace("not enough master nodes [{}]", possibleMasterNodes);
1012+
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
1013+
logger.trace("not enough master nodes [{}]", activeNodes);
9801014
return null;
9811015
}
9821016
} else {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.discovery.zen.ping;
21+
22+
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
23+
24+
/**
25+
*
26+
*/
27+
public interface PingContextProvider extends DiscoveryNodesProvider {
28+
29+
/** return true if this node has previously joined the cluster at least once. False if this is first join */
30+
boolean nodeHasJoinedClusterOnce();
31+
32+
}

src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java

+40-11
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
package org.elasticsearch.discovery.zen.ping;
2121

2222
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.Version;
2324
import org.elasticsearch.cluster.ClusterName;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.common.component.LifecycleComponent;
2627
import org.elasticsearch.common.io.stream.StreamInput;
2728
import org.elasticsearch.common.io.stream.StreamOutput;
2829
import org.elasticsearch.common.io.stream.Streamable;
2930
import org.elasticsearch.common.unit.TimeValue;
30-
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
3131

3232
import java.io.IOException;
3333

@@ -39,7 +39,7 @@
3939
*/
4040
public interface ZenPing extends LifecycleComponent<ZenPing> {
4141

42-
void setNodesProvider(DiscoveryNodesProvider nodesProvider);
42+
void setPingContextProvider(PingContextProvider contextProvider);
4343

4444
void ping(PingListener listener, TimeValue timeout) throws ElasticsearchException;
4545

@@ -49,36 +49,52 @@ public interface PingListener {
4949
}
5050

5151
public static class PingResponse implements Streamable {
52-
52+
5353
public static final PingResponse[] EMPTY = new PingResponse[0];
5454

5555
private ClusterName clusterName;
5656

57-
private DiscoveryNode target;
57+
private DiscoveryNode node;
5858

5959
private DiscoveryNode master;
6060

61+
private boolean hasJoinedOnce;
62+
6163
private PingResponse() {
6264
}
6365

64-
public PingResponse(DiscoveryNode target, DiscoveryNode master, ClusterName clusterName) {
65-
this.target = target;
66+
/**
67+
* @param node the node which this ping describes
68+
* @param master the current master of the node
69+
* @param clusterName the cluster name of the node
70+
* @param hasJoinedOnce true if the joined has successfully joined the cluster before
71+
*/
72+
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, boolean hasJoinedOnce) {
73+
this.node = node;
6674
this.master = master;
6775
this.clusterName = clusterName;
76+
this.hasJoinedOnce = hasJoinedOnce;
6877
}
6978

7079
public ClusterName clusterName() {
7180
return this.clusterName;
7281
}
7382

74-
public DiscoveryNode target() {
75-
return target;
83+
/** the node which this ping describes */
84+
public DiscoveryNode node() {
85+
return node;
7686
}
7787

88+
/** the current master of the node */
7889
public DiscoveryNode master() {
7990
return master;
8091
}
8192

93+
/** true if the joined has successfully joined the cluster before */
94+
public boolean hasJoinedOnce() {
95+
return hasJoinedOnce;
96+
}
97+
8298
public static PingResponse readPingResponse(StreamInput in) throws IOException {
8399
PingResponse response = new PingResponse();
84100
response.readFrom(in);
@@ -88,27 +104,40 @@ public static PingResponse readPingResponse(StreamInput in) throws IOException {
88104
@Override
89105
public void readFrom(StreamInput in) throws IOException {
90106
clusterName = readClusterName(in);
91-
target = readNode(in);
107+
node = readNode(in);
92108
if (in.readBoolean()) {
93109
master = readNode(in);
94110
}
111+
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
112+
this.hasJoinedOnce = in.readBoolean();
113+
} else {
114+
// As of 1.4.0 we prefer to elect nodes which have previously successfully joined the cluster.
115+
// Nodes before 1.4.0 do not take this into consideration. If pre<1.4.0 node elects it self as master
116+
// based on the pings, we need to make sure we do the same. We therefore can not demote it
117+
// and thus mark it as if it has previously joined.
118+
this.hasJoinedOnce = true;
119+
}
120+
95121
}
96122

97123
@Override
98124
public void writeTo(StreamOutput out) throws IOException {
99125
clusterName.writeTo(out);
100-
target.writeTo(out);
126+
node.writeTo(out);
101127
if (master == null) {
102128
out.writeBoolean(false);
103129
} else {
104130
out.writeBoolean(true);
105131
master.writeTo(out);
106132
}
133+
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
134+
out.writeBoolean(hasJoinedOnce);
135+
}
107136
}
108137

109138
@Override
110139
public String toString() {
111-
return "ping_response{target [" + target + "], master [" + master + "], cluster_name[" + clusterName.value() + "]}";
140+
return "ping_response{node [" + node + "], master [" + master + "], hasJoinedOnce [" + hasJoinedOnce + "], cluster_name[" + clusterName.value() + "]}";
112141
}
113142
}
114143
}

src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.common.unit.TimeValue;
3434
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3535
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
36-
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
3736
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
3837
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
3938
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
@@ -92,12 +91,12 @@ public void zenPings(ImmutableList<? extends ZenPing> pings) {
9291
}
9392

9493
@Override
95-
public void setNodesProvider(DiscoveryNodesProvider nodesProvider) {
94+
public void setPingContextProvider(PingContextProvider contextProvider) {
9695
if (lifecycle.started()) {
9796
throw new ElasticsearchIllegalStateException("Can't set nodes provider when started");
9897
}
9998
for (ZenPing zenPing : zenPings) {
100-
zenPing.setNodesProvider(nodesProvider);
99+
zenPing.setPingContextProvider(contextProvider);
101100
}
102101
}
103102

@@ -172,7 +171,7 @@ private CompoundPingListener(PingListener listener, ImmutableList<? extends ZenP
172171
public void onPing(PingResponse[] pings) {
173172
if (pings != null) {
174173
for (PingResponse pingResponse : pings) {
175-
responses.put(pingResponse.target(), pingResponse);
174+
responses.put(pingResponse.node(), pingResponse);
176175
}
177176
}
178177
if (counter.decrementAndGet() == 0) {

0 commit comments

Comments
 (0)