Skip to content

Commit 55c9bfa

Browse files
feat: gossipsub 1.1 (#733)
* feat: gossipsub 1.1 BREAKING CHANGE: pubsub implementation is now directly exposed and its API was updated according to the new pubsub interface in js-libp2p-interfaces repo * chore: use gossipsub branch with src added * fix: add pubsub handlers adapter * chore: fix deps * chore: update pubsub docs and examples * chore: apply suggestions from code review Co-authored-by: Jacob Heun <[email protected]> * chore: use new floodsub * chore: change validator doc set Co-authored-by: Jacob Heun <[email protected]> * chore: add new gossipsub src Co-authored-by: Jacob Heun <[email protected]>
1 parent 1e86971 commit 55c9bfa

File tree

12 files changed

+247
-174
lines changed

12 files changed

+247
-174
lines changed

.aegir.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ const after = async () => {
4545
}
4646

4747
module.exports = {
48-
bundlesize: { maxSize: '205kB' },
48+
bundlesize: { maxSize: '225kB' },
4949
hooks: {
5050
pre: before,
5151
post: after

doc/API.md

Lines changed: 131 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
* [`pubsub.publish`](#pubsubpublish)
4747
* [`pubsub.subscribe`](#pubsubsubscribe)
4848
* [`pubsub.unsubscribe`](#pubsubunsubscribe)
49+
* [`pubsub.on`](#pubsubon)
50+
* [`pubsub.removeListener`](#pubsubremovelistener)
4951
* [`connectionManager.get`](#connectionmanagerget)
5052
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
5153
* [`connectionManager.size`](#connectionmanagersize)
@@ -1327,16 +1329,15 @@ await libp2p.pubsub.publish(topic, data)
13271329

13281330
### pubsub.subscribe
13291331

1330-
Subscribes the given handler to a pubsub topic.
1332+
Subscribes to a pubsub topic.
13311333

1332-
`libp2p.pubsub.subscribe(topic, handler)`
1334+
`libp2p.pubsub.subscribe(topic)`
13331335

13341336
#### Parameters
13351337

13361338
| Name | Type | Description |
13371339
|------|------|-------------|
13381340
| topic | `string` | topic to subscribe |
1339-
| handler | `function({ from: string, data: Uint8Array, seqno: Uint8Array, topicIDs: Array<string>, signature: Uint8Array, key: Uint8Array })` | handler for new data on topic |
13401341

13411342
#### Returns
13421343

@@ -1352,21 +1353,21 @@ const handler = (msg) => {
13521353
// msg.data - pubsub data received
13531354
}
13541355

1355-
libp2p.pubsub.subscribe(topic, handler)
1356+
libp2p.pubsub.on(topic, handler)
1357+
libp2p.pubsub.subscribe(topic)
13561358
```
13571359

13581360
### pubsub.unsubscribe
13591361

1360-
Unsubscribes the given handler from a pubsub topic. If no handler is provided, all handlers for the topic are removed.
1362+
Unsubscribes from a pubsub topic.
13611363

1362-
`libp2p.pubsub.unsubscribe(topic, handler)`
1364+
`libp2p.pubsub.unsubscribe(topic)`
13631365

13641366
#### Parameters
13651367

13661368
| Name | Type | Description |
13671369
|------|------|-------------|
13681370
| topic | `string` | topic to unsubscribe |
1369-
| handler | `function(<object>)` | handler subscribed |
13701371

13711372
#### Returns
13721373

@@ -1382,7 +1383,129 @@ const handler = (msg) => {
13821383
// msg.data - pubsub data received
13831384
}
13841385

1385-
libp2p.pubsub.unsubscribe(topic, handler)
1386+
libp2p.pubsub.removeListener(topic handler)
1387+
libp2p.pubsub.unsubscribe(topic)
1388+
```
1389+
1390+
## pubsub.on
1391+
1392+
A Pubsub router is an [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter) and uses its events for pubsub message handlers.
1393+
1394+
`libp2p.pubsub.on(topic, handler)`
1395+
1396+
#### Parameters
1397+
1398+
| Name | Type | Description |
1399+
|------|------|-------------|
1400+
| topic | `string` | topic to listen |
1401+
| handler | `function({ from: string, data: Uint8Array, seqno: Uint8Array, topicIDs: Array<string>, signature: Uint8Array, key: Uint8Array })` | handler for new data on topic |
1402+
1403+
#### Returns
1404+
1405+
| Type | Description |
1406+
|------|-------------|
1407+
| `void` | |
1408+
1409+
#### Example
1410+
1411+
```js
1412+
const topic = 'topic'
1413+
const handler = (msg) => {
1414+
// msg.data - pubsub data received
1415+
}
1416+
1417+
libp2p.pubsub.on(topic, handler)
1418+
libp2p.pubsub.subscribe(topic)
1419+
```
1420+
1421+
## pubsub.removeListener
1422+
1423+
A Pubsub router is an [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter) and uses its events for pubsub message handlers.
1424+
1425+
`libp2p.pubsub.removeListener(topic, handler)`
1426+
1427+
#### Parameters
1428+
1429+
| Name | Type | Description |
1430+
|------|------|-------------|
1431+
| topic | `string` | topic to remove listener |
1432+
| handler | `function({ from: string, data: Uint8Array, seqno: Uint8Array, topicIDs: Array<string>, signature: Uint8Array, key: Uint8Array })` | handler for new data on topic |
1433+
1434+
#### Returns
1435+
1436+
| Type | Description |
1437+
|------|-------------|
1438+
| `void` | |
1439+
1440+
#### Example
1441+
1442+
```js
1443+
const topic = 'topic'
1444+
const handler = (msg) => {
1445+
// msg.data - pubsub data received
1446+
}
1447+
1448+
libp2p.pubsub.removeListener(topic handler)
1449+
libp2p.pubsub.unsubscribe(topic)
1450+
```
1451+
1452+
## pubsub.topicValidators.set
1453+
1454+
Pubsub routers support message validators per topic, which will validate the message before its propagations. Set is used to specify a validator for a topic.
1455+
1456+
`libp2p.pubsub.topicValidators.set(topic, validator)`
1457+
1458+
#### Parameters
1459+
1460+
| Name | Type | Description |
1461+
|------|------|-------------|
1462+
| topic | `string` | topic to bind a validator |
1463+
| handler | `function({ topic: string, msg: RPC })` | validator for new data on topic |
1464+
1465+
#### Returns
1466+
1467+
| Type | Description |
1468+
|------|-------------|
1469+
| `Map<string, function(string, RPC)>` | The `Map` object |
1470+
1471+
#### Example
1472+
1473+
```js
1474+
const topic = 'topic'
1475+
const validateMessage = (msgTopic, msg) => {
1476+
const input = uint8ArrayToString(msg.data)
1477+
const validInputs = ['a', 'b', 'c']
1478+
1479+
if (!validInputs.includes(input)) {
1480+
throw new Error('no valid input received')
1481+
}
1482+
}
1483+
libp2p.pubsub.topicValidators.set(topic, validateMessage)
1484+
```
1485+
1486+
## pubsub.topicValidators.delete
1487+
1488+
Pubsub routers support message validators per topic, which will validate the message before its propagations. Delete is used to remove a validator for a topic.
1489+
1490+
`libp2p.pubsub.topicValidators.delete(topic)`
1491+
1492+
#### Parameters
1493+
1494+
| Name | Type | Description |
1495+
|------|------|-------------|
1496+
| topic | `string` | topic to remove a validator |
1497+
1498+
#### Returns
1499+
1500+
| Type | Description |
1501+
|------|-------------|
1502+
| `boolean` | `true` if an element in the Map object existed and has been removed, or `false` if the element does not exist. |
1503+
1504+
#### Example
1505+
1506+
```js
1507+
const topic = 'topic'
1508+
libp2p.pubsub.topicValidators.delete(topic)
13861509
```
13871510

13881511
### connectionManager.get

examples/pubsub/1.js

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const { NOISE } = require('libp2p-noise')
88
const SECIO = require('libp2p-secio')
99
const Gossipsub = require('libp2p-gossipsub')
1010
const uint8ArrayFromString = require('uint8arrays/from-string')
11+
const uint8ArrayToString = require('uint8arrays/to-string')
1112

1213
const createNode = async () => {
1314
const node = await Libp2p.create({
@@ -38,13 +39,15 @@ const createNode = async () => {
3839
node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs)
3940
await node1.dial(node2.peerId)
4041

41-
await node1.pubsub.subscribe(topic, (msg) => {
42-
console.log(`node1 received: ${msg.data.toString()}`)
42+
node1.pubsub.on(topic, (msg) => {
43+
console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
4344
})
45+
await node1.pubsub.subscribe(topic)
4446

45-
await node2.pubsub.subscribe(topic, (msg) => {
46-
console.log(`node2 received: ${msg.data.toString()}`)
47+
node2.pubsub.on(topic, (msg) => {
48+
console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
4749
})
50+
await node2.pubsub.subscribe(topic)
4851

4952
// node2 publishes "news" every second
5053
setInterval(() => {

examples/pubsub/README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,15 @@ node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs)
4747

4848
await node1.dial(node2.peerId)
4949

50-
await node1.pubsub.subscribe(topic, (msg) => {
51-
console.log(`node1 received: ${msg.data.toString()}`)
50+
node1.pubsub.on(topic, (msg) => {
51+
console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
5252
})
53+
await node1.pubsub.subscribe(topic)
5354

54-
await node2.pubsub.subscribe(topic, (msg) => {
55-
console.log(`node2 received: ${msg.data.toString()}`)
55+
node2.pubsub.on(topic, (msg) => {
56+
console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
5657
})
58+
await node2.pubsub.subscribe(topic)
5759

5860
// node2 publishes "news" every second
5961
setInterval(() => {

examples/pubsub/message-filtering/1.js

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const { NOISE } = require('libp2p-noise')
88
const SECIO = require('libp2p-secio')
99
const Gossipsub = require('libp2p-gossipsub')
1010
const uint8ArrayFromString = require('uint8arrays/from-string')
11+
const uint8ArrayToString = require('uint8arrays/to-string')
1112

1213
const createNode = async () => {
1314
const node = await Libp2p.create({
@@ -43,29 +44,34 @@ const createNode = async () => {
4344
await node2.dial(node3.peerId)
4445

4546
//subscribe
46-
await node1.pubsub.subscribe(topic, (msg) => {
47-
console.log(`node1 received: ${msg.data.toString()}`)
47+
node1.pubsub.on(topic, (msg) => {
48+
console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
4849
})
50+
await node1.pubsub.subscribe(topic)
4951

50-
await node2.pubsub.subscribe(topic, (msg) => {
51-
console.log(`node2 received: ${msg.data.toString()}`)
52+
node2.pubsub.on(topic, (msg) => {
53+
console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
5254
})
55+
await node2.pubsub.subscribe(topic)
5356

54-
await node3.pubsub.subscribe(topic, (msg) => {
55-
console.log(`node3 received: ${msg.data.toString()}`)
57+
node3.pubsub.on(topic, (msg) => {
58+
console.log(`node3 received: ${uint8ArrayToString(msg.data)}`)
5659
})
60+
await node3.pubsub.subscribe(topic)
5761

58-
const validateFruit = (msgTopic, peer, msg) => {
59-
const fruit = msg.data.toString();
62+
const validateFruit = (msgTopic, msg) => {
63+
const fruit = uint8ArrayToString(msg.data)
6064
const validFruit = ['banana', 'apple', 'orange']
61-
const valid = validFruit.includes(fruit);
62-
return valid;
65+
66+
if (!validFruit.includes(fruit)) {
67+
throw new Error('no valid fruit received')
68+
}
6369
}
6470

6571
//validate fruit
66-
node1.pubsub._pubsub.topicValidators.set(topic, validateFruit);
67-
node2.pubsub._pubsub.topicValidators.set(topic, validateFruit);
68-
node3.pubsub._pubsub.topicValidators.set(topic, validateFruit);
72+
node1.pubsub.topicValidators.set(topic, validateFruit)
73+
node2.pubsub.topicValidators.set(topic, validateFruit)
74+
node3.pubsub.topicValidators.set(topic, validateFruit)
6975

7076
// node1 publishes "fruits" every five seconds
7177
var count = 0;

examples/pubsub/message-filtering/README.md

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,31 +44,36 @@ Now we' can subscribe to the fruit topic and log incoming messages.
4444
```JavaScript
4545
const topic = 'fruit'
4646

47-
await node1.pubsub.subscribe(topic, (msg) => {
48-
console.log(`node1 received: ${msg.data.toString()}`)
47+
node1.pubsub.on(topic, (msg) => {
48+
console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
4949
})
50+
await node1.pubsub.subscribe(topic)
5051

51-
await node2.pubsub.subscribe(topic, (msg) => {
52-
console.log(`node2 received: ${msg.data.toString()}`)
52+
node2.pubsub.on(topic, (msg) => {
53+
console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
5354
})
55+
await node2.pubsub.subscribe(topic)
5456

55-
await node3.pubsub.subscribe(topic, (msg) => {
56-
console.log(`node3 received: ${msg.data.toString()}`)
57+
node3.pubsub.on(topic, (msg) => {
58+
console.log(`node3 received: ${uint8ArrayToString(msg.data)}`)
5759
})
60+
await node3.pubsub.subscribe(topic)
5861
```
5962
Finally, let's define the additional filter in the fruit topic.
6063

6164
```JavaScript
62-
const validateFruit = (msgTopic, peer, msg) => {
63-
const fruit = msg.data.toString();
65+
const validateFruit = (msgTopic, msg) => {
66+
const fruit = uint8ArrayToString(msg.data)
6467
const validFruit = ['banana', 'apple', 'orange']
65-
const valid = validFruit.includes(fruit);
66-
return valid;
68+
69+
if (!validFruit.includes(fruit)) {
70+
throw new Error('no valid fruit received')
71+
}
6772
}
6873

69-
node1.pubsub._pubsub.topicValidators.set(topic, validateFruit);
70-
node2.pubsub._pubsub.topicValidators.set(topic, validateFruit);
71-
node3.pubsub._pubsub.topicValidators.set(topic, validateFruit);
74+
node1.pubsub.topicValidators.set(topic, validateFruit)
75+
node2.pubsub.topicValidators.set(topic, validateFruit)
76+
node3.pubsub.topicValidators.set(topic, validateFruit)
7277
```
7378

7479
In this example, node one has an outdated version of the system, or is a malicious node. When it tries to publish fruit, the messages are re-shared and all the nodes share the message. However, when it tries to publish a vehicle the message is not re-shared.

package.json

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
"it-pipe": "^1.1.0",
6060
"it-protocol-buffers": "^0.2.0",
6161
"libp2p-crypto": "^0.18.0",
62-
"libp2p-interfaces": "^0.5.0",
62+
"libp2p-interfaces": "^0.5.1",
6363
"libp2p-utils": "^0.2.0",
6464
"mafmt": "^8.0.0",
6565
"merge-options": "^2.0.0",
@@ -92,16 +92,17 @@
9292
"cids": "^1.0.0",
9393
"delay": "^4.3.0",
9494
"dirty-chai": "^2.0.1",
95-
"interop-libp2p": "^0.2.0",
95+
"interop-libp2p": "libp2p/interop#chore/gossipsub-1.1",
9696
"ipfs-http-client": "^46.0.0",
9797
"it-concat": "^1.0.0",
9898
"it-pair": "^1.0.0",
9999
"it-pushable": "^1.4.0",
100+
"libp2p": ".",
100101
"libp2p-bootstrap": "^0.12.0",
101102
"libp2p-delegated-content-routing": "^0.6.0",
102103
"libp2p-delegated-peer-routing": "^0.6.0",
103-
"libp2p-floodsub": "^0.22.0",
104-
"libp2p-gossipsub": "^0.5.0",
104+
"libp2p-floodsub": "^0.23.0",
105+
"libp2p-gossipsub": "ChainSafe/js-libp2p-gossipsub#chore/use-libp2p-interfaces0.5.1-with-src",
105106
"libp2p-kad-dht": "^0.20.0",
106107
"libp2p-mdns": "^0.15.0",
107108
"libp2p-mplex": "^0.10.0",

0 commit comments

Comments
 (0)