@@ -15,12 +15,14 @@ import {
15
15
CommandStartedEvent ,
16
16
Db ,
17
17
Long ,
18
+ MongoAPIError ,
18
19
MongoChangeStreamError ,
19
20
MongoClient ,
20
21
MongoServerError ,
21
22
ReadPreference ,
22
23
ResumeToken
23
24
} from '../../../src' ;
25
+ import { next } from '../../../src/cursor/abstract_cursor' ;
24
26
import { isHello } from '../../../src/utils' ;
25
27
import * as mock from '../../tools/mongodb-mock/index' ;
26
28
import {
@@ -995,48 +997,125 @@ describe('Change Streams', function () {
995
997
996
998
for ( const doc of docs ) {
997
999
const change = await changeStreamIterator . next ( ) ;
998
- const { fullDocument } = change ;
1000
+ const { fullDocument } = change . value ;
999
1001
expect ( fullDocument . city ) . to . equal ( doc . city ) ;
1000
1002
}
1001
1003
1002
1004
changeStream . close ( ) ;
1003
1005
}
1004
1006
) ;
1005
1007
1006
- context ( 'when an error is thrown' , function ( ) {
1007
- it (
1008
- 'should close the change stream' ,
1009
- { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
1010
- async function ( ) {
1011
- changeStream = collection . watch ( [ ] ) ;
1012
- await initIteratorMode ( changeStream ) ;
1013
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1008
+ it (
1009
+ 'should close the change stream when return is called' ,
1010
+ { requires : { topology : '!single' } } ,
1011
+ async function ( ) {
1012
+ changeStream = collection . watch ( [ ] ) ;
1013
+ await initIteratorMode ( changeStream ) ;
1014
1014
1015
- const unresumableErrorCode = 1000 ;
1016
- await client . db ( 'admin' ) . command ( {
1017
- configureFailPoint : is4_2Server ( this . configuration . version )
1018
- ? 'failCommand'
1019
- : 'failGetMoreAfterCursorCheckout' ,
1020
- mode : { times : 1 } ,
1021
- data : {
1022
- failCommands : [ 'getMore' ] ,
1023
- errorCode : unresumableErrorCode
1024
- }
1025
- } as FailPoint ) ;
1015
+ const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
1016
+ await collection . insertMany ( docs ) ;
1026
1017
1027
- await collection . insertOne ( { city : 'New York City' } ) ;
1028
- try {
1029
- const change = await changeStreamIterator . next ( ) ;
1030
- expect . fail (
1031
- 'Change stream did not throw unresumable error and did not produce any events'
1032
- ) ;
1033
- } catch ( error ) {
1034
- expect ( changeStream . closed ) . to . be . true ;
1035
- expect ( changeStream . cursor . closed ) . to . be . true ;
1018
+ const changeStreamAsyncIteratorHelper = async function ( changeStream : ChangeStream ) {
1019
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
1020
+ for await ( const change of changeStream ) {
1021
+ return ;
1022
+ }
1023
+ } ;
1024
+
1025
+ await changeStreamAsyncIteratorHelper ( changeStream ) ;
1026
+ expect ( changeStream . closed ) . to . be . true ;
1027
+ expect ( changeStream . cursor . closed ) . to . be . true ;
1028
+ }
1029
+ ) ;
1030
+
1031
+ it (
1032
+ 'should close the change stream when an error is thrown' ,
1033
+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
1034
+ async function ( ) {
1035
+ changeStream = collection . watch ( [ ] ) ;
1036
+ await initIteratorMode ( changeStream ) ;
1037
+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1038
+
1039
+ const unresumableErrorCode = 1000 ;
1040
+ await client . db ( 'admin' ) . command ( {
1041
+ configureFailPoint : is4_2Server ( this . configuration . version )
1042
+ ? 'failCommand'
1043
+ : 'failGetMoreAfterCursorCheckout' ,
1044
+ mode : { times : 1 } ,
1045
+ data : {
1046
+ failCommands : [ 'getMore' ] ,
1047
+ errorCode : unresumableErrorCode
1036
1048
}
1049
+ } as FailPoint ) ;
1050
+
1051
+ await collection . insertOne ( { city : 'New York City' } ) ;
1052
+ try {
1053
+ await changeStreamIterator . next ( ) ;
1054
+ expect . fail (
1055
+ 'Change stream did not throw unresumable error and did not produce any events'
1056
+ ) ;
1057
+ } catch ( error ) {
1058
+ expect ( changeStream . closed ) . to . be . true ;
1059
+ expect ( changeStream . cursor . closed ) . to . be . true ;
1037
1060
}
1038
- ) ;
1039
- } ) ;
1061
+ }
1062
+ ) ;
1063
+
1064
+ it (
1065
+ 'should not produce events on closed stream' ,
1066
+ { requires : { topology : '!single' } } ,
1067
+ async function ( ) {
1068
+ changeStream = collection . watch ( [ ] ) ;
1069
+ changeStream . close ( ) ;
1070
+
1071
+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1072
+ const change = await changeStreamIterator . next ( ) ;
1073
+
1074
+ expect ( change . value ) . to . be . undefined ;
1075
+ }
1076
+ ) ;
1077
+
1078
+ it (
1079
+ 'cannot be used with emitter-based iteration' ,
1080
+ { requires : { topology : '!single' } } ,
1081
+ async function ( ) {
1082
+ changeStream = collection . watch ( [ ] ) ;
1083
+ changeStream . on ( 'change' , sinon . stub ( ) ) ;
1084
+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1085
+
1086
+ try {
1087
+ await changeStreamIterator . next ( ) ;
1088
+ expect . fail ( 'Async iterator was used with emitter-based iteration' ) ;
1089
+ } catch ( error ) {
1090
+ expect ( error ) . to . be . instanceOf ( MongoAPIError ) ;
1091
+ }
1092
+ }
1093
+ ) ;
1094
+
1095
+ it . only (
1096
+ 'can be used with raw iterator API' ,
1097
+ { requires : { topology : '!single' } } ,
1098
+ async function ( ) {
1099
+ changeStream = collection . watch ( [ ] ) ;
1100
+ await initIteratorMode ( changeStream ) ;
1101
+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1102
+
1103
+ const docs = [ { city : 'Los Angeles' } , { city : 'Miami' } ] ;
1104
+ await collection . insertMany ( docs ) ;
1105
+
1106
+ await changeStream . next ( ) ;
1107
+
1108
+ try {
1109
+ const change = await changeStreamIterator . next ( ) ;
1110
+ expect ( change . value ) . to . not . be . undefined ;
1111
+
1112
+ const { fullDocument } = change . value ;
1113
+ expect ( fullDocument . city ) . to . equal ( docs [ 1 ] . city ) ;
1114
+ } catch ( error ) {
1115
+ expect . fail ( 'Async could not be used with raw iterator API' )
1116
+ }
1117
+ }
1118
+ ) ;
1040
1119
} ) ;
1041
1120
} ) ;
1042
1121
0 commit comments