From e11ccd03849a761254f8f70bfb1edf1bfbbca409 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Wed, 22 May 2024 13:25:10 +0200 Subject: [PATCH 1/8] reuse Vert.x instance --- core/src/main/java/com/arangodb/ArangoDB.java | 30 ++++++++++---- .../config/ArangoConfigProperties.java | 4 ++ .../com/arangodb/internal/ArangoDBImpl.java | 7 +++- .../com/arangodb/internal/ArangoDefaults.java | 1 + .../internal/config/ArangoConfig.java | 10 +++++ .../internal/net/ConnectionFactory.java | 4 ++ .../internal/net/ProtocolProvider.java | 12 +++++- .../java/com/arangodb/ArangoConfigTest.java | 1 + .../com/arangodb/http/HttpConnection.java | 27 ++++-------- .../arangodb/http/HttpConnectionFactory.java | 41 +++++++++++++++++-- .../arangodb/http/HttpProtocolProvider.java | 4 +- .../vst/VstConnectionFactoryAsync.java | 5 +++ 12 files changed, 109 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/com/arangodb/ArangoDB.java b/core/src/main/java/com/arangodb/ArangoDB.java index 9a5cc430a..755c551f9 100644 --- a/core/src/main/java/com/arangodb/ArangoDB.java +++ b/core/src/main/java/com/arangodb/ArangoDB.java @@ -264,9 +264,9 @@ public interface ArangoDB extends ArangoSerdeAccessor { * * @param user The name of the user * @param permissions The permissions the user grant - * @since ArangoDB 3.2.0 * @see API * Documentation + * @since ArangoDB 3.2.0 */ void grantDefaultDatabaseAccess(String user, Permissions permissions); @@ -276,9 +276,9 @@ public interface ArangoDB extends ArangoSerdeAccessor { * * @param user The name of the user * @param permissions The permissions the user grant - * @since ArangoDB 3.2.0 * @see API * Documentation + * @since ArangoDB 3.2.0 */ void grantDefaultCollectionAccess(String user, Permissions permissions); @@ -313,9 +313,9 @@ public interface ArangoDB extends ArangoSerdeAccessor { * Returns the server's current loglevel settings. * * @return the server's current loglevel settings - * @since ArangoDB 3.1.0 * @see API * Documentation + * @since ArangoDB 3.1.0 */ LogLevelEntity getLogLevel(); @@ -323,9 +323,9 @@ public interface ArangoDB extends ArangoSerdeAccessor { * Returns the server's current loglevel settings. * * @return the server's current loglevel settings - * @since ArangoDB 3.10 * @see API * Documentation + * @since ArangoDB 3.10 */ LogLevelEntity getLogLevel(LogLevelOptions options); @@ -334,9 +334,9 @@ public interface ArangoDB extends ArangoSerdeAccessor { * * @param entity loglevel settings * @return the server's current loglevel settings - * @since ArangoDB 3.1.0 * @see API * Documentation + * @since ArangoDB 3.1.0 */ LogLevelEntity setLogLevel(LogLevelEntity entity); @@ -345,17 +345,17 @@ public interface ArangoDB extends ArangoSerdeAccessor { * * @param entity loglevel settings * @return the server's current loglevel settings - * @since ArangoDB 3.10 * @see API * Documentation + * @since ArangoDB 3.10 */ LogLevelEntity setLogLevel(LogLevelEntity entity, LogLevelOptions options); /** * @return the list of available rules and their respective flags - * @since ArangoDB 3.10 * @see API * Documentation + * @since ArangoDB 3.10 */ Collection getQueryOptimizerRules(); @@ -381,7 +381,7 @@ public ArangoDB build() { ProtocolProvider protocolProvider = protocolProvider(config.getProtocol()); config.setProtocolModule(protocolProvider.protocolModule()); - ConnectionFactory connectionFactory = protocolProvider.createConnectionFactory(); + ConnectionFactory connectionFactory = protocolProvider.createConnectionFactory(config); Collection hostList = createHostList(connectionFactory); HostResolver hostResolver = createHostResolver(hostList, connectionFactory); HostHandler hostHandler = createHostHandler(hostResolver); @@ -394,7 +394,8 @@ public ArangoDB build() { return new ArangoDBImpl( config, protocol, - hostHandler + hostHandler, + connectionFactory ); } @@ -680,6 +681,17 @@ public Builder compressionLevel(Integer level) { return this; } + /** + * Sets whether to reuse the Vert.x instance owning the current thread Vert.x context. + * + * @param reuseVertx whether to reuse the Vert.x instance (default: {@code false}) + * @return {@link ArangoDB.Builder} + */ + public Builder reuseVertx(Boolean reuseVertx) { + config.setReuseVertx(reuseVertx); + return this; + } + @UnstableApi protected ProtocolProvider protocolProvider(Protocol protocol) { ServiceLoader loader = ServiceLoader.load(ProtocolProvider.class); diff --git a/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java b/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java index 47c464a1e..9690e5f4e 100644 --- a/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java +++ b/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java @@ -110,4 +110,8 @@ default Optional getCompressionLevel() { return Optional.empty(); } + default Optional getReuseVertx() { + return Optional.empty(); + } + } diff --git a/core/src/main/java/com/arangodb/internal/ArangoDBImpl.java b/core/src/main/java/com/arangodb/internal/ArangoDBImpl.java index 7554c9919..3899a4d09 100644 --- a/core/src/main/java/com/arangodb/internal/ArangoDBImpl.java +++ b/core/src/main/java/com/arangodb/internal/ArangoDBImpl.java @@ -24,6 +24,7 @@ import com.arangodb.entity.*; import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.CommunicationProtocol; +import com.arangodb.internal.net.ConnectionFactory; import com.arangodb.internal.net.HostHandler; import com.arangodb.internal.serde.SerdeUtils; import com.arangodb.model.*; @@ -41,12 +42,15 @@ public class ArangoDBImpl extends InternalArangoDB implements ArangoDB { private static final Logger LOGGER = LoggerFactory.getLogger(ArangoDBImpl.class); private final HostHandler hostHandler; + private final ConnectionFactory connectionFactory; public ArangoDBImpl(final ArangoConfig config, final CommunicationProtocol protocol, - final HostHandler hostHandler) { + final HostHandler hostHandler, + final ConnectionFactory connectionFactory) { super(protocol, config); this.hostHandler = hostHandler; + this.connectionFactory = connectionFactory; LOGGER.debug("ArangoDB Client is ready to use"); } @@ -58,6 +62,7 @@ public ArangoDBAsync async() { @Override public void shutdown() { executorSync().disconnect(); + connectionFactory.close(); } @Override diff --git a/core/src/main/java/com/arangodb/internal/ArangoDefaults.java b/core/src/main/java/com/arangodb/internal/ArangoDefaults.java index b08c045da..8c8bcffbb 100644 --- a/core/src/main/java/com/arangodb/internal/ArangoDefaults.java +++ b/core/src/main/java/com/arangodb/internal/ArangoDefaults.java @@ -54,6 +54,7 @@ public final class ArangoDefaults { public static final Integer DEFAULT_ACQUIRE_HOST_LIST_INTERVAL = 60 * 60 * 1000; // hour public static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY = LoadBalancingStrategy.NONE; public static final Integer DEFAULT_RESPONSE_QUEUE_TIME_SAMPLES = 10; + public static final Boolean DEFAULT_REUSE_VERTX = false; // region compression public static final Compression DEFAULT_COMPRESSION = Compression.NONE; diff --git a/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java b/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java index ecfc47724..e69debe3f 100644 --- a/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java +++ b/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java @@ -49,6 +49,7 @@ public class ArangoConfig { private Compression compression; private Integer compressionThreshold; private Integer compressionLevel; + private Boolean reuseVertx; private static final Logger LOG = LoggerFactory.getLogger(ArangoConfig.class); @@ -112,6 +113,7 @@ public void loadProperties(final ArangoConfigProperties properties) { compression = properties.getCompression().orElse(ArangoDefaults.DEFAULT_COMPRESSION); compressionThreshold = properties.getCompressionThreshold().orElse(ArangoDefaults.DEFAULT_COMPRESSION_THRESHOLD); compressionLevel = properties.getCompressionLevel().orElse(ArangoDefaults.DEFAULT_COMPRESSION_LEVEL); + reuseVertx = properties.getReuseVertx().orElse(ArangoDefaults.DEFAULT_REUSE_VERTX); } public List getHosts() { @@ -329,4 +331,12 @@ public Integer getCompressionLevel() { public void setCompressionLevel(Integer compressionLevel) { this.compressionLevel = compressionLevel; } + + public Boolean getReuseVertx() { + return reuseVertx; + } + + public void setReuseVertx(Boolean reuseVertx) { + this.reuseVertx = reuseVertx; + } } diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java b/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java index b0fbbdf7b..3cabeaeec 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java @@ -30,4 +30,8 @@ @UsedInApi public interface ConnectionFactory { Connection create(ArangoConfig config, HostDescription host); + + default void close() { + // keep backward compatibility + } } diff --git a/core/src/main/java/com/arangodb/internal/net/ProtocolProvider.java b/core/src/main/java/com/arangodb/internal/net/ProtocolProvider.java index bcc55e127..8372806da 100644 --- a/core/src/main/java/com/arangodb/internal/net/ProtocolProvider.java +++ b/core/src/main/java/com/arangodb/internal/net/ProtocolProvider.java @@ -11,7 +11,17 @@ public interface ProtocolProvider { boolean supportsProtocol(Protocol protocol); - ConnectionFactory createConnectionFactory(); + /** + * @deprecated use {@link #createConnectionFactory(ArangoConfig)} instead + */ + @Deprecated + default ConnectionFactory createConnectionFactory() { + throw new UnsupportedOperationException(); + } + + default ConnectionFactory createConnectionFactory(ArangoConfig config) { + return createConnectionFactory(); + } CommunicationProtocol createProtocol(ArangoConfig config, HostHandler hostHandler); diff --git a/driver/src/test/java/com/arangodb/ArangoConfigTest.java b/driver/src/test/java/com/arangodb/ArangoConfigTest.java index f551fed76..cf7b62e9d 100644 --- a/driver/src/test/java/com/arangodb/ArangoConfigTest.java +++ b/driver/src/test/java/com/arangodb/ArangoConfigTest.java @@ -31,5 +31,6 @@ void defaultValues() { assertThat(cfg.getCompression()).isEqualTo(ArangoDefaults.DEFAULT_COMPRESSION); assertThat(cfg.getCompressionThreshold()).isEqualTo(ArangoDefaults.DEFAULT_COMPRESSION_THRESHOLD); assertThat(cfg.getCompressionLevel()).isEqualTo(ArangoDefaults.DEFAULT_COMPRESSION_LEVEL); + assertThat(cfg.getReuseVertx()).isEqualTo(ArangoDefaults.DEFAULT_REUSE_VERTX); } } diff --git a/http/src/main/java/com/arangodb/http/HttpConnection.java b/http/src/main/java/com/arangodb/http/HttpConnection.java index 575ac940a..f32ae9de0 100644 --- a/http/src/main/java/com/arangodb/http/HttpConnection.java +++ b/http/src/main/java/com/arangodb/http/HttpConnection.java @@ -37,7 +37,6 @@ import io.netty.handler.ssl.JdkSslContext; import io.vertx.core.MultiMap; import io.vertx.core.Vertx; -import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpMethod; @@ -50,8 +49,6 @@ import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.client.WebClientOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; import java.security.NoSuchAlgorithmException; @@ -61,7 +58,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** @@ -70,24 +66,21 @@ */ @UnstableApi public class HttpConnection implements Connection { - private static final Logger LOGGER = LoggerFactory.getLogger(HttpConnection.class); private static final String CONTENT_TYPE_APPLICATION_JSON_UTF8 = "application/json; charset=utf-8"; private static final String CONTENT_TYPE_VPACK = "application/x-velocypack"; private static final String USER_AGENT = getUserAgent(); - private static final AtomicInteger THREAD_COUNT = new AtomicInteger(); - private String auth; + private volatile String auth; private final int compressionThreshold; private final Encoder encoder; private final WebClient client; private final Integer timeout; private final MultiMap commonHeaders = MultiMap.caseInsensitiveMultiMap(); - private final Vertx vertx; private static String getUserAgent() { return "JavaDriver/" + PackageVersion.VERSION + " (JVM/" + System.getProperty("java.specification.version") + ")"; } - HttpConnection(final ArangoConfig config, final HostDescription host) { + HttpConnection(final ArangoConfig config, final HostDescription host, final Vertx vertx) { super(); Protocol protocol = config.getProtocol(); ContentType contentType = ContentTypeFactory.of(protocol); @@ -108,14 +101,9 @@ private static String getUserAgent() { } commonHeaders.add("x-arango-driver", USER_AGENT); timeout = config.getTimeout(); - vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1)); - vertx.runOnContext(e -> { - Thread.currentThread().setName("adb-http-" + THREAD_COUNT.getAndIncrement()); - auth = new UsernamePasswordCredentials( - config.getUser(), Optional.ofNullable(config.getPassword()).orElse("") - ).toHttpAuthorization(); - LOGGER.debug("Created Vert.x context"); - }); + auth = new UsernamePasswordCredentials( + config.getUser(), Optional.ofNullable(config.getPassword()).orElse("") + ).toHttpAuthorization(); int intTtl = Optional.ofNullable(config.getConnectionTtl()) .map(ttl -> Math.toIntExact(ttl / 1000)) @@ -229,7 +217,6 @@ private static void addHeader(final InternalRequest request, final HttpRequest executeAsync(@UnstableApi final InternalRequest request) { CompletableFuture rfuture = new CompletableFuture<>(); - vertx.runOnContext(e -> doExecute(request, rfuture)); + doExecute(request, rfuture); return rfuture; } @@ -308,7 +295,7 @@ private InternalResponse buildResponse(final HttpResponse httpResponse) @Override public void setJwt(String jwt) { if (jwt != null) { - vertx.runOnContext(e -> auth = new TokenCredentials(jwt).toHttpAuthorization()); + auth = new TokenCredentials(jwt).toHttpAuthorization(); } } diff --git a/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java b/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java index 8776c093c..b8ec0185b 100644 --- a/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java +++ b/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java @@ -25,15 +25,48 @@ import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.Connection; import com.arangodb.internal.net.ConnectionFactory; +import io.vertx.core.Context; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; -/** - * @author Mark Vollmary - */ @UnstableApi public class HttpConnectionFactory implements ConnectionFactory { + private final Logger LOGGER = LoggerFactory.getLogger(HttpConnectionFactory.class); + + private final Vertx vertx; + private final boolean manageVertx; + + public HttpConnectionFactory(final ArangoConfig config) { + Optional existingVertx = Optional.ofNullable(Vertx.currentContext()).map(Context::owner); + if (config.getReuseVertx() && existingVertx.isPresent()) { + LOGGER.info("Reusing existing Vert.x instance"); + vertx = existingVertx.get(); + manageVertx = false; + } else { + if (existingVertx.isPresent()) { + LOGGER.warn("Found an existing Vert.x instance, set reuseVertx=true to reuse it"); + } + LOGGER.info("Creating new Vert.x instance"); + vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true)); + manageVertx = true; + } + } + @Override @UnstableApi public Connection create(@UnstableApi final ArangoConfig config, final HostDescription host) { - return new HttpConnection(config, host); + return new HttpConnection(config, host, vertx); + } + + @Override + public synchronized void close() { + if (manageVertx) { + LOGGER.info("Closing Vert.x instance"); + vertx.close(); + } } } diff --git a/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java b/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java index e204f0886..40bbf7d36 100644 --- a/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java +++ b/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java @@ -22,8 +22,8 @@ public boolean supportsProtocol(Protocol protocol) { @Override @UnstableApi - public ConnectionFactory createConnectionFactory() { - return new HttpConnectionFactory(); + public ConnectionFactory createConnectionFactory(@UnstableApi ArangoConfig config) { + return new HttpConnectionFactory(config); } @Override diff --git a/vst/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java b/vst/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java index f0faca44f..24e29c497 100644 --- a/vst/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java +++ b/vst/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java @@ -39,4 +39,9 @@ public Connection create(@UnstableApi final ArangoConfig config, final HostDescr return new VstConnectionAsync(config, host); } + @Override + public void close() { + // no-op + } + } From 684b057f718cdb120d4e4454508bc0c5ba6eca6e Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Wed, 22 May 2024 13:54:34 +0200 Subject: [PATCH 2/8] reuse Vert.x instance tests --- .../test/java/resilience/vertx/VertxTest.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 resilience-tests/src/test/java/resilience/vertx/VertxTest.java diff --git a/resilience-tests/src/test/java/resilience/vertx/VertxTest.java b/resilience-tests/src/test/java/resilience/vertx/VertxTest.java new file mode 100644 index 000000000..808b18e8b --- /dev/null +++ b/resilience-tests/src/test/java/resilience/vertx/VertxTest.java @@ -0,0 +1,76 @@ +package resilience.vertx; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.arangodb.ArangoDB; +import io.vertx.core.Vertx; +import org.junit.jupiter.api.Test; +import resilience.SingleServerTest; + +import java.util.concurrent.ExecutionException; + +import static org.assertj.core.api.Assertions.assertThat; + +public class VertxTest extends SingleServerTest { + + @Test + void managedVertx() { + ArangoDB adb = new ArangoDB.Builder() + .host("172.28.0.1", 8529) + .password("test") + .build(); + + adb.getVersion(); + adb.shutdown(); + + assertThat(logs.getLogs()) + .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnectionFactory")) + .map(ILoggingEvent::getFormattedMessage) + .anySatisfy(it -> assertThat(it).contains("Creating new Vert.x instance")) + .anySatisfy(it -> assertThat(it).contains("Closing Vert.x instance")); + } + + @Test + void reuseVertx() throws ExecutionException, InterruptedException { + Vertx vertx = Vertx.vertx(); + vertx.executeBlocking(() -> { + ArangoDB adb = new ArangoDB.Builder() + .host("172.28.0.1", 8529) + .password("test") + .reuseVertx(true) + .build(); + adb.getVersion(); + adb.shutdown(); + return null; + }).toCompletionStage().toCompletableFuture().get(); + vertx.close(); + + assertThat(logs.getLogs()) + .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnectionFactory")) + .map(ILoggingEvent::getFormattedMessage) + .anySatisfy(it -> assertThat(it).contains("Reusing existing Vert.x instance")); + } + + @Test + void notReuseVertx() throws ExecutionException, InterruptedException { + Vertx vertx = Vertx.vertx(); + vertx.executeBlocking(() -> { + ArangoDB adb = new ArangoDB.Builder() + .host("172.28.0.1", 8529) + .password("test") + .reuseVertx(false) + .build(); + adb.getVersion(); + adb.shutdown(); + return null; + }).toCompletionStage().toCompletableFuture().get(); + vertx.close(); + + assertThat(logs.getLogs()) + .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnectionFactory")) + .map(ILoggingEvent::getFormattedMessage) + .anySatisfy(it -> assertThat(it).contains("Found an existing Vert.x instance, set reuseVertx=true to reuse it")) + .anySatisfy(it -> assertThat(it).contains("Creating new Vert.x instance")) + .anySatisfy(it -> assertThat(it).contains("Closing Vert.x instance")); + } + +} From 3e26dd912e8724485b0f7ee6d63fe96d75a37cbc Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Wed, 22 May 2024 19:31:48 +0200 Subject: [PATCH 3/8] fixed tests --- driver/src/test/java/perf/SimpleSyncPerfTest.java | 5 +++++ .../main/java/com/arangodb/http/HttpConnectionFactory.java | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/driver/src/test/java/perf/SimpleSyncPerfTest.java b/driver/src/test/java/perf/SimpleSyncPerfTest.java index 4e18b71f4..5dcb5188a 100644 --- a/driver/src/test/java/perf/SimpleSyncPerfTest.java +++ b/driver/src/test/java/perf/SimpleSyncPerfTest.java @@ -21,6 +21,7 @@ package perf; import com.arangodb.ArangoDB; +import com.arangodb.BaseJunit5; import com.arangodb.Protocol; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; @@ -28,6 +29,8 @@ import java.util.Date; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + /** * @author Michele Rastelli */ @@ -44,6 +47,8 @@ private void doGetVersion(ArangoDB arangoDB) { @ParameterizedTest @EnumSource(Protocol.class) void getVersion(Protocol protocol) throws InterruptedException { + assumeTrue(!protocol.equals(Protocol.VST) || BaseJunit5.isLessThanVersion(3, 12)); + ArangoDB arangoDB = new ArangoDB.Builder() .host("172.28.0.1", 8529) .password("test") diff --git a/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java b/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java index b8ec0185b..f770476e0 100644 --- a/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java +++ b/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java @@ -40,7 +40,7 @@ public class HttpConnectionFactory implements ConnectionFactory { private final Vertx vertx; private final boolean manageVertx; - public HttpConnectionFactory(final ArangoConfig config) { + public HttpConnectionFactory(@UnstableApi final ArangoConfig config) { Optional existingVertx = Optional.ofNullable(Vertx.currentContext()).map(Context::owner); if (config.getReuseVertx() && existingVertx.isPresent()) { LOGGER.info("Reusing existing Vert.x instance"); From d584819d216a3d21b4feb45a5ca3375957f2b09a Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Wed, 22 May 2024 21:41:14 +0200 Subject: [PATCH 4/8] fixed acrivefailover tests --- .github/workflows/test.yml | 8 ++++++++ docker/start_db.sh | 3 ++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 27dd0c47e..a4bbfc881 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,6 +32,9 @@ jobs: - docker.io/arangodb/arangodb:3.12 - docker.io/arangodb/enterprise:3.11 - docker.io/arangodb/enterprise:3.12 + starter-docker-img: + # support activefailover + - docker.io/arangodb/arangodb-starter:0.18.5 topology: - single - cluster @@ -60,6 +63,7 @@ jobs: ARANGO_LICENSE_KEY: ${{ secrets.ARANGO_LICENSE_KEY }} STARTER_MODE: ${{matrix.topology}} DOCKER_IMAGE: ${{matrix.docker-img}} + STARTER_DOCKER_IMAGE: ${{matrix.starter-docker-img}} - name: Info run: mvn -version - name: Test @@ -127,6 +131,9 @@ jobs: matrix: docker-img: - docker.io/arangodb/enterprise:3.11 + starter-docker-img: + # support activefailover + - docker.io/arangodb/arangodb-starter:0.18.5 topology: - single - cluster @@ -150,6 +157,7 @@ jobs: ARANGO_LICENSE_KEY: ${{ secrets.ARANGO_LICENSE_KEY }} STARTER_MODE: ${{matrix.topology}} DOCKER_IMAGE: ${{matrix.docker-img}} + STARTER_DOCKER_IMAGE: ${{matrix.starter-docker-img}} - name: Set JWT run: | ENDPOINT=$(./docker/find_active_endpoint.sh) diff --git a/docker/start_db.sh b/docker/start_db.sh index 9da44fd1c..b8b012637 100755 --- a/docker/start_db.sh +++ b/docker/start_db.sh @@ -3,6 +3,7 @@ # Configuration environment variables: # STARTER_MODE: (single|cluster|activefailover), default single # DOCKER_IMAGE: ArangoDB docker image, default docker.io/arangodb/arangodb:latest +# STARTER_DOCKER_IMAGE: ArangoDB Starter docker image, default docker.io/arangodb/arangodb-starter:latest # SSL: (true|false), default false # ARANGO_LICENSE_KEY: only required for ArangoDB Enterprise @@ -11,10 +12,10 @@ STARTER_MODE=${STARTER_MODE:=single} DOCKER_IMAGE=${DOCKER_IMAGE:=docker.io/arangodb/arangodb:latest} +STARTER_DOCKER_IMAGE=${STARTER_DOCKER_IMAGE:=docker.io/arangodb/arangodb-starter:latest} SSL=${SSL:=false} COMPRESSION=${COMPRESSION:=false} -STARTER_DOCKER_IMAGE=docker.io/arangodb/arangodb-starter:latest GW=172.28.0.1 docker network create arangodb --subnet 172.28.0.0/16 From f7f371d97ca61755c2f2e14b8a3655c0b372ab23 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Wed, 22 May 2024 21:49:26 +0200 Subject: [PATCH 5/8] fixed tests --- driver/src/test/java/com/arangodb/ArangoDBAsyncTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/driver/src/test/java/com/arangodb/ArangoDBAsyncTest.java b/driver/src/test/java/com/arangodb/ArangoDBAsyncTest.java index 19f663805..a97f5f08b 100644 --- a/driver/src/test/java/com/arangodb/ArangoDBAsyncTest.java +++ b/driver/src/test/java/com/arangodb/ArangoDBAsyncTest.java @@ -234,6 +234,7 @@ void getAccessibleDatabasesFor(ArangoDBAsync arangoDB) throws ExecutionException @ParameterizedTest @MethodSource("asyncArangos") void createUser(ArangoDBAsync arangoDB) throws ExecutionException, InterruptedException { + assumeTrue(isSingleServer()); String username = "user-" + UUID.randomUUID(); final UserEntity result = arangoDB.createUser(username, PW, null).get(); assertThat(result.getUser()).isEqualTo(username); From 3f95a855e74e326255584389478cc46b69a9893b Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Fri, 7 Jun 2024 12:31:53 +0200 Subject: [PATCH 6/8] reverted managed Vert.x instances to single thread event loop --- core/src/main/java/com/arangodb/ArangoDB.java | 3 +- .../com/arangodb/internal/ArangoDBImpl.java | 7 +---- .../internal/net/ConnectionFactory.java | 4 --- .../com/arangodb/http/HttpConnection.java | 30 +++++++++++++++++-- .../arangodb/http/HttpConnectionFactory.java | 16 +--------- .../vst/VstConnectionFactoryAsync.java | 5 ---- 6 files changed, 31 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/com/arangodb/ArangoDB.java b/core/src/main/java/com/arangodb/ArangoDB.java index 755c551f9..484f752ab 100644 --- a/core/src/main/java/com/arangodb/ArangoDB.java +++ b/core/src/main/java/com/arangodb/ArangoDB.java @@ -394,8 +394,7 @@ public ArangoDB build() { return new ArangoDBImpl( config, protocol, - hostHandler, - connectionFactory + hostHandler ); } diff --git a/core/src/main/java/com/arangodb/internal/ArangoDBImpl.java b/core/src/main/java/com/arangodb/internal/ArangoDBImpl.java index 3899a4d09..7554c9919 100644 --- a/core/src/main/java/com/arangodb/internal/ArangoDBImpl.java +++ b/core/src/main/java/com/arangodb/internal/ArangoDBImpl.java @@ -24,7 +24,6 @@ import com.arangodb.entity.*; import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.CommunicationProtocol; -import com.arangodb.internal.net.ConnectionFactory; import com.arangodb.internal.net.HostHandler; import com.arangodb.internal.serde.SerdeUtils; import com.arangodb.model.*; @@ -42,15 +41,12 @@ public class ArangoDBImpl extends InternalArangoDB implements ArangoDB { private static final Logger LOGGER = LoggerFactory.getLogger(ArangoDBImpl.class); private final HostHandler hostHandler; - private final ConnectionFactory connectionFactory; public ArangoDBImpl(final ArangoConfig config, final CommunicationProtocol protocol, - final HostHandler hostHandler, - final ConnectionFactory connectionFactory) { + final HostHandler hostHandler) { super(protocol, config); this.hostHandler = hostHandler; - this.connectionFactory = connectionFactory; LOGGER.debug("ArangoDB Client is ready to use"); } @@ -62,7 +58,6 @@ public ArangoDBAsync async() { @Override public void shutdown() { executorSync().disconnect(); - connectionFactory.close(); } @Override diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java b/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java index 3cabeaeec..b0fbbdf7b 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java @@ -30,8 +30,4 @@ @UsedInApi public interface ConnectionFactory { Connection create(ArangoConfig config, HostDescription host); - - default void close() { - // keep backward compatibility - } } diff --git a/http/src/main/java/com/arangodb/http/HttpConnection.java b/http/src/main/java/com/arangodb/http/HttpConnection.java index f32ae9de0..b5a57c9a6 100644 --- a/http/src/main/java/com/arangodb/http/HttpConnection.java +++ b/http/src/main/java/com/arangodb/http/HttpConnection.java @@ -37,6 +37,7 @@ import io.netty.handler.ssl.JdkSslContext; import io.vertx.core.MultiMap; import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpMethod; @@ -49,6 +50,8 @@ import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.client.WebClientOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; import java.security.NoSuchAlgorithmException; @@ -58,6 +61,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** @@ -66,21 +70,24 @@ */ @UnstableApi public class HttpConnection implements Connection { + private static final Logger LOGGER = LoggerFactory.getLogger(HttpConnection.class); private static final String CONTENT_TYPE_APPLICATION_JSON_UTF8 = "application/json; charset=utf-8"; private static final String CONTENT_TYPE_VPACK = "application/x-velocypack"; private static final String USER_AGENT = getUserAgent(); + private static final AtomicInteger THREAD_COUNT = new AtomicInteger(); private volatile String auth; private final int compressionThreshold; private final Encoder encoder; private final WebClient client; private final Integer timeout; private final MultiMap commonHeaders = MultiMap.caseInsensitiveMultiMap(); + private final Vertx vertxToClose; private static String getUserAgent() { return "JavaDriver/" + PackageVersion.VERSION + " (JVM/" + System.getProperty("java.specification.version") + ")"; } - HttpConnection(final ArangoConfig config, final HostDescription host, final Vertx vertx) { + HttpConnection(final ArangoConfig config, final HostDescription host, final Vertx existingVertx) { super(); Protocol protocol = config.getProtocol(); ContentType contentType = ContentTypeFactory.of(protocol); @@ -105,6 +112,22 @@ private static String getUserAgent() { config.getUser(), Optional.ofNullable(config.getPassword()).orElse("") ).toHttpAuthorization(); + Vertx vertxToUse; + if (existingVertx != null) { + // reuse existing Vert.x + vertxToUse = existingVertx; + // Vert.x will not be closed when connection is closed + vertxToClose = null; + LOGGER.info("Reusing existing Vert.x instance"); + } else { + // create a new Vert.x instance + LOGGER.info("Creating new Vert.x instance"); + vertxToUse = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1)); + vertxToUse.runOnContext(e -> Thread.currentThread().setName("adb-http-" + THREAD_COUNT.getAndIncrement())); + // Vert.x be closed when connection is closed + vertxToClose = vertxToUse; + } + int intTtl = Optional.ofNullable(config.getConnectionTtl()) .map(ttl -> Math.toIntExact(ttl / 1000)) .orElse(0); @@ -181,7 +204,7 @@ public SslContextFactory sslContextFactory() { }); } - client = WebClient.create(vertx, webClientOptions); + client = WebClient.create(vertxToUse, webClientOptions); } private static String buildUrl(final InternalRequest request) { @@ -217,6 +240,9 @@ private static void addHeader(final InternalRequest request, final HttpRequest existingVertx = Optional.ofNullable(Vertx.currentContext()).map(Context::owner); if (config.getReuseVertx() && existingVertx.isPresent()) { - LOGGER.info("Reusing existing Vert.x instance"); vertx = existingVertx.get(); - manageVertx = false; } else { if (existingVertx.isPresent()) { LOGGER.warn("Found an existing Vert.x instance, set reuseVertx=true to reuse it"); } - LOGGER.info("Creating new Vert.x instance"); - vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true)); - manageVertx = true; + vertx = null; } } @@ -61,12 +55,4 @@ public HttpConnectionFactory(@UnstableApi final ArangoConfig config) { public Connection create(@UnstableApi final ArangoConfig config, final HostDescription host) { return new HttpConnection(config, host, vertx); } - - @Override - public synchronized void close() { - if (manageVertx) { - LOGGER.info("Closing Vert.x instance"); - vertx.close(); - } - } } diff --git a/vst/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java b/vst/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java index 24e29c497..f0faca44f 100644 --- a/vst/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java +++ b/vst/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java @@ -39,9 +39,4 @@ public Connection create(@UnstableApi final ArangoConfig config, final HostDescr return new VstConnectionAsync(config, host); } - @Override - public void close() { - // no-op - } - } From c43c767877228a2e34db316d609bb84a2347a874 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Fri, 7 Jun 2024 13:15:46 +0200 Subject: [PATCH 7/8] changed log levels to debug --- http/src/main/java/com/arangodb/http/HttpConnection.java | 5 +++-- .../src/test/java/resilience/vertx/VertxTest.java | 9 ++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/http/src/main/java/com/arangodb/http/HttpConnection.java b/http/src/main/java/com/arangodb/http/HttpConnection.java index b5a57c9a6..bb3e0f441 100644 --- a/http/src/main/java/com/arangodb/http/HttpConnection.java +++ b/http/src/main/java/com/arangodb/http/HttpConnection.java @@ -118,10 +118,10 @@ private static String getUserAgent() { vertxToUse = existingVertx; // Vert.x will not be closed when connection is closed vertxToClose = null; - LOGGER.info("Reusing existing Vert.x instance"); + LOGGER.debug("Reusing existing Vert.x instance"); } else { // create a new Vert.x instance - LOGGER.info("Creating new Vert.x instance"); + LOGGER.debug("Creating new Vert.x instance"); vertxToUse = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1)); vertxToUse.runOnContext(e -> Thread.currentThread().setName("adb-http-" + THREAD_COUNT.getAndIncrement())); // Vert.x be closed when connection is closed @@ -241,6 +241,7 @@ private static void addHeader(final InternalRequest request, final HttpRequest it.getLoggerName().equals("com.arangodb.http.HttpConnectionFactory")) + .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnection")) .map(ILoggingEvent::getFormattedMessage) .anySatisfy(it -> assertThat(it).contains("Creating new Vert.x instance")) .anySatisfy(it -> assertThat(it).contains("Closing Vert.x instance")); @@ -45,7 +45,7 @@ void reuseVertx() throws ExecutionException, InterruptedException { vertx.close(); assertThat(logs.getLogs()) - .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnectionFactory")) + .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnection")) .map(ILoggingEvent::getFormattedMessage) .anySatisfy(it -> assertThat(it).contains("Reusing existing Vert.x instance")); } @@ -68,7 +68,10 @@ void notReuseVertx() throws ExecutionException, InterruptedException { assertThat(logs.getLogs()) .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnectionFactory")) .map(ILoggingEvent::getFormattedMessage) - .anySatisfy(it -> assertThat(it).contains("Found an existing Vert.x instance, set reuseVertx=true to reuse it")) + .anySatisfy(it -> assertThat(it).contains("Found an existing Vert.x instance, set reuseVertx=true to reuse it")); + assertThat(logs.getLogs()) + .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnection")) + .map(ILoggingEvent::getFormattedMessage) .anySatisfy(it -> assertThat(it).contains("Creating new Vert.x instance")) .anySatisfy(it -> assertThat(it).contains("Closing Vert.x instance")); } From b74cbec0d5c418cd3e5132d0028149ce011a16f4 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Fri, 7 Jun 2024 15:00:57 +0200 Subject: [PATCH 8/8] added HttpProtocolConfig to allow configuring the Vert.x instance to use --- core/src/main/java/com/arangodb/ArangoDB.java | 11 +++-- .../com/arangodb/config/ProtocolConfig.java | 7 ++++ .../com/arangodb/internal/ArangoDefaults.java | 1 - .../internal/config/ArangoConfig.java | 12 +++--- .../internal/net/ProtocolProvider.java | 5 ++- .../java/com/arangodb/ArangoConfigTest.java | 12 +++++- .../arangodb/http/HttpConnectionFactory.java | 21 ++++------ .../com/arangodb/http/HttpProtocolConfig.java | 42 +++++++++++++++++++ .../arangodb/http/HttpProtocolProvider.java | 5 ++- .../test/java/resilience/vertx/VertxTest.java | 37 +++++++++++++--- 10 files changed, 117 insertions(+), 36 deletions(-) create mode 100644 core/src/main/java/com/arangodb/config/ProtocolConfig.java create mode 100644 http/src/main/java/com/arangodb/http/HttpProtocolConfig.java diff --git a/core/src/main/java/com/arangodb/ArangoDB.java b/core/src/main/java/com/arangodb/ArangoDB.java index 484f752ab..ad9b3bf43 100644 --- a/core/src/main/java/com/arangodb/ArangoDB.java +++ b/core/src/main/java/com/arangodb/ArangoDB.java @@ -23,6 +23,7 @@ import com.arangodb.arch.UnstableApi; import com.arangodb.config.ArangoConfigProperties; import com.arangodb.config.HostDescription; +import com.arangodb.config.ProtocolConfig; import com.arangodb.entity.*; import com.arangodb.internal.ArangoDBImpl; import com.arangodb.internal.ArangoExecutorSync; @@ -381,7 +382,7 @@ public ArangoDB build() { ProtocolProvider protocolProvider = protocolProvider(config.getProtocol()); config.setProtocolModule(protocolProvider.protocolModule()); - ConnectionFactory connectionFactory = protocolProvider.createConnectionFactory(config); + ConnectionFactory connectionFactory = protocolProvider.createConnectionFactory(config.getProtocolConfig()); Collection hostList = createHostList(connectionFactory); HostResolver hostResolver = createHostResolver(hostList, connectionFactory); HostHandler hostHandler = createHostHandler(hostResolver); @@ -681,13 +682,11 @@ public Builder compressionLevel(Integer level) { } /** - * Sets whether to reuse the Vert.x instance owning the current thread Vert.x context. - * - * @param reuseVertx whether to reuse the Vert.x instance (default: {@code false}) + * Configuration specific for {@link com.arangodb.internal.net.ProtocolProvider}. * @return {@link ArangoDB.Builder} */ - public Builder reuseVertx(Boolean reuseVertx) { - config.setReuseVertx(reuseVertx); + public Builder protocolConfig(ProtocolConfig protocolConfig) { + config.setProtocolConfig(protocolConfig); return this; } diff --git a/core/src/main/java/com/arangodb/config/ProtocolConfig.java b/core/src/main/java/com/arangodb/config/ProtocolConfig.java new file mode 100644 index 000000000..54432800d --- /dev/null +++ b/core/src/main/java/com/arangodb/config/ProtocolConfig.java @@ -0,0 +1,7 @@ +package com.arangodb.config; + +/** + * Configuration specific for {@link com.arangodb.internal.net.ProtocolProvider}. + */ +public interface ProtocolConfig { +} diff --git a/core/src/main/java/com/arangodb/internal/ArangoDefaults.java b/core/src/main/java/com/arangodb/internal/ArangoDefaults.java index 8c8bcffbb..b08c045da 100644 --- a/core/src/main/java/com/arangodb/internal/ArangoDefaults.java +++ b/core/src/main/java/com/arangodb/internal/ArangoDefaults.java @@ -54,7 +54,6 @@ public final class ArangoDefaults { public static final Integer DEFAULT_ACQUIRE_HOST_LIST_INTERVAL = 60 * 60 * 1000; // hour public static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY = LoadBalancingStrategy.NONE; public static final Integer DEFAULT_RESPONSE_QUEUE_TIME_SAMPLES = 10; - public static final Boolean DEFAULT_REUSE_VERTX = false; // region compression public static final Compression DEFAULT_COMPRESSION = Compression.NONE; diff --git a/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java b/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java index e69debe3f..da6bc176b 100644 --- a/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java +++ b/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java @@ -7,6 +7,7 @@ import com.arangodb.arch.UsedInApi; import com.arangodb.config.ArangoConfigProperties; import com.arangodb.config.HostDescription; +import com.arangodb.config.ProtocolConfig; import com.arangodb.entity.LoadBalancingStrategy; import com.arangodb.internal.ArangoDefaults; import com.arangodb.internal.serde.ContentTypeFactory; @@ -49,7 +50,7 @@ public class ArangoConfig { private Compression compression; private Integer compressionThreshold; private Integer compressionLevel; - private Boolean reuseVertx; + private ProtocolConfig protocolConfig; private static final Logger LOG = LoggerFactory.getLogger(ArangoConfig.class); @@ -113,7 +114,6 @@ public void loadProperties(final ArangoConfigProperties properties) { compression = properties.getCompression().orElse(ArangoDefaults.DEFAULT_COMPRESSION); compressionThreshold = properties.getCompressionThreshold().orElse(ArangoDefaults.DEFAULT_COMPRESSION_THRESHOLD); compressionLevel = properties.getCompressionLevel().orElse(ArangoDefaults.DEFAULT_COMPRESSION_LEVEL); - reuseVertx = properties.getReuseVertx().orElse(ArangoDefaults.DEFAULT_REUSE_VERTX); } public List getHosts() { @@ -332,11 +332,11 @@ public void setCompressionLevel(Integer compressionLevel) { this.compressionLevel = compressionLevel; } - public Boolean getReuseVertx() { - return reuseVertx; + public ProtocolConfig getProtocolConfig() { + return protocolConfig; } - public void setReuseVertx(Boolean reuseVertx) { - this.reuseVertx = reuseVertx; + public void setProtocolConfig(ProtocolConfig protocolConfig) { + this.protocolConfig = protocolConfig; } } diff --git a/core/src/main/java/com/arangodb/internal/net/ProtocolProvider.java b/core/src/main/java/com/arangodb/internal/net/ProtocolProvider.java index 8372806da..9420a1cb5 100644 --- a/core/src/main/java/com/arangodb/internal/net/ProtocolProvider.java +++ b/core/src/main/java/com/arangodb/internal/net/ProtocolProvider.java @@ -3,6 +3,7 @@ import com.arangodb.Protocol; import com.arangodb.arch.UsedInApi; +import com.arangodb.config.ProtocolConfig; import com.arangodb.internal.config.ArangoConfig; import com.fasterxml.jackson.databind.Module; @@ -12,14 +13,14 @@ public interface ProtocolProvider { boolean supportsProtocol(Protocol protocol); /** - * @deprecated use {@link #createConnectionFactory(ArangoConfig)} instead + * @deprecated use {@link #createConnectionFactory(ProtocolConfig)} instead */ @Deprecated default ConnectionFactory createConnectionFactory() { throw new UnsupportedOperationException(); } - default ConnectionFactory createConnectionFactory(ArangoConfig config) { + default ConnectionFactory createConnectionFactory(ProtocolConfig config) { return createConnectionFactory(); } diff --git a/driver/src/test/java/com/arangodb/ArangoConfigTest.java b/driver/src/test/java/com/arangodb/ArangoConfigTest.java index cf7b62e9d..b0acf1181 100644 --- a/driver/src/test/java/com/arangodb/ArangoConfigTest.java +++ b/driver/src/test/java/com/arangodb/ArangoConfigTest.java @@ -1,5 +1,6 @@ package com.arangodb; +import com.arangodb.http.HttpProtocolConfig; import com.arangodb.internal.ArangoDefaults; import com.arangodb.internal.config.ArangoConfig; import org.junit.jupiter.api.Test; @@ -8,7 +9,7 @@ public class ArangoConfigTest { @Test - void defaultValues() { + void ArangoConfigDefaultValues() { ArangoConfig cfg = new ArangoConfig(); assertThat(cfg.getHosts()).isEqualTo(ArangoDefaults.DEFAULT_HOSTS); assertThat(cfg.getProtocol()).isEqualTo(Protocol.HTTP2_JSON); @@ -31,6 +32,13 @@ void defaultValues() { assertThat(cfg.getCompression()).isEqualTo(ArangoDefaults.DEFAULT_COMPRESSION); assertThat(cfg.getCompressionThreshold()).isEqualTo(ArangoDefaults.DEFAULT_COMPRESSION_THRESHOLD); assertThat(cfg.getCompressionLevel()).isEqualTo(ArangoDefaults.DEFAULT_COMPRESSION_LEVEL); - assertThat(cfg.getReuseVertx()).isEqualTo(ArangoDefaults.DEFAULT_REUSE_VERTX); + assertThat(cfg.getProtocolConfig()).isNull(); } + + @Test + void HttpProtocolConfigDefaultValues() { + HttpProtocolConfig cfg = HttpProtocolConfig.builder().build(); + assertThat(cfg.getVertx()).isNull(); + } + } diff --git a/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java b/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java index 105db6155..03719882a 100644 --- a/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java +++ b/http/src/main/java/com/arangodb/http/HttpConnectionFactory.java @@ -25,28 +25,25 @@ import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.Connection; import com.arangodb.internal.net.ConnectionFactory; -import io.vertx.core.Context; import io.vertx.core.Vertx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Optional; - @UnstableApi public class HttpConnectionFactory implements ConnectionFactory { private final Logger LOGGER = LoggerFactory.getLogger(HttpConnectionFactory.class); private final Vertx vertx; - public HttpConnectionFactory(@UnstableApi final ArangoConfig config) { - Optional existingVertx = Optional.ofNullable(Vertx.currentContext()).map(Context::owner); - if (config.getReuseVertx() && existingVertx.isPresent()) { - vertx = existingVertx.get(); - } else { - if (existingVertx.isPresent()) { - LOGGER.warn("Found an existing Vert.x instance, set reuseVertx=true to reuse it"); - } - vertx = null; + public HttpConnectionFactory(@UnstableApi final HttpProtocolConfig config) { + HttpProtocolConfig cfg = config != null ? config : HttpProtocolConfig.builder().build(); + vertx = cfg.getVertx(); + if (vertx == null && Vertx.currentContext() != null) { + LOGGER.warn("Found an existing Vert.x instance, you can reuse it by setting:\n" + + "new ArangoDB.Builder()\n" + + " // ...\n" + + " .protocolConfig(HttpProtocolConfig.builder().vertx(Vertx.currentContext().owner()).build())\n" + + " .build();\n"); } } diff --git a/http/src/main/java/com/arangodb/http/HttpProtocolConfig.java b/http/src/main/java/com/arangodb/http/HttpProtocolConfig.java new file mode 100644 index 000000000..26c47f825 --- /dev/null +++ b/http/src/main/java/com/arangodb/http/HttpProtocolConfig.java @@ -0,0 +1,42 @@ +package com.arangodb.http; + +import com.arangodb.config.ProtocolConfig; +import io.vertx.core.Vertx; + +public final class HttpProtocolConfig implements ProtocolConfig { + private final Vertx vertx; + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Vertx vertx; + + private Builder() { + } + + /** + * Set the Vert.x instance to use for creating HTTP connections. + * + * @param vertx the Vert.x instance to use + * @return this builder + */ + public Builder vertx(Vertx vertx) { + this.vertx = vertx; + return this; + } + + public HttpProtocolConfig build() { + return new HttpProtocolConfig(vertx); + } + } + + private HttpProtocolConfig(Vertx vertx) { + this.vertx = vertx; + } + + public Vertx getVertx() { + return vertx; + } +} diff --git a/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java b/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java index 40bbf7d36..a85abe9d8 100644 --- a/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java +++ b/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java @@ -2,6 +2,7 @@ import com.arangodb.Protocol; import com.arangodb.arch.UnstableApi; +import com.arangodb.config.ProtocolConfig; import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.CommunicationProtocol; import com.arangodb.internal.net.ConnectionFactory; @@ -22,8 +23,8 @@ public boolean supportsProtocol(Protocol protocol) { @Override @UnstableApi - public ConnectionFactory createConnectionFactory(@UnstableApi ArangoConfig config) { - return new HttpConnectionFactory(config); + public ConnectionFactory createConnectionFactory(@UnstableApi ProtocolConfig config) { + return new HttpConnectionFactory((HttpProtocolConfig) config); } @Override diff --git a/resilience-tests/src/test/java/resilience/vertx/VertxTest.java b/resilience-tests/src/test/java/resilience/vertx/VertxTest.java index ace443e9a..f4cd37bf3 100644 --- a/resilience-tests/src/test/java/resilience/vertx/VertxTest.java +++ b/resilience-tests/src/test/java/resilience/vertx/VertxTest.java @@ -1,7 +1,9 @@ package resilience.vertx; +import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import com.arangodb.ArangoDB; +import com.arangodb.http.HttpProtocolConfig; import io.vertx.core.Vertx; import org.junit.jupiter.api.Test; import resilience.SingleServerTest; @@ -24,19 +26,39 @@ void managedVertx() { assertThat(logs.getLogs()) .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnection")) + .filteredOn(it -> it.getLevel().equals(Level.DEBUG)) .map(ILoggingEvent::getFormattedMessage) .anySatisfy(it -> assertThat(it).contains("Creating new Vert.x instance")) .anySatisfy(it -> assertThat(it).contains("Closing Vert.x instance")); } @Test - void reuseVertx() throws ExecutionException, InterruptedException { + void reuseVertx() { + Vertx vertx = Vertx.vertx(); + ArangoDB adb = new ArangoDB.Builder() + .host("172.28.0.1", 8529) + .password("test") + .protocolConfig(HttpProtocolConfig.builder().vertx(vertx).build()) + .build(); + adb.getVersion(); + adb.shutdown(); + vertx.close(); + + assertThat(logs.getLogs()) + .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnection")) + .filteredOn(it -> it.getLevel().equals(Level.DEBUG)) + .map(ILoggingEvent::getFormattedMessage) + .anySatisfy(it -> assertThat(it).contains("Reusing existing Vert.x instance")); + } + + @Test + void reuseVertxFromVertxThread() throws ExecutionException, InterruptedException { Vertx vertx = Vertx.vertx(); vertx.executeBlocking(() -> { ArangoDB adb = new ArangoDB.Builder() .host("172.28.0.1", 8529) .password("test") - .reuseVertx(true) + .protocolConfig(HttpProtocolConfig.builder().vertx(Vertx.currentContext().owner()).build()) .build(); adb.getVersion(); adb.shutdown(); @@ -46,18 +68,18 @@ void reuseVertx() throws ExecutionException, InterruptedException { assertThat(logs.getLogs()) .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnection")) + .filteredOn(it -> it.getLevel().equals(Level.DEBUG)) .map(ILoggingEvent::getFormattedMessage) .anySatisfy(it -> assertThat(it).contains("Reusing existing Vert.x instance")); } @Test - void notReuseVertx() throws ExecutionException, InterruptedException { + void existingVertxNotUsed() throws ExecutionException, InterruptedException { Vertx vertx = Vertx.vertx(); vertx.executeBlocking(() -> { ArangoDB adb = new ArangoDB.Builder() .host("172.28.0.1", 8529) .password("test") - .reuseVertx(false) .build(); adb.getVersion(); adb.shutdown(); @@ -67,10 +89,15 @@ void notReuseVertx() throws ExecutionException, InterruptedException { assertThat(logs.getLogs()) .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnectionFactory")) + .filteredOn(it -> it.getLevel().equals(Level.WARN)) .map(ILoggingEvent::getFormattedMessage) - .anySatisfy(it -> assertThat(it).contains("Found an existing Vert.x instance, set reuseVertx=true to reuse it")); + .anySatisfy(it -> assertThat(it) + .contains("Found an existing Vert.x instance, you can reuse it by setting:") + .contains(".protocolConfig(HttpProtocolConfig.builder().vertx(Vertx.currentContext().owner()).build())") + ); assertThat(logs.getLogs()) .filteredOn(it -> it.getLoggerName().equals("com.arangodb.http.HttpConnection")) + .filteredOn(it -> it.getLevel().equals(Level.DEBUG)) .map(ILoggingEvent::getFormattedMessage) .anySatisfy(it -> assertThat(it).contains("Creating new Vert.x instance")) .anySatisfy(it -> assertThat(it).contains("Closing Vert.x instance"));