Skip to content

Commit 4ee1997

Browse files
committed
feat(NODE-6090): Implement CSOT logic for connection checkout and server selection
1 parent 9a5e611 commit 4ee1997

20 files changed

+570
-176
lines changed

src/admin.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ export class Admin {
7878
new RunAdminCommandOperation(command, {
7979
...resolveBSONOptions(options),
8080
session: options?.session,
81-
readPreference: options?.readPreference
81+
readPreference: options?.readPreference,
82+
timeoutMS: options?.timeoutMS ?? this.s.db.timeoutMS
8283
})
8384
);
8485
}

src/cmap/connection.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3030
import { ServerType } from '../sdam/common';
3131
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
32+
import { type Timeout } from '../timeout';
3233
import {
3334
BufferPool,
3435
calculateDurationInMs,
@@ -93,6 +94,9 @@ export interface CommandOptions extends BSONSerializeOptions {
9394
writeConcern?: WriteConcern;
9495

9596
directConnection?: boolean;
97+
98+
/** @internal */
99+
timeout?: Timeout;
96100
}
97101

98102
/** @public */

src/cmap/connection_pool.ts

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ import {
2121
MongoInvalidArgumentError,
2222
MongoMissingCredentialsError,
2323
MongoNetworkError,
24+
MongoOperationTimeoutError,
2425
MongoRuntimeError,
2526
MongoServerError
2627
} from '../error';
2728
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
2829
import type { Server } from '../sdam/server';
2930
import { Timeout, TimeoutError } from '../timeout';
30-
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
31+
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
3132
import { connect } from './connect';
3233
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
3334
import {
@@ -102,7 +103,6 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
102103
export interface WaitQueueMember {
103104
resolve: (conn: Connection) => void;
104105
reject: (err: AnyError) => void;
105-
timeout: Timeout;
106106
[kCancelled]?: boolean;
107107
}
108108

@@ -354,35 +354,57 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
354354
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
355355
* explicitly destroyed by the new owner.
356356
*/
357-
async checkOut(): Promise<Connection> {
357+
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
358358
this.emitAndLog(
359359
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
360360
new ConnectionCheckOutStartedEvent(this)
361361
);
362362

363363
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
364+
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;
364365

365366
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
366367

367-
const timeout = Timeout.expires(waitQueueTimeoutMS);
368+
let timeout: Timeout | null = null;
369+
if (options?.timeout) {
370+
// CSOT enabled
371+
// Determine if we're using the timeout passed in or a new timeout
372+
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
373+
// This check determines whether or not Topology.selectServer used the configured
374+
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
375+
if (
376+
options.timeout.duration === serverSelectionTimeoutMS ||
377+
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
378+
) {
379+
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
380+
// here
381+
timeout = options.timeout;
382+
} else {
383+
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
384+
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
385+
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
386+
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
387+
}
388+
}
389+
} else {
390+
timeout = Timeout.expires(waitQueueTimeoutMS);
391+
}
368392

369393
const waitQueueMember: WaitQueueMember = {
370394
resolve,
371-
reject,
372-
timeout
395+
reject
373396
};
374397

375398
this[kWaitQueue].push(waitQueueMember);
376399
process.nextTick(() => this.processWaitQueue());
377400

378401
try {
379-
return await Promise.race([promise, waitQueueMember.timeout]);
402+
timeout?.throwIfExpired();
403+
return await (timeout ? Promise.race([promise, timeout]) : promise);
380404
} catch (error) {
381405
if (TimeoutError.is(error)) {
382406
waitQueueMember[kCancelled] = true;
383407

384-
waitQueueMember.timeout.clear();
385-
386408
this.emitAndLog(
387409
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
388410
new ConnectionCheckOutFailedEvent(this, 'timeout')
@@ -393,9 +415,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
393415
: 'Timed out while checking out a connection from connection pool',
394416
this.address
395417
);
418+
if (options?.timeout) {
419+
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
420+
cause: timeoutError
421+
});
422+
}
396423
throw timeoutError;
397424
}
398425
throw error;
426+
} finally {
427+
if (timeout !== options?.timeout) timeout?.clear();
399428
}
400429
}
401430

@@ -761,7 +790,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
761790
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
762791
new ConnectionCheckOutFailedEvent(this, reason, error)
763792
);
764-
waitQueueMember.timeout.clear();
765793
this[kWaitQueue].shift();
766794
waitQueueMember.reject(error);
767795
continue;
@@ -782,7 +810,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
782810
ConnectionPool.CONNECTION_CHECKED_OUT,
783811
new ConnectionCheckedOutEvent(this, connection)
784812
);
785-
waitQueueMember.timeout.clear();
786813

787814
this[kWaitQueue].shift();
788815
waitQueueMember.resolve(connection);
@@ -820,8 +847,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
820847
);
821848
waitQueueMember.resolve(connection);
822849
}
823-
824-
waitQueueMember.timeout.clear();
825850
}
826851
process.nextTick(() => this.processWaitQueue());
827852
});

src/collection.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,11 @@ export class Collection<TSchema extends Document = Document> {
261261
this.s.collectionHint = normalizeHintField(v);
262262
}
263263

264+
/** @internal */
265+
get timeoutMS(): number | undefined {
266+
return this.s.options.timeoutMS;
267+
}
268+
264269
/**
265270
* Inserts a single document into MongoDB. If documents passed in do not contain the **_id** field,
266271
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior

src/db.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ export class Db {
222222
return this.s.namespace.toString();
223223
}
224224

225+
/** @internal */
226+
get timeoutMS(): number | undefined {
227+
return this.s.options?.timeoutMS;
228+
}
229+
225230
/**
226231
* Create a new collection on a server with the specified options. Use this to create capped collections.
227232
* More information about command options available at https://www.mongodb.com/docs/manual/reference/command/create/
@@ -272,6 +277,7 @@ export class Db {
272277
this.client,
273278
new RunCommandOperation(this, command, {
274279
...resolveBSONOptions(options),
280+
timeoutMS: options?.timeoutMS,
275281
session: options?.session,
276282
readPreference: options?.readPreference
277283
})

src/error.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,15 @@ export class MongoUnexpectedServerResponseError extends MongoRuntimeError {
760760
}
761761
}
762762

763+
/**
764+
* @internal
765+
*/
766+
export class MongoOperationTimeoutError extends MongoRuntimeError {
767+
override get name(): string {
768+
return 'MongoOperationTimeoutError';
769+
}
770+
}
771+
763772
/**
764773
* An error thrown when the user attempts to add options to a cursor that has already been
765774
* initialized

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export {
6363
MongoNetworkTimeoutError,
6464
MongoNotConnectedError,
6565
MongoOIDCError,
66+
MongoOperationTimeoutError,
6667
MongoParseError,
6768
MongoRuntimeError,
6869
MongoServerClosedError,

src/operations/command.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export interface OperationParent {
6565
writeConcern?: WriteConcern;
6666
readPreference?: ReadPreference;
6767
bsonOptions?: BSONSerializeOptions;
68+
timeoutMS?: number;
6869
}
6970

7071
/** @internal */
@@ -131,6 +132,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
131132
const options = {
132133
...this.options,
133134
...this.bsonOptions,
135+
timeout: this.timeout,
134136
readPreference: this.readPreference,
135137
session
136138
};

src/operations/find.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ export class FindOperation extends CommandOperation<CursorResponse> {
116116
...this.options,
117117
...this.bsonOptions,
118118
documentsReturnedIn: 'firstBatch',
119-
session
119+
session,
120+
timeout: this.timeout
120121
},
121122
this.explain ? ExplainedCursorResponse : CursorResponse
122123
);

src/operations/operation.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
22
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5+
import { type Timeout } from '../timeout';
56
import type { MongoDBNamespace } from '../utils';
67

78
export const Aspect = {
@@ -61,6 +62,11 @@ export abstract class AbstractOperation<TResult = any> {
6162

6263
options: OperationOptions;
6364

65+
/** @internal */
66+
timeout?: Timeout;
67+
/** @internal */
68+
timeoutMS?: number;
69+
6470
[kSession]: ClientSession | undefined;
6571

6672
constructor(options: OperationOptions = {}) {
@@ -76,6 +82,8 @@ export abstract class AbstractOperation<TResult = any> {
7682
this.options = options;
7783
this.bypassPinningCheck = !!options.bypassPinningCheck;
7884
this.trySecondaryWrite = false;
85+
86+
this.timeoutMS = options.timeoutMS;
7987
}
8088

8189
/** Must match the first key of the command object sent to the server.

src/operations/run_command.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ export type RunCommandOptions = {
1414
session?: ClientSession;
1515
/** The read preference */
1616
readPreference?: ReadPreferenceLike;
17+
/** @internal */
18+
timeoutMS?: number;
1719
} & BSONSerializeOptions;
1820

1921
/** @internal */
@@ -39,10 +41,12 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
3941
{
4042
...this.options,
4143
readPreference: this.readPreference,
42-
session
44+
session,
45+
timeout: this.timeout
4346
},
4447
this.options.responseType
4548
);
49+
4650
return res;
4751
}
4852
}
@@ -68,7 +72,8 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
6872
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
6973
...this.options,
7074
readPreference: this.readPreference,
71-
session
75+
session,
76+
timeout: this.timeout
7277
});
7378
return res;
7479
}

src/sdam/server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
311311
this.incrementOperationCount();
312312
if (conn == null) {
313313
try {
314-
conn = await this.pool.checkOut();
314+
conn = await this.pool.checkOut(options);
315315
if (this.loadBalanced && isPinnableCommand(cmd, session)) {
316316
session?.pin(conn);
317317
}
@@ -336,6 +336,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
336336
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
337337
) {
338338
await this.pool.reauthenticate(conn);
339+
// TODO(NODE-5682): Implement CSOT support for socket read/write at the connection layer
339340
try {
340341
const res = await conn.command(ns, cmd, finalOptions, responseType);
341342
throwIfWriteConcernError(res);

0 commit comments

Comments
 (0)