Skip to content

Commit 26913ea

Browse files
PYTHON-2143 Use an allow-list to determine resumable change stream errors (#445)
2 parents 1f4123e + 04926c6 commit 26913ea

10 files changed

+181
-111
lines changed

pymongo/change_stream.py

+28-8
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,33 @@
2525
from pymongo.collation import validate_collation_or_none
2626
from pymongo.command_cursor import CommandCursor
2727
from pymongo.errors import (ConnectionFailure,
28+
CursorNotFound,
2829
InvalidOperation,
2930
OperationFailure,
3031
PyMongoError)
3132

3233

3334
# The change streams spec considers the following server errors from the
3435
# getMore command non-resumable. All other getMore errors are resumable.
35-
_NON_RESUMABLE_GETMORE_ERRORS = frozenset([
36-
11601, # Interrupted
37-
136, # CappedPositionLost
38-
237, # CursorKilled
39-
None, # No error code was returned.
36+
_RESUMABLE_GETMORE_ERRORS = frozenset([
37+
6, # HostUnreachable
38+
7, # HostNotFound
39+
89, # NetworkTimeout
40+
91, # ShutdownInProgress
41+
189, # PrimarySteppedDown
42+
262, # ExceededTimeLimit
43+
9001, # SocketException
44+
10107, # NotMaster
45+
11600, # InterruptedAtShutdown
46+
11602, # InterruptedDueToReplStateChange
47+
13435, # NotMasterNoSlaveOk
48+
13436, # NotMasterOrSecondary
49+
63, # StaleShardVersion
50+
150, # StaleEpoch
51+
13388, # StaleConfig
52+
234, # RetryChangeStream
53+
133, # FailedToSatisfyReadPreference
54+
216, # ElectionInProgress
4055
])
4156

4257

@@ -283,12 +298,17 @@ def try_next(self):
283298
# one resume attempt.
284299
try:
285300
change = self._cursor._try_next(True)
286-
except ConnectionFailure:
301+
except (ConnectionFailure, CursorNotFound):
287302
self._resume()
288303
change = self._cursor._try_next(False)
289304
except OperationFailure as exc:
290-
if (exc.code in _NON_RESUMABLE_GETMORE_ERRORS or
291-
exc.has_error_label("NonResumableChangeStreamError")):
305+
if exc._max_wire_version is None:
306+
raise
307+
is_resumable = ((exc._max_wire_version >= 9 and
308+
exc.has_error_label("ResumableChangeStreamError")) or
309+
(exc._max_wire_version < 9 and
310+
exc.code in _RESUMABLE_GETMORE_ERRORS))
311+
if not is_resumable:
292312
raise
293313
self._resume()
294314
change = self._cursor._try_next(False)

pymongo/errors.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,19 @@ class OperationFailure(PyMongoError):
144144
The :attr:`details` attribute.
145145
"""
146146

147-
def __init__(self, error, code=None, details=None):
147+
def __init__(self, error, code=None, details=None, max_wire_version=None):
148148
error_labels = None
149149
if details is not None:
150150
error_labels = details.get('errorLabels')
151151
super(OperationFailure, self).__init__(
152152
error, error_labels=error_labels)
153153
self.__code = code
154154
self.__details = details
155+
self.__max_wire_version = max_wire_version
156+
157+
@property
158+
def _max_wire_version(self):
159+
return self.__max_wire_version
155160

156161
@property
157162
def code(self):
@@ -177,6 +182,7 @@ def __str__(self):
177182
return output_str.encode('utf-8', errors='replace')
178183
return output_str
179184

185+
180186
class CursorNotFound(OperationFailure):
181187
"""Raised while iterating query results if the cursor is
182188
invalidated on the server.

pymongo/helpers.py

+16-9
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,17 @@ def _index_document(index_list):
102102
return index
103103

104104

105-
def _check_command_response(response, msg=None, allowable_errors=None,
105+
def _check_command_response(response, max_wire_version, msg=None,
106+
allowable_errors=None,
106107
parse_write_concern_error=False):
107108
"""Check the response to a command for errors.
108109
"""
109110
if "ok" not in response:
110111
# Server didn't recognize our message as a command.
111112
raise OperationFailure(response.get("$err"),
112113
response.get("code"),
113-
response)
114+
response,
115+
max_wire_version)
114116

115117
if parse_write_concern_error and 'writeConcernError' in response:
116118
_raise_write_concern_error(response['writeConcernError'])
@@ -146,25 +148,30 @@ def _check_command_response(response, msg=None, allowable_errors=None,
146148
details.get("assertion", ""))
147149
raise OperationFailure(errmsg,
148150
details.get("assertionCode"),
149-
response)
151+
response,
152+
max_wire_version)
150153

151154
# Other errors
152155
# findAndModify with upsert can raise duplicate key error
153156
if code in (11000, 11001, 12582):
154-
raise DuplicateKeyError(errmsg, code, response)
157+
raise DuplicateKeyError(errmsg, code, response,
158+
max_wire_version)
155159
elif code == 50:
156-
raise ExecutionTimeout(errmsg, code, response)
160+
raise ExecutionTimeout(errmsg, code, response,
161+
max_wire_version)
157162
elif code == 43:
158-
raise CursorNotFound(errmsg, code, response)
163+
raise CursorNotFound(errmsg, code, response,
164+
max_wire_version)
159165

160166
msg = msg or "%s"
161-
raise OperationFailure(msg % errmsg, code, response)
167+
raise OperationFailure(msg % errmsg, code, response,
168+
max_wire_version)
162169

163170

164-
def _check_gle_response(result):
171+
def _check_gle_response(result, max_wire_version):
165172
"""Return getlasterror response as a dict, or raise OperationFailure."""
166173
# Did getlasterror itself fail?
167-
_check_command_response(result)
174+
_check_command_response(result, max_wire_version)
168175

169176
if result.get("wtimeout", False):
170177
# MongoDB versions before 1.8.0 return the error message in an "errmsg"

pymongo/network.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ def command(sock_info, dbname, spec, slave_ok, is_mongos,
157157
client._process_response(response_doc, session)
158158
if check:
159159
helpers._check_command_response(
160-
response_doc, None, allowable_errors,
160+
response_doc, sock_info.max_wire_version, None,
161+
allowable_errors,
161162
parse_write_concern_error=parse_write_concern_error)
162163
except Exception as exc:
163164
if publish:

pymongo/pool.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ def _next_reply(self):
606606
self.more_to_come = reply.more_to_come
607607
unpacked_docs = reply.unpack_response()
608608
response_doc = unpacked_docs[0]
609-
helpers._check_command_response(response_doc)
609+
helpers._check_command_response(response_doc, self.max_wire_version)
610610
return response_doc
611611

612612
def command(self, dbname, spec, slave_ok=False,
@@ -751,7 +751,8 @@ def legacy_write(self, request_id, msg, max_doc_size, with_last_error):
751751
self.send_message(msg, max_doc_size)
752752
if with_last_error:
753753
reply = self.receive_message(request_id)
754-
return helpers._check_gle_response(reply.command_response())
754+
return helpers._check_gle_response(reply.command_response(),
755+
self.max_wire_version)
755756

756757
def write_command(self, request_id, msg):
757758
"""Send "insert" etc. command, returning response as a dict.
@@ -767,7 +768,7 @@ def write_command(self, request_id, msg):
767768
result = reply.command_response()
768769

769770
# Raises NotMasterError or OperationFailure.
770-
helpers._check_command_response(result)
771+
helpers._check_command_response(result, self.max_wire_version)
771772
return result
772773

773774
def check_auth(self, all_credentials):

pymongo/server.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ def run_operation_with_response(
133133
first = docs[0]
134134
operation.client._process_response(
135135
first, operation.session)
136-
_check_command_response(first)
136+
_check_command_response(
137+
first, sock_info.max_wire_version)
137138
except Exception as exc:
138139
if publish:
139140
duration = datetime.now() - start

test/change_streams/change-streams-errors.json

+48-5
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
{
7676
"description": "Change Stream should error when _id is projected out",
7777
"minServerVersion": "4.1.11",
78-
"maxServerVersion": "4.3.3",
7978
"target": "collection",
8079
"topology": [
8180
"replicaset",
@@ -103,10 +102,54 @@
103102
],
104103
"result": {
105104
"error": {
106-
"code": 280,
107-
"errorLabels": [
108-
"NonResumableChangeStreamError"
109-
]
105+
"code": 280
106+
}
107+
}
108+
},
109+
{
110+
"description": "change stream errors on MaxTimeMSExpired",
111+
"minServerVersion": "4.2",
112+
"failPoint": {
113+
"configureFailPoint": "failCommand",
114+
"mode": {
115+
"times": 1
116+
},
117+
"data": {
118+
"failCommands": [
119+
"getMore"
120+
],
121+
"errorCode": 50,
122+
"closeConnection": false
123+
}
124+
},
125+
"target": "collection",
126+
"topology": [
127+
"replicaset",
128+
"sharded"
129+
],
130+
"changeStreamPipeline": [
131+
{
132+
"$project": {
133+
"_id": 0
134+
}
135+
}
136+
],
137+
"changeStreamOptions": {},
138+
"operations": [
139+
{
140+
"database": "change-stream-tests",
141+
"collection": "test",
142+
"name": "insertOne",
143+
"arguments": {
144+
"document": {
145+
"z": 3
146+
}
147+
}
148+
}
149+
],
150+
"result": {
151+
"error": {
152+
"code": 50
110153
}
111154
}
112155
}

0 commit comments

Comments
 (0)