Skip to content

WIP: super stream exchange type #8398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ bazel_dep(

bazel_dep(
name = "rules_erlang",
version = "3.10.5",
version = "3.10.7",
)

bazel_dep(
Expand Down Expand Up @@ -246,6 +246,12 @@ erlang_package.hex_package(
version = "1.4.1",
)

erlang_package.git_package(
branch = "master",
build_file = "@//:bazel/BUILD.murmerl3",
repository = "rabbitmq/murmerl3",
)

erlang_package.hex_package(
name = "thoas",
build_file = "@rabbitmq-server//bazel:BUILD.thoas",
Expand Down Expand Up @@ -357,6 +363,7 @@ use_repo(
"gun",
"jose",
"json",
"murmerl3",
"observer_cli",
"osiris",
"prometheus",
Expand Down
85 changes: 85 additions & 0 deletions bazel/BUILD.murmerl3
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
load("@rules_erlang//:erlang_bytecode2.bzl", "erlang_bytecode", "erlc_opts")
load("@rules_erlang//:erlang_app.bzl", "erlang_app")

erlc_opts(
name = "erlc_opts",
values = select({
"@rules_erlang//:debug_build": [
"+debug_info",
],
"//conditions:default": [
"+debug_info",
"+deterministic",
],
}),
visibility = [":__subpackages__"],
)

erlang_bytecode(
name = "other_beam",
srcs = [
"src/murmerl3.erl",
],
hdrs = [":public_and_private_hdrs"],
app_name = "murmerl3",
dest = "ebin",
erlc_opts = "//:erlc_opts",
)

filegroup(
name = "beam_files",
srcs = [":other_beam"],
)

filegroup(
name = "srcs",
srcs = [
"src/murmerl3.app.src",
"src/murmerl3.erl",
],
)

filegroup(name = "private_hdrs")

filegroup(name = "public_hdrs")

filegroup(name = "priv")

filegroup(
name = "license_files",
srcs = [
"LICENSE",
],
)

filegroup(
name = "public_and_private_hdrs",
srcs = [
":private_hdrs",
":public_hdrs",
],
)

filegroup(
name = "all_srcs",
srcs = [
":public_and_private_hdrs",
":srcs",
],
)

erlang_app(
name = "erlang_app",
srcs = [":all_srcs"],
hdrs = [":public_hdrs"],
app_name = "murmerl3",
beam_files = [":beam_files"],
license_files = [":license_files"],
priv = [":priv"],
)

alias(
name = "murmerl3",
actual = ":erlang_app",
visibility = ["//visibility:public"],
)
1 change: 1 addition & 0 deletions deps/rabbitmq_stream/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ rabbitmq_app(
deps = [
"//deps/rabbit:erlang_app",
"//deps/rabbitmq_stream_common:erlang_app",
"@murmerl3//:erlang_app",
"@osiris//:erlang_app",
"@ranch//:erlang_app",
],
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbitmq_stream/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ endef


LOCAL_DEPS = ssl
DEPS = rabbit rabbitmq_stream_common osiris ranch
DEPS = rabbit rabbitmq_stream_common osiris ranch murmerl3
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client

dep_murmerl3 = git https://github.com/rabbitmq/murmerl3 master
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk

Expand Down
3 changes: 3 additions & 0 deletions deps/rabbitmq_stream/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def all_beam_files(name = "all_beam_files"):
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl",
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl",
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand.erl",
"src/rabbit_exchange_type_super_stream.erl",
"src/rabbit_stream.erl",
"src/rabbit_stream_connection_sup.erl",
"src/rabbit_stream_manager.erl",
Expand Down Expand Up @@ -55,6 +56,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl",
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl",
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand.erl",
"src/rabbit_exchange_type_super_stream.erl",
"src/rabbit_stream.erl",
"src/rabbit_stream_connection_sup.erl",
"src/rabbit_stream_manager.erl",
Expand Down Expand Up @@ -104,6 +106,7 @@ def all_srcs(name = "all_srcs"):
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl",
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl",
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand.erl",
"src/rabbit_exchange_type_super_stream.erl",
"src/rabbit_stream.erl",
"src/rabbit_stream_connection_sup.erl",
"src/rabbit_stream_manager.erl",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ description() ->
switches() ->
[{partitions, integer},
{routing_keys, string},
{exchange_type, string},
{max_length_bytes, string},
{max_age, string},
{stream_max_segment_size_bytes, string},
Expand All @@ -55,6 +56,10 @@ validate([], _Opts) ->
validate([_Name], #{partitions := _, routing_keys := _}) ->
{validation_failure,
"Specify --partitions or routing-keys, not both."};
validate([_Name],
#{exchange_type := <<"x-super-stream">>, routing_keys := _}) ->
{validation_failure,
"Exchange type x-super-stream cannot be used with routing-keys."};
validate([_Name], #{partitions := Partitions}) when Partitions < 1 ->
{validation_failure, "The partition number must be greater than 0"};
validate([_Name], Opts) ->
Expand Down Expand Up @@ -125,6 +130,17 @@ validate_stream_arguments(#{initial_cluster_size := Value} = Opts) ->
"Invalid value for --initial-cluster-size, the "
"value must be a positive integer."}
end;
validate_stream_arguments(#{exchange_type := Type} = Opts) ->
case Type of
<<"direct">> ->
validate_stream_arguments(maps:remove(exchange_type, Opts));
<<"x-super-stream">> ->
validate_stream_arguments(maps:remove(exchange_type, Opts));
_ ->
{validation_failure,
"Invalid value for --exchange_type, must be one of:"
"'direct' or 'x-super-stream'"}
end;
validate_stream_arguments(_) ->
ok.

Expand All @@ -138,8 +154,7 @@ usage() ->
"s <partitions>] [--routing-keys <routing-keys>]">>.

usage_additional() ->
[[<<"<name>">>,
<<"The name of the super stream.">>],
[[<<"<name>">>, <<"The name of the super stream.">>],
[<<"--vhost <vhost>">>,
<<"The virtual host the super stream is added to.">>],
[<<"--partitions <partitions>">>,
Expand All @@ -162,54 +177,36 @@ usage_doc_guides() ->

run([SuperStream],
#{node := NodeName,
vhost := VHost,
timeout := Timeout,
partitions := Partitions} =
Opts) ->
Streams =
[list_to_binary(binary_to_list(SuperStream)
++ "-"
++ integer_to_list(K))
|| K <- lists:seq(0, Partitions - 1)],
RoutingKeys =
[integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)],
create_super_stream(NodeName,
Timeout,
VHost,
SuperStream,
Streams,
stream_arguments(Opts),
RoutingKeys);
Spec0 = maps:with([vhost, exchange_type], Opts),
Spec =
Spec0#{username => cli_acting_user(),
name => SuperStream,
partitions_source => {partition_count, Partitions},
arguments => stream_arguments(Opts)},
create_super_stream(NodeName, Timeout, Spec);
run([SuperStream],
#{node := NodeName,
vhost := VHost,
timeout := Timeout,
routing_keys := RoutingKeysStr} =
Opts) ->
RoutingKeys =
[rabbit_data_coercion:to_binary(
string:strip(K))
|| K
<- string:tokens(
rabbit_data_coercion:to_list(RoutingKeysStr), ",")],
Streams =
[list_to_binary(binary_to_list(SuperStream)
++ "-"
++ binary_to_list(K))
|| K <- RoutingKeys],
create_super_stream(NodeName,
Timeout,
VHost,
SuperStream,
Streams,
stream_arguments(Opts),
RoutingKeys).
Spec0 = maps:with([vhost, exchange_type], Opts),
RoutingKeys = [string:trim(K) || K <- string:lexemes(RoutingKeysStr, ",")],
Spec =
Spec0#{username => cli_acting_user(),
name => SuperStream,
partitions_source => {routing_keys, RoutingKeys},
arguments => stream_arguments(Opts)},
create_super_stream(NodeName, Timeout, Spec).

stream_arguments(Opts) ->
stream_arguments(#{}, Opts).

%% Something strange, dialyzer infers that map_size/1 returns positive_integer()
-dialyzer({no_match, stream_arguments/2}).

stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 ->
Acc;
stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) ->
Expand Down Expand Up @@ -248,28 +245,17 @@ duration_to_seconds([{sign, _},
{seconds, S}]) ->
Y * 365 * 86400 + M * 30 * 86400 + D * 86400 + H * 3600 + Mn * 60 + S.

create_super_stream(NodeName,
Timeout,
VHost,
SuperStream,
Streams,
Arguments,
RoutingKeys) ->
create_super_stream(NodeName, Timeout, Spec) ->
case rabbit_misc:rpc_call(NodeName,
rabbit_stream_manager,
create_super_stream,
[VHost,
SuperStream,
Streams,
Arguments,
RoutingKeys,
cli_acting_user()],
[Spec],
Timeout)
of
ok ->
{ok,
rabbit_misc:format("Super stream ~ts has been created",
[SuperStream])};
[maps:get(name, Spec)])};
Error ->
Error
end.
Expand Down
Loading