Skip to content

Commit 86e6d18

Browse files
jasontedordavidkyle
authored andcommitted
Do not resolve addresses in remote connection info (#36671)
The remote connection info API leads to resolving addresses of seed nodes when invoked. This is problematic because if a hostname fails to resolve, we would not display any remote connection info. Yet, a hostname not resolving can happen across remote clusters, especially in the modern world of cloud services with dynamically chaning IPs. Instead, the remote connection info API should be providing the configured seed nodes. This commit changes the remote connection info to display the configured seed nodes, avoiding a hostname resolution. Note that care was taken to preserve backwards compatibility with previous versions that expect the remote connection info to serialize a transport address instead of a string representing the hostname.
1 parent e9bd724 commit 86e6d18

File tree

6 files changed

+188
-124
lines changed

6 files changed

+188
-124
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,11 @@ protected RemoteClusterAware(Settings settings) {
183183
* (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to
184184
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node.
185185
*/
186-
protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(Settings settings) {
187-
final Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> remoteSeeds =
186+
protected static Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> buildRemoteClustersDynamicConfig(
187+
final Settings settings) {
188+
final Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> remoteSeeds =
188189
buildRemoteClustersDynamicConfig(settings, REMOTE_CLUSTERS_SEEDS);
189-
final Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> searchRemoteSeeds =
190+
final Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> searchRemoteSeeds =
190191
buildRemoteClustersDynamicConfig(settings, SEARCH_REMOTE_CLUSTERS_SEEDS);
191192
// sort the intersection for predictable output order
192193
final NavigableSet<String> intersection =
@@ -205,7 +206,7 @@ protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> build
205206
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
206207
}
207208

208-
private static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(
209+
private static Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> buildRemoteClustersDynamicConfig(
209210
final Settings settings, final Setting.AffixSetting<List<String>> seedsSetting) {
210211
final Stream<Setting<List<String>>> allConcreteSettings = seedsSetting.getAllConcreteSettings(settings);
211212
return allConcreteSettings.collect(
@@ -214,9 +215,9 @@ private static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRe
214215
List<String> addresses = concreteSetting.get(settings);
215216
final boolean proxyMode =
216217
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).existsOrFallbackExists(settings);
217-
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
218+
List<Tuple<String, Supplier<DiscoveryNode>>> nodes = new ArrayList<>(addresses.size());
218219
for (String address : addresses) {
219-
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
220+
nodes.add(Tuple.tuple(address, () -> buildSeedNode(clusterName, address, proxyMode)));
220221
}
221222
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
222223
}));
@@ -304,16 +305,24 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
304305
(namespace, value) -> {});
305306
}
306307

307-
308-
protected static InetSocketAddress parseSeedAddress(String remoteHost) {
309-
String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
308+
static InetSocketAddress parseSeedAddress(String remoteHost) {
309+
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
310+
final String host = hostPort.v1();
311+
assert hostPort.v2() != null : remoteHost;
312+
final int port = hostPort.v2();
310313
InetAddress hostAddress;
311314
try {
312315
hostAddress = InetAddress.getByName(host);
313316
} catch (UnknownHostException e) {
314317
throw new IllegalArgumentException("unknown host [" + host + "]", e);
315318
}
316-
return new InetSocketAddress(hostAddress, parsePort(remoteHost));
319+
return new InetSocketAddress(hostAddress, port);
320+
}
321+
322+
public static Tuple<String, Integer> parseHostPort(final String remoteHost) {
323+
final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
324+
final int port = parsePort(remoteHost);
325+
return Tuple.tuple(host, port);
317326
}
318327

319328
private static int parsePort(String remoteHost) {

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.cluster.ClusterName;
3636
import org.elasticsearch.cluster.node.DiscoveryNode;
3737
import org.elasticsearch.cluster.node.DiscoveryNodes;
38+
import org.elasticsearch.common.collect.Tuple;
3839
import org.elasticsearch.common.io.stream.StreamInput;
3940
import org.elasticsearch.common.settings.Settings;
4041
import org.elasticsearch.common.transport.TransportAddress;
@@ -95,7 +96,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
9596
private final Predicate<DiscoveryNode> nodePredicate;
9697
private final ThreadPool threadPool;
9798
private volatile String proxyAddress;
98-
private volatile List<Supplier<DiscoveryNode>> seedNodes;
99+
private volatile List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes;
99100
private volatile boolean skipUnavailable;
100101
private final ConnectHandler connectHandler;
101102
private final TimeValue initialConnectionTimeout;
@@ -111,15 +112,15 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
111112
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
112113
* @param proxyAddress the proxy address
113114
*/
114-
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
115+
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
115116
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
116117
String proxyAddress) {
117118
this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress,
118119
createConnectionManager(settings, clusterAlias, transportService));
119120
}
120121

121122
// Public for tests to pass a StubbableConnectionManager
122-
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
123+
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
123124
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
124125
String proxyAddress, ConnectionManager connectionManager) {
125126
this.transportService = transportService;
@@ -155,7 +156,10 @@ private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, Discovery
155156
/**
156157
* Updates the list of seed nodes for this cluster connection
157158
*/
158-
synchronized void updateSeedNodes(String proxyAddress, List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
159+
synchronized void updateSeedNodes(
160+
final String proxyAddress,
161+
final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
162+
final ActionListener<Void> connectListener) {
159163
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
160164
this.proxyAddress = proxyAddress;
161165
connectHandler.connect(connectListener);
@@ -465,7 +469,7 @@ protected void doRun() {
465469
maybeConnect();
466470
}
467471
});
468-
collectRemoteNodes(seedNodes.iterator(), transportService, connectionManager, listener);
472+
collectRemoteNodes(seedNodes.stream().map(Tuple::v2).iterator(), transportService, connectionManager, listener);
469473
}
470474
});
471475
}
@@ -672,10 +676,13 @@ void addConnectedNode(DiscoveryNode node) {
672676
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
673677
*/
674678
public RemoteConnectionInfo getConnectionInfo() {
675-
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect
676-
(Collectors.toList());
677-
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
678-
initialConnectionTimeout, skipUnavailable);
679+
return new RemoteConnectionInfo(
680+
clusterAlias,
681+
seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()),
682+
maxNumRemoteConnections,
683+
connectedNodes.size(),
684+
initialConnectionTimeout,
685+
skipUnavailable);
679686
}
680687

681688
int getNumNodesConnected() {

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ public String getKey(final String key) {
201201
* @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes
202202
* @param connectionListener a listener invoked once every configured cluster has been connected to
203203
*/
204-
private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> seeds,
204+
private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> seeds,
205205
ActionListener<Void> connectionListener) {
206206
if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) {
207207
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
@@ -212,8 +212,8 @@ private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Su
212212
} else {
213213
CountDown countDown = new CountDown(seeds.size());
214214
remoteClusters.putAll(this.remoteClusters);
215-
for (Map.Entry<String, Tuple<String, List<Supplier<DiscoveryNode>>>> entry : seeds.entrySet()) {
216-
List<Supplier<DiscoveryNode>> seedList = entry.getValue().v2();
215+
for (Map.Entry<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> entry : seeds.entrySet()) {
216+
List<Tuple<String, Supplier<DiscoveryNode>>> seedList = entry.getValue().v2();
217217
String proxyAddress = entry.getValue().v1();
218218

219219
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
@@ -408,9 +408,10 @@ void updateRemoteCluster(
408408
final List<String> addresses,
409409
final String proxyAddress,
410410
final ActionListener<Void> connectionListener) {
411-
final List<Supplier<DiscoveryNode>> nodes = addresses.stream().<Supplier<DiscoveryNode>>map(address -> () ->
412-
buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))
413-
).collect(Collectors.toList());
411+
final List<Tuple<String, Supplier<DiscoveryNode>>> nodes =
412+
addresses.stream().<Tuple<String, Supplier<DiscoveryNode>>>map(address -> Tuple.tuple(address, () ->
413+
buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress)))
414+
).collect(Collectors.toList());
414415
updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener);
415416
}
416417

@@ -421,7 +422,8 @@ void updateRemoteCluster(
421422
void initializeRemoteClusters() {
422423
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
423424
final PlainActionFuture<Void> future = new PlainActionFuture<>();
424-
Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> seeds = RemoteClusterAware.buildRemoteClustersDynamicConfig(settings);
425+
Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> seeds =
426+
RemoteClusterAware.buildRemoteClustersDynamicConfig(settings);
425427
updateRemoteClusters(seeds, future);
426428
try {
427429
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);

server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
1920
package org.elasticsearch.transport;
2021

2122
import org.elasticsearch.Version;
23+
import org.elasticsearch.common.collect.Tuple;
2224
import org.elasticsearch.common.io.stream.StreamInput;
2325
import org.elasticsearch.common.io.stream.StreamOutput;
2426
import org.elasticsearch.common.io.stream.Writeable;
@@ -27,25 +29,29 @@
2729
import org.elasticsearch.common.xcontent.ToXContentFragment;
2830
import org.elasticsearch.common.xcontent.XContentBuilder;
2931

30-
import static java.util.Collections.emptyList;
31-
3232
import java.io.IOException;
33+
import java.net.InetAddress;
34+
import java.net.UnknownHostException;
35+
import java.util.Arrays;
3336
import java.util.List;
3437
import java.util.Objects;
38+
import java.util.stream.Collectors;
39+
40+
import static java.util.Collections.emptyList;
3541

3642
/**
3743
* This class encapsulates all remote cluster information to be rendered on
3844
* {@code _remote/info} requests.
3945
*/
4046
public final class RemoteConnectionInfo implements ToXContentFragment, Writeable {
41-
final List<TransportAddress> seedNodes;
47+
final List<String> seedNodes;
4248
final int connectionsPerCluster;
4349
final TimeValue initialConnectionTimeout;
4450
final int numNodesConnected;
4551
final String clusterAlias;
4652
final boolean skipUnavailable;
4753

48-
RemoteConnectionInfo(String clusterAlias, List<TransportAddress> seedNodes,
54+
RemoteConnectionInfo(String clusterAlias, List<String> seedNodes,
4955
int connectionsPerCluster, int numNodesConnected,
5056
TimeValue initialConnectionTimeout, boolean skipUnavailable) {
5157
this.clusterAlias = clusterAlias;
@@ -57,7 +63,17 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
5763
}
5864

5965
public RemoteConnectionInfo(StreamInput input) throws IOException {
60-
seedNodes = input.readList(TransportAddress::new);
66+
if (input.getVersion().onOrAfter(Version.V_7_0_0)) {
67+
seedNodes = Arrays.asList(input.readStringArray());
68+
} else {
69+
// versions prior to 7.0.0 sent the resolved transport address of the seed nodes
70+
final List<TransportAddress> transportAddresses = input.readList(TransportAddress::new);
71+
seedNodes =
72+
transportAddresses
73+
.stream()
74+
.map(a -> a.address().getHostString() + ":" + a.address().getPort())
75+
.collect(Collectors.toList());
76+
}
6177
if (input.getVersion().before(Version.V_7_0_0)) {
6278
/*
6379
* Versions before 7.0 sent the HTTP addresses of all nodes in the
@@ -78,7 +94,26 @@ public RemoteConnectionInfo(StreamInput input) throws IOException {
7894

7995
@Override
8096
public void writeTo(StreamOutput out) throws IOException {
81-
out.writeList(seedNodes);
97+
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
98+
out.writeStringArray(seedNodes.toArray(new String[0]));
99+
} else {
100+
// versions prior to 7.0.0 received the resolved transport address of the seed nodes
101+
out.writeList(seedNodes
102+
.stream()
103+
.map(
104+
s -> {
105+
final Tuple<String, Integer> hostPort = RemoteClusterAware.parseHostPort(s);
106+
assert hostPort.v2() != null : s;
107+
try {
108+
return new TransportAddress(
109+
InetAddress.getByAddress(hostPort.v1(), TransportAddress.META_ADDRESS.getAddress()),
110+
hostPort.v2());
111+
} catch (final UnknownHostException e) {
112+
throw new AssertionError(e);
113+
}
114+
})
115+
.collect(Collectors.toList()));
116+
}
82117
if (out.getVersion().before(Version.V_7_0_0)) {
83118
/*
84119
* Versions before 7.0 sent the HTTP addresses of all nodes in the
@@ -104,8 +139,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
104139
builder.startObject(clusterAlias);
105140
{
106141
builder.startArray("seeds");
107-
for (TransportAddress addr : seedNodes) {
108-
builder.value(addr.toString());
142+
for (String addr : seedNodes) {
143+
builder.value(addr);
109144
}
110145
builder.endArray();
111146
builder.field("connected", numNodesConnected > 0);
@@ -136,4 +171,5 @@ public int hashCode() {
136171
return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout,
137172
numNodesConnected, clusterAlias, skipUnavailable);
138173
}
174+
139175
}

0 commit comments

Comments
 (0)