Skip to content

Commit 5e45409

Browse files
committed
chore: use new version of pubsub pr
1 parent 3027835 commit 5e45409

File tree

8 files changed

+116
-76
lines changed

8 files changed

+116
-76
lines changed

README.md

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,29 @@ Gossipsub is an implementation of pubsub based on meshsub and floodsub. You can
2929
```javascript
3030
const Gossipsub = require('libp2p-gossipsub')
3131

32-
const gsub = new Gossipsub(node)
32+
const registrar = {
33+
handle: (multicodecs, handle) => {
34+
// register multicodec to libp2p
35+
// handle function is called everytime a remote peer opens a stream to the peer.
36+
},
37+
register: (multicodecs, handlers) => {
38+
// handlers will be used to notify pubsub of peer connection establishment or closing
39+
},
40+
unregister: (id) => {
3341

34-
gsub.start((err) => {
35-
if (err) {
36-
console.log('Upsy', err)
3742
}
38-
gsub.on('fruit', (data) => {
39-
console.log(data)
40-
})
41-
gsub.subscribe('fruit')
43+
}
4244

43-
gsub.publish('fruit', new Buffer('banana'))
45+
const gsub = new Gossipsub(peerInfo, registrar, options)
46+
47+
await gsub.start()
48+
49+
gsub.on('fruit', (data) => {
50+
console.log(data)
4451
})
52+
gsub.subscribe('fruit')
4553

54+
gsub.publish('fruit', new Buffer('banana'))
4655
```
4756

4857
## API
@@ -51,14 +60,16 @@ gsub.start((err) => {
5160

5261
```js
5362
const options = {…}
54-
const gossipsub = new Gossipsub(libp2pNode, options)
63+
const gossipsub = new Gossipsub(peerInfo, registrar, options)
5564
```
5665

5766
Options is an optional object with the following key-value pairs:
5867

5968
* **`fallbackToFloodsub`**: boolean identifying whether the node should fallback to the floodsub protocol, if another connecting peer does not support gossipsub (defaults to **true**).
6069
* **`emitSelf`**: boolean identifying whether the node should emit to self on publish, in the event of the topic being subscribed (defaults to **false**).
6170

71+
For the remaining API, see https://github.com/libp2p/js-libp2p-pubsub
72+
6273
## Contribute
6374

6475
This module is actively under development. Please check out the issues and submit PRs!

src/index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class GossipSub extends BasicPubsub {
1616
/**
1717
* @param {PeerInfo} peerInfo instance of the peer's PeerInfo
1818
* @param {Object} registrar
19+
* @param {function} registrar.handle
1920
* @param {function} registrar.register
2021
* @param {function} registrar.unregister
2122
* @param {Object} [options]
@@ -28,6 +29,7 @@ class GossipSub extends BasicPubsub {
2829
assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`')
2930

3031
// registrar handling
32+
assert(registrar && typeof registrar.handle === 'function', 'a handle function must be provided in registrar')
3133
assert(registrar && typeof registrar.register === 'function', 'a register function must be provided in registrar')
3234
assert(registrar && typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar')
3335

@@ -588,3 +590,4 @@ class GossipSub extends BasicPubsub {
588590
}
589591

590592
module.exports = GossipSub
593+
module.exports.multicodec = constants.GossipSubID

src/pubsub.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class BasicPubSub extends Pubsub {
2727
* @param {string} props.multicodec protocol identificer to connect
2828
* @param {PeerInfo} props.peerInfo peer's peerInfo
2929
* @param {Object} props.registrar registrar for libp2p protocols
30+
* @param {function} props.registrar.handle
3031
* @param {function} props.registrar.register
3132
* @param {function} props.registrar.unregister
3233
* @param {Object} [props.options]
@@ -82,9 +83,10 @@ class BasicPubSub extends Pubsub {
8283
* @override
8384
* @param {PeerInfo} peerInfo peer info
8485
* @param {Connection} conn connection to the peer
86+
* @returns {Promise<void>}
8587
*/
86-
_onPeerConnected (peerInfo, conn) {
87-
super._onPeerConnected(peerInfo, conn)
88+
async _onPeerConnected (peerInfo, conn) {
89+
await super._onPeerConnected(peerInfo, conn)
8890
const idB58Str = peerInfo.id.toB58String()
8991
const peer = this.peers.get(idB58Str)
9092

@@ -110,7 +112,7 @@ class BasicPubSub extends Pubsub {
110112
await pipe(
111113
conn,
112114
lp.decode(),
113-
async function collect (source) {
115+
async function (source) {
114116
for await (const data of source) {
115117
const rpcMsg = Buffer.isBuffer(data) ? data : data.slice()
116118

test/2-nodes.spec.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ chai.use(require('chai-spies'))
77
const expect = chai.expect
88

99
const { GossipSubID: multicodec } = require('../src/constants')
10-
const DuplexPair = require('it-pair/duplex')
1110

1211
const {
1312
createGossipsub,
1413
createGossipsubNodes,
1514
createGossipsubConnectedNodes,
1615
mockRegistrar,
1716
expectSet,
17+
ConnectionPair,
1818
first
1919
} = require('./utils')
2020

@@ -66,7 +66,7 @@ describe('2 nodes', () => {
6666
const onConnect1 = registrarRecords[1][multicodec].onConnect
6767

6868
// Notice peers of connection
69-
const [d0, d1] = DuplexPair()
69+
const [d0, d1] = ConnectionPair()
7070
onConnect0(nodes[1].peerInfo, d0)
7171
onConnect1(nodes[0].peerInfo, d1)
7272

@@ -315,7 +315,7 @@ describe('2 nodes', () => {
315315
const onConnect1 = registrarRecords[1][multicodec].onConnect
316316

317317
// Notice peers of connection
318-
const [d0, d1] = DuplexPair()
318+
const [d0, d1] = ConnectionPair()
319319
onConnect0(nodes[1].peerInfo, d0)
320320
onConnect1(nodes[0].peerInfo, d1)
321321

test/floodsub.spec.js

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ chai.use(require('dirty-chai'))
66

77
const expect = chai.expect
88
const times = require('lodash/times')
9-
const DuplexPair = require('it-pair/duplex')
109

1110
const { multicodec: floodsubMulticodec } = require('libp2p-floodsub')
1211

1312
const {
1413
createGossipsub,
1514
createFloodsubNode,
1615
expectSet,
16+
createMockRegistrar,
17+
ConnectionPair,
1718
first
1819
} = require('./utils')
1920

@@ -22,19 +23,6 @@ const shouldNotHappen = () => expect.fail()
2223
describe('gossipsub fallbacks to floodsub', () => {
2324
let registrarRecords = Array.from({ length: 2 })
2425

25-
const registrar = (registrarRecord) => ({
26-
register: (multicodecs, handlers) => {
27-
multicodecs.forEach((multicodec) => {
28-
registrarRecord[multicodec] = handlers
29-
})
30-
},
31-
unregister: (multicodecs) => {
32-
multicodecs.forEach((multicodec) => {
33-
delete registrarRecord[multicodec]
34-
})
35-
}
36-
})
37-
3826
describe('basics', () => {
3927
let nodeGs
4028
let nodeFs
@@ -43,8 +31,8 @@ describe('gossipsub fallbacks to floodsub', () => {
4331
registrarRecords[0] = {}
4432
registrarRecords[1] = {}
4533

46-
nodeGs = await createGossipsub(registrar(registrarRecords[0]), true)
47-
nodeFs = await createFloodsubNode(registrar(registrarRecords[1]), true)
34+
nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true)
35+
nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true)
4836
})
4937

5038
afterEach(async function () {
@@ -63,7 +51,7 @@ describe('gossipsub fallbacks to floodsub', () => {
6351
expect(onConnectFs).to.exist()
6452

6553
// Notice peers of connection
66-
const [d0, d1] = DuplexPair()
54+
const [d0, d1] = ConnectionPair()
6755
onConnectGs(nodeFs.peerInfo, d0)
6856
onConnectFs(nodeGs.peerInfo, d1)
6957

@@ -80,8 +68,8 @@ describe('gossipsub fallbacks to floodsub', () => {
8068
registrarRecords[0] = {}
8169
registrarRecords[1] = {}
8270

83-
nodeGs = await createGossipsub(registrar(registrarRecords[0]), true, { fallbackToFloodsub: false })
84-
nodeFs = await createFloodsubNode(registrar(registrarRecords[1]), true)
71+
nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true, { fallbackToFloodsub: false })
72+
nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true)
8573
})
8674

8775
after(async function () {
@@ -120,14 +108,14 @@ describe('gossipsub fallbacks to floodsub', () => {
120108
registrarRecords[0] = {}
121109
registrarRecords[1] = {}
122110

123-
nodeGs = await createGossipsub(registrar(registrarRecords[0]), true)
124-
nodeFs = await createFloodsubNode(registrar(registrarRecords[1]), true)
111+
nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true)
112+
nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true)
125113

126114
const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
127115
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
128116

129117
// Notice peers of connection
130-
const [d0, d1] = DuplexPair()
118+
const [d0, d1] = ConnectionPair()
131119
onConnectGs(nodeFs.peerInfo, d0)
132120
onConnectFs(nodeGs.peerInfo, d1)
133121
})
@@ -175,14 +163,14 @@ describe('gossipsub fallbacks to floodsub', () => {
175163
registrarRecords[0] = {}
176164
registrarRecords[1] = {}
177165

178-
nodeGs = await createGossipsub(registrar(registrarRecords[0]), true)
179-
nodeFs = await createFloodsubNode(registrar(registrarRecords[1]), true)
166+
nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true)
167+
nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true)
180168

181169
const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
182170
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
183171

184172
// Notice peers of connection
185-
const [d0, d1] = DuplexPair()
173+
const [d0, d1] = ConnectionPair()
186174
onConnectGs(nodeFs.peerInfo, d0)
187175
onConnectFs(nodeGs.peerInfo, d1)
188176

@@ -286,16 +274,16 @@ describe('gossipsub fallbacks to floodsub', () => {
286274
registrarRecords[0] = {}
287275
registrarRecords[1] = {}
288276

289-
nodeGs = await createGossipsub(registrar(registrarRecords[0]), true)
290-
nodeFs = await createFloodsubNode(registrar(registrarRecords[1]), true)
277+
nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true)
278+
nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true)
291279

292280
const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
293281
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
294282

295283
// Notice peers of connection
296-
const [d0, d1] = DuplexPair()
297-
onConnectGs(nodeFs.peerInfo, d0)
298-
onConnectFs(nodeGs.peerInfo, d1)
284+
const [d0, d1] = ConnectionPair()
285+
await onConnectGs(nodeFs.peerInfo, d0)
286+
await onConnectFs(nodeGs.peerInfo, d1)
299287

300288
nodeGs.subscribe(topic)
301289
nodeFs.subscribe(topic)

test/mesh.spec.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
/* eslint-env mocha */
33

44
const { expect } = require('chai')
5-
const DuplexPair = require('it-pair/duplex')
65

76
const { GossipSubDhi, GossipSubID: multicodec } = require('../src/constants')
8-
const { createGossipsubNodes } = require('./utils')
7+
const {
8+
createGossipsubNodes,
9+
ConnectionPair
10+
} = require('./utils')
911

1012
describe('mesh overlay', () => {
1113
let nodes, registrarRecords
@@ -39,7 +41,7 @@ describe('mesh overlay', () => {
3941
const onConnectN = registrarRecords[n][multicodec].onConnect
4042

4143
// Notice peers of connection
42-
const [d0, d1] = DuplexPair()
44+
const [d0, d1] = ConnectionPair()
4345
onConnect0(nodes[n].peerInfo, d0)
4446
onConnectN(nodes[0].peerInfo, d1)
4547
}
@@ -66,7 +68,7 @@ describe('mesh overlay', () => {
6668
const onConnectN = registrarRecords[i][multicodec].onConnect
6769

6870
// Notice peers of connection
69-
const [d0, d1] = DuplexPair()
71+
const [d0, d1] = ConnectionPair()
7072
onConnect0(nodes[i].peerInfo, d0)
7173
onConnectN(nodes[0].peerInfo, d1)
7274
}

test/multiple-nodes.spec.js

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ chai.use(require('dirty-chai'))
77
const expect = chai.expect
88
const promisify = require('promisify-es6')
99

10-
const DuplexPair = require('it-pair/duplex')
1110
const { GossipSubID: multicodec } = require('../src/constants')
1211
const {
1312
createGossipsubNodes,
14-
expectSet
13+
expectSet,
14+
ConnectionPair
1515
} = require('./utils')
1616

1717
describe('multiple nodes (more than 2)', () => {
@@ -40,11 +40,11 @@ describe('multiple nodes (more than 2)', () => {
4040
const onConnectC = registrarRecords[2][multicodec].onConnect
4141

4242
// Notice peers of connection
43-
const [d0, d1] = DuplexPair()
43+
const [d0, d1] = ConnectionPair()
4444
onConnectA(b.peerInfo, d0)
4545
onConnectB(a.peerInfo, d1)
4646

47-
const [d2, d3] = DuplexPair()
47+
const [d2, d3] = ConnectionPair()
4848
onConnectB(c.peerInfo, d2)
4949
onConnectC(b.peerInfo, d3)
5050
})
@@ -105,11 +105,11 @@ describe('multiple nodes (more than 2)', () => {
105105
const onConnectC = registrarRecords[2][multicodec].onConnect
106106

107107
// Notice peers of connection
108-
const [d0, d1] = DuplexPair()
108+
const [d0, d1] = ConnectionPair()
109109
onConnectA(b.peerInfo, d0)
110110
onConnectB(a.peerInfo, d1)
111111

112-
const [d2, d3] = DuplexPair()
112+
const [d2, d3] = ConnectionPair()
113113
onConnectB(c.peerInfo, d2)
114114
onConnectC(b.peerInfo, d3)
115115

@@ -200,11 +200,11 @@ describe('multiple nodes (more than 2)', () => {
200200
const onConnectC = registrarRecords[2][multicodec].onConnect
201201

202202
// Notice peers of connection
203-
const [d0, d1] = DuplexPair()
203+
const [d0, d1] = ConnectionPair()
204204
onConnectA(b.peerInfo, d0)
205205
onConnectB(a.peerInfo, d1)
206206

207-
const [d2, d3] = DuplexPair()
207+
const [d2, d3] = ConnectionPair()
208208
onConnectB(c.peerInfo, d2)
209209
onConnectC(b.peerInfo, d3)
210210

@@ -265,19 +265,19 @@ describe('multiple nodes (more than 2)', () => {
265265
const onConnectE = registrarRecords[4][multicodec].onConnect
266266

267267
// Notice peers of connection
268-
const [d0, d1] = DuplexPair()
268+
const [d0, d1] = ConnectionPair()
269269
onConnectA(b.peerInfo, d0)
270270
onConnectB(a.peerInfo, d1)
271271

272-
const [d2, d3] = DuplexPair()
272+
const [d2, d3] = ConnectionPair()
273273
onConnectB(c.peerInfo, d2)
274274
onConnectC(b.peerInfo, d3)
275275

276-
const [d4, d5] = DuplexPair()
276+
const [d4, d5] = ConnectionPair()
277277
onConnectC(d.peerInfo, d4)
278278
onConnectD(c.peerInfo, d5)
279279

280-
const [d6, d7] = DuplexPair()
280+
const [d6, d7] = ConnectionPair()
281281
onConnectD(e.peerInfo, d6)
282282
onConnectE(d.peerInfo, d7)
283283

0 commit comments

Comments
 (0)