18
18
19
19
import java .time .Duration ;
20
20
import java .util .concurrent .CompletableFuture ;
21
+ import java .util .concurrent .ExecutionException ;
21
22
22
23
import org .reactivestreams .Publisher ;
23
24
25
+ import org .springframework .core .ReactiveAdapter ;
24
26
import org .springframework .core .ReactiveAdapterRegistry ;
27
+ import org .springframework .lang .Nullable ;
28
+ import org .springframework .util .Assert ;
25
29
26
30
/**
27
31
* Asynchronous subtype of {@link ServerResponse} that exposes the future
@@ -53,7 +57,7 @@ public interface AsyncServerResponse extends ServerResponse {
53
57
* @return the asynchronous response
54
58
*/
55
59
static AsyncServerResponse create (Object asyncResponse ) {
56
- return DefaultAsyncServerResponse . create (asyncResponse , null );
60
+ return createInternal (asyncResponse , null );
57
61
}
58
62
59
63
/**
@@ -69,7 +73,45 @@ static AsyncServerResponse create(Object asyncResponse) {
69
73
* @return the asynchronous response
70
74
*/
71
75
static AsyncServerResponse create (Object asyncResponse , Duration timeout ) {
72
- return DefaultAsyncServerResponse .create (asyncResponse , timeout );
76
+ return createInternal (asyncResponse , timeout );
77
+ }
78
+
79
+ private static AsyncServerResponse createInternal (Object asyncResponse , @ Nullable Duration timeout ) {
80
+ Assert .notNull (asyncResponse , "AsyncResponse must not be null" );
81
+
82
+ CompletableFuture <ServerResponse > futureResponse = toCompletableFuture (asyncResponse );
83
+ if (futureResponse .isDone () &&
84
+ !futureResponse .isCancelled () &&
85
+ !futureResponse .isCompletedExceptionally ()) {
86
+
87
+ try {
88
+ ServerResponse completedResponse = futureResponse .get ();
89
+ return new CompletedAsyncServerResponse (completedResponse );
90
+ }
91
+ catch (InterruptedException | ExecutionException ignored ) {
92
+ // fall through to use DefaultAsyncServerResponse
93
+ }
94
+ }
95
+ return new DefaultAsyncServerResponse (futureResponse , timeout );
96
+ }
97
+
98
+ @ SuppressWarnings ("unchecked" )
99
+ private static CompletableFuture <ServerResponse > toCompletableFuture (Object obj ) {
100
+ if (obj instanceof CompletableFuture <?> futureResponse ) {
101
+ return (CompletableFuture <ServerResponse >) futureResponse ;
102
+ }
103
+ else if (DefaultAsyncServerResponse .reactiveStreamsPresent ) {
104
+ ReactiveAdapterRegistry registry = ReactiveAdapterRegistry .getSharedInstance ();
105
+ ReactiveAdapter publisherAdapter = registry .getAdapter (obj .getClass ());
106
+ if (publisherAdapter != null ) {
107
+ Publisher <ServerResponse > publisher = publisherAdapter .toPublisher (obj );
108
+ ReactiveAdapter futureAdapter = registry .getAdapter (CompletableFuture .class );
109
+ if (futureAdapter != null ) {
110
+ return (CompletableFuture <ServerResponse >) futureAdapter .fromPublisher (publisher );
111
+ }
112
+ }
113
+ }
114
+ throw new IllegalArgumentException ("Asynchronous type not supported: " + obj .getClass ());
73
115
}
74
116
75
117
}
0 commit comments