Skip to content

Commit 3235c49

Browse files
authored
chore(s3stream): replace eventloop with executor in async semaphore (#2283)
Signed-off-by: Robin Han <[email protected]>
1 parent b4c5341 commit 3235c49

File tree

1 file changed

+9
-10
lines changed

1 file changed

+9
-10
lines changed

s3stream/src/main/java/com/automq/stream/utils/AsyncSemaphore.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@
1111

1212
package com.automq.stream.utils;
1313

14-
import com.automq.stream.utils.threads.EventLoop;
15-
1614
import org.slf4j.Logger;
1715
import org.slf4j.LoggerFactory;
1816

1917
import java.util.LinkedList;
2018
import java.util.Queue;
2119
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.Executor;
2221
import java.util.function.Supplier;
2322

2423
public class AsyncSemaphore {
@@ -36,11 +35,11 @@ public AsyncSemaphore(long permits) {
3635
* @param requiredPermits the required permits
3736
* @param task task to run when the permits are available, the task should return a CompletableFuture
3837
* which will be completed when the permits could be released.
39-
* @param eventLoop the eventLoop to run the task when the permits are available
38+
* @param executor the executor to run the task when the permits are available
4039
* @return true if the permits are acquired, false if the task is added to the waiting queue.
4140
*/
4241
public synchronized boolean acquire(long requiredPermits, Supplier<CompletableFuture<?>> task,
43-
EventLoop eventLoop) {
42+
Executor executor) {
4443
if (permits >= 0) {
4544
// allow permits minus to negative
4645
permits -= requiredPermits;
@@ -51,7 +50,7 @@ public synchronized boolean acquire(long requiredPermits, Supplier<CompletableFu
5150
}
5251
return true;
5352
} else {
54-
tasks.add(new AsyncSemaphoreTask(requiredPermits, task, eventLoop));
53+
tasks.add(new AsyncSemaphoreTask(requiredPermits, task, executor));
5554
return false;
5655
}
5756
}
@@ -69,21 +68,21 @@ synchronized void release(long requiredPermits) {
6968
if (permits > 0) {
7069
AsyncSemaphoreTask t = tasks.poll();
7170
if (t != null) {
72-
// use eventLoop to reset the thread stack to avoid stack overflow
73-
t.eventLoop.execute(() -> acquire(t.requiredPermits, t.task, t.eventLoop));
71+
// use executor to reset the thread stack to avoid stack overflow
72+
t.executor.execute(() -> acquire(t.requiredPermits, t.task, t.executor));
7473
}
7574
}
7675
}
7776

7877
static class AsyncSemaphoreTask {
7978
final long requiredPermits;
8079
final Supplier<CompletableFuture<?>> task;
81-
final EventLoop eventLoop;
80+
final Executor executor;
8281

83-
public AsyncSemaphoreTask(long requiredPermits, Supplier<CompletableFuture<?>> task, EventLoop eventLoop) {
82+
public AsyncSemaphoreTask(long requiredPermits, Supplier<CompletableFuture<?>> task, Executor executor) {
8483
this.requiredPermits = requiredPermits;
8584
this.task = task;
86-
this.eventLoop = eventLoop;
85+
this.executor = executor;
8786
}
8887
}
8988
}

0 commit comments

Comments
 (0)