Skip to content

Improve control of outgoing connection lifecycles #77295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
36579d5
Add RefCounted#hasReferences utility
DaveCTurner Sep 4, 2021
af67f2d
Add DiscoveryNode#descriptionWithoutAttributes utility
DaveCTurner Sep 4, 2021
5e61b43
Add test demonstrating the join loop
DaveCTurner Sep 3, 2021
b695875
Remote connections are special, don't use connectToNode directly
DaveCTurner Sep 1, 2021
d80e09c
API change: connectToNode yields a Releasable
DaveCTurner Sep 1, 2021
cc07e03
Add ref-counting to Transport.Connection
DaveCTurner Sep 1, 2021
b59ef40
Add onRemoved to lifecycle of Transport.Connection
DaveCTurner Sep 3, 2021
3d4c21e
Integrate refcounting into cluster connection manager
DaveCTurner Sep 1, 2021
5f46f81
Fake cluster applier service must still pass connections into NodeCon…
DaveCTurner Sep 3, 2021
098f444
Properly release connections acquired in PeerFinder
DaveCTurner Sep 1, 2021
2879448
Properly release connections acquired during join validation
DaveCTurner Sep 1, 2021
f63ace5
Acquire and release reference during joining
DaveCTurner Sep 1, 2021
76c25ae
Rework NodeConnectionsService to use refcounting
DaveCTurner Sep 1, 2021
f051776
Assert that connections are not leaked
DaveCTurner Sep 3, 2021
44282fa
Log remotely-closed connections at INFO
DaveCTurner Sep 4, 2021
4811a35
Review suggestions
DaveCTurner Sep 6, 2021
c8a6902
Collapse duplicated methods
DaveCTurner Sep 6, 2021
6a7d381
Merge branch 'master' into 2021-09-06-releasable-connections
DaveCTurner Sep 13, 2021
1712241
Merge branch 'master' into 2021-09-06-releasable-connections
DaveCTurner Sep 13, 2021
03df026
Shorter follower check interval
DaveCTurner Sep 13, 2021
bbe1683
Maintain reference to request in DeterministicTaskQueue
DaveCTurner Sep 13, 2021
fae5a7f
Merge branch 'master' into 2021-09-06-releasable-connections
DaveCTurner Sep 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public final boolean decRef() {
return false;
}

@Override
public final boolean hasReferences() {
return refCount.get() > 0;
}

/**
* Called whenever the ref count is incremented or decremented. Can be overridden to record access to the instance for debugging
* purposes.
Expand All @@ -74,7 +79,7 @@ protected void alreadyClosed() {
/**
* Returns the current reference count.
*/
public int refCount() {
public final int refCount() {
return this.refCount.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,12 @@ public interface RefCounted {
* @return returns {@code true} if the ref count dropped to 0 as a result of calling this method
*/
boolean decRef();

/**
* Returns {@code true} only if there was at least one active reference when the method was called; if it returns {@code false} then the
* object is closed; future attempts to acquire references will fail.
*
* @return whether there are currently any active references to this object.
*/
boolean hasReferences();
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,7 @@ public void testRefCount() {
assertThat(
expectThrows(IllegalStateException.class, counted::incRef).getMessage(),
equalTo(AbstractRefCounted.ALREADY_CLOSED_MESSAGE));

try {
counted.ensureOpen();
fail(" expected exception");
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("closed"));
}
assertThat(expectThrows(IllegalStateException.class, counted::ensureOpen).getMessage(), equalTo("closed"));
}

public void testMultiThreaded() throws InterruptedException {
Expand All @@ -79,6 +73,7 @@ public void testMultiThreaded() throws InterruptedException {
latch.await();
for (int j = 0; j < 10000; j++) {
counted.incRef();
assertTrue(counted.hasReferences());
try {
counted.ensureOpen();
} finally {
Expand All @@ -96,13 +91,11 @@ public void testMultiThreaded() throws InterruptedException {
thread.join();
}
counted.decRef();
try {
counted.ensureOpen();
fail("expected to be closed");
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("closed"));
}
assertThat(expectThrows(IllegalStateException.class, counted::ensureOpen).getMessage(), equalTo("closed"));
assertThat(expectThrows(IllegalStateException.class, counted::incRef).getMessage(),
equalTo(AbstractRefCounted.ALREADY_CLOSED_MESSAGE));
assertThat(counted.refCount(), is(0));
assertFalse(counted.hasReferences());
assertThat(exceptions, Matchers.emptyIterable());
}

Expand All @@ -117,7 +110,8 @@ protected void closeInternal() {

public void ensureOpen() {
if (closed.get()) {
assert this.refCount() == 0;
assertEquals(0, this.refCount());
assertFalse(hasReferences());
throw new IllegalStateException("closed");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,9 @@ public void testRemoveBanParentsOnDisconnect() throws Exception {
if (bannedParent.getNodeId().equals(node.getId()) && randomBoolean()) {
Collection<Transport.Connection> childConns = taskManager.startBanOnChildTasks(bannedParent.getId(), "", () -> {});
for (Transport.Connection connection : randomSubsetOf(childConns)) {
connection.close();
if (connection.getNode().equals(node) == false) {
connection.close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.coordination.LagDetector;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
Expand All @@ -40,6 +43,8 @@
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -59,6 +64,11 @@

import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
Expand Down Expand Up @@ -494,4 +504,63 @@ public void testRestartNodeWhileIndexing() throws Exception {
}
}

public void testRejoinWhileBeingRemoved() {
final String masterNode = internalCluster().startMasterOnlyNode(Settings.builder()
.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.put(FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), "1")
.build());
final String dataNode = internalCluster().startDataOnlyNode(Settings.builder()
.put(DISCOVERY_FIND_PEERS_INTERVAL_SETTING.getKey(), "100ms")
.put(LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.put(LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), "1")
.build());

final ClusterService masterClusterService = internalCluster().getInstance(ClusterService.class, masterNode);
final PlainActionFuture<Void> removedNode = new PlainActionFuture<>();
masterClusterService.addListener(clusterChangedEvent -> {
if (removedNode.isDone() == false && clusterChangedEvent.state().nodes().getDataNodes().isEmpty()) {
removedNode.onResponse(null);
}
});

final ClusterService dataClusterService = internalCluster().getInstance(ClusterService.class, dataNode);
final PlainActionFuture<Void> failedLeader = new PlainActionFuture<>() {
@Override
protected boolean blockingAllowed() {
// we're deliberately blocking the cluster applier on the master until the data node starts to rejoin
return true;
}
};
final AtomicBoolean dataNodeHasMaster = new AtomicBoolean(true);
dataClusterService.addListener(clusterChangedEvent -> {
dataNodeHasMaster.set(clusterChangedEvent.state().nodes().getMasterNode() != null);
if (failedLeader.isDone() == false && dataNodeHasMaster.get() == false) {
failedLeader.onResponse(null);
}
});

masterClusterService.addHighPriorityApplier(event -> {
failedLeader.actionGet();
if (dataNodeHasMaster.get() == false) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new AssertionError("unexpected", e);
}
}
});

final MockTransportService dataTransportService
= (MockTransportService) internalCluster().getInstance(TransportService.class, dataNode);
dataTransportService.addRequestHandlingBehavior(FollowersChecker.FOLLOWER_CHECK_ACTION_NAME, (handler, request, channel, task) -> {
if (removedNode.isDone() == false) {
channel.sendResponse(new ElasticsearchException("simulated check failure"));
} else {
handler.messageReceived(request, channel, task);
}
});

removedNode.actionGet(10, TimeUnit.SECONDS);
ensureStableCluster(2);
}
}
Loading