@@ -10,9 +10,6 @@ import { promisify } from 'util';
10
10
import {
11
11
AbstractCursor ,
12
12
ChangeStream ,
13
- ChangeStreamCursor ,
14
- ChangeStreamDocument ,
15
- ChangeStreamInsertDocument ,
16
13
ChangeStreamOptions ,
17
14
Collection ,
18
15
CommandStartedEvent ,
@@ -2151,8 +2148,50 @@ describe('ChangeStream resumability', function () {
2151
2148
* unhappy path - it errors out
2152
2149
* resumable error - continues but also throws the error out
2153
2150
*/
2151
+ for ( const { error, code, message } of resumableErrorCodes ) {
2152
+ it (
2153
+ `resumes on error code ${ code } (${ error } )` ,
2154
+ { requires : { topology : '!single' , mongodb : '<4.2' } } ,
2155
+ async function ( ) {
2156
+ changeStream = collection . watch ( [ ] ) ;
2157
+ await initIteratorMode ( changeStream ) ;
2158
+
2159
+ // on 3.6 servers, no postBatchResumeToken is sent back in the initial aggregate response.
2160
+ // This means that a resume token isn't cached until the first change has been iterated.
2161
+ // In order to test the resume, we need to ensure that at least one document has
2162
+ // been iterated so we have a resume token to resume on.
2163
+
2164
+ // insert the doc
2165
+ await collection . insertOne ( { city : 'New York City' } ) ;
2166
+
2167
+ // fail the call
2168
+ const mock = sinon
2169
+ . stub ( changeStream . cursor , '_getMore' )
2170
+ . callsFake ( ( _batchSize , callback ) => {
2171
+ mock . restore ( ) ;
2172
+ const error = new MongoServerError ( { message } ) ;
2173
+ error . code = code ;
2174
+ callback ( error ) ;
2175
+ } ) ;
2176
+
2177
+ // insert another doc
2178
+ await collection . insertOne ( { city : 'New York City' } ) ;
2179
+
2180
+ let total_changes = 0 ;
2181
+ for await ( const change of changeStream ) {
2182
+ total_changes ++ ;
2183
+ if ( total_changes === 2 ) {
2184
+ changeStream . close ( ) ;
2185
+ }
2186
+ }
2187
+
2188
+ expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2189
+ }
2190
+ ) ;
2191
+ }
2192
+
2154
2193
// happy path
2155
- it ( 'happy path' , async function ( ) {
2194
+ it ( 'happy path' , { requires : { topology : '!single' , mongodb : '>=4.2' } } , async function ( ) {
2156
2195
changeStream = collection . watch ( [ ] ) ;
2157
2196
await initIteratorMode ( changeStream ) ;
2158
2197
@@ -2166,10 +2205,43 @@ describe('ChangeStream resumability', function () {
2166
2205
2167
2206
count ++ ;
2168
2207
if ( count === 3 ) {
2208
+ expect ( docs . length ) . to . equal ( count ) ;
2169
2209
changeStream . close ( ) ;
2170
2210
}
2171
2211
}
2172
2212
} ) ;
2213
+
2214
+ // unhappy path
2215
+ it (
2216
+ 'unhappy path' ,
2217
+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2218
+ async function ( ) {
2219
+ changeStream = collection . watch ( [ ] ) ;
2220
+ await initIteratorMode ( changeStream ) ;
2221
+
2222
+ const unresumableErrorCode = 1000 ;
2223
+ await client . db ( 'admin' ) . command ( {
2224
+ configureFailPoint : is4_2Server ( this . configuration . version )
2225
+ ? 'failCommand'
2226
+ : 'failGetMoreAfterCursorCheckout' ,
2227
+ mode : { times : 1 } ,
2228
+ data : {
2229
+ failCommands : [ 'getMore' ] ,
2230
+ errorCode : unresumableErrorCode
2231
+ }
2232
+ } as FailPoint ) ;
2233
+
2234
+ await collection . insertOne ( { city : 'New York City' } ) ;
2235
+
2236
+ try {
2237
+ for await ( const change of changeStream ) {
2238
+ // should not run
2239
+ }
2240
+ } catch ( error ) {
2241
+ expect ( error ) . to . be . instanceOf ( MongoServerError ) ;
2242
+ }
2243
+ }
2244
+ ) ;
2173
2245
} ) ;
2174
2246
} ) ;
2175
2247
0 commit comments