30
30
create /4 ,
31
31
delete /3 ,
32
32
create_super_stream /1 ,
33
- % % obsolete use create_super_stream/1
34
- % create_super_stream/3,
35
- % % obsolete use create_super_stream/1
36
33
create_super_stream /6 ,
37
34
delete_super_stream /3 ,
38
35
lookup_leader /2 ,
43
40
partitions /2 ,
44
41
partition_index /3 ]).
45
42
43
+ % % obsolete use create_super_stream/1
44
+ % create_super_stream/3,
45
+ % % obsolete use create_super_stream/1
46
+
46
47
-record (state , {configuration }).
47
48
48
49
-type super_stream_spec () ::
49
- #{name := binary (),
50
- vhost := binary (),
51
- username := binary (),
52
- partitions_source := {partition_count , pos_integer ()} |
53
- {routing_keys , [binary ()]},
54
- arguments => map (),
55
- exchange_type => binary () % <<"direct">> | <<"x-super-stream">>,
56
- }.
50
+ #{name := binary (),
51
+ vhost := binary (),
52
+ username := binary (),
53
+ partitions_source :=
54
+ {partition_count , pos_integer ()} | {routing_keys , [binary ()]},
55
+ arguments => map (),
56
+ exchange_type => binary ()}.
57
+
58
+ % <<"direct">> | <<"x-super-stream">>,
57
59
58
60
start_link (Conf ) ->
59
61
gen_server :start_link ({local , ? MODULE }, ? MODULE , [Conf ], []).
@@ -88,19 +90,16 @@ create_super_stream(VirtualHost,
88
90
Arguments ,
89
91
RoutingKeys ,
90
92
Username ) ->
91
- Options = #{partitions => Partitions ,
92
- args => Arguments ,
93
- routing_keys => RoutingKeys ,
94
- username => Username },
93
+ Options =
94
+ #{partitions => Partitions ,
95
+ args => Arguments ,
96
+ routing_keys => RoutingKeys ,
97
+ username => Username },
95
98
create_super_stream (VirtualHost , Name , Options ).
96
99
97
-
98
-
99
- -spec create_super_stream (binary (),
100
- binary (),
101
- map ()) -> ok | {error , term ()}.
102
- create_super_stream (VirtualHost ,
103
- Name ,
100
+ -spec create_super_stream (binary (), binary (), map ()) ->
101
+ ok | {error , term ()}.
102
+ create_super_stream (VirtualHost , Name ,
104
103
#{username := Username } = Options ) ->
105
104
Type = maps :get (exchange_type , Options , <<" direct" >>),
106
105
Partitions = maps :get (partitions , Options , []),
@@ -116,30 +115,29 @@ create_super_stream(VirtualHost,
116
115
RoutingKeys ,
117
116
Username }).
118
117
119
-
120
118
-spec create_super_stream (super_stream_spec ()) ->
121
- ok | {error , term ()}.
119
+ ok | {error , term ()}.
122
120
create_super_stream (#{exchange_type := <<" x-super-stream" >>,
123
121
partitions_source := {routing_keys , _ }}) ->
124
122
{error , unsupported_specification };
125
123
create_super_stream (#{name := Name ,
126
124
vhost := VHost ,
127
125
username := Username ,
128
- partitions_source := PartitionSource } = Spec ) ->
126
+ partitions_source := PartitionSource } =
127
+ Spec ) ->
129
128
Type = maps :get (exchange_type , Spec , <<" direct" >>),
130
129
Arguments = maps :get (arguments , Spec , #{}),
131
130
{Partitions , RoutingKeys } =
132
131
case PartitionSource of
133
132
{partition_count , Count } ->
134
- Streams = [ rabbit_stream_utils : partition_name ( Name , K )
135
- || K <- lists : seq ( 0 , Count - 1 )],
136
- Keys = [ integer_to_binary ( K ) ||
137
- K <- lists :seq (0 , Count - 1 )],
133
+ Streams =
134
+ [ rabbit_stream_utils : partition_name ( Name , K )
135
+ || K <- lists : seq ( 0 , Count - 1 )],
136
+ Keys = [ integer_to_binary ( K ) || K <- lists :seq (0 , Count - 1 )],
138
137
{Streams , Keys };
139
138
{routing_keys , Keys } ->
140
139
Streams =
141
- [rabbit_stream_utils :partition_name (Name , K )
142
- || K <- Keys ],
140
+ [rabbit_stream_utils :partition_name (Name , K ) || K <- Keys ],
143
141
{Streams , Keys }
144
142
end ,
145
143
@@ -825,15 +823,16 @@ add_super_stream_binding(VirtualHost,
825
823
ExchangeName = rabbit_misc :r (VirtualHost , exchange , ExchangeNameBin ),
826
824
QueueName = rabbit_misc :r (VirtualHost , queue , QueueNameBin ),
827
825
Pid = self (),
828
- Arguments = case ExchangeType of
829
- <<" direct" >> ->
830
- rabbit_misc :set_table_value ([],
831
- <<" x-stream-partition-order" >>,
832
- long ,
833
- Order );
834
- _ ->
835
- []
836
- end ,
826
+ Arguments =
827
+ case ExchangeType of
828
+ <<" direct" >> ->
829
+ rabbit_misc :set_table_value ([],
830
+ <<" x-stream-partition-order" >>,
831
+ long ,
832
+ Order );
833
+ _ ->
834
+ []
835
+ end ,
837
836
case rabbit_binding :add (# binding {source = ExchangeName ,
838
837
destination = QueueName ,
839
838
key = RoutingKey ,
@@ -952,22 +951,24 @@ is_resource_stream_queue(#resource{kind = queue} = Resource) ->
952
951
is_resource_stream_queue (_ ) ->
953
952
false .
954
953
955
- partition_index (# exchange {name = ExchangeName ,
956
- type = ExchangeType } = Exchange , Stream ) ->
954
+ partition_index (# exchange {name = ExchangeName , type = ExchangeType } =
955
+ Exchange ,
956
+ Stream ) ->
957
957
UnorderedBindings =
958
958
[Binding
959
959
|| Binding = # binding {destination = # resource {name = Q } = D }
960
- <- rabbit_binding :list_for_source (ExchangeName ),
960
+ <- rabbit_binding :list_for_source (ExchangeName ),
961
961
is_resource_stream_queue (D ), Q == Stream ],
962
962
case UnorderedBindings of
963
963
[] ->
964
964
{error , stream_not_found };
965
965
_ when ExchangeType =:= direct ->
966
- Bindings = rabbit_stream_utils :sort_partitions (Exchange , UnorderedBindings ),
966
+ Bindings =
967
+ rabbit_stream_utils :sort_partitions (Exchange ,
968
+ UnorderedBindings ),
967
969
Binding = lists :nth (1 , Bindings ),
968
970
# binding {args = Args } = Binding ,
969
- case rabbit_misc :table_lookup (Args ,
970
- <<" x-stream-partition-order" >>)
971
+ case rabbit_misc :table_lookup (Args , <<" x-stream-partition-order" >>)
971
972
of
972
973
{_ , Order } ->
973
974
Index = rabbit_data_coercion :to_integer (Order ),
0 commit comments