Skip to content

Commit 0f5e23c

Browse files
authored
Add connect_timeout parameter to Connection (#161)
* Add connect_timeout parameter to Connection * Add connect_timeout parameter to Connection - Add unit tests and update Changelog
1 parent 3969118 commit 0f5e23c

File tree

5 files changed

+59
-4
lines changed

5 files changed

+59
-4
lines changed

Changelog

+6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
.. contents::
88
:local:
99

10+
Next Release
11+
============
12+
13+
- Add support for ``Connection.connect_timeout`` parameter
14+
15+
1016
.. _version-2.0.0:
1117

1218
2.0.0

Modules/_librabbitmq/connection.c

+14-3
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,7 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
10521052
"frame_max",
10531053
"heartbeat",
10541054
"client_properties",
1055+
"connect_timeout",
10551056
NULL
10561057
};
10571058
char *hostname;
@@ -1063,11 +1064,13 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
10631064
int frame_max = 131072;
10641065
int heartbeat = 0;
10651066
int port = 5672;
1067+
int connect_timeout = 0;
10661068
PyObject *client_properties = NULL;
10671069

1068-
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiO", kwlist,
1070+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiOi", kwlist,
10691071
&hostname, &userid, &password, &virtual_host, &port,
1070-
&channel_max, &frame_max, &heartbeat, &client_properties)) {
1072+
&channel_max, &frame_max, &heartbeat, &client_properties,
1073+
&connect_timeout)) {
10711074
return -1;
10721075
}
10731076

@@ -1089,6 +1092,7 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
10891092
self->channel_max = channel_max;
10901093
self->frame_max = frame_max;
10911094
self->heartbeat = heartbeat;
1095+
self->connect_timeout = connect_timeout;
10921096
self->weakreflist = NULL;
10931097
self->callbacks = PyDict_New();
10941098
if (self->callbacks == NULL) return -1;
@@ -1127,6 +1131,7 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
11271131
amqp_rpc_reply_t reply;
11281132
amqp_pool_t pool;
11291133
amqp_table_t properties;
1134+
struct timeval timeout = {0, 0};
11301135

11311136
pyobject_array_t pyobj_array = {0};
11321137

@@ -1144,7 +1149,13 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
11441149
goto error;
11451150
}
11461151
Py_BEGIN_ALLOW_THREADS;
1147-
status = amqp_socket_open(socket, self->hostname, self->port);
1152+
if (self->connect_timeout <= 0) {
1153+
status = amqp_socket_open(socket, self->hostname, self->port);
1154+
} else {
1155+
timeout.tv_sec = self->connect_timeout;
1156+
status = amqp_socket_open_noblock(socket, self->hostname, self->port, &timeout);
1157+
}
1158+
11481159
Py_END_ALLOW_THREADS;
11491160
if (PyRabbitMQ_HandleAMQStatus(status, "Error opening socket")) {
11501161
goto error;

Modules/_librabbitmq/connection.h

+3
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ typedef struct {
151151
int frame_max;
152152
int channel_max;
153153
int heartbeat;
154+
int connect_timeout;
154155

155156
int sockfd;
156157
int connected;
@@ -267,6 +268,8 @@ static PyMemberDef PyRabbitMQ_ConnectionType_members[] = {
267268
offsetof(PyRabbitMQ_Connection, frame_max), READONLY, NULL},
268269
{"callbacks", T_OBJECT_EX,
269270
offsetof(PyRabbitMQ_Connection, callbacks), READONLY, NULL},
271+
{"connect_timeout", T_INT,
272+
offsetof(PyRabbitMQ_Connection, connect_timeout), READONLY, NULL},
270273
{NULL, 0, 0, 0, NULL} /* Sentinel */
271274
};
272275

librabbitmq/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,15 @@ class Connection(_librabbitmq.Connection):
191191
def __init__(self, host='localhost', userid='guest', password='guest',
192192
virtual_host='/', port=5672, channel_max=0xffff,
193193
frame_max=131072, heartbeat=0, lazy=False,
194-
client_properties=None, **kwargs):
194+
client_properties=None, connect_timeout=None, **kwargs):
195195
if ':' in host:
196196
host, port = host.split(':')
197197
super(Connection, self).__init__(
198198
hostname=host, port=int(port), userid=userid, password=password,
199199
virtual_host=virtual_host, channel_max=channel_max,
200200
frame_max=frame_max, heartbeat=heartbeat,
201201
client_properties=client_properties,
202+
connect_timeout=0 if connect_timeout is None else int(connect_timeout),
202203
)
203204
self.channels = {}
204205
self._used_channel_ids = array('H')

tests/test_functional.py

+34
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,45 @@
66
import socket
77
import unittest
88
from array import array
9+
import time
910

1011
from librabbitmq import Message, Connection, ConnectionError, ChannelError
1112
TEST_QUEUE = 'pyrabbit.testq'
1213

1314

15+
class test_Connection(unittest.TestCase):
16+
def test_connection_defaults(self):
17+
"""Test making a connection with the default settings."""
18+
with Connection() as connection:
19+
self.assertGreaterEqual(connection.fileno(), 0)
20+
21+
def test_connection_invalid_host(self):
22+
"""Test connection to an invalid host fails."""
23+
# Will fail quickly as OS will reject it.
24+
with self.assertRaises(ConnectionError):
25+
Connection(host="255.255.255.255")
26+
27+
def test_connection_invalid_port(self):
28+
"""Test connection to an invalid port fails."""
29+
# Will fail quickly as OS will reject it.
30+
with self.assertRaises(ConnectionError):
31+
Connection(port=0)
32+
33+
def test_connection_timeout(self):
34+
"""Test connection timeout."""
35+
# Can't rely on local OS being configured to ignore SYN packets
36+
# (OS would normally reply with RST to closed port). To test the
37+
# timeout, need to connect to something that is either slow, or
38+
# never responds.
39+
start_time = time.time()
40+
with self.assertRaises(ConnectionError):
41+
Connection(host="google.com", port=81, connect_timeout=3)
42+
took_time = time.time() - start_time
43+
# Allow some leaway to avoid spurious test failures.
44+
self.assertGreaterEqual(took_time, 2)
45+
self.assertLessEqual(took_time, 4)
46+
47+
1448
class test_Channel(unittest.TestCase):
1549

1650
def setUp(self):

0 commit comments

Comments
 (0)