diff --git a/source/change-streams/change-streams.rst b/source/change-streams/change-streams.rst index 7c75cc4262..27cb704df6 100644 --- a/source/change-streams/change-streams.rst +++ b/source/change-streams/change-streams.rst @@ -9,8 +9,8 @@ Change Streams :Status: Accepted :Type: Standards :Minimum Server Version: 3.6 -:Last Modified: 2022-02-10 -:Version: 1.12 +:Last Modified: 2022-03-25 +:Version: 1.12.1 .. contents:: @@ -132,12 +132,20 @@ If an aggregate command with a ``$changeStream`` stage completes successfully, t * * @note: Whether a change is reported as an event of the operation type * `update` or `replace` is a server implementation detail. + * + * @note: Drivers MUST not err when an unknown operationType is encountered. + * The server will add new operationType values in the future and drivers + * must not err when they encounter a new operationType. Unknown operationType + * values may be represented by "unknown". */ operationType: "insert" | "update" | "replace" | "delete" | "invalidate" | "drop" | "dropDatabase" | "rename"; /** * Contains two fields: “db” and “coll” containing the database and * collection name in which the change happened. + * + * @note: Drivers MUST not err when extra fields are encountered in the ns document + * as the server may add new fields in the future such as viewOn. */ ns: Document; @@ -462,6 +470,11 @@ Presently change streams support only a subset of available aggregation stages: A driver MUST NOT throw an exception if any unsupported stage is provided, but instead depend on the server to return an error. +A driver MUST NOT throw an exception if a user adds, removes, or modifies fields using ``$project``. The server will produce an error if ``_id`` +is projected out, but a user should otherwise be able to modify the shape of the change stream event as desired. This may require the result +to be deserialized to a ``BsonDocument`` or custom-defined type rather than a ``ChangeStreamDocument``. It is the responsiblity of the +user to ensure that the deserialized type is compatible with the specified ``$project`` stage. + The aggregate helper methods MUST have no new logic related to the ``$changeStream`` stage. Drivers MUST be capable of handling `TAILABLE_AWAIT `_ cursors from the aggregate command in the same way they handle such cursors from find. ``Collection.watch`` helper @@ -921,3 +934,5 @@ Changelog | 2022-02-10 | Specified that ``getMore`` command must explicitly send | | | inherited ``comment``. | +------------+------------------------------------------------------------+ +| 2022-03-25 | Do not error when parsing change stream event documents. | ++------------+------------------------------------------------------------+ diff --git a/source/change-streams/tests/unified/change-streams.json b/source/change-streams/tests/unified/change-streams.json index bc9d462ee3..4fc745f97e 100644 --- a/source/change-streams/tests/unified/change-streams.json +++ b/source/change-streams/tests/unified/change-streams.json @@ -2,6 +2,9 @@ "description": "change-streams", "schemaVersion": "1.0", "runOnRequirements": [ + { + "minServerVersion": "3.6" + }, { "topologies": [ "replicaset", @@ -167,7 +170,6 @@ "description": "Test with document comment - pre 4.4", "runOnRequirements": [ { - "minServerVersion": "3.6.0", "maxServerVersion": "4.2.99" } ], @@ -211,11 +213,6 @@ }, { "description": "Test with string comment", - "runOnRequirements": [ - { - "minServerVersion": "3.6.0" - } - ], "operations": [ { "name": "createChangeStream", @@ -333,6 +330,259 @@ ] } ] + }, + { + "description": "Test unknown operationType MUST NOT err", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "operationType": "addedInFutureMongoDBVersion", + "ns": 1 + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "addedInFutureMongoDBVersion", + "ns": { + "db": "database0", + "coll": "collection0" + } + } + } + ] + }, + { + "description": "Test newField added in response MUST NOT err", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "operationType": 1, + "ns": 1, + "newField": "newFieldValue" + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "insert", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "newField": "newFieldValue" + } + } + ] + }, + { + "description": "Test new structure in ns document MUST NOT err", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "operationType": "insert", + "ns.viewOn": "db.coll" + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "insert", + "ns": { + "viewOn": "db.coll" + } + } + } + ] + }, + { + "description": "Test modified structure in ns document MUST NOT err", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "operationType": "insert", + "ns": { + "db": "$ns.db", + "coll": "$ns.coll", + "viewOn": "db.coll" + } + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "insert", + "ns": { + "db": "database0", + "coll": "collection0", + "viewOn": "db.coll" + } + } + } + ] + }, + { + "description": "Test server error on projecting out _id", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "_id": 0 + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectError": { + "errorCode": 280, + "errorCodeName": "ChangeStreamFatalError", + "errorLabelsContain": [ + "NonResumableChangeStreamError" + ] + } + } + ] + }, + { + "description": "Test projection in change stream returns expected fields", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "optype": "$operationType", + "ns": 1, + "newField": "value" + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "optype": "insert", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "newField": "value" + } + } + ] } ] } diff --git a/source/change-streams/tests/unified/change-streams.yml b/source/change-streams/tests/unified/change-streams.yml index a8468c9bc7..bca5c0d913 100644 --- a/source/change-streams/tests/unified/change-streams.yml +++ b/source/change-streams/tests/unified/change-streams.yml @@ -1,6 +1,7 @@ description: "change-streams" schemaVersion: "1.0" runOnRequirements: + - minServerVersion: "3.6" - topologies: [ replicaset, sharded-replicaset ] createEntities: - client: @@ -97,8 +98,7 @@ tests: - description: "Test with document comment - pre 4.4" runOnRequirements: - - minServerVersion: "3.6.0" - maxServerVersion: "4.2.99" + - maxServerVersion: "4.2.99" operations: - name: createChangeStream object: *collection0 @@ -118,8 +118,6 @@ tests: comment: *comment0 - description: "Test with string comment" - runOnRequirements: - - minServerVersion: "3.6.0" operations: - name: createChangeStream object: *collection0 @@ -177,3 +175,123 @@ tests: comment: "comment" commandName: getMore databaseName: *database0 + + - description: "Test unknown operationType MUST NOT err" + operations: + - name: createChangeStream + object: *collection0 + arguments: + # using $project to simulate future changes to ChangeStreamDocument structure + pipeline: [ { $project: { operationType: "addedInFutureMongoDBVersion", ns: 1 } } ] + saveResultAsEntity: &changeStream0 changeStream0 + - name: insertOne + object: *collection0 + arguments: + document: { "_id": 1, "a": 1 } + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: "addedInFutureMongoDBVersion" + ns: + db: *database0 + coll: *collection0 + + - description: "Test newField added in response MUST NOT err" + operations: + - name: createChangeStream + object: *collection0 + arguments: + # using $project to simulate future changes to ChangeStreamDocument structure + pipeline: [ { $project: { operationType: 1, ns: 1, newField: "newFieldValue" } } ] + saveResultAsEntity: &changeStream0 changeStream0 + - name: insertOne + object: *collection0 + arguments: + document: { "_id": 1, "a": 1 } + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: "insert" + ns: + db: *database0 + coll: *collection0 + newField: "newFieldValue" + + - description: "Test new structure in ns document MUST NOT err" + operations: + - name: createChangeStream + object: *collection0 + arguments: + # using $project to simulate future changes to ChangeStreamDocument structure + pipeline: [ { $project: { operationType: "insert", "ns.viewOn": "db.coll" } } ] + saveResultAsEntity: &changeStream0 changeStream0 + - name: insertOne + object: *collection0 + arguments: + document: { "_id": 1, "a": 1 } + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: "insert" + ns: + viewOn: "db.coll" + + - description: "Test modified structure in ns document MUST NOT err" + operations: + - name: createChangeStream + object: *collection0 + arguments: + # using $project to simulate future changes to ChangeStreamDocument structure + pipeline: [ { $project: { operationType: "insert", ns: { db: "$ns.db", coll: "$ns.coll", viewOn: "db.coll" } } } ] + saveResultAsEntity: &changeStream0 changeStream0 + - name: insertOne + object: *collection0 + arguments: + document: { "_id": 1, "a": 1 } + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: "insert" + ns: + db: *database0 + coll: *collection0 + viewOn: "db.coll" + + - description: "Test server error on projecting out _id" + operations: + - name: createChangeStream + object: *collection0 + arguments: + pipeline: [ { $project: { _id: 0 } } ] + saveResultAsEntity: &changeStream0 changeStream0 + - name: insertOne + object: *collection0 + arguments: + document: { "_id": 1, "a": 1 } + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectError: + errorCode: 280 + errorCodeName: "ChangeStreamFatalError" + errorLabelsContain: [ "NonResumableChangeStreamError" ] + + + - description: "Test projection in change stream returns expected fields" + operations: + - name: createChangeStream + object: *collection0 + arguments: + pipeline: [ { $project: { optype: "$operationType", ns: 1, newField: "value" } } ] + saveResultAsEntity: &changeStream0 changeStream0 + - name: insertOne + object: *collection0 + arguments: + document: { "_id": 1, "a": 1 } + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + optype: "insert" + ns: + db: *database0 + coll: *collection0 + newField: "value"