Skip to content

Commit b7ebdfa

Browse files
authored
Zen2 node joining (elastic#35)
Adds node joining based on a simplified MasterService abstraction.
1 parent 8d7c962 commit b7ebdfa

File tree

10 files changed

+472
-89
lines changed

10 files changed

+472
-89
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ task verifyVersions {
145145
* the enabled state of every bwc task. It should be set back to true
146146
* after the backport of the backcompat code is complete.
147147
*/
148-
final boolean bwc_tests_enabled = true
149-
final String bwc_tests_disabled_issue = "" /* place a PR link here when commiting bwc changes */
148+
final boolean bwc_tests_enabled = false
149+
final String bwc_tests_disabled_issue = "Zen2 prototype development" /* place a PR link here when commiting bwc changes */
150150
if (bwc_tests_enabled == false) {
151151
if (bwc_tests_disabled_issue.isEmpty()) {
152152
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,9 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr
720720
ClusterName clusterName = new ClusterName(in);
721721
Builder builder = new Builder(clusterName);
722722
builder.version = in.readLong();
723+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
724+
builder.term = in.readLong();
725+
}
723726
builder.uuid = in.readString();
724727
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
725728
builder.lastCommittedConfiguration(new VotingConfiguration(in));
@@ -741,6 +744,9 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr
741744
public void writeTo(StreamOutput out) throws IOException {
742745
clusterName.writeTo(out);
743746
out.writeLong(version);
747+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
748+
out.writeLong(term);
749+
}
744750
out.writeString(stateUUID);
745751
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
746752
lastCommittedConfiguration.writeTo(out);
@@ -900,7 +906,7 @@ public void writeTo(StreamOutput out) throws IOException {
900906

901907
public boolean hasQuorum(Collection<String> votes) {
902908
if (nodeIds.isEmpty()) {
903-
throw new IllegalStateException("cannot check quorum on an empty configuration");
909+
return false; // TODO: should we even allow this check on an empty configuration?
904910
}
905911
final HashSet<String> intersection = new HashSet<>(nodeIds);
906912
intersection.retainAll(votes);

server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ public Throwable fillInStackTrace() {
311311
}
312312
}
313313

314-
static class NodeDoesNotExistOnMasterException extends IllegalStateException {
314+
public static class NodeDoesNotExistOnMasterException extends IllegalStateException {
315315
@Override
316316
public Throwable fillInStackTrace() {
317317
return null;

server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ public void messageReceived(ValidateJoinRequest request, TransportChannel channe
205205
* @see Version#minimumIndexCompatibilityVersion()
206206
* @throws IllegalStateException if any index is incompatible with the given version
207207
*/
208-
static void ensureIndexCompatibility(final Version nodeVersion, MetaData metaData) {
208+
public static void ensureIndexCompatibility(final Version nodeVersion, MetaData metaData) {
209209
Version supportedIndexVersion = nodeVersion.minimumIndexCompatibilityVersion();
210210
// we ensure that all indices in the cluster we join are compatible with us no matter if they are
211211
// closed or not we can't read mappings of these indices so we need to reject the join...
@@ -229,7 +229,7 @@ static void ensureNodesCompatibility(final Version joiningNodeVersion, Discovery
229229
}
230230

231231
/** ensures that the joining node has a version that's compatible with a given version range */
232-
static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) {
232+
public static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) {
233233
assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion;
234234
if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) {
235235
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " +
@@ -246,7 +246,7 @@ static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClus
246246
* to ensure that if the master is already fully operating under the new major version, it doesn't go back to mixed
247247
* version mode
248248
**/
249-
static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version minClusterNodeVersion) {
249+
public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version minClusterNodeVersion) {
250250
final byte clusterMajor = minClusterNodeVersion.major;
251251
if (joiningNodeVersion.major < clusterMajor) {
252252
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " +

server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,11 +344,11 @@ public void onFailure(String source, Exception e) {
344344

345345
}
346346

347-
static class JoinTaskListener implements ClusterStateTaskListener {
347+
public static class JoinTaskListener implements ClusterStateTaskListener {
348348
final List<MembershipAction.JoinCallback> callbacks;
349349
private final Logger logger;
350350

351-
JoinTaskListener(MembershipAction.JoinCallback callback, Logger logger) {
351+
public JoinTaskListener(MembershipAction.JoinCallback callback, Logger logger) {
352352
this(Collections.singletonList(callback), logger);
353353
}
354354

server/src/main/java/org/elasticsearch/discovery/zen2/ConsensusState.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ public long getLastPublishedVersion() {
103103
return lastPublishedVersion;
104104
}
105105

106+
public boolean hasElectionQuorum(VotingConfiguration votingConfiguration) {
107+
return joinVotes.isQuorum(votingConfiguration);
108+
}
109+
106110
/**
107111
* May be safely called at any time to move this instance to a new term. It is vitally important for safety that
108112
* the resulting Join is sent to no more than one node.
@@ -214,7 +218,7 @@ && getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()) == fal
214218
logger.debug("handleClientValue: only allow reconfiguration while not already reconfiguring");
215219
throw new ConsensusMessageRejectedException("only allow reconfiguration while not already reconfiguring");
216220
}
217-
if (clusterState.getLastAcceptedConfiguration().hasQuorum(joinVotes.nodes.keySet()) == false) {
221+
if (hasElectionQuorum(clusterState.getLastAcceptedConfiguration()) == false) {
218222
logger.debug("handleClientValue: only allow reconfiguration if join quorum available for new config");
219223
throw new ConsensusMessageRejectedException("only allow reconfiguration if join quorum available for new config");
220224
}

0 commit comments

Comments
 (0)