Skip to content

Commit 1ec6170

Browse files
committed
Handle correctly future await from an execute blocking on a virtual thread.
Motivation: Future#await ensures that we use a virtual thread context to obtain an execute to suspend the virtual thread, however this check also includes execute blocking tasks. Changes: When obtaining the worker executor to suspend the current task, ensure that we are on the context thread, so execute blocking tasks are not included.
1 parent 7acf5b2 commit 1ec6170

File tree

5 files changed

+123
-19
lines changed

5 files changed

+123
-19
lines changed

src/main/java/io/vertx/core/Future.java

+38-9
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
import io.vertx.core.impl.future.SucceededFuture;
2121

2222
import java.util.List;
23-
import java.util.concurrent.CompletableFuture;
24-
import java.util.concurrent.CompletionStage;
25-
import java.util.concurrent.CountDownLatch;
26-
import java.util.concurrent.TimeUnit;
23+
import java.util.Objects;
24+
import java.util.concurrent.*;
2725
import java.util.function.Function;
2826
import java.util.function.Supplier;
2927

@@ -668,14 +666,46 @@ static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage, Con
668666
* @throws IllegalStateException when called from an event-loop thread or a non Vert.x thread
669667
*/
670668
static <T> T await(Future<T> future) {
669+
try {
670+
return await(future, -1, null);
671+
} catch (TimeoutException e) {
672+
// Not a possible case
673+
return null;
674+
}
675+
676+
}
677+
678+
/**
679+
* Like {@link #await(Future)} but with a timeout.
680+
*
681+
* @param timeout the timeout
682+
* @param unit the timeout unit
683+
* @return the result
684+
* @throws TimeoutException when the timeout fires before the future completes
685+
* @throws IllegalStateException when called from a vertx event-loop or worker thread
686+
*/
687+
public static <T> T await(Future<T> future, long timeout, TimeUnit unit) throws TimeoutException {
688+
if (timeout >= 0L && unit == null) {
689+
throw new NullPointerException();
690+
}
671691
io.vertx.core.impl.WorkerExecutor executor = io.vertx.core.impl.WorkerExecutor.unwrapWorkerExecutor();
672-
if (executor == null) {
673-
throw new IllegalStateException();
692+
CountDownLatch latch;
693+
if (executor != null) {
694+
latch = executor.suspend(cont -> future.onComplete(ar -> cont.resume()));
695+
} else {
696+
latch = new CountDownLatch(1);
697+
future.onComplete(ar -> latch.countDown());
674698
}
675-
CountDownLatch latch = executor.suspend(cont -> future.onComplete(ar -> cont.resume()));
676699
if (latch != null) {
677700
try {
678-
latch.await();
701+
if (timeout >= 0) {
702+
Objects.requireNonNull(unit);
703+
if (!latch.await(timeout, unit)) {
704+
throw new TimeoutException();
705+
}
706+
} else {
707+
latch.await();
708+
}
679709
} catch (InterruptedException e) {
680710
Utils.throwAsUnchecked(e);
681711
return null;
@@ -691,5 +721,4 @@ static <T> T await(Future<T> future) {
691721
return null;
692722
}
693723
}
694-
695724
}

src/main/java/io/vertx/core/impl/WorkerExecutor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ public static io.vertx.core.impl.WorkerExecutor unwrapWorkerExecutor() {
3636
throw new IllegalStateException(msg);
3737
}
3838
ContextInternal ctx = VertxImpl.currentContext(thread);
39-
if (ctx != null && ctx.threadingModel() == ThreadingModel.VIRTUAL_THREAD) {
39+
if (ctx != null && ctx.inThread()) {
40+
// It can only be a Vert.x virtual thread
4041
return (io.vertx.core.impl.WorkerExecutor) ctx.executor();
4142
} else {
4243
return null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.vertx.core;
2+
3+
import io.vertx.core.impl.ContextInternal;
4+
import io.vertx.test.core.VertxTestBase;
5+
import org.junit.Test;
6+
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.TimeoutException;
9+
10+
public class FutureAwaitTest extends VertxTestBase {
11+
12+
@Test
13+
public void testAwaitFromEventLoopThread() {
14+
Promise<String> promise = Promise.promise();
15+
Future<String> future = promise.future();
16+
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
17+
ctx.nettyEventLoop().execute(() -> {
18+
try {
19+
Future.await(future);
20+
} catch (IllegalStateException expected) {
21+
testComplete();
22+
}
23+
});
24+
await();
25+
}
26+
27+
@Test
28+
public void testAwaitFromNonVertxThread() {
29+
Promise<String> promise = Promise.promise();
30+
Future<String> future = promise.future();
31+
Thread current = Thread.currentThread();
32+
new Thread(() -> {
33+
while (current.getState() != Thread.State.WAITING) {
34+
try {
35+
Thread.sleep(10);
36+
} catch (InterruptedException ignore) {
37+
}
38+
}
39+
promise.complete("the-result");
40+
}).start();
41+
String res = Future.await(future);
42+
assertEquals("the-result", res);
43+
}
44+
45+
@Test
46+
public void testAwaitWithTimeout() {
47+
Promise<String> promise = Promise.promise();
48+
Future<String> future = promise.future();
49+
long now = System.currentTimeMillis();
50+
try {
51+
Future.await(future, 100, TimeUnit.MILLISECONDS);
52+
fail();
53+
} catch (TimeoutException expected) {
54+
}
55+
assertTrue((System.currentTimeMillis() - now) >= 100);
56+
}
57+
}

src/test/java/io/vertx/core/FutureTest.java

-9
Original file line numberDiff line numberDiff line change
@@ -1821,15 +1821,6 @@ public void testAndThenCompleteHandlerWithError() {
18211821
await();
18221822
}
18231823

1824-
@Test
1825-
public void testAwaitFromPlainThread() {
1826-
try {
1827-
Future.await(Promise.promise().future());
1828-
fail();
1829-
} catch (IllegalStateException e) {
1830-
}
1831-
}
1832-
18331824
@Test
18341825
public void contextFutureTimeoutFires() {
18351826
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();

src/test/java/io/vertx/core/VirtualThreadContextTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -387,4 +387,30 @@ public void testSubmitAfterClose() {
387387
});
388388
await();
389389
}
390+
391+
@Test
392+
public void testAwaitFromVirtualThreadExecuteBlocking() {
393+
Assume.assumeTrue(isVirtualThreadAvailable());
394+
Context ctx = vertx.createVirtualThreadContext();
395+
ctx.executeBlocking(() -> {
396+
Future.await(vertx.timer(20));
397+
return "done";
398+
}).onComplete(onSuccess(res -> {
399+
assertEquals("done", res);
400+
testComplete();
401+
}));
402+
await();
403+
}
404+
405+
@Test
406+
public void testAwaitFromWorkerExecuteBlocking() {
407+
Context ctx = vertx.getOrCreateContext();
408+
ctx.executeBlocking(() -> {
409+
Future.await(vertx.timer(20));
410+
return "done";
411+
}).onComplete(onFailure(res -> {
412+
testComplete();
413+
}));
414+
await();
415+
}
390416
}

0 commit comments

Comments
 (0)