Skip to content

[DE-771] transaction aware deserializer #554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/src/main/java/com/arangodb/internal/ArangoExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.net.CommunicationProtocol;
import com.arangodb.internal.serde.InternalSerde;
import com.arangodb.serde.SerdeContext;

import java.io.IOException;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -58,8 +59,8 @@ public void setJwt(String jwt) {
protocol.setJwt(jwt);
}

protected <T> T createResult(final Type type, final InternalResponse response) {
return serde.deserialize(response.getBody(), type);
protected <T> T createResult(final Type type, final InternalResponse response, final SerdeContext ctx) {
return serde.deserialize(response.getBody(), type, ctx);
}

protected final void interceptResponse(InternalResponse response) {
Expand All @@ -79,6 +80,6 @@ public QueueTimeMetrics getQueueTimeMetrics() {
}

public interface ResponseDeserializer<T> {
T deserialize(InternalResponse response);
T deserialize(InternalResponse response, SerdeContext ctx);
}
}
24 changes: 20 additions & 4 deletions core/src/main/java/com/arangodb/internal/ArangoExecutorAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.net.CommunicationProtocol;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.serde.SerdeUtils;
import com.arangodb.serde.SerdeContext;

import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
Expand All @@ -48,7 +50,7 @@ public <T> CompletableFuture<T> execute(final Supplier<InternalRequest> requestS
}

public <T> CompletableFuture<T> execute(final Supplier<InternalRequest> requestSupplier, final Type type, final HostHandle hostHandle) {
return execute(requestSupplier, response -> createResult(type, response), hostHandle);
return execute(requestSupplier, (response, ctx) -> createResult(type, response, ctx), hostHandle);
}

public <T> CompletableFuture<T> execute(final Supplier<InternalRequest> requestSupplier, final ResponseDeserializer<T> responseDeserializer) {
Expand All @@ -62,13 +64,16 @@ public <T> CompletableFuture<T> execute(

CompletableFuture<T> cf = CompletableFuture.completedFuture(requestSupplier)
.thenApply(Supplier::get)
.thenCompose(request -> protocol.executeAsync(interceptRequest(request), hostHandle))
.thenCompose(request -> protocol
.executeAsync(interceptRequest(request), hostHandle)
.thenApply(resp -> new ResponseWithContext(resp, SerdeUtils.createSerdeContext(request)))
)
.handle((r, e) -> {
if (e != null) {
throw ArangoDBException.of(e);
} else {
interceptResponse(r);
return responseDeserializer.deserialize(r);
interceptResponse(r.response);
return responseDeserializer.deserialize(r.response, r.context);
}
});

Expand All @@ -79,4 +84,15 @@ public <T> CompletableFuture<T> execute(
}
}

private static class ResponseWithContext {
final InternalResponse response;
final SerdeContext context;

ResponseWithContext(InternalResponse response, SerdeContext context) {
this.response = response;
this.context = context;
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.net.CommunicationProtocol;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.serde.SerdeUtils;

import java.lang.reflect.Type;

Expand All @@ -40,7 +41,7 @@ public <T> T execute(final InternalRequest request, final Type type) {
}

public <T> T execute(final InternalRequest request, final Type type, final HostHandle hostHandle) {
return execute(request, response -> createResult(type, response), hostHandle);
return execute(request, (response, ctx) -> createResult(type, response, ctx), hostHandle);
}

public <T> T execute(final InternalRequest request, final ResponseDeserializer<T> responseDeserializer) {
Expand All @@ -54,7 +55,7 @@ public <T> T execute(

final InternalResponse response = protocol.execute(interceptRequest(request), hostHandle);
interceptResponse(response);
return responseDeserializer.deserialize(response);
return responseDeserializer.deserialize(response, SerdeUtils.createSerdeContext(request));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private InternalRequest createInsertDocumentRequest(final DocumentCreateOptions
}

protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentCreateEntity<T>>> insertDocumentsResponseDeserializer(Class<T> userDataClass) {
return response -> {
return (response, ctx) -> {
final MultiDocumentEntity<DocumentCreateEntity<T>> multiDocument = new MultiDocumentEntity<>();
final List<DocumentCreateEntity<T>> docs = new ArrayList<>();
final List<ErrorEntity> errors = new ArrayList<>();
Expand All @@ -119,12 +119,12 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentCreateEntity<T>>>
for (final JsonNode next : body) {
JsonNode isError = next.get(ArangoResponseField.ERROR_FIELD_NAME);
if (isError != null && isError.booleanValue()) {
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class);
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class, ctx);
errors.add(error);
documentsAndErrors.add(error);
} else {
Type type = constructParametricType(DocumentCreateEntity.class, userDataClass);
final DocumentCreateEntity<T> doc = getSerde().deserialize(next, type);
final DocumentCreateEntity<T> doc = getSerde().deserialize(next, type, ctx);
docs.add(doc);
documentsAndErrors.add(doc);
}
Expand Down Expand Up @@ -168,7 +168,7 @@ protected InternalRequest getDocumentRequest(final String key, final DocumentRea
}

protected <T> ResponseDeserializer<T> getDocumentResponseDeserializer(final Class<T> type) {
return response -> getSerde().deserializeUserData(response.getBody(), type);
return (response, ctx) -> getSerde().deserializeUserData(response.getBody(), type, ctx);
}

protected InternalRequest getDocumentsRequest(final Iterable<String> keys, final DocumentReadOptions options) {
Expand All @@ -186,7 +186,7 @@ protected InternalRequest getDocumentsRequest(final Iterable<String> keys, final

protected <T> ResponseDeserializer<MultiDocumentEntity<T>> getDocumentsResponseDeserializer(
final Class<T> type) {
return response -> {
return (response, ctx) -> {
final MultiDocumentEntity<T> multiDocument = new MultiDocumentEntity<>();
boolean potentialDirtyRead = Boolean.parseBoolean(response.getMeta("X-Arango-Potential-Dirty-Read"));
multiDocument.setPotentialDirtyRead(potentialDirtyRead);
Expand All @@ -197,11 +197,11 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<T>> getDocumentsResponseD
for (final JsonNode next : body) {
JsonNode isError = next.get(ArangoResponseField.ERROR_FIELD_NAME);
if (isError != null && isError.booleanValue()) {
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class);
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class, ctx);
errors.add(error);
documentsAndErrors.add(error);
} else {
final T doc = getSerde().deserializeUserData(getSerde().serialize(next), type);
final T doc = getSerde().deserializeUserData(getSerde().serialize(next), type, ctx);
docs.add(doc);
documentsAndErrors.add(doc);
}
Expand Down Expand Up @@ -249,7 +249,7 @@ private InternalRequest createReplaceDocumentRequest(final DocumentReplaceOption

protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentUpdateEntity<T>>> replaceDocumentsResponseDeserializer(
final Class<T> returnType) {
return response -> {
return (response, ctx) -> {
final MultiDocumentEntity<DocumentUpdateEntity<T>> multiDocument = new MultiDocumentEntity<>();
final List<DocumentUpdateEntity<T>> docs = new ArrayList<>();
final List<ErrorEntity> errors = new ArrayList<>();
Expand All @@ -258,12 +258,12 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentUpdateEntity<T>>>
for (final JsonNode next : body) {
JsonNode isError = next.get(ArangoResponseField.ERROR_FIELD_NAME);
if (isError != null && isError.booleanValue()) {
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class);
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class, ctx);
errors.add(error);
documentsAndErrors.add(error);
} else {
Type type = constructParametricType(DocumentUpdateEntity.class, returnType);
final DocumentUpdateEntity<T> doc = getSerde().deserialize(next, type);
final DocumentUpdateEntity<T> doc = getSerde().deserialize(next, type, ctx);
docs.add(doc);
documentsAndErrors.add(doc);
}
Expand Down Expand Up @@ -312,7 +312,7 @@ private InternalRequest createUpdateDocumentRequest(final DocumentUpdateOptions

protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentUpdateEntity<T>>> updateDocumentsResponseDeserializer(
final Class<T> returnType) {
return response -> {
return (response, ctx) -> {
final MultiDocumentEntity<DocumentUpdateEntity<T>> multiDocument = new MultiDocumentEntity<>();
final List<DocumentUpdateEntity<T>> docs = new ArrayList<>();
final List<ErrorEntity> errors = new ArrayList<>();
Expand All @@ -321,12 +321,12 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentUpdateEntity<T>>>
for (final JsonNode next : body) {
JsonNode isError = next.get(ArangoResponseField.ERROR_FIELD_NAME);
if (isError != null && isError.booleanValue()) {
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class);
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class, ctx);
errors.add(error);
documentsAndErrors.add(error);
} else {
Type type = constructParametricType(DocumentUpdateEntity.class, returnType);
final DocumentUpdateEntity<T> doc = getSerde().deserialize(next, type);
final DocumentUpdateEntity<T> doc = getSerde().deserialize(next, type, ctx);
docs.add(doc);
documentsAndErrors.add(doc);
}
Expand Down Expand Up @@ -368,7 +368,7 @@ private InternalRequest createDeleteDocumentRequest(final DocumentDeleteOptions

protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentDeleteEntity<T>>> deleteDocumentsResponseDeserializer(
final Class<T> userDataClass) {
return response -> {
return (response, ctx) -> {
final MultiDocumentEntity<DocumentDeleteEntity<T>> multiDocument = new MultiDocumentEntity<>();
final List<DocumentDeleteEntity<T>> docs = new ArrayList<>();
final List<ErrorEntity> errors = new ArrayList<>();
Expand All @@ -377,12 +377,12 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentDeleteEntity<T>>>
for (final JsonNode next : body) {
JsonNode isError = next.get(ArangoResponseField.ERROR_FIELD_NAME);
if (isError != null && isError.booleanValue()) {
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class);
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class, ctx);
errors.add(error);
documentsAndErrors.add(error);
} else {
Type type = constructParametricType(DocumentDeleteEntity.class, userDataClass);
final DocumentDeleteEntity<T> doc = getSerde().deserialize(next, type);
final DocumentDeleteEntity<T> doc = getSerde().deserialize(next, type, ctx);
docs.add(doc);
documentsAndErrors.add(doc);
}
Expand Down Expand Up @@ -413,7 +413,7 @@ protected InternalRequest deleteIndexRequest(final String id) {
}

protected ResponseDeserializer<String> deleteIndexResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), "/id", String.class);
return (response, ctx) -> getSerde().deserialize(response.getBody(), "/id", String.class, ctx);
}

private String createIndexId(final String id) {
Expand Down Expand Up @@ -495,23 +495,23 @@ protected InternalRequest getIndexesRequest() {
}

protected ResponseDeserializer<Collection<IndexEntity>> getIndexesResponseDeserializer() {
return response -> {
return (response, ctx) -> {
Collection<IndexEntity> indexes = new ArrayList<>();
for (JsonNode idx : getSerde().parse(response.getBody(), "/indexes")) {
if (!"inverted".equals(idx.get("type").textValue())) {
indexes.add(getSerde().deserialize(idx, IndexEntity.class));
indexes.add(getSerde().deserialize(idx, IndexEntity.class, ctx));
}
}
return indexes;
};
}

protected ResponseDeserializer<Collection<InvertedIndexEntity>> getInvertedIndexesResponseDeserializer() {
return response -> {
return (response, ctx) -> {
Collection<InvertedIndexEntity> indexes = new ArrayList<>();
for (JsonNode idx : getSerde().parse(response.getBody(), "/indexes")) {
if ("inverted".equals(idx.get("type").textValue())) {
indexes.add(getSerde().deserialize(idx, InvertedIndexEntity.class));
indexes.add(getSerde().deserialize(idx, InvertedIndexEntity.class, ctx));
}
}
return indexes;
Expand Down Expand Up @@ -583,8 +583,8 @@ protected InternalRequest getPermissionsRequest(final String user) {
}

protected ResponseDeserializer<Permissions> getPermissionsResponseDeserialzer() {
return response -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
Permissions.class);
return (response, ctx) -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
Permissions.class, ctx);
}

}
22 changes: 11 additions & 11 deletions core/src/main/java/com/arangodb/internal/InternalArangoDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ protected InternalRequest getServerIdRequest() {
}

protected ResponseDeserializer<ServerRole> getRoleResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), "/role", ServerRole.class);
return (response, ctx) -> getSerde().deserialize(response.getBody(), "/role", ServerRole.class, ctx);
}

protected ResponseDeserializer<String> getServerIdResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), "/id", String.class);
return (response, ctx) -> getSerde().deserialize(response.getBody(), "/id", String.class, ctx);
}

protected InternalRequest createDatabaseRequest(final DBCreateOptions options) {
Expand All @@ -81,25 +81,25 @@ protected InternalRequest createDatabaseRequest(final DBCreateOptions options) {
}

protected ResponseDeserializer<Boolean> createDatabaseResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
Boolean.class);
return (response, ctx) -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
Boolean.class, ctx);
}

protected InternalRequest getDatabasesRequest(final String dbName) {
return request(dbName, RequestType.GET, InternalArangoDatabase.PATH_API_DATABASE);
}

protected ResponseDeserializer<Collection<String>> getDatabaseResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
constructListType(String.class));
return (response, ctx) -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
constructListType(String.class), ctx);
}

protected InternalRequest getAccessibleDatabasesForRequest(final String dbName, final String user) {
return request(dbName, RequestType.GET, PATH_API_USER, user, ArangoRequestParam.DATABASE);
}

protected ResponseDeserializer<Collection<String>> getAccessibleDatabasesForResponseDeserializer() {
return response -> {
return (response, ctx) -> {
Iterator<String> names =
getSerde().parse(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER).fieldNames();
final Collection<String> dbs = new ArrayList<>();
Expand Down Expand Up @@ -136,8 +136,8 @@ protected InternalRequest getUserRequest(final String dbName, final String user)
}

protected ResponseDeserializer<Collection<UserEntity>> getUsersResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
constructListType(UserEntity.class));
return (response, ctx) -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
constructListType(UserEntity.class), ctx);
}

protected InternalRequest updateUserRequest(final String dbName, final String user, final UserUpdateOptions options) {
Expand Down Expand Up @@ -173,10 +173,10 @@ protected InternalRequest executeRequest(final Request<?> request) {
}

protected <T> ResponseDeserializer<Response<T>> responseDeserializer(Class<T> type) {
return response -> new Response<>(
return (response, ctx) -> new Response<>(
response.getResponseCode(),
response.getMeta(),
getSerde().deserializeUserData(response.getBody(), type)
getSerde().deserializeUserData(response.getBody(), type, ctx)
);
}

Expand Down
Loading
Loading