Skip to content

Commit e360777

Browse files
committed
LoomKafkaProducer, wait for sendFromQueueThread to finish
1 parent 961961c commit e360777

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class LoomKafkaProducer<K, V> implements ReactiveKafkaProducer<K, V> {
4242

4343
private final BlockingQueue<RecordPromise<K, V>> eventQueue;
4444
private final AtomicBoolean isClosed;
45+
private final AtomicBoolean isFinished;
4546
private final ProducerTracer<?> tracer;
4647
private final VertxInternal vertx;
4748
private final Thread sendFromQueueThread;
@@ -52,6 +53,7 @@ public LoomKafkaProducer(Vertx v, Producer<K, V> producer) {
5253
this.producer = producer;
5354
this.eventQueue = new LinkedBlockingQueue<>();
5455
this.isClosed = new AtomicBoolean(false);
56+
this.isFinished = new AtomicBoolean(false);
5557
this.vertx = (VertxInternal) v;
5658
final var ctxInt = ((ContextInternal) v.getOrCreateContext()).unwrap();
5759
if (ctxInt.tracer() != null) {
@@ -123,6 +125,7 @@ private void sendFromQueue() {
123125
}
124126
}
125127

128+
isFinished.set(true);
126129
logger.debug("Background thread completed.");
127130
}
128131

@@ -136,13 +139,11 @@ public Future<Void> close() {
136139

137140
Thread.ofVirtual().start(() -> {
138141
try {
139-
while (!eventQueue.isEmpty()) {
140-
logger.debug("Waiting for the eventQueue to become empty");
142+
while (!isFinished.get()) {
143+
logger.debug("Waiting for the sendFromQueueThread to finish");
141144
Thread.sleep(2000L);
142145
}
143-
logger.debug("Interrupting sendFromQueueThread thread");
144-
sendFromQueueThread.interrupt();
145-
logger.debug("Waiting for sendFromQueueThread thread to complete");
146+
146147
sendFromQueueThread.join();
147148
logger.debug("Closing the producer");
148149
producer.close();

0 commit comments

Comments
 (0)