Skip to content

Commit f7d15cf

Browse files
committed
Make TransactionalApplicationListenerAdapter support fallbackExecution
1 parent ac7c7ff commit f7d15cf

File tree

3 files changed

+88
-1
lines changed

3 files changed

+88
-1
lines changed

spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListener.java

+17
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
*
4444
* @author Juergen Hoeller
4545
* @author Oliver Drotbohm
46+
* @author Réda Housni Alaoui
4647
* @since 5.3
4748
* @param <E> the specific {@code ApplicationEvent} subclass to listen to
4849
* @see TransactionalEventListener
@@ -134,10 +135,26 @@ static <T> TransactionalApplicationListener<PayloadApplicationEvent<T>> forPaylo
134135
*/
135136
static <T> TransactionalApplicationListener<PayloadApplicationEvent<T>> forPayload(
136137
TransactionPhase phase, Consumer<T> consumer) {
138+
return forPayload(phase, false, consumer);
139+
}
140+
141+
/**
142+
* Create a new {@code TransactionalApplicationListener} for the given payload consumer.
143+
* @param phase the transaction phase in which to invoke the listener
144+
* @param fallbackExecution Whether the event should be handled if no transaction is running.
145+
* @param consumer the event payload consumer
146+
* @param <T> the type of the event payload
147+
* @return a corresponding {@code TransactionalApplicationListener} instance
148+
* @see PayloadApplicationEvent#getPayload()
149+
* @see TransactionalApplicationListenerAdapter
150+
*/
151+
static <T> TransactionalApplicationListener<PayloadApplicationEvent<T>> forPayload(
152+
TransactionPhase phase, boolean fallbackExecution, Consumer<T> consumer) {
137153

138154
TransactionalApplicationListenerAdapter<PayloadApplicationEvent<T>> listener =
139155
new TransactionalApplicationListenerAdapter<>(event -> consumer.accept(event.getPayload()));
140156
listener.setTransactionPhase(phase);
157+
listener.setFallbackExecution(fallbackExecution);
141158
return listener;
142159
}
143160

spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerAdapter.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.List;
2020
import java.util.concurrent.CopyOnWriteArrayList;
2121

22+
import org.apache.commons.logging.Log;
23+
import org.apache.commons.logging.LogFactory;
2224
import org.springframework.context.ApplicationEvent;
2325
import org.springframework.context.ApplicationListener;
2426
import org.springframework.core.Ordered;
@@ -35,6 +37,7 @@
3537
* as a convenient alternative to custom usage of this adapter class.
3638
*
3739
* @author Juergen Hoeller
40+
* @author Réda Housni Alaoui
3841
* @since 5.3
3942
* @param <E> the specific {@code ApplicationEvent} subclass to listen to
4043
* @see TransactionalApplicationListener
@@ -44,11 +47,14 @@
4447
public class TransactionalApplicationListenerAdapter<E extends ApplicationEvent>
4548
implements TransactionalApplicationListener<E>, Ordered {
4649

50+
private final Log logger = LogFactory.getLog(getClass());
51+
4752
private final ApplicationListener<E> targetListener;
4853

4954
private int order = Ordered.LOWEST_PRECEDENCE;
5055

5156
private TransactionPhase transactionPhase = TransactionPhase.AFTER_COMMIT;
57+
private boolean fallbackExecution;
5258

5359
private String listenerId = "";
5460

@@ -97,6 +103,13 @@ public TransactionPhase getTransactionPhase() {
97103
return this.transactionPhase;
98104
}
99105

106+
/**
107+
* @param fallbackExecution Whether the event should be handled if no transaction is running.
108+
*/
109+
public void setFallbackExecution(boolean fallbackExecution) {
110+
this.fallbackExecution = fallbackExecution;
111+
}
112+
100113
/**
101114
* Specify an id to identify the listener with.
102115
* <p>The default is an empty String.
@@ -127,7 +140,23 @@ public void processEvent(E event) {
127140

128141
@Override
129142
public void onApplicationEvent(E event) {
130-
TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks);
143+
if (TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks)) {
144+
if (logger.isDebugEnabled()) {
145+
logger.debug("Registered transaction synchronization for " + event);
146+
}
147+
}
148+
else if (fallbackExecution) {
149+
if (getTransactionPhase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
150+
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
151+
}
152+
processEvent(event);
153+
}
154+
else {
155+
// No transactional event execution at all
156+
if (logger.isDebugEnabled()) {
157+
logger.debug("No transaction is active - skipping " + event);
158+
}
159+
}
131160
}
132161

133162
}

spring-tx/src/test/java/org/springframework/transaction/event/TransactionalApplicationListenerAdapterTests.java

+41
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@
2525
import static org.assertj.core.api.Assertions.assertThat;
2626
import static org.assertj.core.api.Assertions.assertThatRuntimeException;
2727

28+
import java.util.concurrent.atomic.AtomicReference;
29+
2830
/**
2931
* @author Juergen Hoeller
32+
* @author Réda Housni Alaoui
3033
*/
3134
class TransactionalApplicationListenerAdapterTests {
3235

@@ -88,6 +91,44 @@ void useSpecifiedIdentifier() {
8891
assertThat(adapter.getListenerId()).isEqualTo("identifier");
8992
}
9093

94+
@Test
95+
void invokedNothingOutsideOfTransaction() {
96+
CapturingSynchronizationCallback callback = new CapturingSynchronizationCallback();
97+
PayloadApplicationEvent<Object> event = new PayloadApplicationEvent<>(this, new Object());
98+
99+
AtomicReference<Object> consumedPayload = new AtomicReference<>();
100+
TransactionalApplicationListener<PayloadApplicationEvent<Object>> adapter =
101+
TransactionalApplicationListener.forPayload(consumedPayload::set);
102+
adapter.addCallback(callback);
103+
adapter.onApplicationEvent(event);
104+
105+
assertThat(consumedPayload.get()).isNull();
106+
assertThat(callback.preEvent).isNull();
107+
assertThat(callback.postEvent).isNull();
108+
assertThat(callback.ex).isNull();
109+
assertThat(adapter.getTransactionPhase()).isEqualTo(TransactionPhase.AFTER_COMMIT);
110+
assertThat(adapter.getListenerId()).isEmpty();
111+
}
112+
113+
@Test
114+
void invokesConsumerOutsideOfTransactionIfFallbackExecutionEnabled() {
115+
CapturingSynchronizationCallback callback = new CapturingSynchronizationCallback();
116+
PayloadApplicationEvent<Object> event = new PayloadApplicationEvent<>(this, new Object());
117+
118+
AtomicReference<Object> consumedPayload = new AtomicReference<>();
119+
TransactionalApplicationListener<PayloadApplicationEvent<Object>> adapter =
120+
TransactionalApplicationListener.forPayload(TransactionPhase.AFTER_COMMIT, true, consumedPayload::set);
121+
adapter.addCallback(callback);
122+
adapter.onApplicationEvent(event);
123+
124+
assertThat(consumedPayload.get()).isSameAs(event.getPayload());
125+
assertThat(callback.preEvent).isNull();
126+
assertThat(callback.postEvent).isNull();
127+
assertThat(callback.ex).isNull();
128+
assertThat(adapter.getTransactionPhase()).isEqualTo(TransactionPhase.AFTER_COMMIT);
129+
assertThat(adapter.getListenerId()).isEmpty();
130+
}
131+
91132

92133
private static void runInTransaction(Runnable runnable) {
93134
TransactionSynchronizationManager.setActualTransactionActive(true);

0 commit comments

Comments
 (0)