Skip to content

Commit 9ec9b8f

Browse files
committed
feat(ChangeStream): adds new resume functionality to ChangeStreams
- Adds support for the new startAfter options - Adds the ability to parse postBatchResumeTokens off of aggregate/getMore responses and leverage them when resuming. - Replaces property resumeToken with accessor resumeToken that will always have the most up to date resume token. Fixes NODE-1824 Fixes NODE-1866 Fixes NODE-1951 Fixes NODE-1979
1 parent e3c6418 commit 9ec9b8f

File tree

3 files changed

+137
-58
lines changed

3 files changed

+137
-58
lines changed

lib/change_stream.js

+96-42
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,84 @@ const CHANGE_DOMAIN_TYPES = {
3434
* @return {ChangeStream} a ChangeStream instance.
3535
*/
3636

37+
class ResumeTokenTracker extends EventEmitter {
38+
constructor(changeStream, options) {
39+
super();
40+
this.changeStream = changeStream;
41+
this.options = options;
42+
this._postBatchResumeToken = undefined;
43+
}
44+
45+
get resumeToken() {
46+
return this._resumeToken;
47+
}
48+
49+
init() {
50+
this._resumeToken = this.options.startAfter || this.options.resumeAfter;
51+
this._operationTime = this.options.startAtOperationTime;
52+
this._init = true;
53+
}
54+
55+
resumeInfo() {
56+
const resumeInfo = {};
57+
58+
if (this._init && this._resumeToken) {
59+
resumeInfo.resumeAfter = this._resumeToken;
60+
} else if (this._init && this._operationTime) {
61+
resumeInfo.startAtOperationTime = this._operationTime;
62+
} else {
63+
if (this.options.startAfter) {
64+
resumeInfo.startAfter = this.options.startAfter;
65+
}
66+
67+
if (this.options.resumeAfter) {
68+
resumeInfo.resumeAfter = this.options.resumeAfter;
69+
}
70+
71+
if (this.options.startAtOperationTime) {
72+
resumeInfo.startAtOperationTime = this.options.startAtOperationTime;
73+
}
74+
}
75+
76+
return resumeInfo;
77+
}
78+
79+
onResponse(postBatchResumeToken, operationTime) {
80+
if (this.changeStream.isClosed()) {
81+
return;
82+
}
83+
const cursor = this.changeStream.cursor;
84+
if (!postBatchResumeToken) {
85+
if (
86+
!(this._resumeToken || this._operationTime || cursor.bufferedCount()) &&
87+
cursor.server &&
88+
cursor.server.ismaster.maxWireVersion >= 7
89+
) {
90+
this._operationTime = operationTime;
91+
}
92+
return;
93+
} else {
94+
this._postBatchResumeToken = postBatchResumeToken;
95+
if (cursor.cursorState.documents.length === 0) {
96+
this._resumeToken = this._postBatchResumeToken;
97+
}
98+
}
99+
100+
this.emit('response');
101+
}
102+
103+
onNext(doc) {
104+
if (this.changeStream.isClosed()) {
105+
return;
106+
}
107+
if (this._postBatchResumeToken && this.changeStream.cursor.bufferedCount() === 0) {
108+
this._resumeToken = this._postBatchResumeToken;
109+
} else {
110+
this._resumeToken = doc._id;
111+
}
112+
}
113+
}
114+
37115
class ChangeStream extends EventEmitter {
38116
constructor(changeDomain, pipeline, options) {
39117
super();
@@ -69,17 +147,13 @@ class ChangeStream extends EventEmitter {
69147
this.options.readPreference = changeDomain.s.readPreference;
70148
}
71149

72-
// We need to get the operationTime as early as possible
73-
const isMaster = this.topology.lastIsMaster();
74-
if (!isMaster) {
75-
throw new MongoError('Topology does not have an ismaster yet.');
76-
}
77-
78-
this.operationTime = isMaster.operationTime;
150+
this._resumeTokenTracker = new ResumeTokenTracker(this, options);
79151

80152
// Create contained Change Stream cursor
81153
this.cursor = createChangeStreamCursor(this);
82154

155+
this._resumeTokenTracker.init();
156+
83157
// Listen for any `change` listeners being added to ChangeStream
84158
this.on('newListener', eventName => {
85159
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
@@ -97,6 +171,14 @@ class ChangeStream extends EventEmitter {
97171
});
98172
}
99173

174+
/**
175+
* The cached resume token that will be used to resume
176+
* after the most recently returned change.
177+
*/
178+
get resumeToken() {
179+
return this._resumeTokenTracker.resumeToken;
180+
}
181+
100182
/**
101183
* Check if there is any document still available in the Change Stream
102184
* @function ChangeStream.prototype.hasNext
@@ -217,10 +299,6 @@ class ChangeStream extends EventEmitter {
217299

218300
// Create a new change stream cursor based on self's configuration
219301
var createChangeStreamCursor = function(self) {
220-
if (self.resumeToken) {
221-
self.options.resumeAfter = self.resumeToken;
222-
}
223-
224302
var changeStreamCursor = buildChangeStreamAggregationCommand(self);
225303

226304
/**
@@ -277,39 +355,20 @@ var createChangeStreamCursor = function(self) {
277355
return changeStreamCursor;
278356
};
279357

280-
function getResumeToken(self) {
281-
return self.resumeToken || self.options.resumeAfter;
282-
}
283-
284-
function getStartAtOperationTime(self) {
285-
const isMaster = self.topology.lastIsMaster() || {};
286-
return (
287-
isMaster.maxWireVersion && isMaster.maxWireVersion >= 7 && self.options.startAtOperationTime
288-
);
289-
}
290-
291358
var buildChangeStreamAggregationCommand = function(self) {
292359
const topology = self.topology;
293360
const namespace = self.namespace;
294361
const pipeline = self.pipeline;
295362
const options = self.options;
363+
const resumeTokenTracker = self._resumeTokenTracker;
296364

297-
var changeStreamStageOptions = {
298-
fullDocument: options.fullDocument || 'default'
299-
};
300-
301-
const resumeToken = getResumeToken(self);
302-
const startAtOperationTime = getStartAtOperationTime(self);
303-
if (resumeToken) {
304-
changeStreamStageOptions.resumeAfter = resumeToken;
305-
}
306-
307-
if (startAtOperationTime) {
308-
changeStreamStageOptions.startAtOperationTime = startAtOperationTime;
309-
}
365+
const changeStreamStageOptions = Object.assign(
366+
{ fullDocument: options.fullDocument || 'default' },
367+
resumeTokenTracker.resumeInfo()
368+
);
310369

311370
// Map cursor options
312-
var cursorOptions = {};
371+
var cursorOptions = { resumeTokenTracker };
313372
cursorOptionNames.forEach(function(optionName) {
314373
if (options[optionName]) {
315374
cursorOptions[optionName] = options[optionName];
@@ -384,11 +443,6 @@ function processNewChange(args) {
384443
if (isResumableError(error) && !changeStream.attemptingResume) {
385444
changeStream.attemptingResume = true;
386445

387-
if (!(getResumeToken(changeStream) || getStartAtOperationTime(changeStream))) {
388-
const startAtOperationTime = changeStream.cursor.cursorState.operationTime;
389-
changeStream.options = Object.assign({ startAtOperationTime }, changeStream.options);
390-
}
391-
392446
// stop listening to all events from old cursor
393447
['data', 'close', 'end', 'error'].forEach(event =>
394448
changeStream.cursor.removeAllListeners(event)
@@ -450,7 +504,7 @@ function processNewChange(args) {
450504
return changeStream.promiseLibrary.reject(noResumeTokenError);
451505
}
452506

453-
changeStream.resumeToken = change._id;
507+
changeStream._resumeTokenTracker.onNext(change);
454508

455509
// wipe the startAtOperationTime if there was one so that there won't be a conflict
456510
// between resumeToken and startAtOperationTime if we need to reconnect the cursor

lib/core/cursor.js

+13-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ var Cursor = function(bson, ns, cmd, options, topology, topologyOptions) {
7979
currentLimit: 0,
8080
// Result field name if not a cursor (contains the array of results)
8181
transforms: options.transforms,
82-
raw: options.raw || (cmd && cmd.raw)
82+
raw: options.raw || (cmd && cmd.raw),
83+
resumeTokenTracker: options.resumeTokenTracker
8384
};
8485

8586
if (typeof options.session === 'object') {
@@ -462,6 +463,10 @@ var nextFunction = function(self, callback) {
462463
return handleCallback(callback, err);
463464
}
464465

466+
if (self.cursorState.resumeTokenTracker) {
467+
self.cursorState.resumeTokenTracker.onResponse(doc.cursor.postBatchResumeToken);
468+
}
469+
465470
if (self.cursorState.cursorId && self.cursorState.cursorId.isZero() && self._endSession) {
466471
self._endSession();
467472
}
@@ -670,6 +675,13 @@ function initializeCursor(cursor, callback) {
670675
cursor.cursorState.documents = result.documents[0].cursor.firstBatch; //.reverse();
671676
}
672677

678+
if (cursor.cursorState.resumeTokenTracker) {
679+
cursor.cursorState.resumeTokenTracker.onResponse(
680+
result.documents[0].cursor.postBatchResumeToken,
681+
result.documents[0].operationTime
682+
);
683+
}
684+
673685
// Return after processing command cursor
674686
return done(result);
675687
}

test/functional/change_stream_tests.js

+28-15
Original file line numberDiff line numberDiff line change
@@ -1639,12 +1639,11 @@ describe('Change Streams', function() {
16391639
const dbName = 'integration_tests';
16401640
const collectionName = 'resumeWithStartAtOperationTime';
16411641
const connectOptions = {
1642-
socketTimeoutMS: 500,
1643-
validateOptions: true
1642+
validateOptions: true,
1643+
monitorCommands: true
16441644
};
16451645

16461646
let getMoreCounter = 0;
1647-
let aggregateCounter = 0;
16481647
let changeStream;
16491648
let server;
16501649
let client;
@@ -1660,47 +1659,61 @@ describe('Change Streams', function() {
16601659
function primaryServerHandler(request) {
16611660
try {
16621661
const doc = request.document;
1663-
16641662
if (doc.ismaster) {
16651663
return request.reply(makeIsMaster(server));
16661664
} else if (doc.aggregate) {
1667-
if (aggregateCounter++ > 0) {
1668-
expect(doc).to.have.nested.property('pipeline[0].$changeStream.startAtOperationTime');
1669-
expect(doc.pipeline[0].$changeStream.startAtOperationTime.equals(OPERATION_TIME)).to
1670-
.be.ok;
1671-
expect(doc).to.not.have.nested.property('pipeline[0].$changeStream.resumeAfter');
1672-
} else {
1673-
expect(doc).to.not.have.nested.property(
1674-
'pipeline[0].$changeStream.startAtOperationTime'
1675-
);
1676-
expect(doc).to.not.have.nested.property('pipeline[0].$changeStream.resumeAfter');
1677-
}
16781665
return request.reply(AGGREGATE_RESPONSE);
16791666
} else if (doc.getMore) {
16801667
if (getMoreCounter++ === 0) {
1668+
request.reply({ ok: 0 });
16811669
return;
16821670
}
16831671

16841672
request.reply(GET_MORE_RESPONSE);
16851673
} else if (doc.endSessions) {
16861674
request.reply({ ok: 1 });
1675+
} else if (doc.killCursors) {
1676+
request.reply({ ok: 1 });
16871677
}
16881678
} catch (e) {
16891679
finish(e);
16901680
}
16911681
}
16921682

1683+
const started = [];
1684+
16931685
mock
16941686
.createServer()
16951687
.then(_server => (server = _server))
16961688
.then(() => server.setMessageHandler(primaryServerHandler))
16971689
.then(() => (client = configuration.newClient(`mongodb://${server.uri()}`, connectOptions)))
16981690
.then(() => client.connect())
1691+
.then(() => {
1692+
client.on('commandStarted', e => {
1693+
if (e.commandName === 'aggregate') {
1694+
started.push(e);
1695+
}
1696+
});
1697+
})
16991698
.then(() => client.db(dbName))
17001699
.then(db => db.collection(collectionName))
17011700
.then(col => col.watch(pipeline))
17021701
.then(_changeStream => (changeStream = _changeStream))
17031702
.then(() => changeStream.next())
1703+
.then(() => {
1704+
const first = started[0].command;
1705+
expect(first).to.have.nested.property('pipeline[0].$changeStream');
1706+
const firstStage = first.pipeline[0].$changeStream;
1707+
expect(firstStage).to.not.have.property('resumeAfter');
1708+
expect(firstStage).to.not.have.property('startAtOperationTime');
1709+
1710+
const second = started[1].command;
1711+
expect(second).to.have.nested.property('pipeline[0].$changeStream');
1712+
const secondStage = second.pipeline[0].$changeStream;
1713+
expect(secondStage).to.not.have.property('resumeAfter');
1714+
expect(secondStage).to.have.property('startAtOperationTime');
1715+
expect(secondStage.startAtOperationTime.equals(OPERATION_TIME)).to.be.ok;
1716+
})
17041717
.then(() => finish(), err => finish(err));
17051718
}
17061719
});

0 commit comments

Comments
 (0)