Skip to content

Commit a6cc438

Browse files
committed
feat(NODE-6632): destroy async resources when client closes
1 parent cf748f8 commit a6cc438

33 files changed

+704
-279
lines changed

.eslintrc.json

+8
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@
119119
{
120120
"selector": "BinaryExpression[operator=/[=!]==?/] Literal[value='undefined']",
121121
"message": "Do not strictly check typeof undefined (NOTE: currently this rule only detects the usage of 'undefined' string literal so this could be a misfire)"
122+
},
123+
{
124+
"selector": "CallExpression[callee.name='setTimeout']",
125+
"message": "setTimeout must be abortable"
126+
},
127+
{
128+
"selector": "CallExpression[callee.name='clearTimeout']",
129+
"message": "clearTimeout must remove abort listener"
122130
}
123131
],
124132
"@typescript-eslint/no-unused-vars": "error",

src/change_stream.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,8 @@ export class ChangeStream<
680680
if (this.options.timeoutMS != null) {
681681
this.timeoutContext = new CSOTTimeoutContext({
682682
timeoutMS: this.options.timeoutMS,
683-
serverSelectionTimeoutMS
683+
serverSelectionTimeoutMS,
684+
closeSignal: this.cursor.client.closeSignal
684685
});
685686
}
686687
}

src/client-side-encryption/auto_encrypter.ts

+22-14
Original file line numberDiff line numberDiff line change
@@ -393,13 +393,17 @@ export class AutoEncrypter {
393393
context.ns = ns;
394394
context.document = cmd;
395395

396-
const stateMachine = new StateMachine({
397-
promoteValues: false,
398-
promoteLongs: false,
399-
proxyOptions: this._proxyOptions,
400-
tlsOptions: this._tlsOptions,
401-
socketOptions: autoSelectSocketOptions(this._client.s.options)
402-
});
396+
const stateMachine = new StateMachine(
397+
{
398+
promoteValues: false,
399+
promoteLongs: false,
400+
proxyOptions: this._proxyOptions,
401+
tlsOptions: this._tlsOptions,
402+
socketOptions: autoSelectSocketOptions(this._client.s.options)
403+
},
404+
undefined,
405+
this._client.closeSignal
406+
);
403407

404408
return deserialize(await stateMachine.execute(this, context, options), {
405409
promoteValues: false,
@@ -420,12 +424,16 @@ export class AutoEncrypter {
420424

421425
context.id = this._contextCounter++;
422426

423-
const stateMachine = new StateMachine({
424-
...options,
425-
proxyOptions: this._proxyOptions,
426-
tlsOptions: this._tlsOptions,
427-
socketOptions: autoSelectSocketOptions(this._client.s.options)
428-
});
427+
const stateMachine = new StateMachine(
428+
{
429+
...options,
430+
proxyOptions: this._proxyOptions,
431+
tlsOptions: this._tlsOptions,
432+
socketOptions: autoSelectSocketOptions(this._client.s.options)
433+
},
434+
undefined,
435+
this._client.closeSignal
436+
);
429437

430438
return await stateMachine.execute(this, context, options);
431439
}
@@ -438,7 +446,7 @@ export class AutoEncrypter {
438446
* the original ones.
439447
*/
440448
async askForKMSCredentials(): Promise<KMSProviders> {
441-
return await refreshKMSCredentials(this._kmsProviders);
449+
return await refreshKMSCredentials(this._kmsProviders, this._client.closeSignal);
442450
}
443451

444452
/**

src/client-side-encryption/client_encryption.ts

+37-21
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,15 @@ export class ClientEncryption {
214214
keyMaterial
215215
});
216216

217-
const stateMachine = new StateMachine({
218-
proxyOptions: this._proxyOptions,
219-
tlsOptions: this._tlsOptions,
220-
socketOptions: autoSelectSocketOptions(this._client.s.options)
221-
});
217+
const stateMachine = new StateMachine(
218+
{
219+
proxyOptions: this._proxyOptions,
220+
tlsOptions: this._tlsOptions,
221+
socketOptions: autoSelectSocketOptions(this._client.s.options)
222+
},
223+
undefined,
224+
this._client.closeSignal
225+
);
222226

223227
const timeoutContext =
224228
options?.timeoutContext ??
@@ -283,11 +287,15 @@ export class ClientEncryption {
283287
}
284288
const filterBson = serialize(filter);
285289
const context = this._mongoCrypt.makeRewrapManyDataKeyContext(filterBson, keyEncryptionKeyBson);
286-
const stateMachine = new StateMachine({
287-
proxyOptions: this._proxyOptions,
288-
tlsOptions: this._tlsOptions,
289-
socketOptions: autoSelectSocketOptions(this._client.s.options)
290-
});
290+
const stateMachine = new StateMachine(
291+
{
292+
proxyOptions: this._proxyOptions,
293+
tlsOptions: this._tlsOptions,
294+
socketOptions: autoSelectSocketOptions(this._client.s.options)
295+
},
296+
undefined,
297+
this._client.closeSignal
298+
);
291299

292300
const timeoutContext = TimeoutContext.create(
293301
resolveTimeoutOptions(this._client, { timeoutMS: this._timeoutMS })
@@ -687,11 +695,15 @@ export class ClientEncryption {
687695
const valueBuffer = serialize({ v: value });
688696
const context = this._mongoCrypt.makeExplicitDecryptionContext(valueBuffer);
689697

690-
const stateMachine = new StateMachine({
691-
proxyOptions: this._proxyOptions,
692-
tlsOptions: this._tlsOptions,
693-
socketOptions: autoSelectSocketOptions(this._client.s.options)
694-
});
698+
const stateMachine = new StateMachine(
699+
{
700+
proxyOptions: this._proxyOptions,
701+
tlsOptions: this._tlsOptions,
702+
socketOptions: autoSelectSocketOptions(this._client.s.options)
703+
},
704+
undefined,
705+
this._client.closeSignal
706+
);
695707

696708
const timeoutContext =
697709
this._timeoutMS != null
@@ -712,7 +724,7 @@ export class ClientEncryption {
712724
* the original ones.
713725
*/
714726
async askForKMSCredentials(): Promise<KMSProviders> {
715-
return await refreshKMSCredentials(this._kmsProviders);
727+
return await refreshKMSCredentials(this._kmsProviders, this._client.closeSignal);
716728
}
717729

718730
static get libmongocryptVersion() {
@@ -771,11 +783,15 @@ export class ClientEncryption {
771783
}
772784

773785
const valueBuffer = serialize({ v: value });
774-
const stateMachine = new StateMachine({
775-
proxyOptions: this._proxyOptions,
776-
tlsOptions: this._tlsOptions,
777-
socketOptions: autoSelectSocketOptions(this._client.s.options)
778-
});
786+
const stateMachine = new StateMachine(
787+
{
788+
proxyOptions: this._proxyOptions,
789+
tlsOptions: this._tlsOptions,
790+
socketOptions: autoSelectSocketOptions(this._client.s.options)
791+
},
792+
undefined,
793+
this._client.closeSignal
794+
);
779795
const context = this._mongoCrypt.makeExplicitEncryptionContext(valueBuffer, contextOptions);
780796

781797
const timeoutContext =

src/client-side-encryption/providers/azure.ts

+12-8
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ interface AzureTokenCacheEntry extends AccessToken {
3030
export class AzureCredentialCache {
3131
cachedToken: AzureTokenCacheEntry | null = null;
3232

33-
async getToken(): Promise<AccessToken> {
33+
async getToken(closeSignal: AbortSignal): Promise<AccessToken> {
3434
if (this.cachedToken == null || this.needsRefresh(this.cachedToken)) {
35-
this.cachedToken = await this._getToken();
35+
this.cachedToken = await this._getToken(closeSignal);
3636
}
3737

3838
return { accessToken: this.cachedToken.accessToken };
@@ -53,8 +53,8 @@ export class AzureCredentialCache {
5353
/**
5454
* exposed for testing
5555
*/
56-
_getToken(): Promise<AzureTokenCacheEntry> {
57-
return fetchAzureKMSToken();
56+
_getToken(closeSignal: AbortSignal): Promise<AzureTokenCacheEntry> {
57+
return fetchAzureKMSToken(undefined, closeSignal);
5858
}
5959
}
6060

@@ -156,11 +156,12 @@ export function prepareRequest(options: AzureKMSRequestOptions): {
156156
* [prose test 18](https://github.com/mongodb/specifications/tree/master/source/client-side-encryption/tests#azure-imds-credentials)
157157
*/
158158
export async function fetchAzureKMSToken(
159-
options: AzureKMSRequestOptions = {}
159+
options: AzureKMSRequestOptions = {},
160+
closeSignal: AbortSignal
160161
): Promise<AzureTokenCacheEntry> {
161162
const { headers, url } = prepareRequest(options);
162163
try {
163-
const response = await get(url, { headers });
164+
const response = await get(url, { headers }, closeSignal);
164165
return await parseResponse(response);
165166
} catch (error) {
166167
if (error instanceof MongoNetworkTimeoutError) {
@@ -175,7 +176,10 @@ export async function fetchAzureKMSToken(
175176
*
176177
* @throws Will reject with a `MongoCryptError` if the http request fails or the http response is malformed.
177178
*/
178-
export async function loadAzureCredentials(kmsProviders: KMSProviders): Promise<KMSProviders> {
179-
const azure = await tokenCache.getToken();
179+
export async function loadAzureCredentials(
180+
kmsProviders: KMSProviders,
181+
closeSignal: AbortSignal
182+
): Promise<KMSProviders> {
183+
const azure = await tokenCache.getToken(closeSignal);
180184
return { ...kmsProviders, azure };
181185
}

src/client-side-encryption/providers/index.ts

+5-2
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,10 @@ export function isEmptyCredentials(
176176
*
177177
* @internal
178178
*/
179-
export async function refreshKMSCredentials(kmsProviders: KMSProviders): Promise<KMSProviders> {
179+
export async function refreshKMSCredentials(
180+
kmsProviders: KMSProviders,
181+
closeSignal: AbortSignal
182+
): Promise<KMSProviders> {
180183
let finalKMSProviders = kmsProviders;
181184

182185
if (isEmptyCredentials('aws', kmsProviders)) {
@@ -188,7 +191,7 @@ export async function refreshKMSCredentials(kmsProviders: KMSProviders): Promise
188191
}
189192

190193
if (isEmptyCredentials('azure', kmsProviders)) {
191-
finalKMSProviders = await loadAzureCredentials(finalKMSProviders);
194+
finalKMSProviders = await loadAzureCredentials(finalKMSProviders, closeSignal);
192195
}
193196
return finalKMSProviders;
194197
}

src/client-side-encryption/state_machine.ts

+12-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { type Abortable } from '../mongo_types';
1919
import { Timeout, type TimeoutContext, TimeoutError } from '../timeout';
2020
import {
2121
addAbortListener,
22+
addAbortSignalToStream,
2223
BufferPool,
2324
kDispose,
2425
MongoDBCollectionNamespace,
@@ -185,10 +186,15 @@ export type StateMachineOptions = {
185186
*/
186187
// TODO(DRIVERS-2671): clarify CSOT behavior for FLE APIs
187188
export class StateMachine {
189+
closeSignal: AbortSignal;
190+
188191
constructor(
189192
private options: StateMachineOptions,
190-
private bsonOptions = pluckBSONSerializeOptions(options)
191-
) {}
193+
private bsonOptions = pluckBSONSerializeOptions(options),
194+
closeSignal: AbortSignal
195+
) {
196+
this.closeSignal = closeSignal;
197+
}
192198

193199
/**
194200
* Executes the state machine according to the specification
@@ -339,6 +345,9 @@ export class StateMachine {
339345
const buffer = new BufferPool();
340346

341347
const netSocket: net.Socket = new net.Socket();
348+
349+
addAbortSignalToStream(this.closeSignal, netSocket);
350+
342351
let socket: tls.TLSSocket;
343352

344353
function destroySockets() {
@@ -451,7 +460,7 @@ export class StateMachine {
451460
await (options?.timeoutContext?.csotEnabled()
452461
? Promise.all([
453462
willResolveKmsRequest,
454-
Timeout.expires(options.timeoutContext?.remainingTimeMS)
463+
Timeout.expires(options.timeoutContext.remainingTimeMS, this.closeSignal)
455464
])
456465
: willResolveKmsRequest);
457466
} catch (error) {

src/cmap/auth/auth_provider.ts

+5-4
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ export abstract class AuthProvider {
4747
*/
4848
async prepare(
4949
handshakeDoc: HandshakeDocument,
50-
_authContext: AuthContext
50+
_authContext: AuthContext,
51+
_closeSignal: AbortSignal
5152
): Promise<HandshakeDocument> {
5253
return handshakeDoc;
5354
}
@@ -57,19 +58,19 @@ export abstract class AuthProvider {
5758
*
5859
* @param context - A shared context for authentication flow
5960
*/
60-
abstract auth(context: AuthContext): Promise<void>;
61+
abstract auth(context: AuthContext, closeSignal: AbortSignal): Promise<void>;
6162

6263
/**
6364
* Reauthenticate.
6465
* @param context - The shared auth context.
6566
*/
66-
async reauth(context: AuthContext): Promise<void> {
67+
async reauth(context: AuthContext, closeSignal: AbortSignal): Promise<void> {
6768
if (context.reauthenticating) {
6869
throw new MongoRuntimeError('Reauthentication already in progress.');
6970
}
7071
try {
7172
context.reauthenticating = true;
72-
await this.auth(context);
73+
await this.auth(context, closeSignal);
7374
} finally {
7475
context.reauthenticating = false;
7576
}

src/cmap/auth/mongodb_oidc.ts

+15-6
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,20 @@ export interface Workflow {
106106
/**
107107
* Each workflow should specify the correct custom behaviour for reauthentication.
108108
*/
109-
reauthenticate(connection: Connection, credentials: MongoCredentials): Promise<void>;
109+
reauthenticate(
110+
connection: Connection,
111+
credentials: MongoCredentials,
112+
closeSignal: AbortSignal
113+
): Promise<void>;
110114

111115
/**
112116
* Get the document to add for speculative authentication.
113117
*/
114-
speculativeAuth(connection: Connection, credentials: MongoCredentials): Promise<Document>;
118+
speculativeAuth(
119+
connection: Connection,
120+
credentials: MongoCredentials,
121+
closeSignal: AbortSignal
122+
): Promise<Document>;
115123
}
116124

117125
/** @internal */
@@ -141,14 +149,14 @@ export class MongoDBOIDC extends AuthProvider {
141149
/**
142150
* Authenticate using OIDC
143151
*/
144-
override async auth(authContext: AuthContext): Promise<void> {
152+
override async auth(authContext: AuthContext, closeSignal: AbortSignal): Promise<void> {
145153
const { connection, reauthenticating, response } = authContext;
146154
if (response?.speculativeAuthenticate?.done && !reauthenticating) {
147155
return;
148156
}
149157
const credentials = getCredentials(authContext);
150158
if (reauthenticating) {
151-
await this.workflow.reauthenticate(connection, credentials);
159+
await this.workflow.reauthenticate(connection, credentials, closeSignal);
152160
} else {
153161
await this.workflow.execute(connection, credentials, response);
154162
}
@@ -159,11 +167,12 @@ export class MongoDBOIDC extends AuthProvider {
159167
*/
160168
override async prepare(
161169
handshakeDoc: HandshakeDocument,
162-
authContext: AuthContext
170+
authContext: AuthContext,
171+
closeSignal: AbortSignal
163172
): Promise<HandshakeDocument> {
164173
const { connection } = authContext;
165174
const credentials = getCredentials(authContext);
166-
const result = await this.workflow.speculativeAuth(connection, credentials);
175+
const result = await this.workflow.speculativeAuth(connection, credentials, closeSignal);
167176
return { ...handshakeDoc, ...result };
168177
}
169178
}

src/cmap/auth/mongodb_oidc/automated_callback_workflow.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ export class AutomatedCallbackWorkflow extends CallbackWorkflow {
1919
/**
2020
* Instantiate the human callback workflow.
2121
*/
22-
constructor(cache: TokenCache, callback: OIDCCallbackFunction) {
23-
super(cache, callback);
22+
constructor(cache: TokenCache, callback: OIDCCallbackFunction, closeSignal: AbortSignal) {
23+
super(cache, callback, closeSignal);
2424
}
2525

2626
/**
@@ -66,7 +66,7 @@ export class AutomatedCallbackWorkflow extends CallbackWorkflow {
6666
if (credentials.username) {
6767
params.username = credentials.username;
6868
}
69-
const timeout = Timeout.expires(AUTOMATED_TIMEOUT_MS);
69+
const timeout = Timeout.expires(AUTOMATED_TIMEOUT_MS, this.closeSignal);
7070
try {
7171
return await Promise.race([this.executeAndValidateCallback(params), timeout]);
7272
} catch (error) {

0 commit comments

Comments
 (0)