Skip to content

Commit 5c7d707

Browse files
maschmidmatzew
authored andcommitted
LoomKafkaProducer|Consumer let the background thread finish itself (knative-extensions#4013)
1 parent 3337ac4 commit 5c7d707

File tree

2 files changed

+11
-18
lines changed

2 files changed

+11
-18
lines changed

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

+5-15
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Map;
2828
import java.util.concurrent.BlockingQueue;
2929
import java.util.concurrent.LinkedBlockingQueue;
30+
import java.util.concurrent.TimeUnit;
3031
import java.util.concurrent.atomic.AtomicBoolean;
3132
import org.apache.kafka.clients.consumer.Consumer;
3233
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -44,15 +45,13 @@ public class LoomKafkaConsumer<K, V> implements ReactiveKafkaConsumer<K, V> {
4445
private final Consumer<K, V> consumer;
4546
private final BlockingQueue<Runnable> taskQueue;
4647
private final AtomicBoolean isClosed;
47-
private final AtomicBoolean isFinished;
4848
private final Thread taskRunnerThread;
4949
private final Promise<Void> closePromise = Promise.promise();
5050

5151
public LoomKafkaConsumer(Vertx vertx, Consumer<K, V> consumer) {
5252
this.consumer = consumer;
5353
this.taskQueue = new LinkedBlockingQueue<>();
5454
this.isClosed = new AtomicBoolean(false);
55-
this.isFinished = new AtomicBoolean(false);
5655

5756
if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) {
5857
this.taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue);
@@ -74,14 +73,15 @@ private void processTaskQueue() {
7473
// Process queue elements until this is closed and the tasks queue is empty
7574
while (!isClosed.get() || !taskQueue.isEmpty()) {
7675
try {
77-
taskQueue.take().run();
76+
Runnable task = taskQueue.poll(2000, TimeUnit.MILLISECONDS);
77+
if (task != null) {
78+
task.run();
79+
}
7880
} catch (InterruptedException e) {
7981
logger.debug("Interrupted while waiting for task", e);
8082
break;
8183
}
8284
}
83-
84-
isFinished.set(true);
8585
}
8686

8787
@Override
@@ -126,16 +126,6 @@ public Future<Void> close() {
126126
}
127127
logger.debug("Queue is empty");
128128

129-
if (!isFinished.get()) {
130-
logger.debug("Background thread not finished yet, waiting for it to complete");
131-
Thread.sleep(2000L);
132-
133-
if (!isFinished.get()) {
134-
logger.debug("Background thread still not finished yet, interrupting background thread");
135-
taskRunnerThread.interrupt();
136-
}
137-
}
138-
139129
taskRunnerThread.join();
140130
closePromise.tryComplete();
141131

data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Objects;
2727
import java.util.concurrent.BlockingQueue;
2828
import java.util.concurrent.LinkedBlockingQueue;
29+
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.atomic.AtomicBoolean;
3031
import org.apache.kafka.clients.producer.Producer;
3132
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -84,7 +85,11 @@ private void sendFromQueue() {
8485
// Process queue elements until this is closed and the tasks queue is empty
8586
while (!isClosed.get() || !eventQueue.isEmpty()) {
8687
try {
87-
final var recordPromise = eventQueue.take();
88+
final var recordPromise = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
89+
if (recordPromise == null) {
90+
continue;
91+
}
92+
8893
final var startedSpan = this.tracer == null
8994
? null
9095
: this.tracer.prepareSendMessage(recordPromise.getContext(), recordPromise.getRecord());
@@ -140,8 +145,6 @@ public Future<Void> close() {
140145
logger.debug("Waiting for the eventQueue to become empty");
141146
Thread.sleep(2000L);
142147
}
143-
logger.debug("Interrupting sendFromQueueThread thread");
144-
sendFromQueueThread.interrupt();
145148
logger.debug("Waiting for sendFromQueueThread thread to complete");
146149
sendFromQueueThread.join();
147150
logger.debug("Closing the producer");

0 commit comments

Comments
 (0)