Skip to content

fix(NODE-5052): prevent cursor and changestream close logic from running more than once #3562

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
cbebd10
fix(NODE-5052): prevent cursor and changestream close logic from runn…
nbbeeken Feb 9, 2023
fc3ec7b
fix: keep cleaning up session, check for unhandled errors
nbbeeken Feb 9, 2023
3f02916
move flag setting to after clean up
nbbeeken Feb 9, 2023
0a3bd27
prevent mocha from ending early
nbbeeken Feb 9, 2023
c9db9ef
use uncaughtExceptionMonitor
nbbeeken Feb 9, 2023
c24d62d
do _something_ for unknown origin
nbbeeken Feb 9, 2023
465c058
reset arrays in after hook
nbbeeken Feb 9, 2023
b2238f6
removeAllListeners should be called whenever cursor closes
nbbeeken Feb 9, 2023
4404f73
fix up legacy tests
nbbeeken Feb 9, 2023
3ae17cf
test cleanup
nbbeeken Feb 10, 2023
aecb077
fix lint
nbbeeken Feb 10, 2023
28d6ee5
rollup cleanup into helpers
nbbeeken Feb 10, 2023
53c1314
fix tests
nbbeeken Feb 10, 2023
6387545
reset abstract_cursor
nbbeeken Feb 10, 2023
796ada5
gate on killed/closed
nbbeeken Feb 10, 2023
05acf5d
gate on killed/closed round 2
nbbeeken Feb 10, 2023
adaa40e
do not try to use a session that hasEnded
nbbeeken Feb 10, 2023
eb34100
cleanup test
nbbeeken Feb 10, 2023
3a52ff6
unknown array
nbbeeken Feb 10, 2023
c3c7cdb
clean up find_cursor tests
nbbeeken Feb 10, 2023
2ac087c
drop in crud_api tests
nbbeeken Feb 10, 2023
f1f8fd3
do not set kClosed early
nbbeeken Feb 10, 2023
430e8aa
pass iterable to .race
nbbeeken Feb 10, 2023
16e582f
fix if stmt
nbbeeken Feb 13, 2023
8cbe891
remove gating
nbbeeken Feb 13, 2023
c12b8bb
Merge branch 'main' into NODE-5052-close-change-stream
nbbeeken Feb 13, 2023
feefb02
test: update title
nbbeeken Feb 13, 2023
08037f8
fix: test name org
nbbeeken Feb 13, 2023
4babaee
fix: test title
nbbeeken Feb 13, 2023
d1fc282
improve readability
nbbeeken Feb 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .mocharc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
"require": [
"source-map-support/register",
"ts-node/register",
"test/tools/runner/chai-addons.js"
"test/tools/runner/chai-addons.js",
"test/tools/runner/hooks/unhandled_checker.ts"
],
"extension": [
"js",
"ts"
],
"extension": ["js", "ts"],
"recursive": true,
"timeout": 60000,
"failZero": true,
Expand Down
19 changes: 18 additions & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ export abstract class AbstractCursor<

async close(): Promise<void> {
const needsToEmitClosed = !this[kClosed];
this[kClosed] = true;
await cleanupCursorAsync(this, { needsToEmitClosed });
this[kClosed] = true;
}

/**
Expand Down Expand Up @@ -778,6 +778,20 @@ function cleanupCursor(
const error = options?.error;
const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0;

if (cursor[kClosed]) {
if (needsToEmitClosed) {
cursor[kClosed] = true;
cursor[kId] = Long.ZERO;
cursor.emit(AbstractCursor.CLOSE);
}
cursor.removeAllListeners();
if (session.owner === cursor && !session.hasEnded) {
session.endSession({ error }).finally(callback);
return;
}
return process.nextTick(callback);
}

if (error) {
if (cursor.loadBalanced && error instanceof MongoNetworkError) {
return completeCleanup();
Expand All @@ -790,6 +804,7 @@ function cleanupCursor(
cursor[kId] = Long.ZERO;
cursor.emit(AbstractCursor.CLOSE);
}
cursor.removeAllListeners();

if (session) {
if (session.owner === cursor) {
Expand All @@ -812,6 +827,7 @@ function cleanupCursor(
if (session.owner === cursor) {
session.endSession({ error }).finally(() => {
cursor.emit(AbstractCursor.CLOSE);
cursor.removeAllListeners();
callback();
});
return;
Expand All @@ -823,6 +839,7 @@ function cleanupCursor(
}

cursor.emit(AbstractCursor.CLOSE);
cursor.removeAllListeners();
return callback();
}

Expand Down
43 changes: 43 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2699,4 +2699,47 @@ describe('ChangeStream resumability', function () {
expect(changeStream.cursor.maxWireVersion).equal(maxWireVersion);
}
);

it(
'should not have the bug from NODE-5052',
{ requires: { topology: '!single' } },
async function () {
if (globalThis.AbortSignal?.timeout == null) {
this.skipReason = 'test requires AbortSignal.timeout';
this.skip();
}

const unhandledRejections: AsyncIterableIterator<[reason: Error, promise: Promise<any>]> = on(
process,
'unhandledRejection',
{ signal: AbortSignal.timeout(2000) }
);

changeStream = collection.watch();

const shouldErrorLoop = (async function () {
try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _change of changeStream) {
// ignore
}
return null;
} catch (error) {
return error;
}
})();

await sleep(200);
const closeResult = changeStream.close().catch(error => error);
expect(closeResult).to.not.be.instanceOf(Error);

const result = await shouldErrorLoop;
expect(result).to.be.instanceOf(MongoAPIError);
expect(result.message).to.match(/ChangeStream is closed/i);

const noUnhandledPromiseRejections = await unhandledRejections.next().catch(error => error);
expect(noUnhandledPromiseRejections).to.be.instanceOf(Error);
expect(noUnhandledPromiseRejections).to.have.nested.property('cause.name', 'TimeoutError');
}
);
});
178 changes: 63 additions & 115 deletions test/integration/crud/crud_api.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { expect } from 'chai';
import { on } from 'events';

import { MongoClient, MongoError, ObjectId, ReturnDocument } from '../../mongodb';
import { assert as test } from '../shared';
Expand Down Expand Up @@ -60,130 +61,77 @@ describe('CRUD API', function () {
await client.close();
});

it('should correctly execute find method using crud api', function (done) {
const db = client.db();

db.collection('t').insertMany([{ a: 1 }, { a: 1 }, { a: 1 }, { a: 1 }], function (err) {
expect(err).to.not.exist;

//
// Cursor
// --------------------------------------------------
const makeCursor = () => {
// Possible methods on the the cursor instance
return db
.collection('t')
.find({})
.filter({ a: 1 })
.addCursorFlag('noCursorTimeout', true)
.addQueryModifier('$comment', 'some comment')
.batchSize(2)
.comment('some comment 2')
.limit(2)
.maxTimeMS(50)
.project({ a: 1 })
.skip(0)
.sort({ a: 1 });
};

//
// Exercise count method
// -------------------------------------------------
const countMethod = function () {
// Execute the different methods supported by the cursor
const cursor = makeCursor();
cursor.count(function (err, count) {
expect(err).to.not.exist;
test.equal(2, count);
eachMethod();
});
};

//
// Exercise legacy method each
// -------------------------------------------------
const eachMethod = function () {
let count = 0;

const cursor = makeCursor();
cursor.forEach(
() => {
count = count + 1;
},
err => {
expect(err).to.not.exist;
test.equal(2, count);
toArrayMethod();
}
);
};
context('should correctly execute find method using crud api', () => {
let db;

//
// Exercise toArray
// -------------------------------------------------
const toArrayMethod = function () {
const cursor = makeCursor();
cursor.toArray(function (err, docs) {
expect(err).to.not.exist;
test.equal(2, docs.length);
nextMethod();
});
};
beforeEach(async () => {
db = client.db();
await db.collection('t').deleteMany({});
await db.collection('t').insertMany([{ a: 1 }, { a: 1 }, { a: 1 }, { a: 1 }]);
});

//
// Exercise next method
// -------------------------------------------------
const nextMethod = function () {
const cursor = makeCursor();
cursor.next(function (err, doc) {
expect(err).to.not.exist;
test.ok(doc != null);
const makeCursor = () => {
// Possible methods on the the cursor instance
return db
.collection('t')
.find({})
.filter({ a: 1 })
.addCursorFlag('noCursorTimeout', true)
.addQueryModifier('$comment', 'some comment')
.batchSize(1)
.comment('some comment 2')
.limit(2)
.maxTimeMS(50)
.project({ a: 1 })
.skip(0)
.sort({ a: 1 });
};

it('count should be accurate', async () => {
const cursor = makeCursor();
const res = await cursor.count();
expect(res).to.equal(2);
});

cursor.next(function (err, doc) {
expect(err).to.not.exist;
test.ok(doc != null);
it('forEach should run 2 times', async () => {
const cursor = makeCursor();
let count = 0;
await cursor.forEach(() => {
count += 1;
});
expect(count).to.equal(2);
});

cursor.next(function (err, doc) {
expect(err).to.not.exist;
expect(doc).to.not.exist;
streamMethod();
});
});
});
};
it('toArray should return an array with two documents', async () => {
const cursor = makeCursor();
const res = await cursor.toArray();
expect(res).to.have.lengthOf(2);
});

//
// Exercise stream
// -------------------------------------------------
const streamMethod = function () {
let count = 0;
const cursor = makeCursor();
const stream = cursor.stream();
stream.on('data', function () {
count = count + 1;
});
it('next be callable three times without blocking', async () => {
const cursor = makeCursor();
const doc0 = await cursor.next();
expect(doc0).to.exist;
const doc1 = await cursor.next();
expect(doc1).to.exist;
const doc2 = await cursor.next();
expect(doc2).to.not.exist;
});

it('stream() should create a node stream that emits 2 data events', async () => {
const count = 0;
const cursor = makeCursor();
const stream = cursor.stream();
on(stream, 'data'),
cursor.once('close', function () {
test.equal(2, count);
explainMethod();
expect(count).to.equal(2);
});
};

//
// Explain method
// -------------------------------------------------
const explainMethod = function () {
const cursor = makeCursor();
cursor.explain(function (err, result) {
expect(err).to.not.exist;
test.ok(result != null);

client.close(done);
});
};
});

// Execute all the methods
countMethod();
it('explain works', async () => {
const cursor = makeCursor();
const result = await cursor.explain();
expect(result).to.exist;
});
});

Expand Down
25 changes: 16 additions & 9 deletions test/integration/crud/find_cursor_methods.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,27 @@ describe('Find Cursor', function () {
});

context('#close', function () {
it('should send a killCursors command when closed before completely iterated', function (done) {
beforeEach(async function () {
await client
.db()
.collection('abstract_cursor')
.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }]);
});
afterEach(async function () {
await client.db().collection('abstract_cursor').deleteMany();
});

it('should send a killCursors command when closed before completely iterated', async function () {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));

const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.next(err => {
expect(err).to.not.exist;
cursor.close(err => {
expect(err).to.not.exist;
expect(commands).to.have.length(1);
done();
});
});

const doc = await cursor.next();
expect(doc).property('a', 1);
await cursor.close();
expect(commands).to.have.length(1);
});

it('should not send a killCursors command when closed after completely iterated', function (done) {
Expand Down
Loading