Skip to content

Commit a493c9c

Browse files
garyrussellartembilan
authored andcommitted
GH-3713: TCP: Fix Intercepted Sender List
Resolves #3713 #3326 added support for multiple `TcpSenders`. However, when connections are intercepted, the sender list was not properly chained through the interceptors. - override `registerSenders` and properly capture the real senders in the last interceptor and intermediate interceptors - this ensures that `addNewConnection` is called on each interceptor - when removing dead connections, use the connection sender list insted of the factory's raw sender list; detect if the connection is an interceptor and call its remove method instead. **cherry-pick to 5.5.x**
1 parent 98b2d1b commit a493c9c

File tree

4 files changed

+115
-29
lines changed

4 files changed

+115
-29
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -563,10 +563,15 @@ public void stop() {
563563
synchronized (this.connections) {
564564
Iterator<Entry<String, TcpConnectionSupport>> iterator = this.connections.entrySet().iterator();
565565
while (iterator.hasNext()) {
566-
TcpConnection connection = iterator.next().getValue();
566+
TcpConnectionSupport connection = iterator.next().getValue();
567567
connection.close();
568568
iterator.remove();
569-
getSenders().forEach(sender -> sender.removeDeadConnection(connection));
569+
if (connection instanceof TcpConnectionInterceptorSupport) {
570+
((TcpConnectionInterceptorSupport) connection).removeDeadConnection(connection);
571+
}
572+
else {
573+
connection.getSenders().forEach(sender -> sender.removeDeadConnection(connection));
574+
}
570575
}
571576
}
572577
synchronized (this.lifecycleMonitor) {
@@ -866,7 +871,12 @@ private List<String> removeClosedConnectionsAndReturnOpenConnectionIds() {
866871
TcpConnectionSupport connection = entry.getValue();
867872
if (!connection.isOpen()) {
868873
iterator.remove();
869-
getSenders().forEach(sender -> sender.removeDeadConnection(connection));
874+
if (connection instanceof TcpConnectionInterceptorSupport) {
875+
((TcpConnectionInterceptorSupport) connection).removeDeadConnection(connection);
876+
}
877+
else {
878+
connection.getSenders().forEach(sender -> sender.removeDeadConnection(connection));
879+
}
870880
logger.debug(() -> getComponentName() + ": Removed closed connection: " +
871881
connection.getConnectionId());
872882
}
@@ -944,7 +954,7 @@ public boolean closeConnection(String connectionId) {
944954
try {
945955
connection.close();
946956
closed = true;
947-
getSenders().forEach(sender -> sender.removeDeadConnection(connection));
957+
connection.getSenders().forEach(sender -> sender.removeDeadConnection(connection));
948958
}
949959
catch (Exception ex) {
950960
logger.debug(ex, () -> "Failed to close connection " + connectionId);

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java

+55-18
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,15 @@
1616

1717
package org.springframework.integration.ip.tcp.connection;
1818

19+
import java.util.Collections;
20+
import java.util.List;
21+
1922
import javax.net.ssl.SSLSession;
2023

2124
import org.springframework.context.ApplicationEventPublisher;
2225
import org.springframework.core.serializer.Deserializer;
2326
import org.springframework.core.serializer.Serializer;
27+
import org.springframework.lang.Nullable;
2428
import org.springframework.messaging.Message;
2529
import org.springframework.messaging.support.ErrorMessage;
2630

@@ -38,9 +42,12 @@ public abstract class TcpConnectionInterceptorSupport extends TcpConnectionSuppo
3842

3943
private TcpListener tcpListener;
4044

41-
private TcpSender tcpSender;
45+
private boolean realSender;
46+
47+
private List<TcpSender> interceptedSenders;
48+
49+
private boolean removed;
4250

43-
private Boolean realSender;
4451

4552
public TcpConnectionInterceptorSupport() {
4653
}
@@ -92,10 +99,29 @@ public void registerListener(TcpListener listener) {
9299

93100
@Override
94101
public void registerSender(TcpSender sender) {
95-
this.tcpSender = sender;
96102
this.theConnection.registerSender(this);
97103
}
98104

105+
@Override
106+
public void registerSenders(List<TcpSender> sendersToRegister) {
107+
this.interceptedSenders = sendersToRegister;
108+
if (sendersToRegister.size() > 0) {
109+
if (!(sendersToRegister.get(0) instanceof TcpConnectionInterceptorSupport)) {
110+
this.realSender = true;
111+
}
112+
else {
113+
this.realSender = ((TcpConnectionInterceptorSupport) this.interceptedSenders.get(0))
114+
.hasRealSender();
115+
}
116+
}
117+
if (this.theConnection instanceof TcpConnectionInterceptorSupport) {
118+
this.theConnection.registerSenders(Collections.singletonList(this));
119+
}
120+
else {
121+
super.registerSender(this);
122+
}
123+
}
124+
99125
/**
100126
* {@inheritDoc}
101127
* <p>
@@ -198,21 +224,30 @@ public void setTheConnection(TcpConnectionSupport theConnection) {
198224
* @return the listener
199225
*/
200226
@Override
227+
@Nullable
201228
public TcpListener getListener() {
202229
return this.tcpListener;
203230
}
204231

205232
@Override
206233
public void addNewConnection(TcpConnection connection) {
207-
if (this.tcpSender != null) {
208-
this.tcpSender.addNewConnection(this);
234+
if (this.interceptedSenders != null) {
235+
this.interceptedSenders.forEach(sender -> sender.addNewConnection(connection));
209236
}
210237
}
211238

212239
@Override
213-
public void removeDeadConnection(TcpConnection connection) {
214-
if (this.tcpSender != null) {
215-
this.tcpSender.removeDeadConnection(this);
240+
public synchronized void removeDeadConnection(TcpConnection connection) {
241+
if (this.removed) {
242+
return;
243+
}
244+
this.removed = true;
245+
if (this.theConnection instanceof TcpConnectionInterceptorSupport && !this.theConnection.equals(this)) {
246+
((TcpConnectionInterceptorSupport) this.theConnection).removeDeadConnection(this);
247+
}
248+
TcpSender sender = getSender();
249+
if (sender != null && !(sender instanceof TcpConnectionInterceptorSupport)) {
250+
this.interceptedSenders.forEach(snder -> snder.removeDeadConnection(connection));
216251
}
217252
}
218253

@@ -222,19 +257,21 @@ public long incrementAndGetConnectionSequence() {
222257
}
223258

224259
@Override
260+
@Nullable
225261
public TcpSender getSender() {
226-
return this.tcpSender;
262+
return this.interceptedSenders != null && this.interceptedSenders.size() > 0
263+
? this.interceptedSenders.get(0)
264+
: null;
265+
}
266+
267+
@Override
268+
public List<TcpSender> getSenders() {
269+
return this.interceptedSenders == null
270+
? super.getSenders()
271+
: Collections.unmodifiableList(this.interceptedSenders);
227272
}
228273

229274
protected boolean hasRealSender() {
230-
if (this.realSender != null) {
231-
return this.realSender;
232-
}
233-
TcpSender sender = getSender();
234-
while (sender instanceof TcpConnectionInterceptorSupport) {
235-
sender = ((TcpConnectionInterceptorSupport) sender).getSender();
236-
}
237-
this.realSender = sender != null;
238275
return this.realSender;
239276
}
240277

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2001-2021 the original author or authors.
2+
* Copyright 2001-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -199,9 +199,7 @@ protected void closeConnection(boolean isException) {
199199
outerListener = nextListener;
200200
}
201201
outerListener.close();
202-
for (TcpSender sender : getSenders()) {
203-
sender.removeDeadConnection(outerListener);
204-
}
202+
outerListener.removeDeadConnection(outerListener);
205203
if (isException) {
206204
// ensure physical close in case the interceptor did not close
207205
this.close();
@@ -337,6 +335,7 @@ public void registerSenders(List<TcpSender> sendersToRegister) {
337335
* @return the listener
338336
*/
339337
@Override
338+
@Nullable
340339
public TcpListener getListener() {
341340
if (this.needsTest && this.testListener != null) {
342341
this.needsTest = false;

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java

+43-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,13 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
2124
import java.util.concurrent.CountDownLatch;
2225
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.concurrent.atomic.AtomicReference;
2328

2429
import org.junit.jupiter.api.Test;
2530

@@ -69,26 +74,55 @@ void senderCalledForDeadConnectionClientNio() throws InterruptedException {
6974
private void senderCalledForDeadConnectionClient(AbstractClientConnectionFactory client) throws InterruptedException {
7075
CountDownLatch adds = new CountDownLatch(2);
7176
CountDownLatch removes = new CountDownLatch(2);
77+
CountDownLatch interceptorAddCalled = new CountDownLatch(6);
78+
CountDownLatch interceptorRemCalled = new CountDownLatch(6);
7279
TcpConnectionInterceptorFactoryChain chain = new TcpConnectionInterceptorFactoryChain();
73-
chain.setInterceptor(new HelloWorldInterceptorFactory() {
80+
AtomicInteger instances = new AtomicInteger();
81+
List<Integer> addOrder = Collections.synchronizedList(new ArrayList<>());
82+
List<Integer> remOrder = Collections.synchronizedList(new ArrayList<>());
83+
AtomicReference<Thread> thread = new AtomicReference<>();
84+
class InterceptorFactory extends HelloWorldInterceptorFactory {
7485

7586
@Override
7687
public TcpConnectionInterceptorSupport getInterceptor() {
7788
return new TcpConnectionInterceptorSupport() {
89+
90+
private final int instance = instances.incrementAndGet();
91+
92+
@Override
93+
public void addNewConnection(TcpConnection connection) {
94+
addOrder.add(this.instance);
95+
interceptorAddCalled.countDown();
96+
super.addNewConnection(connection);
97+
}
98+
99+
@Override
100+
public synchronized void removeDeadConnection(TcpConnection connection) {
101+
super.removeDeadConnection(connection);
102+
// can be called multiple times on different threads.
103+
if (!remOrder.contains(this.instance)) {
104+
remOrder.add(this.instance);
105+
interceptorRemCalled.countDown();
106+
}
107+
}
108+
78109
};
79110
}
80111

81-
});
112+
}
113+
chain.setInterceptor(new InterceptorFactory(), new InterceptorFactory(), new InterceptorFactory());
82114
client.setInterceptorFactoryChain(chain);
83115
client.registerSender(new TcpSender() {
84116

85117
@Override
86118
public void addNewConnection(TcpConnection connection) {
119+
addOrder.add(99);
87120
adds.countDown();
88121
}
89122

90123
@Override
91-
public void removeDeadConnection(TcpConnection connection) {
124+
public synchronized void removeDeadConnection(TcpConnection connection) {
125+
remOrder.add(99);
92126
removes.countDown();
93127
}
94128

@@ -97,12 +131,18 @@ public void removeDeadConnection(TcpConnection connection) {
97131
client.afterPropertiesSet();
98132
client.start();
99133
TcpConnectionSupport conn = client.getConnection();
134+
assertThat(((TcpConnectionInterceptorSupport) conn).hasRealSender()).isTrue();
100135
conn.close();
101136
conn = client.getConnection();
102137
assertThat(adds.await(10, TimeUnit.SECONDS)).isTrue();
138+
assertThat(addOrder).containsExactly(1, 2, 3, 99, 4, 5, 6, 99);
103139
conn.close();
104140
client.stop();
105141
assertThat(removes.await(10, TimeUnit.SECONDS)).isTrue();
142+
// 9x before 3, 6 due to ordering in overridden interceptor method
143+
assertThat(remOrder).containsExactly(1, 2, 99, 3, 4, 5, 99, 6);
144+
assertThat(interceptorAddCalled.await(10, TimeUnit.SECONDS)).isTrue();
145+
assertThat(interceptorRemCalled.await(10, TimeUnit.SECONDS)).isTrue();
106146
}
107147

108148
}

0 commit comments

Comments
 (0)