Skip to content

Commit a5f2e19

Browse files
Merge pull request #420 from rabbitmq/rabbitmq-java-client-417-rpc-mandatory-flag
Add mandatory flag option to RpcClient
2 parents b233e14 + 7f89b26 commit a5f2e19

File tree

4 files changed

+406
-38
lines changed

4 files changed

+406
-38
lines changed

src/main/java/com/rabbitmq/client/RpcClient.java

+77-22
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Map;
2828
import java.util.Map.Entry;
2929
import java.util.concurrent.TimeoutException;
30+
import java.util.function.Function;
3031

3132
import com.rabbitmq.client.impl.MethodArgumentReader;
3233
import com.rabbitmq.client.impl.MethodArgumentWriter;
@@ -58,6 +59,27 @@ public class RpcClient {
5859
private final int _timeout;
5960
/** NO_TIMEOUT value must match convention on {@link BlockingCell#uninterruptibleGet(int)} */
6061
protected final static int NO_TIMEOUT = -1;
62+
/** Whether to publish RPC requests with the mandatory flag or not. */
63+
private final boolean _useMandatory;
64+
65+
public final static Function<Object, Response> DEFAULT_REPLY_HANDLER = reply -> {
66+
if (reply instanceof ShutdownSignalException) {
67+
ShutdownSignalException sig = (ShutdownSignalException) reply;
68+
ShutdownSignalException wrapper =
69+
new ShutdownSignalException(sig.isHardError(),
70+
sig.isInitiatedByApplication(),
71+
sig.getReason(),
72+
sig.getReference());
73+
wrapper.initCause(sig);
74+
throw wrapper;
75+
} else if (reply instanceof UnroutableRpcRequestException) {
76+
throw (UnroutableRpcRequestException) reply;
77+
} else {
78+
return (Response) reply;
79+
}
80+
};
81+
82+
private final Function<Object, Response> _replyHandler;
6183

6284
/** Map from request correlation ID to continuation BlockingCell */
6385
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
@@ -67,6 +89,46 @@ public class RpcClient {
6789
/** Consumer attached to our reply queue */
6890
private DefaultConsumer _consumer;
6991

92+
/**
93+
* Construct a {@link RpcClient} with the passed-in {@link RpcClientParams}.
94+
*
95+
* @param params
96+
* @throws IOException
97+
* @see RpcClientParams
98+
* @since 5.6.0
99+
*/
100+
public RpcClient(RpcClientParams params) throws
101+
IOException {
102+
_channel = params.getChannel();
103+
_exchange = params.getExchange();
104+
_routingKey = params.getRoutingKey();
105+
_replyTo = params.getReplyTo();
106+
if (params.getTimeout() < NO_TIMEOUT) {
107+
throw new IllegalArgumentException("Timeout argument must be NO_TIMEOUT(-1) or non-negative.");
108+
}
109+
_timeout = params.getTimeout();
110+
_useMandatory = params.shouldUseMandatory();
111+
_replyHandler = params.getReplyHandler();
112+
_correlationId = 0;
113+
114+
_consumer = setupConsumer();
115+
if (_useMandatory) {
116+
this._channel.addReturnListener(returnMessage -> {
117+
synchronized (_continuationMap) {
118+
String replyId = returnMessage.getProperties().getCorrelationId();
119+
BlockingCell<Object> blocker = _continuationMap.remove(replyId);
120+
if (blocker == null) {
121+
// Entry should have been removed if request timed out,
122+
// log a warning nevertheless.
123+
LOGGER.warn("No outstanding request for correlation ID {}", replyId);
124+
} else {
125+
blocker.set(new UnroutableRpcRequestException(returnMessage));
126+
}
127+
}
128+
});
129+
}
130+
}
131+
70132
/**
71133
* Construct a new RpcClient that will communicate on the given channel, sending
72134
* requests to the given exchange with the given routing key.
@@ -78,18 +140,16 @@ public class RpcClient {
78140
* @param replyTo the queue where the server should put the reply
79141
* @param timeout milliseconds before timing out on wait for response
80142
* @throws IOException if an error is encountered
143+
* @deprecated use {@link RpcClient#RpcClient(RpcClientParams)} instead, will be removed in 6.0.0
81144
*/
145+
@Deprecated
82146
public RpcClient(Channel channel, String exchange, String routingKey, String replyTo, int timeout) throws
83147
IOException {
84-
_channel = channel;
85-
_exchange = exchange;
86-
_routingKey = routingKey;
87-
_replyTo = replyTo;
88-
if (timeout < NO_TIMEOUT) throw new IllegalArgumentException("Timeout arguument must be NO_TIMEOUT(-1) or non-negative.");
89-
_timeout = timeout;
90-
_correlationId = 0;
91-
92-
_consumer = setupConsumer();
148+
this(new RpcClientParams()
149+
.channel(channel).exchange(exchange).routingKey(routingKey)
150+
.replyTo(replyTo).timeout(timeout)
151+
.useMandatory(false)
152+
);
93153
}
94154

95155
/**
@@ -106,7 +166,9 @@ public RpcClient(Channel channel, String exchange, String routingKey, String rep
106166
* @param routingKey the routing key
107167
* @param replyTo the queue where the server should put the reply
108168
* @throws IOException if an error is encountered
169+
* @deprecated use {@link RpcClient#RpcClient(RpcClientParams)} instead, will be removed in 6.0.0
109170
*/
171+
@Deprecated
110172
public RpcClient(Channel channel, String exchange, String routingKey, String replyTo) throws IOException {
111173
this(channel, exchange, routingKey, replyTo, NO_TIMEOUT);
112174
}
@@ -123,7 +185,9 @@ public RpcClient(Channel channel, String exchange, String routingKey, String rep
123185
* @param exchange the exchange to connect to
124186
* @param routingKey the routing key
125187
* @throws IOException if an error is encountered
188+
* @deprecated use {@link RpcClient#RpcClient(RpcClientParams)} instead, will be removed in 6.0.0
126189
*/
190+
@Deprecated
127191
public RpcClient(Channel channel, String exchange, String routingKey) throws IOException {
128192
this(channel, exchange, routingKey, "amq.rabbitmq.reply-to", NO_TIMEOUT);
129193
}
@@ -142,7 +206,9 @@ public RpcClient(Channel channel, String exchange, String routingKey) throws IOE
142206
* @param routingKey the routing key
143207
* @param timeout milliseconds before timing out on wait for response
144208
* @throws IOException if an error is encountered
209+
* @deprecated use {@link RpcClient#RpcClient(RpcClientParams)} instead, will be removed in 6.0.0
145210
*/
211+
@Deprecated
146212
public RpcClient(Channel channel, String exchange, String routingKey, int timeout) throws IOException {
147213
this(channel, exchange, routingKey, "amq.rabbitmq.reply-to", timeout);
148214
}
@@ -213,7 +279,7 @@ public void handleDelivery(String consumerTag,
213279
public void publish(AMQP.BasicProperties props, byte[] message)
214280
throws IOException
215281
{
216-
_channel.basicPublish(_exchange, _routingKey, props, message);
282+
_channel.basicPublish(_exchange, _routingKey, _useMandatory, props, message);
217283
}
218284

219285
public Response doCall(AMQP.BasicProperties props, byte[] message)
@@ -242,18 +308,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
242308
_continuationMap.remove(replyId);
243309
throw ex;
244310
}
245-
if (reply instanceof ShutdownSignalException) {
246-
ShutdownSignalException sig = (ShutdownSignalException) reply;
247-
ShutdownSignalException wrapper =
248-
new ShutdownSignalException(sig.isHardError(),
249-
sig.isInitiatedByApplication(),
250-
sig.getReason(),
251-
sig.getReference());
252-
wrapper.initCause(sig);
253-
throw wrapper;
254-
} else {
255-
return (Response) reply;
256-
}
311+
return _replyHandler.apply(reply);
257312
}
258313

259314
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
import java.util.function.Function;
19+
20+
/**
21+
* Holder class to configure a {@link RpcClient}.
22+
*
23+
* @see RpcClient#RpcClient(RpcClientParams)
24+
* @since 5.6.0
25+
*/
26+
public class RpcClientParams {
27+
28+
/**
29+
* Channel we are communicating on
30+
*/
31+
private Channel channel;
32+
/**
33+
* Exchange to send requests to
34+
*/
35+
private String exchange;
36+
/**
37+
* Routing key to use for requests
38+
*/
39+
private String routingKey;
40+
/**
41+
* Queue where the server should put the reply
42+
*/
43+
private String replyTo = "amq.rabbitmq.reply-to";
44+
/**
45+
* Timeout in milliseconds to use on call responses
46+
*/
47+
private int timeout = RpcClient.NO_TIMEOUT;
48+
/**
49+
* Whether to publish RPC requests with the mandatory flag or not.
50+
*/
51+
private boolean useMandatory = false;
52+
/**
53+
* Behavior to handle reply messages.
54+
*/
55+
private Function<Object, RpcClient.Response> replyHandler = RpcClient.DEFAULT_REPLY_HANDLER;
56+
57+
/**
58+
* Set the channel to use for communication.
59+
*
60+
* @return
61+
*/
62+
public Channel getChannel() {
63+
return channel;
64+
}
65+
66+
public RpcClientParams channel(Channel channel) {
67+
this.channel = channel;
68+
return this;
69+
}
70+
71+
/**
72+
* Set the exchange to send requests to.
73+
*
74+
* @return
75+
*/
76+
public String getExchange() {
77+
return exchange;
78+
}
79+
80+
public RpcClientParams exchange(String exchange) {
81+
this.exchange = exchange;
82+
return this;
83+
}
84+
85+
public String getRoutingKey() {
86+
return routingKey;
87+
}
88+
89+
/**
90+
* Set the routing key to use for requests.
91+
*
92+
* @param routingKey
93+
* @return
94+
*/
95+
public RpcClientParams routingKey(String routingKey) {
96+
this.routingKey = routingKey;
97+
return this;
98+
}
99+
100+
public String getReplyTo() {
101+
return replyTo;
102+
}
103+
104+
/**
105+
* Set the queue where the server should put replies on.
106+
* <p>
107+
* The default is to use
108+
* <a href="https://www.rabbitmq.com/direct-reply-to.html">Direct Reply-to</a>.
109+
* Using another value will cause the creation of a temporary private
110+
* auto-delete queue.
111+
* <p>
112+
* The default shouldn't be changed for performance reasons.
113+
*
114+
* @param replyTo
115+
* @return
116+
*/
117+
public RpcClientParams replyTo(String replyTo) {
118+
this.replyTo = replyTo;
119+
return this;
120+
}
121+
122+
public int getTimeout() {
123+
return timeout;
124+
}
125+
126+
/**
127+
* Set the timeout in milliseconds to use on call responses.
128+
*
129+
* @param timeout
130+
* @return
131+
*/
132+
public RpcClientParams timeout(int timeout) {
133+
this.timeout = timeout;
134+
return this;
135+
}
136+
137+
/**
138+
* Whether to publish RPC requests with the mandatory flag or not.
139+
* <p>
140+
* Default is to not publish requests with the mandatory flag
141+
* set to true.
142+
* <p>
143+
* When set to true, unroutable requests will result
144+
* in {@link UnroutableRpcRequestException} exceptions thrown.
145+
* Use a custom reply handler to change this behavior.
146+
*
147+
* @param useMandatory
148+
* @return
149+
* @see #replyHandler(Function)
150+
*/
151+
public RpcClientParams useMandatory(boolean useMandatory) {
152+
this.useMandatory = useMandatory;
153+
return this;
154+
}
155+
156+
/**
157+
* Instructs to use the mandatory flag when publishing RPC requests.
158+
* <p>
159+
* Unroutable requests will result in {@link UnroutableRpcRequestException} exceptions
160+
* thrown. Use a custom reply handler to change this behavior.
161+
*
162+
* @return
163+
* @see #replyHandler(Function)
164+
*/
165+
public RpcClientParams useMandatory() {
166+
return useMandatory(true);
167+
}
168+
169+
public boolean shouldUseMandatory() {
170+
return useMandatory;
171+
}
172+
173+
public Function<Object, RpcClient.Response> getReplyHandler() {
174+
return replyHandler;
175+
}
176+
177+
/**
178+
* Set the behavior to use when receiving replies.
179+
* <p>
180+
* The default is to wrap the reply into a {@link com.rabbitmq.client.RpcClient.Response}
181+
* instance. Unroutable requests will result in {@link UnroutableRpcRequestException}
182+
* exceptions.
183+
*
184+
* @param replyHandler
185+
* @return
186+
* @see #useMandatory()
187+
* @see #useMandatory(boolean)
188+
*/
189+
public RpcClientParams replyHandler(Function<Object, RpcClient.Response> replyHandler) {
190+
this.replyHandler = replyHandler;
191+
return this;
192+
}
193+
}

0 commit comments

Comments
 (0)