Skip to content

Commit 828e685

Browse files
authored
fix: improve connection tracking (libp2p#318)
* fix: centralize connection events and peer connects * fix: remove unneeded peerBook put
1 parent 4a543cb commit 828e685

File tree

6 files changed

+40
-40
lines changed

6 files changed

+40
-40
lines changed

src/connection/incoming.js

-3
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@ class IncomingConnectionFSM extends BaseConnection {
6868
this.emit('muxed', this.conn)
6969
})
7070
this._state.on('DISCONNECTING', () => {
71-
if (this.theirPeerInfo) {
72-
this.theirPeerInfo.disconnect()
73-
}
7471
this._state('done')
7572
})
7673
}

src/connection/index.js

+3-10
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,6 @@ class ConnectionFSM extends BaseConnection {
268268
_onDisconnecting () {
269269
this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer))
270270

271-
// Issue disconnects on both Peers
272-
if (this.theirPeerInfo) {
273-
this.theirPeerInfo.disconnect()
274-
}
275-
276271
this.switch.connection.remove(this)
277272

278273
delete this.switch.conns[this.theirB58Id]
@@ -284,7 +279,6 @@ class ConnectionFSM extends BaseConnection {
284279
tasks.push((cb) => {
285280
this.muxer.end(() => {
286281
delete this.muxer
287-
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
288282
cb()
289283
})
290284
})
@@ -325,13 +319,13 @@ class ConnectionFSM extends BaseConnection {
325319
return this.close(maybeUnexpectedEnd(err))
326320
}
327321

328-
const conn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer)
329-
330-
this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, conn, this.theirPeerInfo.id, (err) => {
322+
const observedConn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer)
323+
const encryptedConn = this.switch.crypto.encrypt(this.ourPeerInfo.id, observedConn, this.theirPeerInfo.id, (err) => {
331324
if (err) {
332325
return this.close(err)
333326
}
334327

328+
this.conn = encryptedConn
335329
this.conn.setPeerInfo(this.theirPeerInfo)
336330
this._state('done')
337331
})
@@ -392,7 +386,6 @@ class ConnectionFSM extends BaseConnection {
392386
this.switch.protocolMuxer(null)(conn)
393387
})
394388

395-
this.switch.emit('peer-mux-established', this.theirPeerInfo)
396389
this._didUpgrade(null)
397390

398391
// Run identify on the connection

src/connection/manager.js

+29-15
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class ConnectionManager {
3333
// Only add it if it's not there
3434
if (!this.get(connection)) {
3535
this.connections[connection.theirB58Id].push(connection)
36+
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
3637
}
3738
}
3839

@@ -78,14 +79,26 @@ class ConnectionManager {
7879
* @returns {void}
7980
*/
8081
remove (connection) {
81-
if (!this.connections[connection.theirB58Id]) return
82+
// No record of the peer, disconnect it
83+
if (!this.connections[connection.theirB58Id]) {
84+
connection.theirPeerInfo.disconnect()
85+
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
86+
return
87+
}
8288

8389
for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
8490
if (this.connections[connection.theirB58Id][i] === connection) {
8591
this.connections[connection.theirB58Id].splice(i, 1)
86-
return
92+
break
8793
}
8894
}
95+
96+
// The peer is fully disconnected
97+
if (this.connections[connection.theirB58Id].length === 0) {
98+
delete this.connections[connection.theirB58Id]
99+
connection.theirPeerInfo.disconnect()
100+
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
101+
}
89102
}
90103

91104
/**
@@ -175,6 +188,7 @@ class ConnectionManager {
175188
return log('identify not successful')
176189
}
177190
const b58Str = peerInfo.id.toB58String()
191+
peerInfo = this.switch._peerBook.put(peerInfo)
178192

179193
const connection = new ConnectionFSM({
180194
_switch: this.switch,
@@ -185,24 +199,24 @@ class ConnectionManager {
185199
})
186200
this.switch.connection.add(connection)
187201

188-
if (peerInfo.multiaddrs.size > 0) {
189-
// with incomming conn and through identify, going to pick one
190-
// of the available multiaddrs from the other peer as the one
191-
// I'm connected to as we really can't be sure at the moment
192-
// TODO add this consideration to the connection abstraction!
193-
peerInfo.connect(peerInfo.multiaddrs.toArray()[0])
194-
} else {
195-
// for the case of websockets in the browser, where peers have
196-
// no addr, use just their IPFS id
197-
peerInfo.connect(`/ipfs/${b58Str}`)
202+
// Only update if it's not already connected
203+
if (!peerInfo.isConnected()) {
204+
if (peerInfo.multiaddrs.size > 0) {
205+
// with incomming conn and through identify, going to pick one
206+
// of the available multiaddrs from the other peer as the one
207+
// I'm connected to as we really can't be sure at the moment
208+
// TODO add this consideration to the connection abstraction!
209+
peerInfo.connect(peerInfo.multiaddrs.toArray()[0])
210+
} else {
211+
// for the case of websockets in the browser, where peers have
212+
// no addr, use just their IPFS id
213+
peerInfo.connect(`/ipfs/${b58Str}`)
214+
}
198215
}
199-
peerInfo = this.switch._peerBook.put(peerInfo)
200216

201217
muxedConn.once('close', () => {
202218
connection.close()
203219
})
204-
205-
this.switch.emit('peer-mux-established', peerInfo)
206220
})
207221
})
208222
}

src/dialer/queue.js

+4-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
const ConnectionFSM = require('../connection')
44
const { DIAL_ABORTED, ERR_BLACKLISTED } = require('../errors')
5-
const Connection = require('interface-connection').Connection
65
const nextTick = require('async/nextTick')
76
const once = require('once')
87
const debug = require('debug')
@@ -45,10 +44,8 @@ function createConnectionWithProtocol ({ protocol, connection, callback }) {
4544
return callback(err)
4645
}
4746

48-
const proxyConnection = new Connection()
49-
proxyConnection.setPeerInfo(connection.theirPeerInfo)
50-
proxyConnection.setInnerConn(conn)
51-
callback(null, proxyConnection)
47+
conn.setPeerInfo(connection.theirPeerInfo)
48+
callback(null, conn)
5249
})
5350
}
5451

@@ -192,6 +189,8 @@ class Queue {
192189
conn: null
193190
})
194191

192+
this.switch.connection.add(connectionFSM)
193+
195194
// Add control events and start the dialer
196195
connectionFSM.once('connected', () => connectionFSM.protect())
197196
connectionFSM.once('private', () => connectionFSM.encrypt())
@@ -252,15 +251,13 @@ class Queue {
252251
// If we're not muxed yet, add listeners
253252
connectionFSM.once('muxed', () => {
254253
this.blackListCount = 0 // reset blacklisting on good connections
255-
this.switch.connection.add(connectionFSM)
256254
queuedDial.connection = connectionFSM
257255
createConnectionWithProtocol(queuedDial)
258256
next()
259257
})
260258

261259
connectionFSM.once('unmuxed', () => {
262260
this.blackListCount = 0
263-
this.switch.connection.add(connectionFSM)
264261
queuedDial.connection = connectionFSM
265262
createConnectionWithProtocol(queuedDial)
266263
next()

src/transport.js

-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ class TransportManager {
106106
}
107107

108108
peerInfo.connect(success.multiaddr)
109-
this.switch._peerBook.put(peerInfo)
110109
callback(null, success.conn)
111110
})
112111
}

test/dial-fsm.node.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,8 @@ describe('dialFSM', () => {
240240

241241
// Expect 4 `peer-mux-established` events
242242
expect(4).checks(() => {
243-
// Expect 4 `peer-mux-closed`, plus 1 hangup
244-
expect(5).checks(() => {
243+
// Expect 2 `peer-mux-closed`, plus 1 hangup
244+
expect(3).checks(() => {
245245
switchA.removeAllListeners('peer-mux-closed')
246246
switchB.removeAllListeners('peer-mux-closed')
247247
switchA.removeAllListeners('peer-mux-established')
@@ -286,8 +286,8 @@ describe('dialFSM', () => {
286286
switchA.handle(protocol, (_, conn) => { pull(conn, conn) })
287287
switchB.handle(protocol, (_, conn) => { pull(conn, conn) })
288288

289-
// 4 close checks and 1 hangup check
290-
expect(5).checks(() => {
289+
// 2 close checks and 1 hangup check
290+
expect(2).checks(() => {
291291
switchA.removeAllListeners('peer-mux-closed')
292292
switchB.removeAllListeners('peer-mux-closed')
293293
// restart the node for subsequent tests

0 commit comments

Comments
 (0)