Skip to content

Commit dbed22d

Browse files
committed
DRIVERS-1995: Do not error when parsing change stream event documents (#1164)
1 parent af7c992 commit dbed22d

File tree

3 files changed

+398
-17
lines changed

3 files changed

+398
-17
lines changed

source/change-streams/change-streams.rst

+17-3
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ Change Streams
99
:Status: Accepted
1010
:Type: Standards
1111
:Minimum Server Version: 3.6
12-
:Last Modified: 2022-03-14
13-
:Version: 1.13
12+
:Last Modified: 2022-03-25
13+
:Version: 1.13.1
1414

1515
.. contents::
1616

@@ -132,12 +132,19 @@ If an aggregate command with a ``$changeStream`` stage completes successfully, t
132132
*
133133
* @note: Whether a change is reported as an event of the operation type
134134
* `update` or `replace` is a server implementation detail.
135+
*
136+
* @note: The server will add new `operationType` values in the future and drivers
137+
* MUST NOT err when they encounter a new `operationType`. Unknown `operationType`
138+
* values may be represented by "unknown" or the literal string value.
135139
*/
136140
operationType: "insert" | "update" | "replace" | "delete" | "invalidate" | "drop" | "dropDatabase" | "rename";
137141
138142
/**
139-
* Contains two fields: “db” and coll containing the database and
143+
* Contains two fields: "db" and "coll" containing the database and
140144
* collection name in which the change happened.
145+
*
146+
* @note: Drivers MUST NOT err when extra fields are encountered in the `ns` document
147+
* as the server may add new fields in the future such as `viewOn`.
141148
*/
142149
ns: Document;
143150
@@ -471,6 +478,11 @@ Presently change streams support only a subset of available aggregation stages:
471478

472479
A driver MUST NOT throw an exception if any unsupported stage is provided, but instead depend on the server to return an error.
473480

481+
A driver MUST NOT throw an exception if a user adds, removes, or modifies fields using ``$project``. The server will produce an error if ``_id``
482+
is projected out, but a user should otherwise be able to modify the shape of the change stream event as desired. This may require the result
483+
to be deserialized to a ``BsonDocument`` or custom-defined type rather than a ``ChangeStreamDocument``. It is the responsiblity of the
484+
user to ensure that the deserialized type is compatible with the specified ``$project`` stage.
485+
474486
The aggregate helper methods MUST have no new logic related to the ``$changeStream`` stage. Drivers MUST be capable of handling `TAILABLE_AWAIT <https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#read>`_ cursors from the aggregate command in the same way they handle such cursors from find.
475487

476488
``Collection.watch`` helper
@@ -932,3 +944,5 @@ Changelog
932944
+------------+------------------------------------------------------------+
933945
| 2022-02-28 | Added ``to`` to ``ChangeStreamDocument``. |
934946
+------------+------------------------------------------------------------+
947+
| 2022-03-25 | Do not error when parsing change stream event documents. |
948+
+------------+------------------------------------------------------------+

source/change-streams/tests/unified/change-streams.json

+256-7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
"description": "change-streams",
33
"schemaVersion": "1.0",
44
"runOnRequirements": [
5+
{
6+
"minServerVersion": "3.6"
7+
},
58
{
69
"topologies": [
710
"replicaset",
@@ -167,7 +170,6 @@
167170
"description": "Test with document comment - pre 4.4",
168171
"runOnRequirements": [
169172
{
170-
"minServerVersion": "3.6.0",
171173
"maxServerVersion": "4.2.99"
172174
}
173175
],
@@ -211,11 +213,6 @@
211213
},
212214
{
213215
"description": "Test with string comment",
214-
"runOnRequirements": [
215-
{
216-
"minServerVersion": "3.6.0"
217-
}
218-
],
219216
"operations": [
220217
{
221218
"name": "createChangeStream",
@@ -343,7 +340,6 @@
343340
"description": "Test that comment is not set on getMore - pre 4.4",
344341
"runOnRequirements": [
345342
{
346-
"minServerVersion": "3.6.0",
347343
"maxServerVersion": "4.3.99",
348344
"topologies": [
349345
"replicaset"
@@ -466,6 +462,259 @@
466462
}
467463
}
468464
]
465+
},
466+
{
467+
"description": "Test unknown operationType MUST NOT err",
468+
"operations": [
469+
{
470+
"name": "createChangeStream",
471+
"object": "collection0",
472+
"arguments": {
473+
"pipeline": [
474+
{
475+
"$project": {
476+
"operationType": "addedInFutureMongoDBVersion",
477+
"ns": 1
478+
}
479+
}
480+
]
481+
},
482+
"saveResultAsEntity": "changeStream0"
483+
},
484+
{
485+
"name": "insertOne",
486+
"object": "collection0",
487+
"arguments": {
488+
"document": {
489+
"_id": 1,
490+
"a": 1
491+
}
492+
}
493+
},
494+
{
495+
"name": "iterateUntilDocumentOrError",
496+
"object": "changeStream0",
497+
"expectResult": {
498+
"operationType": "addedInFutureMongoDBVersion",
499+
"ns": {
500+
"db": "database0",
501+
"coll": "collection0"
502+
}
503+
}
504+
}
505+
]
506+
},
507+
{
508+
"description": "Test newField added in response MUST NOT err",
509+
"operations": [
510+
{
511+
"name": "createChangeStream",
512+
"object": "collection0",
513+
"arguments": {
514+
"pipeline": [
515+
{
516+
"$project": {
517+
"operationType": 1,
518+
"ns": 1,
519+
"newField": "newFieldValue"
520+
}
521+
}
522+
]
523+
},
524+
"saveResultAsEntity": "changeStream0"
525+
},
526+
{
527+
"name": "insertOne",
528+
"object": "collection0",
529+
"arguments": {
530+
"document": {
531+
"_id": 1,
532+
"a": 1
533+
}
534+
}
535+
},
536+
{
537+
"name": "iterateUntilDocumentOrError",
538+
"object": "changeStream0",
539+
"expectResult": {
540+
"operationType": "insert",
541+
"ns": {
542+
"db": "database0",
543+
"coll": "collection0"
544+
},
545+
"newField": "newFieldValue"
546+
}
547+
}
548+
]
549+
},
550+
{
551+
"description": "Test new structure in ns document MUST NOT err",
552+
"operations": [
553+
{
554+
"name": "createChangeStream",
555+
"object": "collection0",
556+
"arguments": {
557+
"pipeline": [
558+
{
559+
"$project": {
560+
"operationType": "insert",
561+
"ns.viewOn": "db.coll"
562+
}
563+
}
564+
]
565+
},
566+
"saveResultAsEntity": "changeStream0"
567+
},
568+
{
569+
"name": "insertOne",
570+
"object": "collection0",
571+
"arguments": {
572+
"document": {
573+
"_id": 1,
574+
"a": 1
575+
}
576+
}
577+
},
578+
{
579+
"name": "iterateUntilDocumentOrError",
580+
"object": "changeStream0",
581+
"expectResult": {
582+
"operationType": "insert",
583+
"ns": {
584+
"viewOn": "db.coll"
585+
}
586+
}
587+
}
588+
]
589+
},
590+
{
591+
"description": "Test modified structure in ns document MUST NOT err",
592+
"operations": [
593+
{
594+
"name": "createChangeStream",
595+
"object": "collection0",
596+
"arguments": {
597+
"pipeline": [
598+
{
599+
"$project": {
600+
"operationType": "insert",
601+
"ns": {
602+
"db": "$ns.db",
603+
"coll": "$ns.coll",
604+
"viewOn": "db.coll"
605+
}
606+
}
607+
}
608+
]
609+
},
610+
"saveResultAsEntity": "changeStream0"
611+
},
612+
{
613+
"name": "insertOne",
614+
"object": "collection0",
615+
"arguments": {
616+
"document": {
617+
"_id": 1,
618+
"a": 1
619+
}
620+
}
621+
},
622+
{
623+
"name": "iterateUntilDocumentOrError",
624+
"object": "changeStream0",
625+
"expectResult": {
626+
"operationType": "insert",
627+
"ns": {
628+
"db": "database0",
629+
"coll": "collection0",
630+
"viewOn": "db.coll"
631+
}
632+
}
633+
}
634+
]
635+
},
636+
{
637+
"description": "Test server error on projecting out _id",
638+
"operations": [
639+
{
640+
"name": "createChangeStream",
641+
"object": "collection0",
642+
"arguments": {
643+
"pipeline": [
644+
{
645+
"$project": {
646+
"_id": 0
647+
}
648+
}
649+
]
650+
},
651+
"saveResultAsEntity": "changeStream0"
652+
},
653+
{
654+
"name": "insertOne",
655+
"object": "collection0",
656+
"arguments": {
657+
"document": {
658+
"_id": 1,
659+
"a": 1
660+
}
661+
}
662+
},
663+
{
664+
"name": "iterateUntilDocumentOrError",
665+
"object": "changeStream0",
666+
"expectError": {
667+
"errorCode": 280,
668+
"errorCodeName": "ChangeStreamFatalError",
669+
"errorLabelsContain": [
670+
"NonResumableChangeStreamError"
671+
]
672+
}
673+
}
674+
]
675+
},
676+
{
677+
"description": "Test projection in change stream returns expected fields",
678+
"operations": [
679+
{
680+
"name": "createChangeStream",
681+
"object": "collection0",
682+
"arguments": {
683+
"pipeline": [
684+
{
685+
"$project": {
686+
"optype": "$operationType",
687+
"ns": 1,
688+
"newField": "value"
689+
}
690+
}
691+
]
692+
},
693+
"saveResultAsEntity": "changeStream0"
694+
},
695+
{
696+
"name": "insertOne",
697+
"object": "collection0",
698+
"arguments": {
699+
"document": {
700+
"_id": 1,
701+
"a": 1
702+
}
703+
}
704+
},
705+
{
706+
"name": "iterateUntilDocumentOrError",
707+
"object": "changeStream0",
708+
"expectResult": {
709+
"optype": "insert",
710+
"ns": {
711+
"db": "database0",
712+
"coll": "collection0"
713+
},
714+
"newField": "value"
715+
}
716+
}
717+
]
469718
}
470719
]
471720
}

0 commit comments

Comments
 (0)