@@ -53,6 +53,9 @@ describe('Change Streams', function () {
53
53
let changeStream : ChangeStream ;
54
54
let db : Db ;
55
55
56
+ const is4_2Server = ( serverVersion : string ) =>
57
+ gte ( serverVersion , '4.2.0' ) && lt ( serverVersion , '4.3.0' ) ;
58
+
56
59
beforeEach ( async function ( ) {
57
60
const configuration = this . configuration ;
58
61
client = configuration . newClient ( ) ;
@@ -999,6 +1002,41 @@ describe('Change Streams', function () {
999
1002
changeStream . close ( ) ;
1000
1003
}
1001
1004
) ;
1005
+
1006
+ context ( 'when an error is thrown' , function ( ) {
1007
+ it (
1008
+ 'should close the change stream' ,
1009
+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
1010
+ async function ( ) {
1011
+ changeStream = collection . watch ( [ ] ) ;
1012
+ await initIteratorMode ( changeStream ) ;
1013
+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1014
+
1015
+ const unresumableErrorCode = 1000 ;
1016
+ await client . db ( 'admin' ) . command ( {
1017
+ configureFailPoint : is4_2Server ( this . configuration . version )
1018
+ ? 'failCommand'
1019
+ : 'failGetMoreAfterCursorCheckout' ,
1020
+ mode : { times : 1 } ,
1021
+ data : {
1022
+ failCommands : [ 'getMore' ] ,
1023
+ errorCode : unresumableErrorCode
1024
+ }
1025
+ } as FailPoint ) ;
1026
+
1027
+ await collection . insertOne ( { city : 'New York City' } ) ;
1028
+ try {
1029
+ const change = await changeStreamIterator . next ( ) ;
1030
+ expect . fail (
1031
+ 'Change stream did not throw unresumable error and did not produce any events'
1032
+ ) ;
1033
+ } catch ( error ) {
1034
+ expect ( changeStream . closed ) . to . be . true ;
1035
+ expect ( changeStream . cursor . closed ) . to . be . true ;
1036
+ }
1037
+ }
1038
+ ) ;
1039
+ } ) ;
1002
1040
} ) ;
1003
1041
} ) ;
1004
1042
@@ -2271,6 +2309,8 @@ describe('ChangeStream resumability', function () {
2271
2309
{ requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2272
2310
async function ( ) {
2273
2311
changeStream = collection . watch ( [ ] ) ;
2312
+ await initIteratorMode ( changeStream ) ;
2313
+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
2274
2314
2275
2315
const unresumableErrorCode = 1000 ;
2276
2316
await client . db ( 'admin' ) . command ( {
@@ -2284,9 +2324,6 @@ describe('ChangeStream resumability', function () {
2284
2324
}
2285
2325
} as FailPoint ) ;
2286
2326
2287
- await initIteratorMode ( changeStream ) ;
2288
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
2289
-
2290
2327
await collection . insertOne ( { city : 'New York City' } ) ;
2291
2328
try {
2292
2329
const change = await changeStreamIterator . next ( ) ;
0 commit comments