@@ -42,6 +42,8 @@ const kInitialized = Symbol('initialized');
42
42
const kClosed = Symbol ( 'closed' ) ;
43
43
/** @internal */
44
44
const kKilled = Symbol ( 'killed' ) ;
45
+ /** @internal */
46
+ const kInit = Symbol ( 'kInit' ) ;
45
47
46
48
/** @public */
47
49
export const CURSOR_FLAGS = [
@@ -77,7 +79,15 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
77
79
readConcern ?: ReadConcernLike ;
78
80
batchSize ?: number ;
79
81
maxTimeMS ?: number ;
80
- comment ?: Document | string ;
82
+ /**
83
+ * Comment to apply to the operation.
84
+ *
85
+ * In server versions pre-4.4, 'comment' must be string. A server
86
+ * error will be thrown if any other type is provided.
87
+ *
88
+ * In server versions 4.4 and above, 'comment' can be any valid BSON type.
89
+ */
90
+ comment ?: unknown ;
81
91
tailable ?: boolean ;
82
92
awaitData ?: boolean ;
83
93
noCursorTimeout ?: boolean ;
@@ -162,7 +172,9 @@ export abstract class AbstractCursor<
162
172
this [ kOptions ] . batchSize = options . batchSize ;
163
173
}
164
174
165
- if ( options . comment != null ) {
175
+ // we check for undefined specifically here to allow falsy values
176
+ // eslint-disable-next-line no-restricted-syntax
177
+ if ( options . comment !== undefined ) {
166
178
this [ kOptions ] . comment = options . comment ;
167
179
}
168
180
@@ -620,6 +632,65 @@ export abstract class AbstractCursor<
620
632
621
633
executeOperation ( this , getMoreOperation , callback ) ;
622
634
}
635
+
636
+ /**
637
+ * @internal
638
+ *
639
+ * This function is exposed for the unified test runner's createChangeStream
640
+ * operation. We cannot refactor to use the abstract _initialize method without
641
+ * a significant refactor.
642
+ */
643
+ [ kInit ] ( callback : Callback < TSchema | null > ) : void {
644
+ if ( this [ kSession ] == null ) {
645
+ if ( this [ kTopology ] . shouldCheckForSessionSupport ( ) ) {
646
+ return this [ kTopology ] . selectServer ( ReadPreference . primaryPreferred , { } , err => {
647
+ if ( err ) return callback ( err ) ;
648
+ return this [ kInit ] ( callback ) ;
649
+ } ) ;
650
+ } else if ( this [ kTopology ] . hasSessionSupport ( ) ) {
651
+ this [ kSession ] = this [ kTopology ] . startSession ( { owner : this , explicit : false } ) ;
652
+ }
653
+ }
654
+
655
+ this . _initialize ( this [ kSession ] , ( err , state ) => {
656
+ if ( state ) {
657
+ const response = state . response ;
658
+ this [ kServer ] = state . server ;
659
+ this [ kSession ] = state . session ;
660
+
661
+ if ( response . cursor ) {
662
+ this [ kId ] =
663
+ typeof response . cursor . id === 'number'
664
+ ? Long . fromNumber ( response . cursor . id )
665
+ : response . cursor . id ;
666
+
667
+ if ( response . cursor . ns ) {
668
+ this [ kNamespace ] = ns ( response . cursor . ns ) ;
669
+ }
670
+
671
+ this [ kDocuments ] = response . cursor . firstBatch ;
672
+ }
673
+
674
+ // When server responses return without a cursor document, we close this cursor
675
+ // and return the raw server response. This is often the case for explain commands
676
+ // for example
677
+ if ( this [ kId ] == null ) {
678
+ this [ kId ] = Long . ZERO ;
679
+ // TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
680
+ this [ kDocuments ] = [ state . response as TODO_NODE_3286 ] ;
681
+ }
682
+ }
683
+
684
+ // the cursor is now initialized, even if an error occurred or it is dead
685
+ this [ kInitialized ] = true ;
686
+
687
+ if ( err || cursorIsDead ( this ) ) {
688
+ return cleanupCursor ( this , { error : err } , ( ) => callback ( err , nextDocument ( this ) ) ) ;
689
+ }
690
+
691
+ callback ( ) ;
692
+ } ) ;
693
+ }
623
694
}
624
695
625
696
function nextDocument < T > ( cursor : AbstractCursor ) : T | null | undefined {
@@ -653,61 +724,12 @@ function next<T>(cursor: AbstractCursor, blocking: boolean, callback: Callback<T
653
724
654
725
if ( cursorId == null ) {
655
726
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
656
- if ( cursor [ kSession ] == null ) {
657
- if ( cursor [ kTopology ] . shouldCheckForSessionSupport ( ) ) {
658
- return cursor [ kTopology ] . selectServer ( ReadPreference . primaryPreferred , { } , err => {
659
- if ( err ) return callback ( err ) ;
660
- return next ( cursor , blocking , callback ) ;
661
- } ) ;
662
- } else if ( cursor [ kTopology ] . hasSessionSupport ( ) ) {
663
- cursor [ kSession ] = cursor [ kTopology ] . startSession ( { owner : cursor , explicit : false } ) ;
664
- }
665
- }
666
-
667
- cursor . _initialize ( cursor [ kSession ] , ( err , state ) => {
668
- if ( state ) {
669
- const response = state . response ;
670
- cursor [ kServer ] = state . server ;
671
- cursor [ kSession ] = state . session ;
672
-
673
- if ( response . cursor ) {
674
- cursor [ kId ] =
675
- typeof response . cursor . id === 'number'
676
- ? Long . fromNumber ( response . cursor . id )
677
- : response . cursor . id ;
678
-
679
- if ( response . cursor . ns ) {
680
- cursor [ kNamespace ] = ns ( response . cursor . ns ) ;
681
- }
682
-
683
- cursor [ kDocuments ] = response . cursor . firstBatch ;
684
- } else {
685
- // NOTE: This is for support of older servers (<3.2) which do not use commands
686
- cursor [ kId ] =
687
- typeof response . cursorId === 'number'
688
- ? Long . fromNumber ( response . cursorId )
689
- : response . cursorId ;
690
- cursor [ kDocuments ] = response . documents ;
691
- }
692
-
693
- // When server responses return without a cursor document, we close this cursor
694
- // and return the raw server response. This is often the case for explain commands
695
- // for example
696
- if ( cursor [ kId ] == null ) {
697
- cursor [ kId ] = Long . ZERO ;
698
- // TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
699
- cursor [ kDocuments ] = [ state . response as TODO_NODE_3286 ] ;
700
- }
701
- }
702
-
703
- // the cursor is now initialized, even if an error occurred or it is dead
704
- cursor [ kInitialized ] = true ;
705
-
706
- if ( err || cursorIsDead ( cursor ) ) {
707
- return cleanupCursor ( cursor , { error : err } , ( ) => callback ( err , nextDocument ( cursor ) ) ) ;
727
+ cursor [ kInit ] ( ( err , value ) => {
728
+ if ( err ) return callback ( err ) ;
729
+ if ( value ) {
730
+ return callback ( undefined , value ) ;
708
731
}
709
-
710
- next ( cursor , blocking , callback ) ;
732
+ return next ( cursor , blocking , callback ) ;
711
733
} ) ;
712
734
713
735
return ;
0 commit comments