Skip to content

Commit 27fbe56

Browse files
committed
refactor: always emit success/failure events after started
Previously we were emitting a success or failure event for heartbeats after the entire check, which would hide an extra attempt from the user. Now we emit started => success/failure events for every individual server check.
1 parent d341c01 commit 27fbe56

File tree

4 files changed

+124
-57
lines changed

4 files changed

+124
-57
lines changed

lib/core/sdam/monitor.js

+33-36
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,27 @@ function checkServer(monitor, callback) {
136136
monitor[kConnection] = undefined;
137137
}
138138

139+
const start = process.hrtime();
139140
monitor.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(monitor.address));
140141

142+
function failureHandler(err) {
143+
monitor.emit(
144+
'serverHeartbeatFailed',
145+
new ServerHeartbeatFailedEvent(calculateDurationInMs(start), err, monitor.address)
146+
);
147+
148+
callback(err);
149+
}
150+
151+
function successHandler(isMaster) {
152+
monitor.emit(
153+
'serverHeartbeatSucceeded',
154+
new ServerHeartbeatSucceededEvent(calculateDurationInMs(start), isMaster, monitor.address)
155+
);
156+
157+
return callback(undefined, isMaster);
158+
}
159+
141160
if (monitor[kConnection] != null) {
142161
const connectTimeoutMS = monitor.options.connectTimeoutMS;
143162
monitor[kConnection].command(
@@ -146,10 +165,11 @@ function checkServer(monitor, callback) {
146165
{ socketTimeout: connectTimeoutMS },
147166
(err, result) => {
148167
if (err) {
149-
return callback(err);
168+
failureHandler(err);
169+
return;
150170
}
151171

152-
return callback(undefined, result.result);
172+
successHandler(result.result);
153173
}
154174
);
155175

@@ -160,37 +180,37 @@ function checkServer(monitor, callback) {
160180
connect(monitor.connectOptions, monitor[kCancellationToken], (err, conn) => {
161181
if (err) {
162182
monitor[kConnection] = undefined;
163-
callback(err);
183+
failureHandler(err);
164184
return;
165185
}
166186

167187
if (monitor.s.state === STATE_CLOSING || monitor.s.state === STATE_CLOSED) {
168188
conn.destroy({ force: true });
169-
callback(new MongoError('monitor was destroyed'));
189+
failureHandler(new MongoError('monitor was destroyed'));
170190
return;
171191
}
172192

173193
monitor[kConnection] = conn;
174-
callback(undefined, conn.ismaster);
194+
successHandler(conn.ismaster);
175195
});
176196
}
177197

178198
function monitorServer(monitor) {
179-
const start = process.hrtime();
180199
stateTransition(monitor, STATE_MONITORING);
181200

182201
// TODO: the next line is a legacy event, remove in v4
183202
process.nextTick(() => monitor.emit('monitoring', monitor[kServer]));
184203

185-
checkServer(monitor, (e0, isMaster) => {
186-
if (isMaster) {
187-
successHandler(monitor, start, isMaster);
204+
checkServer(monitor, e0 => {
205+
if (e0 == null) {
206+
rescheduleMonitoring(monitor);
188207
return;
189208
}
190209

191210
// otherwise an error occured on initial discovery, also bail
192211
if (monitor[kServer].description.type === ServerType.Unknown) {
193-
failureHandler(monitor, start, e0);
212+
monitor.emit('resetServer', e0);
213+
rescheduleMonitoring(monitor);
194214
return;
195215
}
196216

@@ -199,13 +219,12 @@ function monitorServer(monitor) {
199219
// change its type to `Unknown` only after retrying once.
200220
monitor.emit('resetConnectionPool');
201221

202-
checkServer(monitor, (e1, isMaster) => {
222+
checkServer(monitor, e1 => {
203223
if (e1) {
204-
failureHandler(monitor, start, e1);
205-
return;
224+
monitor.emit('resetServer', e1);
206225
}
207226

208-
successHandler(monitor, start, isMaster);
227+
rescheduleMonitoring(monitor);
209228
});
210229
});
211230
}
@@ -225,28 +244,6 @@ function rescheduleMonitoring(monitor, ms) {
225244
}, ms || heartbeatFrequencyMS);
226245
}
227246

228-
function successHandler(monitor, start, isMaster) {
229-
rescheduleMonitoring(monitor);
230-
231-
process.nextTick(() =>
232-
monitor.emit(
233-
'serverHeartbeatSucceeded',
234-
new ServerHeartbeatSucceededEvent(calculateDurationInMs(start), isMaster, monitor.address)
235-
)
236-
);
237-
}
238-
239-
function failureHandler(monitor, start, err) {
240-
rescheduleMonitoring(monitor);
241-
242-
process.nextTick(() =>
243-
monitor.emit(
244-
'serverHeartbeatFailed',
245-
new ServerHeartbeatFailedEvent(calculateDurationInMs(start), err, monitor.address)
246-
)
247-
);
248-
}
249-
250247
module.exports = {
251248
Monitor
252249
};

lib/core/sdam/server.js

+2-5
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,12 @@ class Server extends EventEmitter {
136136
this.s.pool.clear();
137137
});
138138

139-
this[kMonitor].on('serverHeartbeatFailed', event => {
139+
this[kMonitor].on('resetServer', error => {
140140
// Revert to an `Unknown` state by emitting a default description with no isMaster, and the
141141
// error from the heartbeat attempt
142142
this.emit(
143143
'descriptionReceived',
144-
new ServerDescription(this.description.address, null, {
145-
roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration),
146-
error: event.failure
147-
})
144+
new ServerDescription(this.description.address, null, { error })
148145
);
149146
});
150147

lib/core/sdam/server_selection.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ function selectServers(topology, selector, timeout, start, callback) {
299299

300300
const retrySelection = () => {
301301
// ensure all server monitors attempt monitoring soon
302-
topology.s.servers.forEach(server => server.requestCheck());
302+
topology.s.servers.forEach(server => process.nextTick(() => server.requestCheck()));
303303

304304
const iterationTimer = setTimeout(() => {
305305
topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler);

test/unit/sdam/monitoring.test.js

+88-15
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,13 @@ describe('monitoring', function() {
237237
});
238238
this.defer(() => monitor.close());
239239

240-
monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure')));
241-
242240
let resetRequested = false;
241+
monitor.on('serverHeartbeatFailed', () => {
242+
if (resetRequested) {
243+
done(new Error('unexpected heartbeat failure'));
244+
}
245+
});
246+
243247
monitor.on('resetConnectionPool', () => (resetRequested = true));
244248
monitor.on('serverHeartbeatSucceeded', () => {
245249
if (server.description.type === ServerType.Unknown) {
@@ -272,9 +276,11 @@ describe('monitoring', function() {
272276
if (failedCount === 0) {
273277
failedCount++;
274278
request.reply({ ok: 0, errmsg: 'first error message' });
275-
} else {
279+
} else if (failedCount === 1) {
276280
failedCount++;
277281
request.reply({ ok: 0, errmsg: 'second error message' });
282+
} else {
283+
request.reply(mock.DEFAULT_ISMASTER_36);
278284
}
279285
}
280286
});
@@ -288,22 +294,89 @@ describe('monitoring', function() {
288294

289295
let resetRequested = false;
290296
monitor.on('resetConnectionPool', () => (resetRequested = true));
291-
monitor.on('serverHeartbeatSucceeded', () => {
292-
if (server.description.type === ServerType.Unknown) {
293-
// this is the first successful heartbeat, set the server type
294-
server.description.type = ServerType.Standalone;
295-
return;
297+
monitor.once('serverHeartbeatSucceeded', () => {
298+
// this is the first successful heartbeat, set the server type
299+
server.description.type = ServerType.Standalone;
300+
301+
let failureCount = 0;
302+
monitor.on('serverHeartbeatFailed', event => {
303+
failureCount++;
304+
if (failureCount === 2) {
305+
expect(resetRequested).to.be.true;
306+
expect(event)
307+
.property('failure')
308+
.to.match(/second error message/);
309+
done();
310+
}
311+
});
312+
});
313+
314+
monitor.connect();
315+
});
316+
317+
it('should report events in the correct order during monitoring failure', function(done) {
318+
let failedCount = 0;
319+
let initialConnectCompleted = false;
320+
mockServer.setMessageHandler(request => {
321+
const doc = request.document;
322+
if (doc.ismaster) {
323+
if (!initialConnectCompleted) {
324+
request.reply(mock.DEFAULT_ISMASTER_36);
325+
initialConnectCompleted = true;
326+
return;
327+
}
328+
329+
if (failedCount === 0) {
330+
failedCount++;
331+
request.reply({ ok: 0, errmsg: 'first error message' });
332+
} else {
333+
failedCount++;
334+
request.reply({ ok: 0, errmsg: 'second error message' });
335+
}
296336
}
337+
});
297338

298-
done(new Error('unexpected heartbeat success'));
339+
const server = new MockServer(mockServer.address());
340+
const monitor = new Monitor(server, {
341+
heartbeatFrequencyMS: 250,
342+
minHeartbeatFrequencyMS: 50
299343
});
344+
this.defer(() => monitor.close());
300345

301-
monitor.on('serverHeartbeatFailed', event => {
302-
expect(resetRequested).to.be.true;
303-
expect(event)
304-
.property('failure')
305-
.to.match(/second error message/);
306-
done();
346+
let poolResetRequested = false;
347+
let serverResetRequested = false;
348+
monitor.on('resetConnectionPool', () => (poolResetRequested = true));
349+
monitor.on('resetServer', () => (serverResetRequested = true));
350+
351+
const events = [];
352+
monitor.once('serverHeartbeatSucceeded', () => {
353+
// this is the first successful heartbeat, set the server type
354+
server.description.type = ServerType.Standalone;
355+
356+
monitor.on('serverHeartbeatStarted', event => events.push(event));
357+
monitor.on('serverHeartbeatFailed', event => events.push(event));
358+
monitor.once('resetServer', err => {
359+
expect(poolResetRequested).to.be.true;
360+
expect(serverResetRequested).to.be.true;
361+
expect(events.map(e => e.constructor.name)).to.eql([
362+
'ServerHeartbeatStartedEvent',
363+
'ServerHeartbeatFailedEvent',
364+
'ServerHeartbeatStartedEvent',
365+
'ServerHeartbeatFailedEvent'
366+
]);
367+
368+
expect(events[1])
369+
.property('failure')
370+
.to.match(/first error message/);
371+
expect(events[3])
372+
.property('failure')
373+
.to.match(/second error message/);
374+
expect(events[3])
375+
.property('failure')
376+
.to.eql(err);
377+
378+
done();
379+
});
307380
});
308381

309382
monitor.connect();

0 commit comments

Comments
 (0)