Skip to content

Commit 38742c2

Browse files
nbbeekendurran
andauthored
perf(NODE-5928): consolidate signal use and abort promise wrap (#3992)
Co-authored-by: Durran Jordan <[email protected]>
1 parent 90cb6fa commit 38742c2

File tree

3 files changed

+41
-185
lines changed

3 files changed

+41
-185
lines changed

src/cmap/connection.ts

+41-23
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import type { ReadPreferenceLike } from '../read_preference';
3030
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
3131
import {
32-
abortable,
3332
BufferPool,
3433
calculateDurationInMs,
3534
type Callback,
3635
HostAddress,
3736
maxWireVersion,
3837
type MongoDBNamespace,
3938
now,
39+
promiseWithResolvers,
4040
uuidV4
4141
} from '../utils';
4242
import type { WriteConcern } from '../write_concern';
@@ -161,15 +161,14 @@ function streamIdentifier(stream: Stream, options: ConnectionOptions): string {
161161
export class Connection extends TypedEventEmitter<ConnectionEvents> {
162162
public id: number | '<monitor>';
163163
public address: string;
164-
public lastHelloMS?: number;
164+
public lastHelloMS = -1;
165165
public serverApi?: ServerApi;
166-
public helloOk?: boolean;
166+
public helloOk = false;
167167
public authContext?: AuthContext;
168168
public delayedTimeoutId: NodeJS.Timeout | null = null;
169169
public generation: number;
170170
public readonly description: Readonly<StreamDescription>;
171171
/**
172-
* @public
173172
* Represents if the connection has been established:
174173
* - TCP handshake
175174
* - TLS negotiated
@@ -180,15 +179,16 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
180179
public established: boolean;
181180

182181
private lastUseTime: number;
183-
private socketTimeoutMS: number;
184-
private monitorCommands: boolean;
185-
private socket: Stream;
186-
private controller: AbortController;
187-
private messageStream: Readable;
188-
private socketWrite: (buffer: Uint8Array) => Promise<void>;
189182
private clusterTime: Document | null = null;
190-
/** @internal */
191-
override mongoLogger: MongoLogger | undefined;
183+
184+
private readonly socketTimeoutMS: number;
185+
private readonly monitorCommands: boolean;
186+
private readonly socket: Stream;
187+
private readonly controller: AbortController;
188+
private readonly signal: AbortSignal;
189+
private readonly messageStream: Readable;
190+
private readonly socketWrite: (buffer: Uint8Array) => Promise<void>;
191+
private readonly aborted: Promise<never>;
192192

193193
/** @event */
194194
static readonly COMMAND_STARTED = COMMAND_STARTED;
@@ -221,7 +221,21 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
221221
this.lastUseTime = now();
222222

223223
this.socket = stream;
224+
225+
// TODO: Remove signal from connection layer
224226
this.controller = new AbortController();
227+
const { signal } = this.controller;
228+
this.signal = signal;
229+
const { promise: aborted, reject } = promiseWithResolvers<never>();
230+
aborted.then(undefined, () => null); // Prevent unhandled rejection
231+
this.signal.addEventListener(
232+
'abort',
233+
function onAbort() {
234+
reject(signal.reason);
235+
},
236+
{ once: true }
237+
);
238+
this.aborted = aborted;
225239

226240
this.messageStream = this.socket
227241
.on('error', this.onError.bind(this))
@@ -232,13 +246,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
232246

233247
const socketWrite = promisify(this.socket.write.bind(this.socket));
234248
this.socketWrite = async buffer => {
235-
return abortable(socketWrite(buffer), { signal: this.controller.signal });
249+
return Promise.race([socketWrite(buffer), this.aborted]);
236250
};
237251
}
238252

239253
/** Indicates that the connection (including underlying TCP socket) has been closed. */
240254
public get closed(): boolean {
241-
return this.controller.signal.aborted;
255+
return this.signal.aborted;
242256
}
243257

244258
public get hello() {
@@ -407,7 +421,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
407421
}
408422

409423
private async *sendWire(message: WriteProtocolMessageType, options: CommandOptions) {
410-
this.controller.signal.throwIfAborted();
424+
this.throwIfAborted();
411425

412426
if (typeof options.socketTimeoutMS === 'number') {
413427
this.socket.setTimeout(options.socketTimeoutMS);
@@ -426,7 +440,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
426440
return;
427441
}
428442

429-
this.controller.signal.throwIfAborted();
443+
this.throwIfAborted();
430444

431445
for await (const response of this.readMany()) {
432446
this.socket.setTimeout(0);
@@ -447,7 +461,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
447461
}
448462

449463
yield document;
450-
this.controller.signal.throwIfAborted();
464+
this.throwIfAborted();
451465

452466
if (typeof options.socketTimeoutMS === 'number') {
453467
this.socket.setTimeout(options.socketTimeoutMS);
@@ -481,7 +495,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
481495

482496
let document;
483497
try {
484-
this.controller.signal.throwIfAborted();
498+
this.throwIfAborted();
485499
for await (document of this.sendWire(message, options)) {
486500
if (!Buffer.isBuffer(document) && document.writeConcernError) {
487501
throw new MongoWriteConcernError(document.writeConcernError, document);
@@ -511,7 +525,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
511525
}
512526

513527
yield document;
514-
this.controller.signal.throwIfAborted();
528+
this.throwIfAborted();
515529
}
516530
} catch (error) {
517531
if (this.shouldEmitAndLogCommand) {
@@ -554,7 +568,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
554568
command: Document,
555569
options: CommandOptions = {}
556570
): Promise<Document> {
557-
this.controller.signal.throwIfAborted();
571+
this.throwIfAborted();
558572
for await (const document of this.sendCommand(ns, command, options)) {
559573
return document;
560574
}
@@ -568,16 +582,20 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
568582
replyListener: Callback
569583
) {
570584
const exhaustLoop = async () => {
571-
this.controller.signal.throwIfAborted();
585+
this.throwIfAborted();
572586
for await (const reply of this.sendCommand(ns, command, options)) {
573587
replyListener(undefined, reply);
574-
this.controller.signal.throwIfAborted();
588+
this.throwIfAborted();
575589
}
576590
throw new MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly');
577591
};
578592
exhaustLoop().catch(replyListener);
579593
}
580594

595+
private throwIfAborted() {
596+
this.signal.throwIfAborted();
597+
}
598+
581599
/**
582600
* @internal
583601
*
@@ -611,7 +629,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
611629
* Note that `for-await` loops call `return` automatically when the loop is exited.
612630
*/
613631
private async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
614-
for await (const message of onData(this.messageStream, { signal: this.controller.signal })) {
632+
for await (const message of onData(this.messageStream, { signal: this.signal })) {
615633
const response = await decompressResponse(message);
616634
yield response;
617635

src/utils.ts

-30
Original file line numberDiff line numberDiff line change
@@ -1283,36 +1283,6 @@ export function isHostMatch(match: RegExp, host?: string): boolean {
12831283
return host && match.test(host.toLowerCase()) ? true : false;
12841284
}
12851285

1286-
/**
1287-
* Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal.
1288-
* The given promise is _always_ ordered before the signal's abort promise.
1289-
* When given an already rejected promise and an already aborted signal, the promise's rejection takes precedence.
1290-
*
1291-
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race
1292-
*
1293-
* @param promise - A promise to discard if the signal aborts
1294-
* @param options - An options object carrying an optional signal
1295-
*/
1296-
export async function abortable<T>(
1297-
promise: Promise<T>,
1298-
{ signal }: { signal: AbortSignal }
1299-
): Promise<T> {
1300-
const { promise: aborted, reject } = promiseWithResolvers<never>();
1301-
1302-
function rejectOnAbort() {
1303-
reject(signal.reason);
1304-
}
1305-
1306-
if (signal.aborted) rejectOnAbort();
1307-
else signal.addEventListener('abort', rejectOnAbort, { once: true });
1308-
1309-
try {
1310-
return await Promise.race([promise, aborted]);
1311-
} finally {
1312-
signal.removeEventListener('abort', rejectOnAbort);
1313-
}
1314-
}
1315-
13161286
export function promiseWithResolvers<T>() {
13171287
let resolve!: Parameters<ConstructorParameters<typeof Promise<T>>[0]>[0];
13181288
let reject!: Parameters<ConstructorParameters<typeof Promise<T>>[0]>[1];

test/unit/utils.test.ts

-132
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
import { expect } from 'chai';
22
import * as sinon from 'sinon';
3-
import { setTimeout } from 'timers';
43

54
import {
6-
abortable,
75
BufferPool,
86
ByteUtils,
97
compareObjectId,
@@ -21,7 +19,6 @@ import {
2119
shuffle,
2220
TimeoutController
2321
} from '../mongodb';
24-
import { sleep } from '../tools/utils';
2522
import { createTimerSandbox } from './timer_sandbox';
2623

2724
describe('driver utils', function () {
@@ -1077,133 +1074,4 @@ describe('driver utils', function () {
10771074
});
10781075
});
10791076
});
1080-
1081-
describe('abortable()', () => {
1082-
const goodError = new Error('good error');
1083-
const badError = new Error('unexpected bad error!');
1084-
const expectedValue = "don't panic";
1085-
1086-
context('always removes the abort listener it attaches', () => {
1087-
let controller;
1088-
let removeEventListenerSpy;
1089-
let addEventListenerSpy;
1090-
1091-
beforeEach(() => {
1092-
controller = new AbortController();
1093-
addEventListenerSpy = sinon.spy(controller.signal, 'addEventListener');
1094-
removeEventListenerSpy = sinon.spy(controller.signal, 'removeEventListener');
1095-
});
1096-
1097-
afterEach(() => sinon.restore());
1098-
1099-
const expectListenerCleanup = () => {
1100-
expect(addEventListenerSpy).to.have.been.calledOnce;
1101-
expect(removeEventListenerSpy).to.have.been.calledOnce;
1102-
};
1103-
1104-
it('when promise rejects', async () => {
1105-
await abortable(Promise.reject(goodError), { signal: controller.signal }).catch(e => e);
1106-
expectListenerCleanup();
1107-
});
1108-
1109-
it('when promise resolves', async () => {
1110-
await abortable(Promise.resolve(expectedValue), { signal: controller.signal });
1111-
expectListenerCleanup();
1112-
});
1113-
1114-
it('when signal aborts', async () => {
1115-
setTimeout(() => controller.abort(goodError));
1116-
await abortable(new Promise(() => null), { signal: controller.signal }).catch(e => e);
1117-
expectListenerCleanup();
1118-
});
1119-
});
1120-
1121-
context('when given already rejected promise with already aborted signal', () => {
1122-
it('returns promise rejection', async () => {
1123-
const controller = new AbortController();
1124-
const { signal } = controller;
1125-
controller.abort(badError);
1126-
const result = await abortable(Promise.reject(goodError), { signal }).catch(e => e);
1127-
expect(result).to.deep.equal(goodError);
1128-
});
1129-
});
1130-
1131-
context('when given already resolved promise with already aborted signal', () => {
1132-
it('returns promise resolution', async () => {
1133-
const controller = new AbortController();
1134-
const { signal } = controller;
1135-
controller.abort(badError);
1136-
const result = await abortable(Promise.resolve(expectedValue), { signal }).catch(e => e);
1137-
expect(result).to.deep.equal(expectedValue);
1138-
});
1139-
});
1140-
1141-
context('when given already rejected promise with not yet aborted signal', () => {
1142-
it('returns promise rejection', async () => {
1143-
const controller = new AbortController();
1144-
const { signal } = controller;
1145-
const result = await abortable(Promise.reject(goodError), { signal }).catch(e => e);
1146-
expect(result).to.deep.equal(goodError);
1147-
});
1148-
});
1149-
1150-
context('when given already resolved promise with not yet aborted signal', () => {
1151-
it('returns promise resolution', async () => {
1152-
const controller = new AbortController();
1153-
const { signal } = controller;
1154-
const result = await abortable(Promise.resolve(expectedValue), { signal }).catch(e => e);
1155-
expect(result).to.deep.equal(expectedValue);
1156-
});
1157-
});
1158-
1159-
context('when given unresolved promise with an already aborted signal', () => {
1160-
it('returns signal reason', async () => {
1161-
const controller = new AbortController();
1162-
const { signal } = controller;
1163-
controller.abort(goodError);
1164-
const result = await abortable(new Promise(() => null), { signal }).catch(e => e);
1165-
expect(result).to.deep.equal(goodError);
1166-
});
1167-
});
1168-
1169-
context('when given eventually rejecting promise with not yet aborted signal', () => {
1170-
const eventuallyReject = async () => {
1171-
await sleep(1);
1172-
throw goodError;
1173-
};
1174-
1175-
it('returns promise rejection', async () => {
1176-
const controller = new AbortController();
1177-
const { signal } = controller;
1178-
const result = await abortable(eventuallyReject(), { signal }).catch(e => e);
1179-
expect(result).to.deep.equal(goodError);
1180-
});
1181-
});
1182-
1183-
context('when given eventually resolving promise with not yet aborted signal', () => {
1184-
const eventuallyResolve = async () => {
1185-
await sleep(1);
1186-
return expectedValue;
1187-
};
1188-
1189-
it('returns promise resolution', async () => {
1190-
const controller = new AbortController();
1191-
const { signal } = controller;
1192-
const result = await abortable(eventuallyResolve(), { signal }).catch(e => e);
1193-
expect(result).to.deep.equal(expectedValue);
1194-
});
1195-
});
1196-
1197-
context('when given unresolved promise with eventually aborted signal', () => {
1198-
it('returns signal reason', async () => {
1199-
const controller = new AbortController();
1200-
const { signal } = controller;
1201-
1202-
setTimeout(() => controller.abort(goodError), 1);
1203-
1204-
const result = await abortable(new Promise(() => null), { signal }).catch(e => e);
1205-
expect(result).to.deep.equal(goodError);
1206-
});
1207-
});
1208-
});
12091077
});

0 commit comments

Comments
 (0)