Skip to content

Commit 5eb36bf

Browse files
committed
Mark transient non-exclusive queues as deprecated
[Why] Transient queues are queues that are removed upon node restart. An application developer can't rely on such an arbitrary event to reason about a queue's lifetime. The only exception are exclusive transient queues which have a lifetime linked to that of a client connection. [How] Non-exclusive transient queues are marked as deprecated in the code using the Deprecated features subsystem (based on feature flags). See pull request #7390 for a description of that subsystem. To test RabbitMQ behavior as if the feature was removed, the following configuration setting can be used: deprecated_features.permit.transient_nonexcl_queues = false Non-exclusive transient queues can be turned off anytime, there are no conditions to do that. Once non-exclusive transient queues are turned off, declaring a new queue with those arguments will be rejected with a protocol error. Note that given the marketing calendar, the deprecated feature will go directly from "permitted by default" to "removed" in RabbitMQ 4.0. It won't go through the gradual deprecation process.
1 parent 3d3e71c commit 5eb36bf

File tree

2 files changed

+88
-6
lines changed

2 files changed

+88
-6
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

+32-3
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@
107107

108108
%%----------------------------------------------------------------------------
109109

110+
-rabbit_deprecated_feature(
111+
{transient_nonexcl_queues,
112+
#{deprecation_phase => permitted_by_default,
113+
doc_url => "https://blog.rabbitmq.com/posts/2021/08/4.0-deprecation-announcements/#removal-of-transient-non-exclusive-queues"
114+
}}).
115+
110116
-define(CONSUMER_INFO_KEYS,
111117
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
112118
active, activity_status, arguments]).
@@ -223,7 +229,14 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
223229
VHost,
224230
#{user => ActingUser},
225231
Type),
226-
rabbit_queue_type:declare(Q, Node);
232+
case is_queue_args_combination_permitted(Q) of
233+
true ->
234+
rabbit_queue_type:declare(Q, Node);
235+
false ->
236+
Warning = rabbit_deprecated_features:get_warning(
237+
transient_nonexcl_queues),
238+
{protocol_error, internal_error, "~ts", [Warning]}
239+
end;
227240
false ->
228241
{protocol_error, internal_error,
229242
"Cannot declare a queue '~ts' of type '~ts' on node '~ts': "
@@ -721,8 +734,11 @@ augment_declare_args(VHost, Durable, Exclusive, AutoDelete, Args0) ->
721734
when is_binary(DefaultQueueType) andalso
722735
not HasQTypeArg ->
723736
Type = rabbit_queue_type:discover(DefaultQueueType),
724-
case rabbit_queue_type:is_compatible(Type, Durable,
725-
Exclusive, AutoDelete) of
737+
IsPermitted = is_queue_args_combination_permitted(
738+
Durable, Exclusive),
739+
IsCompatible = rabbit_queue_type:is_compatible(
740+
Type, Durable, Exclusive, AutoDelete),
741+
case IsPermitted andalso IsCompatible of
726742
true ->
727743
%% patch up declare arguments with x-queue-type if there
728744
%% is a vhost default set the queue is druable and not exclusive
@@ -2074,3 +2090,16 @@ get_bcc_queue(Q, BCCName) ->
20742090
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
20752091
BCCQueueName = rabbit_misc:r(VHost, queue, BCCName),
20762092
rabbit_amqqueue:lookup(BCCQueueName).
2093+
2094+
is_queue_args_combination_permitted(Q) ->
2095+
Durable = amqqueue:is_durable(Q),
2096+
Exclusive = is_exclusive(Q),
2097+
is_queue_args_combination_permitted(Durable, Exclusive).
2098+
2099+
is_queue_args_combination_permitted(Durable, Exclusive) ->
2100+
case not Durable andalso not Exclusive of
2101+
false ->
2102+
true;
2103+
true ->
2104+
rabbit_deprecated_features:is_permitted(transient_nonexcl_queues)
2105+
end.

deps/rabbit/test/rabbitmq_4_0_deprecations_SUITE.erl

+56-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@
2929
join_when_ram_node_type_is_not_permitted_from_conf/1,
3030

3131
set_policy_when_cmq_is_permitted_by_default/1,
32-
set_policy_when_cmq_is_not_permitted_from_conf/1
32+
set_policy_when_cmq_is_not_permitted_from_conf/1,
33+
34+
when_transient_nonexcl_is_permitted_by_default/1,
35+
when_transient_nonexcl_is_not_permitted_from_conf/1
3336
]).
3437

3538
suite() ->
@@ -39,7 +42,8 @@ all() ->
3942
[
4043
{group, global_qos},
4144
{group, ram_node_type},
42-
{group, classic_queue_mirroring}
45+
{group, classic_queue_mirroring},
46+
{group, transient_nonexcl_queues}
4347
].
4448

4549
groups() ->
@@ -52,7 +56,10 @@ groups() ->
5256
join_when_ram_node_type_is_not_permitted_from_conf]},
5357
{classic_queue_mirroring, [],
5458
[set_policy_when_cmq_is_permitted_by_default,
55-
set_policy_when_cmq_is_not_permitted_from_conf]}
59+
set_policy_when_cmq_is_not_permitted_from_conf]},
60+
{transient_nonexcl_queues, [],
61+
[when_transient_nonexcl_is_permitted_by_default,
62+
when_transient_nonexcl_is_not_permitted_from_conf]}
5663
].
5764

5865
%% -------------------------------------------------------------------
@@ -76,6 +83,8 @@ init_per_group(ram_node_type, Config) ->
7683
{rmq_nodes_clustered, false}]);
7784
init_per_group(classic_queue_mirroring, Config) ->
7885
rabbit_ct_helpers:set_config(Config, {rmq_nodes_count, 1});
86+
init_per_group(transient_nonexcl_queues, Config) ->
87+
rabbit_ct_helpers:set_config(Config, {rmq_nodes_count, 1});
7988
init_per_group(_Group, Config) ->
8089
Config.
8190

@@ -104,6 +113,14 @@ init_per_testcase(
104113
[{permit_deprecated_features,
105114
#{classic_queue_mirroring => false}}]}),
106115
init_per_testcase1(Testcase, Config1);
116+
init_per_testcase(
117+
when_transient_nonexcl_is_not_permitted_from_conf = Testcase, Config) ->
118+
Config1 = rabbit_ct_helpers:merge_app_env(
119+
Config,
120+
{rabbit,
121+
[{permit_deprecated_features,
122+
#{transient_nonexcl_queues => false}}]}),
123+
init_per_testcase1(Testcase, Config1);
107124
init_per_testcase(Testcase, Config) ->
108125
init_per_testcase1(Testcase, Config).
109126

@@ -293,3 +310,39 @@ set_policy_when_cmq_is_not_permitted_from_conf(Config) ->
293310
"Validation failed\n\nClassic mirrored queues are deprecated." ++ _}},
294311
rabbit_ct_broker_helpers:set_ha_policy(
295312
Config, 0, <<".*">>, <<"all">>)).
313+
314+
%% -------------------------------------------------------------------
315+
%% Transient non-exclusive queues.
316+
%% -------------------------------------------------------------------
317+
318+
when_transient_nonexcl_is_permitted_by_default(Config) ->
319+
[NodeA] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
320+
321+
Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
322+
323+
QName = list_to_binary(atom_to_list(?FUNCTION_NAME)),
324+
?assertEqual(
325+
{'queue.declare_ok', QName, 0, 0},
326+
amqp_channel:call(
327+
Ch,
328+
#'queue.declare'{queue = QName,
329+
durable = false,
330+
exclusive = false})).
331+
332+
when_transient_nonexcl_is_not_permitted_from_conf(Config) ->
333+
[NodeA] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
334+
335+
Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
336+
337+
QName = list_to_binary(atom_to_list(?FUNCTION_NAME)),
338+
?assertExit(
339+
{{shutdown,
340+
{connection_closing,
341+
{server_initiated_close, 541,
342+
<<"INTERNAL_ERROR - Feature `transient_nonexcl_queues` is "
343+
"deprecated.", _/binary>>}}}, _},
344+
amqp_channel:call(
345+
Ch,
346+
#'queue.declare'{queue = QName,
347+
durable = false,
348+
exclusive = false})).

0 commit comments

Comments
 (0)