18
18
19
19
-export ([info /2 , initial_state /2 , initial_state /5 ,
20
20
process_frame /2 , amqp_pub /2 , amqp_callback /2 , send_will /1 ,
21
- close_connection /1 ]).
21
+ close_connection /1 , handle_pre_hibernate / 0 ]).
22
22
23
23
% % for testing purposes
24
24
-export ([get_vhost_username /1 , get_vhost /3 , get_vhost_from_user_mapping /2 ,
31
31
-define (APP , rabbitmq_mqtt ).
32
32
-define (FRAME_TYPE (Frame , Type ),
33
33
Frame = # mqtt_frame { fixed = # mqtt_frame_fixed { type = Type }}).
34
+ -define (MAX_TOPIC_PERMISSION_CACHE_SIZE , 12 ).
34
35
35
36
initial_state (Socket , SSLLoginName ) ->
36
37
RealSocket = rabbit_net :unwrap_socket (Socket ),
@@ -903,6 +904,10 @@ close_connection(PState = #proc_state{ connection = Connection,
903
904
PState # proc_state { channels = {undefined , undefined },
904
905
connection = undefined }.
905
906
907
+ handle_pre_hibernate () ->
908
+ erase (topic_permission_cache ),
909
+ ok .
910
+
906
911
% % NB: check_*: MQTT spec says we should ack normally, ie pretend there
907
912
% % was no auth error, but here we are closing the connection with an error. This
908
913
% % is what happens anyway if there is an authorization failure at the AMQP 0-9-1 client level.
@@ -929,28 +934,46 @@ check_topic_access(TopicName, Access,
929
934
exchange = Exchange ,
930
935
client_id = ClientId ,
931
936
mqtt2amqp_fun = Mqtt2AmqpFun }) ->
932
- Resource = # resource {virtual_host = VHost ,
933
- kind = topic ,
934
- name = Exchange },
935
- RoutingKey = Mqtt2AmqpFun (TopicName ),
936
- Context = #{routing_key => RoutingKey ,
937
- variable_map => #{
938
- <<" username" >> => Username ,
939
- <<" vhost" >> => VHost ,
940
- <<" client_id" >> => rabbit_data_coercion :to_binary (ClientId )
941
- }
942
- },
943
-
944
- try rabbit_access_control :check_topic_access (User , Resource , Access , Context ) of
945
- R -> R
946
- catch
947
- _ :{amqp_error , access_refused , Msg , _ } ->
948
- rabbit_log :error (" operation resulted in an error (access_refused): ~p~n " , [Msg ]),
949
- {error , access_refused };
950
- _ :Error ->
951
- rabbit_log :error (" ~p~n " , [Error ]),
952
- {error , access_refused }
953
- end .
937
+ Cache =
938
+ case get (topic_permission_cache ) of
939
+ undefined -> [];
940
+ Other -> Other
941
+ end ,
942
+
943
+ Key = {TopicName , Username , ClientId , VHost , Exchange , Access },
944
+ case lists :member (Key , Cache ) of
945
+ true ->
946
+ ok ;
947
+ false ->
948
+ Resource = # resource {virtual_host = VHost ,
949
+ kind = topic ,
950
+ name = Exchange },
951
+
952
+ RoutingKey = Mqtt2AmqpFun (TopicName ),
953
+ Context = #{routing_key => RoutingKey ,
954
+ variable_map => #{
955
+ <<" username" >> => Username ,
956
+ <<" vhost" >> => VHost ,
957
+ <<" client_id" >> => rabbit_data_coercion :to_binary (ClientId )
958
+ }
959
+ },
960
+
961
+ try rabbit_access_control :check_topic_access (User , Resource , Access , Context ) of
962
+ ok ->
963
+ CacheTail = lists :sublist (Cache , ? MAX_TOPIC_PERMISSION_CACHE_SIZE - 1 ),
964
+ put (topic_permission_cache , [Key | CacheTail ]),
965
+ ok ;
966
+ R ->
967
+ R
968
+ catch
969
+ _ :{amqp_error , access_refused , Msg , _ } ->
970
+ rabbit_log :error (" operation resulted in an error (access_refused): ~p~n " , [Msg ]),
971
+ {error , access_refused };
972
+ _ :Error ->
973
+ rabbit_log :error (" ~p~n " , [Error ]),
974
+ {error , access_refused }
975
+ end
976
+ end .
954
977
955
978
info (consumer_tags , # proc_state {consumer_tags = Val }) -> Val ;
956
979
info (unacked_pubs , # proc_state {unacked_pubs = Val }) -> Val ;
0 commit comments