Skip to content

Commit 61ff544

Browse files
mbroadstdaprahamian
authored andcommitted
refactor(change-stream-cursor): copy all options on resume
1 parent 18e4d75 commit 61ff544

File tree

2 files changed

+43
-5
lines changed

2 files changed

+43
-5
lines changed

lib/change_stream.js

+8-5
Original file line numberDiff line numberDiff line change
@@ -267,15 +267,18 @@ class ChangeStreamCursor extends Cursor {
267267
}
268268

269269
get resumeOptions() {
270-
if (this.resumeToken) {
271-
return { resumeAfter: this.resumeToken };
270+
const result = {};
271+
for (const optionName of CURSOR_OPTIONS) {
272+
if (this.options[optionName]) result[optionName] = this.options[optionName];
272273
}
273274

274-
if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
275-
return { startAtOperationTime: this.startAtOperationTime };
275+
if (this.resumeToken) {
276+
Object.assign(result, { resumeAfter: this.resumeToken });
277+
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
278+
Object.assign(result, { startAtOperationTime: this.startAtOperationTime });
276279
}
277280

278-
return null;
281+
return result;
279282
}
280283

281284
_initializeCursor(callback) {

test/functional/change_stream_tests.js

+35
Original file line numberDiff line numberDiff line change
@@ -1585,6 +1585,41 @@ describe('Change Streams', function() {
15851585
}
15861586
});
15871587

1588+
it('should maintain change stream options on resume', {
1589+
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
1590+
test: function(done) {
1591+
const configuration = this.configuration;
1592+
const client = configuration.newClient();
1593+
1594+
const collectionName = 'resumeAfterKillCursor';
1595+
1596+
let db;
1597+
let coll;
1598+
let changeStream;
1599+
1600+
function close(e) {
1601+
changeStream.close(() => client.close(() => done(e)));
1602+
}
1603+
1604+
const changeStreamOptions = {
1605+
fullDocument: 'updateLookup',
1606+
collation: { maxVariable: 'punct' },
1607+
maxAwaitTimeMS: 20000,
1608+
batchSize: 200
1609+
};
1610+
1611+
client
1612+
.connect()
1613+
.then(() => (db = client.db('integration_tests')))
1614+
.then(() => (coll = db.collection(collectionName)))
1615+
.then(() => (changeStream = coll.watch([], changeStreamOptions)))
1616+
.then(() => {
1617+
expect(changeStream.cursor.resumeOptions).to.containSubset(changeStreamOptions);
1618+
})
1619+
.then(() => close(), e => close(e));
1620+
}
1621+
});
1622+
15881623
it('Should include a startAtOperationTime field when resuming if no changes have been received', {
15891624
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.7.3' } },
15901625
test: function(done) {

0 commit comments

Comments
 (0)