Skip to content

Commit a67e07e

Browse files
authored
Improve control of outgoing connection lifecycles (#77295)
Today we open connections to other nodes in various places and largely assume that they remain open as needed, only closing them when applying a cluster state that removes the remote node from the cluster. This isn't ideal: we might preserve unnecessary connections to remote nodes that aren't in the cluster if they never manage to join it, and we might also disconnect from a node that left the cluster while it is in the process of rejoining (see #67873). With this commit we move to a model in which each user of a connection to a remote node acquires a reference to the connection, and must release that reference once it is no longer needed. Connections remain open while there are any live references, and are now actively closed once all references are released. Fixes #67873
1 parent 00c3b5c commit a67e07e

File tree

45 files changed

+1262
-575
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1262
-575
lines changed

libs/core/src/main/java/org/elasticsearch/core/AbstractRefCounted.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ public final boolean decRef() {
5858
return false;
5959
}
6060

61+
@Override
62+
public final boolean hasReferences() {
63+
return refCount.get() > 0;
64+
}
65+
6166
/**
6267
* Called whenever the ref count is incremented or decremented. Can be overridden to record access to the instance for debugging
6368
* purposes.
@@ -74,7 +79,7 @@ protected void alreadyClosed() {
7479
/**
7580
* Returns the current reference count.
7681
*/
77-
public int refCount() {
82+
public final int refCount() {
7883
return this.refCount.get();
7984
}
8085

libs/core/src/main/java/org/elasticsearch/core/RefCounted.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,12 @@ public interface RefCounted {
5454
* @return returns {@code true} if the ref count dropped to 0 as a result of calling this method
5555
*/
5656
boolean decRef();
57+
58+
/**
59+
* Returns {@code true} only if there was at least one active reference when the method was called; if it returns {@code false} then the
60+
* object is closed and future attempts to acquire references will fail.
61+
*
62+
* @return whether there are currently any active references to this object.
63+
*/
64+
boolean hasReferences();
5765
}

libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,7 @@ public void testRefCount() {
5959
assertThat(
6060
expectThrows(IllegalStateException.class, counted::incRef).getMessage(),
6161
equalTo(AbstractRefCounted.ALREADY_CLOSED_MESSAGE));
62-
63-
try {
64-
counted.ensureOpen();
65-
fail(" expected exception");
66-
} catch (IllegalStateException ex) {
67-
assertThat(ex.getMessage(), equalTo("closed"));
68-
}
62+
assertThat(expectThrows(IllegalStateException.class, counted::ensureOpen).getMessage(), equalTo("closed"));
6963
}
7064

7165
public void testMultiThreaded() throws InterruptedException {
@@ -79,6 +73,7 @@ public void testMultiThreaded() throws InterruptedException {
7973
latch.await();
8074
for (int j = 0; j < 10000; j++) {
8175
counted.incRef();
76+
assertTrue(counted.hasReferences());
8277
try {
8378
counted.ensureOpen();
8479
} finally {
@@ -96,13 +91,11 @@ public void testMultiThreaded() throws InterruptedException {
9691
thread.join();
9792
}
9893
counted.decRef();
99-
try {
100-
counted.ensureOpen();
101-
fail("expected to be closed");
102-
} catch (IllegalStateException ex) {
103-
assertThat(ex.getMessage(), equalTo("closed"));
104-
}
94+
assertThat(expectThrows(IllegalStateException.class, counted::ensureOpen).getMessage(), equalTo("closed"));
95+
assertThat(expectThrows(IllegalStateException.class, counted::incRef).getMessage(),
96+
equalTo(AbstractRefCounted.ALREADY_CLOSED_MESSAGE));
10597
assertThat(counted.refCount(), is(0));
98+
assertFalse(counted.hasReferences());
10699
assertThat(exceptions, Matchers.emptyIterable());
107100
}
108101

@@ -117,7 +110,8 @@ protected void closeInternal() {
117110

118111
public void ensureOpen() {
119112
if (closed.get()) {
120-
assert this.refCount() == 0;
113+
assertEquals(0, this.refCount());
114+
assertFalse(hasReferences());
121115
throw new IllegalStateException("closed");
122116
}
123117
}

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,9 @@ public void testRemoveBanParentsOnDisconnect() throws Exception {
331331
if (bannedParent.getNodeId().equals(node.getId()) && randomBoolean()) {
332332
Collection<Transport.Connection> childConns = taskManager.startBanOnChildTasks(bannedParent.getId(), "", () -> {});
333333
for (Transport.Connection connection : randomSubsetOf(childConns)) {
334-
connection.close();
334+
if (connection.getNode().equals(node) == false) {
335+
connection.close();
336+
}
335337
}
336338
}
337339
}

server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,22 @@
1616
import org.elasticsearch.action.get.GetResponse;
1717
import org.elasticsearch.action.index.IndexRequestBuilder;
1818
import org.elasticsearch.action.index.IndexResponse;
19+
import org.elasticsearch.action.support.PlainActionFuture;
1920
import org.elasticsearch.client.Client;
2021
import org.elasticsearch.cluster.ClusterState;
2122
import org.elasticsearch.cluster.action.shard.ShardStateAction;
2223
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
24+
import org.elasticsearch.cluster.coordination.FollowersChecker;
2325
import org.elasticsearch.cluster.coordination.LagDetector;
2426
import org.elasticsearch.cluster.metadata.IndexMetadata;
2527
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
2628
import org.elasticsearch.cluster.routing.ShardRouting;
2729
import org.elasticsearch.cluster.routing.ShardRoutingState;
30+
import org.elasticsearch.cluster.service.ClusterService;
2831
import org.elasticsearch.common.settings.Settings;
29-
import org.elasticsearch.core.TimeValue;
3032
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3133
import org.elasticsearch.common.xcontent.XContentType;
34+
import org.elasticsearch.core.TimeValue;
3235
import org.elasticsearch.index.VersionType;
3336
import org.elasticsearch.index.shard.IndexShard;
3437
import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -40,6 +43,8 @@
4043
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
4144
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
4245
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
46+
import org.elasticsearch.test.transport.MockTransportService;
47+
import org.elasticsearch.transport.TransportService;
4348

4449
import java.util.ArrayList;
4550
import java.util.Collections;
@@ -59,6 +64,11 @@
5964

6065
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
6166
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
67+
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
68+
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING;
69+
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
70+
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
71+
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
6272
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6373
import static org.hamcrest.Matchers.equalTo;
6474
import static org.hamcrest.Matchers.everyItem;
@@ -494,4 +504,63 @@ public void testRestartNodeWhileIndexing() throws Exception {
494504
}
495505
}
496506

507+
public void testRejoinWhileBeingRemoved() {
508+
final String masterNode = internalCluster().startMasterOnlyNode(Settings.builder()
509+
.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
510+
.put(FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), "1")
511+
.build());
512+
final String dataNode = internalCluster().startDataOnlyNode(Settings.builder()
513+
.put(DISCOVERY_FIND_PEERS_INTERVAL_SETTING.getKey(), "100ms")
514+
.put(LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
515+
.put(LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), "1")
516+
.build());
517+
518+
final ClusterService masterClusterService = internalCluster().getInstance(ClusterService.class, masterNode);
519+
final PlainActionFuture<Void> removedNode = new PlainActionFuture<>();
520+
masterClusterService.addListener(clusterChangedEvent -> {
521+
if (removedNode.isDone() == false && clusterChangedEvent.state().nodes().getDataNodes().isEmpty()) {
522+
removedNode.onResponse(null);
523+
}
524+
});
525+
526+
final ClusterService dataClusterService = internalCluster().getInstance(ClusterService.class, dataNode);
527+
final PlainActionFuture<Void> failedLeader = new PlainActionFuture<>() {
528+
@Override
529+
protected boolean blockingAllowed() {
530+
// we're deliberately blocking the cluster applier on the master until the data node starts to rejoin
531+
return true;
532+
}
533+
};
534+
final AtomicBoolean dataNodeHasMaster = new AtomicBoolean(true);
535+
dataClusterService.addListener(clusterChangedEvent -> {
536+
dataNodeHasMaster.set(clusterChangedEvent.state().nodes().getMasterNode() != null);
537+
if (failedLeader.isDone() == false && dataNodeHasMaster.get() == false) {
538+
failedLeader.onResponse(null);
539+
}
540+
});
541+
542+
masterClusterService.addHighPriorityApplier(event -> {
543+
failedLeader.actionGet();
544+
if (dataNodeHasMaster.get() == false) {
545+
try {
546+
Thread.sleep(100);
547+
} catch (InterruptedException e) {
548+
throw new AssertionError("unexpected", e);
549+
}
550+
}
551+
});
552+
553+
final MockTransportService dataTransportService
554+
= (MockTransportService) internalCluster().getInstance(TransportService.class, dataNode);
555+
dataTransportService.addRequestHandlingBehavior(FollowersChecker.FOLLOWER_CHECK_ACTION_NAME, (handler, request, channel, task) -> {
556+
if (removedNode.isDone() == false) {
557+
channel.sendResponse(new ElasticsearchException("simulated check failure"));
558+
} else {
559+
handler.messageReceived(request, channel, task);
560+
}
561+
});
562+
563+
removedNode.actionGet(10, TimeUnit.SECONDS);
564+
ensureStableCluster(2);
565+
}
497566
}

0 commit comments

Comments
 (0)