diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 4e589f613a6fb..b5474a05ec26b 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -23,6 +23,10 @@ import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction; import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction; +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction; +import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction; +import org.elasticsearch.action.admin.cluster.bootstrap.TransportBootstrapClusterAction; +import org.elasticsearch.action.admin.cluster.bootstrap.TransportGetDiscoveredNodesAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; @@ -422,6 +426,8 @@ public void reg actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class); actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class); + actions.register(GetDiscoveredNodesAction.INSTANCE, TransportGetDiscoveredNodesAction.class); + actions.register(BootstrapClusterAction.INSTANCE, TransportBootstrapClusterAction.class); actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class); actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class); actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterAction.java new file mode 100644 index 0000000000000..d060efcc5a141 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterAction.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.action.Action; +import org.elasticsearch.common.io.stream.Writeable.Reader; + +public class BootstrapClusterAction extends Action { + public static final BootstrapClusterAction INSTANCE = new BootstrapClusterAction(); + public static final String NAME = "cluster:admin/bootstrap_cluster"; + + private BootstrapClusterAction() { + super(NAME); + } + + @Override + public BootstrapClusterResponse newResponse() { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Reader getResponseReader() { + return BootstrapClusterResponse::new; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequest.java new file mode 100644 index 0000000000000..f8d0bcb13a58f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequest.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request to set the initial configuration of master-eligible nodes in a cluster so that the very first master election can take place. + */ +public class BootstrapClusterRequest extends ActionRequest { + private final BootstrapConfiguration bootstrapConfiguration; + + public BootstrapClusterRequest(BootstrapConfiguration bootstrapConfiguration) { + this.bootstrapConfiguration = bootstrapConfiguration; + } + + public BootstrapClusterRequest(StreamInput in) throws IOException { + super(in); + bootstrapConfiguration = new BootstrapConfiguration(in); + } + + /** + * @return the bootstrap configuration: the initial set of master-eligible nodes whose votes are counted in elections. + */ + public BootstrapConfiguration getBootstrapConfiguration() { + return bootstrapConfiguration; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + bootstrapConfiguration.writeTo(out); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponse.java new file mode 100644 index 0000000000000..2576409a3cef1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponse.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Response to a {@link BootstrapClusterRequest} indicating that the cluster has been successfully bootstrapped. + */ +public class BootstrapClusterResponse extends ActionResponse { + private final boolean alreadyBootstrapped; + + public BootstrapClusterResponse(boolean alreadyBootstrapped) { + this.alreadyBootstrapped = alreadyBootstrapped; + } + + public BootstrapClusterResponse(StreamInput in) throws IOException { + super(in); + alreadyBootstrapped = in.readBoolean(); + } + + /** + * @return whether this node already knew that the cluster had been bootstrapped when handling this request. + */ + public boolean getAlreadyBootstrapped() { + return alreadyBootstrapped; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(alreadyBootstrapped); + } + + @Override + public String toString() { + return "BootstrapClusterResponse{" + + "alreadyBootstrapped=" + alreadyBootstrapped + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfiguration.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfiguration.java new file mode 100644 index 0000000000000..f6207459c9616 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfiguration.java @@ -0,0 +1,179 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public class BootstrapConfiguration implements Writeable { + + private final List nodeDescriptions; + + public BootstrapConfiguration(List nodeDescriptions) { + if (nodeDescriptions.isEmpty()) { + throw new IllegalArgumentException("cannot create empty bootstrap configuration"); + } + this.nodeDescriptions = Collections.unmodifiableList(new ArrayList<>(nodeDescriptions)); + } + + public BootstrapConfiguration(StreamInput in) throws IOException { + nodeDescriptions = Collections.unmodifiableList(in.readList(NodeDescription::new)); + assert nodeDescriptions.isEmpty() == false; + } + + public List getNodeDescriptions() { + return nodeDescriptions; + } + + public VotingConfiguration resolve(Iterable discoveredNodes) { + final Set selectedNodes = new HashSet<>(); + for (final NodeDescription nodeDescription : nodeDescriptions) { + final DiscoveryNode discoveredNode = nodeDescription.resolve(discoveredNodes); + if (selectedNodes.add(discoveredNode) == false) { + throw new ElasticsearchException("multiple nodes matching {} in {}", discoveredNode, this); + } + } + + final Set nodeIds = selectedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + assert nodeIds.size() == selectedNodes.size() : selectedNodes + " does not contain distinct IDs"; + return new VotingConfiguration(nodeIds); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(nodeDescriptions); + } + + @Override + public String toString() { + return "BootstrapConfiguration{" + + "nodeDescriptions=" + nodeDescriptions + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + BootstrapConfiguration that = (BootstrapConfiguration) o; + return Objects.equals(nodeDescriptions, that.nodeDescriptions); + } + + @Override + public int hashCode() { + return Objects.hash(nodeDescriptions); + } + + public static class NodeDescription implements Writeable { + + @Nullable + private final String id; + + private final String name; + + @Nullable + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public NodeDescription(@Nullable String id, String name) { + this.id = id; + this.name = Objects.requireNonNull(name); + } + + public NodeDescription(DiscoveryNode discoveryNode) { + this(discoveryNode.getId(), discoveryNode.getName()); + } + + public NodeDescription(StreamInput in) throws IOException { + this(in.readOptionalString(), in.readString()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(id); + out.writeString(name); + } + + @Override + public String toString() { + return "NodeDescription{" + + "id='" + id + '\'' + + ", name='" + name + '\'' + + '}'; + } + + public DiscoveryNode resolve(Iterable discoveredNodes) { + DiscoveryNode selectedNode = null; + for (final DiscoveryNode discoveredNode : discoveredNodes) { + assert discoveredNode.isMasterNode() : discoveredNode; + if (discoveredNode.getName().equals(name)) { + if (id == null || id.equals(discoveredNode.getId())) { + if (selectedNode != null) { + throw new ElasticsearchException( + "discovered multiple nodes matching {} in {}", this, discoveredNodes); + } + selectedNode = discoveredNode; + } else { + throw new ElasticsearchException("node id mismatch comparing {} to {}", this, discoveredNode); + } + } else if (id != null && id.equals(discoveredNode.getId())) { + throw new ElasticsearchException("node name mismatch comparing {} to {}", this, discoveredNode); + } + } + if (selectedNode == null) { + throw new ElasticsearchException("no node matching {} found in {}", this, discoveredNodes); + } + + return selectedNode; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeDescription that = (NodeDescription) o; + return Objects.equals(id, that.id) && + Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(id, name); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesAction.java new file mode 100644 index 0000000000000..0a3ab72e115cc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesAction.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.action.Action; +import org.elasticsearch.common.io.stream.Writeable.Reader; + +public class GetDiscoveredNodesAction extends Action { + public static final GetDiscoveredNodesAction INSTANCE = new GetDiscoveredNodesAction(); + public static final String NAME = "cluster:monitor/discovered_nodes"; + + private GetDiscoveredNodesAction() { + super(NAME); + } + + @Override + public GetDiscoveredNodesResponse newResponse() { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Reader getResponseReader() { + return GetDiscoveredNodesResponse::new; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java new file mode 100644 index 0000000000000..f81061e854759 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; + +import java.io.IOException; + +/** + * Request the set of master-eligible nodes discovered by this node. Most useful in a brand-new cluster as a precursor to setting the + * initial configuration using {@link BootstrapClusterRequest}. + */ +public class GetDiscoveredNodesRequest extends ActionRequest { + + private int waitForNodes = 1; + private TimeValue timeout = TimeValue.timeValueSeconds(30); + + public GetDiscoveredNodesRequest() { + } + + public GetDiscoveredNodesRequest(StreamInput in) throws IOException { + super(in); + waitForNodes = in.readInt(); + timeout = in.readTimeValue(); + } + + /** + * Sometimes it is useful only to receive a successful response after discovering a certain number of master-eligible nodes. This + * parameter controls this behaviour. + * + * @param waitForNodes the minimum number of nodes to have discovered before this request will receive a successful response. Must + * be at least 1, because we always discover the local node. + */ + public void setWaitForNodes(int waitForNodes) { + if (waitForNodes < 1) { + throw new IllegalArgumentException("always finds at least one node, waiting for [" + waitForNodes + "] is not allowed"); + } + this.waitForNodes = waitForNodes; + } + + /** + * Sometimes it is useful only to receive a successful response after discovering a certain number of master-eligible nodes. This + * parameter controls this behaviour. + * + * @return the minimum number of nodes to have discovered before this request will receive a successful response. + */ + public int getWaitForNodes() { + return waitForNodes; + } + + /** + * Sometimes it is useful to wait until enough nodes have been discovered, rather than failing immediately. This parameter controls how + * long to wait, and defaults to 30s. + * + * @param timeout how long to wait to discover sufficiently many nodes to respond successfully. + */ + public void setTimeout(TimeValue timeout) { + if (timeout.compareTo(TimeValue.ZERO) < 0) { + throw new IllegalArgumentException("negative timeout of [" + timeout + "] is not allowed"); + } + this.timeout = timeout; + } + + /** + * Sometimes it is useful to wait until enough nodes have been discovered, rather than failing immediately. This parameter controls how + * long to wait, and defaults to 30s. + * + * @return how long to wait to discover sufficiently many nodes to respond successfully. + */ + public TimeValue getTimeout() { + return timeout; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeInt(waitForNodes); + out.writeTimeValue(timeout); + } + + @Override + public String toString() { + return "GetDiscoveredNodesRequest{" + + "waitForNodes=" + waitForNodes + + ", timeout=" + timeout + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponse.java new file mode 100644 index 0000000000000..f1174002b6998 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponse.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Response to {@link GetDiscoveredNodesRequest}, containing the set of master-eligible nodes that were discovered. + */ +public class GetDiscoveredNodesResponse extends ActionResponse { + private final Set nodes; + + public GetDiscoveredNodesResponse(Set nodes) { + this.nodes = Collections.unmodifiableSet(new HashSet<>(nodes)); + } + + public GetDiscoveredNodesResponse(StreamInput in) throws IOException { + super(in); + nodes = Collections.unmodifiableSet(in.readSet(DiscoveryNode::new)); + } + + /** + * @return the set of nodes that were discovered. + */ + public Set getNodes() { + return nodes; + } + + /** + * @return a bootstrap configuration constructed from the set of nodes that were discovered, in order to make a + * {@link BootstrapClusterRequest}. + */ + public BootstrapConfiguration getBootstrapConfiguration() { + return new BootstrapConfiguration(nodes.stream().map(NodeDescription::new).collect(Collectors.toList())); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeCollection(nodes, (o, v) -> v.writeTo(o)); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java new file mode 100644 index 0000000000000..32a9f39cc0db8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java @@ -0,0 +1,87 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.coordination.Coordinator; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING; + +public class TransportBootstrapClusterAction extends HandledTransportAction { + + @Nullable // TODO make this not nullable + private final Coordinator coordinator; + private final TransportService transportService; + private final String discoveryType; + + @Inject + public TransportBootstrapClusterAction(Settings settings, ActionFilters actionFilters, TransportService transportService, + Discovery discovery) { + super(BootstrapClusterAction.NAME, transportService, actionFilters, BootstrapClusterRequest::new); + this.transportService = transportService; + this.discoveryType = DISCOVERY_TYPE_SETTING.get(settings); + if (discovery instanceof Coordinator) { + coordinator = (Coordinator) discovery; + } else { + coordinator = null; + } + } + + @Override + protected void doExecute(Task task, BootstrapClusterRequest request, ActionListener listener) { + if (coordinator == null) { // TODO remove when not nullable + throw new IllegalArgumentException("cluster bootstrapping is not supported by discovery type [" + discoveryType + "]"); + } + + final DiscoveryNode localNode = transportService.getLocalNode(); + assert localNode != null; + if (localNode.isMasterNode() == false) { + throw new IllegalArgumentException( + "this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node"); + } + + transportService.getThreadPool().generic().execute(new AbstractRunnable() { + @Override + public void doRun() { + listener.onResponse(new BootstrapClusterResponse( + coordinator.setInitialConfiguration(request.getBootstrapConfiguration()) == false)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + public String toString() { + return "setting initial configuration with " + request; + } + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java new file mode 100644 index 0000000000000..a45d7c3246fbc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java @@ -0,0 +1,125 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.coordination.Coordinator; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportService; + +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING; + +public class TransportGetDiscoveredNodesAction extends HandledTransportAction { + + @Nullable // TODO make this not nullable + private final Coordinator coordinator; + private final TransportService transportService; + private final String discoveryType; + + @Inject + public TransportGetDiscoveredNodesAction(Settings settings, ActionFilters actionFilters, TransportService transportService, + Discovery discovery) { + super(GetDiscoveredNodesAction.NAME, transportService, actionFilters, + (Reader) GetDiscoveredNodesRequest::new); + + this.discoveryType = DISCOVERY_TYPE_SETTING.get(settings); + this.transportService = transportService; + if (discovery instanceof Coordinator) { + coordinator = (Coordinator) discovery; + } else { + coordinator = null; + } + } + + @Override + protected void doExecute(Task task, GetDiscoveredNodesRequest request, ActionListener listener) { + if (coordinator == null) { // TODO remove when not nullable + throw new IllegalArgumentException("discovered nodes are not exposed by discovery type [" + discoveryType + "]"); + } + + final DiscoveryNode localNode = transportService.getLocalNode(); + assert localNode != null; + if (localNode.isMasterNode() == false) { + throw new IllegalArgumentException( + "this node is not master-eligible, but discovered nodes are only exposed by master-eligible nodes"); + } + final ExecutorService directExecutor = EsExecutors.newDirectExecutorService(); + final AtomicBoolean listenerNotified = new AtomicBoolean(); + final ListenableFuture listenableFuture = new ListenableFuture<>(); + final ThreadPool threadPool = transportService.getThreadPool(); + listenableFuture.addListener(listener, directExecutor, threadPool.getThreadContext()); + // TODO make it so that listenableFuture copes with multiple completions, and then remove listenerNotified + + final Consumer> respondIfRequestSatisfied = new Consumer>() { + @Override + public void accept(Iterable nodes) { + final Set nodesSet = new LinkedHashSet<>(); + nodesSet.add(localNode); + nodes.forEach(nodesSet::add); + logger.trace("discovered {}", nodesSet); + if (nodesSet.size() >= request.getWaitForNodes() && listenerNotified.compareAndSet(false, true)) { + listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet)); + } + } + + @Override + public String toString() { + return "waiting for " + request; + } + }; + + final Releasable releasable = coordinator.withDiscoveryListener(respondIfRequestSatisfied); + listenableFuture.addListener(ActionListener.wrap(releasable::close), directExecutor, threadPool.getThreadContext()); + respondIfRequestSatisfied.accept(coordinator.getFoundPeers()); + + threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() { + @Override + public void run() { + if (listenerNotified.compareAndSet(false, true)) { + listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request)); + } + } + + @Override + public String toString() { + return "timeout handler for " + request; + } + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index ba8f727d8ab00..ca5191e1bba8e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -67,11 +68,13 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static java.util.Collections.emptySet; +import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -116,6 +119,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private JoinHelper.JoinAccumulator joinAccumulator; private Optional currentPublication = Optional.empty(); + private final Set>> discoveredNodesListeners = newConcurrentSet(); + public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService, AllocationService allocationService, MasterService masterService, Supplier persistedStateSupplier, UnicastHostsProvider unicastHostsProvider, @@ -564,13 +569,40 @@ public void invariant() { } } - public void setInitialConfiguration(final VotingConfiguration votingConfiguration) { + public boolean isInitialConfigurationSet() { + return getStateForMasterService().getLastAcceptedConfiguration().isEmpty() == false; + } + + /** + * Sets the initial configuration by resolving the given {@link BootstrapConfiguration} to concrete nodes. This method is safe to call + * more than once, as long as each call's bootstrap configuration resolves to the same set of nodes. + * + * @param bootstrapConfiguration A description of the nodes that should form the initial configuration. + * @return whether this call successfully set the initial configuration - if false, the cluster has already been bootstrapped. + */ + public boolean setInitialConfiguration(final BootstrapConfiguration bootstrapConfiguration) { + final List selfAndDiscoveredPeers = new ArrayList<>(); + selfAndDiscoveredPeers.add(getLocalNode()); + getFoundPeers().forEach(selfAndDiscoveredPeers::add); + final VotingConfiguration votingConfiguration = bootstrapConfiguration.resolve(selfAndDiscoveredPeers); + return setInitialConfiguration(votingConfiguration); + } + + /** + * Sets the initial configuration to the given {@link VotingConfiguration}. This method is safe to call + * more than once, as long as the argument to each call is the same. + * + * @param votingConfiguration The nodes that should form the initial configuration. + * @return whether this call successfully set the initial configuration - if false, the cluster has already been bootstrapped. + */ + public boolean setInitialConfiguration(final VotingConfiguration votingConfiguration) { synchronized (mutex) { final ClusterState currentState = getStateForMasterService(); - if (currentState.getLastAcceptedConfiguration().isEmpty() == false) { - throw new CoordinationStateRejectedException("Cannot set initial configuration: configuration has already been set"); + if (isInitialConfigurationSet()) { + return false; } + assert currentState.term() == 0 : currentState; assert currentState.version() == 0 : currentState; @@ -597,6 +629,7 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio coordinationState.get().setInitialState(builder.build()); preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version startElectionScheduler(); + return true; } } @@ -835,10 +868,12 @@ protected void onActiveMasterFound(DiscoveryNode masterNode, long term) { @Override protected void onFoundPeersUpdated() { + final Iterable foundPeers; synchronized (mutex) { + foundPeers = getFoundPeers(); if (mode == Mode.CANDIDATE) { final CoordinationState.VoteCollection expectedVotes = new CoordinationState.VoteCollection(); - getFoundPeers().forEach(expectedVotes::addVote); + foundPeers.forEach(expectedVotes::addVote); expectedVotes.addVote(Coordinator.this.getLocalNode()); final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); final boolean foundQuorum = CoordinationState.isElectionQuorum(expectedVotes, lastAcceptedState); @@ -852,6 +887,10 @@ protected void onFoundPeersUpdated() { } } } + + for (Consumer> discoveredNodesListener : discoveredNodesListeners) { + discoveredNodesListener.accept(foundPeers); + } } } @@ -879,6 +918,19 @@ public String toString() { }); } + public Releasable withDiscoveryListener(Consumer> listener) { + discoveredNodesListeners.add(listener); + return () -> { + boolean removed = discoveredNodesListeners.remove(listener); + assert removed : listener; + }; + } + + public Iterable getFoundPeers() { + // TODO everyone takes this and adds the local node. Maybe just add the local node here? + return peerFinder.getFoundPeers(); + } + class CoordinatorPublication extends Publication { private final PublishRequest publishRequest; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequestTests.java new file mode 100644 index 0000000000000..ee9c58413b350 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequestTests.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; + +public class BootstrapClusterRequestTests extends ESTestCase { + + public void testSerialization() throws IOException { + final BootstrapConfiguration bootstrapConfiguration + = new BootstrapConfiguration(Collections.singletonList(new NodeDescription(null, randomAlphaOfLength(10)))); + final BootstrapClusterRequest original = new BootstrapClusterRequest(bootstrapConfiguration); + assertNull(original.validate()); + final BootstrapClusterRequest deserialized = copyWriteable(original, writableRegistry(), BootstrapClusterRequest::new); + assertThat(deserialized.getBootstrapConfiguration(), equalTo(bootstrapConfiguration)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponseTests.java new file mode 100644 index 0000000000000..fb33dbc5fcbd6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponseTests.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; + +public class BootstrapClusterResponseTests extends ESTestCase { + public void testSerialization() throws IOException { + final BootstrapClusterResponse original = new BootstrapClusterResponse(randomBoolean()); + final BootstrapClusterResponse deserialized = copyWriteable(original, writableRegistry(), BootstrapClusterResponse::new); + assertThat(deserialized.getAlreadyBootstrapped(), equalTo(original.getAlreadyBootstrapped())); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfigurationTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfigurationTests.java new file mode 100644 index 0000000000000..f39026e6db456 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfigurationTests.java @@ -0,0 +1,178 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singleton; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; + +public class BootstrapConfigurationTests extends ESTestCase { + + public void testEqualsHashcodeSerialization() { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(randomBootstrapConfiguration(), + bootstrapConfiguration -> copyWriteable(bootstrapConfiguration, writableRegistry(), BootstrapConfiguration::new), this::mutate); + } + + public void testNodeDescriptionResolvedByName() { + final List discoveryNodes = randomDiscoveryNodes(); + final DiscoveryNode expectedNode = randomFrom(discoveryNodes); + assertThat(new NodeDescription(null, expectedNode.getName()).resolve(discoveryNodes), equalTo(expectedNode)); + } + + public void testNodeDescriptionResolvedByIdAndName() { + final List discoveryNodes = randomDiscoveryNodes(); + final DiscoveryNode expectedNode = randomFrom(discoveryNodes); + assertThat(new NodeDescription(expectedNode).resolve(discoveryNodes), equalTo(expectedNode)); + } + + public void testRejectsMismatchedId() { + final List discoveryNodes = randomDiscoveryNodes(); + final DiscoveryNode expectedNode = randomFrom(discoveryNodes); + final ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new NodeDescription(randomAlphaOfLength(11), expectedNode.getName()).resolve(discoveryNodes)); + assertThat(e.getMessage(), startsWith("node id mismatch comparing ")); + } + + public void testRejectsMismatchedName() { + final List discoveryNodes = randomDiscoveryNodes(); + final DiscoveryNode expectedNode = randomFrom(discoveryNodes); + final ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new NodeDescription(expectedNode.getId(), randomAlphaOfLength(11)).resolve(discoveryNodes)); + assertThat(e.getMessage(), startsWith("node name mismatch comparing ")); + } + + public void testFailsIfNoMatch() { + final List discoveryNodes = randomDiscoveryNodes(); + final ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> randomNodeDescription().resolve(discoveryNodes)); + assertThat(e.getMessage(), startsWith("no node matching ")); + } + + public void testFailsIfDuplicateMatchOnName() { + final List discoveryNodes = randomDiscoveryNodes(); + final DiscoveryNode discoveryNode = randomFrom(discoveryNodes); + discoveryNodes.add(new DiscoveryNode(discoveryNode.getName(), randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(), + singleton(Role.MASTER), Version.CURRENT)); + final ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new NodeDescription(null, discoveryNode.getName()).resolve(discoveryNodes)); + assertThat(e.getMessage(), startsWith("discovered multiple nodes matching ")); + } + + public void testFailsIfDuplicatedNode() { + final List discoveryNodes = randomDiscoveryNodes(); + final DiscoveryNode discoveryNode = randomFrom(discoveryNodes); + discoveryNodes.add(discoveryNode); + final ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new NodeDescription(discoveryNode).resolve(discoveryNodes)); + assertThat(e.getMessage(), startsWith("discovered multiple nodes matching ")); + } + + public void testResolvesEntireConfiguration() { + final List discoveryNodes = randomDiscoveryNodes(); + final List selectedNodes = randomSubsetOf(randomIntBetween(1, discoveryNodes.size()), discoveryNodes); + final BootstrapConfiguration bootstrapConfiguration = new BootstrapConfiguration(selectedNodes.stream() + .map(discoveryNode -> randomBoolean() ? new NodeDescription(discoveryNode) : new NodeDescription(null, discoveryNode.getName())) + .collect(Collectors.toList())); + + final VotingConfiguration expectedConfiguration + = new VotingConfiguration(selectedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet())); + final VotingConfiguration votingConfiguration = bootstrapConfiguration.resolve(discoveryNodes); + assertThat(votingConfiguration, equalTo(expectedConfiguration)); + } + + public void testRejectsDuplicatedDescriptions() { + final List discoveryNodes = randomDiscoveryNodes(); + final List selectedNodes = randomSubsetOf(randomIntBetween(1, discoveryNodes.size()), discoveryNodes); + final List selectedNodeDescriptions = selectedNodes.stream() + .map(discoveryNode -> randomBoolean() ? new NodeDescription(discoveryNode) : new NodeDescription(null, discoveryNode.getName())) + .collect(Collectors.toList()); + final NodeDescription toDuplicate = randomFrom(selectedNodeDescriptions); + selectedNodeDescriptions.add(randomBoolean() ? toDuplicate : new NodeDescription(null, toDuplicate.getName())); + final BootstrapConfiguration bootstrapConfiguration = new BootstrapConfiguration(selectedNodeDescriptions); + + final ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> bootstrapConfiguration.resolve(discoveryNodes)); + assertThat(e.getMessage(), startsWith("multiple nodes matching ")); + } + + private NodeDescription mutate(NodeDescription original) { + if (randomBoolean()) { + return new NodeDescription(original.getId(), randomAlphaOfLength(21 - original.getName().length())); + } else { + if (original.getId() == null) { + return new NodeDescription(randomAlphaOfLength(10), original.getName()); + } else if (randomBoolean()) { + return new NodeDescription(randomAlphaOfLength(21 - original.getId().length()), original.getName()); + } else { + return new NodeDescription(null, original.getName()); + } + } + } + + protected BootstrapConfiguration mutate(BootstrapConfiguration original) { + final List newDescriptions = new ArrayList<>(original.getNodeDescriptions()); + final int mutateElement = randomIntBetween(0, newDescriptions.size()); + if (mutateElement == newDescriptions.size()) { + newDescriptions.add(randomIntBetween(0, newDescriptions.size()), randomNodeDescription()); + } else { + if (newDescriptions.size() > 1 && randomBoolean()) { + newDescriptions.remove(mutateElement); + } else { + newDescriptions.set(mutateElement, mutate(newDescriptions.get(mutateElement))); + } + } + return new BootstrapConfiguration(newDescriptions); + } + + protected NodeDescription randomNodeDescription() { + return new NodeDescription(randomBoolean() ? null : randomAlphaOfLength(10), randomAlphaOfLength(10)); + } + + protected BootstrapConfiguration randomBootstrapConfiguration() { + final int size = randomIntBetween(1, 5); + final List nodeDescriptions = new ArrayList<>(size); + while (nodeDescriptions.size() <= size) { + nodeDescriptions.add(randomNodeDescription()); + } + return new BootstrapConfiguration(nodeDescriptions); + } + + protected List randomDiscoveryNodes() { + final int size = randomIntBetween(1, 5); + final List nodes = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + nodes.add(new DiscoveryNode(randomAlphaOfLength(10), randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(), + singleton(Role.MASTER), Version.CURRENT)); + } + return nodes; + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java new file mode 100644 index 0000000000000..9ce53a93efaff --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; +import static org.hamcrest.core.Is.is; + +public class GetDiscoveredNodesRequestTests extends ESTestCase { + + public void testWaitForNodesValidation() { + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + assertThat("default value is 1", getDiscoveredNodesRequest.getWaitForNodes(), is(1)); + + final int newWaitForNodes = randomIntBetween(1, 10); + getDiscoveredNodesRequest.setWaitForNodes(newWaitForNodes); + assertThat("value updated", getDiscoveredNodesRequest.getWaitForNodes(), is(newWaitForNodes)); + + final IllegalArgumentException exception + = expectThrows(IllegalArgumentException.class, () -> getDiscoveredNodesRequest.setWaitForNodes(randomIntBetween(-10, 0))); + assertThat(exception.getMessage(), startsWith("always finds at least one node, waiting for ")); + assertThat(exception.getMessage(), endsWith(" is not allowed")); + } + + public void testTimeoutValidation() { + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + assertThat("default value is 30s", getDiscoveredNodesRequest.getTimeout(), is(TimeValue.timeValueSeconds(30))); + + final TimeValue newTimeout = TimeValue.parseTimeValue(randomTimeValue(), "timeout"); + getDiscoveredNodesRequest.setTimeout(newTimeout); + assertThat("value updated", getDiscoveredNodesRequest.getTimeout(), equalTo(newTimeout)); + + final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, + () -> getDiscoveredNodesRequest.setTimeout(TimeValue.timeValueNanos(randomLongBetween(-10, -1)))); + assertThat(exception.getMessage(), startsWith("negative timeout of ")); + assertThat(exception.getMessage(), endsWith(" is not allowed")); + } + + public void testSerialization() throws IOException { + final GetDiscoveredNodesRequest originalRequest = new GetDiscoveredNodesRequest(); + + if (randomBoolean()) { + originalRequest.setWaitForNodes(randomIntBetween(1, 10)); + } + + if (randomBoolean()) { + originalRequest.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "timeout")); + } + + final GetDiscoveredNodesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), GetDiscoveredNodesRequest::new); + + assertThat(deserialized.getWaitForNodes(), equalTo(originalRequest.getWaitForNodes())); + assertThat(deserialized.getTimeout(), equalTo(originalRequest.getTimeout())); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponseTests.java new file mode 100644 index 0000000000000..7d2fc602e66c6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponseTests.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singleton; +import static org.hamcrest.Matchers.equalTo; + +public class GetDiscoveredNodesResponseTests extends ESTestCase { + public void testSerialization() throws IOException { + final GetDiscoveredNodesResponse original = new GetDiscoveredNodesResponse(randomDiscoveryNodeSet()); + final GetDiscoveredNodesResponse deserialized = copyWriteable(original, writableRegistry(), GetDiscoveredNodesResponse::new); + assertThat(deserialized.getNodes(), equalTo(original.getNodes())); + } + + private Set randomDiscoveryNodeSet() { + final int size = randomIntBetween(1, 10); + final Set nodes = new HashSet<>(size); + while (nodes.size() < size) { + assertTrue(nodes.add(new DiscoveryNode(randomAlphaOfLength(10), randomAlphaOfLength(10), + UUIDs.randomBase64UUID(random()), randomAlphaOfLength(10), randomAlphaOfLength(10), buildNewFakeTransportAddress(), + emptyMap(), singleton(Role.MASTER), Version.CURRENT))); + } + return nodes; + } + + public void testConversionToBootstrapConfiguration() { + final Set nodes = randomDiscoveryNodeSet(); + assertThat(new GetDiscoveredNodesResponse(nodes).getBootstrapConfiguration().resolve(nodes).getNodeIds(), + equalTo(nodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()))); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java new file mode 100644 index 0000000000000..743bc5374ed07 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java @@ -0,0 +1,226 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.coordination.Coordinator; +import org.elasticsearch.cluster.coordination.InMemoryPersistedState; +import org.elasticsearch.cluster.coordination.NoOpClusterApplier; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class TransportBootstrapClusterActionTests extends ESTestCase { + + private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + private DiscoveryNode discoveryNode; + private static ThreadPool threadPool; + private TransportService transportService; + private Coordinator coordinator; + + private static BootstrapClusterRequest exampleRequest() { + return new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription("id", "name")))); + } + + @BeforeClass + public static void createThreadPool() { + threadPool = new TestThreadPool("test", Settings.EMPTY); + } + + @AfterClass + public static void shutdownThreadPool() { + threadPool.shutdown(); + } + + @Before + public void setupTest() { + discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + final MockTransport transport = new MockTransport(); + transportService = transport.createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> discoveryNode, null, emptySet()); + + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + coordinator = new Coordinator("local", Settings.EMPTY, clusterSettings, transportService, + ESAllocationTestCase.createAllocationService(Settings.EMPTY), + new MasterService("local", Settings.EMPTY, threadPool), + () -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName("cluster")).build()), r -> emptyList(), + new NoOpClusterApplier(), new Random(random().nextLong())); + } + + public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException { + final Discovery discovery = mock(Discovery.class); + verifyZeroInteractions(discovery); + + new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, discovery); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() { + @Override + public void handleResponse(BootstrapClusterResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + final Throwable rootCause = exp.getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), equalTo("cluster bootstrapping is not supported by discovery type [zen]")); + countDownLatch.countDown(); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + + public void testFailsOnNonMasterEligibleNodes() throws InterruptedException { + discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + // transport service only picks up local node when started, so we can change it here ^ + + new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); + coordinator.start(); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() { + @Override + public void handleResponse(BootstrapClusterResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + final Throwable rootCause = exp.getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), + equalTo("this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node")); + countDownLatch.countDown(); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + + public void testSetsInitialConfiguration() throws InterruptedException { + new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); + coordinator.start(); + coordinator.startInitialJoin(); + + assertFalse(coordinator.isInitialConfigurationSet()); + + final BootstrapClusterRequest request + = new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription(discoveryNode)))); + + { + final int parallelRequests = 10; + final CountDownLatch countDownLatch = new CountDownLatch(parallelRequests); + final AtomicInteger successes = new AtomicInteger(); + + for (int i = 0; i < parallelRequests; i++) { + transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() { + @Override + public void handleResponse(BootstrapClusterResponse response) { + if (response.getAlreadyBootstrapped() == false) { + successes.incrementAndGet(); + } + countDownLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + throw new AssertionError("should not be called", exp); + } + }); + } + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + assertThat(successes.get(), equalTo(1)); + } + + assertTrue(coordinator.isInitialConfigurationSet()); + + { + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() { + @Override + public void handleResponse(BootstrapClusterResponse response) { + assertTrue(response.getAlreadyBootstrapped()); + countDownLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + throw new AssertionError("should not be called", exp); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + } + + private abstract class ResponseHandler implements TransportResponseHandler { + @Override + public String executor() { + return Names.SAME; + } + + @Override + public BootstrapClusterResponse read(StreamInput in) throws IOException { + return new BootstrapClusterResponse(in); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java new file mode 100644 index 0000000000000..0545728969b6b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java @@ -0,0 +1,281 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.bootstrap; + +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.coordination.Coordinator; +import org.elasticsearch.cluster.coordination.InMemoryPersistedState; +import org.elasticsearch.cluster.coordination.NoOpClusterApplier; +import org.elasticsearch.cluster.coordination.PeersResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.PeersRequest; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.TransportService.HandshakeResponse; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; +import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME; +import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class TransportGetDiscoveredNodesActionTests extends ESTestCase { + + private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + private DiscoveryNode localNode; + private ThreadPool threadPool; + private String clusterName; + private TransportService transportService; + private Coordinator coordinator; + private DiscoveryNode otherNode; + + @Before + public void setupTest() { + clusterName = randomAlphaOfLength(10); + localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT); + + final MockTransport transport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + if (action.equals(HANDSHAKE_ACTION_NAME) && node.getAddress().equals(otherNode.getAddress())) { + handleResponse(requestId, new HandshakeResponse(otherNode, new ClusterName(clusterName), Version.CURRENT)); + } + } + }; + threadPool = new TestThreadPool("test", Settings.EMPTY); + transportService = transport.createTransportService( + Settings.builder().put(CLUSTER_NAME_SETTING.getKey(), clusterName).build(), threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + coordinator = new Coordinator("local", Settings.EMPTY, clusterSettings, transportService, + ESAllocationTestCase.createAllocationService(Settings.EMPTY), + new MasterService("local", Settings.EMPTY, threadPool), + () -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName(clusterName)).build()), r -> emptyList(), + new NoOpClusterApplier(), new Random(random().nextLong())); + } + + @After + public void cleanUp() { + threadPool.shutdown(); + } + + public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException { + final Discovery discovery = mock(Discovery.class); + verifyZeroInteractions(discovery); + + new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, discovery); // registers action + + transportService.start(); + transportService.acceptIncomingRequests(); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, new GetDiscoveredNodesRequest(), new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + final Throwable rootCause = exp.getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), equalTo("discovered nodes are not exposed by discovery type [zen]")); + countDownLatch.countDown(); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + + public void testFailsOnNonMasterEligibleNodes() throws InterruptedException { + localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + // transport service only picks up local node when started, so we can change it here ^ + + new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); + coordinator.start(); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, new GetDiscoveredNodesRequest(), new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + final Throwable rootCause = exp.getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), + equalTo("this node is not master-eligible, but discovered nodes are only exposed by master-eligible nodes")); + countDownLatch.countDown(); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + + public void testFailsQuicklyWithZeroTimeout() throws InterruptedException { + new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); + coordinator.start(); + coordinator.startInitialJoin(); + + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(2); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + final Throwable rootCause = exp.getRootCause(); + assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); + assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{")); + countDownLatch.countDown(); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + + public void testGetsDiscoveredNodes() throws InterruptedException { + new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); + coordinator.start(); + coordinator.startInitialJoin(); + + threadPool.generic().execute(() -> + transportService.sendRequest(localNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(otherNode, emptyList()), + new TransportResponseHandler() { + @Override + public PeersResponse read(StreamInput in) throws IOException { + return new PeersResponse(in); + } + + @Override + public void handleResponse(PeersResponse response) { + } + + @Override + public void handleException(TransportException exp) { + } + + @Override + public String executor() { + return Names.SAME; + } + })); + + { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(2); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + assertThat(response.getNodes(), containsInAnyOrder(localNode, otherNode)); + countDownLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + throw new AssertionError("should not be called", exp); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + + { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(2); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + assertThat(response.getNodes(), containsInAnyOrder(localNode, otherNode)); + countDownLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + throw new AssertionError("should not be called", exp); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + } + + private abstract class ResponseHandler implements TransportResponseHandler { + @Override + public String executor() { + return Names.SAME; + } + + @Override + public GetDiscoveredNodesResponse read(StreamInput in) throws IOException { + return new GetDiscoveredNodesResponse(in); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 8905b6ba0e25c..e755823e9b2b1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -61,6 +62,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; @@ -96,7 +98,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; @@ -660,6 +664,39 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { // assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); } + public void testDiscoveryOfPeersTriggersNotification() { + final Cluster cluster = new Cluster(randomIntBetween(2, 5)); + + // register a listener and then deregister it again to show that it is not called after deregistration + try (Releasable ignored = cluster.getAnyNode().coordinator.withDiscoveryListener(ns -> { + throw new AssertionError("should not be called"); + })) { + // do nothing + } + + final long startTimeMillis = cluster.deterministicTaskQueue.getCurrentTimeMillis(); + final ClusterNode bootstrapNode = cluster.getAnyNode(); + final AtomicBoolean hasDiscoveredAllPeers = new AtomicBoolean(); + assertFalse(bootstrapNode.coordinator.getFoundPeers().iterator().hasNext()); + try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener(discoveryNodes -> { + int peerCount = 0; + for (final DiscoveryNode discoveryNode : discoveryNodes) { + peerCount++; + } + assertThat(peerCount, lessThan(cluster.size())); + if (peerCount == cluster.size() - 1 && hasDiscoveredAllPeers.get() == false) { + hasDiscoveredAllPeers.set(true); + final long elapsedTimeMillis = cluster.deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis; + logger.info("--> {} discovered {} peers in {}ms", bootstrapNode.getId(), cluster.size() - 1, elapsedTimeMillis); + assertThat(elapsedTimeMillis, lessThanOrEqualTo(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2)); + } + })) { + cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "discovery phase"); + } + + assertTrue(hasDiscoveredAllPeers.get()); + } + public void testSettingInitialConfigurationTriggersElection() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase"); @@ -669,13 +706,22 @@ public void testSettingInitialConfigurationTriggersElection() { assertThat(nodeId + " is in term 0", clusterNode.coordinator.getCurrentTerm(), is(0L)); assertThat(nodeId + " last accepted in term 0", clusterNode.coordinator.getLastAcceptedState().term(), is(0L)); assertThat(nodeId + " last accepted version 0", clusterNode.coordinator.getLastAcceptedState().version(), is(0L)); + assertFalse(nodeId + " has not received an initial configuration", clusterNode.coordinator.isInitialConfigurationSet()); assertTrue(nodeId + " has an empty last-accepted configuration", clusterNode.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty()); assertTrue(nodeId + " has an empty last-committed configuration", clusterNode.coordinator.getLastAcceptedState().getLastCommittedConfiguration().isEmpty()); + + final Set foundPeers = new HashSet<>(); + clusterNode.coordinator.getFoundPeers().forEach(foundPeers::add); + assertTrue(nodeId + " should not have discovered itself", foundPeers.add(clusterNode.getLocalNode())); + assertThat(nodeId + " should have found all peers", foundPeers, hasSize(cluster.size())); } - cluster.getAnyNode().applyInitialConfiguration(); + final ClusterNode bootstrapNode = cluster.getAnyNode(); + bootstrapNode.applyInitialConfiguration(); + assertTrue(bootstrapNode.getId() + " has been bootstrapped", bootstrapNode.coordinator.isInitialConfigurationSet()); + cluster.stabilise( // the first election should succeed, because only one node knows of the initial configuration and therefore can win a // pre-voting round and proceed to an election, so there cannot be any collisions @@ -696,10 +742,7 @@ public void testCannotSetInitialConfigurationTwice() { cluster.stabilise(); final Coordinator coordinator = cluster.getAnyNode().coordinator; - final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class, - () -> coordinator.setInitialConfiguration(coordinator.getLastAcceptedState().getLastCommittedConfiguration())); - - assertThat(exception.getMessage(), is("Cannot set initial configuration: configuration has already been set")); + assertFalse(coordinator.setInitialConfiguration(coordinator.getLastAcceptedState().getLastCommittedConfiguration())); } public void testCannotSetInitialConfigurationWithoutQuorum() { @@ -715,7 +758,7 @@ public void testCannotSetInitialConfigurationWithoutQuorum() { assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString())); // This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum. - coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()))); + assertTrue(coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId())))); cluster.stabilise(); } @@ -965,7 +1008,7 @@ void stabilise(long stabilisationDurationMillis) { deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); assertFalse("stabilisation requires stable storage", disruptStorage); - if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { + if (clusterNodes.stream().allMatch(n -> n.coordinator.isInitialConfigurationSet() == false)) { assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); @@ -983,6 +1026,7 @@ void stabilise(long stabilisationDurationMillis) { final Matcher isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); final String leaderId = leader.getId(); + assertTrue(leaderId + " has been bootstrapped", leader.coordinator.isInitialConfigurationSet()); assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId)); assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); @@ -1008,6 +1052,7 @@ void stabilise(long stabilisationDurationMillis) { } assertTrue(nodeId + " is in the latest applied state on " + leaderId, leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); + assertTrue(nodeId + " has been bootstrapped", clusterNode.coordinator.isInitialConfigurationSet()); } else { assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE)); assertFalse(nodeId + " is not in the applied state on " + leaderId, diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NoOpClusterApplier.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NoOpClusterApplier.java new file mode 100644 index 0000000000000..c9ae545e6af6b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NoOpClusterApplier.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterApplier; + +import java.util.function.Supplier; + +public class NoOpClusterApplier implements ClusterApplier { + @Override + public void setInitialState(ClusterState initialState) { + + } + + @Override + public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener) { + listener.onSuccess(source); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 21446e3b31f0e..7f0ec1908a999 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.service.MasterServiceTests; import org.elasticsearch.common.Randomness; @@ -65,7 +64,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -529,16 +527,4 @@ private boolean isLocalNodeElectedMaster() { private boolean clusterStateHasNode(DiscoveryNode node) { return node.equals(MasterServiceTests.discoveryState(masterService).nodes().get(node.getId())); } - - private static class NoOpClusterApplier implements ClusterApplier { - @Override - public void setInitialState(ClusterState initialState) { - - } - - @Override - public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener) { - listener.onSuccess(source); - } - } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index e3e45dd9ad8af..13a8eae467c8f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -224,7 +224,7 @@ private Node newNode() { } catch (NodeValidationException e) { throw new RuntimeException(e); } - }, Collections.singletonList(node), logger); + }, Collections.singletonList(node), logger, this::wrapClient); return node; } @@ -232,7 +232,11 @@ private Node newNode() { * Returns a client to the single-node cluster. */ public Client client() { - return NODE.client(); + return wrapClient(NODE.client()); + } + + public Client wrapClient(final Client client) { + return client; } /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 1620ca07ec4ab..c3515bbe95fcb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -38,7 +38,6 @@ import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.config.Configurator; import org.apache.logging.log4j.core.layout.PatternLayout; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.status.StatusConsoleListener; import org.apache.logging.log4j.status.StatusData; import org.apache.logging.log4j.status.StatusLogger; @@ -48,12 +47,15 @@ import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TimeUnits; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction; +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest; +import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction; +import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest; import org.elasticsearch.bootstrap.BootstrapForTesting; +import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -61,7 +63,6 @@ import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -124,6 +125,7 @@ import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransportPlugin; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.nio.MockNioTransportPlugin; import org.joda.time.DateTimeZone; import org.junit.After; @@ -161,6 +163,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BooleanSupplier; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.IntFunction; import java.util.function.Predicate; import java.util.function.Supplier; @@ -300,57 +303,62 @@ public static TransportAddress buildNewFakeTransportAddress() { return new TransportAddress(TransportAddress.META_ADDRESS, portGenerator.incrementAndGet()); } - public static void bootstrapNodes(boolean condition, Runnable startAction, List nodes, Logger logger) { + public static void bootstrapNodes(boolean condition, Runnable startAction, List nodes, Logger logger, + Function clientWrapper) { final AtomicBoolean stopBootstrapThread = new AtomicBoolean(); Thread bootstrapThread = null; if (condition) { - final Set zen2MasterNodeIds = new HashSet<>(); - final Set zen2MasterNodes = new HashSet<>(); + int zen2MasterNodeCount = 0; for (Node node : nodes) { if (DiscoveryNode.isMasterNode(node.settings())) { Discovery discovery = node.injector().getInstance(Discovery.class); if (discovery instanceof Coordinator) { - zen2MasterNodeIds.add(node.getNodeEnvironment().nodeId()); - zen2MasterNodes.add(node); + zen2MasterNodeCount++; } } } - if (zen2MasterNodes.isEmpty() == false) { - Set configNodeIds = new HashSet<>(randomSubsetOf(zen2MasterNodeIds)); - if (configNodeIds.isEmpty()) { - configNodeIds = zen2MasterNodeIds; - } - final ClusterState.VotingConfiguration initalConfiguration = new ClusterState.VotingConfiguration(configNodeIds); - - logger.info("Bootstrapping cluster using initial configuration {}", initalConfiguration); + if (zen2MasterNodeCount > 0) { + final int minimumConfigurationSize = randomIntBetween(1, zen2MasterNodeCount); final Random bootstrapRandom = new Random(randomLong()); bootstrapThread = new Thread(() -> { + BootstrapClusterRequest bootstrapClusterRequest = null; while (stopBootstrapThread.get() == false) { - final Discovery discovery = randomFrom(bootstrapRandom, zen2MasterNodes).injector().getInstance(Discovery.class); - assert discovery instanceof Coordinator; - final Coordinator coordinator = (Coordinator) discovery; - try { - if (coordinator.lifecycleState() == Lifecycle.State.STARTED) { - coordinator.setInitialConfiguration(initalConfiguration); - if (usually(bootstrapRandom)) { - return; + final Node node = randomFrom(bootstrapRandom, nodes); + final TransportService transportService = node.injector().getInstance(TransportService.class); + if (transportService.getLocalNode() != null) { + final Client client = clientWrapper.apply(node.client()); + if (bootstrapClusterRequest == null) { + try { + final GetDiscoveredNodesRequest discoveredNodesRequest = new GetDiscoveredNodesRequest(); + discoveredNodesRequest.setWaitForNodes(minimumConfigurationSize); + bootstrapClusterRequest = new BootstrapClusterRequest( + client.execute(GetDiscoveredNodesAction.INSTANCE, discoveredNodesRequest).get() + .getBootstrapConfiguration()); + } catch (Exception e) { + logger.trace("exception getting bootstrap configuration", e); + } + } else { + try { + client.execute(BootstrapClusterAction.INSTANCE, bootstrapClusterRequest).get(); + if (usually(bootstrapRandom)) { + // occasionally carry on trying to bootstrap even after one request succeeded. + return; + } + } catch (Exception e) { + logger.trace("exception bootstrapping cluster", e); } } - } catch (CoordinationStateRejectedException e) { - logger.trace( - () -> new ParameterizedMessage("node [{}] rejected initial configuration", coordinator.getLocalNode()), e); } try { Thread.sleep(100); } catch (InterruptedException e) { - logger.trace("interrupted while sleeping", e); - return; + throw new AssertionError("interrupted while sleeping", e); } } - }, "Bootstrap-Thread for " + ClusterName.CLUSTER_NAME_SETTING.get(zen2MasterNodes.iterator().next().settings())); + }, "Bootstrap-Thread for " + ClusterName.CLUSTER_NAME_SETTING.get(nodes.get(0).settings())); bootstrapThread.start(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index d8cc58fa64719..1f71a2c2d9b6f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -524,7 +524,7 @@ private synchronized NodeAndClient getOrBuildRandomNode() { final Runnable onTransportServiceStarted = () -> {}; // do not create unicast host file for this one node. final NodeAndClient buildNode = buildNode(ord, random.nextLong(), null, false, 1, onTransportServiceStarted); assert nodes.isEmpty(); - bootstrapNodes(true, buildNode::startNode, Collections.singletonList(buildNode.node()), logger); + bootstrapNodes(true, buildNode::startNode, Collections.singletonList(buildNode.node()), logger, clientWrapper); buildNode.startNode(); publishNode(buildNode); return buildNode; @@ -1117,7 +1117,7 @@ private synchronized void reset(boolean wipeData) throws IOException { } bootstrapNodes(prevNodeCount == 0, () -> startAndPublishNodesAndClients(toStartAndPublish), - toStartAndPublish.stream().map(NodeAndClient::node).collect(Collectors.toList()), logger); + toStartAndPublish.stream().map(NodeAndClient::node).collect(Collectors.toList()), logger, clientWrapper); startAndPublishNodesAndClients(toStartAndPublish); @@ -1900,7 +1900,7 @@ public synchronized List startNodes(Settings... settings) { validateClusterFormed(); } }, - nodes.stream().map(NodeAndClient::node).collect(Collectors.toList()), logger); + nodes.stream().map(NodeAndClient::node).collect(Collectors.toList()), logger, clientWrapper); return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList()); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecuritySingleNodeTestCase.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecuritySingleNodeTestCase.java index e555bfdb3d335..3c1834c84fea4 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecuritySingleNodeTestCase.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecuritySingleNodeTestCase.java @@ -265,14 +265,14 @@ protected SecureString nodeClientPassword() { } @Override - public Client client() { + public Client wrapClient(final Client client) { Map headers = Collections.singletonMap("Authorization", - basicAuthHeaderValue(nodeClientUsername(), nodeClientPassword())); + basicAuthHeaderValue(nodeClientUsername(), nodeClientPassword())); // we need to wrap node clients because we do not specify a user for nodes and all requests will use the system // user. This is ok for internal n2n stuff but the test framework does other things like wiping indices, repositories, etc // that the system user cannot do. so we wrap the node client with a user that can do these things since the client() calls // are all using a node client - return super.client().filterWithHeader(headers); + return client.filterWithHeader(headers); } protected boolean isTransportSSLEnabled() {