Skip to content

Commit c1bc870

Browse files
committed
Add holder objects for driver, session, transaction and result objects in backend
This update also removes hardcoded fetch size in reactive backend and adds support for sending additional PULL requests when there is more data to consume.
1 parent 6ee3936 commit c1bc870

35 files changed

+947
-426
lines changed

testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,33 @@
1818
*/
1919
package neo4j.org.testkit.backend;
2020

21-
import lombok.Getter;
2221
import org.reactivestreams.Subscriber;
2322
import org.reactivestreams.Subscription;
2423

2524
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionStage;
2626

2727
public class RxBlockingSubscriber<T> implements Subscriber<T>
2828
{
29-
@Getter
3029
private final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
30+
private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();
3131
private CompletableFuture<CompletableFuture<T>> nextSignalConsumerFuture;
3232

3333
public void setNextSignalConsumer( CompletableFuture<T> nextSignalConsumer )
3434
{
3535
nextSignalConsumerFuture.complete( nextSignalConsumer );
3636
}
3737

38+
public CompletionStage<Subscription> getSubscriptionStage()
39+
{
40+
return subscriptionFuture;
41+
}
42+
43+
public CompletionStage<Void> getCompletionStage()
44+
{
45+
return completionFuture;
46+
}
47+
3848
@Override
3949
public void onSubscribe( Subscription s )
4050
{
@@ -51,13 +61,13 @@ public void onNext( T t )
5161
@Override
5262
public void onError( Throwable t )
5363
{
54-
blockUntilNextSignalConsumer().completeExceptionally( t );
64+
completionFuture.completeExceptionally( t );
5565
}
5666

5767
@Override
5868
public void onComplete()
5969
{
60-
blockUntilNextSignalConsumer().complete( null );
70+
completionFuture.complete( null );
6171
}
6272

6373
private CompletableFuture<T> blockUntilNextSignalConsumer()

testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java

+143-53
Original file line numberDiff line numberDiff line change
@@ -18,53 +18,56 @@
1818
*/
1919
package neo4j.org.testkit.backend;
2020

21-
import lombok.AccessLevel;
2221
import lombok.Getter;
22+
import neo4j.org.testkit.backend.holder.AsyncSessionHolder;
23+
import neo4j.org.testkit.backend.holder.AsyncTransactionHolder;
24+
import neo4j.org.testkit.backend.holder.DriverHolder;
25+
import neo4j.org.testkit.backend.holder.ResultCursorHolder;
26+
import neo4j.org.testkit.backend.holder.ResultHolder;
27+
import neo4j.org.testkit.backend.holder.RxResultHolder;
28+
import neo4j.org.testkit.backend.holder.RxSessionHolder;
29+
import neo4j.org.testkit.backend.holder.RxTransactionHolder;
30+
import neo4j.org.testkit.backend.holder.SessionHolder;
31+
import neo4j.org.testkit.backend.holder.TransactionHolder;
2332
import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult;
2433
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
2534
import reactor.core.publisher.Mono;
2635

2736
import java.util.HashMap;
2837
import java.util.Map;
2938
import java.util.concurrent.CompletableFuture;
39+
import java.util.concurrent.CompletionStage;
3040
import java.util.concurrent.atomic.AtomicInteger;
3141
import java.util.function.Consumer;
3242

33-
import org.neo4j.driver.Driver;
34-
import org.neo4j.driver.Record;
35-
import org.neo4j.driver.Result;
36-
import org.neo4j.driver.Transaction;
37-
import org.neo4j.driver.async.AsyncTransaction;
38-
import org.neo4j.driver.async.ResultCursor;
3943
import org.neo4j.driver.exceptions.Neo4jException;
4044
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
41-
import org.neo4j.driver.reactive.RxResult;
42-
import org.neo4j.driver.reactive.RxTransaction;
4345

44-
@Getter
4546
public class TestkitState
4647
{
48+
private static final String DRIVER_NOT_FOUND_MESSAGE = "Could not find driver";
49+
private static final String SESSION_NOT_FOUND_MESSAGE = "Could not find session";
4750
private static final String TRANSACTION_NOT_FOUND_MESSAGE = "Could not find transaction";
51+
private static final String RESULT_NOT_FOUND_MESSAGE = "Could not find result";
4852

49-
private final Map<String,Driver> drivers = new HashMap<>();
53+
private final Map<String,DriverHolder> driverIdToDriverHolder = new HashMap<>();
54+
@Getter
5055
private final Map<String,RoutingTableRegistry> routingTableRegistry = new HashMap<>();
51-
private final Map<String,SessionState> sessionStates = new HashMap<>();
52-
private final Map<String,AsyncSessionState> asyncSessionStates = new HashMap<>();
53-
private final Map<String,RxSessionState> rxSessionStates = new HashMap<>();
54-
private final Map<String,Result> results = new HashMap<>();
55-
private final Map<String,ResultCursor> resultCursors = new HashMap<>();
56-
private final Map<String,RxResult> rxResults = new HashMap<>();
57-
private final Map<String,RxBlockingSubscriber<Record>> rxResultIdToRecordSubscriber = new HashMap<>();
58-
@Getter( AccessLevel.NONE )
59-
private final Map<String,Transaction> transactions = new HashMap<>();
60-
@Getter( AccessLevel.NONE )
61-
private final Map<String,AsyncTransaction> asyncTransactions = new HashMap<>();
62-
@Getter( AccessLevel.NONE )
63-
private final Map<String,RxTransaction> rxTransactions = new HashMap<>();
56+
private final Map<String,SessionHolder> sessionIdToSessionHolder = new HashMap<>();
57+
private final Map<String,AsyncSessionHolder> sessionIdToAsyncSessionHolder = new HashMap<>();
58+
private final Map<String,RxSessionHolder> sessionIdToRxSessionHolder = new HashMap<>();
59+
private final Map<String,ResultHolder> resultIdToResultHolder = new HashMap<>();
60+
private final Map<String,ResultCursorHolder> resultIdToResultCursorHolder = new HashMap<>();
61+
private final Map<String,RxResultHolder> resultIdToRxResultHolder = new HashMap<>();
62+
private final Map<String,TransactionHolder> transactionIdToTransactionHolder = new HashMap<>();
63+
private final Map<String,AsyncTransactionHolder> transactionIdToAsyncTransactionHolder = new HashMap<>();
64+
private final Map<String,RxTransactionHolder> transactionIdToRxTransactionHolder = new HashMap<>();
65+
@Getter
6466
private final Map<String,Neo4jException> errors = new HashMap<>();
65-
@Getter( AccessLevel.NONE )
6667
private final AtomicInteger idGenerator = new AtomicInteger( 0 );
68+
@Getter
6769
private final Consumer<TestkitResponse> responseWriter;
70+
@Getter
6871
private final Map<String,CompletableFuture<TestkitCallbackResult>> callbackIdToFuture = new HashMap<>();
6972

7073
public TestkitState( Consumer<TestkitResponse> responseWriter )
@@ -77,53 +80,140 @@ public String newId()
7780
return String.valueOf( idGenerator.getAndIncrement() );
7881
}
7982

80-
public String addTransaction( Transaction transaction )
83+
public void addDriverHolder( String id, DriverHolder driverHolder )
8184
{
82-
String id = newId();
83-
this.transactions.put( id, transaction );
84-
return id;
85+
driverIdToDriverHolder.put( id, driverHolder );
8586
}
8687

87-
public Transaction getTransaction( String id )
88+
public DriverHolder getDriverHolder( String id )
8889
{
89-
if ( !this.transactions.containsKey( id ) )
90-
{
91-
throw new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE );
92-
}
93-
return this.transactions.get( id );
90+
return get( id, driverIdToDriverHolder, DRIVER_NOT_FOUND_MESSAGE );
9491
}
9592

96-
public String addAsyncTransaction( AsyncTransaction transaction )
93+
public String addSessionHolder( SessionHolder sessionHolder )
9794
{
98-
String id = newId();
99-
this.asyncTransactions.put( id, transaction );
100-
return id;
95+
return add( sessionHolder, sessionIdToSessionHolder );
10196
}
10297

103-
public CompletableFuture<AsyncTransaction> getAsyncTransaction( String id )
98+
public SessionHolder getSessionHolder( String id )
10499
{
105-
if ( !this.asyncTransactions.containsKey( id ) )
106-
{
107-
CompletableFuture<AsyncTransaction> future = new CompletableFuture<>();
108-
future.completeExceptionally( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) );
109-
return future;
110-
}
111-
return CompletableFuture.completedFuture( asyncTransactions.get( id ) );
100+
return get( id, sessionIdToSessionHolder, SESSION_NOT_FOUND_MESSAGE );
101+
}
102+
103+
public String addAsyncSessionHolder( AsyncSessionHolder sessionHolder )
104+
{
105+
return add( sessionHolder, sessionIdToAsyncSessionHolder );
106+
}
107+
108+
public CompletionStage<AsyncSessionHolder> getAsyncSessionHolder( String id )
109+
{
110+
return getAsync( id, sessionIdToAsyncSessionHolder, SESSION_NOT_FOUND_MESSAGE );
111+
}
112+
113+
public String addRxSessionHolder( RxSessionHolder sessionHolder )
114+
{
115+
return add( sessionHolder, sessionIdToRxSessionHolder );
116+
}
117+
118+
public Mono<RxSessionHolder> getRxSessionHolder( String id )
119+
{
120+
return getRx( id, sessionIdToRxSessionHolder, SESSION_NOT_FOUND_MESSAGE );
121+
}
122+
123+
public String addTransactionHolder( TransactionHolder transactionHolder )
124+
{
125+
return add( transactionHolder, transactionIdToTransactionHolder );
126+
}
127+
128+
public TransactionHolder getTransactionHolder( String id )
129+
{
130+
return get( id, transactionIdToTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE );
131+
}
132+
133+
public String addAsyncTransactionHolder( AsyncTransactionHolder transactionHolder )
134+
{
135+
return add( transactionHolder, transactionIdToAsyncTransactionHolder );
136+
}
137+
138+
public CompletionStage<AsyncTransactionHolder> getAsyncTransactionHolder( String id )
139+
{
140+
return getAsync( id, transactionIdToAsyncTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE );
141+
}
142+
143+
public String addRxTransactionHolder( RxTransactionHolder transactionHolder )
144+
{
145+
return add( transactionHolder, transactionIdToRxTransactionHolder );
146+
}
147+
148+
public Mono<RxTransactionHolder> getRxTransactionHolder( String id )
149+
{
150+
return getRx( id, transactionIdToRxTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE );
151+
}
152+
153+
public String addResultHolder( ResultHolder resultHolder )
154+
{
155+
return add( resultHolder, resultIdToResultHolder );
156+
}
157+
158+
public ResultHolder getResultHolder( String id )
159+
{
160+
return get( id, resultIdToResultHolder, RESULT_NOT_FOUND_MESSAGE );
161+
}
162+
163+
public String addAsyncResultHolder( ResultCursorHolder resultHolder )
164+
{
165+
return add( resultHolder, resultIdToResultCursorHolder );
166+
}
167+
168+
public CompletionStage<ResultCursorHolder> getAsyncResultHolder( String id )
169+
{
170+
return getAsync( id, resultIdToResultCursorHolder, RESULT_NOT_FOUND_MESSAGE );
112171
}
113172

114-
public String addRxTransaction( RxTransaction transaction )
173+
public String addRxResultHolder( RxResultHolder resultHolder )
174+
{
175+
return add( resultHolder, resultIdToRxResultHolder );
176+
}
177+
178+
public Mono<RxResultHolder> getRxResultHolder( String id )
179+
{
180+
return getRx( id, resultIdToRxResultHolder, RESULT_NOT_FOUND_MESSAGE );
181+
}
182+
183+
private <T> String add( T value, Map<String,T> idToT )
115184
{
116185
String id = newId();
117-
this.rxTransactions.put( id, transaction );
186+
idToT.put( id, value );
118187
return id;
119188
}
120189

121-
public Mono<RxTransaction> getRxTransaction( String id )
190+
private <T> T get( String id, Map<String,T> idToT, String notFoundMessage )
191+
{
192+
T value = idToT.get( id );
193+
if ( value == null )
194+
{
195+
throw new RuntimeException( notFoundMessage );
196+
}
197+
return value;
198+
}
199+
200+
private <T> CompletableFuture<T> getAsync( String id, Map<String,T> idToT, String notFoundMessage )
122201
{
123-
if ( !this.rxTransactions.containsKey( id ) )
202+
CompletableFuture<T> result = new CompletableFuture<>();
203+
T value = idToT.get( id );
204+
if ( value == null )
205+
{
206+
result.completeExceptionally( new RuntimeException( notFoundMessage ) );
207+
}
208+
else
124209
{
125-
return Mono.error( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) );
210+
result.complete( value );
126211
}
127-
return Mono.just( rxTransactions.get( id ) );
212+
return result;
213+
}
214+
215+
private <T> Mono<T> getRx( String id, Map<String,T> idToT, String notFoundMessage )
216+
{
217+
return Mono.fromCompletionStage( getAsync( id, idToT, notFoundMessage ) );
128218
}
129219
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package neo4j.org.testkit.backend.holder;
20+
21+
import lombok.Getter;
22+
23+
import java.util.Optional;
24+
25+
public abstract class AbstractResultHolder<T1, T2 extends AbstractTransactionHolder<?,?>, T3>
26+
{
27+
private final T1 sessionHolder;
28+
private final T2 transactionHolder;
29+
@Getter
30+
private final T3 result;
31+
32+
public AbstractResultHolder( T1 sessionHolder, T3 result )
33+
{
34+
this.sessionHolder = sessionHolder;
35+
this.transactionHolder = null;
36+
this.result = result;
37+
}
38+
39+
public AbstractResultHolder( T2 transactionHolder, T3 result )
40+
{
41+
this.sessionHolder = null;
42+
this.transactionHolder = transactionHolder;
43+
this.result = result;
44+
}
45+
46+
public T1 getSessionHolder()
47+
{
48+
return transactionHolder != null ? getSessionHolder( transactionHolder ) : sessionHolder;
49+
}
50+
51+
public Optional<T2> getTransactionHolder()
52+
{
53+
return Optional.ofNullable( transactionHolder );
54+
}
55+
56+
protected abstract T1 getSessionHolder( T2 transactionHolder );
57+
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java renamed to testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractSessionHolder.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,23 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package neo4j.org.testkit.backend;
19+
package neo4j.org.testkit.backend.holder;
2020

2121
import lombok.Getter;
22+
import lombok.RequiredArgsConstructor;
2223
import lombok.Setter;
2324

2425
import java.util.concurrent.CompletableFuture;
2526

26-
import org.neo4j.driver.async.AsyncSession;
27+
import org.neo4j.driver.SessionConfig;
2728

29+
@RequiredArgsConstructor
2830
@Getter
29-
@Setter
30-
public class AsyncSessionState
31+
public abstract class AbstractSessionHolder<T>
3132
{
32-
public AsyncSession session;
33+
public final DriverHolder driverHolder;
34+
public final T session;
35+
public final SessionConfig config;
36+
@Setter
3337
public CompletableFuture<Void> txWorkFuture;
34-
35-
public AsyncSessionState( AsyncSession session )
36-
{
37-
this.session = session;
38-
}
3938
}

0 commit comments

Comments
 (0)