Skip to content

Commit 9dbd9ba

Browse files
authored
Extract a ConnectionManager interface (#51722)
Currently we have three different implementations representing a `ConnectionManager`. There is the basic `ConnectionManager` which holds all connections for a cluster. And a remote connection manager which support proxy behavior. And a stubbable connection manager for tests. The remote and stubbable instances use the delegate pattern, so this commit extracts an interface for them all to implement.
1 parent 189ceb2 commit 9dbd9ba

17 files changed

+428
-309
lines changed
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.transport;
20+
21+
import org.apache.logging.log4j.LogManager;
22+
import org.apache.logging.log4j.Logger;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.cluster.node.DiscoveryNode;
25+
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
27+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
28+
import org.elasticsearch.common.util.concurrent.EsExecutors;
29+
import org.elasticsearch.common.util.concurrent.ListenableFuture;
30+
import org.elasticsearch.common.util.concurrent.RunOnce;
31+
import org.elasticsearch.core.internal.io.IOUtils;
32+
33+
import java.util.Collections;
34+
import java.util.Iterator;
35+
import java.util.Map;
36+
import java.util.Set;
37+
import java.util.concurrent.ConcurrentMap;
38+
import java.util.concurrent.CountDownLatch;
39+
import java.util.concurrent.atomic.AtomicBoolean;
40+
41+
/**
42+
* This class manages node connections within a cluster. The connection is opened by the underlying transport.
43+
* Once the connection is opened, this class manages the connection. This includes closing the connection when
44+
* the connection manager is closed.
45+
*/
46+
public class ClusterConnectionManager implements ConnectionManager {
47+
48+
private static final Logger logger = LogManager.getLogger(ClusterConnectionManager.class);
49+
50+
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
51+
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Void>> pendingConnections = ConcurrentCollections.newConcurrentMap();
52+
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
53+
@Override
54+
protected void closeInternal() {
55+
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
56+
while (iterator.hasNext()) {
57+
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
58+
try {
59+
IOUtils.closeWhileHandlingException(next.getValue());
60+
} finally {
61+
iterator.remove();
62+
}
63+
}
64+
closeLatch.countDown();
65+
}
66+
};
67+
private final Transport transport;
68+
private final ConnectionProfile defaultProfile;
69+
private final AtomicBoolean closing = new AtomicBoolean(false);
70+
private final CountDownLatch closeLatch = new CountDownLatch(1);
71+
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
72+
73+
public ClusterConnectionManager(Settings settings, Transport transport) {
74+
this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport);
75+
}
76+
77+
public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport) {
78+
this.transport = transport;
79+
this.defaultProfile = connectionProfile;
80+
}
81+
82+
@Override
83+
public void addListener(TransportConnectionListener listener) {
84+
this.connectionListener.addListener(listener);
85+
}
86+
87+
@Override
88+
public void removeListener(TransportConnectionListener listener) {
89+
this.connectionListener.removeListener(listener);
90+
}
91+
92+
@Override
93+
public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Transport.Connection> listener) {
94+
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
95+
internalOpenConnection(node, resolvedProfile, listener);
96+
}
97+
98+
/**
99+
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
100+
* Once a successful is established, it can be validated before being exposed.
101+
* The ActionListener will be called on the calling thread or the generic thread pool.
102+
*/
103+
@Override
104+
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
105+
ConnectionValidator connectionValidator,
106+
ActionListener<Void> listener) throws ConnectTransportException {
107+
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
108+
if (node == null) {
109+
listener.onFailure(new ConnectTransportException(null, "can't connect to a null node"));
110+
return;
111+
}
112+
113+
if (connectingRefCounter.tryIncRef() == false) {
114+
listener.onFailure(new IllegalStateException("connection manager is closed"));
115+
return;
116+
}
117+
118+
if (connectedNodes.containsKey(node)) {
119+
connectingRefCounter.decRef();
120+
listener.onResponse(null);
121+
return;
122+
}
123+
124+
final ListenableFuture<Void> currentListener = new ListenableFuture<>();
125+
final ListenableFuture<Void> existingListener = pendingConnections.putIfAbsent(node, currentListener);
126+
if (existingListener != null) {
127+
try {
128+
// wait on previous entry to complete connection attempt
129+
existingListener.addListener(listener, EsExecutors.newDirectExecutorService());
130+
} finally {
131+
connectingRefCounter.decRef();
132+
}
133+
return;
134+
}
135+
136+
currentListener.addListener(listener, EsExecutors.newDirectExecutorService());
137+
138+
final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
139+
internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> {
140+
connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap(
141+
ignored -> {
142+
assert Transports.assertNotTransportThread("connection validator success");
143+
try {
144+
if (connectedNodes.putIfAbsent(node, conn) != null) {
145+
logger.debug("existing connection to node [{}], closing new redundant connection", node);
146+
IOUtils.closeWhileHandlingException(conn);
147+
} else {
148+
logger.debug("connected to node [{}]", node);
149+
try {
150+
connectionListener.onNodeConnected(node, conn);
151+
} finally {
152+
final Transport.Connection finalConnection = conn;
153+
conn.addCloseListener(ActionListener.wrap(() -> {
154+
logger.trace("unregistering {} after connection close and marking as disconnected", node);
155+
connectedNodes.remove(node, finalConnection);
156+
connectionListener.onNodeDisconnected(node, conn);
157+
}));
158+
}
159+
}
160+
} finally {
161+
ListenableFuture<Void> future = pendingConnections.remove(node);
162+
assert future == currentListener : "Listener in pending map is different than the expected listener";
163+
releaseOnce.run();
164+
future.onResponse(null);
165+
}
166+
}, e -> {
167+
assert Transports.assertNotTransportThread("connection validator failure");
168+
IOUtils.closeWhileHandlingException(conn);
169+
failConnectionListeners(node, releaseOnce, e, currentListener);
170+
}));
171+
}, e -> {
172+
assert Transports.assertNotTransportThread("internalOpenConnection failure");
173+
failConnectionListeners(node, releaseOnce, e, currentListener);
174+
}));
175+
}
176+
177+
/**
178+
* Returns a connection for the given node if the node is connected.
179+
* Connections returned from this method must not be closed. The lifecycle of this connection is
180+
* maintained by this connection manager
181+
*
182+
* @throws NodeNotConnectedException if the node is not connected
183+
* @see #connectToNode(DiscoveryNode, ConnectionProfile, ConnectionValidator, ActionListener)
184+
*/
185+
@Override
186+
public Transport.Connection getConnection(DiscoveryNode node) {
187+
Transport.Connection connection = connectedNodes.get(node);
188+
if (connection == null) {
189+
throw new NodeNotConnectedException(node, "Node not connected");
190+
}
191+
return connection;
192+
}
193+
194+
/**
195+
* Returns {@code true} if the node is connected.
196+
*/
197+
@Override
198+
public boolean nodeConnected(DiscoveryNode node) {
199+
return connectedNodes.containsKey(node);
200+
}
201+
202+
/**
203+
* Disconnected from the given node, if not connected, will do nothing.
204+
*/
205+
@Override
206+
public void disconnectFromNode(DiscoveryNode node) {
207+
Transport.Connection nodeChannels = connectedNodes.remove(node);
208+
if (nodeChannels != null) {
209+
// if we found it and removed it we close
210+
nodeChannels.close();
211+
}
212+
}
213+
214+
/**
215+
* Returns the number of nodes this manager is connected to.
216+
*/
217+
@Override
218+
public int size() {
219+
return connectedNodes.size();
220+
}
221+
222+
public Set<DiscoveryNode> getAllConnectedNodes() {
223+
return Collections.unmodifiableSet(connectedNodes.keySet());
224+
}
225+
226+
@Override
227+
public void close() {
228+
internalClose(true);
229+
}
230+
231+
@Override
232+
public void closeNoBlock() {
233+
internalClose(false);
234+
}
235+
236+
private void internalClose(boolean waitForPendingConnections) {
237+
assert Transports.assertNotTransportThread("Closing ConnectionManager");
238+
if (closing.compareAndSet(false, true)) {
239+
connectingRefCounter.decRef();
240+
if (waitForPendingConnections) {
241+
try {
242+
closeLatch.await();
243+
} catch (InterruptedException e) {
244+
Thread.currentThread().interrupt();
245+
throw new IllegalStateException(e);
246+
}
247+
}
248+
}
249+
}
250+
251+
private void internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile,
252+
ActionListener<Transport.Connection> listener) {
253+
transport.openConnection(node, connectionProfile, ActionListener.map(listener, connection -> {
254+
assert Transports.assertNotTransportThread("internalOpenConnection success");
255+
try {
256+
connectionListener.onConnectionOpened(connection);
257+
} finally {
258+
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
259+
}
260+
if (connection.isClosed()) {
261+
throw new ConnectTransportException(node, "a channel closed while connecting");
262+
}
263+
return connection;
264+
}));
265+
}
266+
267+
private void failConnectionListeners(DiscoveryNode node, RunOnce releaseOnce, Exception e, ListenableFuture<Void> expectedListener) {
268+
ListenableFuture<Void> future = pendingConnections.remove(node);
269+
releaseOnce.run();
270+
if (future != null) {
271+
assert future == expectedListener : "Listener in pending map is different than the expected listener";
272+
future.onFailure(e);
273+
}
274+
}
275+
276+
@Override
277+
public ConnectionProfile getConnectionProfile() {
278+
return defaultProfile;
279+
}
280+
281+
}

0 commit comments

Comments
 (0)