Skip to content

Commit c77ec2a

Browse files
committed
Delete queue metrics after sync.
This ensures that the core metrics for a queue is cleared up after a queue master might have moved to another node. [#147753285]
1 parent 744e699 commit c77ec2a

4 files changed

+26
-20
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,13 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
258258
State3.
259259

260260
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
261+
rabbit_core_metrics:queue_deleted(qname(State)),
261262
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
262263
terminate({shutdown, missing_owner} = Reason, State) ->
263264
%% if the owner was missing then there will be no queue, so don't emit stats
264265
terminate_shutdown(terminate_delete(false, Reason, State), State);
265266
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
267+
rabbit_core_metrics:queue_deleted(qname(State)),
266268
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
267269
terminate(normal, State) -> %% delete case
268270
terminate_shutdown(terminate_delete(true, normal, State), State);

src/rabbit_core_metrics_gc.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,13 @@ terminate(_Reason, #state{timer = TRef}) ->
5454
code_change(_OldVsn, State, _Extra) ->
5555
{ok, State}.
5656

57+
start_timer(Interval, #state{timer = TRef0} = St) ->
58+
timer:cancel(TRef0),
59+
TRef1 = erlang:send_after(Interval, self(), start_gc),
60+
St#state{timer = TRef1}.
61+
5762
start_timer(#state{interval = Interval} = St) ->
58-
TRef = erlang:send_after(Interval, self(), start_gc),
59-
St#state{timer = TRef}.
63+
start_timer(Interval, St).
6064

6165
gc_connections() ->
6266
gc_process(connection_created),

src/rabbit_mirror_queue_sync.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,9 +263,17 @@ syncer_loop(Ref, MPid, SPids) ->
263263
%% want.
264264
ok;
265265
{done, Ref} ->
266+
cleanup_metrics(),
266267
[SPid ! {sync_complete, Ref} || SPid <- SPids]
267268
end.
268269

270+
cleanup_metrics() ->
271+
case get(process_name) of
272+
{_Module, Queue} ->
273+
rabbit_core_metrics:queue_deleted(Queue);
274+
_ -> ok
275+
end.
276+
269277
broadcast(SPids, Msg) ->
270278
[begin
271279
credit_flow:send(SPid),

test/rabbit_core_metrics_gc_SUITE.erl

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -343,11 +343,6 @@ cluster_queue_metrics(Config) ->
343343
Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
344344
Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
345345

346-
rabbit_ct_broker_helpers:rpc(Config, Node0, erlang, send, [rabbit_core_metrics_gc, start_gc]),
347-
rabbit_ct_broker_helpers:rpc(Config, Node0, gen_server, call, [rabbit_core_metrics_gc, test]),
348-
rabbit_ct_broker_helpers:rpc(Config, Node1, erlang, send, [rabbit_core_metrics_gc, start_gc]),
349-
rabbit_ct_broker_helpers:rpc(Config, Node1, gen_server, call, [rabbit_core_metrics_gc, test]),
350-
351346
Ch = rabbit_ct_client_helpers:open_channel(Config, Node0),
352347

353348
Node0Name = rabbit_data_coercion:to_binary(Node0),
@@ -370,24 +365,21 @@ cluster_queue_metrics(Config) ->
370365
% Synchronize
371366
Name = rabbit_misc:r(VHost, queue, QueueName),
372367
[#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0,
373-
ets, lookup, [rabbit_queue, Name]),
374-
ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, sync_mirrors, [QPid]),
375-
376-
timer:sleep(1000),
368+
ets, lookup,
369+
[rabbit_queue, Name]),
370+
ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue,
371+
sync_mirrors, [QPid]),
377372

378-
rabbit_ct_broker_helpers:rpc(Config, Node0, erlang, send, [rabbit_core_metrics_gc, start_gc]),
379-
rabbit_ct_broker_helpers:rpc(Config, Node0, gen_server, call, [rabbit_core_metrics_gc, test]),
380-
rabbit_ct_broker_helpers:rpc(Config, Node1, erlang, send, [rabbit_core_metrics_gc, start_gc]),
381-
rabbit_ct_broker_helpers:rpc(Config, Node1, gen_server, call, [rabbit_core_metrics_gc, test]),
373+
timer:sleep(1500),
382374

383375
% Check ETS table for data
384376
% rabbit_core_metrics:queue_stats
385-
% {Name, MessagesReady, MessagesUnacknowledge, Messages, Reductions}
386-
% [{{resource,<<"/">>,queue,<<"cluster_queue_metrics">>}, 1,0,1,10524}]
387-
[] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, tab2list, [queue_coarse_metrics]),
377+
[] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, tab2list,
378+
[queue_coarse_metrics]),
388379

389-
EtsData1_0 = rabbit_ct_broker_helpers:rpc(Config, Node1, ets, tab2list, [queue_coarse_metrics]),
390-
[{Name, 1, 0, 1, _}] = EtsData1_0,
380+
[{Name, 1, 0, 1, _}] = rabbit_ct_broker_helpers:rpc(Config, Node1, ets,
381+
tab2list,
382+
[queue_coarse_metrics]),
391383

392384
amqp_channel:call(Ch, #'queue.delete'{queue=QueueName}),
393385
rabbit_ct_client_helpers:close_channel(Ch),

0 commit comments

Comments
 (0)