21
21
import java .util .concurrent .ConcurrentMap ;
22
22
23
23
import io .vavr .control .Try ;
24
- import kotlin .reflect .KFunction ;
25
- import kotlin .reflect .jvm .ReflectJvmMapping ;
24
+ import kotlin .coroutines .Continuation ;
25
+ import kotlinx .coroutines .reactive .AwaitKt ;
26
+ import kotlinx .coroutines .reactive .ReactiveFlowKt ;
26
27
import org .apache .commons .logging .Log ;
27
28
import org .apache .commons .logging .LogFactory ;
29
+ import org .reactivestreams .Publisher ;
28
30
import reactor .core .publisher .Flux ;
29
31
import reactor .core .publisher .Mono ;
30
32
33
35
import org .springframework .beans .factory .InitializingBean ;
34
36
import org .springframework .beans .factory .annotation .BeanFactoryAnnotationUtils ;
35
37
import org .springframework .core .KotlinDetector ;
38
+ import org .springframework .core .MethodParameter ;
36
39
import org .springframework .core .NamedThreadLocal ;
37
40
import org .springframework .core .ReactiveAdapter ;
38
41
import org .springframework .core .ReactiveAdapterRegistry ;
44
47
import org .springframework .transaction .TransactionManager ;
45
48
import org .springframework .transaction .TransactionStatus ;
46
49
import org .springframework .transaction .TransactionSystemException ;
47
- import org .springframework .transaction .TransactionUsageException ;
48
50
import org .springframework .transaction .reactive .TransactionContextManager ;
49
51
import org .springframework .transaction .support .CallbackPreferringPlatformTransactionManager ;
50
52
import org .springframework .util .Assert ;
78
80
* @author Stéphane Nicoll
79
81
* @author Sam Brannen
80
82
* @author Mark Paluch
83
+ * @author Sebastien Deleuze
81
84
* @since 1.1
82
85
* @see PlatformTransactionManager
83
86
* @see ReactiveTransactionManager
@@ -96,6 +99,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
96
99
*/
97
100
private static final Object DEFAULT_TRANSACTION_MANAGER_KEY = new Object ();
98
101
102
+ private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow" ;
103
+
99
104
/**
100
105
* Vavr library present on the classpath?
101
106
*/
@@ -336,21 +341,20 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targe
336
341
final TransactionManager tm = determineTransactionManager (txAttr );
337
342
338
343
if (this .reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager ) {
344
+ boolean isSuspendingFunction = KotlinDetector .isSuspendingFunction (method );
345
+ boolean hasSuspendingFlowReturnType = isSuspendingFunction && COROUTINES_FLOW_CLASS_NAME .equals (new MethodParameter (method , -1 ).getParameterType ().getName ());
339
346
ReactiveTransactionSupport txSupport = this .transactionSupportCache .computeIfAbsent (method , key -> {
340
- if (KotlinDetector .isKotlinType (method .getDeclaringClass ()) && KotlinDelegate .isSuspend (method )) {
341
- throw new TransactionUsageException (
342
- "Unsupported annotated transaction on suspending function detected: " + method +
343
- ". Use TransactionalOperator.transactional extensions instead." );
344
- }
345
- ReactiveAdapter adapter = this .reactiveAdapterRegistry .getAdapter (method .getReturnType ());
347
+ Class <?> reactiveType = (isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux .class : Mono .class ) : method .getReturnType ());
348
+ ReactiveAdapter adapter = this .reactiveAdapterRegistry .getAdapter (reactiveType );
346
349
if (adapter == null ) {
347
350
throw new IllegalStateException ("Cannot apply reactive transaction to non-reactive return type: " +
348
351
method .getReturnType ());
349
352
}
350
353
return new ReactiveTransactionSupport (adapter );
351
354
});
352
- return txSupport .invokeWithinTransaction (
353
- method , targetClass , invocation , txAttr , (ReactiveTransactionManager ) tm );
355
+ Publisher <?> publisher = (Publisher <?>) txSupport .invokeWithinTransaction (method , targetClass , invocation , txAttr , (ReactiveTransactionManager ) tm );
356
+ return (isSuspendingFunction ? (hasSuspendingFlowReturnType ? KotlinDelegate .asFlow (publisher ) :
357
+ KotlinDelegate .awaitSingleOrNull (publisher , ((CoroutinesInvocationCallback ) invocation ).getContinuation ())) : publisher );
354
358
}
355
359
356
360
PlatformTransactionManager ptm = asPlatformTransactionManager (tm );
@@ -785,6 +789,11 @@ protected interface InvocationCallback {
785
789
Object proceedWithInvocation () throws Throwable ;
786
790
}
787
791
792
+ protected interface CoroutinesInvocationCallback extends InvocationCallback {
793
+
794
+ Object getContinuation ();
795
+ }
796
+
788
797
789
798
/**
790
799
* Internal holder class for a Throwable in a callback transaction model.
@@ -837,9 +846,13 @@ public static Object evaluateTryFailure(Object retVal, TransactionAttribute txAt
837
846
*/
838
847
private static class KotlinDelegate {
839
848
840
- private static boolean isSuspend (Method method ) {
841
- KFunction <?> function = ReflectJvmMapping .getKotlinFunction (method );
842
- return function != null && function .isSuspend ();
849
+ private static Object asFlow (Publisher <?> publisher ) {
850
+ return ReactiveFlowKt .asFlow (publisher );
851
+ }
852
+
853
+ @ SuppressWarnings ("unchecked" )
854
+ private static Object awaitSingleOrNull (Publisher <?> publisher , Object continuation ) {
855
+ return AwaitKt .awaitSingleOrNull (publisher , (Continuation <Object >) continuation );
843
856
}
844
857
}
845
858
0 commit comments