|
9 | 9 | -behaviour(rabbit_policy_validator).
|
10 | 10 | -behaviour(rabbit_policy_merge_strategy).
|
11 | 11 |
|
| 12 | +-include_lib("kernel/include/logger.hrl"). |
| 13 | + |
12 | 14 | -include("amqqueue.hrl").
|
13 | 15 |
|
14 | 16 | -export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3,
|
|
26 | 28 |
|
27 | 29 | -export([get_replicas/1, transfer_leadership/2, migrate_leadership_to_existing_replica/2]).
|
28 | 30 |
|
| 31 | +-export([does_policy_configure_cmq/1, |
| 32 | + is_cmq_permitted/0, |
| 33 | + warn_if_queue_is_mirrored/1]). |
| 34 | + |
29 | 35 | %% for testing only
|
30 | 36 | -export([module/1]).
|
31 | 37 |
|
32 | 38 | -include_lib("rabbit_common/include/rabbit.hrl").
|
33 | 39 |
|
34 | 40 | -define(HA_NODES_MODULE, rabbit_mirror_queue_mode_nodes).
|
35 | 41 |
|
| 42 | +-rabbit_deprecated_feature( |
| 43 | + {classic_mirrored_queues, |
| 44 | + #{deprecation_phase => optout, |
| 45 | + warning => |
| 46 | + "Classic mirrored queues are deprecated and will go away in RabbitMQ 4.x", |
| 47 | + doc_url => "https://blog.rabbitmq.com/posts/2021/08/4.0-deprecation-announcements/" |
| 48 | + }}). |
| 49 | + |
36 | 50 | -rabbit_boot_step(
|
37 | 51 | {?MODULE,
|
38 | 52 | [{description, "HA policy validation"},
|
@@ -724,30 +738,81 @@ maybe_drop_master_after_sync(Q) when ?is_amqqueue(Q) ->
|
724 | 738 |
|
725 | 739 | %%----------------------------------------------------------------------------
|
726 | 740 |
|
| 741 | +does_policy_configure_cmq(KeyList) -> |
| 742 | + lists:keymember(<<"ha-mode">>, 1, KeyList). |
| 743 | + |
| 744 | +is_cmq_permitted() -> |
| 745 | + FeatureName = classic_mirrored_queues, |
| 746 | + rabbit_deprecated_features:is_permitted(FeatureName). |
| 747 | + |
| 748 | +warn_if_queue_is_mirrored(Q) -> |
| 749 | + case rabbit_policy:effective_definition(Q) of |
| 750 | + undefined -> |
| 751 | + false; |
| 752 | + Policy -> |
| 753 | + case does_policy_configure_cmq(Policy) of |
| 754 | + true -> |
| 755 | + Permitted = is_cmq_permitted(), |
| 756 | + #resource{ |
| 757 | + virtual_host = VHost, |
| 758 | + name = Name |
| 759 | + } = amqqueue:get_name(Q), |
| 760 | + case Permitted of |
| 761 | + true -> |
| 762 | + ?LOG_WARNING( |
| 763 | + "Mirroring is configured for queue `~ts` in " |
| 764 | + "vhost `~ts`: it will stop working once " |
| 765 | + "classic mirrored queue support is dropped", |
| 766 | + [Name, VHost]); |
| 767 | + false -> |
| 768 | + ?LOG_ERROR( |
| 769 | + "Mirroring is configured for queue `~ts` in " |
| 770 | + "vhost `~ts`: mirroring is deprecated and " |
| 771 | + "disabled, except configured otherwise.", |
| 772 | + [Name, VHost]) |
| 773 | + end, |
| 774 | + %% Return a boolean indicating if the caller should abort |
| 775 | + %% what it's doing because the feature is not permitted. |
| 776 | + not Permitted; |
| 777 | + false -> |
| 778 | + false |
| 779 | + end |
| 780 | + end. |
| 781 | + |
727 | 782 | validate_policy(KeyList) ->
|
728 |
| - Mode = proplists:get_value(<<"ha-mode">>, KeyList, none), |
729 |
| - Params = proplists:get_value(<<"ha-params">>, KeyList, none), |
730 |
| - SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none), |
731 |
| - SyncBatchSize = proplists:get_value( |
732 |
| - <<"ha-sync-batch-size">>, KeyList, none), |
733 |
| - PromoteOnShutdown = proplists:get_value( |
734 |
| - <<"ha-promote-on-shutdown">>, KeyList, none), |
735 |
| - PromoteOnFailure = proplists:get_value( |
736 |
| - <<"ha-promote-on-failure">>, KeyList, none), |
737 |
| - case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of |
738 |
| - {none, none, none, none, none, none} -> |
739 |
| - ok; |
740 |
| - {none, _, _, _, _, _} -> |
741 |
| - {error, "ha-mode must be specified to specify ha-params, " |
742 |
| - "ha-sync-mode or ha-promote-on-shutdown", []}; |
743 |
| - _ -> |
744 |
| - validate_policies( |
745 |
| - [{Mode, fun validate_mode/1}, |
746 |
| - {Params, ha_params_validator(Mode)}, |
747 |
| - {SyncMode, fun validate_sync_mode/1}, |
748 |
| - {SyncBatchSize, fun validate_sync_batch_size/1}, |
749 |
| - {PromoteOnShutdown, fun validate_pos/1}, |
750 |
| - {PromoteOnFailure, fun validate_pof/1}]) |
| 783 | + case is_cmq_permitted() of |
| 784 | + false -> |
| 785 | + %% If the policy configures classic mirrored queues and this |
| 786 | + %% feature is disabled, we consider this policy not valid and deny |
| 787 | + %% it. |
| 788 | + FeatureName = classic_mirrored_queues, |
| 789 | + Warning = rabbit_deprecated_features:get_warning(FeatureName), |
| 790 | + {error, "~ts", [Warning]}; |
| 791 | + true -> |
| 792 | + Mode = proplists:get_value(<<"ha-mode">>, KeyList, none), |
| 793 | + Params = proplists:get_value(<<"ha-params">>, KeyList, none), |
| 794 | + SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none), |
| 795 | + SyncBatchSize = proplists:get_value( |
| 796 | + <<"ha-sync-batch-size">>, KeyList, none), |
| 797 | + PromoteOnShutdown = proplists:get_value( |
| 798 | + <<"ha-promote-on-shutdown">>, KeyList, none), |
| 799 | + PromoteOnFailure = proplists:get_value( |
| 800 | + <<"ha-promote-on-failure">>, KeyList, none), |
| 801 | + case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of |
| 802 | + {none, none, none, none, none, none} -> |
| 803 | + ok; |
| 804 | + {none, _, _, _, _, _} -> |
| 805 | + {error, "ha-mode must be specified to specify ha-params, " |
| 806 | + "ha-sync-mode or ha-promote-on-shutdown", []}; |
| 807 | + _ -> |
| 808 | + validate_policies( |
| 809 | + [{Mode, fun validate_mode/1}, |
| 810 | + {Params, ha_params_validator(Mode)}, |
| 811 | + {SyncMode, fun validate_sync_mode/1}, |
| 812 | + {SyncBatchSize, fun validate_sync_batch_size/1}, |
| 813 | + {PromoteOnShutdown, fun validate_pos/1}, |
| 814 | + {PromoteOnFailure, fun validate_pof/1}]) |
| 815 | + end |
751 | 816 | end.
|
752 | 817 |
|
753 | 818 | ha_params_validator(Mode) ->
|
|
0 commit comments