Skip to content

Commit aae855c

Browse files
committed
[DE-497] Asynchronous AQL cursors (#520)
* ArangoCursorAsync (wip) * ArangoDatabaseAsyncTest * async tests with query * ArangoCursorAsync#close() * ArangoCursorAsync.nextBatch() * test fixes * ArangoDatabaseAsync.cursor() * async tests parity * tests fixes * dbg tests failures * test fixes * test fixes
1 parent 76fcaf8 commit aae855c

21 files changed

+1679
-799
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* DISCLAIMER
3+
*
4+
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
* Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
*/
20+
21+
package com.arangodb;
22+
23+
import java.util.concurrent.CompletableFuture;
24+
25+
public interface ArangoCursorAsync<T> extends BaseArangoCursor<T> {
26+
27+
CompletableFuture<ArangoCursorAsync<T>> nextBatch();
28+
29+
CompletableFuture<Void> close();
30+
}

core/src/main/java/com/arangodb/ArangoDatabaseAsync.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -146,17 +146,17 @@ public interface ArangoDatabaseAsync extends ArangoSerdeAccessor {
146146
*/
147147
CompletableFuture<Permissions> getPermissions(String user);
148148

149-
// <T> CompletableFuture<ArangoCursor<T>> query(String query, Class<T> type, Map<String, Object> bindVars, AqlQueryOptions options);
150-
//
151-
// <T> CompletableFuture<ArangoCursor<T>> query(String query, Class<T> type, AqlQueryOptions options);
152-
//
153-
// <T> CompletableFuture<ArangoCursor<T>> query(String query, Class<T> type, Map<String, Object> bindVars);
154-
//
155-
// <T> CompletableFuture<ArangoCursor<T>> query(String query, Class<T> type);
156-
//
157-
// <T> CompletableFuture<ArangoCursor<T>> cursor(String cursorId, Class<T> type);
158-
//
159-
// <T> CompletableFuture<ArangoCursor<T>> cursor(String cursorId, Class<T> type, String nextBatchId);
149+
<T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type, Map<String, Object> bindVars, AqlQueryOptions options);
150+
151+
<T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type, AqlQueryOptions options);
152+
153+
<T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type, Map<String, Object> bindVars);
154+
155+
<T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type);
156+
157+
<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type);
158+
159+
<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, String nextBatchId);
160160

161161
/**
162162
* Asynchronous version of {@link ArangoDatabase#explainQuery(String, Map, AqlQueryExplainOptions)}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.arangodb;
2+
3+
import com.arangodb.entity.CursorEntity;
4+
5+
import java.util.List;
6+
7+
public interface BaseArangoCursor<T> {
8+
String getId();
9+
10+
Long getCount();
11+
12+
Boolean isCached();
13+
14+
Boolean hasMore();
15+
16+
List<T> getResult();
17+
18+
Boolean isPotentialDirtyRead();
19+
20+
String getNextBatchId();
21+
22+
CursorEntity.Extra getExtra();
23+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* DISCLAIMER
3+
*
4+
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
* Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
*/
20+
21+
package com.arangodb.entity;
22+
23+
import com.arangodb.internal.serde.UserDataInside;
24+
25+
import java.util.Collection;
26+
import java.util.Collections;
27+
import java.util.List;
28+
29+
/**
30+
* @author Mark Vollmary
31+
* @see <a href="https://www.arangodb.com/docs/stable/http/aql-query-cursor-accessing-cursors.html#create-cursor">API
32+
* Documentation</a>
33+
*/
34+
public final class CursorEntity<T> {
35+
private String id;
36+
private Long count;
37+
private Boolean cached;
38+
private Boolean hasMore;
39+
40+
// TODO: test whether user-serde is used for result elements
41+
@UserDataInside
42+
private List<T> result;
43+
private Boolean potentialDirtyRead;
44+
private String nextBatchId;
45+
private final Extra extra = new Extra();
46+
47+
public String getId() {
48+
return id;
49+
}
50+
51+
/**
52+
* @return the total number of result documents available (only available if the query was executed with the count
53+
* attribute set)
54+
*/
55+
public Long getCount() {
56+
return count;
57+
}
58+
59+
/**
60+
* @return an optional object with extra information about the query result contained in its stats sub-attribute.
61+
* For data-modification queries, the extra.stats sub-attribute will contain the number of modified
62+
* documents and the number of documents that could not be modified due to an error (if ignoreErrors query
63+
* option is specified)
64+
*/
65+
public Extra getExtra() {
66+
return extra;
67+
}
68+
69+
/**
70+
* @return a boolean flag indicating whether the query result was served from the query cache or not. If the query
71+
* result is served from the query cache, the extra return attribute will not contain any stats
72+
* sub-attribute and no profile sub-attribute.
73+
*/
74+
public Boolean getCached() {
75+
return cached;
76+
}
77+
78+
/**
79+
* @return A boolean indicator whether there are more results available for the cursor on the server
80+
*/
81+
public Boolean getHasMore() {
82+
return hasMore;
83+
}
84+
85+
/**
86+
* @return a list of result documents (might be empty if query has no results)
87+
*/
88+
public List<T> getResult() {
89+
return result;
90+
}
91+
92+
/**
93+
* @return true if the result is a potential dirty read
94+
* @since ArangoDB 3.10
95+
*/
96+
public Boolean isPotentialDirtyRead() {
97+
return potentialDirtyRead;
98+
}
99+
100+
public void setPotentialDirtyRead(final Boolean potentialDirtyRead) {
101+
this.potentialDirtyRead = potentialDirtyRead;
102+
}
103+
104+
/**
105+
* @return The ID of the batch after the current one. The first batch has an ID of 1 and the value is incremented by
106+
* 1 with every batch. Only set if the allowRetry query option is enabled.
107+
* @since ArangoDB 3.11
108+
*/
109+
public String getNextBatchId() {
110+
return nextBatchId;
111+
}
112+
113+
public static final class Extra {
114+
private final Collection<CursorWarning> warnings = Collections.emptyList();
115+
private CursorStats stats;
116+
117+
public CursorStats getStats() {
118+
return stats;
119+
}
120+
121+
public Collection<CursorWarning> getWarnings() {
122+
return warnings;
123+
}
124+
125+
}
126+
127+
}
128+

core/src/main/java/com/arangodb/internal/ArangoDatabaseAsyncImpl.java

Lines changed: 41 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.arangodb.*;
2424
import com.arangodb.entity.*;
2525
import com.arangodb.entity.arangosearch.analyzer.SearchAnalyzer;
26+
import com.arangodb.internal.cursor.ArangoCursorAsyncImpl;
27+
import com.arangodb.internal.net.HostHandle;
2628
import com.arangodb.internal.util.DocumentUtil;
2729
import com.arangodb.model.*;
2830
import com.arangodb.model.arangosearch.AnalyzerDeleteOptions;
@@ -168,72 +170,45 @@ public CompletableFuture<Permissions> getPermissions(final String user) {
168170
return executorAsync().execute(getPermissionsRequest(user), getPermissionsResponseDeserialzer());
169171
}
170172

171-
// @Override
172-
// public <T> ArangoCursor<T> query(
173-
// final String query, final Class<T> type, final Map<String, Object> bindVars, final AqlQueryOptions options) {
174-
// final InternalRequest request = queryRequest(query, bindVars, options);
175-
// final HostHandle hostHandle = new HostHandle();
176-
// final InternalCursorEntity result = executorAsync().execute(request, internalCursorEntityDeserializer(), hostHandle);
177-
// return createCursor(result, type, options, hostHandle);
178-
// }
179-
//
180-
// @Override
181-
// public <T> ArangoCursor<T> query(final String query, final Class<T> type, final Map<String, Object> bindVars) {
182-
// return query(query, type, bindVars, new AqlQueryOptions());
183-
// }
184-
//
185-
// @Override
186-
// public <T> ArangoCursor<T> query(final String query, final Class<T> type, final AqlQueryOptions options) {
187-
// return query(query, type, null, options);
188-
// }
189-
//
190-
// @Override
191-
// public <T> ArangoCursor<T> query(final String query, final Class<T> type) {
192-
// return query(query, type, null, new AqlQueryOptions());
193-
// }
194-
//
195-
// @Override
196-
// public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) {
197-
// final HostHandle hostHandle = new HostHandle();
198-
// final InternalCursorEntity result = executorAsync().execute(
199-
// queryNextRequest(cursorId, null),
200-
// internalCursorEntityDeserializer(),
201-
// hostHandle);
202-
// return createCursor(result, type, null, hostHandle);
203-
// }
204-
//
205-
// @Override
206-
// public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
207-
// final HostHandle hostHandle = new HostHandle();
208-
// final InternalCursorEntity result = executorAsync().execute(
209-
// queryNextByBatchIdRequest(cursorId, nextBatchId, null),
210-
// internalCursorEntityDeserializer(),
211-
// hostHandle);
212-
// return createCursor(result, type, null, hostHandle);
213-
// }
214-
//
215-
// private <T> ArangoCursor<T> createCursor(
216-
// final InternalCursorEntity result,
217-
// final Class<T> type,
218-
// final AqlQueryOptions options,
219-
// final HostHandle hostHandle) {
220-
//
221-
// final ArangoCursorExecute execute = new ArangoCursorExecute() {
222-
// @Override
223-
// public InternalCursorEntity next(final String id, final String nextBatchId) {
224-
// InternalRequest request = nextBatchId == null ?
225-
// queryNextRequest(id, options) : queryNextByBatchIdRequest(id, nextBatchId, options);
226-
// return executorAsync().execute(request, internalCursorEntityDeserializer(), hostHandle);
227-
// }
228-
//
229-
// @Override
230-
// public void close(final String id) {
231-
// executorAsync().execute(queryCloseRequest(id, options), Void.class, hostHandle);
232-
// }
233-
// };
234-
//
235-
// return new ArangoCursorImpl<>(this, execute, type, result);
236-
// }
173+
@Override
174+
public <T> CompletableFuture<ArangoCursorAsync<T>> query(
175+
final String query, final Class<T> type, final Map<String, Object> bindVars, final AqlQueryOptions options) {
176+
final InternalRequest request = queryRequest(query, bindVars, options);
177+
final HostHandle hostHandle = new HostHandle();
178+
return executorAsync().execute(request, cursorEntityDeserializer(type), hostHandle)
179+
.thenApply(res -> new ArangoCursorAsyncImpl<>(this, res, type, hostHandle, options.getAllowRetry()));
180+
}
181+
182+
@Override
183+
public <T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type, AqlQueryOptions options) {
184+
return query(query, type, null, options);
185+
}
186+
187+
@Override
188+
public <T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type, Map<String, Object> bindVars) {
189+
return query(query, type, bindVars, new AqlQueryOptions());
190+
}
191+
192+
@Override
193+
public <T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type) {
194+
return query(query, type, null, new AqlQueryOptions());
195+
}
196+
197+
@Override
198+
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type) {
199+
return cursor(cursorId, type, null);
200+
}
201+
202+
@Override
203+
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
204+
final HostHandle hostHandle = new HostHandle();
205+
return executorAsync()
206+
.execute(
207+
queryNextRequest(cursorId, new AqlQueryOptions(), nextBatchId),
208+
cursorEntityDeserializer(type),
209+
hostHandle)
210+
.thenApply(res -> new ArangoCursorAsyncImpl<>(this, res, type, hostHandle, nextBatchId != null));
211+
}
237212

238213
@Override
239214
public CompletableFuture<AqlExecutionExplainEntity> explainQuery(

core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -189,19 +189,14 @@ public <T> ArangoCursor<T> query(final String query, final Class<T> type) {
189189

190190
@Override
191191
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) {
192-
final HostHandle hostHandle = new HostHandle();
193-
final InternalCursorEntity result = executorSync().execute(
194-
queryNextRequest(cursorId, null),
195-
internalCursorEntityDeserializer(),
196-
hostHandle);
197-
return createCursor(result, type, null, hostHandle);
192+
return cursor(cursorId, type, null);
198193
}
199194

200195
@Override
201196
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
202197
final HostHandle hostHandle = new HostHandle();
203198
final InternalCursorEntity result = executorSync().execute(
204-
queryNextByBatchIdRequest(cursorId, nextBatchId, null),
199+
queryNextRequest(cursorId, new AqlQueryOptions(), nextBatchId),
205200
internalCursorEntityDeserializer(),
206201
hostHandle);
207202
return createCursor(result, type, null, hostHandle);
@@ -216,9 +211,7 @@ private <T> ArangoCursor<T> createCursor(
216211
final ArangoCursorExecute execute = new ArangoCursorExecute() {
217212
@Override
218213
public InternalCursorEntity next(final String id, final String nextBatchId) {
219-
InternalRequest request = nextBatchId == null ?
220-
queryNextRequest(id, options) : queryNextByBatchIdRequest(id, nextBatchId, options);
221-
return executorSync().execute(request, internalCursorEntityDeserializer(), hostHandle);
214+
return executorSync().execute(queryNextRequest(id, options, nextBatchId), internalCursorEntityDeserializer(), hostHandle);
222215
}
223216

224217
@Override

core/src/main/java/com/arangodb/internal/ArangoExecuteable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ private ArangoExecuteable(final ArangoExecutorSync executorSync,
5959
protected static String createPath(final String... params) {
6060
final StringBuilder sb = new StringBuilder();
6161
for (int i = 0; i < params.length; i++) {
62+
if (params[i] == null) continue;
6263
if (i > 0) {
6364
sb.append(SLASH);
6465
}

0 commit comments

Comments
 (0)