diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 6ef8699892a..8e59d8a985b 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -137,7 +137,7 @@ export interface BulkResult { * Keeps the state of a unordered batch so we can rewrite the results * correctly after command execution * - * @internal + * @public */ export class Batch { originalZeroIndex: number; diff --git a/src/change_stream.ts b/src/change_stream.ts index 5d8582e3af5..ae3ec6ff570 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -46,6 +46,7 @@ const CHANGE_DOMAIN_TYPES = { const NO_RESUME_TOKEN_ERROR = new MongoError( 'A change stream document has been received that lacks a resume token (_id).' ); +const NO_CURSOR_ERROR = new MongoError('ChangeStream has no cursor'); const CHANGESTREAM_CLOSED_ERROR = new MongoError('ChangeStream is closed'); /** @public */ @@ -287,8 +288,7 @@ export class ChangeStream extends EventEmitter { next(callback?: Callback): Promise | void { return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { - if (err) return cb(err); // failed to resume, raise an error - if (!cursor) return cb(new MongoError('Cursor is undefined')); + if (err || !cursor) return cb(err); // failed to resume, raise an error cursor.next((error, change) => { if (error) { this[kResumeQueue].push(() => this.next(cb)); @@ -330,11 +330,23 @@ export class ChangeStream extends EventEmitter { */ stream(options?: CursorStreamOptions): Readable { this.streamOptions = options; - if (!this.cursor) { - throw new MongoError('ChangeStream has no cursor, unable to stream'); - } + if (!this.cursor) throw NO_CURSOR_ERROR; return this.cursor.stream(options); } + + /** + * Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned + */ + tryNext(): Promise; + tryNext(callback: Callback): void; + tryNext(callback?: Callback): Promise | void { + return maybePromise(callback, cb => { + getCursor(this, (err, cursor) => { + if (err || !cursor) return cb(err); // failed to resume, raise an error + return cursor.tryNext(cb); + }); + }); + } } /** @internal */ @@ -707,11 +719,16 @@ function getCursor(changeStream: ChangeStream, callback: Callback; tryNext(callback: Callback): void; diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index cb77a6cb956..6f465f85046 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1939,8 +1939,7 @@ describe('Change Streams', function () { changeStream.on('change', () => { counter += 1; if (counter === 2) { - changeStream.close(); - setTimeout(() => close()); + changeStream.close(close); } else if (counter >= 3) { close(new Error('should not have received more than 2 events')); }