Skip to content

Commit 470c78b

Browse files
authored
Synchronize pendingOutgoingJoins (#39900)
Today we use a ConcurrentHashSet to track the in-flight outgoing joins in the `JoinHelper`. This is fine for adding and removing elements but not for the emptiness test in `isJoinPending()` which might return false if one join finishes just after another one starts, even though joins were pending throughout. As used today this is ok: it means the node was trying to join a master but this join attempt just finished unsuccessfully, and causes it to (rightfully) reject a `FollowerCheck` from the failed master. However this kind of API inconsistency is trappy and there is no need to be clever here, so this change replaces the set with a `synchronizedSet()`.
1 parent abb8140 commit 470c78b

File tree

3 files changed

+9
-10
lines changed

3 files changed

+9
-10
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.common.settings.Setting;
3737
import org.elasticsearch.common.settings.Settings;
3838
import org.elasticsearch.common.unit.TimeValue;
39-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4039
import org.elasticsearch.threadpool.ThreadPool;
4140
import org.elasticsearch.threadpool.ThreadPool.Names;
4241
import org.elasticsearch.transport.EmptyTransportResponseHandler;
@@ -51,7 +50,9 @@
5150

5251
import java.io.IOException;
5352
import java.util.Collection;
53+
import java.util.Collections;
5454
import java.util.HashMap;
55+
import java.util.HashSet;
5556
import java.util.LinkedHashMap;
5657
import java.util.List;
5758
import java.util.Map;
@@ -80,7 +81,7 @@ public class JoinHelper {
8081
private final JoinTaskExecutor joinTaskExecutor;
8182
private final TimeValue joinTimeout;
8283

83-
final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet();
84+
private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
8485

8586
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
8687
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
@@ -89,7 +90,7 @@ public class JoinHelper {
8990
this.masterService = masterService;
9091
this.transportService = transportService;
9192
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
92-
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) {
93+
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
9394

9495
@Override
9596
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)
@@ -168,8 +169,7 @@ public String toString() {
168169
}
169170

170171
boolean isJoinPending() {
171-
// cannot use pendingOutgoingJoins.isEmpty() because it's not properly synchronized.
172-
return pendingOutgoingJoins.iterator().hasNext();
172+
return pendingOutgoingJoins.isEmpty() == false;
173173
}
174174

175175
void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
@@ -208,7 +208,7 @@ public String executor() {
208208
}
209209
}
210210

211-
public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
211+
void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
212212
assert startJoinRequest.getSourceNode().isMasterNode()
213213
: "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
214214
transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
@@ -235,7 +235,7 @@ public String executor() {
235235
});
236236
}
237237

238-
public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
238+
void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
239239
transportService.sendRequest(node, VALIDATE_JOIN_ACTION_NAME,
240240
new ValidateJoinRequest(state),
241241
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),

server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.cluster.node.DiscoveryNode;
3030
import org.elasticsearch.cluster.node.DiscoveryNodes;
3131
import org.elasticsearch.cluster.routing.allocation.AllocationService;
32-
import org.elasticsearch.common.settings.Settings;
3332
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
3433

3534
import java.util.ArrayList;
@@ -81,7 +80,7 @@ public boolean isFinishElectionTask() {
8180
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
8281
}
8382

84-
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger) {
83+
public JoinTaskExecutor(AllocationService allocationService, Logger logger) {
8584
this.allocationService = allocationService;
8685
this.logger = logger;
8786
}

server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ allocationService, new AliasValidator(), environment,
213213
transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
214214

215215
nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
216-
joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger);
216+
joinTaskExecutor = new JoinTaskExecutor(allocationService, logger);
217217
}
218218

219219
public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {

0 commit comments

Comments
 (0)