Skip to content

Commit 1c4916c

Browse files
authored
resume AQL cursor in transaction (#571)
1 parent 8bb2a91 commit 1c4916c

File tree

8 files changed

+154
-10
lines changed

8 files changed

+154
-10
lines changed

Diff for: core/src/main/java/com/arangodb/ArangoDatabase.java

+27
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,19 @@ public interface ArangoDatabase extends ArangoSerdeAccessor {
314314
*/
315315
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type);
316316

317+
/**
318+
* Return an cursor from the given cursor-ID if still existing
319+
*
320+
* @param cursorId The ID of the cursor
321+
* @param type The type of the result (POJO or {@link com.arangodb.util.RawData})
322+
* @param options options
323+
* @return cursor of the results
324+
* @see <a href=
325+
* "https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#read-the-next-batch-from-a-cursor">API
326+
* Documentation</a>
327+
*/
328+
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, AqlQueryOptions options);
329+
317330
/**
318331
* Return an cursor from the given cursor-ID if still existing
319332
*
@@ -327,6 +340,20 @@ public interface ArangoDatabase extends ArangoSerdeAccessor {
327340
*/
328341
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, String nextBatchId);
329342

343+
/**
344+
* Return an cursor from the given cursor-ID if still existing
345+
*
346+
* @param cursorId The ID of the cursor
347+
* @param type The type of the result (POJO or {@link com.arangodb.util.RawData})
348+
* @param nextBatchId The ID of the next cursor batch (set only if cursor allows retries, see
349+
* {@link AqlQueryOptions#allowRetry(Boolean)}
350+
* @param options options
351+
* @return cursor of the results
352+
* @see <a href= "https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#read-the-next-batch-from-a-cursor">API Documentation</a>
353+
* @since ArangoDB 3.11
354+
*/
355+
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, String nextBatchId, AqlQueryOptions options);
356+
330357
/**
331358
* Explain an AQL query and return information about it
332359
*

Diff for: core/src/main/java/com/arangodb/ArangoDatabaseAsync.java

+4
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,12 @@ public interface ArangoDatabaseAsync extends ArangoSerdeAccessor {
156156

157157
<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type);
158158

159+
<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, AqlQueryOptions options);
160+
159161
<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, String nextBatchId);
160162

163+
<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, String nextBatchId, AqlQueryOptions options);
164+
161165
/**
162166
* Asynchronous version of {@link ArangoDatabase#explainQuery(String, Map, AqlQueryExplainOptions)}
163167
*/

Diff for: core/src/main/java/com/arangodb/entity/CursorEntity.java

+4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ public String getId() {
4444
return id;
4545
}
4646

47+
public void setId(String id) {
48+
this.id = id;
49+
}
50+
4751
/**
4852
* @return the total number of result documents available (only available if the query was executed with the count
4953
* attribute set)

Diff for: core/src/main/java/com/arangodb/internal/ArangoDatabaseAsyncImpl.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,25 @@ public <T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T>
193193

194194
@Override
195195
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type) {
196-
return cursor(cursorId, type, null);
196+
return cursor(cursorId, type, null, new AqlQueryOptions());
197+
}
198+
199+
@Override
200+
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, AqlQueryOptions options) {
201+
return cursor(cursorId, type, null, options);
197202
}
198203

199204
@Override
200205
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
201-
final HostHandle hostHandle = new HostHandle();
206+
return cursor(cursorId, type, nextBatchId, new AqlQueryOptions());
207+
}
208+
209+
@Override
210+
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, String nextBatchId, AqlQueryOptions options) {
211+
options.allowRetry(nextBatchId != null);
212+
HostHandle hostHandle = new HostHandle();
202213
return executorAsync()
203-
.execute(() ->
204-
queryNextRequest(cursorId, new AqlQueryOptions(), nextBatchId),
214+
.execute(() -> queryNextRequest(cursorId, options, nextBatchId),
205215
cursorEntityDeserializer(type),
206216
hostHandle)
207217
.thenApply(res -> new ArangoCursorAsyncImpl<>(this, res, type, hostHandle, nextBatchId != null));

Diff for: core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -186,17 +186,28 @@ public <T> ArangoCursor<T> query(final String query, final Class<T> type) {
186186

187187
@Override
188188
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) {
189-
return cursor(cursorId, type, null);
189+
return cursor(cursorId, type, null, new AqlQueryOptions());
190+
}
191+
192+
@Override
193+
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final AqlQueryOptions options) {
194+
return cursor(cursorId, type, null, options);
190195
}
191196

192197
@Override
193198
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
194-
final HostHandle hostHandle = new HostHandle();
195-
final CursorEntity<T> result = executorSync().execute(
196-
queryNextRequest(cursorId, new AqlQueryOptions(), nextBatchId),
199+
return cursor(cursorId, type, nextBatchId, new AqlQueryOptions());
200+
}
201+
202+
@Override
203+
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId, final AqlQueryOptions options) {
204+
options.allowRetry(nextBatchId != null);
205+
HostHandle hostHandle = new HostHandle();
206+
CursorEntity<T> result = executorSync().execute(
207+
queryNextRequest(cursorId, options, nextBatchId),
197208
cursorEntityDeserializer(type),
198209
hostHandle);
199-
return createCursor(result, type, null, hostHandle);
210+
return createCursor(result, type, options, hostHandle);
200211
}
201212

202213
private <T> ArangoCursor<T> createCursor(

Diff for: core/src/main/java/com/arangodb/internal/cursor/ArangoCursorAsyncImpl.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class ArangoCursorAsyncImpl<T> extends InternalArangoCursor<T> implements
1717

1818
private final ArangoDatabaseAsyncImpl db;
1919
private final HostHandle hostHandle;
20+
private final CursorEntity<T> entity;
2021

2122
public ArangoCursorAsyncImpl(
2223
final ArangoDatabaseAsyncImpl db,
@@ -28,13 +29,18 @@ public ArangoCursorAsyncImpl(
2829
super(db, db.name(), entity, type, allowRetry);
2930
this.db = db;
3031
this.hostHandle = hostHandle;
32+
this.entity = entity;
3133
}
3234

3335
@Override
3436
public CompletableFuture<ArangoCursorAsync<T>> nextBatch() {
3537
if (Boolean.TRUE.equals(hasMore())) {
3638
return executorAsync().execute(this::queryNextRequest, db.cursorEntityDeserializer(getType()), hostHandle)
37-
.thenApply(r -> new ArangoCursorAsyncImpl<>(db, r, getType(), hostHandle, allowRetry()));
39+
.thenApply(r -> {
40+
// needed because the latest batch does not return the cursor id
41+
r.setId(entity.getId());
42+
return new ArangoCursorAsyncImpl<>(db, r, getType(), hostHandle, allowRetry());
43+
});
3844
} else {
3945
CompletableFuture<ArangoCursorAsync<T>> cf = new CompletableFuture<>();
4046
cf.completeExceptionally(new NoSuchElementException());

Diff for: test-functional/src/test/java/com/arangodb/ArangoDatabaseAsyncTest.java

+42
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,26 @@ void queryCursor(ArangoDatabaseAsync db) throws ExecutionException, InterruptedE
778778
assertThat(result).containsExactly(1, 2, 3, 4);
779779
}
780780

781+
@ParameterizedTest
782+
@MethodSource("asyncDbs")
783+
void queryCursorInTx(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException {
784+
StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions()).get();
785+
ArangoCursorAsync<Integer> c1 = db.query("for i in 1..4 return i", Integer.class,
786+
new AqlQueryOptions().batchSize(1).streamTransactionId(tx.getId())).get();
787+
List<Integer> result = new ArrayList<>();
788+
result.addAll(c1.getResult());
789+
ArangoCursorAsync<Integer> c2 = c1.nextBatch().get();
790+
result.addAll(c2.getResult());
791+
ArangoCursorAsync<Integer> c3 = db.cursor(c2.getId(), Integer.class,
792+
new AqlQueryOptions().streamTransactionId(tx.getId())).get();
793+
result.addAll(c3.getResult());
794+
ArangoCursorAsync<Integer> c4 = c3.nextBatch().get();
795+
result.addAll(c4.getResult());
796+
assertThat(c4.hasMore()).isFalse();
797+
assertThat(result).containsExactly(1, 2, 3, 4);
798+
db.abortStreamTransaction(tx.getId()).get();
799+
}
800+
781801
@ParameterizedTest
782802
@MethodSource("asyncDbs")
783803
void queryCursorRetry(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException {
@@ -797,6 +817,28 @@ void queryCursorRetry(ArangoDatabaseAsync db) throws ExecutionException, Interru
797817
assertThat(result).containsExactly(1, 2, 3, 4);
798818
}
799819

820+
@ParameterizedTest
821+
@MethodSource("asyncDbs")
822+
void queryCursorRetryInTx(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException {
823+
assumeTrue(isAtLeastVersion(3, 11));
824+
StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions()).get();
825+
ArangoCursorAsync<Integer> c1 = db.query("for i in 1..4 return i", Integer.class,
826+
new AqlQueryOptions().batchSize(1).allowRetry(true).streamTransactionId(tx.getId())).get();
827+
List<Integer> result = new ArrayList<>();
828+
result.addAll(c1.getResult());
829+
ArangoCursorAsync<Integer> c2 = c1.nextBatch().get();
830+
result.addAll(c2.getResult());
831+
ArangoCursorAsync<Integer> c3 = db.cursor(c2.getId(), Integer.class, c2.getNextBatchId(),
832+
new AqlQueryOptions().streamTransactionId(tx.getId())).get();
833+
result.addAll(c3.getResult());
834+
ArangoCursorAsync<Integer> c4 = c3.nextBatch().get();
835+
result.addAll(c4.getResult());
836+
c4.close();
837+
assertThat(c4.hasMore()).isFalse();
838+
assertThat(result).containsExactly(1, 2, 3, 4);
839+
db.abortStreamTransaction(tx.getId()).get();
840+
}
841+
800842
@ParameterizedTest
801843
@MethodSource("asyncDbs")
802844
void changeQueryTrackingProperties(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException {

Diff for: test-functional/src/test/java/com/arangodb/ArangoDatabaseTest.java

+40
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,25 @@ void queryCursor(ArangoDatabase db) {
848848
assertThat(result).containsExactly(1, 2, 3, 4);
849849
}
850850

851+
@ParameterizedTest
852+
@MethodSource("dbs")
853+
void queryCursorInTx(ArangoDatabase db) {
854+
StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions());
855+
ArangoCursor<Integer> cursor = db.query("for i in 1..4 return i", Integer.class,
856+
new AqlQueryOptions().batchSize(1).streamTransactionId(tx.getId()));
857+
List<Integer> result = new ArrayList<>();
858+
result.add(cursor.next());
859+
result.add(cursor.next());
860+
ArangoCursor<Integer> cursor2 = db.cursor(cursor.getId(), Integer.class,
861+
new AqlQueryOptions().streamTransactionId(tx.getId())
862+
);
863+
result.add(cursor2.next());
864+
result.add(cursor2.next());
865+
assertThat(cursor2.hasNext()).isFalse();
866+
assertThat(result).containsExactly(1, 2, 3, 4);
867+
db.abortStreamTransaction(tx.getId());
868+
}
869+
851870
@ParameterizedTest
852871
@MethodSource("dbs")
853872
void queryCursorRetry(ArangoDatabase db) throws IOException {
@@ -865,6 +884,27 @@ void queryCursorRetry(ArangoDatabase db) throws IOException {
865884
assertThat(result).containsExactly(1, 2, 3, 4);
866885
}
867886

887+
@ParameterizedTest
888+
@MethodSource("dbs")
889+
void queryCursorRetryInTx(ArangoDatabase db) throws IOException {
890+
assumeTrue(isAtLeastVersion(3, 11));
891+
StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions());
892+
ArangoCursor<Integer> cursor = db.query("for i in 1..4 return i", Integer.class,
893+
new AqlQueryOptions().batchSize(1).allowRetry(true).streamTransactionId(tx.getId()));
894+
List<Integer> result = new ArrayList<>();
895+
result.add(cursor.next());
896+
result.add(cursor.next());
897+
ArangoCursor<Integer> cursor2 = db.cursor(cursor.getId(), Integer.class, cursor.getNextBatchId(),
898+
new AqlQueryOptions().streamTransactionId(tx.getId())
899+
);
900+
result.add(cursor2.next());
901+
result.add(cursor2.next());
902+
cursor2.close();
903+
assertThat(cursor2.hasNext()).isFalse();
904+
assertThat(result).containsExactly(1, 2, 3, 4);
905+
db.abortStreamTransaction(tx.getId());
906+
}
907+
868908
@ParameterizedTest
869909
@MethodSource("dbs")
870910
void changeQueryTrackingProperties(ArangoDatabase db) {

0 commit comments

Comments
 (0)