Remove join timeout #60873

Merged
11 changes: 11 additions & 0 deletions docs/reference/migration/migrate_8_0/cluster.asciidoc
@@ -24,3 +24,14 @@ to exclude instead of using a node filter. Requests submitted to the
`/_cluster/voting_config_exclusions/{node_filter}` endpoint will return an
error.
====

.The `cluster.join.timeout` setting has been removed.
[%collapsible]
====
*Details* +
The `cluster.join.timeout` setting has been removed. Join attempts no longer
time out.

*Impact* +
Do not set `cluster.join.timeout` in your `elasticsearch.yml` file.
====
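
As a rough illustration of the impact note above, and not part of this PR, the following sketch scans the text of an `elasticsearch.yml` file for the removed key before an upgrade. The class and method names are hypothetical, and the check assumes flat `key: value` lines; nested YAML is not handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Elasticsearch.
class RemovedSettingCheck {
    static final String REMOVED_KEY = "cluster.join.timeout";

    // Returns the 1-based line numbers on which the removed key is set.
    // Assumption: settings are written as flat "key: value" lines.
    static List<Integer> findRemovedSetting(String yaml) {
        List<Integer> hits = new ArrayList<>();
        String[] lines = yaml.split("\n", -1);
        for (int i = 0; i < lines.length; i++) {
            String line = lines[i].trim();
            if (line.startsWith("#")) {
                continue; // a commented-out setting is not applied
            }
            if (line.startsWith(REMOVED_KEY + ":")) {
                hits.add(i + 1);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String yaml = "cluster.name: demo\n"
            + "cluster.join.timeout: 30s\n"
            + "# cluster.join.timeout: 10s\n";
        System.out.println(findRemovedSetting(yaml)); // prints [2]
    }
}
```

Any line reported by this check would need to be deleted from the configuration file.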
14 changes: 4 additions & 10 deletions docs/reference/modules/discovery/discovery-settings.asciidoc
@@ -35,10 +35,10 @@ for `discovery.seed_hosts` is `["127.0.0.1", "[::1]"]`. See <<unicast.hosts>>.

Specifies whether {es} should form a multiple-node cluster. By default, {es}
discovers other nodes when forming a cluster and allows other nodes to join
the cluster later. If `discovery.type` is set to `single-node`, {es} forms a
single-node cluster and suppresses the timeouts set by
`cluster.publish.timeout` and `cluster.join.timeout`. For more information
about when you might use this setting, see <<single-node-discovery>>.
the cluster later. If `discovery.type` is set to `single-node`, {es} forms
a single-node cluster and suppresses the timeout set by
`cluster.publish.timeout`. For more information about when you might use
this setting, see <<single-node-discovery>>.

`cluster.initial_master_nodes`::

@@ -179,12 +179,6 @@ or may become unstable or intolerant of certain failures.
time, it is considered to have failed and is removed from the cluster. See
<<cluster-state-publishing>>.

`cluster.join.timeout`::

Sets how long a node will wait after sending a request to join a cluster
before it considers the request to have failed and retries, unless
`discovery.type` is set to `single-node`. Defaults to `60s`.

`cluster.max_voting_config_exclusions`::

Sets a limit on the number of voting configuration exclusions at any one
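
The removed `cluster.join.timeout` description above says a node waited a bounded time for a join response before treating the attempt as failed and retrying. The following is a minimal sketch of that difference using plain `CompletableFuture`, not the Elasticsearch transport layer. All names are hypothetical, and the 50 ms value is chosen only to keep the example fast.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; none of these names exist in Elasticsearch.
class JoinTimeoutSketch {

    // Models the removed behaviour: wait up to timeoutMillis for the join
    // response and report whether the attempt timed out.
    static boolean attemptTimedOut(CompletableFuture<String> response, long timeoutMillis) {
        try {
            response.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false; // a response arrived in time
        } catch (TimeoutException e) {
            return true; // old behaviour: treat the attempt as failed and retry
        } catch (ExecutionException e) {
            return false; // failed for a reason other than a timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> slowJoin = new CompletableFuture<>();
        // With a timeout, a slow join attempt is abandoned after 50 ms.
        System.out.println(attemptTimedOut(slowJoin, 50));
        // Without a timeout, the same attempt stays pending until a
        // response (or a failure) eventually arrives.
        slowJoin.complete("joined");
        System.out.println(slowJoin.join());
    }
}
```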
4 changes: 1 addition & 3 deletions docs/reference/setup/add-nodes.asciidoc
@@ -67,9 +67,7 @@ to the voting configuration if it is appropriate to do so.

During master election or when joining an existing formed cluster, a node
sends a join request to the master in order to be officially added to the
cluster. You can use the `cluster.join.timeout` setting to configure how long a
node waits after sending a request to join a cluster. Its default value is `30s`.
See <<modules-discovery-settings>>.
cluster.

[discrete]
[[modules-discovery-removing-nodes]]
@@ -52,15 +52,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable {
logger.info("--> start 4 nodes, 3 master, 1 data");

final Settings sharedSettings = Settings.builder()
.put("cluster.join.timeout", "10s") // still long to induce failures but not too long so test won't time out
.build();

internalCluster().setBootstrapMasterNodeIndex(2);

internalCluster().startMasterOnlyNodes(3, sharedSettings);
internalCluster().startMasterOnlyNodes(3, Settings.EMPTY);

String dataNode = internalCluster().startDataOnlyNode(sharedSettings);
String dataNode = internalCluster().startDataOnlyNode(Settings.EMPTY);

logger.info("--> wait for all nodes to join the cluster");
ensureStableCluster(4);
@@ -168,9 +168,9 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings);
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
rerouteService, nodeHealthService);
this.joinHelper = new JoinHelper(allocationService, masterService, transportService, this::getCurrentTerm,
this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators, rerouteService,
nodeHealthService);
this.persistedStateSupplier = persistedStateSupplier;
this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
@@ -34,22 +34,17 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
@@ -81,32 +76,22 @@ public class JoinHelper {
public static final String VALIDATE_JOIN_ACTION_NAME = "internal:cluster/coordination/join/validate";
public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join";

// the timeout for each join attempt
public static final Setting<TimeValue> JOIN_TIMEOUT_SETTING =
Setting.timeSetting("cluster.join.timeout",
TimeValue.timeValueMillis(60000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

private final MasterService masterService;
private final TransportService transportService;
private final JoinTaskExecutor joinTaskExecutor;

@Nullable // if using single-node discovery
private final TimeValue joinTimeout;
private final NodeHealthService nodeHealthService;

private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
private final AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();

private AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();

JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
JoinHelper(AllocationService allocationService, MasterService masterService, TransportService transportService,
LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService,
NodeHealthService nodeHealthService) {
this.masterService = masterService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = DiscoveryModule.isSingleNodeDiscovery(settings) ? null : JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, rerouteService) {

@Override
@@ -249,7 +234,6 @@ public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join>
if (pendingOutgoingJoins.add(dedupKey)) {
logger.debug("attempting to join {} with {}", destination, joinRequest);
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest,
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
new TransportResponseHandler<Empty>() {
@Override
public Empty read(StreamInput in) {
@@ -309,10 +293,8 @@ public String executor() {
}

void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
transportService.sendRequest(node, VALIDATE_JOIN_ACTION_NAME,
new ValidateJoinRequest(state),
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC));
transportService.sendRequest(node, VALIDATE_JOIN_ACTION_NAME, new ValidateJoinRequest(state),
new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC));
}

public interface JoinCallback {
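
The `pendingOutgoingJoins.add(dedupKey)` check in `sendJoinRequest` above relies on `Set.add` returning `false` for an element that is already present, so an identical join request is not sent again while one is in flight. The following is a minimal sketch of that pattern. The class is hypothetical: it uses a string key in place of the `Tuple<DiscoveryNode, JoinRequest>` used by `JoinHelper`, and the `onCompleted` cleanup hook is an assumption, since the response handling is not visible in this diff.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of deduplicating in-flight requests.
class PendingJoinDedupSketch {
    private final Set<String> pendingOutgoingJoins =
        Collections.synchronizedSet(new HashSet<>());

    // Returns true if the request would be sent, or false if an identical
    // request is already in flight.
    boolean sendJoinRequest(String destination, long term) {
        String dedupKey = destination + "/" + term;
        return pendingOutgoingJoins.add(dedupKey); // add() is false for a duplicate
    }

    // Assumed cleanup hook: called once a response or a failure arrives.
    void onCompleted(String destination, long term) {
        pendingOutgoingJoins.remove(destination + "/" + term);
    }

    public static void main(String[] args) {
        PendingJoinDedupSketch helper = new PendingJoinDedupSketch();
        System.out.println(helper.sendJoinRequest("node1", 1)); // true
        System.out.println(helper.sendJoinRequest("node1", 1)); // false
        helper.onCompleted("node1", 1);
        System.out.println(helper.sendJoinRequest("node1", 1)); // true
    }
}
```

In this sketch, with no per-request timeout, a key stays in the set until a response or a failure is reported for it.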
@@ -21,7 +21,6 @@
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
@@ -38,7 +37,6 @@
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.coordination.JoinHelper;
import org.elasticsearch.cluster.coordination.LagDetector;
import org.elasticsearch.cluster.coordination.LeaderChecker;
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
@@ -79,6 +77,7 @@
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
@@ -469,7 +468,6 @@ public void apply(Settings value, Settings current, Settings previous) {
ElectionSchedulerFactory.ELECTION_DURATION_SETTING,
Coordinator.PUBLISH_TIMEOUT_SETTING,
Coordinator.PUBLISH_INFO_TIMEOUT_SETTING,
JoinHelper.JOIN_TIMEOUT_SETTING,
FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING,
FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING,
FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING,
@@ -59,7 +59,7 @@ public void testJoinDeduplication() {
TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY,
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> localNode, null, Collections.emptySet());
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
JoinHelper joinHelper = new JoinHelper(null, null, transportService, () -> 0L, () -> null,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, p, r) -> {},
() -> new StatusInfo(HEALTHY, "info"));
@@ -156,7 +156,7 @@ public void testJoinValidationRejectsMismatchedClusterUUID() {
TransportService transportService = mockTransport.createTransportService(Settings.EMPTY,
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> localNode, null, Collections.emptySet());
new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState,
new JoinHelper(null, null, transportService, () -> 0L, () -> localClusterState,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, p, r) -> {}, null); // registers request handler
transportService.start();
@@ -189,9 +189,9 @@ public void testJoinFailureOnUnhealthyNodes() {
x -> localNode, null, Collections.emptySet());
AtomicReference<StatusInfo> nodeHealthServiceStatus = new AtomicReference<>
(new StatusInfo(UNHEALTHY, "unhealthy-info"));
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
JoinHelper joinHelper = new JoinHelper(null, null, transportService, () -> 0L, () -> null,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, p, r) -> {}, () -> nodeHealthServiceStatus.get());
Collections.emptyList(), (s, p, r) -> {}, nodeHealthServiceStatus::get);
transportService.start();

DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
@@ -24,7 +24,6 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.coordination.JoinHelper;
import org.elasticsearch.cluster.coordination.LeaderChecker;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
@@ -126,7 +125,6 @@ List<String> startCluster(int numberOfNodes) {
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) // for hitting simulated network failures quickly
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "5s") // for hitting simulated network failures quickly
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) // for hitting simulated network failures quickly
.put(JoinHelper.JOIN_TIMEOUT_SETTING.getKey(), "10s") // still long to induce failures but not too long so test won't time out
.put(Coordinator.PUBLISH_TIMEOUT_SETTING.getKey(), "5s") // <-- for hitting simulated network failures quickly
.put(TransportSettings.CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this
// value and the time of disruption and does not recover immediately