Skip to content

Commit 86b40c1

Browse files
authored
PYTHON-2462 Avoid connection storms: implement pool PAUSED state (#531)
Mark server unknown and clear the pool when background connections fail. Eagerly evict threads from the wait queue when the pool is paused. Evicted threads will raise the following error: AutoReconnect('localhost:27017: connection pool paused'). Introduce PoolReadyEvent and ConnectionPoolListener.pool_ready.
CMAP spec test changes:
- CMAP unit tests should not use real monitors.
- Assert that CMAP threads complete all scheduled operations.
1 parent a9d668c commit 86b40c1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1023
-292
lines changed

doc/changelog.rst

+5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ Changelog
44
Changes in Version 4.0
55
----------------------
66

7+
Breaking Changes in 4.0
8+
```````````````````````
9+
10+
- Removed :mod:`~pymongo.thread_util`.
11+
712
Issues Resolved
813
...............
914

pymongo/event_loggers.py

+3
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ class ConnectionPoolLogger(monitoring.ConnectionPoolListener):
171171
def pool_created(self, event):
172172
logging.info("[pool {0.address}] pool created".format(event))
173173

174+
def pool_ready(self, event):
175+
logging.info("[pool {0.address}] pool ready".format(event))
176+
174177
def pool_cleared(self, event):
175178
logging.info("[pool {0.address}] pool cleared".format(event))
176179

pymongo/mongo_client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,7 @@ def target():
737737

738738
executor = periodic_executor.PeriodicExecutor(
739739
interval=common.KILL_CURSOR_FREQUENCY,
740-
min_interval=0.5,
740+
min_interval=common.MIN_HEARTBEAT_INTERVAL,
741741
target=target,
742742
name="pymongo_kill_cursors_thread")
743743

pymongo/monitoring.py

+34
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,18 @@ def pool_created(self, event):
255255
"""
256256
raise NotImplementedError
257257

258+
def pool_ready(self, event):
259+
"""Abstract method to handle a :class:`PoolReadyEvent`.
260+
261+
Emitted when a Connection Pool is marked ready.
262+
263+
:Parameters:
264+
- `event`: An instance of :class:`PoolReadyEvent`.
265+
266+
.. versionadded:: 4.0
267+
"""
268+
raise NotImplementedError
269+
258270
def pool_cleared(self, event):
259271
"""Abstract method to handle a `PoolClearedEvent`.
260272
@@ -692,6 +704,18 @@ def __repr__(self):
692704
self.__class__.__name__, self.address, self.__options)
693705

694706

707+
class PoolReadyEvent(_PoolEvent):
708+
"""Published when a Connection Pool is marked ready.
709+
710+
:Parameters:
711+
- `address`: The address (host, port) pair of the server this Pool is
712+
attempting to connect to.
713+
714+
.. versionadded:: 4.0
715+
"""
716+
__slots__ = ()
717+
718+
695719
class PoolClearedEvent(_PoolEvent):
696720
"""Published when a Connection Pool is cleared.
697721
@@ -1475,6 +1499,16 @@ def publish_pool_created(self, address, options):
14751499
except Exception:
14761500
_handle_exception()
14771501

1502+
def publish_pool_ready(self, address):
1503+
"""Publish a :class:`PoolReadyEvent` to all pool listeners.
1504+
"""
1505+
event = PoolReadyEvent(address)
1506+
for subscriber in self.__cmap_listeners:
1507+
try:
1508+
subscriber.pool_ready(event)
1509+
except Exception:
1510+
_handle_exception()
1511+
14781512
def publish_pool_cleared(self, address):
14791513
"""Publish a :class:`PoolClearedEvent` to all pool listeners.
14801514
"""

pymongo/pool.py

+107-28
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from bson import DEFAULT_CODEC_OPTIONS
3131
from bson.py3compat import imap, itervalues, _unicode, PY3
3232
from bson.son import SON
33-
from pymongo import auth, helpers, thread_util, __version__
33+
from pymongo import auth, helpers, __version__
3434
from pymongo.client_session import _validate_session_write_concern
3535
from pymongo.common import (MAX_BSON_SIZE,
3636
MAX_CONNECTING,
@@ -46,6 +46,7 @@
4646
CertificateError,
4747
ConnectionFailure,
4848
ConfigurationError,
49+
ExceededMaxWaiters,
4950
InvalidOperation,
5051
DocumentTooLarge,
5152
NetworkTimeout,
@@ -309,7 +310,8 @@ class PoolOptions(object):
309310
'__wait_queue_timeout', '__wait_queue_multiple',
310311
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
311312
'__event_listeners', '__appname', '__driver', '__metadata',
312-
'__compression_settings', '__max_connecting')
313+
'__compression_settings', '__max_connecting',
314+
'__pause_enabled')
313315

314316
def __init__(self, max_pool_size=MAX_POOL_SIZE,
315317
min_pool_size=MIN_POOL_SIZE,
@@ -318,7 +320,8 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
318320
wait_queue_multiple=None, ssl_context=None,
319321
ssl_match_hostname=True, socket_keepalive=True,
320322
event_listeners=None, appname=None, driver=None,
321-
compression_settings=None, max_connecting=MAX_CONNECTING):
323+
compression_settings=None, max_connecting=MAX_CONNECTING,
324+
pause_enabled=True):
322325

323326
self.__max_pool_size = max_pool_size
324327
self.__min_pool_size = min_pool_size
@@ -335,6 +338,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
335338
self.__driver = driver
336339
self.__compression_settings = compression_settings
337340
self.__max_connecting = max_connecting
341+
self.__pause_enabled = pause_enabled
338342
self.__metadata = copy.deepcopy(_METADATA)
339343
if appname:
340344
self.__metadata['application'] = {'name': appname}
@@ -406,6 +410,10 @@ def max_connecting(self):
406410
"""
407411
return self.__max_connecting
408412

413+
@property
414+
def pause_enabled(self):
415+
return self.__pause_enabled
416+
409417
@property
410418
def max_idle_time_seconds(self):
411419
"""The maximum number of seconds that a connection can remain
@@ -1058,6 +1066,12 @@ class _PoolClosedError(PyMongoError):
10581066
pass
10591067

10601068

1069+
class PoolState(object):
1070+
PAUSED = 1
1071+
READY = 2
1072+
CLOSED = 3
1073+
1074+
10611075
# Do *not* explicitly inherit from object or Jython won't call __del__
10621076
# http://bugs.jython.org/issue1057
10631077
class Pool:
@@ -1068,6 +1082,10 @@ def __init__(self, address, options, handshake=True):
10681082
- `options`: a PoolOptions instance
10691083
- `handshake`: whether to call ismaster for each new SocketInfo
10701084
"""
1085+
if options.pause_enabled:
1086+
self.state = PoolState.PAUSED
1087+
else:
1088+
self.state = PoolState.READY
10711089
# Check a socket's health with socket_closed() every once in a while.
10721090
# Can override for testing: 0 to always check, None to never check.
10731091
self._check_interval_seconds = 1
@@ -1079,7 +1097,6 @@ def __init__(self, address, options, handshake=True):
10791097
self.active_sockets = 0
10801098
# Monotonically increasing connection ID required for CMAP Events.
10811099
self.next_connection_id = 1
1082-
self.closed = False
10831100
# Track whether the sockets in this pool are writeable or not.
10841101
self.is_writable = None
10851102

@@ -1098,13 +1115,23 @@ def __init__(self, address, options, handshake=True):
10981115

10991116
if (self.opts.wait_queue_multiple is None or
11001117
self.opts.max_pool_size is None):
1101-
max_waiters = None
1118+
max_waiters = float('inf')
11021119
else:
11031120
max_waiters = (
11041121
self.opts.max_pool_size * self.opts.wait_queue_multiple)
1105-
1106-
self._socket_semaphore = thread_util.create_semaphore(
1107-
self.opts.max_pool_size, max_waiters)
1122+
# The first portion of the wait queue.
1123+
# Enforces: maxPoolSize and waitQueueMultiple
1124+
# Also used for: clearing the wait queue
1125+
self.size_cond = threading.Condition(self.lock)
1126+
self.requests = 0
1127+
self.max_pool_size = self.opts.max_pool_size
1128+
if self.max_pool_size is None:
1129+
self.max_pool_size = float('inf')
1130+
self.waiters = 0
1131+
self.max_waiters = max_waiters
1132+
# The second portion of the wait queue.
1133+
# Enforces: maxConnecting
1134+
# Also used for: clearing the wait queue
11081135
self._max_connecting_cond = threading.Condition(self.lock)
11091136
self._max_connecting = self.opts.max_connecting
11101137
self._pending = 0
@@ -1114,10 +1141,23 @@ def __init__(self, address, options, handshake=True):
11141141
# Similar to active_sockets but includes threads in the wait queue.
11151142
self.operation_count = 0
11161143

1117-
def _reset(self, close):
1118-
with self.lock:
1144+
def ready(self):
1145+
old_state, self.state = self.state, PoolState.READY
1146+
if old_state != PoolState.READY:
1147+
if self.enabled_for_cmap:
1148+
self.opts.event_listeners.publish_pool_ready(self.address)
1149+
1150+
@property
1151+
def closed(self):
1152+
return self.state == PoolState.CLOSED
1153+
1154+
def _reset(self, close, pause=True):
1155+
old_state = self.state
1156+
with self.size_cond:
11191157
if self.closed:
11201158
return
1159+
if self.opts.pause_enabled and pause:
1160+
old_state, self.state = self.state, PoolState.PAUSED
11211161
self.generation += 1
11221162
newpid = os.getpid()
11231163
if self.pid != newpid:
@@ -1126,7 +1166,10 @@ def _reset(self, close):
11261166
self.operation_count = 0
11271167
sockets, self.sockets = self.sockets, collections.deque()
11281168
if close:
1129-
self.closed = True
1169+
self.state = PoolState.CLOSED
1170+
# Clear the wait queue
1171+
self._max_connecting_cond.notify_all()
1172+
self.size_cond.notify_all()
11301173

11311174
listeners = self.opts.event_listeners
11321175
# CMAP spec says that close() MUST close sockets before publishing the
@@ -1138,7 +1181,7 @@ def _reset(self, close):
11381181
if self.enabled_for_cmap:
11391182
listeners.publish_pool_closed(self.address)
11401183
else:
1141-
if self.enabled_for_cmap:
1184+
if old_state != PoolState.PAUSED and self.enabled_for_cmap:
11421185
listeners.publish_pool_cleared(self.address)
11431186
for sock_info in sockets:
11441187
sock_info.close_socket(ConnectionClosedReason.STALE)
@@ -1155,6 +1198,9 @@ def update_is_writable(self, is_writable):
11551198
def reset(self):
11561199
self._reset(close=False)
11571200

1201+
def reset_without_pause(self):
1202+
self._reset(close=False, pause=False)
1203+
11581204
def close(self):
11591205
self._reset(close=True)
11601206

@@ -1164,6 +1210,9 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
11641210
`generation` at the point in time this operation was requested on the
11651211
pool.
11661212
"""
1213+
if self.state != PoolState.READY:
1214+
return
1215+
11671216
if self.opts.max_idle_time_seconds is not None:
11681217
with self.lock:
11691218
while (self.sockets and
@@ -1172,15 +1221,14 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
11721221
sock_info.close_socket(ConnectionClosedReason.IDLE)
11731222

11741223
while True:
1175-
with self.lock:
1224+
with self.size_cond:
1225+
# There are enough sockets in the pool.
11761226
if (len(self.sockets) + self.active_sockets >=
11771227
self.opts.min_pool_size):
1178-
# There are enough sockets in the pool.
11791228
return
1180-
1181-
# We must acquire the semaphore to respect max_pool_size.
1182-
if not self._socket_semaphore.acquire(False):
1183-
return
1229+
if self.requests >= self.opts.min_pool_size:
1230+
return
1231+
self.requests += 1
11841232
incremented = False
11851233
try:
11861234
with self._max_connecting_cond:
@@ -1204,7 +1252,10 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
12041252
with self._max_connecting_cond:
12051253
self._pending -= 1
12061254
self._max_connecting_cond.notify()
1207-
self._socket_semaphore.release()
1255+
1256+
with self.size_cond:
1257+
self.requests -= 1
1258+
self.size_cond.notify()
12081259

12091260
def connect(self, all_credentials=None):
12101261
"""Connect to Mongo and return a new SocketInfo.
@@ -1289,6 +1340,14 @@ def get_socket(self, all_credentials, checkout=False):
12891340
if not checkout:
12901341
self.return_socket(sock_info)
12911342

1343+
def _raise_if_not_ready(self, emit_event):
1344+
if self.state != PoolState.READY:
1345+
if self.enabled_for_cmap and emit_event:
1346+
self.opts.event_listeners.publish_connection_check_out_failed(
1347+
self.address, ConnectionCheckOutFailedReason.CONN_ERROR)
1348+
_raise_connection_failure(
1349+
self.address, AutoReconnect('connection pool paused'))
1350+
12921351
def _get_socket(self, all_credentials):
12931352
"""Get or create a SocketInfo. Can raise ConnectionFailure."""
12941353
# We use the pid here to avoid issues with fork / multiprocessing.
@@ -1313,9 +1372,26 @@ def _get_socket(self, all_credentials):
13131372
deadline = _time() + self.opts.wait_queue_timeout
13141373
else:
13151374
deadline = None
1316-
if not self._socket_semaphore.acquire(
1317-
True, self.opts.wait_queue_timeout):
1318-
self._raise_wait_queue_timeout()
1375+
1376+
with self.size_cond:
1377+
self._raise_if_not_ready(emit_event=True)
1378+
if self.waiters >= self.max_waiters:
1379+
raise ExceededMaxWaiters(
1380+
'exceeded max waiters: %s threads already waiting' % (
1381+
self.waiters))
1382+
self.waiters += 1
1383+
try:
1384+
while not (self.requests < self.max_pool_size):
1385+
if not _cond_wait(self.size_cond, deadline):
1386+
# Timed out, notify the next thread to ensure a
1387+
# timeout doesn't consume the condition.
1388+
if self.requests < self.max_pool_size:
1389+
self.size_cond.notify()
1390+
self._raise_wait_queue_timeout()
1391+
self._raise_if_not_ready(emit_event=True)
1392+
finally:
1393+
self.waiters -= 1
1394+
self.requests += 1
13191395

13201396
# We've now acquired the semaphore and must release it on error.
13211397
sock_info = None
@@ -1330,6 +1406,7 @@ def _get_socket(self, all_credentials):
13301406
# CMAP: we MUST wait for either maxConnecting OR for a socket
13311407
# to be checked back into the pool.
13321408
with self._max_connecting_cond:
1409+
self._raise_if_not_ready(emit_event=False)
13331410
while not (self.sockets or
13341411
self._pending < self._max_connecting):
13351412
if not _cond_wait(self._max_connecting_cond, deadline):
@@ -1340,6 +1417,7 @@ def _get_socket(self, all_credentials):
13401417
self._max_connecting_cond.notify()
13411418
emitted_event = True
13421419
self._raise_wait_queue_timeout()
1420+
self._raise_if_not_ready(emit_event=False)
13431421

13441422
try:
13451423
sock_info = self.sockets.popleft()
@@ -1361,11 +1439,11 @@ def _get_socket(self, all_credentials):
13611439
if sock_info:
13621440
# We checked out a socket but authentication failed.
13631441
sock_info.close_socket(ConnectionClosedReason.ERROR)
1364-
self._socket_semaphore.release()
1365-
1366-
if incremented:
1367-
with self.lock:
1442+
with self.size_cond:
1443+
self.requests -= 1
1444+
if incremented:
13681445
self.active_sockets -= 1
1446+
self.size_cond.notify()
13691447

13701448
if self.enabled_for_cmap and not emitted_event:
13711449
self.opts.event_listeners.publish_connection_check_out_failed(
@@ -1401,10 +1479,11 @@ def return_socket(self, sock_info):
14011479
# Notify any threads waiting to create a connection.
14021480
self._max_connecting_cond.notify()
14031481

1404-
self._socket_semaphore.release()
1405-
with self.lock:
1482+
with self.size_cond:
1483+
self.requests -= 1
14061484
self.active_sockets -= 1
14071485
self.operation_count -= 1
1486+
self.size_cond.notify()
14081487

14091488
def _perished(self, sock_info):
14101489
"""Return True and close the connection if it is "perished".

0 commit comments

Comments
 (0)