Skip to content

Commit 40a1659

Browse files
committed
Intercept message directly before routing
Intercept message directly before routing and have this consistent across all protocols since it's easier to reason about.
1 parent 1565eab commit 40a1659

File tree

3 files changed

+7
-7
lines changed

3 files changed

+7
-7
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -2442,12 +2442,12 @@ incoming_link_transfer(
24422442
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
24432443
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
24442444
{ok, X, RoutingKeys, Mc1, PermCache} ->
2445+
check_user_id(Mc1, User),
2446+
TopicPermCache = check_write_permitted_on_topics(
2447+
X, User, RoutingKeys, TopicPermCache0),
24452448
Mc2 = rabbit_message_interceptor:intercept(Mc1,
24462449
MsgInterceptorCtx,
24472450
incoming_message_interceptors),
2448-
check_user_id(Mc2, User),
2449-
TopicPermCache = check_write_permitted_on_topics(
2450-
X, User, RoutingKeys, TopicPermCache0),
24512451
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
24522452
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
24532453
Opts = #{correlation => {HandleInt, DeliveryId}},

deps/rabbit/src/rabbit_channel.erl

+1-1
Original file line numberDiff line numberDiff line change
@@ -1213,10 +1213,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12131213
rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]);
12141214
{ok, Message0} ->
12151215
check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext),
1216+
check_user_id_header(Message0, User),
12161217
Message = rabbit_message_interceptor:intercept(Message0,
12171218
MsgInterceptorCtx,
12181219
incoming_message_interceptors),
1219-
check_user_id_header(Message, User),
12201220
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
12211221
[deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames],
12221222
Queues = rabbit_amqqueue:lookup_many(QNames),

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -1637,11 +1637,11 @@ publish_to_queues(
16371637
Anns = #{?ANN_EXCHANGE => ExchangeNameBin,
16381638
?ANN_ROUTING_KEYS => [mqtt_to_amqp(Topic)]},
16391639
Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()),
1640-
Msg = rabbit_message_interceptor:intercept(Msg0,
1641-
msg_interceptor_ctx(State),
1642-
incoming_message_interceptors),
16431640
case rabbit_exchange:lookup(ExchangeName) of
16441641
{ok, Exchange} ->
1642+
Msg = rabbit_message_interceptor:intercept(Msg0,
1643+
msg_interceptor_ctx(State),
1644+
incoming_message_interceptors),
16451645
QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}),
16461646
QNames = drop_local(QNames0, State),
16471647
rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState),

0 commit comments

Comments
 (0)