|
21 | 21 | import java.util.Arrays;
|
22 | 22 | import java.util.BitSet;
|
23 | 23 | import java.util.Collection;
|
| 24 | +import java.util.Collections; |
24 | 25 | import java.util.HashSet;
|
25 | 26 | import java.util.Iterator;
|
26 | 27 | import java.util.List;
|
@@ -148,7 +149,7 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
|
148 | 149 |
|
149 | 150 | final CountDownLatch latch = new CountDownLatch(3);
|
150 | 151 | final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
|
151 |
| - final List<String> payloads = new ArrayList<>(); |
| 152 | + List<String> payloads = Collections.synchronizedList(new ArrayList<>()); |
152 | 153 | containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
|
153 | 154 | ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
|
154 | 155 | listenerThreadNames.add(Thread.currentThread().getName());
|
@@ -198,7 +199,9 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
|
198 | 199 | template.flush();
|
199 | 200 | assertThat(intercepted.await(10, TimeUnit.SECONDS)).isTrue();
|
200 | 201 | assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
|
201 |
| - assertThat(payloads).containsExactlyInAnyOrder("foo", "bar", "qux"); |
| 202 | + synchronized (payloads) { |
| 203 | + assertThat(payloads).containsExactlyInAnyOrder("foo", "bar", "qux"); |
| 204 | + } |
202 | 205 | assertThat(listenerThreadNames).contains("testAuto-0", "testAuto-1");
|
203 | 206 | List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
|
204 | 207 | "containers", List.class);
|
|
0 commit comments