Skip to content

Commit e933d63

Browse files
committed
Fix PostgresSubscribableChannel race condition
The `PostgresSubscribableChannel` uses a task executor for dispatching messages. Even if we stop `PostgresChannelMessageTableSubscriber` and unsubscribe from the channel, the task might be ongoing. * Use explicit `ThreadPoolTaskExecutor` in the test to shout it down and wait for tasks to be completed before verifying DB status * Optimize `PostgresSubscribableChannel` to mark TX for rollback when we got a message from DB, but no handlers subscribed
1 parent bb908d8 commit e933d63

File tree

2 files changed

+30
-8
lines changed

2 files changed

+30
-8
lines changed

Diff for: spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,16 @@ private Optional<Message<?>> askForMessage() {
176176
if (this.hasHandlers) {
177177
if (this.transactionTemplate != null) {
178178
return this.retryTemplate.execute(context ->
179-
this.transactionTemplate.execute(status -> pollMessage().map(this::dispatch)));
179+
this.transactionTemplate.execute(status ->
180+
pollMessage()
181+
.filter(message -> {
182+
if (!this.hasHandlers) {
183+
status.setRollbackOnly();
184+
return false;
185+
}
186+
return true;
187+
})
188+
.map(this::dispatch)));
180189
}
181190
else {
182191
return pollMessage()

Diff for: spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java

+20-7
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.springframework.messaging.MessageHandler;
4949
import org.springframework.messaging.support.GenericMessage;
5050
import org.springframework.retry.support.RetryTemplate;
51+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
5152
import org.springframework.test.annotation.DirtiesContext;
5253
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
5354
import org.springframework.transaction.PlatformTransactionManager;
@@ -95,6 +96,8 @@ CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
9596

9697
private PostgresSubscribableChannel postgresSubscribableChannel;
9798

99+
private ThreadPoolTaskExecutor taskExecutor;
100+
98101
private String groupId;
99102

100103
@BeforeEach
@@ -107,17 +110,26 @@ void setUp(TestInfo testInfo) {
107110
POSTGRES_CONTAINER.getPassword())
108111
.unwrap(PgConnection.class));
109112

113+
114+
this.taskExecutor = new ThreadPoolTaskExecutor();
115+
this.taskExecutor.setCorePoolSize(10);
116+
this.taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
117+
this.taskExecutor.setAwaitTerminationSeconds(10);
118+
this.taskExecutor.afterPropertiesSet();
119+
110120
this.groupId = testInfo.getDisplayName();
111121

112122
this.postgresSubscribableChannel =
113123
new PostgresSubscribableChannel(messageStore, groupId, postgresChannelMessageTableSubscriber);
114124
this.postgresSubscribableChannel.setBeanName("testPostgresChannel");
125+
this.postgresSubscribableChannel.setDispatcherExecutor(this.taskExecutor);
115126
this.postgresSubscribableChannel.afterPropertiesSet();
116127
}
117128

118129
@AfterEach
119130
void tearDown() {
120131
this.postgresChannelMessageTableSubscriber.stop();
132+
this.taskExecutor.shutdown();
121133
}
122134

123135

@@ -160,13 +172,13 @@ void testMessagesDispatchedInTransaction() throws InterruptedException {
160172
postgresChannelMessageTableSubscriber.start();
161173
MessageHandler messageHandler =
162174
message -> {
163-
try {
164-
throw new RuntimeException("An error has occurred");
165-
}
166-
finally {
167-
latch.countDown();
168-
}
169-
};
175+
try {
176+
throw new RuntimeException("An error has occurred");
177+
}
178+
finally {
179+
latch.countDown();
180+
}
181+
};
170182
postgresSubscribableChannel.subscribe(messageHandler);
171183

172184
messageStore.addMessageToGroup(groupId, new GenericMessage<>("1"));
@@ -177,6 +189,7 @@ void testMessagesDispatchedInTransaction() throws InterruptedException {
177189
// Stop subscriber to unlock records from TX for the next verification
178190
postgresChannelMessageTableSubscriber.stop();
179191
postgresSubscribableChannel.unsubscribe(messageHandler);
192+
this.taskExecutor.shutdown();
180193

181194
assertThat(messageStore.messageGroupSize(groupId)).isEqualTo(2);
182195
assertThat(messageStore.pollMessageFromGroup(groupId).getPayload()).isEqualTo("1");

0 commit comments

Comments
 (0)