Skip to content

Commit be53593

Browse files
ColoredCarrotartembilan
authored andcommitted
GH-8678: Add BufferOverflowStrategy for WebScoket
Within Spring Integration's WebSocket support, a `ConcurrentWebSocketSessionDecorator`, which buffers outbound messages if sending is slow, is used to decorate all websocket sessions in `IntegrationWebSocketContainer`, the standard entrypoint for using websockets with Integration. * Expose a `ConcurrentWebSocketSessionDecorator.OverflowStrategy` option on the `IntegrationWebSocketContainer` **Cherry-pick to `5.5.x`, `6.0.x` & `6.1.x`**
1 parent 790b8f9 commit be53593

File tree

2 files changed

+81
-4
lines changed

2 files changed

+81
-4
lines changed

spring-integration-websocket/src/main/java/org/springframework/integration/websocket/IntegrationWebSocketContainer.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.commons.logging.LogFactory;
2929

3030
import org.springframework.beans.factory.DisposableBean;
31+
import org.springframework.lang.Nullable;
3132
import org.springframework.util.Assert;
3233
import org.springframework.util.ReflectionUtils;
3334
import org.springframework.web.socket.CloseStatus;
@@ -55,6 +56,7 @@
5556
*
5657
* @author Artem Bilan
5758
* @author Gary Russell
59+
* @author Julian Koch
5860
*
5961
* @since 4.1
6062
*
@@ -83,6 +85,9 @@ public abstract class IntegrationWebSocketContainer implements DisposableBean {
8385

8486
private int sendBufferSizeLimit = DEFAULT_SEND_BUFFER_SIZE;
8587

88+
@Nullable
89+
private ConcurrentWebSocketSessionDecorator.OverflowStrategy sendBufferOverflowStrategy;
90+
8691
public void setSendTimeLimit(int sendTimeLimit) {
8792
this.sendTimeLimit = sendTimeLimit;
8893
}
@@ -91,6 +96,21 @@ public void setSendBufferSizeLimit(int sendBufferSizeLimit) {
9196
this.sendBufferSizeLimit = sendBufferSizeLimit;
9297
}
9398

99+
/**
100+
* Set the send buffer overflow strategy.
101+
* <p>Concurrently generated outbound messages are buffered if sending is slow.
102+
* This strategy determines the behavior when the buffer has reached the limit
103+
* configured with {@link #setSendBufferSizeLimit}.
104+
* @param overflowStrategy The {@link ConcurrentWebSocketSessionDecorator.OverflowStrategy} to use.
105+
* @since 5.5.19
106+
* @see ConcurrentWebSocketSessionDecorator
107+
*/
108+
public void setSendBufferOverflowStrategy(
109+
@Nullable ConcurrentWebSocketSessionDecorator.OverflowStrategy overflowStrategy) {
110+
111+
this.sendBufferOverflowStrategy = overflowStrategy;
112+
}
113+
94114
public void setMessageListener(WebSocketListener messageListener) {
95115
Assert.state(this.messageListener == null || this.messageListener.equals(messageListener),
96116
"'messageListener' is already configured");
@@ -166,6 +186,17 @@ public void destroy() {
166186
}
167187
}
168188

189+
private WebSocketSession decorateSession(WebSocketSession sessionToDecorate) {
190+
if (this.sendBufferOverflowStrategy == null) {
191+
return new ConcurrentWebSocketSessionDecorator(sessionToDecorate, this.sendTimeLimit,
192+
this.sendBufferSizeLimit);
193+
}
194+
else {
195+
return new ConcurrentWebSocketSessionDecorator(sessionToDecorate, this.sendTimeLimit,
196+
this.sendBufferSizeLimit, this.sendBufferOverflowStrategy);
197+
}
198+
}
199+
169200
/**
170201
* An internal {@link WebSocketHandler} implementation to be used with native
171202
* Web-Socket containers.
@@ -187,10 +218,7 @@ public List<String> getSubProtocols() {
187218
public void afterConnectionEstablished(WebSocketSession sessionToDecorate)
188219
throws Exception { // NOSONAR
189220

190-
WebSocketSession session =
191-
new ConcurrentWebSocketSessionDecorator(sessionToDecorate,
192-
IntegrationWebSocketContainer.this.sendTimeLimit,
193-
IntegrationWebSocketContainer.this.sendBufferSizeLimit);
221+
WebSocketSession session = decorateSession(sessionToDecorate);
194222

195223
IntegrationWebSocketContainer.this.sessions.put(session.getId(), session);
196224
if (IntegrationWebSocketContainer.this.logger.isDebugEnabled()) {

spring-integration-websocket/src/test/java/org/springframework/integration/websocket/ClientWebSocketContainerTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.junit.jupiter.api.Test;
3535

3636
import org.springframework.http.HttpHeaders;
37+
import org.springframework.integration.test.util.TestUtils;
3738
import org.springframework.web.socket.CloseStatus;
3839
import org.springframework.web.socket.PingMessage;
3940
import org.springframework.web.socket.PongMessage;
@@ -42,12 +43,14 @@
4243
import org.springframework.web.socket.WebSocketMessage;
4344
import org.springframework.web.socket.WebSocketSession;
4445
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
46+
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
4547

4648
import static org.assertj.core.api.Assertions.assertThat;
4749
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
4850

4951
/**
5052
* @author Artem Bilan
53+
* @author Julian Koch
5154
*
5255
* @since 4.1
5356
*/
@@ -138,16 +141,53 @@ protected CompletableFuture<WebSocketSession> executeInternal(WebSocketHandler w
138141
assertThat(session.isOpen()).isTrue();
139142
}
140143

144+
@Test
145+
public void testWebSocketContainerOverflowStrategyPropagation() throws Exception {
146+
StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
147+
148+
Map<String, Object> userProperties = new HashMap<>();
149+
userProperties.put(Constants.IO_TIMEOUT_MS_PROPERTY, "" + (Constants.IO_TIMEOUT_MS_DEFAULT * 6));
150+
webSocketClient.setUserProperties(userProperties);
151+
152+
ClientWebSocketContainer container =
153+
new ClientWebSocketContainer(webSocketClient, new URI(server.getWsBaseUrl() + "/ws/websocket"));
154+
155+
container.setSendTimeLimit(10_000);
156+
container.setSendBufferSizeLimit(12345);
157+
container.setSendBufferOverflowStrategy(ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP);
158+
159+
TestWebSocketListener messageListener = new TestWebSocketListener();
160+
container.setMessageListener(messageListener);
161+
container.setConnectionTimeout(30);
162+
163+
container.start();
164+
165+
assertThat(messageListener.sessionStartedLatch.await(10, TimeUnit.SECONDS)).isTrue();
166+
167+
assertThat(messageListener.sendTimeLimit).isEqualTo(10_000);
168+
assertThat(messageListener.sendBufferSizeLimit).isEqualTo(12345);
169+
assertThat(messageListener.sendBufferOverflowStrategy)
170+
.isEqualTo(ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP);
171+
}
172+
141173
private static class TestWebSocketListener implements WebSocketListener {
142174

143175
public final CountDownLatch messageLatch = new CountDownLatch(1);
144176

177+
public final CountDownLatch sessionStartedLatch = new CountDownLatch(1);
178+
145179
public final CountDownLatch sessionEndedLatch = new CountDownLatch(1);
146180

147181
public WebSocketMessage<?> message;
148182

149183
public boolean started;
150184

185+
int sendTimeLimit;
186+
187+
int sendBufferSizeLimit;
188+
189+
ConcurrentWebSocketSessionDecorator.OverflowStrategy sendBufferOverflowStrategy;
190+
151191
TestWebSocketListener() {
152192
}
153193

@@ -160,6 +200,15 @@ public void onMessage(WebSocketSession session, WebSocketMessage<?> message) {
160200
@Override
161201
public void afterSessionStarted(WebSocketSession session) {
162202
this.started = true;
203+
204+
var sessionDecorator = (ConcurrentWebSocketSessionDecorator) session;
205+
this.sendTimeLimit = sessionDecorator.getSendTimeLimit();
206+
this.sendBufferSizeLimit = sessionDecorator.getBufferSizeLimit();
207+
this.sendBufferOverflowStrategy =
208+
TestUtils.getPropertyValue(sessionDecorator, "overflowStrategy",
209+
ConcurrentWebSocketSessionDecorator.OverflowStrategy.class);
210+
211+
this.sessionStartedLatch.countDown();
163212
}
164213

165214
@Override

0 commit comments

Comments
 (0)