Skip to content

Commit 085669f

Browse files
committed
Remove manual tracking of registered channels (#27445)
This is related to #27260. Currently, every ESSelector keeps track of all channels that are registered with it. ESSelector is just an abstraction over a raw java nio selector. The java nio selector already tracks its own selection keys. This commit removes our tracking and relies on the java nio selector tracking.
1 parent fbf1344 commit 085669f

File tree

7 files changed

+16
-62
lines changed

7 files changed

+16
-62
lines changed

test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,9 @@
2222
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
2323

2424
import java.io.IOException;
25-
import java.nio.channels.CancelledKeyException;
2625
import java.nio.channels.ClosedChannelException;
27-
import java.nio.channels.ClosedSelectorException;
2826
import java.nio.channels.SelectionKey;
2927
import java.nio.channels.Selector;
30-
import java.util.Iterator;
31-
import java.util.Set;
3228
import java.util.concurrent.ConcurrentLinkedQueue;
3329

3430
/**
@@ -93,7 +89,6 @@ private void setUpNewServerChannels() {
9389
newChannel.register();
9490
SelectionKey selectionKey = newChannel.getSelectionKey();
9591
selectionKey.attach(newChannel);
96-
addRegisteredChannel(newChannel);
9792
eventHandler.serverChannelRegistered(newChannel);
9893
} else {
9994
eventHandler.registrationException(newChannel, new ClosedChannelException());

test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,13 @@
2828
import java.nio.channels.ClosedSelectorException;
2929
import java.nio.channels.SelectionKey;
3030
import java.nio.channels.Selector;
31-
import java.util.Collections;
3231
import java.util.Iterator;
3332
import java.util.Set;
34-
import java.util.concurrent.ConcurrentHashMap;
3533
import java.util.concurrent.ConcurrentLinkedQueue;
3634
import java.util.concurrent.CountDownLatch;
3735
import java.util.concurrent.atomic.AtomicBoolean;
3836
import java.util.concurrent.locks.ReentrantLock;
37+
import java.util.stream.Collectors;
3938

4039
/**
4140
* This is a basic selector abstraction used by {@link org.elasticsearch.transport.nio.NioTransport}. This
@@ -56,7 +55,6 @@ public abstract class ESSelector implements Closeable {
5655
private final CountDownLatch exitedLoop = new CountDownLatch(1);
5756
private final AtomicBoolean isClosed = new AtomicBoolean(false);
5857
private final PlainActionFuture<Boolean> isRunningFuture = PlainActionFuture.newFuture();
59-
private final Set<NioChannel> registeredChannels = Collections.newSetFromMap(new ConcurrentHashMap<NioChannel, Boolean>());
6058
private volatile Thread thread;
6159

6260
ESSelector(EventHandler eventHandler) throws IOException {
@@ -134,7 +132,7 @@ void singleLoop() {
134132

135133
void cleanupAndCloseChannels() {
136134
cleanup();
137-
channelsToClose.addAll(registeredChannels);
135+
channelsToClose.addAll(selector.keys().stream().map(sk -> (NioChannel) sk.attachment()).collect(Collectors.toList()));
138136
closePendingChannels();
139137
}
140138

@@ -171,19 +169,6 @@ void wakeup() {
171169
selector.wakeup();
172170
}
173171

174-
public Set<NioChannel> getRegisteredChannels() {
175-
return registeredChannels;
176-
}
177-
178-
public void addRegisteredChannel(NioChannel channel) {
179-
assert registeredChannels.contains(channel) == false : "Should only register channel once";
180-
registeredChannels.add(channel);
181-
}
182-
183-
public void removeRegisteredChannel(NioChannel channel) {
184-
registeredChannels.remove(channel);
185-
}
186-
187172
@Override
188173
public void close() throws IOException {
189174
if (isClosed.compareAndSet(false, true)) {

test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,6 @@ private void setupChannel(NioSocketChannel newChannel) {
171171
try {
172172
if (newChannel.isOpen()) {
173173
newChannel.register();
174-
addRegisteredChannel(newChannel);
175174
SelectionKey key = newChannel.getSelectionKey();
176175
key.attach(newChannel);
177176
eventHandler.handleRegistration(newChannel);

test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,6 @@ public void closeFromSelector() throws IOException {
115115
} catch (IOException e) {
116116
closeContext.completeExceptionally(e);
117117
throw e;
118-
} finally {
119-
// There is no problem with calling this multiple times
120-
selector.removeRegisteredChannel(this);
121118
}
122119
}
123120
}

test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptingSelectorTests.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.transport.nio;
2121

2222
import org.elasticsearch.test.ESTestCase;
23-
import org.elasticsearch.transport.nio.channel.NioChannel;
2423
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
2524
import org.elasticsearch.transport.nio.utils.TestSelectionKey;
2625
import org.junit.Before;
@@ -30,8 +29,8 @@
3029
import java.nio.channels.SelectionKey;
3130
import java.nio.channels.Selector;
3231
import java.security.PrivilegedActionException;
32+
import java.util.Collections;
3333
import java.util.HashSet;
34-
import java.util.Set;
3534

3635
import static org.mockito.Matchers.any;
3736
import static org.mockito.Matchers.same;
@@ -46,6 +45,7 @@ public class AcceptingSelectorTests extends ESTestCase {
4645
private NioServerSocketChannel serverChannel;
4746
private AcceptorEventHandler eventHandler;
4847
private TestSelectionKey selectionKey;
48+
private Selector rawSelector;
4949

5050
@Before
5151
public void setUp() throws Exception {
@@ -54,7 +54,7 @@ public void setUp() throws Exception {
5454
eventHandler = mock(AcceptorEventHandler.class);
5555
serverChannel = mock(NioServerSocketChannel.class);
5656

57-
Selector rawSelector = mock(Selector.class);
57+
rawSelector = mock(Selector.class);
5858
selector = new AcceptingSelector(eventHandler, rawSelector);
5959
this.selector.setThread();
6060

@@ -71,9 +71,6 @@ public void testRegisteredChannel() throws IOException, PrivilegedActionExceptio
7171
selector.preSelect();
7272

7373
verify(eventHandler).serverChannelRegistered(serverChannel);
74-
Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
75-
assertEquals(1, registeredChannels.size());
76-
assertTrue(registeredChannels.contains(serverChannel));
7774
}
7875

7976
public void testClosedChannelWillNotBeRegistered() throws Exception {
@@ -83,10 +80,6 @@ public void testClosedChannelWillNotBeRegistered() throws Exception {
8380
selector.preSelect();
8481

8582
verify(eventHandler).registrationException(same(serverChannel), any(ClosedChannelException.class));
86-
87-
Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
88-
assertEquals(0, registeredChannels.size());
89-
assertFalse(registeredChannels.contains(serverChannel));
9083
}
9184

9285
public void testRegisterChannelFailsDueToException() throws Exception {
@@ -98,10 +91,6 @@ public void testRegisterChannelFailsDueToException() throws Exception {
9891
selector.preSelect();
9992

10093
verify(eventHandler).registrationException(serverChannel, closedChannelException);
101-
102-
Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
103-
assertEquals(0, registeredChannels.size());
104-
assertFalse(registeredChannels.contains(serverChannel));
10594
}
10695

10796
public void testAcceptEvent() throws IOException {
@@ -128,7 +117,9 @@ public void testCleanup() throws IOException {
128117

129118
selector.preSelect();
130119

131-
assertEquals(1, selector.getRegisteredChannels().size());
120+
TestSelectionKey key = new TestSelectionKey(0);
121+
key.attach(serverChannel);
122+
when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(key)));
132123

133124
selector.cleanupAndCloseChannels();
134125

test/framework/src/test/java/org/elasticsearch/transport/nio/ESSelectorTests.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,19 +51,12 @@ public void setUp() throws Exception {
5151
public void testQueueChannelForClosed() throws IOException {
5252
NioChannel channel = mock(NioChannel.class);
5353
when(channel.getSelector()).thenReturn(selector);
54-
selector.addRegisteredChannel(channel);
5554

5655
selector.queueChannelClose(channel);
5756

58-
assertEquals(1, selector.getRegisteredChannels().size());
59-
6057
selector.singleLoop();
6158

6259
verify(handler).handleClose(channel);
63-
// Will be called in the channel close method
64-
selector.removeRegisteredChannel(channel);
65-
66-
assertEquals(0, selector.getRegisteredChannels().size());
6760
}
6861

6962
public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException {

test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
import java.nio.channels.ClosedSelectorException;
3535
import java.nio.channels.SelectionKey;
3636
import java.nio.channels.Selector;
37-
import java.util.Set;
37+
import java.util.Collections;
38+
import java.util.HashSet;
3839

3940
import static org.mockito.Matchers.any;
4041
import static org.mockito.Matchers.anyInt;
@@ -54,6 +55,7 @@ public class SocketSelectorTests extends ESTestCase {
5455
private WriteContext writeContext;
5556
private ActionListener<NioChannel> listener;
5657
private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1]));
58+
private Selector rawSelector;
5759

5860
@Before
5961
@SuppressWarnings("unchecked")
@@ -65,7 +67,7 @@ public void setUp() throws Exception {
6567
listener = mock(ActionListener.class);
6668
selectionKey = new TestSelectionKey(0);
6769
selectionKey.attach(channel);
68-
Selector rawSelector = mock(Selector.class);
70+
rawSelector = mock(Selector.class);
6971

7072
this.socketSelector = new SocketSelector(eventHandler, rawSelector);
7173
this.socketSelector.setThread();
@@ -83,10 +85,6 @@ public void testRegisterChannel() throws Exception {
8385
socketSelector.preSelect();
8486

8587
verify(eventHandler).handleRegistration(channel);
86-
87-
Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
88-
assertEquals(1, registeredChannels.size());
89-
assertTrue(registeredChannels.contains(channel));
9088
}
9189

9290
public void testClosedChannelWillNotBeRegistered() throws Exception {
@@ -97,10 +95,6 @@ public void testClosedChannelWillNotBeRegistered() throws Exception {
9795

9896
verify(eventHandler).registrationException(same(channel), any(ClosedChannelException.class));
9997
verify(channel, times(0)).finishConnect();
100-
101-
Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
102-
assertEquals(0, registeredChannels.size());
103-
assertFalse(registeredChannels.contains(channel));
10498
}
10599

106100
public void testRegisterChannelFailsDueToException() throws Exception {
@@ -113,10 +107,6 @@ public void testRegisterChannelFailsDueToException() throws Exception {
113107

114108
verify(eventHandler).registrationException(channel, closedChannelException);
115109
verify(channel, times(0)).finishConnect();
116-
117-
Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
118-
assertEquals(0, registeredChannels.size());
119-
assertFalse(registeredChannels.contains(channel));
120110
}
121111

122112
public void testSuccessfullyRegisterChannelWillConnect() throws Exception {
@@ -309,6 +299,10 @@ public void testCleanup() throws Exception {
309299
socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), networkBuffer, listener));
310300
socketSelector.scheduleForRegistration(unRegisteredChannel);
311301

302+
TestSelectionKey testSelectionKey = new TestSelectionKey(0);
303+
testSelectionKey.attach(channel);
304+
when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(testSelectionKey)));
305+
312306
socketSelector.cleanupAndCloseChannels();
313307

314308
verify(listener).onFailure(any(ClosedSelectorException.class));

0 commit comments

Comments
 (0)