Skip to content

Commit 1903d9d

Browse files
kjnilsson authored and mkuratczyk committed
[skip-ci] rabbit_chaos module
Start it on all nodes with rabbit_chaos:begin_default()
1 parent 2af6181 commit 1903d9d

File tree

1 file changed

+179
-0
lines changed

1 file changed

+179
-0
lines changed

deps/rabbit/src/rabbit_chaos.erl

+179
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,179 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom”
6+
%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
7+
8+
-module(rabbit_chaos).
-behaviour(gen_server).

%% Boot step: the server is started through
%% rabbit_sup:start_restartable_child/1; the step requires `database` and
%% enables `core_initialized`.
-rabbit_boot_step({rabbit_chaos,
                   [{description, "rabbit node chaos server"},
                    {mfa,         {rabbit_sup, start_restartable_child,
                                   [rabbit_chaos]}},
                    {requires,    [database]},
                    {enables,     core_initialized}]}).

%% Process lifecycle
-export([start_link/0]).

%% Chaos control API
-export([begin_default/0,
         begin_default/1,
         begin_chaos/1]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).
33+
34+
%% The action carried by an event; do_event/2 has one clause per alternative.
-type chaos_action() :: {kill_named_proc, Process :: atom()} |
                        kill_ra_member |
                        restart_ra_member |
                        flood_node |
                        {multi,
                         Num :: non_neg_integer(),
                         Interval :: non_neg_integer(),
                         chaos_action()}.

%% An event is {Name, Weight, Action}: Name is used in log messages, Weight is
%% the relative likelihood used by pick_event/1.
-type chaos_event() :: {Name :: atom(),
                        Weight :: non_neg_integer(),
                        Action :: chaos_action()}.

%% interval: milliseconds between two events (passed to erlang:send_after/3).
-type chaos_cfg() :: #{interval := non_neg_integer(),
                       events := [chaos_event()]}.

-define(SERVER, ?MODULE).

%% gen_server state: only the currently active configuration.
-record(?MODULE, {cfg :: chaos_cfg()}).

-export_type([chaos_cfg/0,
              chaos_event/0]).
47+
48+
%%----------------------------------------------------------------------------
49+
%% A chaos server that can be enabled to create periodic configurable chaos
50+
%% inside the broker.
51+
%%----------------------------------------------------------------------------
52+
53+
%% Starts the default chaos event set with one event every 20000 ms.
begin_default() ->
    DefaultInterval = 20000,
    begin_default(DefaultInterval).
55+
56+
%% Starts the built-in event set, firing one weighted-random event every
%% `Interval` milliseconds. Each entry is {Name, Weight, Action}.
begin_default(Interval) ->
    KillWal       = {kill_qq_wal, 1, {kill_named_proc, ra_log_wal}},
    KillSegWriter = {kill_qq_seq_writer, 1,
                     {kill_named_proc, ra_log_segment_writer}},
    KillMember    = {kill_qq_member, 2, kill_ra_member},
    RestartMember = {kill_qq_member, 2, restart_ra_member},
    FloodNode     = {flood_a_node, 2, flood_node},
    Cfg = #{events => [KillWal,
                       KillSegWriter,
                       KillMember,
                       RestartMember,
                       FloodNode],
            interval => Interval},
    begin_chaos(Cfg).
66+
67+
%% Hands `Cfg` to the locally registered chaos server with a synchronous call;
%% returns the server's reply.
begin_chaos(Cfg) ->
    Request = {begin_chaos, Cfg},
    gen_server:call(?SERVER, Request).
69+
70+
%% Starts the chaos server linked to the caller and registers it locally
%% under ?SERVER.
-spec start_link() -> rabbit_types:ok_pid_or_error().
start_link() ->
    RegisteredName = {local, ?SERVER},
    gen_server:start_link(RegisteredName, ?MODULE, [], []).
73+
74+
%% gen_server callback. The server starts idle: the initial configuration has
%% no events and no timer is armed here.
init([]) ->
    process_flag(trap_exit, true),
    IdleCfg = #{events => [],
                interval => 20000},
    InitialState = #?MODULE{cfg = IdleCfg},
    {ok, InitialState}.
79+
80+
%% gen_server callback. Installs the new configuration and arms the first
%% `do_chaos` timer; replies `ok`.
%% NOTE(review): every call arms an additional timer and none is cancelled,
%% so calling begin_chaos repeatedly appears to stack concurrent chaos loops.
handle_call({begin_chaos, Cfg = #{interval := Delay}}, _From, State0) ->
    State = State0#?MODULE{cfg = Cfg},
    _ = erlang:send_after(Delay, self(), do_chaos),
    {reply, ok, State}.
83+
84+
%% gen_server callback. No casts are defined; anything received is ignored.
handle_cast(_Msg, State) -> {noreply, State}.
86+
87+
%% gen_server callback.
%% `do_chaos`: runs one weighted-random event from the active configuration
%% and re-arms the timer. The timer is re-armed even when nothing could be
%% run, so the loop keeps going.
%% Any other message is ignored.
handle_info(do_chaos, #?MODULE{cfg = #{interval := Interval} = Cfg} = State) ->
    case maps:get(events, Cfg) of
        [] ->
            %% nothing configured; picking from an empty list would crash
            ok;
        Events ->
            run_random_event(Events)
    end,
    _ = erlang:send_after(Interval, self(), do_chaos),
    {noreply, State};
handle_info(_, #?MODULE{} = State) ->
    {noreply, State}.

%% Picks one event and runs it. A failing event is logged instead of crashing
%% the server: init/1 starts with no events and no timer, so a restart would
%% silently end the chaos run.
run_random_event(Events) ->
    case pick_event(Events) of
        {Name, _Weight, Event} ->
            try
                do_event(Name, Event)
            catch
                Class:Reason ->
                    rabbit_log:warning("~s: event ~s failed: ~p:~p",
                                       [?MODULE, Name, Class, Reason])
            end,
            ok;
        undefined ->
            ok
    end.
95+
96+
%% gen_server callback. Nothing to clean up.
terminate(_Why, #?MODULE{} = _State) -> ok.
98+
99+
%% gen_server callback. The state is carried over unchanged on code upgrade.
code_change(_FromVsn, State, _Extra) -> {ok, State}.
101+
102+
%% internal
103+
104+
%% Executes one chaos action. `Name` is only used in log messages.
%% Returns `ok`. An action that has no eligible target (no connected nodes,
%% empty leaderboard, no local quorum queues, unregistered process) is skipped
%% instead of crashing in rand:uniform(0).
do_event(Name, {kill_named_proc, ProcName}) ->
    rabbit_log:info("~s: doing event ~s...", [?MODULE, Name]),
    exit_registered(ProcName);
do_event(Name, flood_node) ->
    rabbit_log:info("~s: doing event ~s...", [?MODULE, Name]),
    case nodes() of
        [] ->
            rabbit_log:info("~s: skipping event ~s, no connected nodes",
                            [?MODULE, Name]),
            ok;
        Nodes ->
            flood_node(Name, pick_random(Nodes))
    end;
do_event(Name, kill_ra_member) ->
    rabbit_log:info("~s: doing event ~s...", [?MODULE, Name]),
    case ets:tab2list(ra_leaderboard) of
        [] ->
            rabbit_log:info("~s: skipping event ~s, ra_leaderboard is empty",
                            [?MODULE, Name]),
            ok;
        Procs ->
            {Selected, _, _} = pick_random(Procs),
            %% presumably the query fun is evaluated inside the Ra server
            %% process, so this turns off its trap_exit and lets the exit
            %% signal below actually terminate it -- TODO confirm
            {ok, _, _} = ra:local_query({Selected, node()},
                                        fun (_) ->
                                                process_flag(trap_exit, false)
                                        end),
            exit_registered(Selected)
    end;
do_event(Name, restart_ra_member = Type) ->
    rabbit_log:info("~s: doing event ~s of type ~s", [?MODULE, Name, Type]),
    case rabbit_amqqueue:list_local_quorum_queues() of
        [] ->
            rabbit_log:info("~s: skipping event ~s, no local quorum queues",
                            [?MODULE, Name]),
            ok;
        Queues ->
            Selected = pick_random(Queues),
            {ServerName, _} = amqqueue:get_pid(Selected),
            ServerId = {ServerName, node()},
            ra:stop_server(quorum_queues, ServerId),
            %% keep the member down for 1001..11000 ms before restarting it
            Sleep = rand:uniform(10000) + 1000,
            timer:sleep(Sleep),
            ra:restart_server(quorum_queues, ServerId),
            ok
    end;
do_event(Name, {multi, Num, Interval, Event}) ->
    rabbit_log:info("~s: doing multi event ~s...",
                    [?MODULE, Name]),
    %% Best effort: the first failing repetition aborts the remaining ones,
    %% but the multi event itself still returns ok.
    try
        lists:foreach(fun (_) ->
                              do_event(Name, Event),
                              timer:sleep(Interval)
                      end, lists:seq(1, Num))
    catch
        Class:Reason ->
            rabbit_log:warning("~s: multi event ~s aborted: ~p:~p",
                               [?MODULE, Name, Class, Reason])
    end,
    ok.

%% Sends exit signal `chaos` to whatever is registered locally under
%% `RegName`; does nothing when the name is not registered.
exit_registered(RegName) ->
    case whereis(RegName) of
        undefined ->
            ok;
        PidOrPort ->
            _ = exit(PidOrPort, chaos),
            ok
    end.

%% Returns a uniformly chosen element of a non-empty list.
pick_random([_ | _] = List) ->
    lists:nth(rand:uniform(length(List)), List).

%% Sends up to 10000 messages of 100_000 random bytes to a pid on `Node`.
flood_node(Name, Node) ->
    %% the spawned fun returns immediately; the pid is only used as a
    %% destination on the remote node
    Pid = erpc:call(Node, erlang, spawn, [fun() -> ok end]),
    Data = crypto:strong_rand_bytes(100_000),
    flood_loop(Name, Node, Pid, Data, 10000).

flood_loop(_Name, _Node, _Pid, _Data, 0) ->
    ok;
flood_loop(Name, Node, Pid, Data, N) ->
    case erlang:send(Pid, Data, [nosuspend]) of
        nosuspend ->
            %% the send would have suspended the sender: do one regular send
            %% and stop
            Pid ! Data,
            rabbit_log:info("~s: flood of node ~s completed ~s...",
                            [?MODULE, Node, Name]),
            %% flood complete
            ok;
        ok ->
            flood_loop(Name, Node, Pid, Data, N - 1)
    end.
163+
164+
%% Picks one event at random, with probability proportional to its weight
%% (second tuple element). Returns the chosen {Name, Weight, Action} tuple, or
%% `undefined` when there is nothing to pick from (empty list or a total
%% weight of zero); rand:uniform/1 would otherwise be called with 0 and crash.
pick_event(Events) ->
    case lists:sum([element(2, E) || E <- Events]) of
        TotalWeight when TotalWeight > 0 ->
            Pick = rand:uniform(TotalWeight),
            event_at_weight_point(Pick, 0, Events);
        _ ->
            undefined
    end.

%% Walks the events accumulating weights and returns the first event whose
%% cumulative weight reaches `Pick`; `undefined` if the list is exhausted.
event_at_weight_point(_Pick, _Cur, []) ->
    undefined;
event_at_weight_point(Pick, Cur, [{_, W, _} = E | _])
  when Pick =< Cur + W ->
    E;
event_at_weight_point(Pick, Cur, [{_, W, _} | Events]) ->
    event_at_weight_point(Pick, Cur + W, Events).

0 commit comments

Comments
 (0)