Skip to content

[DE-511] Retriable cursor #505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions core/src/main/java/com/arangodb/ArangoCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@

import com.arangodb.entity.CursorStats;
import com.arangodb.entity.CursorWarning;
import com.arangodb.model.AqlQueryOptions;

import java.io.Closeable;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;

/**
* @author Mark Vollmary
Expand Down Expand Up @@ -76,4 +78,27 @@ public interface ArangoCursor<T> extends ArangoIterable<T>, ArangoIterator<T>, C
*/
boolean isPotentialDirtyRead();

/**
* @return The ID of the batch after the current one. The first batch has an ID of 1 and the value is incremented by
* 1 with every batch. Only set if the allowRetry query option is enabled.
* @since ArangoDB 3.11
*/
String getNextBatchId();

/**
* Returns the next element in the iteration.
* <p/>
* If the cursor allows retries (see {@link AqlQueryOptions#allowRetry(Boolean)}), then it is safe to retry invoking
* this method in case of I/O exceptions (which are actually thrown as {@link com.arangodb.ArangoDBException} with
* cause {@link java.io.IOException}).
* <p/>
* If the cursor does not allow retries (default), then it is not safe to retry invoking this method in case of I/O
* exceptions, since the request to fetch the next batch is not idempotent (i.e. the cursor may advance multiple
* times on the server).
*
* @return the next element in the iteration
* @throws NoSuchElementException if the iteration has no more elements
*/
@Override
T next();
}
14 changes: 14 additions & 0 deletions core/src/main/java/com/arangodb/ArangoDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,20 @@ public interface ArangoDatabase extends ArangoSerdeAccessor {
*/
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type);

/**
* Return an cursor from the given cursor-ID if still existing
*
* @param cursorId The ID of the cursor
* @param type The type of the result (POJO or {@link com.arangodb.util.RawData})
* @param nextBatchId The ID of the next cursor batch (set only if cursor allows retries, see
* {@link AqlQueryOptions#allowRetry(Boolean)}
* @return cursor of the results
* @see <a href= "https://www.arangodb.com/docs/stable/http/aql-query-cursor-accessing-cursors
* .html#read-next-batch-from-cursor">API Documentation</a>
* @since ArangoDB 3.11
*/
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, String nextBatchId);

/**
* Explain an AQL query and return information about it
*
Expand Down
14 changes: 0 additions & 14 deletions core/src/main/java/com/arangodb/entity/MetaAware.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@

import com.arangodb.internal.cursor.entity.InternalCursorEntity;

import java.util.Map;

/**
* @author Mark Vollmary
*/
public interface ArangoCursorExecute {

InternalCursorEntity next(String id, Map<String, String> meta);
InternalCursorEntity next(String id, String nextBatchId);

void close(String id, Map<String, String> meta);
void close(String id);

}
28 changes: 21 additions & 7 deletions core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public <T> ArangoCursor<T> query(
final String query, final Class<T> type, final Map<String, Object> bindVars, final AqlQueryOptions options) {
final InternalRequest request = queryRequest(query, bindVars, options);
final HostHandle hostHandle = new HostHandle();
final InternalCursorEntity result = executor.execute(request, InternalCursorEntity.class, hostHandle);
final InternalCursorEntity result = executor.execute(request, internalCursorEntityDeserializer(), hostHandle);
return createCursor(result, type, options, hostHandle);
}

Expand All @@ -183,8 +183,20 @@ public <T> ArangoCursor<T> query(final String query, final Class<T> type) {
@Override
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) {
final HostHandle hostHandle = new HostHandle();
final InternalCursorEntity result = executor
.execute(queryNextRequest(cursorId, null, null), InternalCursorEntity.class, hostHandle);
final InternalCursorEntity result = executor.execute(
queryNextRequest(cursorId, null),
internalCursorEntityDeserializer(),
hostHandle);
return createCursor(result, type, null, hostHandle);
}

@Override
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
final HostHandle hostHandle = new HostHandle();
final InternalCursorEntity result = executor.execute(
queryNextByBatchIdRequest(cursorId, nextBatchId, null),
internalCursorEntityDeserializer(),
hostHandle);
return createCursor(result, type, null, hostHandle);
}

Expand All @@ -196,13 +208,15 @@ private <T> ArangoCursor<T> createCursor(

final ArangoCursorExecute execute = new ArangoCursorExecute() {
@Override
public InternalCursorEntity next(final String id, Map<String, String> meta) {
return executor.execute(queryNextRequest(id, options, meta), InternalCursorEntity.class, hostHandle);
public InternalCursorEntity next(final String id, final String nextBatchId) {
InternalRequest request = nextBatchId == null ?
queryNextRequest(id, options) : queryNextByBatchIdRequest(id, nextBatchId, options);
return executor.execute(request, internalCursorEntityDeserializer(), hostHandle);
}

@Override
public void close(final String id, Map<String, String> meta) {
executor.execute(queryCloseRequest(id, options, meta), Void.class, hostHandle);
public void close(final String id) {
executor.execute(queryCloseRequest(id, options), Void.class, hostHandle);
}
};

Expand Down
14 changes: 1 addition & 13 deletions core/src/main/java/com/arangodb/internal/ArangoExecutorSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@
package com.arangodb.internal;

import com.arangodb.ArangoDBException;
import com.arangodb.entity.MetaAware;
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.net.CommunicationProtocol;
import com.arangodb.internal.net.HostHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Type;
Expand All @@ -36,8 +33,6 @@
*/
public class ArangoExecutorSync extends ArangoExecutor {

private static final Logger LOG = LoggerFactory.getLogger(ArangoExecutorSync.class);

private final CommunicationProtocol protocol;

public ArangoExecutorSync(final CommunicationProtocol protocol, final ArangoConfig config) {
Expand All @@ -64,14 +59,7 @@ public <T> T execute(

final InternalResponse response = protocol.execute(interceptRequest(request), hostHandle);
interceptResponse(response);
T deserialize = responseDeserializer.deserialize(response);

if (deserialize instanceof MetaAware) {
LOG.debug("Response is MetaAware {}", deserialize.getClass().getName());
((MetaAware) deserialize).setMeta(response.getMeta());
}

return deserialize;
return responseDeserializer.deserialize(response);
}

public void disconnect() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.arangodb.entity.*;
import com.arangodb.entity.arangosearch.analyzer.SearchAnalyzer;
import com.arangodb.internal.ArangoExecutor.ResponseDeserializer;
import com.arangodb.internal.cursor.entity.InternalCursorEntity;
import com.arangodb.internal.util.RequestUtils;
import com.arangodb.model.*;
import com.arangodb.model.arangosearch.*;
Expand Down Expand Up @@ -156,27 +157,30 @@ protected InternalRequest queryRequest(final String query, final Map<String, Obj
return request;
}

protected InternalRequest queryNextRequest(final String id, final AqlQueryOptions options, Map<String, String> meta) {

protected InternalRequest queryNextRequest(final String id, final AqlQueryOptions options) {
final InternalRequest request = request(name, RequestType.POST, PATH_API_CURSOR, id);
request.putHeaderParams(meta);
return completeQueryNextRequest(request, options);
}

final AqlQueryOptions opt = options != null ? options : new AqlQueryOptions();
protected InternalRequest queryNextByBatchIdRequest(final String id,
final String nextBatchId,
final AqlQueryOptions options) {
final InternalRequest request = request(name, RequestType.POST, PATH_API_CURSOR, id, nextBatchId);
return completeQueryNextRequest(request, options);
}

private InternalRequest completeQueryNextRequest(final InternalRequest request, final AqlQueryOptions options) {
final AqlQueryOptions opt = options != null ? options : new AqlQueryOptions();
if (Boolean.TRUE.equals(opt.getAllowDirtyRead())) {
RequestUtils.allowDirtyRead(request);
}
request.putHeaderParam(TRANSACTION_ID, opt.getStreamTransactionId());
return request;
}

protected InternalRequest queryCloseRequest(final String id, final AqlQueryOptions options, Map<String, String> meta) {

protected InternalRequest queryCloseRequest(final String id, final AqlQueryOptions options) {
final InternalRequest request = request(name, RequestType.DELETE, PATH_API_CURSOR, id);
request.putHeaderParams(meta);

final AqlQueryOptions opt = options != null ? options : new AqlQueryOptions();

if (Boolean.TRUE.equals(opt.getAllowDirtyRead())) {
RequestUtils.allowDirtyRead(request);
}
Expand Down Expand Up @@ -243,6 +247,15 @@ protected InternalRequest deleteAqlFunctionRequest(final String name, final AqlF
return request;
}

protected ResponseDeserializer<InternalCursorEntity> internalCursorEntityDeserializer() {
return response -> {
InternalCursorEntity e = getSerde().deserialize(response.getBody(), InternalCursorEntity.class);
boolean potentialDirtyRead = Boolean.parseBoolean(response.getMeta("X-Arango-Potential-Dirty-Read"));
e.setPontentialDirtyRead(potentialDirtyRead);
return e;
};
}

protected ResponseDeserializer<Integer> deleteAqlFunctionResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), "/deletedCount", Integer.class);
}
Expand Down

This file was deleted.

Loading