Skip to content

Commit 9db8369

Browse files
authored
fix(ChangeStream): handle null changes
NODE-2626
1 parent f262c59 commit 9db8369

File tree

3 files changed

+83
-41
lines changed

3 files changed

+83
-41
lines changed

src/change_stream.ts

+7-2
Original file line numberDiff line numberDiff line change
@@ -348,8 +348,8 @@ class ChangeStreamCursor extends Cursor {
348348

349349
_initializeCursor(callback: Function) {
350350
super._initializeCursor((err?: any, result?: any) => {
351-
if (err) {
352-
callback(err);
351+
if (err || result == null) {
352+
callback(err, result);
353353
return;
354354
}
355355

@@ -504,6 +504,11 @@ function waitForTopologyConnected(topology: any, options: any, callback: Functio
504504
function processNewChange(changeStream: any, change: any, callback?: Function) {
505505
const cursor = changeStream.cursor;
506506

507+
// a null change means the cursor has been notified, implicitly closing the change stream
508+
if (change == null) {
509+
changeStream.closed = true;
510+
}
511+
507512
if (changeStream.closed) {
508513
if (callback) callback(new MongoError('ChangeStream is closed'));
509514
return;

src/cursor/core_cursor.ts

+30-39
Original file line numberDiff line numberDiff line change
@@ -473,50 +473,41 @@ class CoreCursor extends Readable {
473473
}
474474

475475
const result = r.message;
476-
if (result.queryFailure) {
477-
return done(new MongoError(result.documents[0]), null);
478-
}
479476

480-
// Check if we have a command cursor
481-
if (
482-
Array.isArray(result.documents) &&
483-
result.documents.length === 1 &&
484-
(!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) &&
485-
(typeof result.documents[0].cursor !== 'string' ||
486-
result.documents[0]['$err'] ||
487-
result.documents[0]['errmsg'] ||
488-
Array.isArray(result.documents[0].result))
489-
) {
490-
// We have an error document, return the error
491-
if (result.documents[0]['$err'] || result.documents[0]['errmsg']) {
492-
return done(new MongoError(result.documents[0]), null);
477+
if (Array.isArray(result.documents) && result.documents.length === 1) {
478+
const document = result.documents[0];
479+
480+
if (result.queryFailure) {
481+
return done(new MongoError(document), null);
493482
}
494483

495-
// We have a cursor document
496-
if (result.documents[0].cursor != null && typeof result.documents[0].cursor !== 'string') {
497-
const id = result.documents[0].cursor.id;
498-
// If we have a namespace change set the new namespace for getmores
499-
if (result.documents[0].cursor.ns) {
500-
cursor.ns = result.documents[0].cursor.ns;
484+
// Check if we have a command cursor
485+
if (!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) {
486+
// We have an error document, return the error
487+
if (document.$err || document.errmsg) {
488+
return done(new MongoError(document), null);
501489
}
502-
// Promote id to long if needed
503-
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
504-
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
505-
cursor.cursorState.operationTime = result.documents[0].operationTime;
506-
507-
// If we have a firstBatch set it
508-
if (Array.isArray(result.documents[0].cursor.firstBatch)) {
509-
cursor.cursorState.documents = result.documents[0].cursor.firstBatch; //.reverse();
510-
}
511-
512-
// Return after processing command cursor
513-
return done(null, result);
514-
}
515490

516-
if (Array.isArray(result.documents[0].result)) {
517-
cursor.cursorState.documents = result.documents[0].result;
518-
cursor.cursorState.cursorId = Long.ZERO;
519-
return done(null, result);
491+
// We have a cursor document
492+
if (document.cursor != null && typeof document.cursor !== 'string') {
493+
const id = document.cursor.id;
494+
// If we have a namespace change set the new namespace for getmores
495+
if (document.cursor.ns) {
496+
cursor.ns = document.cursor.ns;
497+
}
498+
// Promote id to long if needed
499+
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
500+
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
501+
cursor.cursorState.operationTime = document.operationTime;
502+
503+
// If we have a firstBatch set it
504+
if (Array.isArray(document.cursor.firstBatch)) {
505+
cursor.cursorState.documents = document.cursor.firstBatch;
506+
}
507+
508+
// Return after processing command cursor
509+
return done(null, result);
510+
}
520511
}
521512
}
522513

test/functional/change_stream.test.js

+46
Original file line numberDiff line numberDiff line change
@@ -2799,3 +2799,49 @@ describe('Change Stream Resume Error Tests', function() {
27992799
})
28002800
});
28012801
});
2802+
context('NODE-2626 - handle null changes without error', function() {
2803+
let mockServer;
2804+
afterEach(() => mock.cleanup());
2805+
beforeEach(() => mock.createServer().then(server => (mockServer = server)));
2806+
it('changeStream should close if cursor id for initial aggregate is Long.ZERO', function(done) {
2807+
mockServer.setMessageHandler(req => {
2808+
const doc = req.document;
2809+
if (doc.ismaster) {
2810+
return req.reply(mock.DEFAULT_ISMASTER_36);
2811+
}
2812+
if (doc.aggregate) {
2813+
return req.reply({
2814+
ok: 1,
2815+
cursor: {
2816+
id: Long.ZERO,
2817+
firstBatch: []
2818+
}
2819+
});
2820+
}
2821+
if (doc.getMore) {
2822+
return req.reply({
2823+
ok: 1,
2824+
cursor: {
2825+
id: new Long(1407, 1407),
2826+
nextBatch: []
2827+
}
2828+
});
2829+
}
2830+
req.reply({ ok: 1 });
2831+
});
2832+
const client = this.configuration.newClient(`mongodb://${mockServer.uri()}/`, {
2833+
useUnifiedTopology: true
2834+
});
2835+
client.connect(err => {
2836+
expect(err).to.not.exist;
2837+
const collection = client.db('cs').collection('test');
2838+
const changeStream = collection.watch();
2839+
changeStream.next((err, doc) => {
2840+
expect(err).to.exist;
2841+
expect(doc).to.not.exist;
2842+
expect(err.message).to.equal('ChangeStream is closed');
2843+
changeStream.close(() => client.close(done));
2844+
});
2845+
});
2846+
});
2847+
});

0 commit comments

Comments
 (0)