Skip to content

PYTHON-2143 Use an allow-list to determine resumable change stream errors #445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions pymongo/change_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,33 @@
from pymongo.collation import validate_collation_or_none
from pymongo.command_cursor import CommandCursor
from pymongo.errors import (ConnectionFailure,
CursorNotFound,
InvalidOperation,
OperationFailure,
PyMongoError)


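# The change streams spec considers the following server error codes from
# the getMore command resumable when the server's max wire version is < 9;
# on newer servers the "ResumableChangeStreamError" error label is checked
# instead. Apart from CursorNotFound, which is always resumed, all other
# getMore server errors are non-resumable.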
_NON_RESUMABLE_GETMORE_ERRORS = frozenset([
11601, # Interrupted
136, # CappedPositionLost
237, # CursorKilled
None, # No error code was returned.
_RESUMABLE_GETMORE_ERRORS = frozenset([
6, # HostUnreachable
7, # HostNotFound
89, # NetworkTimeout
91, # ShutdownInProgress
189, # PrimarySteppedDown
262, # ExceededTimeLimit
9001, # SocketException
10107, # NotMaster
11600, # InterruptedAtShutdown
11602, # InterruptedDueToReplStateChange
13435, # NotMasterNoSlaveOk
13436, # NotMasterOrSecondary
63, # StaleShardVersion
150, # StaleEpoch
13388, # StaleConfig
234, # RetryChangeStream
133, # FailedToSatisfyReadPreference
216, # ElectionInProgress
])


Expand Down Expand Up @@ -283,12 +298,17 @@ def try_next(self):
# one resume attempt.
try:
change = self._cursor._try_next(True)
except ConnectionFailure:
except (ConnectionFailure, CursorNotFound):
self._resume()
change = self._cursor._try_next(False)
except OperationFailure as exc:
if (exc.code in _NON_RESUMABLE_GETMORE_ERRORS or
exc.has_error_label("NonResumableChangeStreamError")):
if exc._max_wire_version is None:
raise
is_resumable = ((exc._max_wire_version >= 9 and
exc.has_error_label("ResumableChangeStreamError")) or
(exc._max_wire_version < 9 and
exc.code in _RESUMABLE_GETMORE_ERRORS))
if not is_resumable:
raise
self._resume()
change = self._cursor._try_next(False)
Expand Down
8 changes: 7 additions & 1 deletion pymongo/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,19 @@ class OperationFailure(PyMongoError):
The :attr:`details` attribute.
"""

def __init__(self, error, code=None, details=None):
def __init__(self, error, code=None, details=None, max_wire_version=None):
error_labels = None
if details is not None:
error_labels = details.get('errorLabels')
super(OperationFailure, self).__init__(
error, error_labels=error_labels)
self.__code = code
self.__details = details
self.__max_wire_version = max_wire_version

@property
def _max_wire_version(self):
return self.__max_wire_version

@property
def code(self):
Expand All @@ -177,6 +182,7 @@ def __str__(self):
return output_str.encode('utf-8', errors='replace')
return output_str


class CursorNotFound(OperationFailure):
"""Raised while iterating query results if the cursor is
invalidated on the server.
Expand Down
25 changes: 16 additions & 9 deletions pymongo/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,17 @@ def _index_document(index_list):
return index


def _check_command_response(response, msg=None, allowable_errors=None,
def _check_command_response(response, max_wire_version, msg=None,
allowable_errors=None,
parse_write_concern_error=False):
"""Check the response to a command for errors.
"""
if "ok" not in response:
# Server didn't recognize our message as a command.
raise OperationFailure(response.get("$err"),
response.get("code"),
response)
response,
max_wire_version)

if parse_write_concern_error and 'writeConcernError' in response:
_raise_write_concern_error(response['writeConcernError'])
Expand Down Expand Up @@ -146,25 +148,30 @@ def _check_command_response(response, msg=None, allowable_errors=None,
details.get("assertion", ""))
raise OperationFailure(errmsg,
details.get("assertionCode"),
response)
response,
max_wire_version)

# Other errors
# findAndModify with upsert can raise duplicate key error
if code in (11000, 11001, 12582):
raise DuplicateKeyError(errmsg, code, response)
raise DuplicateKeyError(errmsg, code, response,
max_wire_version)
elif code == 50:
raise ExecutionTimeout(errmsg, code, response)
raise ExecutionTimeout(errmsg, code, response,
max_wire_version)
elif code == 43:
raise CursorNotFound(errmsg, code, response)
raise CursorNotFound(errmsg, code, response,
max_wire_version)

msg = msg or "%s"
raise OperationFailure(msg % errmsg, code, response)
raise OperationFailure(msg % errmsg, code, response,
max_wire_version)


def _check_gle_response(result):
def _check_gle_response(result, max_wire_version):
"""Return getlasterror response as a dict, or raise OperationFailure."""
# Did getlasterror itself fail?
_check_command_response(result)
_check_command_response(result, max_wire_version)

if result.get("wtimeout", False):
# MongoDB versions before 1.8.0 return the error message in an "errmsg"
Expand Down
3 changes: 2 additions & 1 deletion pymongo/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ def command(sock_info, dbname, spec, slave_ok, is_mongos,
client._process_response(response_doc, session)
if check:
helpers._check_command_response(
response_doc, None, allowable_errors,
response_doc, sock_info.max_wire_version, None,
allowable_errors,
parse_write_concern_error=parse_write_concern_error)
except Exception as exc:
if publish:
Expand Down
7 changes: 4 additions & 3 deletions pymongo/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ def _next_reply(self):
self.more_to_come = reply.more_to_come
unpacked_docs = reply.unpack_response()
response_doc = unpacked_docs[0]
helpers._check_command_response(response_doc)
helpers._check_command_response(response_doc, self.max_wire_version)
return response_doc

def command(self, dbname, spec, slave_ok=False,
Expand Down Expand Up @@ -751,7 +751,8 @@ def legacy_write(self, request_id, msg, max_doc_size, with_last_error):
self.send_message(msg, max_doc_size)
if with_last_error:
reply = self.receive_message(request_id)
return helpers._check_gle_response(reply.command_response())
return helpers._check_gle_response(reply.command_response(),
self.max_wire_version)

def write_command(self, request_id, msg):
"""Send "insert" etc. command, returning response as a dict.
Expand All @@ -767,7 +768,7 @@ def write_command(self, request_id, msg):
result = reply.command_response()

# Raises NotMasterError or OperationFailure.
helpers._check_command_response(result)
helpers._check_command_response(result, self.max_wire_version)
return result

def check_auth(self, all_credentials):
Expand Down
3 changes: 2 additions & 1 deletion pymongo/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ def run_operation_with_response(
first = docs[0]
operation.client._process_response(
first, operation.session)
_check_command_response(first)
_check_command_response(
first, sock_info.max_wire_version)
except Exception as exc:
if publish:
duration = datetime.now() - start
Expand Down
53 changes: 48 additions & 5 deletions test/change_streams/change-streams-errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
{
"description": "Change Stream should error when _id is projected out",
"minServerVersion": "4.1.11",
"maxServerVersion": "4.3.3",
"target": "collection",
"topology": [
"replicaset",
Expand Down Expand Up @@ -103,10 +102,54 @@
],
"result": {
"error": {
"code": 280,
"errorLabels": [
"NonResumableChangeStreamError"
]
"code": 280
}
}
},
{
"description": "change stream errors on MaxTimeMSExpired",
"minServerVersion": "4.2",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"getMore"
],
"errorCode": 50,
"closeConnection": false
}
},
"target": "collection",
"topology": [
"replicaset",
"sharded"
],
"changeStreamPipeline": [
{
"$project": {
"_id": 0
}
}
],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"result": {
"error": {
"code": 50
}
}
}
Expand Down
Loading