Skip to content

[DE-497] Asynchronous AQL cursors #520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions core/src/main/java/com/arangodb/ArangoCursorAsync.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* DISCLAIMER
*
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright holder is ArangoDB GmbH, Cologne, Germany
*/

package com.arangodb;

import java.util.concurrent.CompletableFuture;

public interface ArangoCursorAsync<T> extends BaseArangoCursor<T> {

CompletableFuture<ArangoCursorAsync<T>> nextBatch();

CompletableFuture<Void> close();
}
22 changes: 11 additions & 11 deletions core/src/main/java/com/arangodb/ArangoDatabaseAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,17 @@ public interface ArangoDatabaseAsync extends ArangoSerdeAccessor {
*/
CompletableFuture<Permissions> getPermissions(String user);

// <T> CompletableFuture<ArangoCursor<T>> query(String query, Class<T> type, Map<String, Object> bindVars, AqlQueryOptions options);
//
// <T> CompletableFuture<ArangoCursor<T>> query(String query, Class<T> type, AqlQueryOptions options);
//
// <T> CompletableFuture<ArangoCursor<T>> query(String query, Class<T> type, Map<String, Object> bindVars);
//
// <T> CompletableFuture<ArangoCursor<T>> query(String query, Class<T> type);
//
// <T> CompletableFuture<ArangoCursor<T>> cursor(String cursorId, Class<T> type);
//
// <T> CompletableFuture<ArangoCursor<T>> cursor(String cursorId, Class<T> type, String nextBatchId);
<T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type, Map<String, Object> bindVars, AqlQueryOptions options);

<T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type, AqlQueryOptions options);

<T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type, Map<String, Object> bindVars);

<T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type);

<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type);

<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, String nextBatchId);

/**
* Asynchronous version of {@link ArangoDatabase#explainQuery(String, Map, AqlQueryExplainOptions)}
Expand Down
23 changes: 23 additions & 0 deletions core/src/main/java/com/arangodb/BaseArangoCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.arangodb;

import com.arangodb.entity.CursorEntity;

import java.util.List;

public interface BaseArangoCursor<T> {
String getId();

Long getCount();

Boolean isCached();

Boolean hasMore();

List<T> getResult();

Boolean isPotentialDirtyRead();

String getNextBatchId();

CursorEntity.Extra getExtra();
}
128 changes: 128 additions & 0 deletions core/src/main/java/com/arangodb/entity/CursorEntity.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* DISCLAIMER
*
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright holder is ArangoDB GmbH, Cologne, Germany
*/

package com.arangodb.entity;

import com.arangodb.internal.serde.UserDataInside;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* @author Mark Vollmary
* @see <a href="https://www.arangodb.com/docs/stable/http/aql-query-cursor-accessing-cursors.html#create-cursor">API
* Documentation</a>
*/
public final class CursorEntity<T> {
private String id;
private Long count;
private Boolean cached;
private Boolean hasMore;

// TODO: test whether user-serde is used for result elements
@UserDataInside
private List<T> result;
private Boolean potentialDirtyRead;
private String nextBatchId;
private final Extra extra = new Extra();

public String getId() {
return id;
}

/**
* @return the total number of result documents available (only available if the query was executed with the count
* attribute set)
*/
public Long getCount() {
return count;
}

/**
* @return an optional object with extra information about the query result contained in its stats sub-attribute.
* For data-modification queries, the extra.stats sub-attribute will contain the number of modified
* documents and the number of documents that could not be modified due to an error (if ignoreErrors query
* option is specified)
*/
public Extra getExtra() {
return extra;
}

/**
* @return a boolean flag indicating whether the query result was served from the query cache or not. If the query
* result is served from the query cache, the extra return attribute will not contain any stats
* sub-attribute and no profile sub-attribute.
*/
public Boolean getCached() {
return cached;
}

/**
* @return A boolean indicator whether there are more results available for the cursor on the server
*/
public Boolean getHasMore() {
return hasMore;
}

/**
* @return a list of result documents (might be empty if query has no results)
*/
public List<T> getResult() {
return result;
}

/**
* @return true if the result is a potential dirty read
* @since ArangoDB 3.10
*/
public Boolean isPotentialDirtyRead() {
return potentialDirtyRead;
}

public void setPotentialDirtyRead(final Boolean potentialDirtyRead) {
this.potentialDirtyRead = potentialDirtyRead;
}

/**
* @return The ID of the batch after the current one. The first batch has an ID of 1 and the value is incremented by
* 1 with every batch. Only set if the allowRetry query option is enabled.
* @since ArangoDB 3.11
*/
public String getNextBatchId() {
return nextBatchId;
}

public static final class Extra {
private final Collection<CursorWarning> warnings = Collections.emptyList();
private CursorStats stats;

public CursorStats getStats() {
return stats;
}

public Collection<CursorWarning> getWarnings() {
return warnings;
}

}

}

107 changes: 41 additions & 66 deletions core/src/main/java/com/arangodb/internal/ArangoDatabaseAsyncImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.arangodb.*;
import com.arangodb.entity.*;
import com.arangodb.entity.arangosearch.analyzer.SearchAnalyzer;
import com.arangodb.internal.cursor.ArangoCursorAsyncImpl;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.util.DocumentUtil;
import com.arangodb.model.*;
import com.arangodb.model.arangosearch.AnalyzerDeleteOptions;
Expand Down Expand Up @@ -168,72 +170,45 @@ public CompletableFuture<Permissions> getPermissions(final String user) {
return executorAsync().execute(getPermissionsRequest(user), getPermissionsResponseDeserialzer());
}

// @Override
// public <T> ArangoCursor<T> query(
// final String query, final Class<T> type, final Map<String, Object> bindVars, final AqlQueryOptions options) {
// final InternalRequest request = queryRequest(query, bindVars, options);
// final HostHandle hostHandle = new HostHandle();
// final InternalCursorEntity result = executorAsync().execute(request, internalCursorEntityDeserializer(), hostHandle);
// return createCursor(result, type, options, hostHandle);
// }
//
// @Override
// public <T> ArangoCursor<T> query(final String query, final Class<T> type, final Map<String, Object> bindVars) {
// return query(query, type, bindVars, new AqlQueryOptions());
// }
//
// @Override
// public <T> ArangoCursor<T> query(final String query, final Class<T> type, final AqlQueryOptions options) {
// return query(query, type, null, options);
// }
//
// @Override
// public <T> ArangoCursor<T> query(final String query, final Class<T> type) {
// return query(query, type, null, new AqlQueryOptions());
// }
//
// @Override
// public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) {
// final HostHandle hostHandle = new HostHandle();
// final InternalCursorEntity result = executorAsync().execute(
// queryNextRequest(cursorId, null),
// internalCursorEntityDeserializer(),
// hostHandle);
// return createCursor(result, type, null, hostHandle);
// }
//
// @Override
// public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
// final HostHandle hostHandle = new HostHandle();
// final InternalCursorEntity result = executorAsync().execute(
// queryNextByBatchIdRequest(cursorId, nextBatchId, null),
// internalCursorEntityDeserializer(),
// hostHandle);
// return createCursor(result, type, null, hostHandle);
// }
//
// private <T> ArangoCursor<T> createCursor(
// final InternalCursorEntity result,
// final Class<T> type,
// final AqlQueryOptions options,
// final HostHandle hostHandle) {
//
// final ArangoCursorExecute execute = new ArangoCursorExecute() {
// @Override
// public InternalCursorEntity next(final String id, final String nextBatchId) {
// InternalRequest request = nextBatchId == null ?
// queryNextRequest(id, options) : queryNextByBatchIdRequest(id, nextBatchId, options);
// return executorAsync().execute(request, internalCursorEntityDeserializer(), hostHandle);
// }
//
// @Override
// public void close(final String id) {
// executorAsync().execute(queryCloseRequest(id, options), Void.class, hostHandle);
// }
// };
//
// return new ArangoCursorImpl<>(this, execute, type, result);
// }
@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> query(
final String query, final Class<T> type, final Map<String, Object> bindVars, final AqlQueryOptions options) {
final InternalRequest request = queryRequest(query, bindVars, options);
final HostHandle hostHandle = new HostHandle();
return executorAsync().execute(request, cursorEntityDeserializer(type), hostHandle)
.thenApply(res -> new ArangoCursorAsyncImpl<>(this, res, type, hostHandle, options.getAllowRetry()));
}

@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type, AqlQueryOptions options) {
return query(query, type, null, options);
}

@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type, Map<String, Object> bindVars) {
return query(query, type, bindVars, new AqlQueryOptions());
}

@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T> type) {
return query(query, type, null, new AqlQueryOptions());
}

@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type) {
return cursor(cursorId, type, null);
}

@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
final HostHandle hostHandle = new HostHandle();
return executorAsync()
.execute(
queryNextRequest(cursorId, new AqlQueryOptions(), nextBatchId),
cursorEntityDeserializer(type),
hostHandle)
.thenApply(res -> new ArangoCursorAsyncImpl<>(this, res, type, hostHandle, nextBatchId != null));
}

@Override
public CompletableFuture<AqlExecutionExplainEntity> explainQuery(
Expand Down
13 changes: 3 additions & 10 deletions core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,14 @@ public <T> ArangoCursor<T> query(final String query, final Class<T> type) {

@Override
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) {
final HostHandle hostHandle = new HostHandle();
final InternalCursorEntity result = executorSync().execute(
queryNextRequest(cursorId, null),
internalCursorEntityDeserializer(),
hostHandle);
return createCursor(result, type, null, hostHandle);
return cursor(cursorId, type, null);
}

@Override
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
final HostHandle hostHandle = new HostHandle();
final InternalCursorEntity result = executorSync().execute(
queryNextByBatchIdRequest(cursorId, nextBatchId, null),
queryNextRequest(cursorId, new AqlQueryOptions(), nextBatchId),
internalCursorEntityDeserializer(),
hostHandle);
return createCursor(result, type, null, hostHandle);
Expand All @@ -216,9 +211,7 @@ private <T> ArangoCursor<T> createCursor(
final ArangoCursorExecute execute = new ArangoCursorExecute() {
@Override
public InternalCursorEntity next(final String id, final String nextBatchId) {
InternalRequest request = nextBatchId == null ?
queryNextRequest(id, options) : queryNextByBatchIdRequest(id, nextBatchId, options);
return executorSync().execute(request, internalCursorEntityDeserializer(), hostHandle);
return executorSync().execute(queryNextRequest(id, options, nextBatchId), internalCursorEntityDeserializer(), hostHandle);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ private ArangoExecuteable(final ArangoExecutorSync executorSync,
protected static String createPath(final String... params) {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < params.length; i++) {
if (params[i] == null) continue;
if (i > 0) {
sb.append(SLASH);
}
Expand Down
Loading