1
1
/*
2
- * Copyright 2002-2024 the original author or authors.
2
+ * Copyright 2002-2025 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
26
26
import java .util .Optional ;
27
27
import java .util .concurrent .CompletableFuture ;
28
28
import java .util .concurrent .ConcurrentHashMap ;
29
+ import java .util .concurrent .atomic .AtomicBoolean ;
29
30
import java .util .function .Supplier ;
30
31
31
32
import org .apache .commons .logging .Log ;
@@ -449,14 +450,41 @@ private Object execute(CacheOperationInvoker invoker, Method method, CacheOperat
449
450
return cacheHit ;
450
451
}
451
452
453
+ @ SuppressWarnings ("unchecked" )
452
454
@ Nullable
453
455
private Object executeSynchronized (CacheOperationInvoker invoker , Method method , CacheOperationContexts contexts ) {
454
456
CacheOperationContext context = contexts .get (CacheableOperation .class ).iterator ().next ();
455
457
if (isConditionPassing (context , CacheOperationExpressionEvaluator .NO_RESULT )) {
456
458
Object key = generateKey (context , CacheOperationExpressionEvaluator .NO_RESULT );
457
459
Cache cache = context .getCaches ().iterator ().next ();
458
460
if (CompletableFuture .class .isAssignableFrom (method .getReturnType ())) {
459
- return doRetrieve (cache , key , () -> (CompletableFuture <?>) invokeOperation (invoker ));
461
+ AtomicBoolean invokeFailure = new AtomicBoolean (false );
462
+ CompletableFuture <?> result = doRetrieve (cache , key ,
463
+ () -> {
464
+ CompletableFuture <?> invokeResult = ((CompletableFuture <?>) invokeOperation (invoker ));
465
+ if (invokeResult == null ) {
466
+ return null ;
467
+ }
468
+ return invokeResult .exceptionallyCompose (ex -> {
469
+ invokeFailure .set (true );
470
+ return CompletableFuture .failedFuture (ex );
471
+ });
472
+ });
473
+ return result .exceptionallyCompose (ex -> {
474
+ if (!(ex instanceof RuntimeException rex )) {
475
+ return CompletableFuture .failedFuture (ex );
476
+ }
477
+ try {
478
+ getErrorHandler ().handleCacheGetError (rex , cache , key );
479
+ if (invokeFailure .get ()) {
480
+ return CompletableFuture .failedFuture (ex );
481
+ }
482
+ return (CompletableFuture ) invokeOperation (invoker );
483
+ }
484
+ catch (Throwable ex2 ) {
485
+ return CompletableFuture .failedFuture (ex2 );
486
+ }
487
+ });
460
488
}
461
489
if (this .reactiveCachingHandler != null ) {
462
490
Object returnValue = this .reactiveCachingHandler .executeSynchronized (invoker , method , cache , key );
@@ -517,9 +545,17 @@ private Object findInCaches(CacheOperationContext context, Object key,
517
545
if (CompletableFuture .class .isAssignableFrom (context .getMethod ().getReturnType ())) {
518
546
CompletableFuture <?> result = doRetrieve (cache , key );
519
547
if (result != null ) {
520
- return result .exceptionally (ex -> {
521
- getErrorHandler ().handleCacheGetError ((RuntimeException ) ex , cache , key );
522
- return null ;
548
+ return result .exceptionallyCompose (ex -> {
549
+ if (!(ex instanceof RuntimeException rex )) {
550
+ return CompletableFuture .failedFuture (ex );
551
+ }
552
+ try {
553
+ getErrorHandler ().handleCacheGetError (rex , cache , key );
554
+ return CompletableFuture .completedFuture (null );
555
+ }
556
+ catch (Throwable ex2 ) {
557
+ return CompletableFuture .failedFuture (ex2 );
558
+ }
523
559
}).thenCompose (value -> (CompletableFuture <?>) evaluate (
524
560
(value != null ? CompletableFuture .completedFuture (unwrapCacheValue (value )) : null ),
525
561
invoker , method , contexts ));
@@ -1097,32 +1133,72 @@ private class ReactiveCachingHandler {
1097
1133
1098
1134
private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry .getSharedInstance ();
1099
1135
1136
+ @ SuppressWarnings ({"rawtypes" , "unchecked" })
1100
1137
@ Nullable
1101
1138
public Object executeSynchronized (CacheOperationInvoker invoker , Method method , Cache cache , Object key ) {
1139
+ AtomicBoolean invokeFailure = new AtomicBoolean (false );
1102
1140
ReactiveAdapter adapter = this .registry .getAdapter (method .getReturnType ());
1103
1141
if (adapter != null ) {
1104
1142
if (adapter .isMultiValue ()) {
1105
1143
// Flux or similar
1106
1144
return adapter .fromPublisher (Flux .from (Mono .fromFuture (
1107
- cache .retrieve (key ,
1108
- () -> Flux .from (adapter .toPublisher (invokeOperation (invoker ))).collectList ().toFuture ())))
1109
- .flatMap (Flux ::fromIterable ));
1145
+ doRetrieve (cache , key ,
1146
+ () -> Flux .from (adapter .toPublisher (invokeOperation (invoker ))).collectList ().doOnError (ex -> invokeFailure .set (true )).toFuture ())))
1147
+ .flatMap (Flux ::fromIterable )
1148
+ .onErrorResume (RuntimeException .class , ex -> {
1149
+ try {
1150
+ getErrorHandler ().handleCacheGetError (ex , cache , key );
1151
+ if (invokeFailure .get ()) {
1152
+ return Flux .error (ex );
1153
+ }
1154
+ return Flux .from (adapter .toPublisher (invokeOperation (invoker )));
1155
+ }
1156
+ catch (RuntimeException exception ) {
1157
+ return Flux .error (exception );
1158
+ }
1159
+ }));
1110
1160
}
1111
1161
else {
1112
1162
// Mono or similar
1113
1163
return adapter .fromPublisher (Mono .fromFuture (
1114
- cache .retrieve (key ,
1115
- () -> Mono .from (adapter .toPublisher (invokeOperation (invoker ))).toFuture ())));
1164
+ doRetrieve (cache , key ,
1165
+ () -> Mono .from (adapter .toPublisher (invokeOperation (invoker ))).doOnError (ex -> invokeFailure .set (true )).toFuture ()))
1166
+ .onErrorResume (RuntimeException .class , ex -> {
1167
+ try {
1168
+ getErrorHandler ().handleCacheGetError (ex , cache , key );
1169
+ if (invokeFailure .get ()) {
1170
+ return Mono .error (ex );
1171
+ }
1172
+ return Mono .from (adapter .toPublisher (invokeOperation (invoker )));
1173
+ }
1174
+ catch (RuntimeException exception ) {
1175
+ return Mono .error (exception );
1176
+ }
1177
+ }));
1116
1178
}
1117
1179
}
1118
1180
if (KotlinDetector .isKotlinReflectPresent () && KotlinDetector .isSuspendingFunction (method )) {
1119
- return Mono .fromFuture (cache .retrieve (key , () -> {
1120
- Mono <?> mono = ((Mono <?>) invokeOperation (invoker ));
1121
- if (mono == null ) {
1181
+ return Mono .fromFuture (doRetrieve (cache , key , () -> {
1182
+ Mono <?> mono = (Mono <?>) invokeOperation (invoker );
1183
+ if (mono != null ) {
1184
+ mono = mono .doOnError (ex -> invokeFailure .set (true ));
1185
+ }
1186
+ else {
1122
1187
mono = Mono .empty ();
1123
1188
}
1124
1189
return mono .toFuture ();
1125
- }));
1190
+ })).onErrorResume (RuntimeException .class , ex -> {
1191
+ try {
1192
+ getErrorHandler ().handleCacheGetError (ex , cache , key );
1193
+ if (invokeFailure .get ()) {
1194
+ return Mono .error (ex );
1195
+ }
1196
+ return (Mono ) invokeOperation (invoker );
1197
+ }
1198
+ catch (RuntimeException exception ) {
1199
+ return Mono .error (exception );
1200
+ }
1201
+ });
1126
1202
}
1127
1203
return NOT_HANDLED ;
1128
1204
}
@@ -1137,7 +1213,7 @@ public Object processCacheEvicts(List<CacheOperationContext> contexts, @Nullable
1137
1213
return NOT_HANDLED ;
1138
1214
}
1139
1215
1140
- @ SuppressWarnings ({ "unchecked " , "rawtypes" })
1216
+ @ SuppressWarnings ({"rawtypes " , "unchecked" })
1141
1217
@ Nullable
1142
1218
public Object findInCaches (CacheOperationContext context , Cache cache , Object key ,
1143
1219
CacheOperationInvoker invoker , Method method , CacheOperationContexts contexts ) {
0 commit comments