Skip to content

Commit 00baff4

Browse files
committed
feat: add asyncIterator
1 parent 5f37cb6 commit 00baff4

File tree

2 files changed

+55
-0
lines changed

2 files changed

+55
-0
lines changed

src/change_stream.ts

+22
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,28 @@ export class ChangeStream<
727727
}, callback);
728728
}
729729

730+
[Symbol.asyncIterator](): AsyncIterator<TChange, void> {
731+
async function* nativeAsyncIterator(this: ChangeStream<TSchema, TChange>) {
732+
if (this.closed) {
733+
return;
734+
}
735+
736+
while (true) {
737+
if (!(await this.hasNext())) {
738+
break;
739+
}
740+
741+
yield await this.next();
742+
}
743+
}
744+
745+
const iterator = nativeAsyncIterator.call(this);
746+
747+
return {
748+
next: () => iterator.next()
749+
};
750+
}
751+
730752
/** Is the cursor closed */
731753
get closed(): boolean {
732754
return this[kClosed] || this.cursor.closed;

test/integration/change-streams/change_stream.test.ts

+33
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import { promisify } from 'util';
1010
import {
1111
AbstractCursor,
1212
ChangeStream,
13+
ChangeStreamCursor,
14+
ChangeStreamDocument,
15+
ChangeStreamInsertDocument,
1316
ChangeStreamOptions,
1417
Collection,
1518
CommandStartedEvent,
@@ -1737,6 +1740,7 @@ describe('ChangeStream resumability', function () {
17371740
aggregateEvents = [];
17381741
});
17391742

1743+
// TODO(andymina): resumable error tests here
17401744
context('iterator api', function () {
17411745
context('#next', function () {
17421746
for (const { error, code, message } of resumableErrorCodes) {
@@ -2138,6 +2142,35 @@ describe('ChangeStream resumability', function () {
21382142
);
21392143
});
21402144
});
2145+
2146+
context.only('#asyncIterator', function () {
2147+
/**
2148+
* TODO(andymina): three test cases to cover
2149+
*
2150+
* happy path - asyncIterable works
2151+
* unhappy path - it errors out
2152+
* resumable error - continues but also throws the error out
2153+
*/
2154+
// happy path
2155+
it('happy path', async function () {
2156+
changeStream = collection.watch([]);
2157+
await initIteratorMode(changeStream);
2158+
2159+
const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }];
2160+
await collection.insertMany(docs);
2161+
2162+
let count = 0;
2163+
for await (const change of changeStream) {
2164+
const { fullDocument } = change;
2165+
expect(fullDocument.city).to.equal(docs[count].city);
2166+
2167+
count++;
2168+
if (count === 3) {
2169+
changeStream.close();
2170+
}
2171+
}
2172+
});
2173+
});
21412174
});
21422175

21432176
describe('event emitter based iteration', function () {

0 commit comments

Comments
 (0)