Skip to content

Commit 98a18d8

Browse files
nootjacobheun
authored andcommitted
fix: add callback to pubsub.unsubscribe and test (#300)
1 parent c4cab00 commit 98a18d8

File tree

2 files changed

+38
-22
lines changed

2 files changed

+38
-22
lines changed

src/pubsub.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ module.exports = (node) => {
3333
subscribe(callback)
3434
},
3535

36-
unsubscribe: (topic, handler) => {
36+
unsubscribe: (topic, handler, callback) => {
3737
if (!node.isStarted() && !floodSub.started) {
3838
throw new Error(NOT_STARTED_YET)
3939
}
@@ -43,6 +43,10 @@ module.exports = (node) => {
4343
if (floodSub.listenerCount(topic) === 0) {
4444
floodSub.unsubscribe(topic)
4545
}
46+
47+
if (typeof callback === 'function') {
48+
setImmediate(() => callback())
49+
}
4650
},
4751

4852
publish: (topic, data, callback) => {

test/pubsub.node.js

+33-21
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55

66
const chai = require('chai')
77
chai.use(require('dirty-chai'))
8+
chai.use(require('chai-checkmark'))
89
const expect = chai.expect
910
const parallel = require('async/parallel')
10-
const waterfall = require('async/waterfall')
11+
const series = require('async/series')
1112
const _times = require('lodash.times')
1213

1314
const createNode = require('./utils/create-node')
@@ -52,26 +53,37 @@ function stopTwo (nodes, callback) {
5253
// TODO: consider if all or some of those should come here
5354
describe('.pubsub', () => {
5455
describe('.pubsub on (default)', (done) => {
55-
it('start two nodes and send one message', (done) => {
56-
waterfall([
57-
(cb) => startTwo(cb),
58-
(nodes, cb) => {
59-
const data = Buffer.from('test')
60-
nodes[0].pubsub.subscribe('pubsub',
61-
(msg) => {
62-
expect(msg.data).to.eql(data)
63-
cb(null, nodes)
64-
},
65-
(err) => {
66-
expect(err).to.not.exist()
67-
setTimeout(() => nodes[1].pubsub.publish('pubsub', data, (err) => {
68-
expect(err).to.not.exist()
69-
}), 500)
70-
}
71-
)
72-
},
73-
(nodes, cb) => stopTwo(nodes, cb)
74-
], done)
56+
it('start two nodes and send one message, then unsubscribe', (done) => {
57+
// Check the final series error, and the publish handler
58+
expect(2).checks(done)
59+
60+
let nodes
61+
const data = Buffer.from('test')
62+
const handler = (msg) => {
63+
// verify the data is correct and mark the expect
64+
expect(msg.data).to.eql(data).mark()
65+
}
66+
67+
series([
68+
// Start the nodes
69+
(cb) => startTwo((err, _nodes) => {
70+
nodes = _nodes
71+
cb(err)
72+
}),
73+
// subscribe on the first
74+
(cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb),
75+
// Wait a moment before publishing
76+
(cb) => setTimeout(cb, 500),
77+
// publish on the second
78+
(cb) => nodes[1].pubsub.publish('pubsub', data, cb),
79+
// unsubscribe on the first
80+
(cb) => nodes[0].pubsub.unsubscribe('pubsub', handler, cb),
81+
// Stop both nodes
82+
(cb) => stopTwo(nodes, cb)
83+
], (err) => {
84+
// Verify there was no error, and mark the expect
85+
expect(err).to.not.exist().mark()
86+
})
7587
})
7688
})
7789

0 commit comments

Comments
 (0)