@@ -4,6 +4,7 @@ const EventEmitter = require('events');
4
4
const { MongoError, isResumableError } = require ( './error' ) ;
5
5
const { Cursor } = require ( './cursor' ) ;
6
6
const { relayEvents, maxWireVersion } = require ( './utils' ) ;
7
+ const maybePromise = require ( './utils' ) . maybePromise ;
7
8
const AggregateOperation = require ( './operations/aggregate' ) ;
8
9
9
10
const CHANGE_STREAM_OPTIONS = [ 'resumeAfter' , 'startAfter' , 'startAtOperationTime' , 'fullDocument' ] ;
@@ -124,10 +125,10 @@ class ChangeStream extends EventEmitter {
124
125
* @function ChangeStream.prototype.hasNext
125
126
* @param {ChangeStream~resultCallback } [callback] The result callback.
126
127
* @throws {MongoError }
127
- * @returns {Promise } returns Promise if no callback passed
128
+ * @returns {Promise|void } returns Promise if no callback passed
128
129
*/
129
130
hasNext ( callback ) {
130
- return this . cursor . hasNext ( callback ) ;
131
+ return maybePromise ( this . parent , callback , cb => this . cursor . hasNext ( cb ) ) ;
131
132
}
132
133
133
134
/**
@@ -136,19 +137,17 @@ class ChangeStream extends EventEmitter {
136
137
* @function ChangeStream.prototype.next
137
138
* @param {ChangeStream~resultCallback } [callback] The result callback.
138
139
* @throws {MongoError }
139
- * @returns {Promise } returns Promise if no callback passed
140
+ * @returns {Promise|void } returns Promise if no callback passed
140
141
*/
141
142
next ( callback ) {
142
- var self = this ;
143
- if ( this . isClosed ( ) ) {
144
- if ( callback ) return callback ( new Error ( 'Change Stream is not open.' ) , null ) ;
145
- return self . promiseLibrary . reject ( new Error ( 'Change Stream is not open.' ) ) ;
146
- }
147
-
148
- return this . cursor
149
- . next ( )
150
- . then ( change => processNewChange ( { changeStream : self , change, callback } ) )
151
- . catch ( error => processNewChange ( { changeStream : self , error, callback } ) ) ;
143
+ return maybePromise ( this . parent , callback , cb => {
144
+ if ( this . isClosed ( ) ) {
145
+ return cb ( new Error ( 'Change Stream is not open.' ) ) ;
146
+ }
147
+ this . cursor . next ( ( error , change ) => {
148
+ processNewChange ( { changeStream : this , error, change, callback : cb } ) ;
149
+ } ) ;
150
+ } ) ;
152
151
}
153
152
154
153
/**
0 commit comments