Skip to content

Commit a4521dd

Browse files
pgtedaviddias
authored andcommitted
feat: observe traffic and expose statistics (libp2p#243)
1 parent 613e30d commit a4521dd

13 files changed

+862
-91
lines changed

README.md

+127-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca
2929
- [`switch.start(callback)`](#swarmlistencallback)
3030
- [`switch.stop(callback)`](#swarmclosecallback)
3131
- [`switch.connection`](#connection)
32+
- [`switch.stats`](#stats-api)
3233
- [Internal Transports API](#transports)
3334
- [Design Notes](#designnotes)
3435
- [Multitransport](#multitransport)
@@ -51,9 +52,26 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca
5152
```JavaScript
5253
const switch = require('libp2p-switch')
5354

54-
const sw = new switch(peerInfo [, peerBook])
55+
const sw = new switch(peerInfo , peerBook [, options])
5556
```
5657

58+
If defined, `options` should be an object with the following keys and respective values:
59+
60+
- `stats`: an object with the following keys and respective values:
61+
- `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`.
62+
- `computeThrottleMaxQueueSize`: maximum queue size to perform stats computation throttling. Defaults to `1000`.
63+
- `computeThrottleTimeout`: Throttle timeout, in miliseconds. Defaults to `2000`,
64+
- `movingAverageIntervals`: Array containin the intervals, in miliseconds, for which moving averages are calculated. Defaults to:
65+
66+
```js
67+
[
68+
60 * 1000, // 1 minute
69+
5 * 60 * 1000, // 5 minutes
70+
15 * 60 * 1000 // 15 minutes
71+
]
72+
```
73+
74+
5775
## API
5876

5977
- peerInfo is a [PeerInfo](https://github.com/libp2p/js-peer-info) object that has the peer information.
@@ -147,6 +165,111 @@ Enable circuit relaying.
147165
- active - is it an active or passive relay (default false)
148166
- `callback`
149167

168+
### Stats API
169+
170+
##### `switch.stats.emit('update')`
171+
172+
Every time any stat value changes, this object emits an `update` event.
173+
174+
#### Global stats
175+
176+
##### `switch.stats.global.snapshot`
177+
178+
Should return a stats snapshot, which is an object containing the following keys and respective values:
179+
180+
- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
181+
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number
182+
183+
##### `switch.stats.global.movingAverages`
184+
185+
Returns an object containing the following keys:
186+
187+
- dataSent
188+
- dataReceived
189+
190+
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).
191+
192+
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).
193+
194+
#### Per-transport stats
195+
196+
##### `switch.stats.transports()`
197+
198+
Returns an array containing the tags (string) for each observed transport.
199+
200+
##### `switch.stats.forTransport(transportTag).snapshot`
201+
202+
Should return a stats snapshot, which is an object containing the following keys and respective values:
203+
204+
- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
205+
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number
206+
207+
##### `switch.stats.forTransport(transportTag).movingAverages`
208+
209+
Returns an object containing the following keys:
210+
211+
dataSent
212+
dataReceived
213+
214+
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).
215+
216+
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).
217+
218+
#### Per-protocol stats
219+
220+
##### `switch.stats.protocols()`
221+
222+
Returns an array containing the tags (string) for each observed protocol.
223+
224+
##### `switch.stats.forProtocol(protocolTag).snapshot`
225+
226+
Should return a stats snapshot, which is an object containing the following keys and respective values:
227+
228+
- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
229+
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number
230+
231+
232+
##### `switch.stats.forProtocol(protocolTag).movingAverages`
233+
234+
Returns an object containing the following keys:
235+
236+
- dataSent
237+
- dataReceived
238+
239+
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).
240+
241+
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).
242+
243+
#### Per-peer stats
244+
245+
##### `switch.stats.peers()`
246+
247+
Returns an array containing the peerIDs (B58-encoded string) for each observed peer.
248+
249+
##### `switch.stats.forPeer(peerId:String).snapshot`
250+
251+
Should return a stats snapshot, which is an object containing the following keys and respective values:
252+
253+
- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
254+
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number
255+
256+
257+
##### `switch.stats.forPeer(peerId:String).movingAverages`
258+
259+
Returns an object containing the following keys:
260+
261+
- dataSent
262+
- dataReceived
263+
264+
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).
265+
266+
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).
267+
268+
#### Stats update interval
269+
270+
Stats are not updated in real-time. Instead, measurements are buffered and stats are updated at an interval. The maximum interval can be defined through the `Switch` constructor option `stats.computeThrottleTimeout`, defined in miliseconds.
271+
272+
150273
### Internal Transports API
151274

152275
##### `switch.transport.add(key, transport, options)`
@@ -212,9 +335,10 @@ Identify is a protocol that switchs mounts on top of itself, to identify the con
212335
- a) peer A dials a conn to peer B
213336
- b) that conn gets upgraded to a stream multiplexer that both peers agree
214337
- c) peer B executes de identify protocol
215-
- d) peer B now can open streams to peer A, knowing which is the identity of peer A
338+
- d) peer B now can open streams to peer A, knowing which is the
339+
identity of peer A
216340

217-
In addition to this, we also share the 'observed addresses' by the other peer, which is extremely useful information for different kinds of network topologies.
341+
In addition to this, we also share the "observed addresses" by the other peer, which is extremely useful information for different kinds of network topologies.
218342

219343
### Notes
220344

package.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,21 @@
5656
},
5757
"dependencies": {
5858
"async": "^2.6.0",
59+
"big.js": "^5.0.3",
5960
"debug": "^3.1.0",
6061
"interface-connection": "~0.3.2",
6162
"ip-address": "^5.8.9",
6263
"libp2p-circuit": "~0.1.4",
6364
"libp2p-identify": "~0.6.3",
6465
"lodash.includes": "^4.3.0",
66+
"moving-average": "^1.0.0",
6567
"multiaddr": "^3.0.2",
6668
"multistream-select": "~0.14.1",
6769
"once": "^1.4.0",
6870
"peer-id": "~0.10.6",
6971
"peer-info": "~0.11.6",
70-
"pull-stream": "^3.6.1"
72+
"pull-stream": "^3.6.1",
73+
"quick-lru": "^1.1.0"
7174
},
7275
"contributors": [
7376
"Arnaud <[email protected]>",

src/connection.js

+28-26
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,33 @@ const identify = require('libp2p-identify')
44
const multistream = require('multistream-select')
55
const waterfall = require('async/waterfall')
66
const debug = require('debug')
7-
const log = debug('libp2p:swarm:connection')
7+
const log = debug('libp2p:switch:connection')
88
const once = require('once')
99
const setImmediate = require('async/setImmediate')
1010

1111
const Circuit = require('libp2p-circuit')
1212

13-
const protocolMuxer = require('./protocol-muxer')
1413
const plaintext = require('./plaintext')
1514

16-
module.exports = function connection (swarm) {
15+
module.exports = function connection (swtch) {
1716
return {
1817
addUpgrade () {},
1918

2019
addStreamMuxer (muxer) {
2120
// for dialing
22-
swarm.muxers[muxer.multicodec] = muxer
21+
swtch.muxers[muxer.multicodec] = muxer
2322

2423
// for listening
25-
swarm.handle(muxer.multicodec, (protocol, conn) => {
24+
swtch.handle(muxer.multicodec, (protocol, conn) => {
2625
const muxedConn = muxer.listener(conn)
2726

28-
muxedConn.on('stream', (conn) => {
29-
protocolMuxer(swarm.protocols, conn)
30-
})
27+
muxedConn.on('stream', swtch.protocolMuxer(null))
3128

3229
// If identify is enabled
3330
// 1. overload getPeerInfo
3431
// 2. call getPeerInfo
3532
// 3. add this conn to the pool
36-
if (swarm.identify) {
33+
if (swtch.identify) {
3734
// overload peerInfo to use Identify instead
3835
conn.getPeerInfo = (cb) => {
3936
const conn = muxedConn.newStream()
@@ -46,11 +43,16 @@ module.exports = function connection (swarm) {
4643
(conn, cb) => identify.dialer(conn, cb),
4744
(peerInfo, observedAddrs, cb) => {
4845
observedAddrs.forEach((oa) => {
49-
swarm._peerInfo.multiaddrs.addSafe(oa)
46+
swtch._peerInfo.multiaddrs.addSafe(oa)
5047
})
5148
cb(null, peerInfo)
5249
}
53-
], cb)
50+
], (err, pi) => {
51+
if (pi) {
52+
conn.setPeerInfo(pi)
53+
}
54+
cb(err, pi)
55+
})
5456
}
5557

5658
conn.getPeerInfo((err, peerInfo) => {
@@ -59,7 +61,7 @@ module.exports = function connection (swarm) {
5961
}
6062
const b58Str = peerInfo.id.toB58String()
6163

62-
swarm.muxedConns[b58Str] = { muxer: muxedConn }
64+
swtch.muxedConns[b58Str] = { muxer: muxedConn }
6365

6466
if (peerInfo.multiaddrs.size > 0) {
6567
// with incomming conn and through identify, going to pick one
@@ -72,16 +74,16 @@ module.exports = function connection (swarm) {
7274
// no addr, use just their IPFS id
7375
peerInfo.connect(`/ipfs/${b58Str}`)
7476
}
75-
peerInfo = swarm._peerBook.put(peerInfo)
77+
peerInfo = swtch._peerBook.put(peerInfo)
7678

7779
muxedConn.on('close', () => {
78-
delete swarm.muxedConns[b58Str]
80+
delete swtch.muxedConns[b58Str]
7981
peerInfo.disconnect()
80-
peerInfo = swarm._peerBook.put(peerInfo)
81-
setImmediate(() => swarm.emit('peer-mux-closed', peerInfo))
82+
peerInfo = swtch._peerBook.put(peerInfo)
83+
setImmediate(() => swtch.emit('peer-mux-closed', peerInfo))
8284
})
8385

84-
setImmediate(() => swarm.emit('peer-mux-established', peerInfo))
86+
setImmediate(() => swtch.emit('peer-mux-established', peerInfo))
8587
})
8688
}
8789

@@ -90,9 +92,9 @@ module.exports = function connection (swarm) {
9092
},
9193

9294
reuse () {
93-
swarm.identify = true
94-
swarm.handle(identify.multicodec, (protocol, conn) => {
95-
identify.listener(conn, swarm._peerInfo)
95+
swtch.identify = true
96+
swtch.handle(identify.multicodec, (protocol, conn) => {
97+
identify.listener(conn, swtch._peerInfo)
9698
})
9799
},
98100

@@ -106,7 +108,7 @@ module.exports = function connection (swarm) {
106108

107109
// TODO: (dryajov) should we enable circuit listener and
108110
// dialer by default?
109-
swarm.transport.add(Circuit.tag, new Circuit(swarm, config))
111+
swtch.transport.add(Circuit.tag, new Circuit(swtch, config))
110112
}
111113
},
112114

@@ -116,15 +118,15 @@ module.exports = function connection (swarm) {
116118
encrypt = plaintext.encrypt
117119
}
118120

119-
swarm.unhandle(swarm.crypto.tag)
120-
swarm.handle(tag, (protocol, conn) => {
121-
const myId = swarm._peerInfo.id
121+
swtch.unhandle(swtch.crypto.tag)
122+
swtch.handle(tag, (protocol, conn) => {
123+
const myId = swtch._peerInfo.id
122124
const secure = encrypt(myId, conn, undefined, () => {
123-
protocolMuxer(swarm.protocols, secure)
125+
swtch.protocolMuxer(null)(secure)
124126
})
125127
})
126128

127-
swarm.crypto = {tag, encrypt}
129+
swtch.crypto = {tag, encrypt}
128130
}
129131
}
130132
}

0 commit comments

Comments
 (0)