26
26
import org .elasticsearch .action .ActionListener ;
27
27
import org .elasticsearch .cluster .ClusterName ;
28
28
import org .elasticsearch .cluster .node .DiscoveryNode ;
29
+ import org .elasticsearch .cluster .node .DiscoveryNodeRole ;
29
30
import org .elasticsearch .common .io .stream .StreamInput ;
30
31
import org .elasticsearch .common .io .stream .StreamOutput ;
31
32
import org .elasticsearch .common .io .stream .Writeable ;
40
41
import java .util .Collections ;
41
42
import java .util .HashSet ;
42
43
import java .util .List ;
44
+ import java .util .Map ;
43
45
import java .util .Objects ;
44
46
import java .util .Set ;
45
47
import java .util .concurrent .atomic .AtomicInteger ;
49
51
import java .util .stream .Collectors ;
50
52
import java .util .stream .Stream ;
51
53
54
+ import static org .elasticsearch .common .settings .Setting .boolSetting ;
52
55
import static org .elasticsearch .common .settings .Setting .intSetting ;
53
56
54
57
public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
@@ -76,6 +79,15 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
76
79
(ns , key ) -> intSetting (key , 18 , 1 , new StrategyValidator <>(ns , key , ConnectionStrategy .SIMPLE ),
77
80
Setting .Property .Dynamic , Setting .Property .NodeScope ));
78
81
82
+ /**
83
+ * Whether to include the hostname as a server_name attribute
84
+ */
85
+ public static final Setting .AffixSetting <Boolean > INCLUDE_SERVER_NAME = Setting .affixKeySetting (
86
+ "cluster.remote." ,
87
+ "simple.include_server_name" ,
88
+ (ns , key ) -> boolSetting (key , false , new StrategyValidator <>(ns , key , ConnectionStrategy .SIMPLE ),
89
+ Setting .Property .Dynamic , Setting .Property .NodeScope ));
90
+
79
91
static final int CHANNELS_PER_CONNECTION = 1 ;
80
92
81
93
private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3 ;
@@ -84,6 +96,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
84
96
private final int maxNumConnections ;
85
97
private final AtomicLong counter = new AtomicLong (0 );
86
98
private final List <String > configuredAddresses ;
99
+ private final boolean includeServerName ;
87
100
private final List <Supplier <TransportAddress >> addresses ;
88
101
private final AtomicReference <ClusterName > remoteClusterName = new AtomicReference <>();
89
102
private final ConnectionProfile profile ;
@@ -96,21 +109,31 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
96
109
transportService ,
97
110
connectionManager ,
98
111
REMOTE_SOCKET_CONNECTIONS .getConcreteSettingForNamespace (clusterAlias ).get (settings ),
99
- REMOTE_CLUSTER_ADDRESSES .getConcreteSettingForNamespace (clusterAlias ).get (settings ));
112
+ REMOTE_CLUSTER_ADDRESSES .getConcreteSettingForNamespace (clusterAlias ).get (settings ),
113
+ INCLUDE_SERVER_NAME .getConcreteSettingForNamespace (clusterAlias ).get (settings ));
100
114
}
101
115
102
116
SimpleConnectionStrategy (String clusterAlias , TransportService transportService , RemoteConnectionManager connectionManager ,
103
117
int maxNumConnections , List <String > configuredAddresses ) {
104
118
this (clusterAlias , transportService , connectionManager , maxNumConnections , configuredAddresses ,
105
119
configuredAddresses .stream ().map (address ->
106
- (Supplier <TransportAddress >) () -> resolveAddress (address )).collect (Collectors .toList ()));
120
+ (Supplier <TransportAddress >) () -> resolveAddress (address )).collect (Collectors .toList ()), false );
107
121
}
108
122
109
123
SimpleConnectionStrategy (String clusterAlias , TransportService transportService , RemoteConnectionManager connectionManager ,
110
- int maxNumConnections , List <String > configuredAddresses , List <Supplier <TransportAddress >> addresses ) {
124
+ int maxNumConnections , List <String > configuredAddresses , boolean includeServerName ) {
125
+ this (clusterAlias , transportService , connectionManager , maxNumConnections , configuredAddresses ,
126
+ configuredAddresses .stream ().map (address ->
127
+ (Supplier <TransportAddress >) () -> resolveAddress (address )).collect (Collectors .toList ()), includeServerName );
128
+ }
129
+
130
+ SimpleConnectionStrategy (String clusterAlias , TransportService transportService , RemoteConnectionManager connectionManager ,
131
+ int maxNumConnections , List <String > configuredAddresses , List <Supplier <TransportAddress >> addresses ,
132
+ boolean includeServerName ) {
111
133
super (clusterAlias , transportService , connectionManager );
112
134
this .maxNumConnections = maxNumConnections ;
113
135
this .configuredAddresses = configuredAddresses ;
136
+ this .includeServerName = includeServerName ;
114
137
assert addresses .isEmpty () == false : "Cannot use simple connection strategy with no configured addresses" ;
115
138
this .addresses = addresses ;
116
139
// TODO: Move into the ConnectionManager
@@ -207,7 +230,14 @@ public void onFailure(Exception e) {
207
230
for (int i = 0 ; i < remaining ; ++i ) {
208
231
TransportAddress address = nextAddress (resolved );
209
232
String id = clusterAlias + "#" + address ;
210
- DiscoveryNode node = new DiscoveryNode (id , address , Version .CURRENT .minimumCompatibilityVersion ());
233
+ Map <String , String > attributes ;
234
+ if (includeServerName ) {
235
+ attributes = Collections .singletonMap ("server_name" , address .address ().getHostString ());
236
+ } else {
237
+ attributes = Collections .emptyMap ();
238
+ }
239
+ DiscoveryNode node = new DiscoveryNode (id , address , attributes , DiscoveryNodeRole .BUILT_IN_ROLES ,
240
+ Version .CURRENT .minimumCompatibilityVersion ());
211
241
212
242
connectionManager .connectToNode (node , profile , clusterNameValidator , new ActionListener <>() {
213
243
@ Override
@@ -243,7 +273,7 @@ private TransportAddress nextAddress(List<TransportAddress> resolvedAddresses) {
243
273
}
244
274
245
275
private static TransportAddress resolveAddress (String address ) {
246
- return new TransportAddress (parseSeedAddress (address ));
276
+ return new TransportAddress (parseConfiguredAddress (address ));
247
277
}
248
278
249
279
private boolean addressesChanged (final List <String > oldAddresses , final List <String > newAddresses ) {
0 commit comments