
Commit fa26a82

Bump to 3.10.x Quarkus.io and align libraries (knative-extensions#3872)
* Bump quarkus BOM to get newer vert.x
* Use unwrapped verxInternal's tracer()
* Revert "Use unwrapped verxInternal's tracer()"
* Fix ReceiverVerticleTracingTest#traceIsPropagated
* Try with 3.15.2

We change the micrometer-prometheus client dependency to the simpleclient variant, because the simpleclient is backwards compatible with the old APIs. While vertx-micrometer 4.5.10 is still on the Micrometer 1.12 APIs, we inherit the 1.13 Micrometer APIs via the Quarkus BOM, so we need the simpleclient registry to avoid a clash.

Signed-off-by: Matthias Wessendorf <[email protected]>
1 parent eafcec9 commit fa26a82
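
For orientation, the registry change described in the commit message boils down to the setup pattern sketched below. This is a minimal sketch, not the upstream code: it assumes the two-argument BackendRegistries.setupBackend(options, registry) signature of vertx-micrometer-metrics 4.5.x and the io.micrometer.prometheus (simpleclient) package provided by micrometer-registry-prometheus-simpleclient; the registryName parameter stands in for the Metrics.METRICS_REGISTRY_NAME constant used in the test diffs further down.

// Sketch only: mirrors the test setup shown in the diffs below.
import io.micrometer.prometheus.PrometheusConfig;        // simpleclient-backed package
import io.micrometer.prometheus.PrometheusMeterRegistry; // from micrometer-registry-prometheus-simpleclient
import io.vertx.micrometer.MicrometerMetricsOptions;
import io.vertx.micrometer.backends.BackendRegistries;

class MetricsBackendSetupSketch {
    static void setupBackend(String registryName) {
        // vertx-micrometer 4.5.x: setupBackend takes the options plus an optional
        // pre-built MeterRegistry; passing null lets the options create the registry.
        BackendRegistries.setupBackend(
                new MicrometerMetricsOptions()
                        .setMicrometerRegistry(new PrometheusMeterRegistry(PrometheusConfig.DEFAULT))
                        .setRegistryName(registryName),
                null);
    }
}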

File tree

19 files changed, +291 -271 lines changed

data-plane/THIRD-PARTY.txt

Lines changed: 160 additions & 153 deletions
Large diffs are not rendered by default.

data-plane/core/pom.xml

Lines changed: 11 additions & 1 deletion
@@ -58,6 +58,16 @@
       <groupId>io.fabric8</groupId>
       <artifactId>kubernetes-server-mock</artifactId>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>mockwebserver</artifactId>
+          <groupId>com.squareup.okhttp3</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>mockwebserver</artifactId>
     </dependency>

     <dependency>
@@ -171,7 +181,7 @@

     <dependency>
       <groupId>io.micrometer</groupId>
-      <artifactId>micrometer-registry-prometheus</artifactId>
+      <artifactId>micrometer-registry-prometheus-simpleclient</artifactId>
     </dependency>

     <dependency>

data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/metrics/MetricsTest.java

Lines changed: 2 additions & 1 deletion
@@ -36,7 +36,8 @@
 public class MetricsTest {

     static {
-        BackendRegistries.setupBackend(new MicrometerMetricsOptions().setRegistryName(Metrics.METRICS_REGISTRY_NAME));
+        BackendRegistries.setupBackend(
+                new MicrometerMetricsOptions().setRegistryName(Metrics.METRICS_REGISTRY_NAME), null);
     }

     @Test

data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java

Lines changed: 5 additions & 3 deletions
@@ -73,9 +73,11 @@ public class RecordDispatcherTest {
             .build());

     static {
-        BackendRegistries.setupBackend(new MicrometerMetricsOptions()
-                .setMicrometerRegistry(new PrometheusMeterRegistry(PrometheusConfig.DEFAULT))
-                .setRegistryName(Metrics.METRICS_REGISTRY_NAME));
+        BackendRegistries.setupBackend(
+                new MicrometerMetricsOptions()
+                        .setMicrometerRegistry(new PrometheusMeterRegistry(PrometheusConfig.DEFAULT))
+                        .setRegistryName(Metrics.METRICS_REGISTRY_NAME),
+                null);
     }

     private PrometheusMeterRegistry registry;

data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/ResponseToKafkaTopicHandlerTest.java

Lines changed: 2 additions & 1 deletion
@@ -52,7 +52,8 @@
 public class ResponseToKafkaTopicHandlerTest {

     static {
-        BackendRegistries.setupBackend(new MicrometerMetricsOptions().setRegistryName(Metrics.METRICS_REGISTRY_NAME));
+        BackendRegistries.setupBackend(
+                new MicrometerMetricsOptions().setRegistryName(Metrics.METRICS_REGISTRY_NAME), null);
     }

     private static final String TOPIC = "t1";

data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java

Lines changed: 2 additions & 1 deletion
@@ -62,7 +62,8 @@ public abstract class AbstractConsumerVerticleTest {
     private static final ConsumerVerticleContext resourceContext = FakeConsumerVerticleContext.get();

     static {
-        BackendRegistries.setupBackend(new MicrometerMetricsOptions().setRegistryName(Metrics.METRICS_REGISTRY_NAME));
+        BackendRegistries.setupBackend(
+                new MicrometerMetricsOptions().setRegistryName(Metrics.METRICS_REGISTRY_NAME), null);
     }

     @Test

data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/http/WebClientCloudEventSenderTest.java

Lines changed: 5 additions & 3 deletions
@@ -59,9 +59,11 @@
 public class WebClientCloudEventSenderTest {

     static {
-        BackendRegistries.setupBackend(new MicrometerMetricsOptions()
-                .setMicrometerRegistry(new PrometheusMeterRegistry(PrometheusConfig.DEFAULT))
-                .setRegistryName(Metrics.METRICS_REGISTRY_NAME));
+        BackendRegistries.setupBackend(
+                new MicrometerMetricsOptions()
+                        .setMicrometerRegistry(new PrometheusMeterRegistry(PrometheusConfig.DEFAULT))
+                        .setRegistryName(Metrics.METRICS_REGISTRY_NAME),
+                null);
     }

     @Test

data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java

Lines changed: 2 additions & 1 deletion
@@ -58,7 +58,8 @@
 public class UnorderedConsumerTest {

     static {
-        BackendRegistries.setupBackend(new MicrometerMetricsOptions().setRegistryName(Metrics.METRICS_REGISTRY_NAME));
+        BackendRegistries.setupBackend(
+                new MicrometerMetricsOptions().setRegistryName(Metrics.METRICS_REGISTRY_NAME), null);
     }

     private static final Logger logger = LoggerFactory.getLogger(UnorderedConsumerTest.class);

data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImplTest.java

Lines changed: 2 additions & 1 deletion
@@ -52,7 +52,8 @@
 public class ConsumerVerticleFactoryImplTest {

     static {
-        BackendRegistries.setupBackend(new MicrometerMetricsOptions().setRegistryName(Metrics.METRICS_REGISTRY_NAME));
+        BackendRegistries.setupBackend(
+                new MicrometerMetricsOptions().setRegistryName(Metrics.METRICS_REGISTRY_NAME), null);
     }

     @Test

data-plane/pom.xml

Lines changed: 27 additions & 10 deletions
@@ -42,25 +42,25 @@
     <spotless.plugin.version>2.38.0</spotless.plugin.version>

     <!-- dependencies version -->
-    <vertx.version>4.4.7</vertx.version>
+    <vertx.version>4.5.10</vertx.version>
     <cloudevents.sdk.version>4.0.1</cloudevents.sdk.version>
-    <micrometer.version>1.11.5</micrometer.version>
-    <opentelemetry.version>1.17.0</opentelemetry.version>
+    <micrometer.version>1.13.3</micrometer.version>
+    <opentelemetry.version>1.39.0</opentelemetry.version>
     <jackson.version>2.14.1</jackson.version>
     <protobuf.version>3.25.2</protobuf.version>
     <bucket4j.version>7.6.0</bucket4j.version>
-    <slf4j.version>2.0.9</slf4j.version>
-    <ch.qos.logback.version>1.4.14</ch.qos.logback.version>
-    <net.logstash.logback.encoder.version>7.2</net.logstash.logback.encoder.version>
+    <slf4j.version>2.0.16</slf4j.version>
+    <ch.qos.logback.version>1.5.16</ch.qos.logback.version>
+    <net.logstash.logback.encoder.version>8.0</net.logstash.logback.encoder.version>
     <assertj.version>3.26.0</assertj.version>
     <awaitility.version>4.2.0</awaitility.version>
-    <junit.jupiter.version>5.10.1</junit.jupiter.version>
+    <junit.jupiter.version>5.10.3</junit.jupiter.version>
     <mokito.junit.jupiter.version>5.12.0</mokito.junit.jupiter.version>
-    <fabric8.kubernetes.version>6.10.0</fabric8.kubernetes.version>
+    <fabric8.kubernetes.version>6.13.4</fabric8.kubernetes.version>
     <kafka.version>3.7.1</kafka.version>
     <debezium.version>2.6.1.Final</debezium.version>
-    <jib.version>3.4.0</jib.version>
-    <quarkus.version>3.8.4</quarkus.version>
+    <jib.version>3.4.4</jib.version>
+    <quarkus.version>3.15.2</quarkus.version>
     <antlr.version>4.9.2
     </antlr.version> <!-- Overwritting quarkus's antlr version. Reminder: antlr4-maven-plugin,antlr4-runtime, antlr4 need to have the same version -->
     <palantirJavaFormat.version>2.38.0</palantirJavaFormat.version>
@@ -209,6 +209,10 @@
           <artifactId>okhttp</artifactId>
           <groupId>com.squareup.okhttp3</groupId>
         </exclusion>
+        <exclusion>
+          <artifactId>mockwebserver</artifactId>
+          <groupId>com.squareup.okhttp3</groupId>
+        </exclusion>
         <exclusion>
           <artifactId>builder-annotations</artifactId>
           <groupId>io.sundr</groupId>
@@ -219,6 +223,19 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <artifactId>mockwebserver</artifactId>
+      <groupId>com.squareup.okhttp3</groupId>
+      <version>4.12.0</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <artifactId>mockwebserver</artifactId>
+      <groupId>com.squareup.okhttp3</groupId>
+      <version>4.12.0</version>
+      <scope>test</scope>
+    </dependency>

     <!-- Jackson -->
     <dependency>

data-plane/profiler/run.sh

Lines changed: 1 addition & 1 deletion
@@ -136,7 +136,7 @@ java \
 dispatcher_pid=$!

 # Download Sacura
-GO111MODULE=off go get github.com/pierdipi/sacura/cmd/sacura || exit 1
+GO111MODULE=off go install github.com/pierdipi/sacura/cmd/sacura@main || exit 1

 # Suppress failure since it fails when it doesn't receive all events.
 echo "Warm up $(date)"

data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java

Lines changed: 53 additions & 81 deletions
@@ -15,18 +15,20 @@
  */
 package dev.knative.eventing.kafka.broker.receiverloom;

+import com.google.common.util.concurrent.Uninterruptibles;
 import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer;
 import dev.knative.eventing.kafka.broker.core.tracing.kafka.ProducerTracer;
+import io.opentelemetry.context.Context;
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.impl.ContextInternal;
 import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.future.PromiseInternal;
 import io.vertx.core.tracing.TracingPolicy;
 import java.util.Objects;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -41,17 +43,15 @@ public class LoomKafkaProducer<K, V> implements ReactiveKafkaProducer<K, V> {

     private final Producer<K, V> producer;

-    private final BlockingQueue<RecordPromise<K, V>> eventQueue;
+    private final ExecutorService executorService;
     private final AtomicBoolean isClosed;
     private final ProducerTracer<?> tracer;
     private final VertxInternal vertx;
-    private final Thread sendFromQueueThread;
     private final Promise<Void> closePromise = Promise.promise();

     public LoomKafkaProducer(Vertx v, Producer<K, V> producer) {
         Objects.requireNonNull(v, "Vertx cannot be null");
         this.producer = producer;
-        this.eventQueue = new LinkedBlockingQueue<>();
         this.isClosed = new AtomicBoolean(false);
         this.vertx = (VertxInternal) v;
         final var ctxInt = ((ContextInternal) v.getOrCreateContext()).unwrap();
@@ -62,73 +62,54 @@ public LoomKafkaProducer(Vertx v, Producer<K, V> producer) {
             this.tracer = null;
         }

+        ExecutorService executorService;
         if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) {
-            this.sendFromQueueThread = Thread.ofVirtual().start(this::sendFromQueue);
+            executorService = Executors.newVirtualThreadPerTaskExecutor();
         } else {
-            this.sendFromQueueThread = new Thread(this::sendFromQueue);
-            this.sendFromQueueThread.start();
+            executorService = Executors.newSingleThreadExecutor();
         }
+        this.executorService = Context.taskWrapping(executorService);
     }

     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
-        final Promise<RecordMetadata> promise = Promise.promise();
         if (isClosed.get()) {
-            promise.fail("Producer is closed");
-        } else {
-            eventQueue.add(new RecordPromise<>(record, this.vertx.getOrCreateContext(), promise));
+            return Future.failedFuture("Producer is closed");
         }
+        PromiseInternal<RecordMetadata> promise = vertx.promise();
+        executorService.execute(() -> sendFromQueue(new RecordPromise<>(record, promise)));
         return promise.future();
     }

-    private void sendFromQueue() {
-        // Process queue elements until this is closed and the tasks queue is empty
-        while (!isClosed.get() || !eventQueue.isEmpty()) {
-            try {
-                final var recordPromise = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
-                if (recordPromise == null) {
-                    continue;
+    private void sendFromQueue(RecordPromise<K, V> recordPromise) {
+        final var startedSpan = this.tracer == null
+                ? null
+                : this.tracer.prepareSendMessage(recordPromise.context(), recordPromise.record);
+
+        recordPromise
+                .promise
+                .future()
+                .onComplete(v -> {
+                    if (startedSpan != null) {
+                        startedSpan.finish(recordPromise.context());
+                    }
+                })
+                .onFailure(cause -> {
+                    if (startedSpan != null) {
+                        startedSpan.fail(recordPromise.context(), cause);
+                    }
+                });
+        try {
+            producer.send(recordPromise.record, (metadata, exception) -> {
+                if (exception != null) {
+                    recordPromise.fail(exception);
+                    return;
                 }
-
-                final var startedSpan = this.tracer == null
-                        ? null
-                        : this.tracer.prepareSendMessage(recordPromise.getContext(), recordPromise.getRecord());
-
-                recordPromise
-                        .getPromise()
-                        .future()
-                        .onComplete(v -> {
-                            if (startedSpan != null) {
-                                startedSpan.finish(recordPromise.getContext());
-                            }
-                        })
-                        .onFailure(cause -> {
-                            if (startedSpan != null) {
-                                startedSpan.fail(recordPromise.getContext(), cause);
-                            }
-                        });
-                try {
-                    producer.send(
-                            recordPromise.getRecord(),
-                            (metadata, exception) -> recordPromise.getContext().runOnContext(v -> {
-                                if (exception != null) {
-                                    recordPromise.getPromise().fail(exception);
-                                    return;
-                                }
-                                recordPromise.getPromise().complete(metadata);
-                            }));
-                } catch (final KafkaException exception) {
-                    recordPromise
-                            .getContext()
-                            .runOnContext(v -> recordPromise.getPromise().fail(exception));
-                }
-            } catch (InterruptedException e) {
-                logger.debug("Interrupted while waiting for event queue to be populated.");
-                break;
-            }
+                recordPromise.complete(metadata);
+            });
+        } catch (final KafkaException exception) {
+            recordPromise.fail(exception);
         }
-
-        logger.debug("Background thread completed.");
     }

     @Override
@@ -141,12 +122,9 @@ public Future<Void> close() {

         Thread.ofVirtual().start(() -> {
             try {
-                while (!eventQueue.isEmpty()) {
-                    logger.debug("Waiting for the eventQueue to become empty");
-                    Thread.sleep(2000L);
-                }
-                logger.debug("Waiting for sendFromQueueThread thread to complete");
-                sendFromQueueThread.join();
+                executorService.shutdown();
+                logger.debug("Waiting for tasks to complete");
+                Uninterruptibles.awaitTerminationUninterruptibly(executorService);
                 logger.debug("Closing the producer");
                 producer.close();
                 closePromise.complete();
@@ -178,35 +156,29 @@ public Producer<K, V> unwrap() {
     }

     private static class RecordPromise<K, V> {
-        private final ProducerRecord<K, V> record;
-        private final ContextInternal context;
-        private final Promise<RecordMetadata> promise;
+        final ProducerRecord<K, V> record;
+        final PromiseInternal<RecordMetadata> promise;

-        private RecordPromise(ProducerRecord<K, V> record, ContextInternal context, Promise<RecordMetadata> promise) {
+        RecordPromise(ProducerRecord<K, V> record, PromiseInternal<RecordMetadata> promise) {
             this.record = record;
-            this.context = context;
             this.promise = promise;
         }

-        public ProducerRecord<K, V> getRecord() {
-            return record;
+        ContextInternal context() {
+            return promise.context();
         }

-        public Promise<RecordMetadata> getPromise() {
-            return promise;
+        void complete(RecordMetadata result) {
+            promise.complete(result);
         }

-        public ContextInternal getContext() {
-            return context;
+        void fail(Throwable cause) {
+            promise.fail(cause);
         }
     }

     // Function needed for testing
     public boolean isSendFromQueueThreadAlive() {
-        return sendFromQueueThread.isAlive();
-    }
-
-    public int getEventQueueSize() {
-        return eventQueue.size();
+        return !executorService.isTerminated();
     }
 }
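
As a side note on the LoomKafkaProducer rewrite above: the blocking event queue and hand-rolled background thread are replaced by an executor wrapped with OpenTelemetry's Context.taskWrapping, so each submitted send task carries the caller's tracing context. The snippet below is an illustrative sketch of that pattern only, not the upstream class; the class and method names here are hypothetical.

import io.opentelemetry.context.Context;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch of the executor wiring used by the new producer code path.
class ContextPropagatingExecutorSketch {
    private final ExecutorService executorService;

    ContextPropagatingExecutorSketch(boolean virtualThreads) {
        ExecutorService delegate = virtualThreads
                ? Executors.newVirtualThreadPerTaskExecutor() // one virtual thread per submitted task
                : Executors.newSingleThreadExecutor();        // single platform thread, tasks run in submit order
        // Context.taskWrapping captures the current OpenTelemetry context at submit time
        // and reinstates it on the worker thread that runs the task.
        this.executorService = Context.taskWrapping(delegate);
    }

    void submit(Runnable sendTask) {
        executorService.execute(sendTask); // e.g. () -> producer.send(record, callback)
    }
}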

0 commit comments
