Skip to content

Commit 6152b6c

Browse files
committed
Add test for super stream exchange
1 parent 95ac7a6 commit 6152b6c

File tree

5 files changed

+243
-10
lines changed

5 files changed

+243
-10
lines changed

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

+5
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ java(Config) ->
424424
StreamPortNode2 = get_stream_port(Config, 1),
425425
StreamPortTlsNode1 = get_stream_port_tls(Config, 0),
426426
StreamPortTlsNode2 = get_stream_port_tls(Config, 1),
427+
AmqpPortNode1 = get_amqp_port(Config),
427428
Node1Name = get_node_name(Config, 0),
428429
Node2Name = get_node_name(Config, 1),
429430
RabbitMqCtl = get_rabbitmqctl(Config),
@@ -439,6 +440,7 @@ java(Config) ->
439440
{"NODE2_STREAM_PORT=~b", [StreamPortNode2]},
440441
{"NODE2_STREAM_PORT_TLS=~b",
441442
[StreamPortTlsNode2]},
443+
{"NODE1_AMQP_PORT=~b", [AmqpPortNode1]},
442444
{"RABBITMQCTL=~tp", [RabbitMqCtl]}]),
443445
{ok, _} = MakeResult.
444446

@@ -459,6 +461,9 @@ get_stream_port_tls(Config, Node) ->
459461
rabbit_ct_broker_helpers:get_node_config(Config, Node,
460462
tcp_port_stream_tls).
461463

464+
get_amqp_port(Config) ->
465+
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp).
466+
462467
get_node_name(Config) ->
463468
get_node_name(Config, 0).
464469

deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ MVN_FLAGS += -Dhostname=$(HOSTNAME) \
77
-Dnode2.name=$(NODE2_NAME) \
88
-Dnode2.stream.port=$(NODE2_STREAM_PORT) \
99
-Dnode2.stream.port.tls=$(NODE2_STREAM_PORT_TLS) \
10+
-Dnode1.amqp.port=$(NODE1_AMQP_PORT) \
1011
-Drabbitmqctl.bin=$(RABBITMQCTL)
1112

1213
.PHONY: tests clean

deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml

+14-6
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@
2727

2828
<properties>
2929
<stream-client.version>[0.7.0-SNAPSHOT,)</stream-client.version>
30-
<junit.jupiter.version>5.9.0</junit.jupiter.version>
31-
<assertj.version>3.23.1</assertj.version>
30+
<amqp-client.version>5.17.0</amqp-client.version>
31+
<junit.jupiter.version>5.9.3</junit.jupiter.version>
32+
<assertj.version>3.24.2</assertj.version>
3233
<logback.version>1.2.11</logback.version>
33-
<maven.compiler.plugin.version>3.10.1</maven.compiler.plugin.version>
34-
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
35-
<spotless.version>2.24.0</spotless.version>
36-
<google-java-format.version>1.15.0</google-java-format.version>
34+
<maven.compiler.plugin.version>3.11.0</maven.compiler.plugin.version>
35+
<maven-surefire-plugin.version>3.1.0</maven-surefire-plugin.version>
36+
<spotless.version>2.37.0</spotless.version>
37+
<google-java-format.version>1.17.0</google-java-format.version>
3738
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
3839
</properties>
3940

@@ -45,6 +46,13 @@
4546
<version>${stream-client.version}</version>
4647
</dependency>
4748

49+
<dependency>
50+
<groupId>com.rabbitmq</groupId>
51+
<artifactId>amqp-client</artifactId>
52+
<version>${amqp-client.version}</version>
53+
<scope>test</scope>
54+
</dependency>
55+
4856
<dependency>
4957
<groupId>org.junit.jupiter</groupId>
5058
<artifactId>junit-jupiter-engine</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
import static com.rabbitmq.stream.TestUtils.*;
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
19+
import com.rabbitmq.client.AMQP;
20+
import com.rabbitmq.client.Channel;
21+
import com.rabbitmq.client.Connection;
22+
import com.rabbitmq.client.ConnectionFactory;
23+
import com.rabbitmq.stream.TestUtils.CallableConsumer;
24+
import io.netty.channel.EventLoopGroup;
25+
import java.util.*;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.function.BiConsumer;
29+
import java.util.stream.IntStream;
30+
import org.junit.jupiter.api.AfterEach;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.api.TestInfo;
34+
import org.junit.jupiter.api.extension.ExtendWith;
35+
36+
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
37+
public class SuperStreamExchangeTest {
38+
39+
EventLoopGroup eventLoopGroup;
40+
41+
Environment environment;
42+
43+
Connection connection;
44+
int partitions = 3;
45+
int messageCount = 10_000;
46+
String superStream;
47+
48+
@BeforeEach
49+
void init(TestInfo info) throws Exception {
50+
EnvironmentBuilder environmentBuilder =
51+
Environment.builder()
52+
.port(TestUtils.streamPortNode1())
53+
.netty()
54+
.eventLoopGroup(eventLoopGroup)
55+
.environmentBuilder();
56+
environment = environmentBuilder.build();
57+
ConnectionFactory cf = new ConnectionFactory();
58+
cf.setPort(TestUtils.amqpPortNode1());
59+
connection = cf.newConnection();
60+
superStream = TestUtils.streamName(info);
61+
}
62+
63+
@AfterEach
64+
void tearDown() throws Exception {
65+
environment.close();
66+
deleteSuperStreamTopology(connection, superStream, partitions);
67+
connection.close();
68+
}
69+
70+
@Test
71+
void publish() throws Exception {
72+
declareSuperStreamTopology(connection, superStream, partitions);
73+
List<String> routingKeys = new ArrayList<>(messageCount);
74+
IntStream.range(0, messageCount)
75+
.forEach(ignored -> routingKeys.add(UUID.randomUUID().toString()));
76+
77+
CountDownLatch publishLatch = new CountDownLatch(messageCount);
78+
try (Producer producer =
79+
environment
80+
.producerBuilder()
81+
.superStream(superStream)
82+
.routing(msg -> msg.getProperties().getMessageIdAsString())
83+
.producerBuilder()
84+
.build()) {
85+
ConfirmationHandler confirmationHandler = status -> publishLatch.countDown();
86+
routingKeys.forEach(
87+
rk ->
88+
producer.send(
89+
producer.messageBuilder().properties().messageId(rk).messageBuilder().build(),
90+
confirmationHandler));
91+
92+
assertThat(publishLatch).is(completed());
93+
}
94+
95+
CallableConsumer<Map<String, Set<String>>> consumeMessages =
96+
receivedMessages -> {
97+
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
98+
try (Consumer ignored =
99+
environment
100+
.consumerBuilder()
101+
.superStream(superStream)
102+
.offset(OffsetSpecification.first())
103+
.messageHandler(
104+
(ctx, msg) -> {
105+
receivedMessages
106+
.computeIfAbsent(ctx.stream(), k -> ConcurrentHashMap.newKeySet())
107+
.add(msg.getProperties().getMessageIdAsString());
108+
consumeLatch.countDown();
109+
})
110+
.build()) {
111+
112+
assertThat(consumeLatch).is(completed());
113+
assertThat(receivedMessages.values().stream().mapToInt(Set::size).sum())
114+
.isEqualTo(messageCount);
115+
}
116+
};
117+
118+
Map<String, Set<String>> streamProducerMessages = new ConcurrentHashMap<>(partitions);
119+
consumeMessages.accept(streamProducerMessages);
120+
121+
deleteSuperStreamTopology(connection, superStream, partitions);
122+
declareSuperStreamTopology(connection, superStream, partitions);
123+
124+
try (Channel channel = connection.createChannel()) {
125+
channel.confirmSelect();
126+
for (String rk : routingKeys) {
127+
channel.basicPublish(
128+
superStream, rk, new AMQP.BasicProperties.Builder().messageId(rk).build(), null);
129+
}
130+
channel.waitForConfirmsOrDie();
131+
}
132+
133+
Map<String, Set<String>> amqpProducerMessages = new ConcurrentHashMap<>(partitions);
134+
consumeMessages.accept(amqpProducerMessages);
135+
assertThat(amqpProducerMessages)
136+
.hasSameSizeAs(streamProducerMessages)
137+
.containsKeys(streamProducerMessages.keySet().toArray(new String[] {}));
138+
139+
BiConsumer<Set<String>, Set<String>> compareSets =
140+
(s1, s2) -> {
141+
assertThat(s1).hasSameSizeAs(s2);
142+
s1.forEach(rk -> assertThat(s2).contains(rk));
143+
};
144+
145+
amqpProducerMessages.forEach(
146+
(key, value) -> compareSets.accept(value, streamProducerMessages.get(key)));
147+
}
148+
}

deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java

+75-4
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,20 @@
2121
import static org.assertj.core.api.Assertions.assertThat;
2222
import static org.junit.jupiter.api.Assertions.fail;
2323

24+
import com.rabbitmq.client.Channel;
25+
import com.rabbitmq.client.Connection;
2426
import com.rabbitmq.stream.impl.Client;
2527
import com.rabbitmq.stream.impl.Client.Response;
2628
import io.netty.channel.EventLoopGroup;
2729
import io.netty.channel.nio.NioEventLoopGroup;
2830
import java.lang.reflect.Field;
2931
import java.lang.reflect.Method;
3032
import java.time.Duration;
31-
import java.util.Set;
32-
import java.util.UUID;
33+
import java.util.*;
3334
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.CountDownLatch;
3436
import java.util.function.BooleanSupplier;
37+
import java.util.stream.IntStream;
3538
import org.assertj.core.api.Condition;
3639
import org.junit.jupiter.api.TestInfo;
3740
import org.junit.jupiter.api.extension.*;
@@ -40,12 +43,17 @@ public class TestUtils {
4043

4144
static int streamPortNode1() {
4245
String port = System.getProperty("node1.stream.port", "5552");
43-
return Integer.valueOf(port);
46+
return Integer.parseInt(port);
4447
}
4548

4649
static int streamPortNode2() {
4750
String port = System.getProperty("node2.stream.port", "5552");
48-
return Integer.valueOf(port);
51+
return Integer.parseInt(port);
52+
}
53+
54+
static int amqpPortNode1() {
55+
String port = System.getProperty("node1.amqp.port", "5672");
56+
return Integer.parseInt(port);
4957
}
5058

5159
static void waitUntil(BooleanSupplier condition) throws InterruptedException {
@@ -218,4 +226,67 @@ static Condition<Response> responseCode(short expectedResponse) {
218226
expectedResponse);
219227
}
220228
}
229+
230+
static void deleteSuperStreamTopology(Connection connection, String superStream, int partitions)
231+
throws Exception {
232+
String[] routingKeys =
233+
IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new);
234+
try (Channel ch = connection.createChannel()) {
235+
ch.exchangeDelete(superStream);
236+
for (String routingKey : routingKeys) {
237+
String partitionName = superStream + "-" + routingKey;
238+
ch.queueDelete(partitionName);
239+
}
240+
}
241+
}
242+
243+
static void declareSuperStreamTopology(Connection connection, String superStream, int partitions)
244+
throws Exception {
245+
String[] rks = IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new);
246+
try (Channel ch = connection.createChannel()) {
247+
ch.exchangeDeclare(
248+
superStream,
249+
"x-super-stream",
250+
true,
251+
false,
252+
Collections.singletonMap("x-super-stream", true));
253+
254+
List<Object[]> bindings = new ArrayList<>(rks.length);
255+
for (int i = 0; i < rks.length; i++) {
256+
bindings.add(new Object[] {rks[i], i});
257+
}
258+
// shuffle the order to make sure we get in the correct order from the server
259+
Collections.shuffle(bindings);
260+
261+
for (Object[] binding : bindings) {
262+
String routingKey = (String) binding[0];
263+
String partitionName = superStream + "-" + routingKey;
264+
ch.queueDeclare(
265+
partitionName, true, false, false, Collections.singletonMap("x-queue-type", "stream"));
266+
ch.queueBind(
267+
partitionName,
268+
superStream,
269+
routingKey,
270+
Collections.singletonMap("x-stream-partition-order", binding[1]));
271+
}
272+
}
273+
}
274+
275+
static Condition<CountDownLatch> completed() {
276+
return new Condition<>(
277+
countDownLatch -> {
278+
try {
279+
return countDownLatch.await(10, SECONDS);
280+
} catch (InterruptedException e) {
281+
Thread.interrupted();
282+
throw new RuntimeException(e);
283+
}
284+
},
285+
"not completed after 10 seconds");
286+
}
287+
288+
interface CallableConsumer<T> {
289+
290+
void accept(T o) throws Exception;
291+
}
221292
}

0 commit comments

Comments
 (0)