Skip to content

Commit b261996

Browse files
committed
Use channel memory pools
1 parent 5a773c5 commit b261996

File tree

3 files changed

+83
-26
lines changed

3 files changed

+83
-26
lines changed

Modules/_librabbitmq/_amqstate.h

+28-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33

44
#include <amqp.h>
55

6+
/* 7 bytes up front, then payload, then 1 byte footer */
7+
#define HEADER_SIZE 7
8+
#define FOOTER_SIZE 1
9+
#define POOL_TABLE_SIZE 16
10+
611
typedef enum amqp_connection_state_enum_ {
712
CONNECTION_STATE_IDLE = 0,
813
CONNECTION_STATE_INITIAL,
@@ -15,23 +20,34 @@ typedef struct amqp_link_t_ {
1520
void *data;
1621
} amqp_link_t;
1722

23+
typedef struct amqp_pool_table_entry_t_ {
24+
struct amqp_pool_table_entry_t_ *next;
25+
amqp_pool_t pool;
26+
amqp_channel_t channel;
27+
} amqp_pool_table_entry_t;
28+
1829
struct amqp_connection_state_t_ {
19-
amqp_pool_t frame_pool;
20-
amqp_pool_t decoding_pool;
30+
amqp_pool_table_entry_t *pool_table[POOL_TABLE_SIZE];
2131

2232
amqp_connection_state_enum state;
2333

2434
int channel_max;
2535
int frame_max;
2636
int heartbeat;
37+
38+
/* buffer for holding frame headers. Allows us to delay allocating
39+
* the raw frame buffer until the type, channel, and size are all known
40+
*/
41+
char header_buffer[HEADER_SIZE + 1];
2742
amqp_bytes_t inbound_buffer;
2843

2944
size_t inbound_offset;
3045
size_t target_size;
3146

3247
amqp_bytes_t outbound_buffer;
3348

34-
int sockfd;
49+
amqp_socket_t *socket;
50+
3551
amqp_bytes_t sock_inbound_buffer;
3652
size_t sock_inbound_offset;
3753
size_t sock_inbound_limit;
@@ -40,6 +56,15 @@ struct amqp_connection_state_t_ {
4056
amqp_link_t *last_queued_frame;
4157

4258
amqp_rpc_reply_t most_recent_api_result;
59+
60+
uint64_t next_recv_heartbeat;
61+
uint64_t next_send_heartbeat;
62+
63+
amqp_table_t server_properties;
64+
amqp_pool_t properties_pool;
4365
};
4466

67+
68+
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t, amqp_channel_t);
69+
4570
#endif /* __PYRMQ_AMQSTATE_H__ */

Modules/_librabbitmq/connection.c

+54-22
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ basic_properties_to_PyDict(amqp_basic_properties_t*, PyObject*);
6969
_PYRMQ_INLINE int
7070
PyDict_to_basic_properties(PyObject *,
7171
amqp_basic_properties_t *,
72-
amqp_connection_state_t );
72+
amqp_connection_state_t,
73+
amqp_pool_t *);
7374

7475
_PYRMQ_INLINE void
7576
amqp_basic_deliver_to_PyDict(PyObject *, uint64_t, amqp_bytes_t,
@@ -97,8 +98,8 @@ int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *, unsigned int,
9798
void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t*);
9899
int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *);
99100

100-
static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *);
101-
static amqp_array_t PyList_ToAMQArray(amqp_connection_state_t, PyObject *);
101+
static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, amqp_pool_t *);
102+
static amqp_array_t PyList_ToAMQArray(amqp_connection_state_t, PyObject *, amqp_pool_t *);
102103

103104
static PyObject* AMQTable_toPyDict(amqp_table_t *table);
104105
static PyObject* AMQArray_toPyList(amqp_array_t *array);
@@ -211,7 +212,7 @@ AMQArray_SetIntValue(amqp_array_t *array, int value)
211212
}
212213

213214
static amqp_table_t
214-
PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src)
215+
PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool)
215216
{
216217
PyObject *dkey = NULL;
217218
PyObject *dvalue = NULL;
@@ -226,21 +227,20 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src)
226227

227228
/* allocate new table */
228229
dst.num_entries = 0;
229-
dst.entries = amqp_pool_alloc(&conn->frame_pool,
230-
size * sizeof(amqp_table_entry_t));
230+
dst.entries = amqp_pool_alloc(pool, size * sizeof(amqp_table_entry_t));
231231
while (PyDict_Next(src, &pos, &dkey, &dvalue)) {
232232

233233
if (PyDict_Check(dvalue)) {
234234
/* Dict */
235235
AMQTable_SetTableValue(&dst,
236236
PyString_AS_AMQBYTES(dkey),
237-
PyDict_ToAMQTable(conn, dvalue));
237+
PyDict_ToAMQTable(conn, dvalue, pool));
238238
}
239239
else if (PyList_Check(dvalue)) {
240240
/* List */
241241
AMQTable_SetArrayValue(&dst,
242242
PyString_AS_AMQBYTES(dkey),
243-
PyList_ToAMQArray(conn, dvalue));
243+
PyList_ToAMQArray(conn, dvalue, pool));
244244
}
245245
else if (PyLong_Check(dvalue) || PyInt_Check(dvalue)) {
246246
/* Int | Long */
@@ -293,7 +293,7 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src)
293293
}
294294

295295
static amqp_array_t
296-
PyList_ToAMQArray(amqp_connection_state_t conn, PyObject *src)
296+
PyList_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool)
297297
{
298298
PyObject *dvalue = NULL;
299299
PY_SIZE_TYPE size = 0;
@@ -306,21 +306,20 @@ PyList_ToAMQArray(amqp_connection_state_t conn, PyObject *src)
306306

307307
/* allocate new array */
308308
dst.num_entries = 0;
309-
dst.entries = amqp_pool_alloc(&conn->frame_pool,
310-
size * sizeof(amqp_field_value_t));
309+
dst.entries = amqp_pool_alloc(pool, size * sizeof(amqp_field_value_t));
311310
for (pos = 0; pos < size; ++pos) {
312311

313312
dvalue = PyList_GetItem(src, pos);
314313

315314
if (PyDict_Check(dvalue)) {
316315
/* Dict */
317316
AMQArray_SetTableValue(&dst,
318-
PyDict_ToAMQTable(conn, dvalue));
317+
PyDict_ToAMQTable(conn, dvalue, pool));
319318
}
320319
else if (PyList_Check(dvalue)) {
321320
/* List */
322321
AMQArray_SetArrayValue(&dst,
323-
PyList_ToAMQArray(conn, dvalue));
322+
PyList_ToAMQArray(conn, dvalue, pool));
324323
}
325324
else if (PyLong_Check(dvalue) || PyInt_Check(dvalue)) {
326325
/* Int | Long */
@@ -618,7 +617,8 @@ AMQArray_toPyList(amqp_array_t *array)
618617
_PYRMQ_INLINE int
619618
PyDict_to_basic_properties(PyObject *p,
620619
amqp_basic_properties_t *props,
621-
amqp_connection_state_t conn)
620+
amqp_connection_state_t conn,
621+
amqp_pool_t *pool)
622622
{
623623
PyObject *value = NULL;
624624
props->headers = AMQP_EMPTY_TABLE;
@@ -683,7 +683,7 @@ PyDict_to_basic_properties(PyObject *p,
683683
}
684684

685685
if ((value = PyDict_GetItemString(p, "headers")) != NULL) {
686-
props->headers = PyDict_ToAMQTable(conn, value);
686+
props->headers = PyDict_ToAMQTable(conn, value, pool);
687687
if (PyErr_Occurred())
688688
return -1;
689689
}
@@ -1000,7 +1000,6 @@ PyRabbitMQ_Connection_close(PyRabbitMQ_Connection *self)
10001000
Py_BEGIN_ALLOW_THREADS
10011001
reply = amqp_connection_close(self->conn, AMQP_REPLY_SUCCESS);
10021002
amqp_destroy_connection(self->conn);
1003-
close(self->sockfd);
10041003
self->sockfd = 0;
10051004
Py_END_ALLOW_THREADS
10061005
}
@@ -1300,6 +1299,7 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
13001299
PyObject *arguments = NULL;
13011300

13021301
amqp_table_t bargs = AMQP_EMPTY_TABLE;
1302+
amqp_pool_t *channel_pool = NULL;
13031303
amqp_rpc_reply_t reply;
13041304

13051305
if (PyRabbitMQ_Not_Connected(self))
@@ -1312,7 +1312,12 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
13121312
if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
13131313
if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail;
13141314

1315-
bargs = PyDict_ToAMQTable(self->conn, arguments);
1315+
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
1316+
if (channel_pool == NULL) {
1317+
PyErr_NoMemory();
1318+
goto bail;
1319+
}
1320+
bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
13161321
if (PyErr_Occurred())
13171322
goto bail;
13181323

@@ -1349,6 +1354,7 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
13491354
PyObject *arguments = NULL;
13501355

13511356
amqp_table_t uargs = AMQP_EMPTY_TABLE;
1357+
amqp_pool_t *channel_pool = NULL;
13521358
amqp_rpc_reply_t reply;
13531359

13541360
if (PyRabbitMQ_Not_Connected(self))
@@ -1360,7 +1366,13 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
13601366
if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
13611367
if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
13621368
if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail;
1363-
uargs = PyDict_ToAMQTable(self->conn, arguments);
1369+
1370+
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
1371+
if (channel_pool == NULL) {
1372+
PyErr_NoMemory();
1373+
goto bail;
1374+
}
1375+
uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
13641376
if (PyErr_Occurred())
13651377
goto bail;
13661378

@@ -1442,6 +1454,7 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
14421454

14431455
amqp_queue_declare_ok_t *ok;
14441456
amqp_rpc_reply_t reply;
1457+
amqp_pool_t *channel_pool = NULL;
14451458
amqp_table_t qargs = AMQP_EMPTY_TABLE;
14461459
PyObject *ret = NULL;
14471460

@@ -1453,7 +1466,12 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
14531466
&exclusive, &auto_delete, &arguments))
14541467
goto bail;
14551468
if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
1456-
qargs = PyDict_ToAMQTable(self->conn, arguments);
1469+
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
1470+
if (channel_pool == NULL) {
1471+
PyErr_NoMemory();
1472+
goto bail;
1473+
}
1474+
qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
14571475
if (PyErr_Occurred())
14581476
goto bail;
14591477

@@ -1538,6 +1556,7 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self,
15381556
unsigned int auto_delete = 0;
15391557

15401558
amqp_table_t eargs = AMQP_EMPTY_TABLE;
1559+
amqp_pool_t *channel_pool = NULL;
15411560
amqp_rpc_reply_t reply;
15421561

15431562
if (PyRabbitMQ_Not_Connected(self))
@@ -1549,7 +1568,12 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self,
15491568
goto bail;
15501569
if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
15511570
if ((type = Maybe_Unicode(type)) == NULL) goto bail;
1552-
eargs = PyDict_ToAMQTable(self->conn, arguments);
1571+
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
1572+
if (channel_pool == NULL) {
1573+
PyErr_NoMemory();
1574+
goto bail;
1575+
}
1576+
eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
15531577
if (PyErr_Occurred())
15541578
goto bail;
15551579

@@ -1627,6 +1651,7 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
16271651
int ret = 0;
16281652
amqp_basic_properties_t props;
16291653
amqp_bytes_t bytes;
1654+
amqp_pool_t *channel_pool = NULL;
16301655
memset(&props, 0, sizeof(props));
16311656

16321657
if (PyRabbitMQ_Not_Connected(self))
@@ -1640,7 +1665,8 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
16401665
if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail;
16411666

16421667
Py_INCREF(propdict);
1643-
if (!PyDict_to_basic_properties(propdict, &props, self->conn))
1668+
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
1669+
if (!PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool))
16441670
goto bail;
16451671
Py_DECREF(propdict);
16461672

@@ -1792,6 +1818,7 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
17921818

17931819
amqp_basic_consume_ok_t *ok;
17941820
amqp_rpc_reply_t reply;
1821+
amqp_pool_t *channel_pool = NULL;
17951822
amqp_table_t cargs = AMQP_EMPTY_TABLE;
17961823

17971824
if (PyRabbitMQ_Not_Connected(self))
@@ -1804,7 +1831,12 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
18041831
if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
18051832
if ((consumer_tag = Maybe_Unicode(consumer_tag)) == NULL) goto bail;
18061833

1807-
cargs = PyDict_ToAMQTable(self->conn, arguments);
1834+
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
1835+
if (channel_pool == NULL) {
1836+
PyErr_NoMemory();
1837+
goto bail;
1838+
}
1839+
cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
18081840
if (PyErr_Occurred()) goto bail;
18091841

18101842
Py_BEGIN_ALLOW_THREADS;

Modules/_librabbitmq/connection.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ typedef struct {
148148

149149
int
150150
PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *,
151-
amqp_connection_state_t);
151+
amqp_connection_state_t, amqp_pool_t *);
152152

153153
/* Connection method sigs */
154154
static PyRabbitMQ_Connection*

0 commit comments

Comments
 (0)