44
44
import org .elasticsearch .cluster .service .ClusterApplier ;
45
45
import org .elasticsearch .cluster .service .ClusterApplier .ClusterApplyListener ;
46
46
import org .elasticsearch .cluster .service .MasterService ;
47
+ import org .elasticsearch .common .Booleans ;
47
48
import org .elasticsearch .common .Nullable ;
48
49
import org .elasticsearch .common .Priority ;
49
50
import org .elasticsearch .common .Strings ;
86
87
import static org .elasticsearch .gateway .GatewayService .STATE_NOT_RECOVERED_BLOCK ;
87
88
88
89
public class Coordinator extends AbstractLifecycleComponent implements Discovery {
90
+
91
+ public static final long ZEN1_BWC_TERM = 0 ;
92
+
89
93
private static final Logger logger = LogManager .getLogger (Coordinator .class );
90
94
91
95
// the timeout for the publication of each value
@@ -121,6 +125,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
121
125
private long maxTermSeen ;
122
126
private final Reconfigurator reconfigurator ;
123
127
private final ClusterBootstrapService clusterBootstrapService ;
128
+ private final DiscoveryUpgradeService discoveryUpgradeService ;
124
129
private final LagDetector lagDetector ;
125
130
private final ClusterFormationFailureHelper clusterFormationFailureHelper ;
126
131
@@ -161,6 +166,8 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
161
166
masterService .setClusterStateSupplier (this ::getStateForMasterService );
162
167
this .reconfigurator = new Reconfigurator (settings , clusterSettings );
163
168
this .clusterBootstrapService = new ClusterBootstrapService (settings , transportService );
169
+ this .discoveryUpgradeService = new DiscoveryUpgradeService (settings , clusterSettings , transportService , this ::isBootstrapped ,
170
+ joinHelper , peerFinder ::getFoundPeers , this ::unsafelySetConfigurationForUpgrade );
164
171
this .lagDetector = new LagDetector (settings , transportService .getThreadPool (), n -> removeNode (n , "lagging" ),
165
172
transportService ::getLocalNode );
166
173
this .clusterFormationFailureHelper = new ClusterFormationFailureHelper (settings , this ::getClusterFormationState ,
@@ -256,6 +263,14 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
256
263
throw new CoordinationStateRejectedException ("no longer leading this publication's term: " + publishRequest );
257
264
}
258
265
266
+ if (publishRequest .getAcceptedState ().term () == ZEN1_BWC_TERM && getCurrentTerm () == ZEN1_BWC_TERM
267
+ && mode == Mode .FOLLOWER && Optional .of (sourceNode ).equals (lastKnownLeader ) == false ) {
268
+
269
+ logger .debug ("received cluster state from {} but currently following {}, rejecting" , sourceNode , lastKnownLeader );
270
+ throw new CoordinationStateRejectedException ("received cluster state from " + sourceNode + " but currently following "
271
+ + lastKnownLeader + ", rejecting" );
272
+ }
273
+
259
274
ensureTermAtLeast (sourceNode , publishRequest .getAcceptedState ().term ());
260
275
final PublishResponse publishResponse = coordinationState .get ().handlePublishRequest (publishRequest );
261
276
@@ -323,7 +338,11 @@ private void startElection() {
323
338
final StartJoinRequest startJoinRequest
324
339
= new StartJoinRequest (getLocalNode (), Math .max (getCurrentTerm (), maxTermSeen ) + 1 );
325
340
logger .debug ("starting election with {}" , startJoinRequest );
326
- getDiscoveredNodes ().forEach (node -> joinHelper .sendStartJoinRequest (startJoinRequest , node ));
341
+ getDiscoveredNodes ().forEach (node -> {
342
+ if (isZen1Node (node ) == false ) {
343
+ joinHelper .sendStartJoinRequest (startJoinRequest , node );
344
+ }
345
+ });
327
346
}
328
347
}
329
348
}
@@ -384,6 +403,11 @@ void becomeCandidate(String method) {
384
403
385
404
peerFinder .activate (coordinationState .get ().getLastAcceptedState ().nodes ());
386
405
clusterFormationFailureHelper .start ();
406
+
407
+ if (getCurrentTerm () == ZEN1_BWC_TERM ) {
408
+ discoveryUpgradeService .activate (lastKnownLeader );
409
+ }
410
+
387
411
leaderChecker .setCurrentNodes (DiscoveryNodes .EMPTY_NODES );
388
412
leaderChecker .updateLeader (null );
389
413
@@ -414,6 +438,7 @@ void becomeLeader(String method) {
414
438
415
439
lastKnownLeader = Optional .of (getLocalNode ());
416
440
peerFinder .deactivate (getLocalNode ());
441
+ discoveryUpgradeService .deactivate ();
417
442
clusterFormationFailureHelper .stop ();
418
443
closePrevotingAndElectionScheduler ();
419
444
preVoteCollector .update (getPreVoteResponse (), getLocalNode ());
@@ -439,6 +464,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
439
464
440
465
lastKnownLeader = Optional .of (leaderNode );
441
466
peerFinder .deactivate (leaderNode );
467
+ discoveryUpgradeService .deactivate ();
442
468
clusterFormationFailureHelper .stop ();
443
469
closePrevotingAndElectionScheduler ();
444
470
cancelActivePublication ();
@@ -647,9 +673,6 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
647
673
return false ;
648
674
}
649
675
650
- assert currentState .term () == 0 : currentState ;
651
- assert currentState .version () == 0 : currentState ;
652
-
653
676
if (mode != Mode .CANDIDATE ) {
654
677
throw new CoordinationStateRejectedException ("Cannot set initial configuration in mode " + mode );
655
678
}
@@ -681,12 +704,59 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
681
704
}
682
705
}
683
706
707
+ private boolean isBootstrapped () {
708
+ return getLastAcceptedState ().getLastAcceptedConfiguration ().isEmpty () == false ;
709
+ }
710
+
711
+ private void unsafelySetConfigurationForUpgrade (VotingConfiguration votingConfiguration ) {
712
+ assert Version .CURRENT .major == Version .V_6_6_0 .major + 1 : "remove this method once unsafe upgrades are no longer needed" ;
713
+ synchronized (mutex ) {
714
+ if (mode != Mode .CANDIDATE ) {
715
+ throw new IllegalStateException ("Cannot overwrite configuration in mode " + mode );
716
+ }
717
+
718
+ if (isBootstrapped ()) {
719
+ throw new IllegalStateException ("Cannot overwrite configuration: configuration is already set to "
720
+ + getLastAcceptedState ().getLastAcceptedConfiguration ());
721
+ }
722
+
723
+ if (lastKnownLeader .map (Coordinator ::isZen1Node ).orElse (false ) == false ) {
724
+ throw new IllegalStateException ("Cannot upgrade from last-known leader: " + lastKnownLeader );
725
+ }
726
+
727
+ if (getCurrentTerm () != ZEN1_BWC_TERM ) {
728
+ throw new IllegalStateException ("Cannot upgrade, term is " + getCurrentTerm ());
729
+ }
730
+
731
+ logger .info ("automatically bootstrapping during rolling upgrade, using initial configuration {}" , votingConfiguration );
732
+
733
+ final ClusterState currentState = getStateForMasterService ();
734
+ final Builder builder = masterService .incrementVersion (currentState );
735
+ builder .metaData (MetaData .builder (currentState .metaData ()).coordinationMetaData (
736
+ CoordinationMetaData .builder (currentState .metaData ().coordinationMetaData ())
737
+ .term (1 )
738
+ .lastAcceptedConfiguration (votingConfiguration )
739
+ .lastCommittedConfiguration (votingConfiguration )
740
+ .build ()));
741
+ final ClusterState newClusterState = builder .build ();
742
+
743
+ coordinationState .get ().handleStartJoin (new StartJoinRequest (getLocalNode (), newClusterState .term ()));
744
+ coordinationState .get ().handlePublishRequest (new PublishRequest (newClusterState ));
745
+
746
+ followersChecker .clearCurrentNodes ();
747
+ followersChecker .updateFastResponseState (getCurrentTerm (), mode );
748
+
749
+ peerFinder .deactivate (getLocalNode ());
750
+ peerFinder .activate (newClusterState .nodes ());
751
+ }
752
+ }
753
+
684
754
// Package-private for testing
685
755
ClusterState improveConfiguration (ClusterState clusterState ) {
686
756
assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
687
757
688
758
final Set <DiscoveryNode > liveNodes = StreamSupport .stream (clusterState .nodes ().spliterator (), false )
689
- .filter (this ::hasJoinVoteFrom ).collect (Collectors .toSet ());
759
+ .filter (this ::hasJoinVoteFrom ).filter ( discoveryNode -> isZen1Node ( discoveryNode ) == false ). collect (Collectors .toSet ());
690
760
final VotingConfiguration newConfig = reconfigurator .reconfigure (liveNodes ,
691
761
clusterState .getVotingConfigExclusions ().stream ().map (VotingConfigExclusion ::getNodeId ).collect (Collectors .toSet ()),
692
762
clusterState .getLastAcceptedConfiguration ());
@@ -967,7 +1037,9 @@ public void run() {
967
1037
prevotingRound .close ();
968
1038
}
969
1039
final ClusterState lastAcceptedState = coordinationState .get ().getLastAcceptedState ();
970
- prevotingRound = preVoteCollector .start (lastAcceptedState , getDiscoveredNodes ());
1040
+ final List <DiscoveryNode > discoveredNodes
1041
+ = getDiscoveredNodes ().stream ().filter (n -> isZen1Node (n ) == false ).collect (Collectors .toList ());
1042
+ prevotingRound = preVoteCollector .start (lastAcceptedState , discoveredNodes );
971
1043
}
972
1044
}
973
1045
}
@@ -1176,13 +1248,13 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app
1176
1248
}
1177
1249
1178
1250
// TODO: only here temporarily for BWC development, remove once complete
1179
- public static Settings .Builder addZen1Attribute (Settings .Builder builder ) {
1180
- return builder .put ("node.attr.zen1" , true );
1251
+ public static Settings .Builder addZen1Attribute (boolean isZen1Node , Settings .Builder builder ) {
1252
+ return builder .put ("node.attr.zen1" , isZen1Node );
1181
1253
}
1182
1254
1183
1255
// TODO: only here temporarily for BWC development, remove once complete
1184
1256
public static boolean isZen1Node (DiscoveryNode discoveryNode ) {
1185
1257
return discoveryNode .getVersion ().before (Version .V_7_0_0 ) ||
1186
- discoveryNode .getAttributes ().containsKey ("zen1" );
1258
+ ( Booleans . isTrue ( discoveryNode .getAttributes ().getOrDefault ("zen1" , "false" )) );
1187
1259
}
1188
1260
}
0 commit comments