Skip to content

Commit 85ac162

Browse files
committed
PYTHON-2143 Use a whitelist to determine resumable change stream errors
1 parent 7420245 commit 85ac162

File tree

6 files changed

+144
-32
lines changed

6 files changed

+144
-32
lines changed

pymongo/change_stream.py

+26-7
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,25 @@
3232

3333
# The change streams spec considers the following server errors from the
3434
# getMore command non-resumable. All other getMore errors are resumable.
35-
_NON_RESUMABLE_GETMORE_ERRORS = frozenset([
36-
11601, # Interrupted
37-
136, # CappedPositionLost
38-
237, # CursorKilled
39-
None, # No error code was returned.
35+
_RESUMABLE_GETMORE_ERRORS = frozenset([
36+
6, # HostUnreachable
37+
7, # HostNotFound
38+
89, # NetworkTimeout
39+
91, # ShutdownInProgress
40+
189, # PrimarySteppedDown
41+
262, # ExceededTimeLimit
42+
9001, # SocketException
43+
10107, # NotMaster
44+
11600, # InterruptedAtShutdown
45+
11602, # InterruptedDueToReplStateChange
46+
13435, # NotMasterNoSlaveOk
47+
13436, # NotMasterOrSecondary
48+
63, # StaleShardVersion
49+
150, # StaleEpoch
50+
13388, # StaleConfig
51+
234, # RetryChangeStream
52+
133, # FailedToSatisfyReadPreference
53+
216 # ElectionInProgress
4054
])
4155

4256

@@ -287,8 +301,13 @@ def try_next(self):
287301
self._resume()
288302
change = self._cursor._try_next(False)
289303
except OperationFailure as exc:
290-
if (exc.code in _NON_RESUMABLE_GETMORE_ERRORS or
291-
exc.has_error_label("NonResumableChangeStreamError")):
304+
if exc.max_wire_version is None:
305+
raise
306+
is_resumable = ((exc.max_wire_version >= 9 and
307+
exc.has_error_label("ResumableChangeStreamError")) or
308+
(exc.max_wire_version < 9 and
309+
exc.code in _RESUMABLE_GETMORE_ERRORS))
310+
if not is_resumable:
292311
raise
293312
self._resume()
294313
change = self._cursor._try_next(False)

pymongo/errors.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -136,18 +136,21 @@ class ConfigurationError(PyMongoError):
136136
class OperationFailure(PyMongoError):
137137
"""Raised when a database operation fails.
138138
139+
.. versionadded:: 3.11
140+
The :attr:`max_wire_version` attribute.
139141
.. versionadded:: 2.7
140142
The :attr:`details` attribute.
141143
"""
142144

143-
def __init__(self, error, code=None, details=None):
145+
def __init__(self, error, code=None, details=None, max_wire_version=None):
144146
error_labels = None
145147
if details is not None:
146148
error_labels = details.get('errorLabels')
147149
super(OperationFailure, self).__init__(
148150
error, error_labels=error_labels)
149151
self.__code = code
150152
self.__details = details
153+
self.__max_wire_version = max_wire_version
151154

152155
@property
153156
def code(self):
@@ -167,6 +170,15 @@ def details(self):
167170
"""
168171
return self.__details
169172

173+
@property
174+
def max_wire_version(self):
175+
"""The latest version of the wire protocol supported by the socket
176+
that was used to run the operation that raised this exception.
177+
178+
PyMongo does not always record this value and it may be None.
179+
"""
180+
return self.__max_wire_version
181+
170182

171183
class CursorNotFound(OperationFailure):
172184
"""Raised while iterating query results if the cursor is

pymongo/helpers.py

+14-7
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,16 @@ def _index_document(index_list):
103103

104104

105105
def _check_command_response(response, msg=None, allowable_errors=None,
106-
parse_write_concern_error=False):
106+
parse_write_concern_error=False,
107+
max_wire_version=None):
107108
"""Check the response to a command for errors.
108109
"""
109110
if "ok" not in response:
110111
# Server didn't recognize our message as a command.
111112
raise OperationFailure(response.get("$err"),
112113
response.get("code"),
113-
response)
114+
response,
115+
max_wire_version)
114116

115117
if parse_write_concern_error and 'writeConcernError' in response:
116118
_raise_write_concern_error(response['writeConcernError'])
@@ -146,19 +148,24 @@ def _check_command_response(response, msg=None, allowable_errors=None,
146148
details.get("assertion", ""))
147149
raise OperationFailure(errmsg,
148150
details.get("assertionCode"),
149-
response)
151+
response,
152+
max_wire_version)
150153

151154
# Other errors
152155
# findAndModify with upsert can raise duplicate key error
153156
if code in (11000, 11001, 12582):
154-
raise DuplicateKeyError(errmsg, code, response)
157+
raise DuplicateKeyError(errmsg, code, response,
158+
max_wire_version)
155159
elif code == 50:
156-
raise ExecutionTimeout(errmsg, code, response)
160+
raise ExecutionTimeout(errmsg, code, response,
161+
max_wire_version)
157162
elif code == 43:
158-
raise CursorNotFound(errmsg, code, response)
163+
raise CursorNotFound(errmsg, code, response,
164+
max_wire_version)
159165

160166
msg = msg or "%s"
161-
raise OperationFailure(msg % errmsg, code, response)
167+
raise OperationFailure(msg % errmsg, code, response,
168+
max_wire_version)
162169

163170

164171
def _check_gle_response(result):

pymongo/server.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ def run_operation_with_response(
133133
first = docs[0]
134134
operation.client._process_response(
135135
first, operation.session)
136-
_check_command_response(first)
136+
_check_command_response(
137+
first, max_wire_version=sock_info.max_wire_version)
137138
except Exception as exc:
138139
if publish:
139140
duration = datetime.now() - start

test/change_streams/change-streams-errors.json

+48-5
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
{
7676
"description": "Change Stream should error when _id is projected out",
7777
"minServerVersion": "4.1.11",
78-
"maxServerVersion": "4.3.3",
7978
"target": "collection",
8079
"topology": [
8180
"replicaset",
@@ -103,10 +102,54 @@
103102
],
104103
"result": {
105104
"error": {
106-
"code": 280,
107-
"errorLabels": [
108-
"NonResumableChangeStreamError"
109-
]
105+
"code": 280
106+
}
107+
}
108+
},
109+
{
110+
"description": "change stream errors on MaxTimeMSExpired",
111+
"minServerVersion": "4.2",
112+
"failPoint": {
113+
"configureFailPoint": "failCommand",
114+
"mode": {
115+
"times": 1
116+
},
117+
"data": {
118+
"failCommands": [
119+
"getMore"
120+
],
121+
"errorCode": 50,
122+
"closeConnection": false
123+
}
124+
},
125+
"target": "collection",
126+
"topology": [
127+
"replicaset",
128+
"sharded"
129+
],
130+
"changeStreamPipeline": [
131+
{
132+
"$project": {
133+
"_id": 0
134+
}
135+
}
136+
],
137+
"changeStreamOptions": {},
138+
"operations": [
139+
{
140+
"database": "change-stream-tests",
141+
"collection": "test",
142+
"name": "insertOne",
143+
"arguments": {
144+
"document": {
145+
"z": 3
146+
}
147+
}
148+
}
149+
],
150+
"result": {
151+
"error": {
152+
"code": 50
110153
}
111154
}
112155
}

test/test_change_stream.py

+41-11
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from bson.raw_bson import DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument
3838

3939
from pymongo import MongoClient
40-
from pymongo.change_stream import _NON_RESUMABLE_GETMORE_ERRORS
40+
# from pymongo.change_stream import _NON_RESUMABLE_GETMORE_ERRORS
4141
from pymongo.command_cursor import CommandCursor
4242
from pymongo.errors import (InvalidOperation, OperationFailure,
4343
ServerSelectionTimeoutError)
@@ -1075,7 +1075,7 @@ class TestAllScenarios(unittest.TestCase):
10751075
@classmethod
10761076
@client_context.require_connection
10771077
def setUpClass(cls):
1078-
cls.listener = WhiteListEventListener("aggregate")
1078+
cls.listener = WhiteListEventListener("aggregate", "getMore")
10791079
cls.client = rs_or_single_client(event_listeners=[cls.listener])
10801080

10811081
@classmethod
@@ -1086,14 +1086,26 @@ def setUp(self):
10861086
self.listener.results.clear()
10871087

10881088
def setUpCluster(self, scenario_dict):
1089-
assets = [
1090-
(scenario_dict["database_name"], scenario_dict["collection_name"]),
1091-
(scenario_dict["database2_name"], scenario_dict["collection2_name"]),
1092-
]
1089+
assets = [(scenario_dict["database_name"],
1090+
scenario_dict["collection_name"]),
1091+
(scenario_dict.get("database2_name", "db2"),
1092+
scenario_dict.get("collection2_name", "coll2"))]
10931093
for db, coll in assets:
10941094
self.client.drop_database(db)
10951095
self.client[db].create_collection(coll)
10961096

1097+
def setFailPoint(self, scenario_dict):
1098+
fail_point = scenario_dict.get("failPoint")
1099+
if fail_point is None:
1100+
return
1101+
1102+
fail_cmd = SON([('configureFailPoint', 'failCommand')])
1103+
fail_cmd.update(fail_point)
1104+
client_context.client.admin.command(fail_cmd)
1105+
self.addCleanup(
1106+
client_context.client.admin.command,
1107+
'configureFailPoint', fail_cmd['configureFailPoint'], mode='off')
1108+
10971109
def tearDown(self):
10981110
self.listener.results.clear()
10991111

@@ -1147,18 +1159,35 @@ def run_operation(client, operation):
11471159
return cmd(**arguments)
11481160

11491161

1162+
def assert_list_contents_are_subset(superlist, sublist):
1163+
assert len(superlist) == len(sublist)
1164+
for super, sub in zip(superlist, sublist):
1165+
if isinstance(sub, dict):
1166+
assert_dict_is_subset(super, sub)
1167+
continue
1168+
if isinstance(sub, (list, tuple)):
1169+
assert_list_contents_are_subset(super, sub)
1170+
continue
1171+
assert super == sub
1172+
1173+
11501174
def assert_dict_is_subset(superdict, subdict):
11511175
"""Check that subdict is a subset of superdict."""
1152-
exempt_fields = ["documentKey", "_id"]
1176+
exempt_fields = ["documentKey", "_id", "getMore"]
11531177
for key, value in iteritems(subdict):
11541178
if key not in superdict:
11551179
assert False
11561180
if isinstance(value, dict):
11571181
assert_dict_is_subset(superdict[key], value)
11581182
continue
1183+
if isinstance(value, (list, tuple)):
1184+
assert_list_contents_are_subset(superdict[key], value)
1185+
continue
11591186
if key in exempt_fields:
1160-
superdict[key] = "42"
1161-
assert superdict[key] == value
1187+
# Only check for presence of these exempt fields, but not value.
1188+
assert key in superdict
1189+
else:
1190+
assert superdict[key] == value
11621191

11631192

11641193
def check_event(event, expectation_dict):
@@ -1177,6 +1206,7 @@ def create_test(scenario_def, test):
11771206
def run_scenario(self):
11781207
# Set up
11791208
self.setUpCluster(scenario_def)
1209+
self.setFailPoint(test)
11801210
is_error = test["result"].get("error", False)
11811211
try:
11821212
with get_change_stream(
@@ -1208,8 +1238,8 @@ def run_scenario(self):
12081238
finally:
12091239
# Check for expected events
12101240
results = self.listener.results
1211-
for expectation in test.get("expectations", []):
1212-
for idx, (event_type, event_desc) in enumerate(iteritems(expectation)):
1241+
for idx, expectation in enumerate(test.get("expectations", [])):
1242+
for event_type, event_desc in iteritems(expectation):
12131243
results_key = event_type.split("_")[1]
12141244
event = results[results_key][idx] if len(results[results_key]) > idx else None
12151245
check_event(event, event_desc)

0 commit comments

Comments
 (0)