Skip to content

Commit ae6af20

Browse files
achingbrainjacobheun
authored andcommitted
fix: pubsub promisify (#456)
* fix: allow pubsub sub/unsub via promises * chore: fix linting errors
1 parent 2a80618 commit ae6af20

17 files changed

+196
-149
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
"chai": "^4.2.0",
8282
"chai-checkmark": "^1.0.1",
8383
"cids": "^0.7.1",
84+
"delay": "^4.3.0",
8485
"dirty-chai": "^2.0.1",
8586
"electron-webrtc": "^0.3.0",
8687
"interface-datastore": "^0.6.0",

src/circuit/circuit/dialer.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ class Dialer {
153153
const relays = Array.from(this.relayPeers.values())
154154
const next = (nextRelay) => {
155155
if (!nextRelay) {
156-
const err = `no relay peers were found or all relays failed to dial`
156+
const err = 'no relay peers were found or all relays failed to dial'
157157
log.err(err)
158158
return cb(err)
159159
}
@@ -235,7 +235,7 @@ class Dialer {
235235
}
236236
const message = proto.CircuitRelay.decode(msg)
237237
if (message.type !== proto.CircuitRelay.Type.STATUS) {
238-
return callback(new Error(`Got invalid message type - ` +
238+
return callback(new Error('Got invalid message type - ' +
239239
`expected ${proto.CircuitRelay.Type.STATUS} got ${message.type}`))
240240
}
241241

src/circuit/circuit/hop.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ class Hop extends EE {
203203

204204
const message = proto.decode(msg)
205205
if (message.code !== proto.Status.SUCCESS) {
206-
return callback(new Error(`Unable to create circuit!`))
206+
return callback(new Error('Unable to create circuit!'))
207207
}
208208

209209
return callback(null, msg)

src/circuit/circuit/stream-handler.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class StreamHandler {
4949
*/
5050
read (cb) {
5151
if (!this.isValid()) {
52-
return cb(new Error(`handler is not in a valid state`))
52+
return cb(new Error('handler is not in a valid state'))
5353
}
5454

5555
lp.decodeFromReader(
@@ -77,7 +77,7 @@ class StreamHandler {
7777
cb = cb || (() => {})
7878

7979
if (!this.isValid()) {
80-
return cb(new Error(`handler is not in a valid state`))
80+
return cb(new Error('handler is not in a valid state'))
8181
}
8282

8383
pull(

src/circuit/listener.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,10 @@ module.exports = (swarm, options, connHandler) => {
132132
if (!mafmt.Circuit.matches(addr)) {
133133
if (addr.getPeerId()) {
134134
// by default we're reachable over any relay
135-
listenAddrs.push(multiaddr(`/p2p-circuit`).encapsulate(addr))
135+
listenAddrs.push(multiaddr('/p2p-circuit').encapsulate(addr))
136136
} else {
137137
const ma = `${addr}/ipfs/${swarm._peerInfo.id.toB58String()}`
138-
listenAddrs.push(multiaddr(`/p2p-circuit`).encapsulate(ma))
138+
listenAddrs.push(multiaddr('/p2p-circuit').encapsulate(ma))
139139
}
140140
} else {
141141
listenAddrs.push(addr.encapsulate(`/ipfs/${swarm._peerInfo.id.toB58String()}`))

src/pubsub.js

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,27 +32,35 @@ module.exports = (node, Pubsub, config) => {
3232
* const handler = (message) => { }
3333
* libp2p.subscribe(topic, handler, callback)
3434
*/
35-
subscribe: promisify((topic, handler, options, callback) => {
35+
subscribe: (topic, handler, options, callback) => {
36+
// can't use promisify because it thinks the handler is a callback
3637
if (typeof options === 'function') {
3738
callback = options
3839
options = {}
3940
}
4041

4142
if (!node.isStarted() && !pubsub.started) {
42-
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
43-
}
43+
const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
4444

45-
function subscribe (cb) {
46-
if (pubsub.listenerCount(topic) === 0) {
47-
pubsub.subscribe(topic)
45+
if (callback) {
46+
return nextTick(() => callback(err))
4847
}
4948

50-
pubsub.on(topic, handler)
51-
nextTick(cb)
49+
return Promise.reject(err)
5250
}
5351

54-
subscribe(callback)
55-
}),
52+
if (pubsub.listenerCount(topic) === 0) {
53+
pubsub.subscribe(topic)
54+
}
55+
56+
pubsub.on(topic, handler)
57+
58+
if (callback) {
59+
return nextTick(() => callback())
60+
}
61+
62+
return Promise.resolve()
63+
},
5664

5765
/**
5866
* Unsubscribes from a pubsub topic
@@ -76,9 +84,16 @@ module.exports = (node, Pubsub, config) => {
7684
*
7785
* libp2p.unsubscribe(topic, handler, callback)
7886
*/
79-
unsubscribe: promisify((topic, handler, callback) => {
87+
unsubscribe: (topic, handler, callback) => {
88+
// can't use promisify because it thinks the handler is a callback
8089
if (!node.isStarted() && !pubsub.started) {
81-
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
90+
const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
91+
92+
if (callback) {
93+
return nextTick(() => callback(err))
94+
}
95+
96+
return Promise.reject(err)
8297
}
8398

8499
if (!handler) {
@@ -91,12 +106,12 @@ module.exports = (node, Pubsub, config) => {
91106
pubsub.unsubscribe(topic)
92107
}
93108

94-
if (typeof callback === 'function') {
109+
if (callback) {
95110
return nextTick(() => callback())
96111
}
97112

98113
return Promise.resolve()
99-
}),
114+
},
100115

101116
publish: promisify((topic, data, callback) => {
102117
if (!node.isStarted() && !pubsub.started) {

src/switch/connection/handler.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const IncomingConnection = require('./incoming')
55
const observeConn = require('../observe-connection')
66

77
function listener (_switch) {
8-
const log = debug(`libp2p:switch:listener`)
8+
const log = debug('libp2p:switch:listener')
99

1010
/**
1111
* Takes a transport key and returns a connection handler function

src/switch/protocol-muxer.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ module.exports = function protocolMuxer (protocols, observer) {
4141

4242
ms.handle(parentConn, (err) => {
4343
if (err) {
44-
log.error(`multistream handshake failed`, err)
44+
log.error('multistream handshake failed', err)
4545
}
4646
})
4747
}

test/circuit/dialer.spec.js

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const dirtyChai = require('dirty-chai')
2424
const expect = chai.expect
2525
chai.use(dirtyChai)
2626

27-
describe(`dialer tests`, function () {
27+
describe('dialer tests', function () {
2828
let dialer
2929

3030
beforeEach(() => {
@@ -35,22 +35,22 @@ describe(`dialer tests`, function () {
3535
sinon.restore()
3636
})
3737

38-
describe(`.dial`, function () {
38+
describe('.dial', function () {
3939
beforeEach(function () {
4040
dialer.relayPeers = new Map()
4141
dialer.relayPeers.set(nodes.node2.id, new Connection())
4242
dialer.relayPeers.set(nodes.node3.id, new Connection())
4343
dialer.dial.callThrough()
4444
})
4545

46-
it(`fail on non circuit addr`, function () {
46+
it('fail on non circuit addr', function () {
4747
const dstMa = multiaddr(`/ipfs/${nodes.node4.id}`)
4848
expect(() => dialer.dial(dstMa, (err) => {
4949
err.to.match(/invalid circuit address/)
5050
}))
5151
})
5252

53-
it(`dial a peer`, function (done) {
53+
it('dial a peer', function (done) {
5454
const dstMa = multiaddr(`/p2p-circuit/ipfs/${nodes.node3.id}`)
5555
dialer._dialPeer.callsFake(function (dstMa, relay, callback) {
5656
return callback(null, dialer.relayPeers.get(nodes.node3.id))
@@ -63,7 +63,7 @@ describe(`dialer tests`, function () {
6363
})
6464
})
6565

66-
it(`dial a peer over the specified relay`, function (done) {
66+
it('dial a peer over the specified relay', function (done) {
6767
const dstMa = multiaddr(`/ipfs/${nodes.node3.id}/p2p-circuit/ipfs/${nodes.node4.id}`)
6868
dialer._dialPeer.callsFake(function (dstMa, relay, callback) {
6969
expect(relay.toString()).to.equal(`/ipfs/${nodes.node3.id}`)
@@ -78,7 +78,7 @@ describe(`dialer tests`, function () {
7878
})
7979
})
8080

81-
describe(`.canHop`, function () {
81+
describe('.canHop', function () {
8282
let fromConn = null
8383
const peer = new PeerInfo(PeerId.createFromB58String('QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA'))
8484

@@ -94,7 +94,7 @@ describe(`dialer tests`, function () {
9494
dialer._dialRelayHelper.callThrough()
9595
})
9696

97-
it(`should handle successful CAN_HOP`, (done) => {
97+
it('should handle successful CAN_HOP', (done) => {
9898
dialer._dialRelay.callsFake((_, cb) => {
9999
pull(
100100
values([{
@@ -114,7 +114,7 @@ describe(`dialer tests`, function () {
114114
})
115115
})
116116

117-
it(`should handle failed CAN_HOP`, function (done) {
117+
it('should handle failed CAN_HOP', function (done) {
118118
dialer._dialRelay.callsFake((_, cb) => {
119119
pull(
120120
values([{
@@ -135,7 +135,7 @@ describe(`dialer tests`, function () {
135135
})
136136
})
137137

138-
describe(`._dialPeer`, function () {
138+
describe('._dialPeer', function () {
139139
beforeEach(function () {
140140
dialer.relayPeers = new Map()
141141
dialer.relayPeers.set(nodes.node1.id, new Connection())
@@ -144,14 +144,14 @@ describe(`dialer tests`, function () {
144144
dialer._dialPeer.callThrough()
145145
})
146146

147-
it(`should dial a peer over any relay`, function (done) {
147+
it('should dial a peer over any relay', function (done) {
148148
const dstMa = multiaddr(`/ipfs/${nodes.node4.id}`)
149149
dialer._negotiateRelay.callsFake(function (conn, dstMa, callback) {
150150
if (conn === dialer.relayPeers.get(nodes.node3.id)) {
151151
return callback(null, dialer.relayPeers.get(nodes.node3.id))
152152
}
153153

154-
callback(new Error(`error`))
154+
callback(new Error('error'))
155155
})
156156

157157
dialer._dialPeer(dstMa, (err, conn) => {
@@ -162,22 +162,22 @@ describe(`dialer tests`, function () {
162162
})
163163
})
164164

165-
it(`should fail dialing a peer over any relay`, function (done) {
165+
it('should fail dialing a peer over any relay', function (done) {
166166
const dstMa = multiaddr(`/ipfs/${nodes.node4.id}`)
167167
dialer._negotiateRelay.callsFake(function (conn, dstMa, callback) {
168-
callback(new Error(`error`))
168+
callback(new Error('error'))
169169
})
170170

171171
dialer._dialPeer(dstMa, (err, conn) => {
172172
expect(conn).to.be.undefined()
173173
expect(err).to.not.be.null()
174-
expect(err).to.equal(`no relay peers were found or all relays failed to dial`)
174+
expect(err).to.equal('no relay peers were found or all relays failed to dial')
175175
done()
176176
})
177177
})
178178
})
179179

180-
describe(`._negotiateRelay`, function () {
180+
describe('._negotiateRelay', function () {
181181
const dstMa = multiaddr(`/ipfs/${nodes.node4.id}`)
182182

183183
let conn = null
@@ -188,7 +188,7 @@ describe(`dialer tests`, function () {
188188
PeerId.createFromJSON(nodes.node4, (_, peerId) => {
189189
PeerInfo.create(peerId, (err, peerInfo) => {
190190
peer = peerInfo
191-
peer.multiaddrs.add(`/p2p-circuit/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`)
191+
peer.multiaddrs.add('/p2p-circuit/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE')
192192
done(err)
193193
})
194194
})
@@ -202,12 +202,12 @@ describe(`dialer tests`, function () {
202202
dialer.relayConns = new Map()
203203
dialer._negotiateRelay.callThrough()
204204
dialer._dialRelayHelper.callThrough()
205-
peer = new PeerInfo(PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`))
205+
peer = new PeerInfo(PeerId.createFromB58String('QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE'))
206206
p = pair()
207207
conn = new Connection(p[1])
208208
})
209209

210-
it(`should write the correct dst addr`, function (done) {
210+
it('should write the correct dst addr', function (done) {
211211
dialer._dialRelay.callsFake((_, cb) => {
212212
pull(
213213
p[0],
@@ -228,7 +228,7 @@ describe(`dialer tests`, function () {
228228
dialer._negotiateRelay(peer, dstMa, done)
229229
})
230230

231-
it(`should negotiate relay`, function (done) {
231+
it('should negotiate relay', function (done) {
232232
dialer._dialRelay.callsFake((_, cb) => {
233233
pull(
234234
p[0],
@@ -253,7 +253,7 @@ describe(`dialer tests`, function () {
253253
})
254254
})
255255

256-
it(`should fail with an invalid peer id`, function (done) {
256+
it('should fail with an invalid peer id', function (done) {
257257
const dstMa = multiaddr('/ip4/127.0.0.1/tcp/4001')
258258
dialer._dialRelay.callsFake((_, cb) => {
259259
pull(
@@ -279,7 +279,7 @@ describe(`dialer tests`, function () {
279279
})
280280
})
281281

282-
it(`should handle failed relay negotiation`, function (done) {
282+
it('should handle failed relay negotiation', function (done) {
283283
dialer._dialRelay.callsFake((_, cb) => {
284284
cb(null, conn)
285285
pull(
@@ -295,7 +295,7 @@ describe(`dialer tests`, function () {
295295
dialer._negotiateRelay(peer, dstMa, (err, conn) => {
296296
expect(err).to.not.be.null()
297297
expect(err).to.be.an.instanceOf(Error)
298-
expect(err.message).to.be.equal(`Got 400 error code trying to dial over relay`)
298+
expect(err.message).to.be.equal('Got 400 error code trying to dial over relay')
299299
done()
300300
})
301301
})

0 commit comments

Comments
 (0)