Skip to content

Commit 8f68f63

Browse files
authored
[DE-511] Retriable cursor (#505)
* removed MetaAware * retriable cursor * tests * resilience test * test fix * resume cursor * doc * doc
1 parent 1eb2902 commit 8f68f63

File tree

16 files changed

+352
-266
lines changed

16 files changed

+352
-266
lines changed

Diff for: core/src/main/java/com/arangodb/ArangoCursor.java

+25
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222

2323
import com.arangodb.entity.CursorStats;
2424
import com.arangodb.entity.CursorWarning;
25+
import com.arangodb.model.AqlQueryOptions;
2526

2627
import java.io.Closeable;
2728
import java.util.Collection;
2829
import java.util.List;
30+
import java.util.NoSuchElementException;
2931

3032
/**
3133
* @author Mark Vollmary
@@ -76,4 +78,27 @@ public interface ArangoCursor<T> extends ArangoIterable<T>, ArangoIterator<T>, C
7678
*/
7779
boolean isPotentialDirtyRead();
7880

81+
/**
82+
* @return The ID of the batch after the current one. The first batch has an ID of 1 and the value is incremented by
83+
* 1 with every batch. Only set if the allowRetry query option is enabled.
84+
* @since ArangoDB 3.11
85+
*/
86+
String getNextBatchId();
87+
88+
/**
89+
* Returns the next element in the iteration.
90+
* <p/>
91+
* If the cursor allows retries (see {@link AqlQueryOptions#allowRetry(Boolean)}), then it is safe to retry invoking
92+
* this method in case of I/O exceptions (which are actually thrown as {@link com.arangodb.ArangoDBException} with
93+
* cause {@link java.io.IOException}).
94+
* <p/>
95+
* If the cursor does not allow retries (default), then it is not safe to retry invoking this method in case of I/O
96+
* exceptions, since the request to fetch the next batch is not idempotent (i.e. the cursor may advance multiple
97+
* times on the server).
98+
*
99+
* @return the next element in the iteration
100+
* @throws NoSuchElementException if the iteration has no more elements
101+
*/
102+
@Override
103+
T next();
79104
}

Diff for: core/src/main/java/com/arangodb/ArangoDatabase.java

+14
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,20 @@ public interface ArangoDatabase extends ArangoSerdeAccessor {
310310
*/
311311
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type);
312312

313+
/**
314+
* Return an cursor from the given cursor-ID if still existing
315+
*
316+
* @param cursorId The ID of the cursor
317+
* @param type The type of the result (POJO or {@link com.arangodb.util.RawData})
318+
* @param nextBatchId The ID of the next cursor batch (set only if cursor allows retries, see
319+
* {@link AqlQueryOptions#allowRetry(Boolean)}
320+
* @return cursor of the results
321+
* @see <a href= "https://www.arangodb.com/docs/stable/http/aql-query-cursor-accessing-cursors
322+
* .html#read-next-batch-from-cursor">API Documentation</a>
323+
* @since ArangoDB 3.11
324+
*/
325+
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, String nextBatchId);
326+
313327
/**
314328
* Explain an AQL query and return information about it
315329
*

Diff for: core/src/main/java/com/arangodb/entity/MetaAware.java

-14
This file was deleted.

Diff for: core/src/main/java/com/arangodb/internal/ArangoCursorExecute.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@
2222

2323
import com.arangodb.internal.cursor.entity.InternalCursorEntity;
2424

25-
import java.util.Map;
2625

2726
/**
2827
* @author Mark Vollmary
2928
*/
3029
public interface ArangoCursorExecute {
3130

32-
InternalCursorEntity next(String id, Map<String, String> meta);
31+
InternalCursorEntity next(String id, String nextBatchId);
3332

34-
void close(String id, Map<String, String> meta);
33+
void close(String id);
3534

3635
}

Diff for: core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java

+21-7
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public <T> ArangoCursor<T> query(
161161
final String query, final Class<T> type, final Map<String, Object> bindVars, final AqlQueryOptions options) {
162162
final InternalRequest request = queryRequest(query, bindVars, options);
163163
final HostHandle hostHandle = new HostHandle();
164-
final InternalCursorEntity result = executor.execute(request, InternalCursorEntity.class, hostHandle);
164+
final InternalCursorEntity result = executor.execute(request, internalCursorEntityDeserializer(), hostHandle);
165165
return createCursor(result, type, options, hostHandle);
166166
}
167167

@@ -183,8 +183,20 @@ public <T> ArangoCursor<T> query(final String query, final Class<T> type) {
183183
@Override
184184
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) {
185185
final HostHandle hostHandle = new HostHandle();
186-
final InternalCursorEntity result = executor
187-
.execute(queryNextRequest(cursorId, null, null), InternalCursorEntity.class, hostHandle);
186+
final InternalCursorEntity result = executor.execute(
187+
queryNextRequest(cursorId, null),
188+
internalCursorEntityDeserializer(),
189+
hostHandle);
190+
return createCursor(result, type, null, hostHandle);
191+
}
192+
193+
@Override
194+
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
195+
final HostHandle hostHandle = new HostHandle();
196+
final InternalCursorEntity result = executor.execute(
197+
queryNextByBatchIdRequest(cursorId, nextBatchId, null),
198+
internalCursorEntityDeserializer(),
199+
hostHandle);
188200
return createCursor(result, type, null, hostHandle);
189201
}
190202

@@ -196,13 +208,15 @@ private <T> ArangoCursor<T> createCursor(
196208

197209
final ArangoCursorExecute execute = new ArangoCursorExecute() {
198210
@Override
199-
public InternalCursorEntity next(final String id, Map<String, String> meta) {
200-
return executor.execute(queryNextRequest(id, options, meta), InternalCursorEntity.class, hostHandle);
211+
public InternalCursorEntity next(final String id, final String nextBatchId) {
212+
InternalRequest request = nextBatchId == null ?
213+
queryNextRequest(id, options) : queryNextByBatchIdRequest(id, nextBatchId, options);
214+
return executor.execute(request, internalCursorEntityDeserializer(), hostHandle);
201215
}
202216

203217
@Override
204-
public void close(final String id, Map<String, String> meta) {
205-
executor.execute(queryCloseRequest(id, options, meta), Void.class, hostHandle);
218+
public void close(final String id) {
219+
executor.execute(queryCloseRequest(id, options), Void.class, hostHandle);
206220
}
207221
};
208222

Diff for: core/src/main/java/com/arangodb/internal/ArangoExecutorSync.java

+1-13
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,9 @@
2121
package com.arangodb.internal;
2222

2323
import com.arangodb.ArangoDBException;
24-
import com.arangodb.entity.MetaAware;
2524
import com.arangodb.internal.config.ArangoConfig;
2625
import com.arangodb.internal.net.CommunicationProtocol;
2726
import com.arangodb.internal.net.HostHandle;
28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
3027

3128
import java.io.IOException;
3229
import java.lang.reflect.Type;
@@ -36,8 +33,6 @@
3633
*/
3734
public class ArangoExecutorSync extends ArangoExecutor {
3835

39-
private static final Logger LOG = LoggerFactory.getLogger(ArangoExecutorSync.class);
40-
4136
private final CommunicationProtocol protocol;
4237

4338
public ArangoExecutorSync(final CommunicationProtocol protocol, final ArangoConfig config) {
@@ -64,14 +59,7 @@ public <T> T execute(
6459

6560
final InternalResponse response = protocol.execute(interceptRequest(request), hostHandle);
6661
interceptResponse(response);
67-
T deserialize = responseDeserializer.deserialize(response);
68-
69-
if (deserialize instanceof MetaAware) {
70-
LOG.debug("Response is MetaAware {}", deserialize.getClass().getName());
71-
((MetaAware) deserialize).setMeta(response.getMeta());
72-
}
73-
74-
return deserialize;
62+
return responseDeserializer.deserialize(response);
7563
}
7664

7765
public void disconnect() {

Diff for: core/src/main/java/com/arangodb/internal/InternalArangoDatabase.java

+22-9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.arangodb.entity.*;
2424
import com.arangodb.entity.arangosearch.analyzer.SearchAnalyzer;
2525
import com.arangodb.internal.ArangoExecutor.ResponseDeserializer;
26+
import com.arangodb.internal.cursor.entity.InternalCursorEntity;
2627
import com.arangodb.internal.util.RequestUtils;
2728
import com.arangodb.model.*;
2829
import com.arangodb.model.arangosearch.*;
@@ -156,27 +157,30 @@ protected InternalRequest queryRequest(final String query, final Map<String, Obj
156157
return request;
157158
}
158159

159-
protected InternalRequest queryNextRequest(final String id, final AqlQueryOptions options, Map<String, String> meta) {
160-
160+
protected InternalRequest queryNextRequest(final String id, final AqlQueryOptions options) {
161161
final InternalRequest request = request(name, RequestType.POST, PATH_API_CURSOR, id);
162-
request.putHeaderParams(meta);
162+
return completeQueryNextRequest(request, options);
163+
}
163164

164-
final AqlQueryOptions opt = options != null ? options : new AqlQueryOptions();
165+
protected InternalRequest queryNextByBatchIdRequest(final String id,
166+
final String nextBatchId,
167+
final AqlQueryOptions options) {
168+
final InternalRequest request = request(name, RequestType.POST, PATH_API_CURSOR, id, nextBatchId);
169+
return completeQueryNextRequest(request, options);
170+
}
165171

172+
private InternalRequest completeQueryNextRequest(final InternalRequest request, final AqlQueryOptions options) {
173+
final AqlQueryOptions opt = options != null ? options : new AqlQueryOptions();
166174
if (Boolean.TRUE.equals(opt.getAllowDirtyRead())) {
167175
RequestUtils.allowDirtyRead(request);
168176
}
169177
request.putHeaderParam(TRANSACTION_ID, opt.getStreamTransactionId());
170178
return request;
171179
}
172180

173-
protected InternalRequest queryCloseRequest(final String id, final AqlQueryOptions options, Map<String, String> meta) {
174-
181+
protected InternalRequest queryCloseRequest(final String id, final AqlQueryOptions options) {
175182
final InternalRequest request = request(name, RequestType.DELETE, PATH_API_CURSOR, id);
176-
request.putHeaderParams(meta);
177-
178183
final AqlQueryOptions opt = options != null ? options : new AqlQueryOptions();
179-
180184
if (Boolean.TRUE.equals(opt.getAllowDirtyRead())) {
181185
RequestUtils.allowDirtyRead(request);
182186
}
@@ -243,6 +247,15 @@ protected InternalRequest deleteAqlFunctionRequest(final String name, final AqlF
243247
return request;
244248
}
245249

250+
protected ResponseDeserializer<InternalCursorEntity> internalCursorEntityDeserializer() {
251+
return response -> {
252+
InternalCursorEntity e = getSerde().deserialize(response.getBody(), InternalCursorEntity.class);
253+
boolean potentialDirtyRead = Boolean.parseBoolean(response.getMeta("X-Arango-Potential-Dirty-Read"));
254+
e.setPontentialDirtyRead(potentialDirtyRead);
255+
return e;
256+
};
257+
}
258+
246259
protected ResponseDeserializer<Integer> deleteAqlFunctionResponseDeserializer() {
247260
return response -> getSerde().deserialize(response.getBody(), "/deletedCount", Integer.class);
248261
}

Diff for: core/src/main/java/com/arangodb/internal/cursor/AbstractArangoIterable.java

-38
This file was deleted.

0 commit comments

Comments
 (0)