-
Notifications
You must be signed in to change notification settings - Fork 25.2k
[Zen2] Allow Setting a List of Bootstrap Nodes to Wait for #35847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
6c64fe8
af90c92
b7ce8ef
d78a282
92209a3
f8de0fc
b8af175
09a0b0e
f2620d5
ccaee66
654c310
e5e97a5
ffd3879
26b35a0
fac237d
b194ade
dff0202
adc0249
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,7 @@ | |
import org.elasticsearch.threadpool.ThreadPool.Names; | ||
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.util.HashSet; | ||
import java.util.LinkedHashSet; | ||
import java.util.Set; | ||
import java.util.concurrent.ExecutorService; | ||
|
@@ -93,7 +94,7 @@ public void accept(Iterable<DiscoveryNode> nodes) { | |
nodesSet.add(localNode); | ||
nodes.forEach(nodesSet::add); | ||
logger.trace("discovered {}", nodesSet); | ||
if (nodesSet.size() >= request.getWaitForNodes() && listenerNotified.compareAndSet(false, true)) { | ||
if (checkWaitRequirements(request, nodesSet) && listenerNotified.compareAndSet(false, true)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this needs a public void testThrowsExceptionIfDuplicateDiscoveredLater() throws InterruptedException {
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
transportService.start();
transportService.acceptIncomingRequests();
coordinator.start();
coordinator.startInitialJoin();
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
final String ip = localNode.getAddress().getAddress();
getDiscoveredNodesRequest.setRequiredNodes(Collections.singletonList(ip));
getDiscoveredNodesRequest.setWaitForNodes(2);
final CountDownLatch countDownLatch = new CountDownLatch(1);
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
throw new AssertionError("should not be called");
}
@Override
public void handleException(TransportException exp) {
Throwable t = exp.getRootCause();
assertThat(t, instanceOf(IllegalArgumentException.class));
assertThat(t.getMessage(), startsWith('[' + ip + "] matches ["));
countDownLatch.countDown();
}
});
threadPool.generic().execute(() ->
transportService.sendRequest(localNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(otherNode, emptyList()),
new TransportResponseHandler<PeersResponse>() {
@Override
public PeersResponse read(StreamInput in) throws IOException {
return new PeersResponse(in);
}
@Override
public void handleResponse(PeersResponse response) {
logger.info("response: {}", response);
}
@Override
public void handleException(TransportException exp) {
logger.info("exception", exp);
}
@Override
public String executor() {
return Names.SAME;
}
}));
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added :) (tried making it a little nicer by drying it up against other tests) |
||
listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet)); | ||
} | ||
} | ||
|
@@ -124,4 +125,16 @@ public String toString() { | |
}); | ||
} | ||
} | ||
|
||
private static boolean checkWaitRequirements(GetDiscoveredNodesRequest request, Set<DiscoveryNode> nodes) { | ||
original-brownbear marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Set<String> requiredNodes = new HashSet<>(request.getRequiredNodes()); | ||
for (final DiscoveryNode node : nodes) { | ||
DaveCTurner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
requiredNodes.remove(node.getAddress().toString()); | ||
requiredNodes.remove(node.getName()); | ||
if (requiredNodes.isEmpty()) { | ||
break; | ||
} | ||
} | ||
return requiredNodes.isEmpty() && nodes.size() >= request.getWaitForNodes(); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.