Skip to content

DRIVERS-1995: Do not error when parsing change stream event documents #1164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions source/change-streams/change-streams.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ Change Streams
:Status: Accepted
:Type: Standards
:Minimum Server Version: 3.6
:Last Modified: 2022-02-10
:Version: 1.12
:Last Modified: 2022-03-25
:Version: 1.12.1

.. contents::

Expand Down Expand Up @@ -132,12 +132,20 @@ If an aggregate command with a ``$changeStream`` stage completes successfully, t
*
* @note: Whether a change is reported as an event of the operation type
* `update` or `replace` is a server implementation detail.
*
* @note: Drivers MUST not err when an unknown operationType is encountered.
* The server will add new operationType values in the future and drivers
* must not err when they encounter a new operationType. Unknown operationType
* values may be represented by "unknown".
*/
operationType: "insert" | "update" | "replace" | "delete" | "invalidate" | "drop" | "dropDatabase" | "rename";

/**
* Contains two fields: “db” and “coll” containing the database and
* collection name in which the change happened.
*
* @note: Drivers MUST not err when extra fields are encountered in the ns document
* as the server may add new fields in the future such as viewOn.
*/
ns: Document;

Expand Down Expand Up @@ -462,6 +470,11 @@ Presently change streams support only a subset of available aggregation stages:

A driver MUST NOT throw an exception if any unsupported stage is provided, but instead depend on the server to return an error.

A driver MUST NOT throw an exception if a user adds, removes, or modifies fields using ``$project``. The server will produce an error if ``_id``
is projected out, but a user should otherwise be able to modify the shape of the change stream event as desired. This may require the result
to be deserialized to a ``BsonDocument`` or custom-defined type rather than a ``ChangeStreamDocument``. It is the responsiblity of the
user to ensure that the deserialized type is compatible with the specified ``$project`` stage.

The aggregate helper methods MUST have no new logic related to the ``$changeStream`` stage. Drivers MUST be capable of handling `TAILABLE_AWAIT <https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#read>`_ cursors from the aggregate command in the same way they handle such cursors from find.

``Collection.watch`` helper
Expand Down Expand Up @@ -921,3 +934,5 @@ Changelog
| 2022-02-10 | Specified that ``getMore`` command must explicitly send |
| | inherited ``comment``. |
+------------+------------------------------------------------------------+
| 2022-03-25 | Do not error when parsing change stream event documents. |
+------------+------------------------------------------------------------+
262 changes: 256 additions & 6 deletions source/change-streams/tests/unified/change-streams.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"description": "change-streams",
"schemaVersion": "1.0",
"runOnRequirements": [
{
"minServerVersion": "3.6"
},
{
"topologies": [
"replicaset",
Expand Down Expand Up @@ -167,7 +170,6 @@
"description": "Test with document comment - pre 4.4",
"runOnRequirements": [
{
"minServerVersion": "3.6.0",
"maxServerVersion": "4.2.99"
}
],
Expand Down Expand Up @@ -211,11 +213,6 @@
},
{
"description": "Test with string comment",
"runOnRequirements": [
{
"minServerVersion": "3.6.0"
}
],
"operations": [
{
"name": "createChangeStream",
Expand Down Expand Up @@ -333,6 +330,259 @@
]
}
]
},
{
"description": "Test unknown operationType MUST NOT err",
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$project": {
"operationType": "addedInFutureMongoDBVersion",
"ns": 1
}
}
]
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1,
"a": 1
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "addedInFutureMongoDBVersion",
"ns": {
"db": "database0",
"coll": "collection0"
}
}
}
]
},
{
"description": "Test newField added in response MUST NOT err",
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$project": {
"operationType": 1,
"ns": 1,
"newField": "newFieldValue"
}
}
]
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1,
"a": 1
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "insert",
"ns": {
"db": "database0",
"coll": "collection0"
},
"newField": "newFieldValue"
}
}
]
},
{
"description": "Test new structure in ns document MUST NOT err",
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$project": {
"operationType": "insert",
"ns.viewOn": "db.coll"
}
}
]
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1,
"a": 1
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "insert",
"ns": {
"viewOn": "db.coll"
}
}
}
]
},
{
"description": "Test modified structure in ns document MUST NOT err",
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$project": {
"operationType": "insert",
"ns": {
"db": "$ns.db",
"coll": "$ns.coll",
"viewOn": "db.coll"
}
}
}
]
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1,
"a": 1
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "insert",
"ns": {
"db": "database0",
"coll": "collection0",
"viewOn": "db.coll"
}
}
}
]
},
{
"description": "Test server error on projecting out _id",
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$project": {
"_id": 0
}
}
]
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1,
"a": 1
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectError": {
"errorCode": 280,
"errorCodeName": "ChangeStreamFatalError",
"errorLabelsContain": [
"NonResumableChangeStreamError"
]
}
}
]
},
{
"description": "Test projection in change stream returns expected fields",
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$project": {
"optype": "$operationType",
"ns": 1,
"newField": "value"
}
}
]
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1,
"a": 1
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"optype": "insert",
"ns": {
"db": "database0",
"coll": "collection0"
},
"newField": "value"
}
}
]
}
]
}
Loading