Skip to content

Commit fe50fe1

Browse files
committed
wip
1 parent a6cc438 commit fe50fe1

23 files changed

+172
-128
lines changed

.evergreen/run-tests.sh

+1
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,5 @@ export MONGODB_URI=${MONGODB_URI}
6565
export LOAD_BALANCER=${LOAD_BALANCER}
6666
export TEST_CSFLE=${TEST_CSFLE}
6767
export COMPRESSOR=${COMPRESSOR}
68+
export NODE_OPTIONS="${NODE_OPTIONS} --trace-uncaught"
6869
npm run "${TEST_NPM_SCRIPT}"

src/change_stream.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import type { ReadPreference } from './read_preference';
2222
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
2323
import type { ServerSessionId } from './sessions';
2424
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
25-
import { filterOptions, getTopology, type MongoDBNamespace, noop, squashError } from './utils';
25+
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
2626

2727
const CHANGE_STREAM_OPTIONS = [
2828
'resumeAfter',
@@ -664,6 +664,8 @@ export class ChangeStream<
664664
this.isClosed = false;
665665
this.mode = false;
666666

667+
this.on('error', () => null);
668+
667669
// Listen for any `change` listeners being added to ChangeStream
668670
this.on('newListener', eventName => {
669671
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {

src/client-side-encryption/state_machine.ts

+2-6
Original file line numberDiff line numberDiff line change
@@ -351,12 +351,8 @@ export class StateMachine {
351351
let socket: tls.TLSSocket;
352352

353353
function destroySockets() {
354-
for (const sock of [socket, netSocket]) {
355-
if (sock) {
356-
sock.removeAllListeners();
357-
sock.destroy();
358-
}
359-
}
354+
socket?.destroy();
355+
netSocket?.destroy();
360356
}
361357

362358
function onerror(cause: Error) {

src/cmap/connect.ts

+1
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ export async function makeSocket(
389389

390390
addAbortSignalToStream(closeSignal, socket);
391391

392+
socket.unref();
392393
socket.setKeepAlive(true, 300000);
393394
socket.setTimeout(connectTimeoutMS);
394395
socket.setNoDelay(noDelay);

src/cmap/connection.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,14 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
298298
);
299299
}
300300

301+
unref() {
302+
this.socket.unref();
303+
}
304+
305+
ref() {
306+
this.socket.ref();
307+
}
308+
301309
public markAvailable(): void {
302310
this.lastUseTime = now();
303311
}
@@ -353,7 +361,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
353361
return;
354362
}
355363

356-
this.socket.destroy();
364+
if (!this.socket.destroyed) this.socket.destroy();
357365
this.error = error;
358366

359367
this.dataEvents?.throw(error).then(undefined, squashError);

src/cmap/connection_pool.ts

+3
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
413413
if (!this.checkedOut.has(connection)) {
414414
return;
415415
}
416+
417+
connection.unref();
416418
const poolClosed = this.closed;
417419
const stale = this.connectionIsStale(connection);
418420
const willDestroy = !!(poolClosed || stale || connection.closed);
@@ -788,6 +790,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
788790
);
789791

790792
this.waitQueue.shift();
793+
connection.ref();
791794
waitQueueMember.resolve(connection);
792795
}
793796
}

src/mongo_client.ts

+9-8
Original file line numberDiff line numberDiff line change
@@ -691,14 +691,15 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
691691

692692
/* @internal */
693693
private async _close(force = false): Promise<void> {
694-
this.closeController.abort();
695-
// There's no way to set hasBeenClosed back to false
696-
Object.defineProperty(this.s, 'hasBeenClosed', {
697-
value: true,
698-
enumerable: true,
699-
configurable: false,
700-
writable: false
701-
});
694+
try {
695+
this.closeController.abort();
696+
// There's no way to set hasBeenClosed back to false
697+
Object.defineProperty(this.s, 'hasBeenClosed', {
698+
value: true,
699+
enumerable: true,
700+
configurable: false,
701+
writable: false
702+
});
702703

703704
const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
704705
this.s.activeCursors.clear();

src/sdam/monitor.ts

+8-14
Original file line numberDiff line numberDiff line change
@@ -384,20 +384,13 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
384384
}
385385

386386
// connecting does an implicit `hello`
387-
(async () => {
387+
const makeMonitoringConnection = async () => {
388388
const socket = await makeSocket(monitor.connectOptions, monitor.closeSignal);
389389
const connection = makeConnection(monitor.connectOptions, socket);
390390
// The start time is after socket creation but before the handshake
391391
start = now();
392392
try {
393393
await performInitialHandshake(connection, monitor.connectOptions, monitor.closeSignal);
394-
return connection;
395-
} catch (error) {
396-
connection.destroy();
397-
throw error;
398-
}
399-
})().then(
400-
connection => {
401394
if (isInCloseState(monitor)) {
402395
connection.destroy();
403396
return;
@@ -417,15 +410,16 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
417410
useStreamingProtocol(monitor, connection.hello?.topologyVersion)
418411
)
419412
);
420-
421-
callback(undefined, connection.hello);
422-
},
423-
error => {
413+
return connection.hello;
414+
} catch (error) {
415+
connection.destroy();
424416
monitor.connection = null;
425417
awaited = false;
426-
onHeartbeatFailed(error);
418+
throw error;
427419
}
428-
);
420+
};
421+
422+
makeMonitoringConnection().then(callback.bind(undefined, undefined), onHeartbeatFailed);
429423
}
430424

431425
function monitorServer(monitor: Monitor) {

test/integration/change-streams/change_stream.test.ts

+14-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
MongoChangeStreamError,
1919
type MongoClient,
2020
MongoServerError,
21+
promiseWithResolvers,
2122
ReadPreference,
2223
type ResumeToken
2324
} from '../../mongodb';
@@ -62,6 +63,7 @@ describe('Change Streams', function () {
6263
await csDb.createCollection('test').catch(() => null);
6364
collection = csDb.collection('test');
6465
changeStream = collection.watch();
66+
changeStream.once('error', error => this.error(error));
6567
});
6668

6769
afterEach(async () => {
@@ -695,10 +697,18 @@ describe('Change Streams', function () {
695697
async test() {
696698
await initIteratorMode(changeStream);
697699

700+
const { promise, resolve, reject } = promiseWithResolvers<void>();
701+
698702
const outStream = new PassThrough({ objectMode: true });
699703

700-
// @ts-expect-error: transform requires a Document return type
701-
changeStream.stream({ transform: JSON.stringify }).pipe(outStream);
704+
const csStream = changeStream
705+
// @ts-expect-error: transform requires a Document return type
706+
.stream({ transform: JSON.stringify });
707+
708+
csStream.once('error', reject).pipe(outStream).once('error', reject);
709+
710+
outStream.on('close', resolve);
711+
csStream.on('close', resolve);
702712

703713
const willBeData = once(outStream, 'data');
704714

@@ -709,6 +719,8 @@ describe('Change Streams', function () {
709719
expect(parsedEvent).to.have.nested.property('fullDocument.a', 1);
710720

711721
outStream.destroy();
722+
csStream.destroy();
723+
await promise;
712724
}
713725
});
714726

test/integration/client-side-encryption/client_side_encryption.prose.18.azure_kms_mock_server.test.ts

+19-6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const metadata: MongoDBMetadataUI = {
3030
}
3131
};
3232

33+
const closeSignal = new AbortController().signal;
34+
3335
context('Azure KMS Mock Server Tests', function () {
3436
context('Case 1: Success', metadata, function () {
3537
// Do not set an ``X-MongoDB-HTTP-TestParams`` header.
@@ -44,7 +46,7 @@ context('Azure KMS Mock Server Tests', function () {
4446
// 5. The token will have a resource of ``"https://vault.azure.net"``
4547

4648
it('returns a properly formatted access token', async () => {
47-
const credentials = await fetchAzureKMSToken(new KMSRequestOptions());
49+
const credentials = await fetchAzureKMSToken(new KMSRequestOptions(), closeSignal);
4850
expect(credentials).to.have.property('accessToken', 'magic-cookie');
4951
});
5052
});
@@ -59,7 +61,10 @@ context('Azure KMS Mock Server Tests', function () {
5961
// The test case should ensure that this error condition is handled gracefully.
6062

6163
it('returns an error', async () => {
62-
const error = await fetchAzureKMSToken(new KMSRequestOptions('empty-json')).catch(e => e);
64+
const error = await fetchAzureKMSToken(
65+
new KMSRequestOptions('empty-json'),
66+
closeSignal
67+
).catch(e => e);
6368

6469
expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
6570
});
@@ -74,7 +79,9 @@ context('Azure KMS Mock Server Tests', function () {
7479
// The test case should ensure that this error condition is handled gracefully.
7580

7681
it('returns an error', async () => {
77-
const error = await fetchAzureKMSToken(new KMSRequestOptions('bad-json')).catch(e => e);
82+
const error = await fetchAzureKMSToken(new KMSRequestOptions('bad-json'), closeSignal).catch(
83+
e => e
84+
);
7885

7986
expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
8087
});
@@ -89,7 +96,9 @@ context('Azure KMS Mock Server Tests', function () {
8996
// 2. The response body is unspecified.
9097
// The test case should ensure that this error condition is handled gracefully.
9198
it('returns an error', async () => {
92-
const error = await fetchAzureKMSToken(new KMSRequestOptions('404')).catch(e => e);
99+
const error = await fetchAzureKMSToken(new KMSRequestOptions('404'), closeSignal).catch(
100+
e => e
101+
);
93102

94103
expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
95104
});
@@ -104,7 +113,9 @@ context('Azure KMS Mock Server Tests', function () {
104113
// 2. The response body is unspecified.
105114
// The test case should ensure that this error condition is handled gracefully.
106115
it('returns an error', async () => {
107-
const error = await fetchAzureKMSToken(new KMSRequestOptions('500')).catch(e => e);
116+
const error = await fetchAzureKMSToken(new KMSRequestOptions('500'), closeSignal).catch(
117+
e => e
118+
);
108119

109120
expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
110121
});
@@ -117,7 +128,9 @@ context('Azure KMS Mock Server Tests', function () {
117128
// The HTTP response from the ``fake_azure`` server will take at least 1000 seconds
118129
// to complete. The request should fail with a timeout.
119130
it('returns an error after the request times out', async () => {
120-
const error = await fetchAzureKMSToken(new KMSRequestOptions('slow')).catch(e => e);
131+
const error = await fetchAzureKMSToken(new KMSRequestOptions('slow'), closeSignal).catch(
132+
e => e
133+
);
121134

122135
expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
123136
});

test/integration/client-side-encryption/driver.test.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -829,12 +829,14 @@ describe('CSOT', function () {
829829
});
830830

831831
describe('State machine', function () {
832-
const stateMachine = new StateMachine({} as any);
832+
const signal = new AbortController().signal;
833+
const stateMachine = new StateMachine({} as any, undefined, signal);
833834

834835
const timeoutContext = () => ({
835836
timeoutContext: new CSOTTimeoutContext({
836837
timeoutMS: 1000,
837-
serverSelectionTimeoutMS: 30000
838+
serverSelectionTimeoutMS: 30000,
839+
closeSignal: signal
838840
})
839841
});
840842

test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts

+38-31
Original file line numberDiff line numberDiff line change
@@ -136,50 +136,57 @@ describe('CSOT spec unit tests', function () {
136136
it('the kms request times out through remainingTimeMS', async function () {
137137
const timeoutContext = new CSOTTimeoutContext({
138138
timeoutMS: 500,
139-
serverSelectionTimeoutMS: 30000
139+
serverSelectionTimeoutMS: 30000,
140+
closeSignal: new AbortController().signal
140141
});
141142
const err = await stateMachine.kmsRequest(request, { timeoutContext }).catch(e => e);
142143
expect(err).to.be.instanceOf(MongoOperationTimeoutError);
143144
expect(err.errmsg).to.equal('KMS request timed out');
144145
});
145146
});
146147

147-
context('when StateMachine.kmsRequest() is not passed a `CSOTimeoutContext`', function () {
148-
let clock: sinon.SinonFakeTimers;
149-
let timerSandbox: sinon.SinonSandbox;
148+
// todo: we have to clean up the TLS socket made here.
149+
context.skip(
150+
'when StateMachine.kmsRequest() is not passed a `CSOTimeoutContext`',
151+
function () {
152+
let clock: sinon.SinonFakeTimers;
153+
let timerSandbox: sinon.SinonSandbox;
150154

151-
let sleep;
155+
let sleep;
152156

153-
beforeEach(async function () {
154-
sinon.stub(TLSSocket.prototype, 'connect').callsFake(function (..._args) {
155-
clock.tick(30000);
157+
beforeEach(async function () {
158+
sinon.stub(TLSSocket.prototype, 'connect').callsFake(function (..._args) {
159+
clock.tick(30000);
160+
});
161+
timerSandbox = createTimerSandbox();
162+
clock = sinon.useFakeTimers();
163+
sleep = promisify(setTimeout);
156164
});
157-
timerSandbox = createTimerSandbox();
158-
clock = sinon.useFakeTimers();
159-
sleep = promisify(setTimeout);
160-
});
161165

162-
afterEach(async function () {
163-
if (clock) {
164-
timerSandbox.restore();
165-
clock.restore();
166-
clock = undefined;
167-
}
168-
sinon.restore();
169-
});
166+
afterEach(async function () {
167+
if (clock) {
168+
timerSandbox.restore();
169+
clock.restore();
170+
clock = undefined;
171+
}
172+
sinon.restore();
173+
});
170174

171-
it('the kms request does not timeout within 30 seconds', async function () {
172-
const sleepingFn = async () => {
173-
await sleep(30000);
174-
throw Error('Slept for 30s');
175-
};
175+
it('the kms request does not timeout within 30 seconds', async function () {
176+
const sleepingFn = async () => {
177+
await sleep(30000);
178+
throw Error('Slept for 30s');
179+
};
176180

177-
const err$ = Promise.all([stateMachine.kmsRequest(request), sleepingFn()]).catch(e => e);
178-
clock.tick(30000);
179-
const err = await err$;
180-
expect(err.message).to.equal('Slept for 30s');
181-
});
182-
});
181+
const err$ = Promise.all([stateMachine.kmsRequest(request), sleepingFn()]).catch(
182+
e => e
183+
);
184+
clock.tick(30000);
185+
const err = await err$;
186+
expect(err.message).to.equal('Slept for 30s');
187+
});
188+
}
189+
);
183190
});
184191

185192
describe('Auto Encryption', function () {

test/integration/crud/misc_cursors.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1872,7 +1872,7 @@ describe('Cursor', function () {
18721872
expect(cursor).to.have.property('closed', true);
18731873

18741874
const error = await rejectedEarlyBecauseClientClosed;
1875-
expect(error).to.be.null; // TODO(NODE-6632): This should throw again after the client signal aborts the in-progress next call
1875+
expect(error).to.be.instanceOf(Error); // TODO: Whatever the MongoClient aborts with.
18761876
});
18771877

18781878
it('shouldAwaitData', {

test/mocha_mongodb.json

-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
"recursive": true,
1818
"timeout": 60000,
1919
"failZero": true,
20-
"reporter": "test/tools/reporter/mongodb_reporter.js",
2120
"sort": true,
2221
"color": true,
2322
"ignore": [

0 commit comments

Comments
 (0)