Skip to content

Commit cd386d5

Browse files
committed
Mark Classic mirrored queues as deprecated
[TBW]
1 parent a4ec5a3 commit cd386d5

File tree

3 files changed

+109
-26
lines changed

3 files changed

+109
-26
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

+12-2
Original file line numberDiff line numberDiff line change
@@ -297,11 +297,21 @@ policy_changed(Q1, Q2) ->
297297
notify_policy_changed(Q2).
298298

299299
is_policy_applicable(Q, Policy) when ?is_amqqueue(Q) ->
300-
rabbit_queue_type:is_policy_applicable(Q, Policy);
300+
case rabbit_queue_type:is_policy_applicable(Q, Policy) of
301+
true ->
302+
%% If the applicable policy configures classic mirrored queues and
303+
%% this feature is disabled, we consider this policy not
304+
%% applicable.
305+
not rabbit_mirror_queue_misc:does_policy_configure_cmq(Policy)
306+
orelse
307+
rabbit_mirror_queue_misc:is_cmq_permitted();
308+
false ->
309+
false
310+
end;
301311
is_policy_applicable(QName, Policy) ->
302312
case lookup(QName) of
303313
{ok, Q} ->
304-
rabbit_queue_type:is_policy_applicable(Q, Policy);
314+
is_policy_applicable(Q, Policy);
305315
_ ->
306316
%% Defaults to previous behaviour. Apply always
307317
true

deps/rabbit/src/rabbit_amqqueue_process.erl

+9-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,15 @@ init_it(Recover, From, State = #q{q = Q0}) ->
191191
State#q{backing_queue = BQ, backing_queue_state = BQS}}
192192
end.
193193

194-
init_it2(Recover, From, State = #q{q = Q,
194+
init_it2(Recover, From, State = #q{q = Q}) ->
195+
%% Prevent the queue from starting if it is a classic mirrored queue and
196+
%% the feature is disabled.
197+
case rabbit_mirror_queue_misc:warn_if_queue_is_mirrored(Q) of
198+
false -> init_it3(Recover, From, State);
199+
true -> {stop, normal, classic_mirrored_queues_disabled, State}
200+
end.
201+
202+
init_it3(Recover, From, State = #q{q = Q,
195203
backing_queue = undefined,
196204
backing_queue_state = undefined}) ->
197205
{Barrier, TermsOrNew} = recovery_status(Recover),

deps/rabbit/src/rabbit_mirror_queue_misc.erl

+88-23
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
-behaviour(rabbit_policy_validator).
1010
-behaviour(rabbit_policy_merge_strategy).
1111

12+
-include_lib("kernel/include/logger.hrl").
13+
1214
-include("amqqueue.hrl").
1315

1416
-export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3,
@@ -26,13 +28,25 @@
2628

2729
-export([get_replicas/1, transfer_leadership/2, migrate_leadership_to_existing_replica/2]).
2830

31+
-export([does_policy_configure_cmq/1,
32+
is_cmq_permitted/0,
33+
warn_if_queue_is_mirrored/1]).
34+
2935
%% for testing only
3036
-export([module/1]).
3137

3238
-include_lib("rabbit_common/include/rabbit.hrl").
3339

3440
-define(HA_NODES_MODULE, rabbit_mirror_queue_mode_nodes).
3541

42+
-rabbit_deprecated_feature(
43+
{classic_mirrored_queues,
44+
#{deprecation_phase => optout,
45+
warning =>
46+
"Classic mirrored queues are deprecated and will go away in RabbitMQ 4.x",
47+
doc_url => "https://blog.rabbitmq.com/posts/2021/08/4.0-deprecation-announcements/"
48+
}}).
49+
3650
-rabbit_boot_step(
3751
{?MODULE,
3852
[{description, "HA policy validation"},
@@ -724,30 +738,81 @@ maybe_drop_master_after_sync(Q) when ?is_amqqueue(Q) ->
724738

725739
%%----------------------------------------------------------------------------
726740

741+
does_policy_configure_cmq(KeyList) ->
742+
lists:keymember(<<"ha-mode">>, 1, KeyList).
743+
744+
is_cmq_permitted() ->
745+
FeatureName = classic_mirrored_queues,
746+
rabbit_deprecated_features:is_permitted(FeatureName).
747+
748+
warn_if_queue_is_mirrored(Q) ->
749+
case rabbit_policy:effective_definition(Q) of
750+
undefined ->
751+
false;
752+
Policy ->
753+
case does_policy_configure_cmq(Policy) of
754+
true ->
755+
Permitted = is_cmq_permitted(),
756+
#resource{
757+
virtual_host = VHost,
758+
name = Name
759+
} = amqqueue:get_name(Q),
760+
case Permitted of
761+
true ->
762+
?LOG_WARNING(
763+
"Mirroring is configured for queue `~ts` in "
764+
"vhost `~ts`: it will stop working once "
765+
"classic mirrored queue support is dropped",
766+
[Name, VHost]);
767+
false ->
768+
?LOG_ERROR(
769+
"Mirroring is configured for queue `~ts` in "
770+
"vhost `~ts`: mirroring is deprecated and "
771+
"disabled, except configured otherwise.",
772+
[Name, VHost])
773+
end,
774+
%% Return a boolean indicating if the caller should abort
775+
%% what it's doing because the feature is not permitted.
776+
not Permitted;
777+
false ->
778+
false
779+
end
780+
end.
781+
727782
validate_policy(KeyList) ->
728-
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
729-
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
730-
SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
731-
SyncBatchSize = proplists:get_value(
732-
<<"ha-sync-batch-size">>, KeyList, none),
733-
PromoteOnShutdown = proplists:get_value(
734-
<<"ha-promote-on-shutdown">>, KeyList, none),
735-
PromoteOnFailure = proplists:get_value(
736-
<<"ha-promote-on-failure">>, KeyList, none),
737-
case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of
738-
{none, none, none, none, none, none} ->
739-
ok;
740-
{none, _, _, _, _, _} ->
741-
{error, "ha-mode must be specified to specify ha-params, "
742-
"ha-sync-mode or ha-promote-on-shutdown", []};
743-
_ ->
744-
validate_policies(
745-
[{Mode, fun validate_mode/1},
746-
{Params, ha_params_validator(Mode)},
747-
{SyncMode, fun validate_sync_mode/1},
748-
{SyncBatchSize, fun validate_sync_batch_size/1},
749-
{PromoteOnShutdown, fun validate_pos/1},
750-
{PromoteOnFailure, fun validate_pof/1}])
783+
case is_cmq_permitted() of
784+
false ->
785+
%% If the policy configures classic mirrored queues and this
786+
%% feature is disabled, we consider this policy not valid and deny
787+
%% it.
788+
FeatureName = classic_mirrored_queues,
789+
Warning = rabbit_deprecated_features:get_warning(FeatureName),
790+
{error, "~ts", [Warning]};
791+
true ->
792+
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
793+
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
794+
SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
795+
SyncBatchSize = proplists:get_value(
796+
<<"ha-sync-batch-size">>, KeyList, none),
797+
PromoteOnShutdown = proplists:get_value(
798+
<<"ha-promote-on-shutdown">>, KeyList, none),
799+
PromoteOnFailure = proplists:get_value(
800+
<<"ha-promote-on-failure">>, KeyList, none),
801+
case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of
802+
{none, none, none, none, none, none} ->
803+
ok;
804+
{none, _, _, _, _, _} ->
805+
{error, "ha-mode must be specified to specify ha-params, "
806+
"ha-sync-mode or ha-promote-on-shutdown", []};
807+
_ ->
808+
validate_policies(
809+
[{Mode, fun validate_mode/1},
810+
{Params, ha_params_validator(Mode)},
811+
{SyncMode, fun validate_sync_mode/1},
812+
{SyncBatchSize, fun validate_sync_batch_size/1},
813+
{PromoteOnShutdown, fun validate_pos/1},
814+
{PromoteOnFailure, fun validate_pof/1}])
815+
end
751816
end.
752817

753818
ha_params_validator(Mode) ->

0 commit comments

Comments
 (0)