Skip to content

[Zen2] Allow Setting a List of Bootstrap Nodes to Wait for #35847

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Request the set of master-eligible nodes discovered by this node. Most useful in a brand-new cluster as a precursor to setting the
Expand All @@ -38,13 +40,16 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
@Nullable // if the request should wait indefinitely
private TimeValue timeout = TimeValue.timeValueSeconds(30);

private List<String> requiredNodes = Collections.emptyList();

public GetDiscoveredNodesRequest() {
}

public GetDiscoveredNodesRequest(StreamInput in) throws IOException {
super(in);
waitForNodes = in.readInt();
timeout = in.readOptionalTimeValue();
requiredNodes = in.readList(StreamInput::readString);
}

/**
Expand Down Expand Up @@ -95,6 +100,26 @@ public TimeValue getTimeout() {
return timeout;
}

/**
* Sometimes it is useful only to receive a successful response after discovering a certain set of master-eligible nodes.
* This parameter gives the names or transport addresses of the expected nodes.
*
* @return list of expected nodes
*/
public List<String> getRequiredNodes() {
return requiredNodes;
}

/**
* Sometimes it is useful only to receive a successful response after discovering a certain set of master-eligible nodes.
* This parameter gives the names or transport addresses of the expected nodes.
*
* @param requiredNodes list of expected nodes
*/
public void setRequiredNodes(final List<String> requiredNodes) {
this.requiredNodes = requiredNodes;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -110,13 +135,14 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(waitForNodes);
out.writeOptionalTimeValue(timeout);
out.writeStringList(requiredNodes);
}

@Override
public String toString() {
return "GetDiscoveredNodesRequest{" +
"waitForNodes=" + waitForNodes +
", timeout=" + timeout +
'}';
", requiredNodes=" + requiredNodes + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;

Expand Down Expand Up @@ -93,8 +96,12 @@ public void accept(Iterable<DiscoveryNode> nodes) {
nodesSet.add(localNode);
nodes.forEach(nodesSet::add);
logger.trace("discovered {}", nodesSet);
if (nodesSet.size() >= request.getWaitForNodes() && listenerNotified.compareAndSet(false, true)) {
listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet));
try {
if (checkWaitRequirements(request, nodesSet) && listenerNotified.compareAndSet(false, true)) {
listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet));
}
} catch (Exception e) {
listenableFuture.onFailure(e);
}
}

Expand Down Expand Up @@ -124,4 +131,39 @@ public String toString() {
});
}
}

private static boolean matchesRequirement(DiscoveryNode discoveryNode, String requirement) {
return discoveryNode.getName().equals(requirement)
|| discoveryNode.getAddress().toString().equals(requirement)
|| discoveryNode.getAddress().getAddress().equals(requirement);
}

private static boolean checkWaitRequirements(GetDiscoveredNodesRequest request, Set<DiscoveryNode> nodes) {
if (nodes.size() < request.getWaitForNodes()) {
return false;
}

List<String> requirements = request.getRequiredNodes();
final Set<DiscoveryNode> selectedNodes = new HashSet<>();
for (final String requirement : requirements) {
final Set<DiscoveryNode> matchingNodes
= nodes.stream().filter(n -> matchesRequirement(n, requirement)).collect(Collectors.toSet());

if (matchingNodes.isEmpty()) {
return false;
}
if (matchingNodes.size() > 1) {
throw new IllegalArgumentException("[" + requirement + "] matches " + matchingNodes);
}

for (final DiscoveryNode matchingNode : matchingNodes) {
if (selectedNodes.add(matchingNode) == false) {
throw new IllegalArgumentException("[" + matchingNode + "] matches " +
requirements.stream().filter(r -> matchesRequirement(matchingNode, requirement)).collect(Collectors.toList()));
}
}
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class ClusterBootstrapService {

Expand All @@ -51,12 +54,17 @@ public class ClusterBootstrapService {
public static final Setting<Integer> INITIAL_MASTER_NODE_COUNT_SETTING =
Setting.intSetting("cluster.unsafe_initial_master_node_count", 0, 0, Property.NodeScope);

public static final Setting<List<String>> INITIAL_MASTER_NODES_SETTING =
Setting.listSetting("cluster.initial_master_nodes", Collections.emptyList(), Function.identity(), Property.NodeScope);

private final int initialMasterNodeCount;
private final List<String> initialMasterNodes;
private final TransportService transportService;
private volatile boolean running;

public ClusterBootstrapService(Settings settings, TransportService transportService) {
initialMasterNodeCount = INITIAL_MASTER_NODE_COUNT_SETTING.get(settings);
initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings);
this.transportService = transportService;
}

Expand All @@ -73,6 +81,7 @@ public void start() {

final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
request.setWaitForNodes(initialMasterNodeCount);
request.setRequiredNodes(initialMasterNodes);
request.setTimeout(null);
logger.trace("sending {}", request);
transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ public void apply(Settings value, Settings current, Settings previous) {
LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING,
Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION,
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING,
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING,
ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING,
LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING
)));
Expand Down
Loading