diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 1079260d9df..7e73b4c9b5e 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -318,6 +318,8 @@ export class MongoClient extends TypedEventEmitter { topology?: Topology; /** @internal */ readonly mongoLogger: MongoLogger; + /** @internal */ + private connectionLock?: Promise; /** * The consolidate, parsed, transformed and merged options. @@ -405,6 +407,28 @@ export class MongoClient extends TypedEventEmitter { * @see docs.mongodb.org/manual/reference/connection-string/ */ async connect(): Promise { + if (this.connectionLock) { + return this.connectionLock; + } + + try { + this.connectionLock = this._connect(); + await this.connectionLock; + } finally { + // release + this.connectionLock = undefined; + } + + return this; + } + + /** + * Create a topology to open the connection, must be locked to avoid topology leaks in concurrency scenario. + * Locking is enforced by the connect method. + * + * @internal + */ + private async _connect(): Promise { if (this.topology && this.topology.isConnected()) { return this; } diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index 4e0103b3490..f92ccee05ac 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -517,6 +517,65 @@ describe('class MongoClient', function () { ); }); + context('concurrent #connect()', () => { + let client: MongoClient; + let topologyOpenEvents; + + /** Keep track number of call to client connect to close as many as connect (otherwise leak_checker hook will failed) */ + let clientConnectCounter: number; + + /** + * Wrap the connect method of the client to keep track + * of number of times connect is called + */ + async function clientConnect() { + if (!client) { + return; + } + clientConnectCounter++; + return client.connect(); + } + + beforeEach(async function () { + client = this.configuration.newClient(); + topologyOpenEvents = []; + clientConnectCounter = 0; + client.on('open', event => topologyOpenEvents.push(event)); + }); + + afterEach(async function () { + // close `clientConnectCounter` times + const clientClosePromises = Array.from({ length: clientConnectCounter }, () => + client.close() + ); + await Promise.all(clientClosePromises); + }); + + it('parallel client connect calls only create one topology', async function () { + await Promise.all([clientConnect(), clientConnect(), clientConnect()]); + + expect(topologyOpenEvents).to.have.lengthOf(1); + expect(client.topology?.isConnected()).to.be.true; + }); + + it('when connect rejects lock is released regardless', async function () { + const internalConnectStub = sinon.stub(client, '_connect' as keyof MongoClient); + internalConnectStub.onFirstCall().rejects(new Error('cannot connect')); + + // first call rejected to simulate a connection failure + const error = await clientConnect().catch(error => error); + expect(error).to.match(/cannot connect/); + + internalConnectStub.restore(); + + // second call should connect + await clientConnect(); + + expect(topologyOpenEvents).to.have.lengthOf(1); + expect(client.topology?.isConnected()).to.be.true; + }); + }); + context('#close()', () => { let client: MongoClient; let db: Db;