Skip to content

Commit d5bbdd8

Browse files
committed
test: add tests to check options consistency
1 parent d1e4f01 commit d5bbdd8

File tree

2 files changed

+56
-22
lines changed

2 files changed

+56
-22
lines changed

src/change_stream.ts

+9-17
Original file line numberDiff line numberDiff line change
@@ -727,26 +727,18 @@ export class ChangeStream<
727727
}, callback);
728728
}
729729

730-
[Symbol.asyncIterator](): AsyncIterator<TChange, void> {
731-
async function* nativeAsyncIterator(this: ChangeStream<TSchema, TChange>) {
732-
if (this.closed) {
733-
return;
734-
}
735-
736-
while (true) {
737-
if (!(await this.hasNext())) {
738-
break;
739-
}
740-
741-
yield await this.next();
742-
}
730+
async *[Symbol.asyncIterator](): AsyncIterator<TChange, void> {
731+
if (this.closed) {
732+
return;
743733
}
744734

745-
const iterator = nativeAsyncIterator.call(this);
735+
while (true) {
736+
if (!(await this.hasNext())) {
737+
break;
738+
}
746739

747-
return {
748-
next: () => iterator.next()
749-
};
740+
yield await this.next();
741+
}
750742
}
751743

752744
/** Is the cursor closed */

test/integration/change-streams/change_stream.test.ts

+47-5
Original file line numberDiff line numberDiff line change
@@ -1737,7 +1737,6 @@ describe('ChangeStream resumability', function () {
17371737
aggregateEvents = [];
17381738
});
17391739

1740-
// TODO(andymina): resumable error tests here
17411740
context('iterator api', function () {
17421741
context('#next', function () {
17431742
for (const { error, code, message } of resumableErrorCodes) {
@@ -2140,7 +2139,7 @@ describe('ChangeStream resumability', function () {
21402139
});
21412140
});
21422141

2143-
context('#asyncIterator', function () {
2142+
context.only('#asyncIterator', function () {
21442143
for (const { error, code, message } of resumableErrorCodes) {
21452144
it(
21462145
`resumes on error code ${code} (${error})`,
@@ -2241,13 +2240,51 @@ describe('ChangeStream resumability', function () {
22412240
}
22422241
);
22432242

2243+
it(
2244+
'maintains change stream options on resume',
2245+
{ requires: { topology: '!single', mongodb: '>=4.2' } },
2246+
async function () {
2247+
changeStream = collection.watch([], changeStreamResumeOptions);
2248+
await initIteratorMode(changeStream);
2249+
2250+
await client.db('admin').command({
2251+
configureFailPoint: is4_2Server(this.configuration.version)
2252+
? 'failCommand'
2253+
: 'failGetMoreAfterCursorCheckout',
2254+
mode: { times: 1 },
2255+
data: {
2256+
failCommands: ['getMore'],
2257+
errorCode: resumableErrorCodes[0].code,
2258+
errmsg: resumableErrorCodes[0].message
2259+
}
2260+
} as FailPoint);
2261+
2262+
expect(changeStream.cursor)
2263+
.to.have.property('options')
2264+
.that.containSubset(changeStreamResumeOptions);
2265+
2266+
await collection.insertOne({ city: 'New York City' });
2267+
2268+
let total_changes = 0;
2269+
for await (const change of changeStream) {
2270+
total_changes++;
2271+
if (total_changes === 1) {
2272+
changeStream.close();
2273+
}
2274+
}
2275+
2276+
expect(changeStream.cursor)
2277+
.to.have.property('options')
2278+
.that.containSubset(changeStreamResumeOptions);
2279+
}
2280+
);
2281+
22442282
context('when the error is not a resumable error', function () {
22452283
it(
22462284
'does not resume',
22472285
{ requires: { topology: '!single', mongodb: '>=4.2' } },
22482286
async function () {
22492287
changeStream = collection.watch([]);
2250-
await initIteratorMode(changeStream);
22512288

22522289
const unresumableErrorCode = 1000;
22532290
await client.db('admin').command({
@@ -2261,15 +2298,20 @@ describe('ChangeStream resumability', function () {
22612298
}
22622299
} as FailPoint);
22632300

2264-
await collection.insertOne({ city: 'New York City' });
2301+
await initIteratorMode(changeStream);
22652302

2303+
await collection.insertOne({ city: 'New York City' });
22662304
try {
22672305
for await (const change of changeStream) {
2268-
// should not run
2306+
// DOESN'T REACH
2307+
expect.fail('Change stream produced changes on an unresumable error');
22692308
}
22702309
} catch (error) {
22712310
expect(error).to.be.instanceOf(MongoServerError);
2311+
expect(aggregateEvents).to.have.lengthOf(1);
22722312
}
2313+
// fails here
2314+
expect.fail('Change stream did not throw unresumable error');
22732315
}
22742316
);
22752317
});

0 commit comments

Comments
 (0)