Skip to content

Update remote cluster stats to support simple mode #49961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/reference/ccr/getting-started.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ remote cluster.
"num_nodes_connected" : 1, <2>
"max_connections_per_cluster" : 3,
"initial_connect_timeout" : "30s",
"skip_unavailable" : false
"skip_unavailable" : false,
"mode" : "sniff"
}
}
--------------------------------------------------
Expand All @@ -146,7 +147,7 @@ remote cluster.
alias `leader`
<2> This shows the number of nodes in the remote cluster the local cluster is
connected to.

Alternatively, you can manage remote clusters on the
*Management / Elasticsearch / Remote Clusters* page in {kib}:

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
---
"Fetch remote cluster info for existing cluster":
"Fetch remote cluster sniff info for existing cluster":

- do:
cluster.remote_info: {}
- match: { my_remote_cluster.connected: true }
- match: { my_remote_cluster.num_nodes_connected: 1}
- match: { my_remote_cluster.max_connections_per_cluster: 1}
- match: { my_remote_cluster.initial_connect_timeout: "30s" }
- match: { my_remote_cluster.mode: "sniff" }

---
"Add transient remote cluster based on the preset cluster and check remote info":
Expand All @@ -21,9 +22,13 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.seeds: $remote_ip
cluster.remote.test_remote_cluster.mode: "sniff"
cluster.remote.test_remote_cluster.sniff.node_connections: "2"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip

- match: {transient: {cluster.remote.test_remote_cluster.seeds: $remote_ip}}
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "2"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip}

# we do another search here since this will enforce the connection to be established
# otherwise the cluster might not have been connected yet.
Expand All @@ -45,19 +50,49 @@
- match: { my_remote_cluster.seeds.0: $remote_ip }

- match: { my_remote_cluster.num_nodes_connected: 1}
- match: { test_remote_cluster.num_nodes_connected: 1}
- gt: { test_remote_cluster.num_nodes_connected: 0}

- match: { my_remote_cluster.max_connections_per_cluster: 1}
- match: { test_remote_cluster.max_connections_per_cluster: 1}
- match: { test_remote_cluster.max_connections_per_cluster: 2}

- match: { my_remote_cluster.initial_connect_timeout: "30s" }
- match: { test_remote_cluster.initial_connect_timeout: "30s" }

- match: { my_remote_cluster.mode: "sniff" }
- match: { test_remote_cluster.mode: "sniff" }

- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.seeds: null
cluster.remote.test_remote_cluster.sniff.node_connections: null
cluster.remote.test_remote_cluster.simple.socket_connections: "10"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip

- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "10"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip}

- do:
cluster.remote_info: {}

- match: { test_remote_cluster.connected: true }
- match: { test_remote_cluster.addresses.0: $remote_ip }
- gt: { test_remote_cluster.num_sockets_connected: 0}
- match: { test_remote_cluster.max_socket_connections: 10}
- match: { test_remote_cluster.initial_connect_timeout: "30s" }
- match: { test_remote_cluster.mode: "simple" }

- do:
cluster.put_settings:
body:
transient:
cluster.remote.test_remote_cluster.seeds: null
cluster.remote.test_remote_cluster.mode: null
cluster.remote.test_remote_cluster.simple.socket_connections: null
cluster.remote.test_remote_cluster.simple.addresses: null

---
"skip_unavailable is returned as part of _remote/info response":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -206,24 +205,7 @@ boolean isNodeConnected(final DiscoveryNode node) {
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public RemoteConnectionInfo getConnectionInfo() {
if (connectionStrategy instanceof SniffConnectionStrategy) {
SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy;
return new RemoteConnectionInfo(
clusterAlias,
sniffStrategy.getSeedNodes(),
sniffStrategy.getMaxConnections(),
getNumNodesConnected(),
initialConnectionTimeout,
skipUnavailable);
} else {
return new RemoteConnectionInfo(
clusterAlias,
Collections.emptyList(),
0,
getNumNodesConnected(),
initialConnectionTimeout,
skipUnavailable);
}
return new RemoteConnectionInfo(clusterAlias, connectionStrategy.getModeInfo(), initialConnectionTimeout, skipUnavailable);
}

int getNumNodesConnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -36,63 +37,67 @@
* {@code _remote/info} requests.
*/
public final class RemoteConnectionInfo implements ToXContentFragment, Writeable {
final List<String> seedNodes;
final int connectionsPerCluster;

final ModeInfo modeInfo;
final TimeValue initialConnectionTimeout;
final int numNodesConnected;
final String clusterAlias;
final boolean skipUnavailable;

RemoteConnectionInfo(String clusterAlias, List<String> seedNodes,
int connectionsPerCluster, int numNodesConnected,
TimeValue initialConnectionTimeout, boolean skipUnavailable) {
RemoteConnectionInfo(String clusterAlias, ModeInfo modeInfo, TimeValue initialConnectionTimeout, boolean skipUnavailable) {
this.clusterAlias = clusterAlias;
this.seedNodes = seedNodes;
this.connectionsPerCluster = connectionsPerCluster;
this.numNodesConnected = numNodesConnected;
this.modeInfo = modeInfo;
this.initialConnectionTimeout = initialConnectionTimeout;
this.skipUnavailable = skipUnavailable;
}

public RemoteConnectionInfo(StreamInput input) throws IOException {
seedNodes = Arrays.asList(input.readStringArray());
connectionsPerCluster = input.readVInt();
initialConnectionTimeout = input.readTimeValue();
numNodesConnected = input.readVInt();
clusterAlias = input.readString();
skipUnavailable = input.readBoolean();
}

public List<String> getSeedNodes() {
return seedNodes;
}

public int getConnectionsPerCluster() {
return connectionsPerCluster;
}

public TimeValue getInitialConnectionTimeout() {
return initialConnectionTimeout;
// TODO: Change to 7.6 after backport
if (input.getVersion().onOrAfter(Version.V_8_0_0)) {
RemoteConnectionStrategy.ConnectionStrategy mode = input.readEnum(RemoteConnectionStrategy.ConnectionStrategy.class);
modeInfo = mode.getReader().read(input);
initialConnectionTimeout = input.readTimeValue();
clusterAlias = input.readString();
skipUnavailable = input.readBoolean();
} else {
List<String> seedNodes = Arrays.asList(input.readStringArray());
int connectionsPerCluster = input.readVInt();
initialConnectionTimeout = input.readTimeValue();
int numNodesConnected = input.readVInt();
clusterAlias = input.readString();
skipUnavailable = input.readBoolean();
modeInfo = new SniffConnectionStrategy.SniffModeInfo(seedNodes, connectionsPerCluster, numNodesConnected);
}
}

public int getNumNodesConnected() {
return numNodesConnected;
public boolean isConnected() {
return modeInfo.isConnected();
}

public String getClusterAlias() {
return clusterAlias;
}

public boolean isSkipUnavailable() {
return skipUnavailable;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(seedNodes.toArray(new String[0]));
out.writeVInt(connectionsPerCluster);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(numNodesConnected);
// TODO: Change to 7.6 after backport
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeEnum(modeInfo.modeType());
modeInfo.writeTo(out);
out.writeTimeValue(initialConnectionTimeout);
} else {
if (modeInfo.modeType() == RemoteConnectionStrategy.ConnectionStrategy.SNIFF) {
SniffConnectionStrategy.SniffModeInfo sniffInfo = (SniffConnectionStrategy.SniffModeInfo) this.modeInfo;
out.writeStringArray(sniffInfo.seedNodes.toArray(new String[0]));
out.writeVInt(sniffInfo.maxConnectionsPerCluster);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(sniffInfo.numNodesConnected);
} else {
out.writeStringArray(new String[0]);
out.writeVInt(0);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(0);
}
}
out.writeString(clusterAlias);
out.writeBoolean(skipUnavailable);
}
Expand All @@ -101,14 +106,9 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(clusterAlias);
{
builder.startArray("seeds");
for (String addr : seedNodes) {
builder.value(addr);
}
builder.endArray();
builder.field("connected", numNodesConnected > 0);
builder.field("num_nodes_connected", numNodesConnected);
builder.field("max_connections_per_cluster", connectionsPerCluster);
builder.field("connected", modeInfo.isConnected());
builder.field("mode", modeInfo.modeName());
modeInfo.toXContent(builder, params);
builder.field("initial_connect_timeout", initialConnectionTimeout);
builder.field("skip_unavailable", skipUnavailable);
}
Expand All @@ -121,18 +121,23 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RemoteConnectionInfo that = (RemoteConnectionInfo) o;
return connectionsPerCluster == that.connectionsPerCluster &&
numNodesConnected == that.numNodesConnected &&
Objects.equals(seedNodes, that.seedNodes) &&
return skipUnavailable == that.skipUnavailable &&
Objects.equals(modeInfo, that.modeInfo) &&
Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) &&
Objects.equals(clusterAlias, that.clusterAlias) &&
skipUnavailable == that.skipUnavailable;
Objects.equals(clusterAlias, that.clusterAlias);
}

@Override
public int hashCode() {
return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout,
numNodesConnected, clusterAlias, skipUnavailable);
return Objects.hash(modeInfo, initialConnectionTimeout, clusterAlias, skipUnavailable);
}

public interface ModeInfo extends ToXContentFragment, Writeable {

boolean isConnected();

String modeName();

RemoteConnectionStrategy.ConnectionStrategy modeType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -57,20 +58,39 @@
public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable {

enum ConnectionStrategy {
SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings),
SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings);
SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings,
SniffConnectionStrategy::infoReader) {
@Override
public String toString() {
return "sniff";
}
},
SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings,
SimpleConnectionStrategy::infoReader) {
@Override
public String toString() {
return "simple";
}
};

private final int numberOfChannels;
private final Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings;
private final Supplier<Writeable.Reader<RemoteConnectionInfo.ModeInfo>> reader;

ConnectionStrategy(int numberOfChannels, Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings) {
ConnectionStrategy(int numberOfChannels, Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings,
Supplier<Writeable.Reader<RemoteConnectionInfo.ModeInfo>> reader) {
this.numberOfChannels = numberOfChannels;
this.enablementSettings = enablementSettings;
this.reader = reader;
}

public int getNumberOfChannels() {
return numberOfChannels;
}

public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
return reader.get();
}
}

public static final Setting.AffixSetting<ConnectionStrategy> REMOTE_CONNECTION_MODE = Setting.affixKeySetting(
Expand Down Expand Up @@ -310,6 +330,8 @@ boolean assertNoRunningConnections() {

protected abstract void connectImpl(ActionListener<Void> listener);

protected abstract RemoteConnectionInfo.ModeInfo getModeInfo();

private List<ActionListener<Void>> getAndClearListeners() {
final List<ActionListener<Void>> result;
synchronized (mutex) {
Expand Down
Loading