Skip to content

Commit deba97d

Browse files
committed
async downstream executor (DE-697)
1 parent 7c9601d commit deba97d

File tree

7 files changed

+177
-18
lines changed

7 files changed

+177
-18
lines changed

core/src/main/java/com/arangodb/ArangoDB.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -340,11 +340,6 @@ public interface ArangoDB extends ArangoSerdeAccessor {
340340
*/
341341
class Builder extends InternalArangoDBBuilder<Builder> {
342342

343-
public Builder protocol(final Protocol protocol) {
344-
config.setProtocol(protocol);
345-
return this;
346-
}
347-
348343
/**
349344
* Returns an instance of {@link ArangoDB}.
350345
*

core/src/main/java/com/arangodb/internal/ArangoExecutorAsync.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,20 @@
2727

2828
import java.lang.reflect.Type;
2929
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.Executor;
31+
import java.util.function.Function;
3032
import java.util.function.Supplier;
3133

3234
/**
3335
* @author Mark Vollmary
3436
* @author Michele Rastelli
3537
*/
3638
public class ArangoExecutorAsync extends ArangoExecutor {
39+
private final Executor downstreamExecutor;
3740

3841
public ArangoExecutorAsync(final CommunicationProtocol protocol, final ArangoConfig config) {
3942
super(protocol, config);
43+
downstreamExecutor = config.getAsyncExecutor();
4044
}
4145

4246
public <T> CompletableFuture<T> execute(final Supplier<InternalRequest> requestSupplier, final Type type) {
@@ -56,18 +60,23 @@ public <T> CompletableFuture<T> execute(
5660
final ResponseDeserializer<T> responseDeserializer,
5761
final HostHandle hostHandle) {
5862

59-
return CompletableFuture.completedFuture(requestSupplier)
63+
CompletableFuture<T> cf = CompletableFuture.completedFuture(requestSupplier)
6064
.thenApply(Supplier::get)
61-
.thenCompose(request -> protocol.executeAsync(interceptRequest(request), hostHandle)
62-
.handle((r, e) -> {
63-
if (e != null) {
64-
throw ArangoDBException.of(e);
65-
} else {
66-
interceptResponse(r);
67-
return responseDeserializer.deserialize(r);
68-
}
69-
})
70-
);
65+
.thenCompose(request -> protocol.executeAsync(interceptRequest(request), hostHandle))
66+
.handle((r, e) -> {
67+
if (e != null) {
68+
throw ArangoDBException.of(e);
69+
} else {
70+
interceptResponse(r);
71+
return responseDeserializer.deserialize(r);
72+
}
73+
});
74+
75+
if (downstreamExecutor != null) {
76+
return cf.thenApplyAsync(Function.identity(), downstreamExecutor);
77+
} else {
78+
return cf;
79+
}
7180
}
7281

7382
}

core/src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.ArrayList;
3939
import java.util.Collection;
4040
import java.util.ServiceLoader;
41+
import java.util.concurrent.Executor;
4142

4243

4344
/**
@@ -53,6 +54,12 @@ public T loadProperties(final ArangoConfigProperties properties) {
5354
return (T) this;
5455
}
5556

57+
@SuppressWarnings("unchecked")
58+
public T protocol(final Protocol protocol) {
59+
config.setProtocol(protocol);
60+
return (T) this;
61+
}
62+
5663
/**
5764
* Adds a host to connect to. Multiple hosts can be added to provide fallbacks.
5865
*
@@ -289,6 +296,19 @@ public T serde(final ArangoSerde serde) {
289296
return (T) this;
290297
}
291298

299+
/**
300+
* Sets the downstream async executor that will be used to consume the responses of the async API, that are returned
301+
* as {@link java.util.concurrent.CompletableFuture}
302+
*
303+
* @param executor async downstream executor
304+
* @return {@link ArangoDB.Builder}
305+
*/
306+
@SuppressWarnings("unchecked")
307+
public T asyncExecutor(final Executor executor) {
308+
config.setAsyncExecutor(executor);
309+
return (T) this;
310+
}
311+
292312
protected ProtocolProvider protocolProvider(Protocol protocol) {
293313
ServiceLoader<ProtocolProvider> loader = ServiceLoader.load(ProtocolProvider.class);
294314
for (ProtocolProvider p : loader) {

core/src/main/java/com/arangodb/internal/config/ArangoConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import javax.net.ssl.SSLContext;
2020
import java.util.*;
21+
import java.util.concurrent.Executor;
2122
import java.util.stream.Collectors;
2223

2324
public class ArangoConfig {
@@ -41,6 +42,7 @@ public class ArangoConfig {
4142
private ArangoSerde userDataSerde;
4243
private Integer responseQueueTimeSamples;
4344
private Module protocolModule;
45+
private Executor asyncExecutor;
4446

4547
private static final Logger LOG = LoggerFactory.getLogger(ArangoConfig.class);
4648

@@ -275,4 +277,12 @@ public void setResponseQueueTimeSamples(Integer responseQueueTimeSamples) {
275277
public void setProtocolModule(Module m) {
276278
protocolModule = m;
277279
}
280+
281+
public Executor getAsyncExecutor() {
282+
return asyncExecutor;
283+
}
284+
285+
public void setAsyncExecutor(Executor asyncExecutor) {
286+
this.asyncExecutor = asyncExecutor;
287+
}
278288
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package com.arangodb;
2+
3+
import com.arangodb.config.ArangoConfigProperties;
4+
import org.junit.jupiter.api.Disabled;
5+
import org.junit.jupiter.params.ParameterizedTest;
6+
import org.junit.jupiter.params.provider.EnumSource;
7+
8+
import java.util.UUID;
9+
import java.util.concurrent.ExecutionException;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
13+
import static org.assertj.core.api.Assertions.assertThat;
14+
15+
public class ConsumerThreadAsyncTest extends BaseJunit5 {
16+
17+
private volatile Thread thread;
18+
19+
private void setThread() {
20+
thread = Thread.currentThread();
21+
}
22+
23+
private void sleep() {
24+
try {
25+
Thread.sleep(3_000);
26+
} catch (InterruptedException e) {
27+
throw new RuntimeException(e);
28+
}
29+
}
30+
31+
@ParameterizedTest
32+
@EnumSource(Protocol.class)
33+
@Disabled
34+
void defaultConsumerThread(Protocol protocol) throws ExecutionException, InterruptedException {
35+
ArangoDBAsync adb = new ArangoDB.Builder()
36+
.loadProperties(ArangoConfigProperties.fromFile())
37+
.protocol(protocol)
38+
.build()
39+
.async();
40+
41+
adb.getVersion()
42+
.thenAccept(it -> setThread())
43+
.get();
44+
45+
adb.shutdown();
46+
47+
if (Protocol.VST.equals(protocol)) {
48+
assertThat(thread.getName()).startsWith("adb-vst-");
49+
} else {
50+
assertThat(thread.getName()).startsWith("adb-http-");
51+
}
52+
}
53+
54+
@ParameterizedTest
55+
@EnumSource(Protocol.class)
56+
void customConsumerExecutor(Protocol protocol) throws ExecutionException, InterruptedException {
57+
ExecutorService es = Executors.newCachedThreadPool(r -> {
58+
Thread t = Executors.defaultThreadFactory().newThread(r);
59+
t.setName("custom-" + UUID.randomUUID());
60+
return t;
61+
});
62+
ArangoDBAsync adb = new ArangoDB.Builder()
63+
.loadProperties(ArangoConfigProperties.fromFile())
64+
.protocol(protocol)
65+
.asyncExecutor(es)
66+
.build()
67+
.async();
68+
69+
adb.getVersion()
70+
.thenAccept(it -> setThread())
71+
.get();
72+
73+
adb.shutdown();
74+
es.shutdown();
75+
assertThat(thread.getName()).startsWith("custom-");
76+
}
77+
78+
/**
79+
* Generates warns from Vert.x BlockedThreadChecker
80+
*/
81+
@ParameterizedTest
82+
@EnumSource(Protocol.class)
83+
@Disabled
84+
void sleepOnDefaultConsumerThread(Protocol protocol) throws ExecutionException, InterruptedException {
85+
ArangoDBAsync adb = new ArangoDB.Builder()
86+
.loadProperties(ArangoConfigProperties.fromFile())
87+
.protocol(protocol)
88+
.maxConnections(1)
89+
.build()
90+
.async();
91+
92+
adb.getVersion()
93+
.thenAccept(it -> sleep())
94+
.get();
95+
96+
adb.shutdown();
97+
}
98+
99+
@ParameterizedTest
100+
@EnumSource(Protocol.class)
101+
void nestedRequests(Protocol protocol) throws ExecutionException, InterruptedException {
102+
ArangoDBAsync adb = new ArangoDB.Builder()
103+
.loadProperties(ArangoConfigProperties.fromFile())
104+
.protocol(protocol)
105+
.maxConnections(1)
106+
.build()
107+
.async();
108+
109+
adb.getVersion()
110+
.thenCompose(it -> adb.getVersion())
111+
.thenCompose(it -> adb.getVersion())
112+
.thenCompose(it -> adb.getVersion())
113+
.get();
114+
115+
adb.shutdown();
116+
}
117+
118+
}

http/src/main/java/com/arangodb/http/HttpConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ private static String getUserAgent() {
9090
timeout = config.getTimeout();
9191
vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1));
9292
vertx.runOnContext(e -> {
93-
Thread.currentThread().setName("adb-eventloop-" + THREAD_COUNT.getAndIncrement());
93+
Thread.currentThread().setName("adb-http-" + THREAD_COUNT.getAndIncrement());
9494
auth = new UsernamePasswordCredentials(
9595
config.getUser(), Optional.ofNullable(config.getPassword()).orElse("")
9696
).toHttpAuthorization();

vst/src/main/java/com/arangodb/vst/internal/VstConnection.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,15 @@
4848
import java.util.Date;
4949
import java.util.Map;
5050
import java.util.concurrent.*;
51+
import java.util.concurrent.atomic.AtomicInteger;
5152
import java.util.concurrent.atomic.AtomicLong;
5253

5354
/**
5455
* @author Mark Vollmary
5556
*/
5657
public abstract class VstConnection<T> implements Connection {
5758
private static final Logger LOGGER = LoggerFactory.getLogger(VstConnection.class);
59+
private static final AtomicInteger THREAD_COUNT = new AtomicInteger();
5860
private static final byte[] PROTOCOL_HEADER = "VST/1.0\r\n\r\n".getBytes();
5961
protected final MessageStore messageStore = new MessageStore();
6062
protected final Integer timeout;
@@ -171,7 +173,12 @@ public synchronized void open() throws IOException {
171173
}
172174
sendProtocolHeader();
173175

174-
executor = Executors.newSingleThreadExecutor();
176+
executor = Executors.newSingleThreadExecutor(r -> {
177+
Thread t = Executors.defaultThreadFactory().newThread(r);
178+
t.setDaemon(true);
179+
t.setName("adb-vst-" + THREAD_COUNT.getAndIncrement());
180+
return t;
181+
});
175182
executor.submit((Callable<Void>) () -> {
176183
LOGGER.debug("[" + connectionName + "]: Start Callable");
177184

0 commit comments

Comments
 (0)