Skip to content

Commit 579c586

Browse files
committed
Support AMQP over WebSocket (OSS part)
1 parent 5a46793 commit 579c586

33 files changed

+381
-596
lines changed

README.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
[RabbitMQ](https://rabbitmq.com) is a [feature rich](https://www.rabbitmq.com/docs),
66
multi-protocol messaging and streaming broker. It supports:
77

8-
* AMQP 0-9-1
98
* AMQP 1.0
9+
* AMQP 0-9-1
1010
* [RabbitMQ Stream Protocol](https://www.rabbitmq.com/docs/streams)
1111
* MQTT 3.1, 3.1.1, and 5.0
1212
* STOMP 1.0 through 1.2
13-
* [MQTT over WebSockets](https://www.rabbitmq.com/docs/web-mqtt)
14-
* [STOMP over WebSockets](https://www.rabbitmq.com/docs/web-stomp)
13+
* [MQTT over WebSocket](https://www.rabbitmq.com/docs/web-mqtt)
14+
* [STOMP over WebSocket](https://www.rabbitmq.com/docs/web-stomp)
15+
* AMQP 1.0 over WebSocket (supported in [VMware Tanzu RabbitMQ](https://www.vmware.com/products/app-platform/tanzu-rabbitmq))
1516

1617

1718
## Installation

deps/rabbit/include/rabbit_amqp.hrl

-8
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,6 @@
4343
node
4444
] ++ ?AUTH_EVENT_KEYS).
4545

46-
-define(INFO_ITEMS,
47-
[connection_state,
48-
recv_oct,
49-
recv_cnt,
50-
send_oct,
51-
send_cnt
52-
] ++ ?ITEMS).
53-
5446
%% for rabbit_event connection_created
5547
-define(CONNECTION_EVENT_KEYS,
5648
[type,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-define(SIMPLE_METRICS, [pid,
2+
recv_oct,
3+
send_oct,
4+
reductions]).
5+
6+
-define(OTHER_METRICS, [recv_cnt,
7+
send_cnt,
8+
send_pend,
9+
state,
10+
channels,
11+
garbage_collection]).
+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
%% same values as in rabbit_reader
2+
-define(NORMAL_TIMEOUT, 3_000).
3+
-define(CLOSING_TIMEOUT, 30_000).
4+
-define(SILENT_CLOSE_DELAY, 3_000).
5+
6+
%% Allow for potentially large sets of tokens during the SASL exchange.
7+
%% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915
8+
-define(INITIAL_MAX_FRAME_SIZE, 8192).
9+
10+
-type protocol() :: amqp | sasl.
11+
-type channel_number() :: non_neg_integer().
12+
-type callback() :: handshake |
13+
{frame_header, protocol()} |
14+
{frame_body, protocol(), DataOffset :: pos_integer(), channel_number()}.
15+
16+
-record(v1_connection,
17+
{name :: binary(),
18+
container_id = none :: none | binary(),
19+
vhost = none :: none | rabbit_types:vhost(),
20+
%% server host
21+
host :: inet:ip_address() | inet:hostname(),
22+
%% client host
23+
peer_host :: inet:ip_address() | inet:hostname(),
24+
%% server port
25+
port :: inet:port_number(),
26+
%% client port
27+
peer_port :: inet:port_number(),
28+
connected_at :: integer(),
29+
user = unauthenticated :: unauthenticated | rabbit_types:user(),
30+
timeout = ?NORMAL_TIMEOUT :: non_neg_integer(),
31+
incoming_max_frame_size = ?INITIAL_MAX_FRAME_SIZE :: pos_integer(),
32+
outgoing_max_frame_size = ?INITIAL_MAX_FRAME_SIZE :: unlimited | pos_integer(),
33+
%% "Prior to any explicit negotiation, [...] the maximum channel number is 0." [2.4.1]
34+
channel_max = 0 :: non_neg_integer(),
35+
auth_mechanism = sasl_init_unprocessed :: sasl_init_unprocessed | {binary(), module()},
36+
auth_state = unauthenticated :: term(),
37+
credential_timer :: undefined | reference(),
38+
properties :: undefined | {map, list(tuple())}
39+
}).
40+
41+
-record(v1,
42+
{parent :: pid(),
43+
helper_sup :: pid(),
44+
writer = none :: none | pid(),
45+
heartbeater = none :: none | rabbit_heartbeat:heartbeaters(),
46+
session_sup = none :: none | pid(),
47+
websocket :: boolean(),
48+
sock :: none | rabbit_net:socket(),
49+
proxy_socket :: undefined | {rabbit_proxy_socket, any(), any()},
50+
connection :: none | #v1_connection{},
51+
connection_state :: waiting_amqp3100 | received_amqp3100 | waiting_sasl_init |
52+
securing | waiting_amqp0100 | waiting_open | running |
53+
closing | closed,
54+
callback :: callback(),
55+
recv_len = 8 :: non_neg_integer(),
56+
pending_recv :: boolean(),
57+
buf :: list(),
58+
buf_len :: non_neg_integer(),
59+
tracked_channels = maps:new() :: #{channel_number() => Session :: pid()},
60+
stats_timer :: rabbit_event:state()
61+
}).
62+
63+
-type state() :: #v1{}.

0 commit comments

Comments
 (0)