22
22
import org .apache .logging .log4j .Logger ;
23
23
import org .apache .logging .log4j .LogManager ;
24
24
import org .elasticsearch .cluster .ClusterState ;
25
+ import org .elasticsearch .cluster .coordination .Coordinator ;
25
26
import org .elasticsearch .cluster .node .DiscoveryNode ;
26
27
import org .elasticsearch .cluster .routing .allocation .AllocationService ;
27
28
import org .elasticsearch .cluster .service .ClusterApplier ;
29
+ import org .elasticsearch .cluster .service .ClusterApplierService ;
28
30
import org .elasticsearch .cluster .service .MasterService ;
31
+ import org .elasticsearch .common .Randomness ;
29
32
import org .elasticsearch .common .io .stream .NamedWriteableRegistry ;
30
33
import org .elasticsearch .common .network .NetworkService ;
31
34
import org .elasticsearch .common .settings .ClusterSettings ;
58
61
import java .util .function .Supplier ;
59
62
import java .util .stream .Collectors ;
60
63
64
+ import static org .elasticsearch .node .Node .NODE_NAME_SETTING ;
65
+
61
66
/**
62
67
* A module for loading classes for node discovery.
63
68
*/
64
69
public class DiscoveryModule {
65
70
private static final Logger logger = LogManager .getLogger (DiscoveryModule .class );
66
71
72
+ public static final String ZEN_DISCOVERY_TYPE = "zen" ;
73
+ public static final String ZEN2_DISCOVERY_TYPE = "zen2" ;
74
+
67
75
public static final Setting <String > DISCOVERY_TYPE_SETTING =
68
- new Setting <>("discovery.type" , "zen" , Function .identity (), Property .NodeScope );
76
+ new Setting <>("discovery.type" , ZEN_DISCOVERY_TYPE , Function .identity (), Property .NodeScope );
69
77
public static final Setting <List <String >> DISCOVERY_HOSTS_PROVIDER_SETTING =
70
78
Setting .listSetting ("discovery.zen.hosts_provider" , Collections .emptyList (), Function .identity (), Property .NodeScope );
71
79
@@ -75,14 +83,14 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
75
83
NamedWriteableRegistry namedWriteableRegistry , NetworkService networkService , MasterService masterService ,
76
84
ClusterApplier clusterApplier , ClusterSettings clusterSettings , List <DiscoveryPlugin > plugins ,
77
85
AllocationService allocationService , Path configFile , GatewayMetaState gatewayMetaState ) {
78
- final Collection <BiConsumer <DiscoveryNode ,ClusterState >> joinValidators = new ArrayList <>();
86
+ final Collection <BiConsumer <DiscoveryNode , ClusterState >> joinValidators = new ArrayList <>();
79
87
final Map <String , Supplier <UnicastHostsProvider >> hostProviders = new HashMap <>();
80
88
hostProviders .put ("settings" , () -> new SettingsBasedHostsProvider (settings , transportService ));
81
89
hostProviders .put ("file" , () -> new FileBasedUnicastHostsProvider (configFile ));
82
90
for (DiscoveryPlugin plugin : plugins ) {
83
- plugin .getZenHostsProviders (transportService , networkService ).entrySet (). forEach (entry -> {
84
- if (hostProviders .put (entry . getKey (), entry . getValue () ) != null ) {
85
- throw new IllegalArgumentException ("Cannot register zen hosts provider [" + entry . getKey () + "] twice" );
91
+ plugin .getZenHostsProviders (transportService , networkService ).forEach (( key , value ) -> {
92
+ if (hostProviders .put (key , value ) != null ) {
93
+ throw new IllegalArgumentException ("Cannot register zen hosts provider [" + key + "] twice" );
86
94
}
87
95
});
88
96
BiConsumer <DiscoveryNode , ClusterState > joinValidator = plugin .getJoinValidator ();
@@ -117,18 +125,21 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
117
125
};
118
126
119
127
Map <String , Supplier <Discovery >> discoveryTypes = new HashMap <>();
120
- discoveryTypes .put ("zen" ,
128
+ discoveryTypes .put (ZEN_DISCOVERY_TYPE ,
121
129
() -> new ZenDiscovery (settings , threadPool , transportService , namedWriteableRegistry , masterService , clusterApplier ,
122
130
clusterSettings , hostsProvider , allocationService , Collections .unmodifiableCollection (joinValidators ), gatewayMetaState ));
131
+ discoveryTypes .put (ZEN2_DISCOVERY_TYPE , () -> new Coordinator (NODE_NAME_SETTING .get (settings ), settings , clusterSettings ,
132
+ transportService , namedWriteableRegistry , allocationService , masterService ,
133
+ () -> gatewayMetaState .getPersistedState (settings , (ClusterApplierService ) clusterApplier ), hostsProvider , clusterApplier ,
134
+ Randomness .get ()));
123
135
discoveryTypes .put ("single-node" , () -> new SingleNodeDiscovery (settings , transportService , masterService , clusterApplier ));
124
136
for (DiscoveryPlugin plugin : plugins ) {
125
- plugin .getDiscoveryTypes (threadPool , transportService , namedWriteableRegistry ,
126
- masterService , clusterApplier , clusterSettings , hostsProvider , allocationService , gatewayMetaState ).entrySet ()
127
- .forEach (entry -> {
128
- if (discoveryTypes .put (entry .getKey (), entry .getValue ()) != null ) {
129
- throw new IllegalArgumentException ("Cannot register discovery type [" + entry .getKey () + "] twice" );
130
- }
131
- });
137
+ plugin .getDiscoveryTypes (threadPool , transportService , namedWriteableRegistry , masterService , clusterApplier , clusterSettings ,
138
+ hostsProvider , allocationService , gatewayMetaState ).forEach ((key , value ) -> {
139
+ if (discoveryTypes .put (key , value ) != null ) {
140
+ throw new IllegalArgumentException ("Cannot register discovery type [" + key + "] twice" );
141
+ }
142
+ });
132
143
}
133
144
String discoveryType = DISCOVERY_TYPE_SETTING .get (settings );
134
145
Supplier <Discovery > discoverySupplier = discoveryTypes .get (discoveryType );
0 commit comments