Skip to content

Commit cefa58f

Browse files
committed
fix(NODE-5993): connection's aborted promise leak
1 parent 4ac9675 commit cefa58f

File tree

4 files changed

+152
-55
lines changed

4 files changed

+152
-55
lines changed

Diff for: src/cmap/connection.ts

+34-40
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { type Readable, Transform, type TransformCallback } from 'stream';
22
import { clearTimeout, setTimeout } from 'timers';
3-
import { promisify } from 'util';
43

54
import type { BSONSerializeOptions, Document, ObjectId } from '../bson';
65
import type { AutoEncrypter } from '../client-side-encryption/auto_encrypter';
@@ -180,18 +179,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
180179
* Once connection is established, command logging can log events (if enabled)
181180
*/
182181
public established: boolean;
182+
/** Indicates that the connection (including underlying TCP socket) has been closed. */
183+
public closed = false;
183184

184185
private lastUseTime: number;
185186
private clusterTime: Document | null = null;
187+
private error: Error | null = null;
188+
private dataEvents: AsyncGenerator<Buffer, void, void> | null = null;
186189

187190
private readonly socketTimeoutMS: number;
188191
private readonly monitorCommands: boolean;
189192
private readonly socket: Stream;
190-
private readonly controller: AbortController;
191-
private readonly signal: AbortSignal;
192193
private readonly messageStream: Readable;
193-
private readonly socketWrite: (buffer: Uint8Array) => Promise<void>;
194-
private readonly aborted: Promise<never>;
195194

196195
/** @event */
197196
static readonly COMMAND_STARTED = COMMAND_STARTED;
@@ -211,6 +210,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
211210
constructor(stream: Stream, options: ConnectionOptions) {
212211
super();
213212

213+
this.socket = stream;
214214
this.id = options.id;
215215
this.address = streamIdentifier(stream, options);
216216
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
@@ -223,39 +223,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
223223
this.generation = options.generation;
224224
this.lastUseTime = now();
225225

226-
this.socket = stream;
227-
228-
// TODO: Remove signal from connection layer
229-
this.controller = new AbortController();
230-
const { signal } = this.controller;
231-
this.signal = signal;
232-
const { promise: aborted, reject } = promiseWithResolvers<never>();
233-
aborted.then(undefined, () => null); // Prevent unhandled rejection
234-
this.signal.addEventListener(
235-
'abort',
236-
function onAbort() {
237-
reject(signal.reason);
238-
},
239-
{ once: true }
240-
);
241-
this.aborted = aborted;
242-
243226
this.messageStream = this.socket
244227
.on('error', this.onError.bind(this))
245228
.pipe(new SizedMessageTransform({ connection: this }))
246229
.on('error', this.onError.bind(this));
247230
this.socket.on('close', this.onClose.bind(this));
248231
this.socket.on('timeout', this.onTimeout.bind(this));
249-
250-
const socketWrite = promisify(this.socket.write.bind(this.socket));
251-
this.socketWrite = async buffer => {
252-
return Promise.race([socketWrite(buffer), this.aborted]);
253-
};
254-
}
255-
256-
/** Indicates that the connection (including underlying TCP socket) has been closed. */
257-
public get closed(): boolean {
258-
return this.signal.aborted;
259232
}
260233

261234
public get hello() {
@@ -355,7 +328,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
355328
}
356329

357330
this.socket.destroy();
358-
this.controller.abort(error);
331+
if (error) {
332+
this.error = error;
333+
this.dataEvents?.throw(error).then(undefined, () => null); // squash unhandled rejection
334+
}
335+
this.closed = true;
359336
this.emit(Connection.CLOSE);
360337
}
361338

@@ -596,7 +573,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
596573
}
597574

598575
private throwIfAborted() {
599-
this.signal.throwIfAborted();
576+
if (this.error) throw this.error;
600577
}
601578

602579
/**
@@ -619,7 +596,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
619596

620597
const buffer = Buffer.concat(await finalCommand.toBin());
621598

622-
return this.socketWrite(buffer);
599+
if (this.socket.write(buffer)) return;
600+
601+
const { promise: drained, resolve, reject } = promiseWithResolvers<void>();
602+
const onDrain = () => resolve();
603+
const onError = (error: Error) => reject(error);
604+
605+
this.socket.once('drain', onDrain).once('error', onError);
606+
try {
607+
return await drained;
608+
} finally {
609+
this.socket.off('drain', onDrain).off('error', onError);
610+
}
623611
}
624612

625613
/**
@@ -632,13 +620,19 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
632620
* Note that `for-await` loops call `return` automatically when the loop is exited.
633621
*/
634622
private async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
635-
for await (const message of onData(this.messageStream, { signal: this.signal })) {
636-
const response = await decompressResponse(message);
637-
yield response;
623+
try {
624+
this.dataEvents = this.dataEvents = onData(this.messageStream);
625+
for await (const message of this.dataEvents) {
626+
const response = await decompressResponse(message);
627+
yield response;
638628

639-
if (!response.moreToCome) {
640-
return;
629+
if (!response.moreToCome) {
630+
return;
631+
}
641632
}
633+
} finally {
634+
this.dataEvents = null;
635+
this.throwIfAborted();
642636
}
643637
}
644638
}

Diff for: src/cmap/wire_protocol/on_data.ts

+1-15
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ type PendingPromises = Omit<
1818
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
1919
* It will reject upon an error event or if the provided signal is aborted.
2020
*/
21-
export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) {
22-
const signal = options.signal;
23-
21+
export function onData(emitter: EventEmitter) {
2422
// Setup pending events and pending promise lists
2523
/**
2624
* When the caller has not yet called .next(), we store the
@@ -89,19 +87,8 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal })
8987
emitter.on('data', eventHandler);
9088
emitter.on('error', errorHandler);
9189

92-
if (signal.aborted) {
93-
// If the signal is aborted, set up the first .next() call to be a rejection
94-
queueMicrotask(abortListener);
95-
} else {
96-
signal.addEventListener('abort', abortListener, { once: true });
97-
}
98-
9990
return iterator;
10091

101-
function abortListener() {
102-
errorHandler(signal.reason);
103-
}
104-
10592
function eventHandler(value: Buffer) {
10693
const promise = unconsumedPromises.shift();
10794
if (promise != null) promise.resolve({ value, done: false });
@@ -119,7 +106,6 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal })
119106
// Adding event handlers
120107
emitter.off('data', eventHandler);
121108
emitter.off('error', errorHandler);
122-
signal.removeEventListener('abort', abortListener);
123109
finished = true;
124110
const doneResult = { value: undefined, done: finished } as const;
125111

Diff for: test/integration/connection-monitoring-and-pooling/connection.test.ts

+84
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { expect } from 'chai';
2+
import { type EventEmitter, once } from 'events';
3+
import * as sinon from 'sinon';
4+
import { setTimeout } from 'timers';
25

36
import {
7+
Binary,
48
connect,
59
Connection,
610
type ConnectionOptions,
@@ -14,7 +18,9 @@ import {
1418
ServerHeartbeatStartedEvent,
1519
Topology
1620
} from '../../mongodb';
21+
import * as mock from '../../tools/mongodb-mock/index';
1722
import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration';
23+
import { getSymbolFrom, sleep } from '../../tools/utils';
1824
import { assert as test, setupDatabase } from '../shared';
1925

2026
const commonConnectOptions = {
@@ -197,6 +203,84 @@ describe('Connection', function () {
197203
client.connect();
198204
});
199205

206+
context(
207+
'when a large message is written to the socket',
208+
{ requires: { topology: 'single', auth: 'disabled' } },
209+
() => {
210+
let client, mockServer: import('../../tools/mongodb-mock/src/server').MockServer;
211+
212+
beforeEach(async function () {
213+
mockServer = await mock.createServer();
214+
215+
mockServer
216+
.addMessageHandler('insert', req => {
217+
setTimeout(() => {
218+
req.reply({ ok: 1 });
219+
}, 800);
220+
})
221+
.addMessageHandler('hello', req => {
222+
req.reply(Object.assign({}, mock.HELLO));
223+
})
224+
.addMessageHandler(LEGACY_HELLO_COMMAND, req => {
225+
req.reply(Object.assign({}, mock.HELLO));
226+
});
227+
228+
client = new MongoClient(`mongodb://${mockServer.uri()}`, {
229+
minPoolSize: 1,
230+
maxPoolSize: 1
231+
});
232+
});
233+
234+
afterEach(async function () {
235+
await client.close();
236+
mockServer.destroy();
237+
sinon.restore();
238+
});
239+
240+
it('waits for an async drain event because the write was buffered', async () => {
241+
const connectionReady = once(client, 'connectionReady');
242+
await client.connect();
243+
await connectionReady;
244+
245+
// Get the only connection
246+
const pool = [...client.topology.s.servers.values()][0].pool;
247+
248+
const connections = pool[getSymbolFrom(pool, 'connections')];
249+
expect(connections).to.have.lengthOf(1);
250+
251+
const connection = connections.first();
252+
const socket: EventEmitter = connection.socket;
253+
254+
// Spy on the socket event listeners
255+
const addedListeners: string[] = [];
256+
const removedListeners: string[] = [];
257+
socket
258+
.on('removeListener', name => removedListeners.push(name))
259+
.on('newListener', name => addedListeners.push(name));
260+
261+
// Make server sockets block
262+
for (const s of mockServer.sockets) s.pause();
263+
264+
const insert = client
265+
.db('test')
266+
.collection('test')
267+
// Anything above 16Kb should work I think (10mb to be extra sure)
268+
.insertOne({ a: new Binary(Buffer.alloc(10 * (2 ** 10) ** 2), 127) });
269+
270+
// Sleep a bit and unblock server sockets
271+
await sleep(10);
272+
for (const s of mockServer.sockets) s.resume();
273+
274+
// Let the operation finish
275+
await insert;
276+
277+
// Ensure that we used the drain event for this write
278+
expect(addedListeners).to.deep.equal(['drain', 'error']);
279+
expect(removedListeners).to.deep.equal(['drain', 'error']);
280+
});
281+
}
282+
);
283+
200284
context('when connecting with a username and password', () => {
201285
let utilClient: MongoClient;
202286
let client: MongoClient;

Diff for: test/integration/node-specific/resource_clean_up.test.ts

+33
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import * as v8 from 'node:v8';
2+
13
import { expect } from 'chai';
24

5+
import { sleep } from '../../tools/utils';
36
import { runScript } from './resource_tracking_script_builder';
47

58
/**
@@ -86,4 +89,34 @@ describe('Driver Resources', () => {
8689
});
8790
});
8891
});
92+
93+
context('when 100s of operations are executed and complete', () => {
94+
beforeEach(function () {
95+
if (this.currentTest && typeof v8.queryObjects !== 'function') {
96+
this.currentTest.skipReason = 'Test requires v8.queryObjects API to count Promises';
97+
this.currentTest?.skip();
98+
}
99+
});
100+
101+
let client;
102+
beforeEach(async function () {
103+
client = this.configuration.newClient();
104+
});
105+
106+
afterEach(async function () {
107+
await client.close();
108+
});
109+
110+
it('does not leave behind additional promises', async () => {
111+
const test = client.db('test').collection('test');
112+
const promiseCountBefore = v8.queryObjects(Promise, { format: 'count' });
113+
for (let i = 0; i < 100; i++) {
114+
await test.findOne();
115+
}
116+
await sleep(10);
117+
const promiseCountAfter = v8.queryObjects(Promise, { format: 'count' });
118+
119+
expect(promiseCountAfter).to.be.within(promiseCountBefore - 5, promiseCountBefore + 5);
120+
});
121+
});
89122
});

0 commit comments

Comments
 (0)