@@ -53,7 +53,6 @@ import {
53
53
import type { Stream } from './connect' ;
54
54
import { MessageStream , OperationDescription } from './message_stream' ;
55
55
import { StreamDescription , StreamDescriptionOptions } from './stream_description' ;
56
- import { MONGODB_WIRE_VERSION } from './wire_protocol/constants' ;
57
56
import { applyCommonQueryOptions , getReadPreference , isSharded } from './wire_protocol/shared' ;
58
57
59
58
/** @internal */
@@ -251,35 +250,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
251
250
/* ignore errors, listen to `close` instead */
252
251
} ) ;
253
252
254
- const eventHandlerFactory = ( event : 'error' | 'close' | 'timeout' ) =>
255
- [
256
- event ,
257
- ( error ?: Error ) => {
258
- if ( this . closed ) {
259
- return ;
260
- }
261
- this . closed = true ;
262
-
263
- switch ( event ) {
264
- case 'error' :
265
- this . onError ( error ) ;
266
- break ;
267
- case 'close' :
268
- this . onClose ( ) ;
269
- break ;
270
- case 'timeout' :
271
- this . onTimeout ( ) ;
272
- break ;
273
- }
274
-
275
- this [ kQueue ] . clear ( ) ;
276
- this . emit ( Connection . CLOSE ) ;
277
- }
278
- ] as const ;
279
-
280
- this [ kMessageStream ] . on ( ...eventHandlerFactory ( 'error' ) ) ;
281
- stream . on ( ...eventHandlerFactory ( 'close' ) ) ;
282
- stream . on ( ...eventHandlerFactory ( 'timeout' ) ) ;
253
+ this [ kMessageStream ] . on ( 'error' , error => this . handleIssue ( { destroy : error } ) ) ;
254
+ stream . on ( 'close' , ( ) => this . handleIssue ( { isClose : true } ) ) ;
255
+ stream . on ( 'timeout' , ( ) => this . handleIssue ( { isTimeout : true , destroy : true } ) ) ;
283
256
284
257
// hook the message stream up to the passed in stream
285
258
stream . pipe ( this [ kMessageStream ] ) ;
@@ -335,29 +308,33 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
335
308
this [ kLastUseTime ] = now ( ) ;
336
309
}
337
310
338
- onTimeout ( ) {
339
- this [ kStream ] . destroy ( ) ;
340
- const beforeHandshake = {
341
- beforeHandshake : this . hello == null
342
- } ;
343
- const msg = `connection ${ this . id } to ${ this . address } timed out` ;
344
-
345
- for ( const [ , op ] of this [ kQueue ] ) {
346
- op . cb ( new MongoNetworkTimeoutError ( msg , beforeHandshake ) ) ;
311
+ handleIssue ( issue : { isTimeout ?: boolean ; isClose ?: boolean ; destroy ?: boolean | Error } ) : void {
312
+ if ( this . closed ) {
313
+ return ;
347
314
}
348
- }
349
315
350
- onClose ( ) {
351
- for ( const [ , op ] of this [ kQueue ] ) {
352
- op . cb ( new MongoNetworkError ( `connection ${ this . id } to ${ this . address } closed` ) ) ;
316
+ if ( issue . destroy ) {
317
+ this [ kStream ] . destroy ( typeof issue . destroy === 'boolean' ? undefined : issue . destroy ) ;
353
318
}
354
- }
355
319
356
- onError ( error ?: Error ) {
357
- this [ kStream ] . destroy ( error ) ;
320
+ this . closed = true ;
321
+
358
322
for ( const [ , op ] of this [ kQueue ] ) {
359
- op . cb ( error ) ;
323
+ if ( issue . isTimeout ) {
324
+ op . cb (
325
+ new MongoNetworkTimeoutError ( `connection ${ this . id } to ${ this . address } timed out` , {
326
+ beforeHandshake : this . hello == null
327
+ } )
328
+ ) ;
329
+ } else if ( issue . isClose ) {
330
+ op . cb ( new MongoNetworkError ( `connection ${ this . id } to ${ this . address } closed` ) ) ;
331
+ } else {
332
+ op . cb ( typeof issue . destroy === 'boolean' ? undefined : issue . destroy ) ;
333
+ }
360
334
}
335
+
336
+ this [ kQueue ] . clear ( ) ;
337
+ this . emit ( Connection . CLOSE ) ;
361
338
}
362
339
363
340
destroy ( ) : void ;
@@ -566,7 +543,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
566
543
return ;
567
544
}
568
545
569
- if ( wireVersion < MONGODB_WIRE_VERSION . FIND_COMMAND ) {
546
+ if ( wireVersion < 4 ) {
570
547
const getMoreOp = new GetMore ( ns . toString ( ) , cursorId , { numberToReturn : options . batchSize } ) ;
571
548
const queryOptions = applyCommonQueryOptions (
572
549
{ } ,
@@ -620,7 +597,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
620
597
throw new MongoRuntimeError ( `Invalid list of cursor ids provided: ${ cursorIds } ` ) ;
621
598
}
622
599
623
- if ( maxWireVersion ( this ) < MONGODB_WIRE_VERSION . FIND_COMMAND ) {
600
+ if ( maxWireVersion ( this ) < 4 ) {
624
601
try {
625
602
write (
626
603
this ,
@@ -678,12 +655,12 @@ export class CryptoConnection extends Connection {
678
655
}
679
656
680
657
const serverWireVersion = maxWireVersion ( this ) ;
681
- if ( serverWireVersion === MONGODB_WIRE_VERSION . UNKNOWN ) {
658
+ if ( serverWireVersion === 0 ) {
682
659
// This means the initial handshake hasn't happened yet
683
660
return super . command ( ns , cmd , options , callback ) ;
684
661
}
685
662
686
- if ( serverWireVersion < MONGODB_WIRE_VERSION . SHARDED_TRANSACTIONS ) {
663
+ if ( serverWireVersion < 8 ) {
687
664
callback (
688
665
new MongoCompatibilityError ( 'Auto-encryption requires a minimum MongoDB version of 4.2' )
689
666
) ;
@@ -720,10 +697,7 @@ function supportsOpMsg(conn: Connection) {
720
697
return false ;
721
698
}
722
699
723
- return (
724
- maxWireVersion ( conn ) >= MONGODB_WIRE_VERSION . SUPPORTS_OP_MSG &&
725
- ! description . __nodejs_mock_server__ // mock server does not support OP_MSG
726
- ) ;
700
+ return maxWireVersion ( conn ) >= 6 && ! description . __nodejs_mock_server__ ;
727
701
}
728
702
729
703
function messageHandler ( conn : Connection ) {
0 commit comments