Skip to content
This repository was archived by the owner on Nov 17, 2020. It is now read-only.

Commit bb3bd56

Browse files
Merge pull request #217 from velimir/cache-topic-access
cache topic access (cherry picked from commit 1c9e9d7)
1 parent d3ee921 commit bb3bd56

File tree

2 files changed

+57
-24
lines changed

2 files changed

+57
-24
lines changed

src/rabbit_mqtt_processor.erl

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
-export([info/2, initial_state/2, initial_state/5,
2020
process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
21-
close_connection/1]).
21+
close_connection/1, handle_pre_hibernate/0]).
2222

2323
%% for testing purposes
2424
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2,
@@ -31,6 +31,7 @@
3131
-define(APP, rabbitmq_mqtt).
3232
-define(FRAME_TYPE(Frame, Type),
3333
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
34+
-define(MAX_TOPIC_PERMISSION_CACHE_SIZE, 12).
3435

3536
initial_state(Socket, SSLLoginName) ->
3637
RealSocket = rabbit_net:unwrap_socket(Socket),
@@ -884,9 +885,19 @@ close_connection(PState = #proc_state{ connection = Connection,
884885
PState #proc_state{ channels = {undefined, undefined},
885886
connection = undefined }.
886887

888+
<<<<<<< HEAD
887889
% NB: check_*: MQTT spec says we should ack normally, ie pretend there
888890
% was no auth error, but here we are closing the connection with an error. This
889891
% is what happens anyway if there is an authorization failure at the AMQP level.
892+
=======
893+
handle_pre_hibernate() ->
894+
erase(topic_permission_cache),
895+
ok.
896+
897+
%% NB: check_*: MQTT spec says we should ack normally, ie pretend there
898+
%% was no auth error, but here we are closing the connection with an error. This
899+
%% is what happens anyway if there is an authorization failure at the AMQP 0-9-1 client level.
900+
>>>>>>> 1c9e9d7... Merge pull request #217 from velimir/cache-topic-access
890901

891902
check_publish(TopicName, Fn, PState) ->
892903
case check_topic_access(TopicName, write, PState) of
@@ -910,28 +921,46 @@ check_topic_access(TopicName, Access,
910921
exchange = Exchange,
911922
client_id = ClientId,
912923
mqtt2amqp_fun = Mqtt2AmqpFun }) ->
913-
Resource = #resource{virtual_host = VHost,
914-
kind = topic,
915-
name = Exchange},
916-
RoutingKey = Mqtt2AmqpFun(TopicName),
917-
Context = #{routing_key => RoutingKey,
918-
variable_map => #{
919-
<<"username">> => Username,
920-
<<"vhost">> => VHost,
921-
<<"client_id">> => rabbit_data_coercion:to_binary(ClientId)
922-
}
923-
},
924-
925-
try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of
926-
R -> R
927-
catch
928-
_:{amqp_error, access_refused, Msg, _} ->
929-
rabbit_log:error("operation resulted in an error (access_refused): ~p~n", [Msg]),
930-
{error, access_refused};
931-
_:Error ->
932-
rabbit_log:error("~p~n", [Error]),
933-
{error, access_refused}
934-
end.
924+
Cache =
925+
case get(topic_permission_cache) of
926+
undefined -> [];
927+
Other -> Other
928+
end,
929+
930+
Key = {TopicName, Username, ClientId, VHost, Exchange, Access},
931+
case lists:member(Key, Cache) of
932+
true ->
933+
ok;
934+
false ->
935+
Resource = #resource{virtual_host = VHost,
936+
kind = topic,
937+
name = Exchange},
938+
939+
RoutingKey = Mqtt2AmqpFun(TopicName),
940+
Context = #{routing_key => RoutingKey,
941+
variable_map => #{
942+
<<"username">> => Username,
943+
<<"vhost">> => VHost,
944+
<<"client_id">> => rabbit_data_coercion:to_binary(ClientId)
945+
}
946+
},
947+
948+
try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of
949+
ok ->
950+
CacheTail = lists:sublist(Cache, ?MAX_TOPIC_PERMISSION_CACHE_SIZE - 1),
951+
put(topic_permission_cache, [Key | CacheTail]),
952+
ok;
953+
R ->
954+
R
955+
catch
956+
_:{amqp_error, access_refused, Msg, _} ->
957+
rabbit_log:error("operation resulted in an error (access_refused): ~p~n", [Msg]),
958+
{error, access_refused};
959+
_:Error ->
960+
rabbit_log:error("~p~n", [Error]),
961+
{error, access_refused}
962+
end
963+
end.
935964

936965
info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val;
937966
info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val;

src/rabbit_mqtt_reader.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
-export([start_link/2]).
2626
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
27-
code_change/3, terminate/2]).
27+
code_change/3, terminate/2, handle_pre_hibernate/1]).
2828

2929
-export([conserve_resources/3, start_keepalive/2]).
3030

@@ -189,6 +189,10 @@ terminate(Reason, State) ->
189189
maybe_emit_stats(State),
190190
do_terminate(Reason, State).
191191

192+
handle_pre_hibernate(State) ->
193+
rabbit_mqtt_processor:handle_pre_hibernate(),
194+
{hibernate, State}.
195+
192196
do_terminate({network_error, {ssl_upgrade_error, closed}, ConnStr}, _State) ->
193197
rabbit_log_connection:error("MQTT detected TLS upgrade error on ~s: connection closed~n",
194198
[ConnStr]);

0 commit comments

Comments
 (0)