1
+ import type { Long } from 'bson' ;
1
2
import Denque = require( 'denque' ) ;
2
3
import type { Readable } from 'stream' ;
3
4
@@ -20,7 +21,7 @@ import {
20
21
MongoRuntimeError
21
22
} from './error' ;
22
23
import { MongoClient } from './mongo_client' ;
23
- import { InferIdType , TypedEventEmitter } from './mongo_types' ;
24
+ import { InferIdType , TODO_NODE_3286 , TypedEventEmitter } from './mongo_types' ;
24
25
import { AggregateOperation , AggregateOptions } from './operations/aggregate' ;
25
26
import type { CollationOptions , OperationParent } from './operations/command' ;
26
27
import { executeOperation , ExecutionResult } from './operations/execute_operation' ;
@@ -112,6 +113,18 @@ export interface PipeOptions {
112
113
end ?: boolean ;
113
114
}
114
115
116
+ /** @internal */
117
+ type ChangeStreamAggregateRawResult < TChange > = {
118
+ $clusterTime : { clusterTime : Timestamp } ;
119
+ cursor : {
120
+ postBatchResumeToken : unknown ;
121
+ ns : string ;
122
+ id : Long ;
123
+ } & ( { firstBatch : TChange [ ] } | { nextBatch : TChange [ ] } ) ;
124
+ ok : 1 ;
125
+ operationTime : Timestamp ;
126
+ } ;
127
+
115
128
/**
116
129
* Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified.
117
130
* @public
@@ -347,7 +360,7 @@ export type ChangeStreamEvents<
347
360
TChange extends ChangeStreamDocument < TSchema > = ChangeStreamDocument < TSchema >
348
361
> = {
349
362
resumeTokenChanged ( token : ResumeToken ) : void ;
350
- init ( response : TChange ) : void ;
363
+ init ( response : any ) : void ;
351
364
more ( response ?: any ) : void ;
352
365
response ( ) : void ;
353
366
end ( ) : void ;
@@ -360,7 +373,7 @@ export type ChangeStreamEvents<
360
373
* @public
361
374
*/
362
375
export class ChangeStream <
363
- TSchema = Document ,
376
+ TSchema extends Document = Document ,
364
377
TChange extends ChangeStreamDocument < TSchema > = ChangeStreamDocument < TSchema >
365
378
> extends TypedEventEmitter < ChangeStreamEvents < TSchema , TChange > > {
366
379
pipeline : Document [ ] ;
@@ -774,25 +787,25 @@ export class ChangeStream<
774
787
* @param changeStream - the parent ChangeStream
775
788
* @param err - error getting a new cursor
776
789
*/
777
- private _processResumeQueue ( err ?: Error ) {
790
+ private _processResumeQueue ( err ?: Error ) : void {
778
791
while ( this [ kResumeQueue ] . length ) {
779
792
const request = this [ kResumeQueue ] . pop ( ) ;
780
793
if ( ! request ) break ; // Should never occur but TS can't use the length check in the while condition
781
794
782
- if ( ! err ) {
783
- if ( this [ kClosed ] ) {
784
- // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
785
- request ( new MongoAPIError ( CHANGESTREAM_CLOSED_ERROR ) ) ;
786
- return ;
787
- }
788
- if ( ! this . cursor ) {
789
- request ( new MongoChangeStreamError ( NO_CURSOR_ERROR ) ) ;
790
- return ;
791
- } else {
792
- request ( err , this . cursor ) ;
793
- }
795
+ if ( err != null ) {
796
+ return request ( err ) ;
797
+ }
798
+
799
+ if ( this [ kClosed ] ) {
800
+ // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
801
+ return request ( new MongoAPIError ( CHANGESTREAM_CLOSED_ERROR ) ) ;
802
+ }
803
+
804
+ if ( ! this . cursor ) {
805
+ return request ( new MongoChangeStreamError ( NO_CURSOR_ERROR ) ) ;
794
806
}
795
- request ( err ) ;
807
+
808
+ return request ( undefined , this . cursor ) ;
796
809
}
797
810
}
798
811
}
@@ -878,12 +891,14 @@ export class ChangeStreamCursor<
878
891
this . hasReceived = true ;
879
892
}
880
893
881
- _processBatch ( batchName : string , response ?: Document ) : void {
882
- const cursor = response ? .cursor || { } ;
894
+ _processBatch ( response : ChangeStreamAggregateRawResult < TChange > ) : void {
895
+ const cursor = response . cursor ;
883
896
if ( cursor . postBatchResumeToken ) {
884
- this . postBatchResumeToken = cursor . postBatchResumeToken ;
897
+ this . postBatchResumeToken = response . cursor . postBatchResumeToken ;
885
898
886
- if ( cursor [ batchName ] . length === 0 ) {
899
+ const batch =
900
+ 'firstBatch' in response . cursor ? response . cursor . firstBatch : response . cursor . nextBatch ;
901
+ if ( batch . length === 0 ) {
887
902
this . resumeToken = cursor . postBatchResumeToken ;
888
903
}
889
904
}
@@ -896,37 +911,39 @@ export class ChangeStreamCursor<
896
911
}
897
912
898
913
_initialize ( session : ClientSession , callback : Callback < ExecutionResult > ) : void {
899
- const aggregateOperation = new AggregateOperation < TChange > ( this . namespace , this . pipeline , {
914
+ const aggregateOperation = new AggregateOperation ( this . namespace , this . pipeline , {
900
915
...this . cursorOptions ,
901
916
...this . options ,
902
917
session
903
918
} ) ;
904
919
905
- executeOperation ( session , aggregateOperation , ( err , response ) => {
906
- if ( err || response == null ) {
907
- return callback ( err ) ;
908
- }
920
+ executeOperation < any , ChangeStreamAggregateRawResult < TChange > > (
921
+ session ,
922
+ aggregateOperation ,
923
+ ( err , response ) => {
924
+ if ( err || response == null ) {
925
+ return callback ( err ) ;
926
+ }
909
927
910
- const server = aggregateOperation . server ;
911
- if (
912
- this . startAtOperationTime == null &&
913
- this . resumeAfter == null &&
914
- this . startAfter == null &&
915
- maxWireVersion ( server ) >= 7
916
- ) {
917
- // eslint-disable-next-line @typescript-eslint/ban-ts-comment
918
- // @ts -ignore: TODO REMOVE ME! IMPROVE TYPES
919
- this . startAtOperationTime = response . operationTime ;
920
- }
928
+ const server = aggregateOperation . server ;
929
+ if (
930
+ this . startAtOperationTime == null &&
931
+ this . resumeAfter == null &&
932
+ this . startAfter == null &&
933
+ maxWireVersion ( server ) >= 7
934
+ ) {
935
+ this . startAtOperationTime = response . operationTime ;
936
+ }
921
937
922
- this . _processBatch ( 'firstBatch' , response ) ;
938
+ this . _processBatch ( response ) ;
923
939
924
- this . emit ( ChangeStream . INIT , response ) ;
925
- this . emit ( ChangeStream . RESPONSE ) ;
940
+ this . emit ( ChangeStream . INIT , response ) ;
941
+ this . emit ( ChangeStream . RESPONSE ) ;
926
942
927
- // TODO: NODE-2882
928
- callback ( undefined , { server, session, response } ) ;
929
- } ) ;
943
+ // TODO: NODE-2882
944
+ callback ( undefined , { server, session, response } ) ;
945
+ }
946
+ ) ;
930
947
}
931
948
932
949
override _getMore ( batchSize : number , callback : Callback ) : void {
@@ -935,7 +952,7 @@ export class ChangeStreamCursor<
935
952
return callback ( err ) ;
936
953
}
937
954
938
- this . _processBatch ( 'nextBatch' , response ) ;
955
+ this . _processBatch ( response as TODO_NODE_3286 as ChangeStreamAggregateRawResult < TChange > ) ;
939
956
940
957
this . emit ( ChangeStream . MORE , response ) ;
941
958
this . emit ( ChangeStream . RESPONSE ) ;
0 commit comments