Skip to content

[DE-1016] Async connection pool #602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<relativePath>../release-parent</relativePath>
<groupId>com.arangodb</groupId>
<artifactId>release-parent</artifactId>
<version>7.18.0</version>
<version>7.19.0-SNAPSHOT</version>
</parent>

<name>core</name>
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/com/arangodb/ArangoDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,17 @@ public Builder chunkSize(final Integer chunkSize) {
return this;
}

/**
* Set whether to use requests pipelining in HTTP/1.1 ({@link Protocol#HTTP_JSON} or {@link Protocol#HTTP_VPACK}).
*
* @param pipelining {@code true} if enabled
* @return {@link ArangoDB.Builder}
*/
public Builder pipelining(final Boolean pipelining) {
config.setPipelining(pipelining);
return this;
}

/**
* Sets the maximum number of connections the built in connection pool will open per host.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface ArangoConfigProperties {
String KEY_USE_SSL = "useSsl";
String KEY_VERIFY_HOST = "verifyHost";
String KEY_CHUNK_SIZE = "chunkSize";
String KEY_PIPELINING = "pipelining";
String KEY_MAX_CONNECTIONS = "maxConnections";
String KEY_CONNECTION_TTL = "connectionTtl";
String KEY_KEEP_ALIVE_INTERVAL = "keepAliveInterval";
Expand Down Expand Up @@ -110,6 +111,10 @@ default Optional<Integer> getChunkSize() {
return Optional.empty();
}

default Optional<Boolean> getPipelining() {
return Optional.empty();
}

default Optional<Integer> getMaxConnections() {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public final class ArangoDefaults {
public static final Boolean DEFAULT_USE_SSL = false;
public static final Boolean DEFAULT_VERIFY_HOST = true;
public static final Integer DEFAULT_CHUNK_SIZE = 30_000;
public static final Boolean DEFAULT_PIPELINING = false;
public static final Boolean DEFAULT_ACQUIRE_HOST_LIST = false;
public static final Integer DEFAULT_ACQUIRE_HOST_LIST_INTERVAL = 60 * 60 * 1000; // hour
public static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY = LoadBalancingStrategy.NONE;
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/com/arangodb/internal/config/ArangoConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ArangoConfig {
private SSLContext sslContext;
private Boolean verifyHost;
private Integer chunkSize;
private Boolean pipelining;
private Integer maxConnections;
private Long connectionTtl;
private Integer keepAliveInterval;
Expand Down Expand Up @@ -70,6 +71,7 @@ public void loadProperties(final ArangoConfigProperties properties) {
useSsl = properties.getUseSsl().orElse(ArangoDefaults.DEFAULT_USE_SSL);
verifyHost = properties.getVerifyHost().orElse(ArangoDefaults.DEFAULT_VERIFY_HOST);
chunkSize = properties.getChunkSize().orElse(ArangoDefaults.DEFAULT_CHUNK_SIZE);
pipelining = properties.getPipelining().orElse(ArangoDefaults.DEFAULT_PIPELINING);
// FIXME: make maxConnections field Optional
maxConnections = properties.getMaxConnections().orElse(null);
// FIXME: make connectionTtl field Optional
Expand Down Expand Up @@ -173,6 +175,14 @@ public void setChunkSize(Integer chunkSize) {
this.chunkSize = chunkSize;
}

public Boolean getPipelining() {
return pipelining;
}

public void setPipelining(Boolean pipelining) {
this.pipelining = pipelining;
}

public Integer getMaxConnections() {
if (maxConnections == null) {
maxConnections = getDefaultMaxConnections();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public Optional<Integer> getChunkSize() {
return Optional.ofNullable(getProperty(KEY_CHUNK_SIZE)).map(Integer::valueOf);
}

@Override
public Optional<Boolean> getPipelining() {
return Optional.ofNullable(getProperty(KEY_PIPELINING)).map(Boolean::valueOf);
}

@Override
public Optional<Integer> getMaxConnections() {
return Optional.ofNullable(getProperty(KEY_MAX_CONNECTIONS)).map(Integer::valueOf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public CompletableFuture<InternalResponse> executeAsync(final InternalRequest re

private CompletableFuture<InternalResponse> executeAsync(final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount) {
long reqId = reqCount.getAndIncrement();
return doExecuteAsync(request, hostHandle, host, attemptCount, host.connection(), reqId);
return host.connection().thenCompose(c ->
doExecuteAsync(request, hostHandle, host, attemptCount, c, reqId)
.whenComplete((r, t) -> host.release(c)));
}

private CompletableFuture<InternalResponse> doExecuteAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.arangodb.config.HostDescription;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

/**
* @author Mark Vollmary
Expand All @@ -31,7 +32,9 @@ public interface ConnectionPool extends Closeable {

Connection createConnection(final HostDescription host);

Connection connection();
CompletableFuture<Connection> connection();

void release(final Connection connection);

void setJwt(String jwt);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,28 @@
import com.arangodb.ArangoDBException;
import com.arangodb.config.HostDescription;
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.util.AsyncQueue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* @author Mark Vollmary
*/
public class ConnectionPoolImpl implements ConnectionPool {

public static final int HTTP1_PIPELINING_LIMIT = 10;
public static final int HTTP2_STREAMS = 32; // hard-coded, see BTS-2049

private final AsyncQueue<Connection> slots = new AsyncQueue<>();
private final HostDescription host;
private final ArangoConfig config;
private final int maxConnections;
private final List<Connection> connections;
private final ConnectionFactory factory;
private int current;
private final int maxSlots;
private volatile String jwt = null;
private boolean closed = false;

Expand All @@ -49,7 +55,14 @@ public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config,
this.maxConnections = config.getMaxConnections();
this.factory = factory;
connections = new ArrayList<>();
current = 0;
switch (config.getProtocol()) {
case HTTP_JSON:
case HTTP_VPACK:
maxSlots = config.getPipelining() ? HTTP1_PIPELINING_LIMIT : 1;
break;
default:
maxSlots = HTTP2_STREAMS;
}
}

@Override
Expand All @@ -60,23 +73,25 @@ public Connection createConnection(final HostDescription host) {
}

@Override
public synchronized Connection connection() {
public synchronized CompletableFuture<Connection> connection() {
if (closed) {
throw new ArangoDBException("Connection pool already closed!");
}

final Connection connection;

if (connections.size() < maxConnections) {
connection = createConnection(host);
Connection connection = createConnection(host);
connections.add(connection);
current++;
} else {
final int index = Math.floorMod(current++, connections.size());
connection = connections.get(index);
for (int i = 0; i < maxSlots; i++) {
slots.offer((connection));
}
}

return connection;
return slots.poll();
}

@Override
public void release(Connection connection) {
slots.offer(connection);
}

@Override
Expand All @@ -101,7 +116,7 @@ public synchronized void close() throws IOException {
@Override
public String toString() {
return "ConnectionPoolImpl [host=" + host + ", maxConnections=" + maxConnections + ", connections="
+ connections.size() + ", current=" + current + ", factory=" + factory.getClass().getSimpleName() + "]";
+ connections.size() + ", factory=" + factory.getClass().getSimpleName() + "]";
}

}
6 changes: 4 additions & 2 deletions core/src/main/java/com/arangodb/internal/net/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.arangodb.config.HostDescription;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* @author Mark Vollmary
Expand All @@ -33,7 +34,9 @@ public interface Host {

HostDescription getDescription();

Connection connection();
CompletableFuture<Connection> connection();

void release(Connection c);

void closeOnError();

Expand All @@ -44,5 +47,4 @@ public interface Host {
void setMarkforDeletion(boolean markforDeletion);

void setJwt(String jwt);

}
8 changes: 7 additions & 1 deletion core/src/main/java/com/arangodb/internal/net/HostImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.arangodb.config.HostDescription;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* @author Mark Vollmary
Expand Down Expand Up @@ -51,10 +52,15 @@ public HostDescription getDescription() {
}

@Override
public Connection connection() {
public CompletableFuture<Connection> connection() {
return connectionPool.connection();
}

@Override
public void release(Connection c) {
connectionPool.release(c);
}

@Override
public void closeOnError() {
try {
Expand Down
30 changes: 30 additions & 0 deletions core/src/main/java/com/arangodb/internal/util/AsyncQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.arangodb.internal.util;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class AsyncQueue<T> {
private final Queue<CompletableFuture<T>> requests = new ArrayDeque<>();
private final Queue<T> offers = new ArrayDeque<>();

public synchronized CompletableFuture<T> poll() {
CompletableFuture<T> r = new CompletableFuture<>();
T o = offers.poll();
if (o != null) {
r.complete(o);
} else {
requests.add(r);
}
return r;
}

public synchronized void offer(T o) {
CompletableFuture<T> r = requests.poll();
if (r != null) {
r.complete(o);
} else {
offers.add(o);
}
}
}
2 changes: 1 addition & 1 deletion docker/start_db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ docker run -d \
--starter.address="${GW}" \
--docker.image="${DOCKER_IMAGE}" \
--starter.local --starter.mode=${STARTER_MODE} --all.log.level=debug --all.log.output=+ --log.verbose \
--all.server.descriptors-minimum=1024 --all.javascript.allow-admin-execute=true
--all.server.descriptors-minimum=1024 --all.javascript.allow-admin-execute=true --all.server.maximal-threads=128


wait_server() {
Expand Down
2 changes: 1 addition & 1 deletion driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<relativePath>../release-parent</relativePath>
<groupId>com.arangodb</groupId>
<artifactId>release-parent</artifactId>
<version>7.18.0</version>
<version>7.19.0-SNAPSHOT</version>
</parent>

<name>arangodb-java-driver</name>
Expand Down
2 changes: 1 addition & 1 deletion http-protocol/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<relativePath>../release-parent</relativePath>
<groupId>com.arangodb</groupId>
<artifactId>release-parent</artifactId>
<version>7.18.0</version>
<version>7.19.0-SNAPSHOT</version>
</parent>

<name>http-protocol</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP1_PIPELINING_LIMIT;
import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP2_STREAMS;


/**
* @author Mark Vollmary
Expand All @@ -88,7 +91,6 @@ private static String getUserAgent() {
}

HttpConnection(final ArangoConfig config, final HostDescription host, final HttpProtocolConfig protocolConfig) {
super();
Protocol protocol = config.getProtocol();
ContentType contentType = ContentTypeFactory.of(protocol);
if (contentType == ContentType.VPACK) {
Expand Down Expand Up @@ -148,7 +150,9 @@ private static String getUserAgent() {
.setLogActivity(true)
.setKeepAlive(true)
.setTcpKeepAlive(true)
.setPipelining(true)
.setPipelining(config.getPipelining())
.setPipeliningLimit(HTTP1_PIPELINING_LIMIT)
.setHttp2MultiplexingLimit(HTTP2_STREAMS)
.setReuseAddress(true)
.setReusePort(true)
.setHttp2ClearTextUpgrade(false)
Expand Down Expand Up @@ -273,7 +277,7 @@ public CompletableFuture<InternalResponse> executeAsync(@UnstableApi final Inter
return rfuture;
}

public void doExecute(@UnstableApi final InternalRequest request, @UnstableApi final CompletableFuture<InternalResponse> rfuture) {
private void doExecute(@UnstableApi final InternalRequest request, @UnstableApi final CompletableFuture<InternalResponse> rfuture) {
String path = buildUrl(request);
HttpRequest<Buffer> httpRequest = client
.request(requestTypeToHttpMethod(request.getRequestType()), path)
Expand Down
2 changes: 1 addition & 1 deletion jackson-serde-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<relativePath>../release-parent</relativePath>
<groupId>com.arangodb</groupId>
<artifactId>release-parent</artifactId>
<version>7.18.0</version>
<version>7.19.0-SNAPSHOT</version>
</parent>

<name>jackson-serde-json</name>
Expand Down
2 changes: 1 addition & 1 deletion jackson-serde-vpack/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<relativePath>../release-parent</relativePath>
<groupId>com.arangodb</groupId>
<artifactId>release-parent</artifactId>
<version>7.18.0</version>
<version>7.19.0-SNAPSHOT</version>
</parent>

<name>jackson-serde-vpack</name>
Expand Down
2 changes: 1 addition & 1 deletion jsonb-serde/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<relativePath>../release-parent</relativePath>
<groupId>com.arangodb</groupId>
<artifactId>release-parent</artifactId>
<version>7.18.0</version>
<version>7.19.0-SNAPSHOT</version>
</parent>

<name>jsonb-serde</name>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.arangodb</groupId>
<artifactId>arangodb-java-driver-parent</artifactId>
<version>7.18.0</version>
<version>7.19.0-SNAPSHOT</version>
<inceptionYear>2016</inceptionYear>
<modules>
<module>release-parent</module>
Expand Down
Loading