From 473eaf4ac69f09626345becd2e7554f5138939ce Mon Sep 17 00:00:00 2001 From: ebinks Date: Thu, 20 Dec 2018 11:48:07 -0500 Subject: [PATCH 1/8] add callback to pubsub.unsubscribe and test (#300) --- src/pubsub.js | 4 +++- test/pubsub.node.js | 20 ++++++++++++-------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/pubsub.js b/src/pubsub.js index 9bff4a1ca4..a4dacb06d7 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -33,7 +33,7 @@ module.exports = (node) => { subscribe(callback) }, - unsubscribe: (topic, handler) => { + unsubscribe: (topic, handler, callback) => { if (!node.isStarted() && !floodSub.started) { throw new Error(NOT_STARTED_YET) } @@ -43,6 +43,8 @@ module.exports = (node) => { if (floodSub.listenerCount(topic) === 0) { floodSub.unsubscribe(topic) } + + setImmediate(() => callback()) }, publish: (topic, data, callback) => { diff --git a/test/pubsub.node.js b/test/pubsub.node.js index 0aaa75c99e..f9c0c76baf 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -52,21 +52,25 @@ function stopTwo (nodes, callback) { // TODO: consider if all or some of those should come here describe('.pubsub', () => { describe('.pubsub on (default)', (done) => { - it('start two nodes and send one message', (done) => { + it('start two nodes and send one message, then unsubscribe', (done) => { + const data = Buffer.from('test') + const handler = (msg, nodes, cb) => { + expect(msg.data).to.eql(data) + cb(null, nodes) + } waterfall([ (cb) => startTwo(cb), (nodes, cb) => { - const data = Buffer.from('test') - nodes[0].pubsub.subscribe('pubsub', - (msg) => { - expect(msg.data).to.eql(data) - cb(null, nodes) - }, - (err) => { + nodes[0].pubsub.subscribe('pubsub', (msg, nodes, cb) => handler, (err) => { expect(err).to.not.exist() setTimeout(() => nodes[1].pubsub.publish('pubsub', data, (err) => { expect(err).to.not.exist() }), 500) + setTimeout(() => nodes[0].pubsub.unsubscribe('pubsub', handler, (err) => { + expect(err).to.not.exist() + console.log("\tunsubscribed!") + done() + }), 600) } ) }, From 2d724d79a70962c71f2b2ff88e6f90e647ce7bd2 Mon Sep 17 00:00:00 2001 From: ebinks Date: Thu, 20 Dec 2018 12:07:38 -0500 Subject: [PATCH 2/8] fix linting issues --- test/pubsub.node.js | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index f9c0c76baf..2ce837ca9d 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -62,17 +62,16 @@ describe('.pubsub', () => { (cb) => startTwo(cb), (nodes, cb) => { nodes[0].pubsub.subscribe('pubsub', (msg, nodes, cb) => handler, (err) => { + expect(err).to.not.exist() + setTimeout(() => nodes[1].pubsub.publish('pubsub', data, (err) => { expect(err).to.not.exist() - setTimeout(() => nodes[1].pubsub.publish('pubsub', data, (err) => { - expect(err).to.not.exist() - }), 500) - setTimeout(() => nodes[0].pubsub.unsubscribe('pubsub', handler, (err) => { - expect(err).to.not.exist() - console.log("\tunsubscribed!") - done() - }), 600) - } - ) + }), 500) + setTimeout(() => nodes[0].pubsub.unsubscribe('pubsub', handler, (err) => { + expect(err).to.not.exist() + console.log('tunsubscribed!') + done() + }), 600) + }) }, (nodes, cb) => stopTwo(nodes, cb) ], done) From e937f735fe0e618b74f35cf20bd9e18d24217f9d Mon Sep 17 00:00:00 2001 From: ebinks Date: Thu, 20 Dec 2018 13:57:03 -0500 Subject: [PATCH 3/8] pubsub.unsubscribe: check that callback is a function; removed timeout from tests --- src/pubsub.js | 4 +++- test/pubsub.node.js | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/pubsub.js b/src/pubsub.js index a4dacb06d7..4547de9bf2 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -44,7 +44,9 @@ module.exports = (node) => { floodSub.unsubscribe(topic) } - setImmediate(() => callback()) + if (typeof callback === 'function') { + setImmediate(() => callback()) + } }, publish: (topic, data, callback) => { diff --git a/test/pubsub.node.js b/test/pubsub.node.js index 2ce837ca9d..4de92ad5f8 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -63,14 +63,14 @@ describe('.pubsub', () => { (nodes, cb) => { nodes[0].pubsub.subscribe('pubsub', (msg, nodes, cb) => handler, (err) => { expect(err).to.not.exist() - setTimeout(() => nodes[1].pubsub.publish('pubsub', data, (err) => { + nodes[1].pubsub.publish('pubsub', data, (err) => { expect(err).to.not.exist() - }), 500) - setTimeout(() => nodes[0].pubsub.unsubscribe('pubsub', handler, (err) => { + }) + nodes[0].pubsub.unsubscribe('pubsub', handler, (err) => { expect(err).to.not.exist() - console.log('tunsubscribed!') + console.log('\tunsubscribed!') done() - }), 600) + }) }) }, (nodes, cb) => stopTwo(nodes, cb) From e0d715b98e158bf1bb85fa3ab2619311aaea0e9d Mon Sep 17 00:00:00 2001 From: ebinks Date: Thu, 20 Dec 2018 17:05:26 -0500 Subject: [PATCH 4/8] remove console.log; add chai-checkmark --- test/pubsub.node.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index 4de92ad5f8..0f160ab45b 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -5,6 +5,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) +chai.use(require('chai-checkmark')) const expect = chai.expect const parallel = require('async/parallel') const waterfall = require('async/waterfall') @@ -53,6 +54,7 @@ function stopTwo (nodes, callback) { describe('.pubsub', () => { describe('.pubsub on (default)', (done) => { it('start two nodes and send one message, then unsubscribe', (done) => { + expect(4).checks(done) const data = Buffer.from('test') const handler = (msg, nodes, cb) => { expect(msg.data).to.eql(data) @@ -67,9 +69,8 @@ describe('.pubsub', () => { expect(err).to.not.exist() }) nodes[0].pubsub.unsubscribe('pubsub', handler, (err) => { - expect(err).to.not.exist() - console.log('\tunsubscribed!') done() + expect(err).to.not.exist() }) }) }, From b7eb2180e0cd32f13c17e7497d5a16659e499355 Mon Sep 17 00:00:00 2001 From: ebinks Date: Fri, 21 Dec 2018 11:19:35 -0500 Subject: [PATCH 5/8] move done to end of waterfall --- test/pubsub.node.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index 0f160ab45b..52f2276ab2 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -69,12 +69,12 @@ describe('.pubsub', () => { expect(err).to.not.exist() }) nodes[0].pubsub.unsubscribe('pubsub', handler, (err) => { - done() expect(err).to.not.exist() }) }) }, - (nodes, cb) => stopTwo(nodes, cb) + (nodes, cb) => stopTwo(nodes, cb), + done() ], done) }) }) From e4a45c60e0e32582535b41a683985b802483e1d0 Mon Sep 17 00:00:00 2001 From: ebinks Date: Fri, 4 Jan 2019 14:24:07 -0500 Subject: [PATCH 6/8] update pubsub test --- test/pubsub.node.js | 49 +++++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index 52f2276ab2..b08be06c30 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -8,6 +8,7 @@ chai.use(require('dirty-chai')) chai.use(require('chai-checkmark')) const expect = chai.expect const parallel = require('async/parallel') +const series = require('async/series') const waterfall = require('async/waterfall') const _times = require('lodash.times') @@ -54,28 +55,36 @@ function stopTwo (nodes, callback) { describe('.pubsub', () => { describe('.pubsub on (default)', (done) => { it('start two nodes and send one message, then unsubscribe', (done) => { - expect(4).checks(done) + // Check the final series error, and the publish handler + expect(2).checks(done) + + let nodes const data = Buffer.from('test') - const handler = (msg, nodes, cb) => { - expect(msg.data).to.eql(data) - cb(null, nodes) + const handler = (msg) => { + // verify the data is correct and mark the expect + expect(msg.data).to.eql(data).mark() } - waterfall([ - (cb) => startTwo(cb), - (nodes, cb) => { - nodes[0].pubsub.subscribe('pubsub', (msg, nodes, cb) => handler, (err) => { - expect(err).to.not.exist() - nodes[1].pubsub.publish('pubsub', data, (err) => { - expect(err).to.not.exist() - }) - nodes[0].pubsub.unsubscribe('pubsub', handler, (err) => { - expect(err).to.not.exist() - }) - }) - }, - (nodes, cb) => stopTwo(nodes, cb), - done() - ], done) + + series([ + // Start the nodes + (cb) => startTwo((err, _nodes) => { + nodes = _nodes + cb(err) + }), + // subscribe on the first + (cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb), + // Wait a moment before publishing + (cb) => setTimeout(cb, 500), + // publish on the second + (cb) => nodes[1].pubsub.publish('pubsub', data, cb), + // unsubscribe on the first + (cb) => nodes[0].pubsub.unsubscribe('pubsub', handler, cb), + // Stop both nodes + (cb) => stopTwo(nodes, cb) + ], (err) => { + // Verify there was no error, and mark the expect + expect(err).to.not.exist().mark() + }) }) }) From e446e7fee6b813e0df7fa25c0ae6e73cec1e2b82 Mon Sep 17 00:00:00 2001 From: ebinks Date: Fri, 4 Jan 2019 14:40:25 -0500 Subject: [PATCH 7/8] fix lint; remove unneeded waterfall --- test/pubsub.node.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index b08be06c30..fe7846436b 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -9,7 +9,6 @@ chai.use(require('chai-checkmark')) const expect = chai.expect const parallel = require('async/parallel') const series = require('async/series') -const waterfall = require('async/waterfall') const _times = require('lodash.times') const createNode = require('./utils/create-node') From 2dce1ba9c34a8b1313f83956bb2a1b735d60885c Mon Sep 17 00:00:00 2001 From: noot Date: Fri, 1 Feb 2019 13:11:37 -0500 Subject: [PATCH 8/8] add delay before unsubscribe --- test/pubsub.node.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index fe7846436b..651a52bf49 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -77,7 +77,8 @@ describe('.pubsub', () => { // publish on the second (cb) => nodes[1].pubsub.publish('pubsub', data, cb), // unsubscribe on the first - (cb) => nodes[0].pubsub.unsubscribe('pubsub', handler, cb), + (cb) => setTimeout(cb, 500), + (cb) => nodes[0].pubsub.unsubscribe('pubsub', handler, cb), // Stop both nodes (cb) => stopTwo(nodes, cb) ], (err) => {