|
32 | 32 | import org.springframework.amqp.core.AnonymousQueue;
|
33 | 33 | import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
34 | 34 | import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
|
| 35 | +import org.springframework.amqp.rabbit.junit.BrokerRunningSupport; |
35 | 36 | import org.springframework.amqp.rabbit.junit.RabbitAvailable;
|
| 37 | +import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition; |
36 | 38 | import org.springframework.core.ParameterizedTypeReference;
|
37 | 39 | import org.springframework.http.MediaType;
|
38 | 40 | import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
|
@@ -63,12 +65,15 @@ public void setup() {
|
63 | 65 | this.defaultAdmin = new RabbitAdmin(this.defaultConnectionFactory);
|
64 | 66 | this.testContainerFactory = new CachingConnectionFactory("localhost", amqpPort());
|
65 | 67 | this.testContainerAdmin = new RabbitAdmin(this.testContainerFactory);
|
66 |
| - String[] addresses = new String[] { "localhost:5672", "localhost:" + amqpPort() }; |
67 |
| - String[] adminUris = new String[] { "http://localhost:15672", "http://localhost:" + managementPort() }; |
68 |
| - String[] nodes = new String[] { "rabbit@localhost", findTcNode() }; |
| 68 | + BrokerRunningSupport brokerRunning = RabbitAvailableCondition.getBrokerRunning(); |
| 69 | + |
| 70 | + String[] addresses = new String[] { brokerRunning.getHostName() + ":" + brokerRunning.getPort(), |
| 71 | + RABBITMQ.getHost() + ":" + RABBITMQ.getAmqpPort() }; |
| 72 | + String[] adminUris = new String[] { brokerRunning.getAdminUri(), RABBITMQ.getHttpUrl() }; |
| 73 | + String[] nodes = new String[] { findLocalNode(), findTcNode() }; |
69 | 74 | String vhost = "/";
|
70 |
| - String username = "guest"; |
71 |
| - String password = "guest"; |
| 75 | + String username = brokerRunning.getAdminUser(); |
| 76 | + String password = brokerRunning.getAdminPassword(); |
72 | 77 | this.lqcf = new LocalizedQueueConnectionFactory(defaultConnectionFactory, addresses,
|
73 | 78 | adminUris, nodes, vhost, username, password, false, null);
|
74 | 79 | }
|
@@ -100,9 +105,11 @@ public void testFindCorrectConnection() throws Exception {
|
100 | 105 | @Test
|
101 | 106 | void findLocal() {
|
102 | 107 | ConnectionFactory defaultCf = mock(ConnectionFactory.class);
|
| 108 | + BrokerRunningSupport brokerRunning = RabbitAvailableCondition.getBrokerRunning(); |
103 | 109 | LocalizedQueueConnectionFactory lqcf = new LocalizedQueueConnectionFactory(defaultCf,
|
104 |
| - Map.of("rabbit@localhost", "localhost:5672"), new String[] { "http://localhost:15672" }, |
105 |
| - "/", "guest", "guest", false, null); |
| 110 | + Map.of(findLocalNode(), brokerRunning.getHostName() + ":" + brokerRunning.getPort()), |
| 111 | + new String[] { brokerRunning.getAdminUri() }, |
| 112 | + "/", brokerRunning.getAdminUser(), brokerRunning.getAdminPassword(), false, null); |
106 | 113 | ConnectionFactory cf = lqcf.getTargetConnectionFactory("[local]");
|
107 | 114 | RabbitAdmin admin = new RabbitAdmin(cf);
|
108 | 115 | assertThat(admin.getQueueProperties("local")).isNotNull();
|
@@ -139,5 +146,32 @@ private String findTcNode() {
|
139 | 146 | return (String) queueInfo.get("node");
|
140 | 147 | }
|
141 | 148 |
|
| 149 | + private String findLocalNode() { |
| 150 | + AnonymousQueue queue = new AnonymousQueue(); |
| 151 | + this.defaultAdmin.declareQueue(queue); |
| 152 | + URI uri; |
| 153 | + BrokerRunningSupport brokerRunning = RabbitAvailableCondition.getBrokerRunning(); |
| 154 | + try { |
| 155 | + uri = new URI(brokerRunning.getAdminUri()) |
| 156 | + .resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" |
| 157 | + + queue.getName()); |
| 158 | + } |
| 159 | + catch (URISyntaxException ex) { |
| 160 | + throw new IllegalStateException(ex); |
| 161 | + } |
| 162 | + WebClient client = WebClient.builder() |
| 163 | + .filter(ExchangeFilterFunctions.basicAuthentication(brokerRunning.getAdminUser(), |
| 164 | + brokerRunning.getAdminPassword())) |
| 165 | + .build(); |
| 166 | + Map<String, Object> queueInfo = client.get() |
| 167 | + .uri(uri) |
| 168 | + .accept(MediaType.APPLICATION_JSON) |
| 169 | + .retrieve() |
| 170 | + .bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() { |
| 171 | + }) |
| 172 | + .block(Duration.ofSeconds(10)); |
| 173 | + this.defaultAdmin.deleteQueue(queue.getName()); |
| 174 | + return (String) queueInfo.get("node"); |
| 175 | + } |
142 | 176 |
|
143 | 177 | }
|
0 commit comments