20
20
21
21
import lombok .Getter ;
22
22
import lombok .Setter ;
23
+ import neo4j .org .testkit .backend .RxBlockingSubscriber ;
23
24
import neo4j .org .testkit .backend .TestkitState ;
25
+ import neo4j .org .testkit .backend .holder .RxResultHolder ;
24
26
import neo4j .org .testkit .backend .messages .responses .Session ;
25
27
import neo4j .org .testkit .backend .messages .responses .TestkitResponse ;
26
28
import reactor .core .publisher .Mono ;
27
29
30
+ import java .util .concurrent .CompletableFuture ;
28
31
import java .util .concurrent .CompletionStage ;
32
+ import java .util .concurrent .atomic .AtomicLong ;
33
+
34
+ import org .neo4j .driver .Record ;
29
35
30
36
@ Setter
31
37
@ Getter
@@ -52,15 +58,117 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
52
58
public Mono <TestkitResponse > processRx ( TestkitState testkitState )
53
59
{
54
60
return testkitState .getRxSessionHolder ( data .getSessionId () )
55
- .flatMap ( sessionHolder -> Mono .fromDirect ( sessionHolder .getSession ().close () ) )
61
+ .flatMap ( sessionHolder ->
62
+ sessionHolder .getResultHolder ()
63
+ .map ( this ::consumeRequestedDemandAndCancel )
64
+ .orElse ( Mono .empty () )
65
+ .then ( Mono .fromDirect ( sessionHolder .getSession ().close () ) ) )
56
66
.then ( Mono .just ( createResponse () ) );
57
67
}
58
68
69
+ private Mono <Void > consumeRequestedDemandAndCancel ( RxResultHolder resultHolder )
70
+ {
71
+ return resultHolder .getSubscriber ()
72
+ .map ( subscriber -> Mono .fromCompletionStage ( consumeRequestedDemandAndCancel ( resultHolder , subscriber ) ) )
73
+ .orElse ( Mono .empty () );
74
+ }
75
+
76
+ private CompletionStage <Void > consumeRequestedDemandAndCancel ( RxResultHolder resultHolder , RxBlockingSubscriber <Record > subscriber )
77
+ {
78
+ RemainingDemandConsumer remainingDemandConsumer = new RemainingDemandConsumer ( subscriber , resultHolder .getRequestedRecordsCounter () );
79
+ return remainingDemandConsumer .getCompletedStage ()
80
+ .thenCompose ( completionReason ->
81
+ {
82
+ CompletionStage <Void > result ;
83
+ switch ( completionReason )
84
+ {
85
+ case REQUESTED_DEMAND_CONSUMED :
86
+ result = subscriber .getSubscriptionStage ().thenApply ( subscription ->
87
+ {
88
+ subscription .cancel ();
89
+ return null ;
90
+ } );
91
+ break ;
92
+ case RECORD_STREAM_EXHAUSTED :
93
+ result = CompletableFuture .completedFuture ( null );
94
+ break ;
95
+ default :
96
+ result = new CompletableFuture <>();
97
+ result .toCompletableFuture ()
98
+ .completeExceptionally ( new RuntimeException ( "Unexpected completion reason" ) );
99
+ }
100
+ return result ;
101
+ } );
102
+ }
103
+
59
104
private Session createResponse ()
60
105
{
61
106
return Session .builder ().data ( Session .SessionBody .builder ().id ( data .getSessionId () ).build () ).build ();
62
107
}
63
108
109
+ private static class RemainingDemandConsumer
110
+ {
111
+ private final RxBlockingSubscriber <Record > subscriber ;
112
+ private final AtomicLong remainingRequestedDemand ;
113
+ @ Getter
114
+ private final CompletableFuture <CompletionReason > completedStage = new CompletableFuture <>();
115
+
116
+ private enum CompletionReason
117
+ {
118
+ REQUESTED_DEMAND_CONSUMED ,
119
+ RECORD_STREAM_EXHAUSTED
120
+ }
121
+
122
+ private RemainingDemandConsumer ( RxBlockingSubscriber <Record > subscriber , AtomicLong remainingRequestedDemand )
123
+ {
124
+ this .subscriber = subscriber ;
125
+ this .remainingRequestedDemand = remainingRequestedDemand ;
126
+
127
+ subscriber .getCompletionStage ().whenComplete ( this ::onComplete );
128
+ if ( this .remainingRequestedDemand .get () > 0 )
129
+ {
130
+ setNextSignalConsumer ();
131
+ }
132
+ }
133
+
134
+ private void setNextSignalConsumer ()
135
+ {
136
+ CompletableFuture <Record > recordConsumer = new CompletableFuture <>();
137
+ subscriber .setNextSignalConsumer ( recordConsumer );
138
+ recordConsumer .whenComplete ( this ::onNext );
139
+ }
140
+
141
+ private void onNext ( Record ignored , Throwable throwable )
142
+ {
143
+ if ( throwable != null )
144
+ {
145
+ completedStage .completeExceptionally ( throwable );
146
+ return ;
147
+ }
148
+
149
+ if ( remainingRequestedDemand .decrementAndGet () > 0 )
150
+ {
151
+ setNextSignalConsumer ();
152
+ }
153
+ else
154
+ {
155
+ completedStage .complete ( CompletionReason .REQUESTED_DEMAND_CONSUMED );
156
+ }
157
+ }
158
+
159
+ private void onComplete ( Void ignored , Throwable throwable )
160
+ {
161
+ if ( throwable != null )
162
+ {
163
+ completedStage .completeExceptionally ( throwable );
164
+ }
165
+ else
166
+ {
167
+ completedStage .complete ( CompletionReason .RECORD_STREAM_EXHAUSTED );
168
+ }
169
+ }
170
+ }
171
+
64
172
@ Setter
65
173
@ Getter
66
174
private static class SessionCloseBody
0 commit comments