Skip to content

Only preserve stats for local queues #1278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jul 10, 2017
Merged
7 changes: 6 additions & 1 deletion src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
info_all/5, info_local/1, list_names/0]).
info_all/5, info_local/1, list_names/0, list_local_names/0]).
-export([list_down/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]).
Expand Down Expand Up @@ -571,6 +571,11 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).

list_names() -> mnesia:dirty_all_keys(rabbit_queue).

list_local_names() ->
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
State =/= crashed,
node() =:= node(QPid) ].

list(VHostPath) -> list(VHostPath, rabbit_queue).

%% Not dirty_match_object since that would not be transactional when used in a
Expand Down
2 changes: 2 additions & 0 deletions src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,13 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3.

terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate(normal, State) -> %% delete case
terminate_shutdown(terminate_delete(true, normal, State), State);
Expand Down
2 changes: 1 addition & 1 deletion src/rabbit_core_metrics_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ gc_channels() ->
ok.

gc_queues() ->
Queues = rabbit_amqqueue:list_names(),
Queues = rabbit_amqqueue:list_local_names(),
GbSet = gb_sets:from_list(Queues),
gc_entity(queue_metrics, GbSet),
gc_entity(queue_coarse_metrics, GbSet),
Expand Down
2 changes: 1 addition & 1 deletion src/rabbit_mirror_queue_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

-module(rabbit_mirror_queue_sync).

-include("rabbit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).

Expand Down
105 changes: 80 additions & 25 deletions test/rabbit_core_metrics_gc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@

all() ->
[
{group, non_parallel_tests}
{group, non_parallel_tests},
{group, cluster_tests}
].

groups() ->
Expand All @@ -37,41 +38,36 @@ groups() ->
gen_server2_metrics,
consumer_metrics
]
}
},
{cluster_tests, [], [cluster_queue_metrics]}
].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

merge_app_env(Config) ->
rabbit_ct_helpers:merge_app_env(Config,
{rabbit, [
{core_metrics_gc_interval, 6000000},
{collect_statistics_interval, 100},
{collect_statistics, fine}
]}).

init_per_suite(Config) ->
AppEnv = {rabbit, [{core_metrics_gc_interval, 6000000},
{collect_statistics_interval, 100},
{collect_statistics, fine}]},
rabbit_ct_helpers:merge_app_env(Config, AppEnv).

init_per_group(cluster_tests, Config) ->
rabbit_ct_helpers:log_environment(),
Conf = [{rmq_nodename_suffix, cluster_tests}, {rmq_nodes_count, 2}],
Config1 = rabbit_ct_helpers:set_config(Config, Conf),
rabbit_ct_helpers:run_setup_steps(Config1, setup_steps());
init_per_group(non_parallel_tests, Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE}
]),
rabbit_ct_helpers:run_setup_steps(
Config1,
[ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()).

end_per_suite(Config) ->
Conf = [{rmq_nodename_suffix, non_parallel_tests}],
Config1 = rabbit_ct_helpers:set_config(Config, Conf),
rabbit_ct_helpers:run_setup_steps(Config1, setup_steps()).

end_per_group(_, Config) ->
rabbit_ct_helpers:run_teardown_steps(
Config,
rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(_, Config) ->
Config.

end_per_group(_, Config) ->
Config.

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_helpers:run_steps(Config,
Expand All @@ -83,8 +79,11 @@ end_per_testcase(Testcase, Config) ->
Config,
rabbit_ct_client_helpers:teardown_steps()).

setup_steps() ->
[ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps().

%% -------------------------------------------------------------------
%% Testcases.
%% Single-node Testcases.
%% -------------------------------------------------------------------

queue_metrics(Config) ->
Expand Down Expand Up @@ -329,3 +328,59 @@ x(Name) ->
#resource{ virtual_host = <<"/">>,
kind = exchange,
name = Name }.

%% -------------------------------------------------------------------
%% Cluster Testcases.
%% -------------------------------------------------------------------

cluster_queue_metrics(Config) ->
VHost = <<"/">>,
QueueName = <<"cluster_queue_metrics">>,
PolicyName = <<"ha-policy-1">>,
PolicyPattern = <<".*">>,
PolicyAppliesTo = <<"queues">>,

Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Node0),

Node0Name = rabbit_data_coercion:to_binary(Node0),
Definition0 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node0Name]}],
ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
PolicyName, PolicyPattern,
PolicyAppliesTo, Definition0),

amqp_channel:call(Ch, #'queue.declare'{queue = QueueName}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
#amqp_msg{payload = <<"hello">>}),

% Update policy to point to other node
Node1Name = rabbit_data_coercion:to_binary(Node1),
Definition1 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node1Name]}],
ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
PolicyName, PolicyPattern,
PolicyAppliesTo, Definition1),

% Synchronize
Name = rabbit_misc:r(VHost, queue, QueueName),
[#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0,
ets, lookup,
[rabbit_queue, Name]),
ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue,
sync_mirrors, [QPid]),

timer:sleep(1500),

% Check ETS table for data
% rabbit_core_metrics:queue_stats
[] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, tab2list,
[queue_coarse_metrics]),

[{Name, 1, 0, 1, _}] = rabbit_ct_broker_helpers:rpc(Config, Node1, ets,
tab2list,
[queue_coarse_metrics]),

amqp_channel:call(Ch, #'queue.delete'{queue=QueueName}),
rabbit_ct_client_helpers:close_channel(Ch),
Config.