Skip to content

Commit c33ad32

Browse files
committed
Add test for super stream exchange
References rabbitmq/rabbitmq-server#8398
1 parent 88dfaf1 commit c33ad32

File tree

3 files changed

+175
-2
lines changed

3 files changed

+175
-2
lines changed

.github/workflows/test-pr.yml

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ on:
55
branches:
66
- main
77

8+
env:
9+
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:stream-chunk-filtering-otp-max-bazel'
10+
811
jobs:
912
build:
1013
runs-on: ubuntu-22.04
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import static com.rabbitmq.stream.impl.TestUtils.*;
17+
import static com.rabbitmq.stream.impl.TestUtils.SuperStreamExchangeType.SUPER;
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.rabbitmq.client.AMQP;
21+
import com.rabbitmq.client.Channel;
22+
import com.rabbitmq.client.Connection;
23+
import com.rabbitmq.client.ConnectionFactory;
24+
import com.rabbitmq.stream.*;
25+
import io.netty.channel.EventLoopGroup;
26+
import java.util.*;
27+
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.function.BiConsumer;import java.util.stream.IntStream;
30+
import org.junit.jupiter.api.AfterEach;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.api.TestInfo;
34+
import org.junit.jupiter.api.extension.ExtendWith;
35+
36+
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
37+
public class SuperStreamExchangeTest {
38+
39+
EventLoopGroup eventLoopGroup;
40+
41+
Environment environment;
42+
43+
Connection connection;
44+
int partitions = 3;
45+
int messageCount = 10_000;
46+
String superStream;
47+
48+
@BeforeEach
49+
void init(TestInfo info) throws Exception {
50+
EnvironmentBuilder environmentBuilder =
51+
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
52+
environmentBuilder.addressResolver(add -> localhost());
53+
environment = environmentBuilder.build();
54+
connection = new ConnectionFactory().newConnection();
55+
superStream = TestUtils.streamName(info);
56+
}
57+
58+
@AfterEach
59+
void tearDown() throws Exception {
60+
environment.close();
61+
deleteSuperStreamTopology(connection, superStream, partitions);
62+
connection.close();
63+
}
64+
65+
@Test
66+
void publish() throws Exception {
67+
declareSuperStreamTopology(connection, superStream, SUPER, partitions);
68+
List<String> routingKeys = new ArrayList<>(messageCount);
69+
IntStream.range(0, messageCount)
70+
.forEach(ignored -> routingKeys.add(UUID.randomUUID().toString()));
71+
72+
CountDownLatch publishLatch = new CountDownLatch(messageCount);
73+
try (Producer producer =
74+
environment
75+
.producerBuilder()
76+
.superStream(superStream)
77+
.routing(msg -> msg.getProperties().getMessageIdAsString())
78+
.producerBuilder()
79+
.build()) {
80+
ConfirmationHandler confirmationHandler = status -> publishLatch.countDown();
81+
routingKeys.forEach(
82+
rk ->
83+
producer.send(
84+
producer.messageBuilder().properties().messageId(rk).messageBuilder().build(),
85+
confirmationHandler));
86+
latchAssert(publishLatch).completes();
87+
}
88+
89+
java.util.function.Consumer<Map<String, Set<String>>> consumeMessages =
90+
receivedMessages -> {
91+
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
92+
try (Consumer ignored =
93+
environment
94+
.consumerBuilder()
95+
.superStream(superStream)
96+
.offset(OffsetSpecification.first())
97+
.messageHandler(
98+
(ctx, msg) -> {
99+
receivedMessages
100+
.computeIfAbsent(ctx.stream(), k -> ConcurrentHashMap.newKeySet())
101+
.add(msg.getProperties().getMessageIdAsString());
102+
consumeLatch.countDown();
103+
})
104+
.build()) {
105+
106+
latchAssert(consumeLatch).completes();
107+
assertThat(receivedMessages.values().stream().mapToInt(Set::size).sum())
108+
.isEqualTo(messageCount);
109+
}
110+
};
111+
112+
Map<String, Set<String>> streamProducerMessages = new ConcurrentHashMap<>(partitions);
113+
consumeMessages.accept(streamProducerMessages);
114+
115+
deleteSuperStreamTopology(connection, superStream, partitions);
116+
declareSuperStreamTopology(connection, superStream, SUPER, partitions);
117+
118+
try (Channel channel = connection.createChannel()) {
119+
channel.confirmSelect();
120+
for (String rk : routingKeys) {
121+
channel.basicPublish(
122+
superStream, rk, new AMQP.BasicProperties.Builder().messageId(rk).build(), null);
123+
}
124+
channel.waitForConfirmsOrDie();
125+
}
126+
127+
Map<String, Set<String>> amqpProducerMessages = new ConcurrentHashMap<>(partitions);
128+
consumeMessages.accept(amqpProducerMessages);
129+
assertThat(amqpProducerMessages).hasSameSizeAs(streamProducerMessages)
130+
.containsKeys(streamProducerMessages.keySet().toArray(new String[]{}));
131+
132+
BiConsumer<Set<String>, Set<String>> compareSets = (s1, s2) -> {
133+
assertThat(s1).hasSameSizeAs(s2);
134+
s1.forEach(rk -> assertThat(s2).contains(rk));
135+
};
136+
137+
amqpProducerMessages.forEach(
138+
(key, value) -> compareSets.accept(value, streamProducerMessages.get(key)));
139+
}
140+
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

+32-2
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.SuperStreamExchangeType.DIRECT;
1617
import static java.lang.String.format;
1718
import static java.util.concurrent.TimeUnit.SECONDS;
1819
import static org.assertj.core.api.Assertions.assertThat;
1920
import static org.junit.jupiter.api.Assertions.fail;
2021

2122
import ch.qos.logback.classic.Level;
22-
import com.rabbitmq.client.BuiltinExchangeType;
2323
import com.rabbitmq.client.Channel;
2424
import com.rabbitmq.client.Connection;
2525
import com.rabbitmq.stream.Address;
@@ -272,18 +272,37 @@ static <T> void doIfNotNull(T obj, Consumer<T> action) {
272272

273273
static void declareSuperStreamTopology(Connection connection, String superStream, int partitions)
274274
throws Exception {
275+
declareSuperStreamTopology(connection, superStream, DIRECT, partitions);
276+
}
277+
278+
static void declareSuperStreamTopology(
279+
Connection connection,
280+
String superStream,
281+
SuperStreamExchangeType exchangeType,
282+
int partitions)
283+
throws Exception {
275284
declareSuperStreamTopology(
276285
connection,
277286
superStream,
287+
exchangeType,
278288
IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new));
279289
}
280290

281291
static void declareSuperStreamTopology(Connection connection, String superStream, String... rks)
282292
throws Exception {
293+
declareSuperStreamTopology(connection, superStream, DIRECT, rks);
294+
}
295+
296+
static void declareSuperStreamTopology(
297+
Connection connection,
298+
String superStream,
299+
SuperStreamExchangeType exchangeType,
300+
String... rks)
301+
throws Exception {
283302
try (Channel ch = connection.createChannel()) {
284303
ch.exchangeDeclare(
285304
superStream,
286-
BuiltinExchangeType.DIRECT,
305+
exchangeType.value,
287306
true,
288307
false,
289308
Collections.singletonMap("x-super-stream", true));
@@ -309,6 +328,17 @@ static void declareSuperStreamTopology(Connection connection, String superStream
309328
}
310329
}
311330

331+
public enum SuperStreamExchangeType {
332+
DIRECT("direct"),
333+
SUPER("x-super-stream");
334+
335+
final String value;
336+
337+
SuperStreamExchangeType(String value) {
338+
this.value = value;
339+
}
340+
}
341+
312342
static void deleteSuperStreamTopology(Connection connection, String superStream, int partitions)
313343
throws Exception {
314344
deleteSuperStreamTopology(

0 commit comments

Comments
 (0)