Skip to content

Commit a7eda95

Browse files
committed
Mark global QoS setting as deprecated
[Why] Global QoS, where a single shared prefetch is used for an entire channel, is not recommended practice. Per-consumer QoS (non-global) should be set instead. [How] The global QoS setting is marked as deprecated in the code using the Deprecated features subsystem (based on feature flags). See #7390 for a description of that subsystem. To test RabbitMQ behavior as if the feature was removed, the following configuration setting can be used: deprecated_features.permit.global_qos = false Global QoS can be turned off anytime, there are no conditions to do that. Once global QoS is turned off, the prefetch setting will always be considered as non-global (i.e. per-consumer). A warning message will be logged if the default prefetch setting enables global QoS or anytime a client requests a global QoS on the channel. Note that given the marketing calendar, the deprecated feature will go directly from "permitted by default" to "removed" in RabbitMQ 4.0. It won't go through the gradual deprecation process.
1 parent 7d9414c commit a7eda95

File tree

4 files changed

+202
-23
lines changed

4 files changed

+202
-23
lines changed

deps/rabbit/BUILD.bazel

+5
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,11 @@ rabbitmq_integration_suite(
803803
],
804804
)
805805

806+
rabbitmq_integration_suite(
807+
name = "rabbitmq_4_0_deprecations_SUITE",
808+
size = "large",
809+
)
810+
806811
rabbitmq_integration_suite(
807812
name = "rabbitmq_queues_cli_integration_SUITE",
808813
size = "medium",

deps/rabbit/app.bzl

+9
Original file line numberDiff line numberDiff line change
@@ -1963,3 +1963,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
19631963
erlc_opts = "//:test_erlc_opts",
19641964
deps = ["//deps/amqp_client:erlang_app"],
19651965
)
1966+
erlang_bytecode(
1967+
name = "rabbitmq_4_0_deprecations_SUITE_beam_files",
1968+
testonly = True,
1969+
srcs = ["test/rabbitmq_4_0_deprecations_SUITE.erl"],
1970+
outs = ["test/rabbitmq_4_0_deprecations_SUITE.beam"],
1971+
app_name = "rabbit",
1972+
erlc_opts = "//:test_erlc_opts",
1973+
deps = ["//deps/amqp_client:erlang_app"],
1974+
)

deps/rabbit/src/rabbit_channel.erl

+47-23
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,12 @@
236236

237237
%%----------------------------------------------------------------------------
238238

239+
-rabbit_deprecated_feature(
240+
{global_qos,
241+
#{deprecation_phase => permitted_by_default,
242+
doc_url => "https://blog.rabbitmq.com/posts/2021/08/4.0-deprecation-announcements/#removal-of-global-qos"
243+
}}).
244+
239245
-spec start_link
240246
(channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(),
241247
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
@@ -506,8 +512,9 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
506512
true -> flow;
507513
false -> noflow
508514
end,
509-
{ok, {Global, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
515+
{ok, {Global0, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
510516
Limiter0 = rabbit_limiter:new(LimiterPid),
517+
Global = Global0 andalso is_global_qos_permitted(),
511518
Limiter = case {Global, Prefetch} of
512519
{true, 0} ->
513520
rabbit_limiter:unlimit_prefetch(Limiter0);
@@ -1559,30 +1566,44 @@ handle_method(#'basic.qos'{global = false,
15591566
limiter = Limiter1}};
15601567

15611568
handle_method(#'basic.qos'{global = true,
1562-
prefetch_count = 0},
1563-
_, State = #ch{limiter = Limiter}) ->
1564-
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
1565-
case rabbit_limiter:is_active(Limiter) of
1566-
true -> rabbit_amqqueue:deactivate_limit_all(
1567-
classic_consumer_queue_pids(State#ch.consumer_mapping), self());
1568-
false -> ok
1569-
end,
1570-
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
1569+
prefetch_count = 0} = Method,
1570+
Content,
1571+
State = #ch{limiter = Limiter}) ->
1572+
case is_global_qos_permitted() of
1573+
true ->
1574+
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
1575+
case rabbit_limiter:is_active(Limiter) of
1576+
true -> rabbit_amqqueue:deactivate_limit_all(
1577+
classic_consumer_queue_pids(State#ch.consumer_mapping), self());
1578+
false -> ok
1579+
end,
1580+
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
1581+
false ->
1582+
Method1 = Method#'basic.qos'{global = false},
1583+
handle_method(Method1, Content, State)
1584+
end;
15711585

15721586
handle_method(#'basic.qos'{global = true,
1573-
prefetch_count = PrefetchCount},
1574-
_, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
1575-
%% TODO ?QUEUE:len(UAMQ) is not strictly right since that counts
1576-
%% unacked messages from basic.get too. Pretty obscure though.
1577-
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
1578-
PrefetchCount, ?QUEUE:len(UAMQ)),
1579-
case ((not rabbit_limiter:is_active(Limiter)) andalso
1580-
rabbit_limiter:is_active(Limiter1)) of
1581-
true -> rabbit_amqqueue:activate_limit_all(
1582-
classic_consumer_queue_pids(State#ch.consumer_mapping), self());
1583-
false -> ok
1584-
end,
1585-
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
1587+
prefetch_count = PrefetchCount} = Method,
1588+
Content,
1589+
State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
1590+
case is_global_qos_permitted() of
1591+
true ->
1592+
%% TODO ?QUEUE:len(UAMQ) is not strictly right since that counts
1593+
%% unacked messages from basic.get too. Pretty obscure though.
1594+
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
1595+
PrefetchCount, ?QUEUE:len(UAMQ)),
1596+
case ((not rabbit_limiter:is_active(Limiter)) andalso
1597+
rabbit_limiter:is_active(Limiter1)) of
1598+
true -> rabbit_amqqueue:activate_limit_all(
1599+
classic_consumer_queue_pids(State#ch.consumer_mapping), self());
1600+
false -> ok
1601+
end,
1602+
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
1603+
false ->
1604+
Method1 = Method#'basic.qos'{global = false},
1605+
handle_method(Method1, Content, State)
1606+
end;
15861607

15871608
handle_method(#'basic.recover_async'{requeue = true},
15881609
_, State = #ch{unacked_message_q = UAMQ,
@@ -2943,3 +2964,6 @@ maybe_decrease_global_publishers(#ch{publishing_mode = false}) ->
29432964
ok;
29442965
maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
29452966
rabbit_global_counters:publisher_deleted(amqp091).
2967+
2968+
is_global_qos_permitted() ->
2969+
rabbit_deprecated_features:is_permitted(global_qos).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbitmq_4_0_deprecations_SUITE).
9+
10+
-include_lib("eunit/include/eunit.hrl").
11+
-include_lib("common_test/include/ct.hrl").
12+
13+
-include_lib("amqp_client/include/amqp_client.hrl").
14+
15+
-export([suite/0,
16+
all/0,
17+
groups/0,
18+
init_per_suite/1,
19+
end_per_suite/1,
20+
init_per_group/2,
21+
end_per_group/2,
22+
init_per_testcase/2,
23+
end_per_testcase/2,
24+
25+
when_global_qos_is_permitted_by_default/1,
26+
when_global_qos_is_not_permitted_from_conf/1
27+
]).
28+
29+
suite() ->
30+
[{timetrap, {minutes, 5}}].
31+
32+
all() ->
33+
[
34+
{group, global_qos}
35+
].
36+
37+
groups() ->
38+
[
39+
{global_qos, [],
40+
[when_global_qos_is_permitted_by_default,
41+
when_global_qos_is_not_permitted_from_conf]}
42+
].
43+
44+
%% -------------------------------------------------------------------
45+
%% Testsuite setup/teardown.
46+
%% -------------------------------------------------------------------
47+
48+
init_per_suite(Config) ->
49+
rabbit_ct_helpers:log_environment(),
50+
logger:set_primary_config(level, debug),
51+
rabbit_ct_helpers:run_setup_steps(
52+
Config,
53+
[fun rabbit_ct_helpers:redirect_logger_to_ct_logs/1]).
54+
55+
end_per_suite(Config) ->
56+
Config.
57+
58+
init_per_group(global_qos, Config) ->
59+
rabbit_ct_helpers:set_config(Config, {rmq_nodes_count, 1});
60+
init_per_group(_Group, Config) ->
61+
Config.
62+
63+
end_per_group(_Group, Config) ->
64+
Config.
65+
66+
init_per_testcase(
67+
when_global_qos_is_not_permitted_from_conf = Testcase, Config) ->
68+
Config1 = rabbit_ct_helpers:merge_app_env(
69+
Config,
70+
{rabbit,
71+
[{permit_deprecated_features, #{global_qos => false}}]}),
72+
init_per_testcase1(Testcase, Config1);
73+
init_per_testcase(Testcase, Config) ->
74+
init_per_testcase1(Testcase, Config).
75+
76+
init_per_testcase1(Testcase, Config) ->
77+
rabbit_ct_helpers:testcase_started(Config, Testcase),
78+
ClusterSize = ?config(rmq_nodes_count, Config),
79+
TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
80+
Config1 = rabbit_ct_helpers:set_config(Config, [
81+
{rmq_nodename_suffix, Testcase},
82+
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}},
83+
{keep_pid_file_on_exit, true}
84+
]),
85+
rabbit_ct_helpers:run_steps(Config1,
86+
rabbit_ct_broker_helpers:setup_steps() ++
87+
rabbit_ct_client_helpers:setup_steps()).
88+
89+
end_per_testcase(Testcase, Config) ->
90+
Config1 = rabbit_ct_helpers:run_steps(Config,
91+
rabbit_ct_client_helpers:teardown_steps() ++
92+
rabbit_ct_broker_helpers:teardown_steps()),
93+
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
94+
95+
%% -------------------------------------------------------------------
96+
%% Global QoS.
97+
%% -------------------------------------------------------------------
98+
99+
when_global_qos_is_permitted_by_default(Config) ->
100+
[NodeA] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
101+
102+
ExistingServerChs = list_server_channels(Config, NodeA),
103+
ClientCh = rabbit_ct_client_helpers:open_channel(Config, NodeA),
104+
[ServerCh] = list_server_channels(Config, NodeA) -- ExistingServerChs,
105+
106+
?assertNot(is_prefetch_limited(ServerCh)),
107+
108+
%% It's possible to request global QoS and it is accepted by the server.
109+
?assertMatch(
110+
#'basic.qos_ok'{},
111+
amqp_channel:call(
112+
ClientCh,
113+
#'basic.qos'{global = true, prefetch_count = 10})),
114+
?assert(is_prefetch_limited(ServerCh)).
115+
116+
when_global_qos_is_not_permitted_from_conf(Config) ->
117+
[NodeA] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
118+
119+
ExistingServerChs = list_server_channels(Config, NodeA),
120+
ClientCh = rabbit_ct_client_helpers:open_channel(Config, NodeA),
121+
[ServerCh] = list_server_channels(Config, NodeA) -- ExistingServerChs,
122+
123+
?assertNot(is_prefetch_limited(ServerCh)),
124+
125+
%% It's possible to request global QoS but it is ignored by the server.
126+
?assertMatch(
127+
#'basic.qos_ok'{},
128+
amqp_channel:call(
129+
ClientCh,
130+
#'basic.qos'{global = true, prefetch_count = 10})),
131+
?assertNot(is_prefetch_limited(ServerCh)).
132+
133+
list_server_channels(Config, Node) ->
134+
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_channel, list, []).
135+
136+
is_prefetch_limited(ServerCh) ->
137+
GenServer2State = sys:get_state(ServerCh),
138+
ChState = element(4, GenServer2State),
139+
ct:pal("Server channel (~p) state: ~p", [ServerCh, ChState]),
140+
LimiterState = element(3, ChState),
141+
element(3, LimiterState).

0 commit comments

Comments
 (0)