Skip to content

Commit 9a8fdb2

Browse files
nbbeekendurran
andauthored
fix(NODE-5588): recursive calls to next cause memory leak (#3841)
Co-authored-by: Durran Jordan <[email protected]>
1 parent e9a5079 commit 9a8fdb2

File tree

2 files changed

+69
-76
lines changed

2 files changed

+69
-76
lines changed

Diff for: src/cursor/abstract_cursor.ts

+65-74
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,11 @@ export abstract class AbstractCursor<
220220
return this[kId] ?? undefined;
221221
}
222222

223+
/** @internal */
224+
get isDead() {
225+
return (this[kId]?.isZero() ?? false) || this[kClosed] || this[kKilled];
226+
}
227+
223228
/** @internal */
224229
get client(): MongoClient {
225230
return this[kClient];
@@ -671,7 +676,7 @@ export abstract class AbstractCursor<
671676
return cleanupCursor(this, { error }, () => callback(error, undefined));
672677
}
673678

674-
if (cursorIsDead(this)) {
679+
if (this.isDead) {
675680
return cleanupCursor(this, undefined, () => callback());
676681
}
677682

@@ -701,96 +706,82 @@ async function next<T>(
701706
transform: boolean;
702707
}
703708
): Promise<T | null> {
704-
const cursorId = cursor[kId];
705709
if (cursor.closed) {
706710
return null;
707711
}
708712

709-
if (cursor[kDocuments].length !== 0) {
710-
const doc = cursor[kDocuments].shift();
713+
do {
714+
if (cursor[kId] == null) {
715+
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
716+
await promisify(cursor[kInit].bind(cursor))();
717+
}
718+
719+
if (cursor[kDocuments].length !== 0) {
720+
const doc = cursor[kDocuments].shift();
711721

712-
if (doc != null && transform && cursor[kTransform]) {
713-
try {
714-
return cursor[kTransform](doc);
715-
} catch (error) {
716-
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
722+
if (doc != null && transform && cursor[kTransform]) {
723+
try {
724+
return cursor[kTransform](doc);
725+
} catch (error) {
717726
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
718727
// error instead.
719-
});
720-
throw error;
728+
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => null);
729+
throw error;
730+
}
721731
}
722-
}
723732

724-
return doc;
725-
}
733+
return doc;
734+
}
726735

727-
if (cursorId == null) {
728-
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
729-
const init = promisify(cb => cursor[kInit](cb));
730-
await init();
731-
return next(cursor, { blocking, transform });
732-
}
736+
if (cursor.isDead) {
737+
// if the cursor is dead, we clean it up
738+
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
739+
// and we should surface the error
740+
await cleanupCursorAsync(cursor, {});
741+
return null;
742+
}
733743

734-
if (cursorIsDead(cursor)) {
735-
// if the cursor is dead, we clean it up
736-
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
737-
// and we should surface the error
738-
await cleanupCursorAsync(cursor, {});
739-
return null;
740-
}
744+
// otherwise need to call getMore
745+
const batchSize = cursor[kOptions].batchSize || 1000;
741746

742-
// otherwise need to call getMore
743-
const batchSize = cursor[kOptions].batchSize || 1000;
744-
const getMore = promisify((batchSize: number, cb: Callback<Document | undefined>) =>
745-
cursor._getMore(batchSize, cb)
746-
);
747-
748-
let response: Document | undefined;
749-
try {
750-
response = await getMore(batchSize);
751-
} catch (error) {
752-
if (error) {
753-
await cleanupCursorAsync(cursor, { error }).catch(() => {
754-
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
755-
// error instead.
756-
});
747+
try {
748+
const response = await promisify(cursor._getMore.bind(cursor))(batchSize);
749+
750+
if (response) {
751+
const cursorId =
752+
typeof response.cursor.id === 'number'
753+
? Long.fromNumber(response.cursor.id)
754+
: typeof response.cursor.id === 'bigint'
755+
? Long.fromBigInt(response.cursor.id)
756+
: response.cursor.id;
757+
758+
cursor[kDocuments].pushMany(response.cursor.nextBatch);
759+
cursor[kId] = cursorId;
760+
}
761+
} catch (error) {
762+
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
763+
// error instead.
764+
await cleanupCursorAsync(cursor, { error }).catch(() => null);
757765
throw error;
758766
}
759-
}
760-
761-
if (response) {
762-
const cursorId =
763-
typeof response.cursor.id === 'number'
764-
? Long.fromNumber(response.cursor.id)
765-
: typeof response.cursor.id === 'bigint'
766-
? Long.fromBigInt(response.cursor.id)
767-
: response.cursor.id;
768-
769-
cursor[kDocuments].pushMany(response.cursor.nextBatch);
770-
cursor[kId] = cursorId;
771-
}
772-
773-
if (cursorIsDead(cursor)) {
774-
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
775-
// we intentionally clean up the cursor to release its session back into the pool before the cursor
776-
// is iterated. This prevents a cursor that is exhausted on the server from holding
777-
// onto a session indefinitely until the AbstractCursor is iterated.
778-
//
779-
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
780-
// and we should surface the error
781-
await cleanupCursorAsync(cursor, {});
782-
}
783767

784-
if (cursor[kDocuments].length === 0 && blocking === false) {
785-
return null;
786-
}
768+
if (cursor.isDead) {
769+
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
770+
// we intentionally clean up the cursor to release its session back into the pool before the cursor
771+
// is iterated. This prevents a cursor that is exhausted on the server from holding
772+
// onto a session indefinitely until the AbstractCursor is iterated.
773+
//
774+
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
775+
// and we should surface the error
776+
await cleanupCursorAsync(cursor, {});
777+
}
787778

788-
return next(cursor, { blocking, transform });
789-
}
779+
if (cursor[kDocuments].length === 0 && blocking === false) {
780+
return null;
781+
}
782+
} while (!cursor.isDead || cursor[kDocuments].length !== 0);
790783

791-
function cursorIsDead(cursor: AbstractCursor): boolean {
792-
const cursorId = cursor[kId];
793-
return !!cursorId && cursorId.isZero();
784+
return null;
794785
}
795786

796787
const cleanupCursorAsync = promisify(cleanupCursor);

Diff for: src/cursor/find_cursor.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { Document } from '../bson';
1+
import { type Document, Long } from '../bson';
22
import { MongoInvalidArgumentError, MongoTailableCursorError } from '../error';
33
import type { ExplainVerbosityLike } from '../explain';
44
import type { MongoClient } from '../mongo_client';
@@ -101,7 +101,9 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
101101
limit && limit > 0 && numReturned + batchSize > limit ? limit - numReturned : batchSize;
102102

103103
if (batchSize <= 0) {
104-
this.close().finally(() => callback());
104+
this.close().finally(() =>
105+
callback(undefined, { cursor: { id: Long.ZERO, nextBatch: [] } })
106+
);
105107
return;
106108
}
107109
}

0 commit comments

Comments
 (0)