Skip to content

[Zen2] Allow Setting a List of Bootstrap Nodes to Wait for #35847

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Request the set of master-eligible nodes discovered by this node. Most useful in a brand-new cluster as a precursor to setting the
Expand All @@ -38,6 +40,8 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
@Nullable // if the request should wait indefinitely
private TimeValue timeout = TimeValue.timeValueSeconds(30);

private List<String> requiredNodes = Collections.emptyList();

public GetDiscoveredNodesRequest() {
}

Expand Down Expand Up @@ -95,6 +99,14 @@ public TimeValue getTimeout() {
return timeout;
}

public List<String> getRequiredNodes() {
return requiredNodes;
}

public void setRequiredNodes(final List<String> requiredNodes) {
this.requiredNodes = requiredNodes;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -110,13 +122,14 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(waitForNodes);
out.writeOptionalTimeValue(timeout);
out.writeStringList(requiredNodes);
}

@Override
public String toString() {
return "GetDiscoveredNodesRequest{" +
"waitForNodes=" + waitForNodes +
", timeout=" + timeout +
'}';
", requiredNodes=[" + String.join(",", requiredNodes) + "]}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -93,7 +94,7 @@ public void accept(Iterable<DiscoveryNode> nodes) {
nodesSet.add(localNode);
nodes.forEach(nodesSet::add);
logger.trace("discovered {}", nodesSet);
if (nodesSet.size() >= request.getWaitForNodes() && listenerNotified.compareAndSet(false, true)) {
if (checkWaitRequirements(request, nodesSet) && listenerNotified.compareAndSet(false, true)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs a try {...} catch (Exception e) { listenableFuture.onFailure(e); } around it, but the test to find this needs to interleave the steps of the other tests so is a bit ugly:

    public void testThrowsExceptionIfDuplicateDiscoveredLater() throws InterruptedException {
        new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
        transportService.start();
        transportService.acceptIncomingRequests();
        coordinator.start();
        coordinator.startInitialJoin();

        final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
        final String ip = localNode.getAddress().getAddress();
        getDiscoveredNodesRequest.setRequiredNodes(Collections.singletonList(ip));
        getDiscoveredNodesRequest.setWaitForNodes(2);

        final CountDownLatch countDownLatch = new CountDownLatch(1);
        transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
            @Override
            public void handleResponse(GetDiscoveredNodesResponse response) {
                throw new AssertionError("should not be called");
            }

            @Override
            public void handleException(TransportException exp) {
                Throwable t = exp.getRootCause();
                assertThat(t, instanceOf(IllegalArgumentException.class));
                assertThat(t.getMessage(), startsWith('[' + ip + "] matches ["));
                countDownLatch.countDown();
            }
        });

        threadPool.generic().execute(() ->
            transportService.sendRequest(localNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(otherNode, emptyList()),
                new TransportResponseHandler<PeersResponse>() {
                    @Override
                    public PeersResponse read(StreamInput in) throws IOException {
                        return new PeersResponse(in);
                    }

                    @Override
                    public void handleResponse(PeersResponse response) {
                        logger.info("response: {}", response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        logger.info("exception", exp);
                    }

                    @Override
                    public String executor() {
                        return Names.SAME;
                    }
                }));

        assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added :) (tried making it a little nicer by drying it up against other tests)

listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet));
}
}
Expand Down Expand Up @@ -124,4 +125,16 @@ public String toString() {
});
}
}

private static boolean checkWaitRequirements(GetDiscoveredNodesRequest request, Set<DiscoveryNode> nodes) {
Set<String> requiredNodes = new HashSet<>(request.getRequiredNodes());
for (final DiscoveryNode node : nodes) {
requiredNodes.remove(node.getAddress().toString());
requiredNodes.remove(node.getName());
if (requiredNodes.isEmpty()) {
break;
}
}
return requiredNodes.isEmpty() && nodes.size() >= request.getWaitForNodes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class ClusterBootstrapService {

Expand All @@ -51,12 +54,17 @@ public class ClusterBootstrapService {
public static final Setting<Integer> INITIAL_MASTER_NODE_COUNT_SETTING =
Setting.intSetting("cluster.unsafe_initial_master_node_count", 0, 0, Property.NodeScope);

public static final Setting<List<String>> INITIAL_MASTER_NODES_SETTING =
Setting.listSetting("cluster.unsafe_initial_master_nodes", Collections.emptyList(), Function.identity(), Property.NodeScope);

private final int initialMasterNodeCount;
private final List<String> initialMasterNodes;
private final TransportService transportService;
private volatile boolean running;

public ClusterBootstrapService(Settings settings, TransportService transportService) {
initialMasterNodeCount = INITIAL_MASTER_NODE_COUNT_SETTING.get(settings);
initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings);
this.transportService = transportService;
}

Expand All @@ -73,6 +81,7 @@ public void start() {

final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
request.setWaitForNodes(initialMasterNodeCount);
request.setRequiredNodes(initialMasterNodes);
request.setTimeout(null);
logger.trace("sending {}", request);
transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ public void apply(Settings value, Settings current, Settings previous) {
LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING,
Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION,
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING,
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING,
ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING
)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -67,7 +68,9 @@ public void testNodeVersionIsUpdated() throws IOException, NodeValidationExcepti
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put("cluster.name", "foobar")
.put(TestZenDiscovery.USE_ZEN2.getKey(), true)
.put(ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1)
.putList(
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), Collections.singletonList("testNodeVersionIsUpdated")
).put(ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1)
.build(), Arrays.asList(getTestTransportPlugin(), TestZenDiscovery.TestPlugin.class,
MockHttpTransport.TestPlugin.class)).start()) {
TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress();
Expand Down