27
27
import org .elasticsearch .cluster .ClusterName ;
28
28
import org .elasticsearch .cluster .node .DiscoveryNode ;
29
29
import org .elasticsearch .cluster .node .DiscoveryNodeRole ;
30
+ import org .elasticsearch .common .Strings ;
30
31
import org .elasticsearch .common .io .stream .StreamInput ;
31
32
import org .elasticsearch .common .io .stream .StreamOutput ;
32
33
import org .elasticsearch .common .io .stream .Writeable ;
37
38
import org .elasticsearch .common .xcontent .XContentBuilder ;
38
39
39
40
import java .io .IOException ;
40
- import java .util .Arrays ;
41
41
import java .util .Collections ;
42
- import java .util .HashSet ;
43
42
import java .util .List ;
44
43
import java .util .Map ;
45
44
import java .util .Objects ;
46
- import java .util .Set ;
47
45
import java .util .concurrent .atomic .AtomicInteger ;
48
46
import java .util .concurrent .atomic .AtomicLong ;
49
47
import java .util .concurrent .atomic .AtomicReference ;
50
48
import java .util .function .Supplier ;
51
- import java .util .stream .Collectors ;
52
49
import java .util .stream .Stream ;
53
50
54
51
import static org .elasticsearch .common .settings .Setting .boolSetting ;
57
54
public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
58
55
59
56
/**
60
- * A list of addresses for remote cluster connections. The connections will be opened to the configured addresses in a round-robin
61
- * fashion.
57
+ * The remote address for the proxy. The connections will be opened to the configured address.
62
58
*/
63
- public static final Setting .AffixSetting <List < String > > REMOTE_CLUSTER_ADDRESSES = Setting .affixKeySetting (
59
+ public static final Setting .AffixSetting <String > REMOTE_CLUSTER_ADDRESSES = Setting .affixKeySetting (
64
60
"cluster.remote." ,
65
- "proxy_addresses" ,
66
- (ns , key ) -> Setting .listSetting (key , Collections .emptyList (), s -> {
67
- // validate address
68
- parsePort (s );
69
- return s ;
70
- }, new StrategyValidator <>(ns , key , ConnectionStrategy .PROXY ),
71
- Setting .Property .Dynamic , Setting .Property .NodeScope ));
61
+ "proxy_address" ,
62
+ (ns , key ) -> Setting .simpleString (key , new StrategyValidator <>(ns , key , ConnectionStrategy .PROXY , s -> {
63
+ if (Strings .hasLength (s )) {
64
+ parsePort (s );
65
+ }
66
+ }), Setting .Property .Dynamic , Setting .Property .NodeScope ));
72
67
73
68
/**
74
69
* The maximum number of socket connections that will be established to a remote cluster. The default is 18.
@@ -95,9 +90,9 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
95
90
96
91
private final int maxNumConnections ;
97
92
private final AtomicLong counter = new AtomicLong (0 );
98
- private final List < String > configuredAddresses ;
93
+ private final String configuredAddress ;
99
94
private final boolean includeServerName ;
100
- private final List < Supplier <TransportAddress >> addresses ;
95
+ private final Supplier <TransportAddress > address ;
101
96
private final AtomicReference <ClusterName > remoteClusterName = new AtomicReference <>();
102
97
private final ConnectionProfile profile ;
103
98
private final ConnectionManager .ConnectionValidator clusterNameValidator ;
@@ -114,28 +109,26 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
114
109
}
115
110
116
111
ProxyConnectionStrategy (String clusterAlias , TransportService transportService , RemoteConnectionManager connectionManager ,
117
- int maxNumConnections , List <String > configuredAddresses ) {
118
- this (clusterAlias , transportService , connectionManager , maxNumConnections , configuredAddresses ,
119
- configuredAddresses .stream ().map (address ->
120
- (Supplier <TransportAddress >) () -> resolveAddress (address )).collect (Collectors .toList ()), false );
112
+ int maxNumConnections , String configuredAddress ) {
113
+ this (clusterAlias , transportService , connectionManager , maxNumConnections , configuredAddress ,
114
+ () -> resolveAddress (configuredAddress ), false );
121
115
}
122
116
123
117
ProxyConnectionStrategy (String clusterAlias , TransportService transportService , RemoteConnectionManager connectionManager ,
124
- int maxNumConnections , List <String > configuredAddresses , boolean includeServerName ) {
125
- this (clusterAlias , transportService , connectionManager , maxNumConnections , configuredAddresses ,
126
- configuredAddresses .stream ().map (address ->
127
- (Supplier <TransportAddress >) () -> resolveAddress (address )).collect (Collectors .toList ()), includeServerName );
118
+ int maxNumConnections , String configuredAddress , boolean includeServerName ) {
119
+ this (clusterAlias , transportService , connectionManager , maxNumConnections , configuredAddress ,
120
+ () -> resolveAddress (configuredAddress ), includeServerName );
128
121
}
129
122
130
123
ProxyConnectionStrategy (String clusterAlias , TransportService transportService , RemoteConnectionManager connectionManager ,
131
- int maxNumConnections , List < String > configuredAddresses , List < Supplier <TransportAddress >> addresses ,
124
+ int maxNumConnections , String configuredAddress , Supplier <TransportAddress > address ,
132
125
boolean includeServerName ) {
133
126
super (clusterAlias , transportService , connectionManager );
134
127
this .maxNumConnections = maxNumConnections ;
135
- this .configuredAddresses = configuredAddresses ;
128
+ this .configuredAddress = configuredAddress ;
136
129
this .includeServerName = includeServerName ;
137
- assert addresses .isEmpty () == false : "Cannot use proxy connection strategy with no configured addresses" ;
138
- this .addresses = addresses ;
130
+ assert Strings .isEmpty (configuredAddress ) == false : "Cannot use proxy connection strategy with no configured addresses" ;
131
+ this .address = address ;
139
132
// TODO: Move into the ConnectionManager
140
133
this .profile = new ConnectionProfile .Builder ()
141
134
.addConnections (1 , TransportRequestOptions .Type .REG , TransportRequestOptions .Type .PING )
@@ -172,9 +165,9 @@ protected boolean shouldOpenMoreConnections() {
172
165
173
166
@ Override
174
167
protected boolean strategyMustBeRebuilt (Settings newSettings ) {
175
- List < String > addresses = REMOTE_CLUSTER_ADDRESSES .getConcreteSettingForNamespace (clusterAlias ).get (newSettings );
168
+ String address = REMOTE_CLUSTER_ADDRESSES .getConcreteSettingForNamespace (clusterAlias ).get (newSettings );
176
169
int numOfSockets = REMOTE_SOCKET_CONNECTIONS .getConcreteSettingForNamespace (clusterAlias ).get (newSettings );
177
- return numOfSockets != maxNumConnections || addressesChanged ( configuredAddresses , addresses ) ;
170
+ return numOfSockets != maxNumConnections || configuredAddress . equals ( address ) == false ;
178
171
}
179
172
180
173
@ Override
@@ -189,7 +182,7 @@ protected void connectImpl(ActionListener<Void> listener) {
189
182
190
183
@ Override
191
184
public RemoteConnectionInfo .ModeInfo getModeInfo () {
192
- return new ProxyModeInfo (configuredAddresses , maxNumConnections , connectionManager .size ());
185
+ return new ProxyModeInfo (configuredAddress , maxNumConnections , connectionManager .size ());
193
186
}
194
187
195
188
private void performProxyConnectionProcess (ActionListener <Void > listener ) {
@@ -198,7 +191,7 @@ private void performProxyConnectionProcess(ActionListener<Void> listener) {
198
191
199
192
private void openConnections (ActionListener <Void > finished , int attemptNumber ) {
200
193
if (attemptNumber <= MAX_CONNECT_ATTEMPTS_PER_RUN ) {
201
- List < TransportAddress > resolved = addresses . stream (). map ( Supplier :: get ). collect ( Collectors . toList () );
194
+ TransportAddress resolved = address . get ( );
202
195
203
196
int remaining = maxNumConnections - connectionManager .size ();
204
197
ActionListener <Void > compositeListener = new ActionListener <>() {
@@ -228,15 +221,14 @@ public void onFailure(Exception e) {
228
221
229
222
230
223
for (int i = 0 ; i < remaining ; ++i ) {
231
- TransportAddress address = nextAddress (resolved );
232
- String id = clusterAlias + "#" + address ;
224
+ String id = clusterAlias + "#" + resolved ;
233
225
Map <String , String > attributes ;
234
226
if (includeServerName ) {
235
- attributes = Collections .singletonMap ("server_name" , address .address ().getHostString ());
227
+ attributes = Collections .singletonMap ("server_name" , resolved .address ().getHostString ());
236
228
} else {
237
229
attributes = Collections .emptyMap ();
238
230
}
239
- DiscoveryNode node = new DiscoveryNode (id , address , attributes , DiscoveryNodeRole .BUILT_IN_ROLES ,
231
+ DiscoveryNode node = new DiscoveryNode (id , resolved , attributes , DiscoveryNodeRole .BUILT_IN_ROLES ,
240
232
Version .CURRENT .minimumCompatibilityVersion ());
241
233
242
234
connectionManager .connectToNode (node , profile , clusterNameValidator , new ActionListener <>() {
@@ -248,7 +240,7 @@ public void onResponse(Void v) {
248
240
@ Override
249
241
public void onFailure (Exception e ) {
250
242
logger .debug (new ParameterizedMessage ("failed to open remote connection [remote cluster: {}, address: {}]" ,
251
- clusterAlias , address ), e );
243
+ clusterAlias , resolved ), e );
252
244
compositeListener .onFailure (e );
253
245
}
254
246
});
@@ -276,48 +268,35 @@ private static TransportAddress resolveAddress(String address) {
276
268
return new TransportAddress (parseConfiguredAddress (address ));
277
269
}
278
270
279
- private boolean addressesChanged (final List <String > oldAddresses , final List <String > newAddresses ) {
280
- if (oldAddresses .size () != newAddresses .size ()) {
281
- return true ;
282
- }
283
- Set <String > oldSeeds = new HashSet <>(oldAddresses );
284
- Set <String > newSeeds = new HashSet <>(newAddresses );
285
- return oldSeeds .equals (newSeeds ) == false ;
286
- }
287
-
288
271
static class ProxyModeInfo implements RemoteConnectionInfo .ModeInfo {
289
272
290
- private final List < String > addresses ;
273
+ private final String address ;
291
274
private final int maxSocketConnections ;
292
275
private final int numSocketsConnected ;
293
276
294
- ProxyModeInfo (List < String > addresses , int maxSocketConnections , int numSocketsConnected ) {
295
- this .addresses = addresses ;
277
+ ProxyModeInfo (String address , int maxSocketConnections , int numSocketsConnected ) {
278
+ this .address = address ;
296
279
this .maxSocketConnections = maxSocketConnections ;
297
280
this .numSocketsConnected = numSocketsConnected ;
298
281
}
299
282
300
283
private ProxyModeInfo (StreamInput input ) throws IOException {
301
- addresses = Arrays . asList ( input .readStringArray () );
284
+ address = input .readString ( );
302
285
maxSocketConnections = input .readVInt ();
303
286
numSocketsConnected = input .readVInt ();
304
287
}
305
288
306
289
@ Override
307
290
public XContentBuilder toXContent (XContentBuilder builder , Params params ) throws IOException {
308
- builder .startArray ("addresses" );
309
- for (String address : addresses ) {
310
- builder .value (address );
311
- }
312
- builder .endArray ();
291
+ builder .field ("address" , address );
313
292
builder .field ("num_sockets_connected" , numSocketsConnected );
314
293
builder .field ("max_socket_connections" , maxSocketConnections );
315
294
return builder ;
316
295
}
317
296
318
297
@ Override
319
298
public void writeTo (StreamOutput out ) throws IOException {
320
- out .writeStringArray ( addresses . toArray ( new String [ 0 ]) );
299
+ out .writeString ( address );
321
300
out .writeVInt (maxSocketConnections );
322
301
out .writeVInt (numSocketsConnected );
323
302
}
@@ -344,12 +323,12 @@ public boolean equals(Object o) {
344
323
ProxyModeInfo otherProxy = (ProxyModeInfo ) o ;
345
324
return maxSocketConnections == otherProxy .maxSocketConnections &&
346
325
numSocketsConnected == otherProxy .numSocketsConnected &&
347
- Objects .equals (addresses , otherProxy .addresses );
326
+ Objects .equals (address , otherProxy .address );
348
327
}
349
328
350
329
@ Override
351
330
public int hashCode () {
352
- return Objects .hash (addresses , maxSocketConnections , numSocketsConnected );
331
+ return Objects .hash (address , maxSocketConnections , numSocketsConnected );
353
332
}
354
333
}
355
334
}
0 commit comments