Skip to content

[DE-725] Bugfix VST resilience #529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/resilience.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ jobs:
env:
TOXIPROXY_VERSION: v2.7.0

strategy:
fail-fast: false

steps:
- uses: actions/checkout@v2
- name: Set up JDK
Expand Down
7 changes: 7 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ on:
jobs:

test:
if: '! github.event.pull_request.draft'
timeout-minutes: 20
runs-on: ubuntu-latest

Expand Down Expand Up @@ -63,6 +64,7 @@ jobs:
run: mvn --no-transfer-progress -am -pl driver test -DargLine="-Duser.language=${{matrix.user-language}}"

test-ssl:
if: '! github.event.pull_request.draft'
timeout-minutes: 10
runs-on: ubuntu-latest

Expand Down Expand Up @@ -98,6 +100,7 @@ jobs:

# test encodeURIComponent() and normalize('NFC') comparing to Javascript behavior
test-graalvm:
if: '! github.event.pull_request.draft'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand All @@ -113,6 +116,7 @@ jobs:
run: mvn -e --no-transfer-progress -am -pl driver test -Dtest=graalvm.UnicodeUtilsTest -Dsurefire.failIfNoSpecifiedTests=false

test-jwt:
if: '! github.event.pull_request.draft'
timeout-minutes: 20
runs-on: ubuntu-latest

Expand Down Expand Up @@ -160,6 +164,7 @@ jobs:
run: mvn --no-transfer-progress -am -pl driver test -DargLine="-Duser.language=${{matrix.user-language}}"

jackson-test:
if: '! github.event.pull_request.draft'
timeout-minutes: 20
runs-on: ubuntu-latest

Expand Down Expand Up @@ -205,6 +210,7 @@ jobs:
run: mvn --no-transfer-progress -am -pl driver test -Dadb.jackson.version=${{matrix.jackson-version}}

integration-tests:
if: '! github.event.pull_request.draft'
timeout-minutes: 20
runs-on: ubuntu-latest

Expand Down Expand Up @@ -250,6 +256,7 @@ jobs:
run: mvn --no-transfer-progress -Pplain test

sonar:
if: '! github.event.pull_request.draft'
timeout-minutes: 10
runs-on: ubuntu-latest

Expand Down
161 changes: 161 additions & 0 deletions core/src/main/java/com/arangodb/internal/net/Communication.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package com.arangodb.internal.net;

import com.arangodb.ArangoDBException;
import com.arangodb.config.HostDescription;
import com.arangodb.internal.InternalRequest;
import com.arangodb.internal.InternalResponse;
import com.arangodb.internal.RequestType;
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.serde.InternalSerde;
import com.arangodb.internal.util.HostUtils;
import com.arangodb.internal.util.RequestUtils;
import com.arangodb.internal.util.ResponseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public abstract class Communication implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(Communication.class);
protected final HostHandler hostHandler;
protected final InternalSerde serde;
private final AtomicLong reqCount;


protected Communication(final ArangoConfig config, final HostHandler hostHandler) {
this.hostHandler = hostHandler;
serde = config.getInternalSerde();
reqCount = new AtomicLong();
}

protected abstract void connect(final Connection conn) throws IOException;

@Override
public void close() throws IOException {
hostHandler.close();
}

public CompletableFuture<InternalResponse> executeAsync(final InternalRequest request, final HostHandle hostHandle) {
return executeAsync(request, hostHandle, hostHandler.get(hostHandle, RequestUtils.determineAccessType(request)), 0);
}

private CompletableFuture<InternalResponse> executeAsync(final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount) {
long reqId = reqCount.getAndIncrement();
return doExecuteAsync(request, hostHandle, host, attemptCount, host.connection(), reqId);
}

private CompletableFuture<InternalResponse> doExecuteAsync(
final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount, Connection connection, long reqId
) {
if (LOGGER.isDebugEnabled()) {
String body = request.getBody() == null ? "" : serde.toJsonString(request.getBody());
LOGGER.debug("Send Request [id={}]: {} {}", reqId, request, body);
}
final CompletableFuture<InternalResponse> rfuture = new CompletableFuture<>();
try {
connect(connection);
} catch (IOException e) {
handleException(true, e, hostHandle, request, host, reqId, attemptCount, rfuture);
return rfuture;
}

connection.executeAsync(request)
.whenComplete((response, e) -> {
try {
if (e instanceof SocketTimeoutException) {
// SocketTimeoutException exceptions are wrapped and rethrown.
TimeoutException te = new TimeoutException(e.getMessage());
te.initCause(e);
rfuture.completeExceptionally(ArangoDBException.of(te, reqId));
} else if (e instanceof TimeoutException) {
rfuture.completeExceptionally(ArangoDBException.of(e, reqId));
} else if (e instanceof ConnectException) {
handleException(true, e, hostHandle, request, host, reqId, attemptCount, rfuture);
} else if (e != null) {
handleException(isSafe(request), e, hostHandle, request, host, reqId, attemptCount, rfuture);
} else {
if (LOGGER.isDebugEnabled()) {
String body = response.getBody() == null ? "" : serde.toJsonString(response.getBody());
LOGGER.debug("Received Response [id={}]: {} {}", reqId, response, body);
}
ArangoDBException errorEntityEx = ResponseUtils.translateError(serde, response);
if (errorEntityEx instanceof ArangoDBRedirectException) {
if (attemptCount >= 3) {
rfuture.completeExceptionally(errorEntityEx);
} else {
final String location = ((ArangoDBRedirectException) errorEntityEx).getLocation();
final HostDescription redirectHost = HostUtils.createFromLocation(location);
hostHandler.failIfNotMatch(redirectHost, errorEntityEx);
mirror(
executeAsync(request, new HostHandle().setHost(redirectHost), hostHandler.get(hostHandle, RequestUtils.determineAccessType(request)), attemptCount + 1),
rfuture
);
}
} else if (errorEntityEx != null) {
rfuture.completeExceptionally(errorEntityEx);
} else {
hostHandler.success();
rfuture.complete(response);
}
}
} catch (Exception ex) {
rfuture.completeExceptionally(ArangoDBException.of(ex, reqId));
}
});
return rfuture;
}

private void handleException(boolean isSafe, Throwable e, HostHandle hostHandle, InternalRequest request, Host host,
long reqId, int attemptCount, CompletableFuture<InternalResponse> rfuture) {
IOException ioEx = wrapIOEx(e);
hostHandler.fail(ioEx);
if (hostHandle != null && hostHandle.getHost() != null) {
hostHandle.setHost(null);
}
hostHandler.checkNext(hostHandle, RequestUtils.determineAccessType(request));
if (isSafe) {
Host nextHost = hostHandler.get(hostHandle, RequestUtils.determineAccessType(request));
LOGGER.warn("Could not connect to {} while executing request [id={}]",
host.getDescription(), reqId, ioEx);
LOGGER.debug("Try connecting to {}", nextHost.getDescription());
mirror(
executeAsync(request, hostHandle, nextHost, attemptCount),
rfuture
);
} else {
ArangoDBException aEx = ArangoDBException.of(ioEx, reqId);
rfuture.completeExceptionally(aEx);
}
}

private void mirror(CompletableFuture<InternalResponse> up, CompletableFuture<InternalResponse> down) {
up.whenComplete((v, err) -> {
if (err != null) {
down.completeExceptionally(err instanceof CompletionException ? err.getCause() : err);
} else {
down.complete(v);
}
});
}

private static IOException wrapIOEx(Throwable t) {
if (t instanceof IOException) {
return (IOException) t;
} else {
return new IOException(t);
}
}

private boolean isSafe(final InternalRequest request) {
RequestType type = request.getRequestType();
return type == RequestType.GET || type == RequestType.HEAD || type == RequestType.OPTIONS;
}

}
6 changes: 6 additions & 0 deletions core/src/main/java/com/arangodb/internal/net/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@

package com.arangodb.internal.net;

import com.arangodb.internal.InternalRequest;
import com.arangodb.internal.InternalResponse;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

/**
* @author Mark Vollmary
*/
public interface Connection extends Closeable {
void setJwt(String jwt);

CompletableFuture<InternalResponse> executeAsync(InternalRequest request);
}
88 changes: 0 additions & 88 deletions driver/src/test/java/com/arangodb/ConsumerThreadAsyncTest.java
Original file line number Diff line number Diff line change
@@ -1,101 +1,13 @@
package com.arangodb;

import com.arangodb.config.ArangoConfigProperties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.assertj.core.api.Assertions.assertThat;

public class ConsumerThreadAsyncTest extends BaseJunit5 {

private volatile Thread thread;

private void setThread() {
thread = Thread.currentThread();
}

private void sleep() {
try {
Thread.sleep(3_000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@ParameterizedTest
@EnumSource(Protocol.class)
@Disabled
void defaultConsumerThread(Protocol protocol) throws ExecutionException, InterruptedException {
ArangoDBAsync adb = new ArangoDB.Builder()
.loadProperties(ArangoConfigProperties.fromFile())
.protocol(protocol)
.build()
.async();

adb.getVersion()
.thenAccept(it -> setThread())
.get();

adb.shutdown();

if (Protocol.VST.equals(protocol)) {
assertThat(thread.getName()).startsWith("adb-vst-");
} else {
assertThat(thread.getName()).startsWith("adb-http-");
}
}

@ParameterizedTest
@EnumSource(Protocol.class)
void customConsumerExecutor(Protocol protocol) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool(r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("custom-" + UUID.randomUUID());
return t;
});
ArangoDBAsync adb = new ArangoDB.Builder()
.loadProperties(ArangoConfigProperties.fromFile())
.protocol(protocol)
.asyncExecutor(es)
.build()
.async();

adb.getVersion()
.thenAccept(it -> setThread())
.get();

adb.shutdown();
es.shutdown();
assertThat(thread.getName()).startsWith("custom-");
}

/**
* Generates warns from Vert.x BlockedThreadChecker
*/
@ParameterizedTest
@EnumSource(Protocol.class)
@Disabled
void sleepOnDefaultConsumerThread(Protocol protocol) throws ExecutionException, InterruptedException {
ArangoDBAsync adb = new ArangoDB.Builder()
.loadProperties(ArangoConfigProperties.fromFile())
.protocol(protocol)
.maxConnections(1)
.build()
.async();

adb.getVersion()
.thenAccept(it -> sleep())
.get();

adb.shutdown();
}

@ParameterizedTest
@EnumSource(Protocol.class)
void nestedRequests(Protocol protocol) throws ExecutionException, InterruptedException {
Expand Down
Loading