@@ -2141,13 +2141,41 @@ describe('ChangeStream resumability', function () {
2141
2141
} ) ;
2142
2142
2143
2143
context . only ( '#asyncIterator' , function ( ) {
2144
- /**
2145
- * TODO(andymina): three test cases to cover
2146
- *
2147
- * happy path - asyncIterable works
2148
- * unhappy path - it errors out
2149
- * resumable error - continues but also throws the error out
2150
- */
2144
+ for ( const { error, code, message } of resumableErrorCodes ) {
2145
+ it (
2146
+ `resumes on error code ${ code } (${ error } )` ,
2147
+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2148
+ async function ( ) {
2149
+ changeStream = collection . watch ( [ ] ) ;
2150
+ await initIteratorMode ( changeStream ) ;
2151
+
2152
+ await client . db ( 'admin' ) . command ( {
2153
+ configureFailPoint : is4_2Server ( this . configuration . version )
2154
+ ? 'failCommand'
2155
+ : 'failGetMoreAfterCursorCheckout' ,
2156
+ mode : { times : 1 } ,
2157
+ data : {
2158
+ failCommands : [ 'getMore' ] ,
2159
+ errorCode : code ,
2160
+ errmsg : message
2161
+ }
2162
+ } as FailPoint ) ;
2163
+
2164
+ await collection . insertOne ( { city : 'New York City' } ) ;
2165
+
2166
+ let total_changes = 0 ;
2167
+ for await ( const change of changeStream ) {
2168
+ total_changes ++ ;
2169
+ if ( total_changes === 1 ) {
2170
+ changeStream . close ( ) ;
2171
+ }
2172
+ }
2173
+
2174
+ expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2175
+ }
2176
+ ) ;
2177
+ }
2178
+
2151
2179
for ( const { error, code, message } of resumableErrorCodes ) {
2152
2180
it (
2153
2181
`resumes on error code ${ code } (${ error } )` ,
@@ -2161,10 +2189,8 @@ describe('ChangeStream resumability', function () {
2161
2189
// In order to test the resume, we need to ensure that at least one document has
2162
2190
// been iterated so we have a resume token to resume on.
2163
2191
2164
- // insert the doc
2165
2192
await collection . insertOne ( { city : 'New York City' } ) ;
2166
2193
2167
- // fail the call
2168
2194
const mock = sinon
2169
2195
. stub ( changeStream . cursor , '_getMore' )
2170
2196
. callsFake ( ( _batchSize , callback ) => {
@@ -2190,58 +2216,64 @@ describe('ChangeStream resumability', function () {
2190
2216
) ;
2191
2217
}
2192
2218
2193
- // happy path
2194
- it ( 'happy path' , { requires : { topology : '!single' , mongodb : '>=4.2' } } , async function ( ) {
2195
- changeStream = collection . watch ( [ ] ) ;
2196
- await initIteratorMode ( changeStream ) ;
2197
-
2198
- const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
2199
- await collection . insertMany ( docs ) ;
2200
-
2201
- let count = 0 ;
2202
- for await ( const change of changeStream ) {
2203
- const { fullDocument } = change ;
2204
- expect ( fullDocument . city ) . to . equal ( docs [ count ] . city ) ;
2205
-
2206
- count ++ ;
2207
- if ( count === 3 ) {
2208
- expect ( docs . length ) . to . equal ( count ) ;
2209
- changeStream . close ( ) ;
2210
- }
2211
- }
2212
- } ) ;
2213
-
2214
- // unhappy path
2215
2219
it (
2216
- 'unhappy path ' ,
2220
+ 'can iterate through changes ' ,
2217
2221
{ requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2218
2222
async function ( ) {
2219
2223
changeStream = collection . watch ( [ ] ) ;
2220
2224
await initIteratorMode ( changeStream ) ;
2221
2225
2222
- const unresumableErrorCode = 1000 ;
2223
- await client . db ( 'admin' ) . command ( {
2224
- configureFailPoint : is4_2Server ( this . configuration . version )
2225
- ? 'failCommand'
2226
- : 'failGetMoreAfterCursorCheckout' ,
2227
- mode : { times : 1 } ,
2228
- data : {
2229
- failCommands : [ 'getMore' ] ,
2230
- errorCode : unresumableErrorCode
2231
- }
2232
- } as FailPoint ) ;
2226
+ const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
2227
+ await collection . insertMany ( docs ) ;
2233
2228
2234
- await collection . insertOne ( { city : 'New York City' } ) ;
2229
+ let count = 0 ;
2230
+ for await ( const change of changeStream ) {
2231
+ const { fullDocument } = change ;
2232
+ expect ( fullDocument . city ) . to . equal ( docs [ count ] . city ) ;
2235
2233
2236
- try {
2237
- for await ( const change of changeStream ) {
2238
- // should not run
2234
+ count ++ ;
2235
+ if ( count === 3 ) {
2236
+ changeStream . close ( ) ;
2239
2237
}
2240
- } catch ( error ) {
2241
- expect ( error ) . to . be . instanceOf ( MongoServerError ) ;
2242
2238
}
2239
+
2240
+ expect ( docs . length ) . to . equal ( count ) ;
2243
2241
}
2244
2242
) ;
2243
+
2244
+ context ( 'when the error is not a resumable error' , function ( ) {
2245
+ it (
2246
+ 'does not resume' ,
2247
+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2248
+ async function ( ) {
2249
+ changeStream = collection . watch ( [ ] ) ;
2250
+ await initIteratorMode ( changeStream ) ;
2251
+
2252
+ const unresumableErrorCode = 1000 ;
2253
+ await client . db ( 'admin' ) . command ( {
2254
+ configureFailPoint : is4_2Server ( this . configuration . version )
2255
+ ? 'failCommand'
2256
+ : 'failGetMoreAfterCursorCheckout' ,
2257
+ mode : { times : 1 } ,
2258
+ data : {
2259
+ failCommands : [ 'getMore' ] ,
2260
+ errorCode : unresumableErrorCode
2261
+ }
2262
+ } as FailPoint ) ;
2263
+
2264
+ await collection . insertOne ( { city : 'New York City' } ) ;
2265
+
2266
+ try {
2267
+ for await ( const change of changeStream ) {
2268
+ // should not run
2269
+ }
2270
+ } catch ( error ) {
2271
+ expect ( error ) . to . be . instanceOf ( MongoServerError ) ;
2272
+ }
2273
+ }
2274
+ ) ;
2275
+ } ) ;
2276
+
2245
2277
} ) ;
2246
2278
} ) ;
2247
2279
0 commit comments