Skip to content

Commit 7bbc783

Browse files
authored
fix: allow event loop to process during wait queue processing (#2541)
Running `processWaitQueue` on the next tick allows the event loop to process while the connection pool is processing large numbers of wait queue members. This also uncovered a few issues with timing in our tests, and in some cases our top-level API: - `commitTransaction` / `abortTransaction` use `maybePromise` now - `endSession` must wait for all the machinery behind the scenes to check out a connection and write a message before considering its job finished - internal calls to `kill` a cursor now await the the process of fully sending that command, even if they ignore the response NODE-2803
1 parent e225ee5 commit 7bbc783

File tree

7 files changed

+82
-88
lines changed

7 files changed

+82
-88
lines changed

src/cmap/connection_pool.ts

+3-8
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ export class ConnectionPool extends EventEmitter {
283283
return;
284284
}
285285

286-
// add this request to the wait queue
287286
const waitQueueMember: WaitQueueMember = { callback };
288287
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
289288
if (waitQueueTimeoutMS) {
@@ -299,11 +298,8 @@ export class ConnectionPool extends EventEmitter {
299298
}, waitQueueTimeoutMS);
300299
}
301300

302-
// place the member at the end of the wait queue
303301
this[kWaitQueue].push(waitQueueMember);
304-
305-
// process the wait queue
306-
processWaitQueue(this);
302+
setImmediate(() => processWaitQueue(this));
307303
}
308304

309305
/**
@@ -316,7 +312,6 @@ export class ConnectionPool extends EventEmitter {
316312
const stale = connectionIsStale(this, connection);
317313
const willDestroy = !!(poolClosed || stale || connection.closed);
318314

319-
// Properly adjust state of connection
320315
if (!willDestroy) {
321316
connection.markAvailable();
322317
this[kConnections].push(connection);
@@ -329,7 +324,7 @@ export class ConnectionPool extends EventEmitter {
329324
destroyConnection(this, connection, reason);
330325
}
331326

332-
processWaitQueue(this);
327+
setImmediate(() => processWaitQueue(this));
333328
}
334329

335330
/**
@@ -503,7 +498,7 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
503498

504499
// otherwise add it to the pool for later acquisition, and try to process the wait queue
505500
pool[kConnections].push(connection);
506-
processWaitQueue(pool);
501+
setImmediate(() => processWaitQueue(pool));
507502
});
508503
}
509504

src/cursor/core_cursor.ts

+15-10
Original file line numberDiff line numberDiff line change
@@ -701,9 +701,10 @@ function nextFunction(self: CoreCursor, callback: Callback) {
701701

702702
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
703703
// Ensure we kill the cursor on the server
704-
self.kill();
705-
// Set cursor in dead and notified state
706-
return setCursorDeadAndNotified(self, callback);
704+
return self.kill(() =>
705+
// Set cursor in dead and notified state
706+
setCursorDeadAndNotified(self, callback)
707+
);
707708
} else if (
708709
self.cursorState.cursorIndex === self.cursorState.documents.length &&
709710
!Long.ZERO.equals(cursorId)
@@ -775,9 +776,12 @@ function nextFunction(self: CoreCursor, callback: Callback) {
775776
} else {
776777
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
777778
// Ensure we kill the cursor on the server
778-
self.kill();
779-
// Set cursor in dead and notified state
780-
return setCursorDeadAndNotified(self, callback);
779+
self.kill(() =>
780+
// Set cursor in dead and notified state
781+
setCursorDeadAndNotified(self, callback)
782+
);
783+
784+
return;
781785
}
782786

783787
// Increment the current cursor limit
@@ -789,11 +793,12 @@ function nextFunction(self: CoreCursor, callback: Callback) {
789793
// Doc overflow
790794
if (!doc || doc.$err) {
791795
// Ensure we kill the cursor on the server
792-
self.kill();
793-
// Set cursor in dead and notified state
794-
return setCursorDeadAndNotified(self, () =>
795-
callback(new MongoError(doc ? doc.$err : undefined))
796+
self.kill(() =>
797+
// Set cursor in dead and notified state
798+
setCursorDeadAndNotified(self, () => callback(new MongoError(doc ? doc.$err : undefined)))
796799
);
800+
801+
return;
797802
}
798803

799804
// Transform the doc with passed in transformation method if provided

src/cursor/cursor.ts

+9-33
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { emitDeprecatedOptionWarning } from '../utils';
2-
import { PromiseProvider } from '../promise_provider';
32
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
43
import { Transform, PassThrough } from 'stream';
54
import { deprecate } from 'util';
@@ -656,46 +655,23 @@ export class Cursor<
656655
forEach(iterator: (doc: Document) => void): Promise<Document>;
657656
forEach(iterator: (doc: Document) => void, callback: Callback): void;
658657
forEach(iterator: (doc: Document) => void, callback?: Callback): Promise<Document> | void {
659-
const Promise = PromiseProvider.get();
658+
if (typeof iterator !== 'function') {
659+
throw new TypeError('Missing required parameter `iterator`');
660+
}
661+
660662
// Rewind cursor state
661663
this.rewind();
662664

663665
// Set current cursor to INIT
664666
this.s.state = CursorState.INIT;
665667

666-
if (typeof callback === 'function') {
668+
return maybePromise(callback, done => {
667669
each(this, (err, doc) => {
668-
if (err) {
669-
callback(err);
670-
return false;
671-
}
672-
673-
if (doc != null) {
674-
iterator(doc);
675-
return true;
676-
}
677-
678-
if (doc == null) {
679-
callback(undefined);
680-
return false;
681-
}
670+
if (err) return done(err);
671+
if (doc != null) return iterator(doc);
672+
done();
682673
});
683-
} else {
684-
return new Promise<Document>((fulfill, reject) => {
685-
each(this, (err, doc) => {
686-
if (err) {
687-
reject(err);
688-
return false;
689-
} else if (doc == null) {
690-
fulfill();
691-
return false;
692-
} else {
693-
iterator(doc);
694-
return true;
695-
}
696-
});
697-
});
698-
}
674+
});
699675
}
700676

701677
/**

src/sessions.ts

+44-22
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ export interface ClientSessionOptions {
5252
/** @public */
5353
export type WithTransactionCallback = (session: ClientSession) => Promise<any> | void;
5454

55+
const kServerSession = Symbol('serverSession');
56+
5557
/**
5658
* A class representing a client session on the server
5759
*
@@ -62,7 +64,6 @@ class ClientSession extends EventEmitter {
6264
topology: Topology;
6365
sessionPool: ServerSessionPool;
6466
hasEnded: boolean;
65-
serverSession?: ServerSession;
6667
clientOptions?: MongoClientOptions;
6768
supports: { causalConsistency: boolean };
6869
clusterTime?: ClusterTime;
@@ -71,6 +72,7 @@ class ClientSession extends EventEmitter {
7172
owner: symbol | CoreCursor;
7273
defaultTransactionOptions: TransactionOptions;
7374
transaction: Transaction;
75+
[kServerSession]?: ServerSession;
7476

7577
/**
7678
* Create a client session.
@@ -102,8 +104,8 @@ class ClientSession extends EventEmitter {
102104
this.topology = topology;
103105
this.sessionPool = sessionPool;
104106
this.hasEnded = false;
105-
this.serverSession = sessionPool.acquire();
106107
this.clientOptions = clientOptions;
108+
this[kServerSession] = undefined;
107109

108110
this.supports = {
109111
causalConsistency:
@@ -124,41 +126,61 @@ class ClientSession extends EventEmitter {
124126
return this.serverSession?.id;
125127
}
126128

129+
get serverSession(): ServerSession {
130+
if (this[kServerSession] == null) {
131+
this[kServerSession] = this.sessionPool.acquire();
132+
}
133+
134+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
135+
return this[kServerSession]!;
136+
}
137+
127138
/**
128139
* Ends this session on the server
129140
*
130141
* @param options - Optional settings. Currently reserved for future use
131142
* @param callback - Optional callback for completion of this operation
132143
*/
133-
endSession(): void;
144+
endSession(): Promise<void>;
134145
endSession(callback: Callback<void>): void;
146+
endSession(options: Record<string, unknown>): Promise<void>;
135147
endSession(options: Record<string, unknown>, callback: Callback<void>): void;
136-
endSession(options?: Record<string, unknown> | Callback<void>, callback?: Callback<void>): void {
137-
if (typeof options === 'function') (callback = options as Callback), (options = {});
148+
endSession(
149+
options?: Record<string, unknown> | Callback<void>,
150+
callback?: Callback<void>
151+
): void | Promise<void> {
152+
if (typeof options === 'function') (callback = options), (options = {});
138153
options = options || {};
139154

140-
if (this.hasEnded) {
141-
if (typeof callback === 'function') callback();
142-
return;
143-
}
155+
return maybePromise(callback, done => {
156+
if (this.hasEnded) {
157+
return done();
158+
}
144159

145-
if (this.serverSession && this.inTransaction()) {
146-
this.abortTransaction(); // pass in callback?
147-
}
160+
const completeEndSession = () => {
161+
// release the server session back to the pool
162+
this.sessionPool.release(this.serverSession);
163+
this[kServerSession] = undefined;
148164

149-
// mark the session as ended, and emit a signal
150-
this.hasEnded = true;
151-
this.emit('ended', this);
165+
// mark the session as ended, and emit a signal
166+
this.hasEnded = true;
167+
this.emit('ended', this);
152168

153-
// release the server session back to the pool
154-
if (this.serverSession) {
155-
this.sessionPool.release(this.serverSession);
156-
}
169+
// spec indicates that we should ignore all errors for `endSessions`
170+
done();
171+
};
157172

158-
this.serverSession = undefined;
173+
if (this.serverSession && this.inTransaction()) {
174+
this.abortTransaction(err => {
175+
if (err) return done(err);
176+
completeEndSession();
177+
});
178+
179+
return;
180+
}
159181

160-
// spec indicates that we should ignore all errors for `endSessions`
161-
if (typeof callback === 'function') callback();
182+
completeEndSession();
183+
});
162184
}
163185

164186
/**

test/functional/cursor.test.js

+4-8
Original file line numberDiff line numberDiff line change
@@ -3987,21 +3987,17 @@ describe('Cursor', function () {
39873987
}
39883988
);
39893989

3990-
it('should return a promise when no callback supplied to forEach method', function (done) {
3990+
it('should return a promise when no callback supplied to forEach method', function () {
39913991
const configuration = this.configuration;
39923992
const client = configuration.newClient({ w: 1 }, { poolSize: 1, auto_reconnect: false });
39933993

3994-
client.connect(function (err, client) {
3995-
expect(err).to.not.exist;
3994+
return client.connect(() => {
39963995
const db = client.db(configuration.db);
39973996
const collection = db.collection('cursor_session_tests2');
3998-
39993997
const cursor = collection.find();
4000-
const promise = cursor.forEach();
3998+
const promise = cursor.forEach(() => {});
40013999
expect(promise).to.exist.and.to.be.an.instanceof(Promise);
4002-
promise.catch(() => {});
4003-
4004-
cursor.close(() => client.close(() => done()));
4000+
return promise.then(() => cursor.close()).then(() => client.close());
40054001
});
40064002
});
40074003

test/functional/spec-runner/index.js

+6-5
Original file line numberDiff line numberDiff line change
@@ -344,11 +344,12 @@ function runTestSuiteTest(configuration, spec, context) {
344344
throw err;
345345
})
346346
.then(() => {
347-
if (session0) session0.endSession();
348-
if (session1) session1.endSession();
349-
350-
return validateExpectations(context.commandEvents, spec, savedSessionData);
351-
});
347+
const promises = [];
348+
if (session0) promises.push(session0.endSession());
349+
if (session1) promises.push(session1.endSession());
350+
return Promise.all(promises);
351+
})
352+
.then(() => validateExpectations(context.commandEvents, spec, savedSessionData));
352353
});
353354
}
354355

test/unit/cmap/connection_pool.test.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ describe('Connection Pool', function () {
139139
sinon.stub(pool, 'availableConnectionCount').get(() => 0);
140140
pool.checkIn(conn);
141141

142-
expect(pool).property('waitQueueSize').to.equal(0);
143-
142+
setImmediate(() => expect(pool).property('waitQueueSize').to.equal(0));
144143
done();
145144
});
146145
});

0 commit comments

Comments
 (0)