20
20
21
21
import lombok .Getter ;
22
22
import lombok .Setter ;
23
+ import neo4j .org .testkit .backend .RxBlockingSubscriber ;
23
24
import neo4j .org .testkit .backend .TestkitState ;
25
+ import neo4j .org .testkit .backend .holder .RxResultHolder ;
24
26
import neo4j .org .testkit .backend .messages .responses .Session ;
25
27
import neo4j .org .testkit .backend .messages .responses .TestkitResponse ;
26
28
import reactor .core .publisher .Mono ;
27
29
30
+ import java .util .concurrent .CompletableFuture ;
28
31
import java .util .concurrent .CompletionStage ;
32
+ import java .util .concurrent .atomic .AtomicLong ;
33
+
34
+ import org .neo4j .driver .Record ;
29
35
30
36
@ Setter
31
37
@ Getter
@@ -52,15 +58,121 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
52
58
public Mono <TestkitResponse > processRx ( TestkitState testkitState )
53
59
{
54
60
return testkitState .getRxSessionHolder ( data .getSessionId () )
55
- .flatMap ( sessionHolder -> Mono .fromDirect ( sessionHolder .getSession ().close () ) )
61
+ .flatMap ( sessionHolder -> sessionHolder .getResultHolder ()
62
+ .map ( this ::consumeRequestedDemandAndCancelIfSubscribed )
63
+ .orElse ( Mono .empty () )
64
+ .then ( Mono .fromDirect ( sessionHolder .getSession ().close () ) ) )
56
65
.then ( Mono .just ( createResponse () ) );
57
66
}
58
67
68
+ private Mono <Void > consumeRequestedDemandAndCancelIfSubscribed ( RxResultHolder resultHolder )
69
+ {
70
+ return resultHolder .getSubscriber ()
71
+ .map ( subscriber -> Mono .fromCompletionStage ( consumeRequestedDemandAndCancelIfSubscribed ( resultHolder , subscriber ) ) )
72
+ .orElse ( Mono .empty () );
73
+ }
74
+
75
+ private CompletionStage <Void > consumeRequestedDemandAndCancelIfSubscribed ( RxResultHolder resultHolder , RxBlockingSubscriber <Record > subscriber )
76
+ {
77
+ if ( subscriber .getCompletionStage ().toCompletableFuture ().isDone () )
78
+ {
79
+ return CompletableFuture .completedFuture ( null );
80
+ }
81
+
82
+ return new DemandConsumer <>( subscriber , resultHolder .getRequestedRecordsCounter () )
83
+ .getCompletedStage ()
84
+ .thenCompose ( completionReason ->
85
+ {
86
+ CompletionStage <Void > result ;
87
+ switch ( completionReason )
88
+ {
89
+ case REQUESTED_DEMAND_CONSUMED :
90
+ result = subscriber .getSubscriptionStage ().thenApply ( subscription ->
91
+ {
92
+ subscription .cancel ();
93
+ return null ;
94
+ } );
95
+ break ;
96
+ case RECORD_STREAM_EXHAUSTED :
97
+ result = CompletableFuture .completedFuture ( null );
98
+ break ;
99
+ default :
100
+ result = new CompletableFuture <>();
101
+ result .toCompletableFuture ()
102
+ .completeExceptionally ( new RuntimeException ( "Unexpected completion reason: " + completionReason ) );
103
+ }
104
+ return result ;
105
+ } );
106
+ }
107
+
59
108
private Session createResponse ()
60
109
{
61
110
return Session .builder ().data ( Session .SessionBody .builder ().id ( data .getSessionId () ).build () ).build ();
62
111
}
63
112
113
+ private static class DemandConsumer <T >
114
+ {
115
+ private final RxBlockingSubscriber <T > subscriber ;
116
+ private final AtomicLong unfulfilledDemandCounter ;
117
+ @ Getter
118
+ private final CompletableFuture <CompletionReason > completedStage = new CompletableFuture <>();
119
+
120
+ private enum CompletionReason
121
+ {
122
+ REQUESTED_DEMAND_CONSUMED ,
123
+ RECORD_STREAM_EXHAUSTED
124
+ }
125
+
126
+ private DemandConsumer ( RxBlockingSubscriber <T > subscriber , AtomicLong unfulfilledDemandCounter )
127
+ {
128
+ this .subscriber = subscriber ;
129
+ this .unfulfilledDemandCounter = unfulfilledDemandCounter ;
130
+
131
+ subscriber .getCompletionStage ().whenComplete ( this ::onComplete );
132
+ if ( this .unfulfilledDemandCounter .get () > 0 )
133
+ {
134
+ setupNextSignalConsumer ();
135
+ }
136
+ }
137
+
138
+ private void setupNextSignalConsumer ()
139
+ {
140
+ CompletableFuture <T > consumer = new CompletableFuture <>();
141
+ subscriber .setNextSignalConsumer ( consumer );
142
+ consumer .whenComplete ( this ::onNext );
143
+ }
144
+
145
+ private void onNext ( T ignored , Throwable throwable )
146
+ {
147
+ if ( throwable != null )
148
+ {
149
+ completedStage .completeExceptionally ( throwable );
150
+ return ;
151
+ }
152
+
153
+ if ( unfulfilledDemandCounter .decrementAndGet () > 0 )
154
+ {
155
+ setupNextSignalConsumer ();
156
+ }
157
+ else
158
+ {
159
+ completedStage .complete ( CompletionReason .REQUESTED_DEMAND_CONSUMED );
160
+ }
161
+ }
162
+
163
+ private void onComplete ( Void ignored , Throwable throwable )
164
+ {
165
+ if ( throwable != null )
166
+ {
167
+ completedStage .completeExceptionally ( throwable );
168
+ }
169
+ else
170
+ {
171
+ completedStage .complete ( CompletionReason .RECORD_STREAM_EXHAUSTED );
172
+ }
173
+ }
174
+ }
175
+
64
176
@ Setter
65
177
@ Getter
66
178
private static class SessionCloseBody
0 commit comments