Skip to content

Commit e5fea8d

Browse files
Merge pull request #1278 from rabbitmq/rabbitmq-management-427
Only preserve stats for local queues
2 parents 39c1bd1 + 6a77b6d commit e5fea8d

5 files changed

+90
-28
lines changed

src/rabbit_amqqueue.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
check_exclusive_access/2, with_exclusive_access_or_die/3,
2626
stat/1, deliver/2, requeue/3, ack/3, reject/4]).
2727
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
28-
info_all/5, info_local/1, list_names/0]).
28+
info_all/5, info_local/1, list_names/0, list_local_names/0]).
2929
-export([list_down/1]).
3030
-export([force_event_refresh/1, notify_policy_changed/1]).
3131
-export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]).
@@ -571,6 +571,11 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
571571

572572
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
573573

574+
list_local_names() ->
575+
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
576+
State =/= crashed,
577+
node() =:= node(QPid) ].
578+
574579
list(VHostPath) -> list(VHostPath, rabbit_queue).
575580

576581
%% Not dirty_match_object since that would not be transactional when used in a

src/rabbit_amqqueue_process.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,13 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
258258
State3.
259259

260260
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
261+
rabbit_core_metrics:queue_deleted(qname(State)),
261262
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
262263
terminate({shutdown, missing_owner} = Reason, State) ->
263264
%% if the owner was missing then there will be no queue, so don't emit stats
264265
terminate_shutdown(terminate_delete(false, Reason, State), State);
265266
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
267+
rabbit_core_metrics:queue_deleted(qname(State)),
266268
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
267269
terminate(normal, State) -> %% delete case
268270
terminate_shutdown(terminate_delete(true, normal, State), State);

src/rabbit_core_metrics_gc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ gc_channels() ->
7070
ok.
7171

7272
gc_queues() ->
73-
Queues = rabbit_amqqueue:list_names(),
73+
Queues = rabbit_amqqueue:list_local_names(),
7474
GbSet = gb_sets:from_list(Queues),
7575
gc_entity(queue_metrics, GbSet),
7676
gc_entity(queue_coarse_metrics, GbSet),

src/rabbit_mirror_queue_sync.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
-module(rabbit_mirror_queue_sync).
1818

19-
-include("rabbit.hrl").
19+
-include_lib("rabbit_common/include/rabbit.hrl").
2020

2121
-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
2222

test/rabbit_core_metrics_gc_SUITE.erl

Lines changed: 80 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424

2525
all() ->
2626
[
27-
{group, non_parallel_tests}
27+
{group, non_parallel_tests},
28+
{group, cluster_tests}
2829
].
2930

3031
groups() ->
@@ -37,41 +38,36 @@ groups() ->
3738
gen_server2_metrics,
3839
consumer_metrics
3940
]
40-
}
41+
},
42+
{cluster_tests, [], [cluster_queue_metrics]}
4143
].
4244

4345
%% -------------------------------------------------------------------
4446
%% Testsuite setup/teardown.
4547
%% -------------------------------------------------------------------
4648

4749
merge_app_env(Config) ->
48-
rabbit_ct_helpers:merge_app_env(Config,
49-
{rabbit, [
50-
{core_metrics_gc_interval, 6000000},
51-
{collect_statistics_interval, 100},
52-
{collect_statistics, fine}
53-
]}).
54-
55-
init_per_suite(Config) ->
50+
AppEnv = {rabbit, [{core_metrics_gc_interval, 6000000},
51+
{collect_statistics_interval, 100},
52+
{collect_statistics, fine}]},
53+
rabbit_ct_helpers:merge_app_env(Config, AppEnv).
54+
55+
init_per_group(cluster_tests, Config) ->
56+
rabbit_ct_helpers:log_environment(),
57+
Conf = [{rmq_nodename_suffix, cluster_tests}, {rmq_nodes_count, 2}],
58+
Config1 = rabbit_ct_helpers:set_config(Config, Conf),
59+
rabbit_ct_helpers:run_setup_steps(Config1, setup_steps());
60+
init_per_group(non_parallel_tests, Config) ->
5661
rabbit_ct_helpers:log_environment(),
57-
Config1 = rabbit_ct_helpers:set_config(Config, [
58-
{rmq_nodename_suffix, ?MODULE}
59-
]),
60-
rabbit_ct_helpers:run_setup_steps(
61-
Config1,
62-
[ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()).
63-
64-
end_per_suite(Config) ->
62+
Conf = [{rmq_nodename_suffix, non_parallel_tests}],
63+
Config1 = rabbit_ct_helpers:set_config(Config, Conf),
64+
rabbit_ct_helpers:run_setup_steps(Config1, setup_steps()).
65+
66+
end_per_group(_, Config) ->
6567
rabbit_ct_helpers:run_teardown_steps(
6668
Config,
6769
rabbit_ct_broker_helpers:teardown_steps()).
6870

69-
init_per_group(_, Config) ->
70-
Config.
71-
72-
end_per_group(_, Config) ->
73-
Config.
74-
7571
init_per_testcase(Testcase, Config) ->
7672
rabbit_ct_helpers:testcase_started(Config, Testcase),
7773
rabbit_ct_helpers:run_steps(Config,
@@ -83,8 +79,11 @@ end_per_testcase(Testcase, Config) ->
8379
Config,
8480
rabbit_ct_client_helpers:teardown_steps()).
8581

82+
setup_steps() ->
83+
[ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps().
84+
8685
%% -------------------------------------------------------------------
87-
%% Testcases.
86+
%% Single-node Testcases.
8887
%% -------------------------------------------------------------------
8988

9089
queue_metrics(Config) ->
@@ -329,3 +328,59 @@ x(Name) ->
329328
#resource{ virtual_host = <<"/">>,
330329
kind = exchange,
331330
name = Name }.
331+
332+
%% -------------------------------------------------------------------
333+
%% Cluster Testcases.
334+
%% -------------------------------------------------------------------
335+
336+
cluster_queue_metrics(Config) ->
337+
VHost = <<"/">>,
338+
QueueName = <<"cluster_queue_metrics">>,
339+
PolicyName = <<"ha-policy-1">>,
340+
PolicyPattern = <<".*">>,
341+
PolicyAppliesTo = <<"queues">>,
342+
343+
Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
344+
Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
345+
346+
Ch = rabbit_ct_client_helpers:open_channel(Config, Node0),
347+
348+
Node0Name = rabbit_data_coercion:to_binary(Node0),
349+
Definition0 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node0Name]}],
350+
ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
351+
PolicyName, PolicyPattern,
352+
PolicyAppliesTo, Definition0),
353+
354+
amqp_channel:call(Ch, #'queue.declare'{queue = QueueName}),
355+
amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
356+
#amqp_msg{payload = <<"hello">>}),
357+
358+
% Update policy to point to other node
359+
Node1Name = rabbit_data_coercion:to_binary(Node1),
360+
Definition1 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node1Name]}],
361+
ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
362+
PolicyName, PolicyPattern,
363+
PolicyAppliesTo, Definition1),
364+
365+
% Synchronize
366+
Name = rabbit_misc:r(VHost, queue, QueueName),
367+
[#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0,
368+
ets, lookup,
369+
[rabbit_queue, Name]),
370+
ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue,
371+
sync_mirrors, [QPid]),
372+
373+
timer:sleep(1500),
374+
375+
% Check ETS table for data
376+
% rabbit_core_metrics:queue_stats
377+
[] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, tab2list,
378+
[queue_coarse_metrics]),
379+
380+
[{Name, 1, 0, 1, _}] = rabbit_ct_broker_helpers:rpc(Config, Node1, ets,
381+
tab2list,
382+
[queue_coarse_metrics]),
383+
384+
amqp_channel:call(Ch, #'queue.delete'{queue=QueueName}),
385+
rabbit_ct_client_helpers:close_channel(Ch),
386+
Config.

0 commit comments

Comments
 (0)