Skip to content

Ensure remote strategy settings can be updated #49812

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.elasticsearch.common.util.concurrent.CountDown;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -74,6 +76,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {

private final int maxNumConnections;
private final AtomicLong counter = new AtomicLong(0);
private final List<String> configuredAddresses;
private final List<Supplier<TransportAddress>> addresses;
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
private final ConnectionProfile profile;
Expand All @@ -100,6 +103,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses) {
super(clusterAlias, transportService, connectionManager);
this.maxNumConnections = maxNumConnections;
this.configuredAddresses = configuredAddresses;
assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses";
this.addresses = addresses;
// TODO: Move into the ConnectionManager
Expand Down Expand Up @@ -134,7 +138,9 @@ protected boolean shouldOpenMoreConnections() {

@Override
protected boolean strategyMustBeRebuilt(Settings newSettings) {
return false;
List<String> addresses = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return numOfSockets != maxNumConnections || addressesChanged(configuredAddresses, addresses);
}

@Override
Expand Down Expand Up @@ -223,4 +229,13 @@ private TransportAddress nextAddress(List<TransportAddress> resolvedAddresses) {
private static TransportAddress resolveAddress(String address) {
return new TransportAddress(parseSeedAddress(address));
}

private boolean addressesChanged(final List<String> oldAddresses, final List<String> newAddresses) {
if (oldAddresses.size() != newAddresses.size()) {
return true;
}
Set<String> oldSeeds = new HashSet<>(oldAddresses);
Set<String> newSeeds = new HashSet<>(newAddresses);
return oldSeeds.equals(newSeeds) == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ protected boolean shouldOpenMoreConnections() {
protected boolean strategyMustBeRebuilt(Settings newSettings) {
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
List<String> addresses = REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return seedsChanged(configuredSeedNodes, addresses) || proxyChanged(proxyAddress, proxy);
int nodeConnections = REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return nodeConnections != maxNumRemoteConnections || seedsChanged(configuredSeedNodes, addresses) ||
proxyChanged(proxyAddress, proxy);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -303,6 +304,59 @@ numOfConnections, addresses(address), Collections.singletonList(addressSupplier
}
}

public void testSimpleStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange() {
try (MockTransportService transport1 = startTransport("node1", Version.CURRENT);
MockTransportService transport2 = startTransport("node2", Version.CURRENT)) {
TransportAddress address1 = transport1.boundAddress().publishAddress();
TransportAddress address2 = transport2.boundAddress().publishAddress();

try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
localService.acceptIncomingRequests();

ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address1, address2))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();

assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
assertEquals(numOfConnections, connectionManager.size());
assertTrue(strategy.assertNoRunningConnections());

Setting<?> modeSetting = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> addressesSetting = SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> socketConnections = SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS
.getConcreteSettingForNamespace("cluster-alias");

Settings noChange = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray()))
.put(socketConnections.getKey(), numOfConnections)
.build();
assertFalse(strategy.shouldRebuildConnection(noChange));
Settings addressesChanged = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1).toArray()))
.build();
assertTrue(strategy.shouldRebuildConnection(addressesChanged));
Settings socketsChanged = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray()))
.put(socketConnections.getKey(), numOfConnections + 1)
.build();
assertTrue(strategy.shouldRebuildConnection(socketsChanged));
}
}
}
}

public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
new Tuple<>(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void testConfiguredProxyAddressModeWillReplaceNodeAddress() {
}
}

public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() {
public void testSniffStrategyWillNeedToBeRebuiltIfNumOfConnectionsOrSeedsOrProxyChange() {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
Expand Down Expand Up @@ -516,9 +516,12 @@ public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() {

Setting<?> seedSetting = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("cluster-alias");
Setting<?> proxySetting = SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("cluster-alias");
Setting<?> numConnections = SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS
.getConcreteSettingForNamespace("cluster-alias");

Settings noChange = Settings.builder()
.put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray()))
.put(numConnections.getKey(), 3)
.build();
assertFalse(strategy.shouldRebuildConnection(noChange));
Settings seedsChanged = Settings.builder()
Expand All @@ -530,6 +533,11 @@ public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() {
.put(proxySetting.getKey(), "proxy_address:9300")
.build();
assertTrue(strategy.shouldRebuildConnection(proxyChanged));
Settings connectionsChanged = Settings.builder()
.put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray()))
.put(numConnections.getKey(), 4)
.build();
assertTrue(strategy.shouldRebuildConnection(connectionsChanged));
}
}
}
Expand Down