Skip to content

Commit 6e4f304

Browse files
committed
Synchronize pendingOutgoingJoins (#39900)
Today we use a ConcurrentHashSet to track the in-flight outgoing joins in the `JoinHelper`. This is fine for adding and removing elements but not for the emptiness test in `isJoinPending()` which might return false if one join finishes just after another one starts, even though joins were pending throughout. As used today this is ok: it means the node was trying to join a master but this join attempt just finished unsuccessfully, and causes it to (rightfully) reject a `FollowerCheck` from the failed master. However this kind of API inconsistency is trappy and there is no need to be clever here, so this change replaces the set with a `synchronizedSet()`.
1 parent a079b9f commit 6e4f304

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.common.settings.Setting;
3737
import org.elasticsearch.common.settings.Settings;
3838
import org.elasticsearch.common.unit.TimeValue;
39-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4039
import org.elasticsearch.discovery.zen.MembershipAction;
4140
import org.elasticsearch.discovery.zen.ZenDiscovery;
4241
import org.elasticsearch.threadpool.ThreadPool;
@@ -53,7 +52,9 @@
5352

5453
import java.io.IOException;
5554
import java.util.Collection;
55+
import java.util.Collections;
5656
import java.util.HashMap;
57+
import java.util.HashSet;
5758
import java.util.LinkedHashMap;
5859
import java.util.List;
5960
import java.util.Map;
@@ -82,7 +83,7 @@ public class JoinHelper {
8283
private final JoinTaskExecutor joinTaskExecutor;
8384
private final TimeValue joinTimeout;
8485

85-
final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet();
86+
private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
8687

8788
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
8889
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
@@ -190,8 +191,7 @@ public String toString() {
190191
}
191192

192193
boolean isJoinPending() {
193-
// cannot use pendingOutgoingJoins.isEmpty() because it's not properly synchronized.
194-
return pendingOutgoingJoins.iterator().hasNext();
194+
return pendingOutgoingJoins.isEmpty() == false;
195195
}
196196

197197
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {

0 commit comments

Comments
 (0)