From b7984ce365e3d0da978eb8171419a46f3c680af4 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 11 Mar 2019 08:24:58 +0000 Subject: [PATCH 1/2] Synchronize pendingOutgoingJoins Today we use a ConcurrentHashSet to track the in-flight outgoing joins in the `JoinHelper`. This is fine for adding and removing elements but not for the emptiness test in `isJoinPending()` which might return false if one join finishes just after another one starts, even though joins were pending throughout. As used today this is ok: it means the node was trying to join a master but this join attempt just finished unsuccessfully, and causes it to (rightfully) reject a `FollowerCheck` from the failed master. However this kind of API inconsistency is trappy and there is no need to be clever here, so this change replaces the set with a synchronizedSet(). --- .../cluster/coordination/JoinHelper.java | 14 +++++++------- .../cluster/coordination/JoinTaskExecutor.java | 2 +- .../indices/cluster/ClusterStateChanges.java | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 138cba10f1c8d..fab22a6ccb16d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.EmptyTransportResponseHandler; @@ -51,7 +50,9 @@ import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -80,7 +81,7 @@ public class JoinHelper { private final JoinTaskExecutor joinTaskExecutor; private final TimeValue joinTimeout; - final Set> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet(); + private final Set> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>()); JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, @@ -89,7 +90,7 @@ public class JoinHelper { this.masterService = masterService; this.transportService = transportService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); - this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) { + this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) { @Override public ClusterTasksResult execute(ClusterState currentState, List joiningTasks) @@ -168,8 +169,7 @@ public String toString() { } boolean isJoinPending() { - // cannot use pendingOutgoingJoins.isEmpty() because it's not properly synchronized. - return pendingOutgoingJoins.iterator().hasNext(); + return pendingOutgoingJoins.isEmpty() == false; } void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { @@ -208,7 +208,7 @@ public String executor() { } } - public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) { + void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) { assert startJoinRequest.getSourceNode().isMasterNode() : "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode(); transportService.sendRequest(destination, START_JOIN_ACTION_NAME, @@ -235,7 +235,7 @@ public String executor() { }); } - public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener listener) { + void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener listener) { transportService.sendRequest(node, VALIDATE_JOIN_ACTION_NAME, new ValidateJoinRequest(state), TransportRequestOptions.builder().withTimeout(joinTimeout).build(), diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index cc7128fcdcd4c..d5b53de5850aa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -81,7 +81,7 @@ public boolean isFinishElectionTask() { private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_"; } - public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger) { + public JoinTaskExecutor(AllocationService allocationService, Logger logger) { this.allocationService = allocationService; this.logger = logger; } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index c1e32be9d29af..a8c47f5d3ef39 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -213,7 +213,7 @@ allocationService, new AliasValidator(), environment, transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); - joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger); + joinTaskExecutor = new JoinTaskExecutor(allocationService, logger); } public ClusterState createIndex(ClusterState state, CreateIndexRequest request) { From 0780d157e170217b86ebb5e210072a1b72f65806 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 11 Mar 2019 09:00:40 +0000 Subject: [PATCH 2/2] Imports --- .../org/elasticsearch/cluster/coordination/JoinTaskExecutor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index d5b53de5850aa..7434a246eed8a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import java.util.ArrayList;