Skip to content

Commit a42039b

Browse files
fix(NODE-5840): heartbeat duration includes socket creation (#3973)
Co-authored-by: Alena Khineika <[email protected]>
1 parent b7d28d3 commit a42039b

File tree

10 files changed

+282
-277
lines changed

10 files changed

+282
-277
lines changed

src/cmap/connect.ts

+103-134
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
MongoRuntimeError,
1717
needsRetryableWriteLabel
1818
} from '../error';
19-
import { type Callback, HostAddress, ns } from '../utils';
19+
import { HostAddress, ns, promiseWithResolvers } from '../utils';
2020
import { AuthContext, type AuthProvider } from './auth/auth_provider';
2121
import { GSSAPI } from './auth/gssapi';
2222
import { MongoCR } from './auth/mongocr';
@@ -55,27 +55,26 @@ export const AUTH_PROVIDERS = new Map<AuthMechanism | string, AuthProvider>([
5555
/** @public */
5656
export type Stream = Socket | TLSSocket;
5757

58-
export function connect(options: ConnectionOptions, callback: Callback<Connection>): void {
59-
makeConnection({ ...options, existingSocket: undefined }, (err, socket) => {
60-
if (err || !socket) {
61-
return callback(err);
62-
}
63-
64-
let ConnectionType = options.connectionType ?? Connection;
65-
if (options.autoEncrypter) {
66-
ConnectionType = CryptoConnection;
67-
}
58+
export async function connect(options: ConnectionOptions): Promise<Connection> {
59+
let connection: Connection | null = null;
60+
try {
61+
const socket = await makeSocket(options);
62+
connection = makeConnection(options, socket);
63+
await performInitialHandshake(connection, options);
64+
return connection;
65+
} catch (error) {
66+
connection?.destroy({ force: false });
67+
throw error;
68+
}
69+
}
6870

69-
const connection = new ConnectionType(socket, options);
71+
export function makeConnection(options: ConnectionOptions, socket: Stream): Connection {
72+
let ConnectionType = options.connectionType ?? Connection;
73+
if (options.autoEncrypter) {
74+
ConnectionType = CryptoConnection;
75+
}
7076

71-
performInitialHandshake(connection, options).then(
72-
() => callback(undefined, connection),
73-
error => {
74-
connection.destroy({ force: false });
75-
callback(error);
76-
}
77-
);
78-
});
77+
return new ConnectionType(socket, options);
7978
}
8079

8180
function checkSupportedServer(hello: Document, options: ConnectionOptions) {
@@ -103,7 +102,7 @@ function checkSupportedServer(hello: Document, options: ConnectionOptions) {
103102
return new MongoCompatibilityError(message);
104103
}
105104

106-
async function performInitialHandshake(
105+
export async function performInitialHandshake(
107106
conn: Connection,
108107
options: ConnectionOptions
109108
): Promise<void> {
@@ -329,35 +328,21 @@ function parseSslOptions(options: MakeConnectionOptions): TLSConnectionOpts {
329328
return result;
330329
}
331330

332-
const SOCKET_ERROR_EVENT_LIST = ['error', 'close', 'timeout', 'parseError'] as const;
333-
type ErrorHandlerEventName = (typeof SOCKET_ERROR_EVENT_LIST)[number] | 'cancel';
334-
const SOCKET_ERROR_EVENTS = new Set(SOCKET_ERROR_EVENT_LIST);
335-
336-
function makeConnection(options: MakeConnectionOptions, _callback: Callback<Stream>) {
331+
export async function makeSocket(options: MakeConnectionOptions): Promise<Stream> {
337332
const useTLS = options.tls ?? false;
338333
const noDelay = options.noDelay ?? true;
339334
const connectTimeoutMS = options.connectTimeoutMS ?? 30000;
340335
const rejectUnauthorized = options.rejectUnauthorized ?? true;
341336
const existingSocket = options.existingSocket;
342337

343338
let socket: Stream;
344-
const callback: Callback<Stream> = function (err, ret) {
345-
if (err && socket) {
346-
socket.destroy();
347-
}
348-
349-
_callback(err, ret);
350-
};
351339

352340
if (options.proxyHost != null) {
353341
// Currently, only Socks5 is supported.
354-
return makeSocks5Connection(
355-
{
356-
...options,
357-
connectTimeoutMS // Should always be present for Socks5
358-
},
359-
callback
360-
);
342+
return makeSocks5Connection({
343+
...options,
344+
connectTimeoutMS // Should always be present for Socks5
345+
});
361346
}
362347

363348
if (useTLS) {
@@ -379,47 +364,41 @@ function makeConnection(options: MakeConnectionOptions, _callback: Callback<Stre
379364
socket.setTimeout(connectTimeoutMS);
380365
socket.setNoDelay(noDelay);
381366

382-
const connectEvent = useTLS ? 'secureConnect' : 'connect';
383-
let cancellationHandler: (err: Error) => void;
384-
function errorHandler(eventName: ErrorHandlerEventName) {
385-
return (err: Error) => {
386-
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));
387-
if (cancellationHandler && options.cancellationToken) {
388-
options.cancellationToken.removeListener('cancel', cancellationHandler);
389-
}
390-
391-
socket.removeListener(connectEvent, connectHandler);
392-
callback(connectionFailureError(eventName, err));
393-
};
394-
}
367+
let cancellationHandler: ((err: Error) => void) | null = null;
395368

396-
function connectHandler() {
397-
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));
398-
if (cancellationHandler && options.cancellationToken) {
399-
options.cancellationToken.removeListener('cancel', cancellationHandler);
369+
const { promise: connectedSocket, resolve, reject } = promiseWithResolvers<Stream>();
370+
if (existingSocket) {
371+
resolve(socket);
372+
} else {
373+
const connectEvent = useTLS ? 'secureConnect' : 'connect';
374+
socket
375+
.once(connectEvent, () => resolve(socket))
376+
.once('error', error => reject(connectionFailureError('error', error)))
377+
.once('timeout', () => reject(connectionFailureError('timeout')))
378+
.once('close', () => reject(connectionFailureError('close')));
379+
380+
if (options.cancellationToken != null) {
381+
cancellationHandler = () => reject(connectionFailureError('cancel'));
382+
options.cancellationToken.once('cancel', cancellationHandler);
400383
}
384+
}
401385

402-
if ('authorizationError' in socket) {
403-
if (socket.authorizationError && rejectUnauthorized) {
404-
// TODO(NODE-5192): wrap this with a MongoError subclass
405-
return callback(socket.authorizationError);
406-
}
386+
try {
387+
socket = await connectedSocket;
388+
return socket;
389+
} catch (error) {
390+
socket.destroy();
391+
if ('authorizationError' in socket && socket.authorizationError != null && rejectUnauthorized) {
392+
// TODO(NODE-5192): wrap this with a MongoError subclass
393+
throw socket.authorizationError;
407394
}
408-
395+
throw error;
396+
} finally {
409397
socket.setTimeout(0);
410-
callback(undefined, socket);
411-
}
412-
413-
SOCKET_ERROR_EVENTS.forEach(event => socket.once(event, errorHandler(event)));
414-
if (options.cancellationToken) {
415-
cancellationHandler = errorHandler('cancel');
416-
options.cancellationToken.once('cancel', cancellationHandler);
417-
}
418-
419-
if (existingSocket) {
420-
process.nextTick(connectHandler);
421-
} else {
422-
socket.once(connectEvent, connectHandler);
398+
socket.removeAllListeners();
399+
if (cancellationHandler != null) {
400+
options.cancellationToken?.removeListener('cancel', cancellationHandler);
401+
}
423402
}
424403
}
425404

@@ -435,78 +414,68 @@ function loadSocks() {
435414
return socks;
436415
}
437416

438-
function makeSocks5Connection(options: MakeConnectionOptions, callback: Callback<Stream>) {
417+
async function makeSocks5Connection(options: MakeConnectionOptions): Promise<Stream> {
439418
const hostAddress = HostAddress.fromHostPort(
440419
options.proxyHost ?? '', // proxyHost is guaranteed to set here
441420
options.proxyPort ?? 1080
442421
);
443422

444423
// First, connect to the proxy server itself:
445-
makeConnection(
446-
{
447-
...options,
448-
hostAddress,
449-
tls: false,
450-
proxyHost: undefined
451-
},
452-
(err, rawSocket) => {
453-
if (err || !rawSocket) {
454-
return callback(err);
455-
}
424+
const rawSocket = await makeSocket({
425+
...options,
426+
hostAddress,
427+
tls: false,
428+
proxyHost: undefined
429+
});
456430

457-
const destination = parseConnectOptions(options) as net.TcpNetConnectOpts;
458-
if (typeof destination.host !== 'string' || typeof destination.port !== 'number') {
459-
return callback(
460-
new MongoInvalidArgumentError('Can only make Socks5 connections to TCP hosts')
461-
);
462-
}
431+
const destination = parseConnectOptions(options) as net.TcpNetConnectOpts;
432+
if (typeof destination.host !== 'string' || typeof destination.port !== 'number') {
433+
throw new MongoInvalidArgumentError('Can only make Socks5 connections to TCP hosts');
434+
}
463435

464-
try {
465-
socks ??= loadSocks();
466-
} catch (error) {
467-
return callback(error);
436+
socks ??= loadSocks();
437+
438+
try {
439+
// Then, establish the Socks5 proxy connection:
440+
const { socket } = await socks.SocksClient.createConnection({
441+
existing_socket: rawSocket,
442+
timeout: options.connectTimeoutMS,
443+
command: 'connect',
444+
destination: {
445+
host: destination.host,
446+
port: destination.port
447+
},
448+
proxy: {
449+
// host and port are ignored because we pass existing_socket
450+
host: 'iLoveJavaScript',
451+
port: 0,
452+
type: 5,
453+
userId: options.proxyUsername || undefined,
454+
password: options.proxyPassword || undefined
468455
}
456+
});
469457

470-
// Then, establish the Socks5 proxy connection:
471-
socks.SocksClient.createConnection({
472-
existing_socket: rawSocket,
473-
timeout: options.connectTimeoutMS,
474-
command: 'connect',
475-
destination: {
476-
host: destination.host,
477-
port: destination.port
478-
},
479-
proxy: {
480-
// host and port are ignored because we pass existing_socket
481-
host: 'iLoveJavaScript',
482-
port: 0,
483-
type: 5,
484-
userId: options.proxyUsername || undefined,
485-
password: options.proxyPassword || undefined
486-
}
487-
}).then(
488-
({ socket }) => {
489-
// Finally, now treat the resulting duplex stream as the
490-
// socket over which we send and receive wire protocol messages:
491-
makeConnection(
492-
{
493-
...options,
494-
existingSocket: socket,
495-
proxyHost: undefined
496-
},
497-
callback
498-
);
499-
},
500-
error => callback(connectionFailureError('error', error))
501-
);
502-
}
503-
);
458+
// Finally, now treat the resulting duplex stream as the
459+
// socket over which we send and receive wire protocol messages:
460+
return await makeSocket({
461+
...options,
462+
existingSocket: socket,
463+
proxyHost: undefined
464+
});
465+
} catch (error) {
466+
throw connectionFailureError('error', error);
467+
}
504468
}
505469

506-
function connectionFailureError(type: ErrorHandlerEventName, err: Error) {
470+
function connectionFailureError(type: 'error', cause: Error): MongoNetworkError;
471+
function connectionFailureError(type: 'close' | 'timeout' | 'cancel'): MongoNetworkError;
472+
function connectionFailureError(
473+
type: 'error' | 'close' | 'timeout' | 'cancel',
474+
cause?: Error
475+
): MongoNetworkError {
507476
switch (type) {
508477
case 'error':
509-
return new MongoNetworkError(MongoError.buildErrorMessage(err), { cause: err });
478+
return new MongoNetworkError(MongoError.buildErrorMessage(cause), { cause });
510479
case 'timeout':
511480
return new MongoNetworkTimeoutError('connection timed out');
512481
case 'close':

0 commit comments

Comments
 (0)