Skip to content

Commit 3b95935

Browse files
committed
feat(NODE-6882): close checked out connection in MongoClient.close()
1 parent cb88b05 commit 3b95935

File tree

11 files changed

+111
-22
lines changed

11 files changed

+111
-22
lines changed

src/cmap/connection_pool.ts

+11
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
} from '../constants';
1818
import {
1919
type AnyError,
20+
ConnectionPoolClosedError,
2021
type MongoError,
2122
MongoInvalidArgumentError,
2223
MongoMissingCredentialsError,
@@ -489,6 +490,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
489490
}
490491
}
491492

493+
closeCheckedOutConnections() {
494+
for (const conn of this.checkedOut) {
495+
this.emitAndLog(
496+
ConnectionPool.CONNECTION_CLOSED,
497+
new ConnectionClosedEvent(this, conn, 'poolClosed')
498+
);
499+
conn.onError(new ConnectionPoolClosedError());
500+
}
501+
}
502+
492503
/** Close the pool */
493504
close(): void {
494505
if (this.closed) {

src/error.ts

+56
Original file line numberDiff line numberDiff line change
@@ -1018,6 +1018,62 @@ export class MongoTopologyClosedError extends MongoAPIError {
10181018
}
10191019
}
10201020

1021+
/**
1022+
* An error generated when the MongoClient is closed and async
1023+
* operations are interrupted.
1024+
*
1025+
* @public
1026+
* @category Error
1027+
*/
1028+
export class MongoClientClosedError extends MongoAPIError {
1029+
/**
1030+
* **Do not use this constructor!**
1031+
*
1032+
* Meant for internal use only.
1033+
*
1034+
* @remarks
1035+
* This class is only meant to be constructed within the driver. This constructor is
1036+
* not subject to semantic versioning compatibility guarantees and may change at any time.
1037+
*
1038+
* @public
1039+
**/
1040+
constructor(message = 'MongoClient is closed') {
1041+
super(message);
1042+
}
1043+
1044+
override get name(): string {
1045+
return 'MongoClientClosedError';
1046+
}
1047+
}
1048+
1049+
/**
1050+
* An error generated when a ConnectionPool is closed and async
1051+
* operations are interrupted.
1052+
*
1053+
* @public
1054+
* @category Error
1055+
*/
1056+
export class ConnectionPoolClosedError extends MongoAPIError {
1057+
/**
1058+
* **Do not use this constructor!**
1059+
*
1060+
* Meant for internal use only.
1061+
*
1062+
* @remarks
1063+
* This class is only meant to be constructed within the driver. This constructor is
1064+
* not subject to semantic versioning compatibility guarantees and may change at any time.
1065+
*
1066+
* @public
1067+
**/
1068+
constructor(message = 'ConnectionPool is closed') {
1069+
super(message);
1070+
}
1071+
1072+
override get name(): string {
1073+
return 'ConnectionPoolClosedError';
1074+
}
1075+
}
1076+
10211077
/** @public */
10221078
export interface MongoNetworkErrorOptions {
10231079
/** Indicates the timeout happened before a connection handshake completed */

src/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export {
4545
export { ClientEncryption } from './client-side-encryption/client_encryption';
4646
export { ChangeStreamCursor } from './cursor/change_stream_cursor';
4747
export {
48+
ConnectionPoolClosedError,
4849
MongoAPIError,
4950
MongoAWSError,
5051
MongoAzureError,
@@ -53,6 +54,7 @@ export {
5354
MongoClientBulkWriteCursorError,
5455
MongoClientBulkWriteError,
5556
MongoClientBulkWriteExecutionError,
57+
MongoClientClosedError,
5658
MongoCompatibilityError,
5759
MongoCursorExhaustedError,
5860
MongoCursorInUseError,

src/mongo_client.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,9 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
636636
}
637637

638638
/**
639-
* Cleans up client-side resources used by the MongoCLient and . This includes:
639+
* Cleans up client-side resources used by the MongoClient.
640+
*
641+
* This includes:
640642
*
641643
* - Closes all open, unused connections (see note).
642644
* - Ends all in-use sessions with {@link ClientSession#endSession|ClientSession.endSession()}.
@@ -672,6 +674,8 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
672674
writable: false
673675
});
674676

677+
this.topology?.closeCheckedOutConnections();
678+
675679
const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
676680
this.s.activeCursors.clear();
677681

src/sdam/server.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
246246
}
247247
}
248248

249+
closeCheckedOutConnections() {
250+
return this.pool.closeCheckedOutConnections();
251+
}
252+
249253
/** Destroy the server connection */
250-
destroy(): void {
254+
close(): void {
251255
if (this.s.state === STATE_CLOSED) {
252256
return;
253257
}

src/sdam/topology.ts

+10-4
Original file line numberDiff line numberDiff line change
@@ -490,14 +490,20 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
490490
}
491491
}
492492

493+
closeCheckedOutConnections() {
494+
for (const server of this.s.servers.values()) {
495+
return server.closeCheckedOutConnections();
496+
}
497+
}
498+
493499
/** Close this topology */
494500
close(): void {
495501
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
496502
return;
497503
}
498504

499505
for (const server of this.s.servers.values()) {
500-
destroyServer(server, this);
506+
closeServer(server, this);
501507
}
502508

503509
this.s.servers.clear();
@@ -791,12 +797,12 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
791797
}
792798

793799
/** Destroys a server, and removes all event listeners from the instance */
794-
function destroyServer(server: Server, topology: Topology) {
800+
function closeServer(server: Server, topology: Topology) {
795801
for (const event of LOCAL_SERVER_EVENTS) {
796802
server.removeAllListeners(event);
797803
}
798804

799-
server.destroy();
805+
server.close();
800806
topology.emitAndLog(
801807
Topology.SERVER_CLOSED,
802808
new ServerClosedEvent(topology.s.id, server.description.address)
@@ -903,7 +909,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
903909

904910
// prepare server for garbage collection
905911
if (server) {
906-
destroyServer(server, topology);
912+
closeServer(server, topology);
907913
}
908914
}
909915
}

test/integration/change-streams/change_stream.test.ts

+8-3
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ describe('Change Streams', function () {
6363
await csDb.createCollection('test').catch(() => null);
6464
collection = csDb.collection('test');
6565
changeStream = collection.watch();
66+
changeStream.on('error', () => null);
6667
});
6768

6869
afterEach(async () => {
@@ -702,15 +703,19 @@ describe('Change Streams', function () {
702703

703704
const outStream = new PassThrough({ objectMode: true });
704705

705-
// @ts-expect-error: transform requires a Document return type
706-
changeStream.stream({ transform: JSON.stringify }).pipe(outStream);
706+
const transform = doc => ({ doc: JSON.stringify(doc) });
707+
changeStream
708+
.stream({ transform })
709+
.on('error', () => null)
710+
.pipe(outStream)
711+
.on('error', () => null);
707712

708713
const willBeData = once(outStream, 'data');
709714

710715
await collection.insertMany([{ a: 1 }]);
711716

712717
const [data] = await willBeData;
713-
const parsedEvent = JSON.parse(data);
718+
const parsedEvent = JSON.parse(data.doc);
714719
expect(parsedEvent).to.have.nested.property('fullDocument.a', 1);
715720

716721
outStream.destroy();

test/integration/crud/misc_cursors.test.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const { Writable } = require('stream');
1111
const { once, on } = require('events');
1212
const { setTimeout } = require('timers');
1313
const { ReadPreference } = require('../../mongodb');
14-
const { ServerType } = require('../../mongodb');
14+
const { ServerType, ConnectionPoolClosedError } = require('../../mongodb');
1515
const { formatSort } = require('../../mongodb');
1616

1717
describe('Cursor', function () {
@@ -1872,7 +1872,7 @@ describe('Cursor', function () {
18721872
expect(cursor).to.have.property('closed', true);
18731873

18741874
const error = await rejectedEarlyBecauseClientClosed;
1875-
expect(error).to.be.null; // TODO(NODE-6632): This should throw again after the client signal aborts the in-progress next call
1875+
expect(error).to.be.instanceOf(ConnectionPoolClosedError);
18761876
});
18771877

18781878
it('shouldAwaitData', {

test/integration/index_management.test.ts

+8-9
Original file line numberDiff line numberDiff line change
@@ -770,20 +770,19 @@ describe('Indexes', function () {
770770

771771
expect(events).to.be.an('array').with.lengthOf(1);
772772
expect(events[0]).nested.property('command.commitQuorum').to.equal(0);
773-
await collection.drop(err => {
774-
expect(err).to.not.exist;
775-
});
773+
await collection.drop();
776774
}
777775
};
778776
}
779777
it(
780778
'should run command with commitQuorum if specified on db.createIndex',
781-
commitQuorumTest((db, collection) =>
782-
db.createIndex(collection.collectionName, 'a', {
783-
// @ts-expect-error revaluate this?
784-
writeConcern: { w: 'majority' },
785-
commitQuorum: 0
786-
})
779+
commitQuorumTest(
780+
async (db, collection) =>
781+
await db.createIndex(collection.collectionName, 'a', {
782+
// @ts-expect-error revaluate this?
783+
writeConcern: { w: 'majority' },
784+
commitQuorum: 0
785+
})
787786
)
788787
);
789788
it(

test/integration/node-specific/examples/causal_consistency.test.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ describe('examples(causal-consistency):', function () {
3131
it('supports causal consistency', async function () {
3232
const session = client.startSession({ causalConsistency: true });
3333

34-
collection.insertOne({ darmok: 'jalad' }, { session });
35-
collection.updateOne({ darmok: 'jalad' }, { $set: { darmok: 'tanagra' } }, { session });
34+
await collection.insertOne({ darmok: 'jalad' }, { session });
35+
await collection.updateOne({ darmok: 'jalad' }, { $set: { darmok: 'tanagra' } }, { session });
3636

3737
const results = await collection.find({}, { session }).toArray();
3838

test/unit/index.test.ts

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const EXPECTED_EXPORTS = [
3636
'ConnectionClosedEvent',
3737
'ConnectionCreatedEvent',
3838
'ConnectionPoolClearedEvent',
39+
'ConnectionPoolClosedError',
3940
'ConnectionPoolClosedEvent',
4041
'ConnectionPoolCreatedEvent',
4142
'ConnectionPoolMonitoringEvent',
@@ -71,6 +72,7 @@ const EXPECTED_EXPORTS = [
7172
'MongoClientBulkWriteCursorError',
7273
'MongoClientBulkWriteError',
7374
'MongoClientBulkWriteExecutionError',
75+
'MongoClientClosedError',
7476
'MongoCompatibilityError',
7577
'MongoCryptAzureKMSRequestError',
7678
'MongoCryptCreateDataKeyError',

0 commit comments

Comments
 (0)