19
19
import static org .assertj .core .api .Assertions .assertThat ;
20
20
import static org .mockito .Mockito .mock ;
21
21
22
+ import java .net .URI ;
23
+ import java .net .URISyntaxException ;
24
+ import java .nio .charset .StandardCharsets ;
25
+ import java .time .Duration ;
22
26
import java .util .Map ;
23
27
import java .util .UUID ;
24
28
25
29
import org .junit .jupiter .api .AfterEach ;
26
30
import org .junit .jupiter .api .BeforeEach ;
27
31
import org .junit .jupiter .api .Test ;
28
32
33
+ import org .springframework .amqp .core .AnonymousQueue ;
29
34
import org .springframework .amqp .core .Queue ;
30
35
import org .springframework .amqp .rabbit .core .RabbitAdmin ;
31
36
import org .springframework .amqp .rabbit .core .RabbitTemplate ;
37
+ import org .springframework .amqp .rabbit .junit .BrokerRunningSupport ;
32
38
import org .springframework .amqp .rabbit .junit .RabbitAvailable ;
39
+ import org .springframework .amqp .rabbit .junit .RabbitAvailableCondition ;
40
+ import org .springframework .core .ParameterizedTypeReference ;
41
+ import org .springframework .http .MediaType ;
42
+ import org .springframework .web .reactive .function .client .ExchangeFilterFunctions ;
43
+ import org .springframework .web .reactive .function .client .WebClient ;
44
+ import org .springframework .web .util .UriUtils ;
33
45
34
46
35
47
/**
@@ -43,23 +55,32 @@ public class LocalizedQueueConnectionFactoryIntegrationTests {
43
55
44
56
private CachingConnectionFactory defaultConnectionFactory ;
45
57
58
+ private RabbitAdmin defaultAdmin ;
59
+
46
60
@ BeforeEach
47
61
public void setup () {
48
62
this .defaultConnectionFactory = new CachingConnectionFactory ("localhost" );
49
- String [] addresses = new String [] { "localhost:9999" , "localhost:5672" };
50
- String [] adminUris = new String [] { "http://localhost:15672" , "http://localhost:15672" };
51
- String [] nodes = new String [] { "foo@bar" , "rabbit@localhost" };
63
+ this .defaultAdmin = new RabbitAdmin (this .defaultConnectionFactory );
64
+ BrokerRunningSupport brokerRunning = RabbitAvailableCondition .getBrokerRunning ();
65
+ String [] addresses = new String [] { "localhost:9999" ,
66
+ brokerRunning .getHostName () + ":" + brokerRunning .getPort () };
67
+ String [] adminUris = new String [] { brokerRunning .getAdminUri (), brokerRunning .getAdminUri () };
68
+ String [] nodes = new String [] { "foo@bar" , findLocalNode () };
52
69
String vhost = "/" ;
53
- String username = "guest" ;
54
- String password = "guest" ;
70
+ String username = brokerRunning . getAdminUser () ;
71
+ String password = brokerRunning . getAdminPassword () ;
55
72
this .lqcf = new LocalizedQueueConnectionFactory (defaultConnectionFactory , addresses ,
56
73
adminUris , nodes , vhost , username , password , false , null );
57
74
}
58
75
59
76
@ AfterEach
60
77
public void tearDown () {
61
- this .lqcf .destroy ();
62
- this .defaultConnectionFactory .destroy ();
78
+ if (this .lqcf != null ) {
79
+ this .lqcf .destroy ();
80
+ }
81
+ if (this .defaultConnectionFactory != null ) {
82
+ this .defaultConnectionFactory .destroy ();
83
+ }
63
84
}
64
85
65
86
@ Test
@@ -68,6 +89,7 @@ public void testConnect() throws Exception {
68
89
Queue queue = new Queue (UUID .randomUUID ().toString (), false , false , true );
69
90
admin .declareQueue (queue );
70
91
ConnectionFactory targetConnectionFactory = this .lqcf .getTargetConnectionFactory ("[" + queue .getName () + "]" );
92
+ assertThat (targetConnectionFactory ).isNotSameAs (this .defaultConnectionFactory );
71
93
RabbitTemplate template = new RabbitTemplate (targetConnectionFactory );
72
94
template .convertAndSend ("" , queue .getName (), "foo" );
73
95
assertThat (template .receiveAndConvert (queue .getName ())).isEqualTo ("foo" );
@@ -77,9 +99,11 @@ public void testConnect() throws Exception {
77
99
@ Test
78
100
void findLocal () {
79
101
ConnectionFactory defaultCf = mock (ConnectionFactory .class );
102
+ BrokerRunningSupport brokerRunning = RabbitAvailableCondition .getBrokerRunning ();
80
103
LocalizedQueueConnectionFactory lqcf = new LocalizedQueueConnectionFactory (defaultCf ,
81
- Map .of ("rabbit@localhost" , "localhost:5672" ), new String [] { "http://localhost:15672" },
82
- "/" , "guest" , "guest" , false , null );
104
+ Map .of (findLocalNode (), brokerRunning .getHostName () + ":" + brokerRunning .getPort ()),
105
+ new String [] { brokerRunning .getAdminUri () },
106
+ "/" , brokerRunning .getAdminUser (), brokerRunning .getAdminPassword (), false , null );
83
107
ConnectionFactory cf = lqcf .getTargetConnectionFactory ("[local]" );
84
108
RabbitAdmin admin = new RabbitAdmin (cf );
85
109
assertThat (admin .getQueueProperties ("local" )).isNotNull ();
@@ -89,4 +113,32 @@ void findLocal() {
89
113
lqcf .destroy ();
90
114
}
91
115
116
+ private String findLocalNode () {
117
+ AnonymousQueue queue = new AnonymousQueue ();
118
+ this .defaultAdmin .declareQueue (queue );
119
+ URI uri ;
120
+ BrokerRunningSupport brokerRunning = RabbitAvailableCondition .getBrokerRunning ();
121
+ try {
122
+ uri = new URI (brokerRunning .getAdminUri ())
123
+ .resolve ("/api/queues/" + UriUtils .encodePathSegment ("/" , StandardCharsets .UTF_8 ) + "/"
124
+ + queue .getName ());
125
+ }
126
+ catch (URISyntaxException ex ) {
127
+ throw new IllegalStateException (ex );
128
+ }
129
+ WebClient client = WebClient .builder ()
130
+ .filter (ExchangeFilterFunctions .basicAuthentication (brokerRunning .getAdminUser (),
131
+ brokerRunning .getAdminPassword ()))
132
+ .build ();
133
+ Map <String , Object > queueInfo = client .get ()
134
+ .uri (uri )
135
+ .accept (MediaType .APPLICATION_JSON )
136
+ .retrieve ()
137
+ .bodyToMono (new ParameterizedTypeReference <Map <String , Object >>() {
138
+ })
139
+ .block (Duration .ofSeconds (10 ));
140
+ this .defaultAdmin .deleteQueue (queue .getName ());
141
+ return (String ) queueInfo .get ("node" );
142
+ }
143
+
92
144
}
0 commit comments