53
53
import org .elasticsearch .common .settings .ClusterSettings ;
54
54
import org .elasticsearch .common .settings .Setting ;
55
55
import org .elasticsearch .common .settings .Settings ;
56
+ import org .elasticsearch .common .transport .TransportAddress ;
56
57
import org .elasticsearch .common .unit .TimeValue ;
57
58
import org .elasticsearch .common .util .concurrent .EsExecutors ;
58
59
import org .elasticsearch .common .util .concurrent .ListenableFuture ;
59
60
import org .elasticsearch .common .xcontent .XContentHelper ;
60
61
import org .elasticsearch .common .xcontent .json .JsonXContent ;
61
62
import org .elasticsearch .discovery .Discovery ;
63
+ import org .elasticsearch .discovery .DiscoveryModule ;
62
64
import org .elasticsearch .discovery .DiscoveryStats ;
63
65
import org .elasticsearch .discovery .HandshakingTransportAddressConnector ;
64
66
import org .elasticsearch .discovery .PeerFinder ;
72
74
73
75
import java .util .ArrayList ;
74
76
import java .util .Collection ;
77
+ import java .util .Collections ;
75
78
import java .util .HashSet ;
76
79
import java .util .List ;
77
80
import java .util .Optional ;
@@ -99,6 +102,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
99
102
TimeValue .timeValueMillis (30000 ), TimeValue .timeValueMillis (1 ), Setting .Property .NodeScope );
100
103
101
104
private final Settings settings ;
105
+ private final boolean singleNodeDiscovery ;
102
106
private final TransportService transportService ;
103
107
private final MasterService masterService ;
104
108
private final AllocationService allocationService ;
@@ -149,6 +153,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
149
153
this .masterService = masterService ;
150
154
this .allocationService = allocationService ;
151
155
this .onJoinValidators = JoinTaskExecutor .addBuiltInJoinValidators (onJoinValidators );
156
+ this .singleNodeDiscovery = DiscoveryModule .SINGLE_NODE_DISCOVERY_TYPE .equals (DiscoveryModule .DISCOVERY_TYPE_SETTING .get (settings ));
152
157
this .joinHelper = new JoinHelper (settings , allocationService , masterService , transportService ,
153
158
this ::getCurrentTerm , this ::getStateForMasterService , this ::handleJoinRequest , this ::joinLeaderInTerm , this .onJoinValidators );
154
159
this .persistedStateSupplier = persistedStateSupplier ;
@@ -448,6 +453,13 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
448
453
assert Thread .holdsLock (mutex ) == false ;
449
454
assert getLocalNode ().isMasterNode () : getLocalNode () + " received a join but is not master-eligible" ;
450
455
logger .trace ("handleJoinRequest: as {}, handling {}" , mode , joinRequest );
456
+
457
+ if (singleNodeDiscovery && joinRequest .getSourceNode ().equals (getLocalNode ()) == false ) {
458
+ joinCallback .onFailure (new IllegalStateException ("cannot join node with [" + DiscoveryModule .DISCOVERY_TYPE_SETTING .getKey () +
459
+ "] set to [" + DiscoveryModule .SINGLE_NODE_DISCOVERY_TYPE + "] discovery" ));
460
+ return ;
461
+ }
462
+
451
463
transportService .connectToNode (joinRequest .getSourceNode ());
452
464
453
465
final ClusterState stateForJoinValidation = getStateForMasterService ();
@@ -666,6 +678,14 @@ protected void doStart() {
666
678
coordinationState .set (new CoordinationState (settings , getLocalNode (), persistedState ));
667
679
peerFinder .setCurrentTerm (getCurrentTerm ());
668
680
configuredHostsResolver .start ();
681
+ VotingConfiguration votingConfiguration = coordinationState .get ().getLastAcceptedState ().getLastCommittedConfiguration ();
682
+ if (singleNodeDiscovery &&
683
+ votingConfiguration .isEmpty () == false &&
684
+ votingConfiguration .hasQuorum (Collections .singleton (getLocalNode ().getId ())) == false ) {
685
+ throw new IllegalStateException ("cannot start with [" + DiscoveryModule .DISCOVERY_TYPE_SETTING .getKey () + "] set to [" +
686
+ DiscoveryModule .SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode () +
687
+ " does not have quorum in voting configuration " + votingConfiguration );
688
+ }
669
689
ClusterState initialState = ClusterState .builder (ClusterName .CLUSTER_NAME_SETTING .get (settings ))
670
690
.blocks (ClusterBlocks .builder ()
671
691
.addGlobalBlock (STATE_NOT_RECOVERED_BLOCK )
@@ -1079,7 +1099,8 @@ private class CoordinatorPeerFinder extends PeerFinder {
1079
1099
1080
1100
CoordinatorPeerFinder (Settings settings , TransportService transportService , TransportAddressConnector transportAddressConnector ,
1081
1101
ConfiguredHostsResolver configuredHostsResolver ) {
1082
- super (settings , transportService , transportAddressConnector , configuredHostsResolver );
1102
+ super (settings , transportService , transportAddressConnector ,
1103
+ singleNodeDiscovery ? hostsResolver -> Collections .emptyList () : configuredHostsResolver );
1083
1104
}
1084
1105
1085
1106
@ Override
@@ -1090,6 +1111,13 @@ protected void onActiveMasterFound(DiscoveryNode masterNode, long term) {
1090
1111
}
1091
1112
}
1092
1113
1114
+ @ Override
1115
+ protected void startProbe (TransportAddress transportAddress ) {
1116
+ if (singleNodeDiscovery == false ) {
1117
+ super .startProbe (transportAddress );
1118
+ }
1119
+ }
1120
+
1093
1121
@ Override
1094
1122
protected void onFoundPeersUpdated () {
1095
1123
synchronized (mutex ) {
0 commit comments