Skip to content

Commit 5f9a4f6

Browse files
authored
fix: improve stopping logic (libp2p#324)
1 parent aeadc1b commit 5f9a4f6

File tree

6 files changed

+144
-107
lines changed

6 files changed

+144
-107
lines changed

src/connection/index.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ class ConnectionFSM extends BaseConnection {
9595
UPGRADING: { // Attempting to upgrade the connection with muxers
9696
stop: 'CONNECTED', // If we cannot mux, stop upgrading
9797
done: 'MUXED',
98-
error: 'ERRORED'
98+
error: 'ERRORED',
99+
disconnect: 'DISCONNECTING'
99100
},
100101
MUXED: {
101102
disconnect: 'DISCONNECTING'

src/dialer/index.js

+14-4
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ const {
1212
module.exports = function (_switch) {
1313
const dialQueueManager = new DialQueueManager(_switch)
1414

15-
_switch.state.on('STOPPING:enter', abort)
15+
_switch.state.on('STARTED:enter', start)
16+
_switch.state.on('STOPPING:enter', stop)
1617

1718
/**
1819
* @param {DialRequest} dialRequest
@@ -34,14 +35,24 @@ module.exports = function (_switch) {
3435
dialQueueManager.add({ peerInfo, protocol, useFSM, callback })
3536
}
3637

38+
/**
39+
* Starts the `DialQueueManager`
40+
*
41+
* @param {function} callback
42+
*/
43+
function start (callback) {
44+
dialQueueManager.start()
45+
callback()
46+
}
47+
3748
/**
3849
* Aborts all dials that are queued. This should
3950
* only be used when the Switch is being stopped
4051
*
4152
* @param {function} callback
4253
*/
43-
function abort (callback) {
44-
dialQueueManager.abort()
54+
function stop (callback) {
55+
dialQueueManager.stop()
4556
callback()
4657
}
4758

@@ -77,7 +88,6 @@ module.exports = function (_switch) {
7788
return {
7889
dial,
7990
dialFSM,
80-
abort,
8191
clearBlacklist,
8292
BLACK_LIST_ATTEMPTS: isNaN(_switch._options.blackListAttempts) ? BLACK_LIST_ATTEMPTS : _switch._options.blackListAttempts,
8393
BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL,

src/dialer/queueManager.js

+12-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class DialQueueManager {
2222
this._queues = {}
2323
this.switch = _switch
2424
this._cleanInterval = retimer(this._clean.bind(this), QUARTER_HOUR)
25+
this.start()
2526
}
2627

2728
/**
@@ -66,13 +67,21 @@ class DialQueueManager {
6667
this._cleanInterval.reschedule(QUARTER_HOUR)
6768
}
6869

70+
/**
71+
* Allows the `DialQueueManager` to execute dials
72+
*/
73+
start () {
74+
this.isRunning = true
75+
}
76+
6977
/**
7078
* Iterates over all items in the DialerQueue
7179
* and executes there callback with an error.
7280
*
7381
* This causes the entire DialerQueue to be drained
7482
*/
75-
abort () {
83+
stop () {
84+
this.isRunning = false
7685
// Clear the general queue
7786
this._queue.clear()
7887
// Clear the cold call queue
@@ -140,6 +149,8 @@ class DialQueueManager {
140149
* Will execute up to `MAX_PARALLEL_DIALS` dials
141150
*/
142151
run () {
152+
if (!this.isRunning) return
153+
143154
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS) {
144155
let nextQueue = { done: true }
145156
// Check the queue first and fall back to the cold call queue

src/index.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,8 @@ class Switch extends EventEmitter {
232232
}, (err) => {
233233
if (err) {
234234
log.error(err)
235-
return this.emit('error', err)
235+
this.emit('error', err)
236+
return this.state('stop')
236237
}
237238
this.state('done')
238239
})
@@ -250,7 +251,10 @@ class Switch extends EventEmitter {
250251
(cb) => {
251252
each(this.transports, (transport, cb) => {
252253
each(transport.listeners, (listener, cb) => {
253-
listener.close(cb)
254+
listener.close((err) => {
255+
if (err) log.error(err)
256+
cb()
257+
})
254258
}, cb)
255259
}, cb)
256260
},

test/circuit-relay.node.js

+96-94
Original file line numberDiff line numberDiff line change
@@ -24,124 +24,126 @@ const switchOptions = {
2424
}
2525

2626
describe(`circuit`, function () {
27-
let swarmA // TCP and WS
28-
let swarmB // WS
29-
let swarmC // no transports
30-
let dialSpyA
27+
describe('basic', () => {
28+
let swarmA // TCP and WS
29+
let swarmB // WS
30+
let swarmC // no transports
31+
let dialSpyA
3132

32-
before((done) => createInfos(3, (err, infos) => {
33-
expect(err).to.not.exist()
33+
before((done) => createInfos(3, (err, infos) => {
34+
expect(err).to.not.exist()
3435

35-
const peerA = infos[0]
36-
const peerB = infos[1]
37-
const peerC = infos[2]
36+
const peerA = infos[0]
37+
const peerB = infos[1]
38+
const peerC = infos[2]
3839

39-
peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/9001')
40-
peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002/ws')
40+
peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/9001')
41+
peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002/ws')
4142

42-
swarmA = new Swarm(peerA, new PeerBook(), switchOptions)
43-
swarmB = new Swarm(peerB, new PeerBook())
44-
swarmC = new Swarm(peerC, new PeerBook())
43+
swarmA = new Swarm(peerA, new PeerBook(), switchOptions)
44+
swarmB = new Swarm(peerB, new PeerBook())
45+
swarmC = new Swarm(peerC, new PeerBook())
4546

46-
swarmA.transport.add('tcp', new TCP())
47-
swarmA.transport.add('ws', new WS())
48-
swarmB.transport.add('ws', new WS())
47+
swarmA.transport.add('tcp', new TCP())
48+
swarmA.transport.add('ws', new WS())
49+
swarmB.transport.add('ws', new WS())
4950

50-
dialSpyA = sinon.spy(swarmA.transport, 'dial')
51+
dialSpyA = sinon.spy(swarmA.transport, 'dial')
5152

52-
done()
53-
}))
53+
done()
54+
}))
5455

55-
after((done) => {
56-
parallel([
57-
(cb) => swarmA.stop(cb),
58-
(cb) => swarmB.stop(cb)
59-
], done)
60-
})
56+
after((done) => {
57+
parallel([
58+
(cb) => swarmA.stop(cb),
59+
(cb) => swarmB.stop(cb)
60+
], done)
61+
})
6162

62-
it('circuit not enabled and all transports failed', (done) => {
63-
swarmA.dial(swarmC._peerInfo, (err, conn) => {
64-
expect(err).to.exist()
65-
expect(err).to.match(/Circuit not enabled and all transports failed to dial peer/)
66-
expect(conn).to.not.exist()
67-
done()
63+
it('circuit not enabled and all transports failed', (done) => {
64+
swarmA.dial(swarmC._peerInfo, (err, conn) => {
65+
expect(err).to.exist()
66+
expect(err).to.match(/Circuit not enabled and all transports failed to dial peer/)
67+
expect(conn).to.not.exist()
68+
done()
69+
})
6870
})
69-
})
7071

71-
it('.enableCircuitRelay', () => {
72-
swarmA.connection.enableCircuitRelay({ enabled: true })
73-
expect(Object.keys(swarmA.transports).length).to.equal(3)
72+
it('.enableCircuitRelay', () => {
73+
swarmA.connection.enableCircuitRelay({ enabled: true })
74+
expect(Object.keys(swarmA.transports).length).to.equal(3)
7475

75-
swarmB.connection.enableCircuitRelay({ enabled: true })
76-
expect(Object.keys(swarmB.transports).length).to.equal(2)
77-
})
76+
swarmB.connection.enableCircuitRelay({ enabled: true })
77+
expect(Object.keys(swarmB.transports).length).to.equal(2)
78+
})
7879

79-
it('listed on the transports map', () => {
80-
expect(swarmA.transports.Circuit).to.exist()
81-
expect(swarmB.transports.Circuit).to.exist()
82-
})
80+
it('listed on the transports map', () => {
81+
expect(swarmA.transports.Circuit).to.exist()
82+
expect(swarmB.transports.Circuit).to.exist()
83+
})
8384

84-
it('add /p2p-circuit addrs on start', (done) => {
85-
parallel([
86-
(cb) => swarmA.start(cb),
87-
(cb) => swarmB.start(cb)
88-
], (err) => {
89-
expect(err).to.not.exist()
90-
expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
91-
.includes(`/p2p-circuit`)).length).to.be.at.least(3)
92-
// ensure swarmA has had 0.0.0.0 replaced in the addresses
93-
expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
94-
.includes(`/0.0.0.0`)).length).to.equal(0)
95-
expect(swarmB._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
96-
.includes(`/p2p-circuit`)).length).to.be.at.least(2)
97-
done()
85+
it('add /p2p-circuit addrs on start', (done) => {
86+
parallel([
87+
(cb) => swarmA.start(cb),
88+
(cb) => swarmB.start(cb)
89+
], (err) => {
90+
expect(err).to.not.exist()
91+
expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
92+
.includes(`/p2p-circuit`)).length).to.be.at.least(3)
93+
// ensure swarmA has had 0.0.0.0 replaced in the addresses
94+
expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
95+
.includes(`/0.0.0.0`)).length).to.equal(0)
96+
expect(swarmB._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
97+
.includes(`/p2p-circuit`)).length).to.be.at.least(2)
98+
done()
99+
})
98100
})
99-
})
100101

101-
it('dial circuit only once', (done) => {
102-
swarmA._peerInfo.multiaddrs.clear()
103-
swarmA._peerInfo.multiaddrs
104-
.add(`/dns4/wrtc-star.discovery.libp2p.io/tcp/443/wss/p2p-webrtc-star`)
102+
it('dial circuit only once', (done) => {
103+
swarmA._peerInfo.multiaddrs.clear()
104+
swarmA._peerInfo.multiaddrs
105+
.add(`/dns4/wrtc-star.discovery.libp2p.io/tcp/443/wss/p2p-webrtc-star`)
105106

106-
swarmA.dial(swarmC._peerInfo, (err, conn) => {
107-
expect(err).to.exist()
108-
expect(err).to.match(/No available transports to dial peer/)
109-
expect(conn).to.not.exist()
110-
expect(dialSpyA.callCount).to.be.eql(1)
111-
done()
107+
swarmA.dial(swarmC._peerInfo, (err, conn) => {
108+
expect(err).to.exist()
109+
expect(err).to.match(/No available transports to dial peer/)
110+
expect(conn).to.not.exist()
111+
expect(dialSpyA.callCount).to.be.eql(1)
112+
done()
113+
})
112114
})
113-
})
114115

115-
it('dial circuit last', (done) => {
116-
const peerC = swarmC._peerInfo
117-
peerC.multiaddrs.clear()
118-
peerC.multiaddrs.add(`/p2p-circuit/ipfs/ABCD`)
119-
peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9998/ipfs/ABCD`)
120-
peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9999/ws/ipfs/ABCD`)
121-
122-
swarmA.dial(peerC, (err, conn) => {
123-
expect(err).to.exist()
124-
expect(conn).to.not.exist()
125-
expect(dialSpyA.lastCall.args[0]).to.be.eql('Circuit')
126-
done()
116+
it('dial circuit last', (done) => {
117+
const peerC = swarmC._peerInfo
118+
peerC.multiaddrs.clear()
119+
peerC.multiaddrs.add(`/p2p-circuit/ipfs/ABCD`)
120+
peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9998/ipfs/ABCD`)
121+
peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9999/ws/ipfs/ABCD`)
122+
123+
swarmA.dial(peerC, (err, conn) => {
124+
expect(err).to.exist()
125+
expect(conn).to.not.exist()
126+
expect(dialSpyA.lastCall.args[0]).to.be.eql('Circuit')
127+
done()
128+
})
127129
})
128-
})
129130

130-
it('should not try circuit if no transports enabled', (done) => {
131-
swarmC.dial(swarmA._peerInfo, (err, conn) => {
132-
expect(err).to.exist()
133-
expect(conn).to.not.exist()
131+
it('should not try circuit if no transports enabled', (done) => {
132+
swarmC.dial(swarmA._peerInfo, (err, conn) => {
133+
expect(err).to.exist()
134+
expect(conn).to.not.exist()
134135

135-
expect(err).to.match(/No transports registered, dial not possible/)
136-
done()
136+
expect(err).to.match(/No transports registered, dial not possible/)
137+
done()
138+
})
137139
})
138-
})
139140

140-
it('should not dial circuit if other transport succeed', (done) => {
141-
swarmA.dial(swarmB._peerInfo, (err) => {
142-
expect(err).not.to.exist()
143-
expect(dialSpyA.lastCall.args[0]).to.not.be.eql('Circuit')
144-
done()
141+
it('should not dial circuit if other transport succeed', (done) => {
142+
swarmA.dial(swarmB._peerInfo, (err) => {
143+
expect(err).not.to.exist()
144+
expect(dialSpyA.lastCall.args[0]).to.not.be.eql('Circuit')
145+
done()
146+
})
145147
})
146148
})
147149

test/dial-fsm.node.js

+14-5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const dirtyChai = require('dirty-chai')
77
const expect = chai.expect
88
chai.use(require('chai-checkmark'))
99
chai.use(dirtyChai)
10+
const sinon = require('sinon')
1011
const PeerBook = require('peer-book')
1112
const parallel = require('async/parallel')
1213
const WS = require('libp2p-websockets')
@@ -348,16 +349,24 @@ describe('dialFSM', () => {
348349

349350
switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err, connFSM) => {
350351
expect(err).to.not.exist()
351-
connFSM._state.on('UPGRADING:enter', (cb) => {
352-
expect(2).checks(done)
352+
// 2 conn aborts, 1 close, and 1 stop
353+
expect(4).checks(done)
354+
355+
connFSM.once('close', (err) => {
356+
expect(err).to.not.exist().mark()
357+
})
358+
359+
sinon.stub(connFSM, '_onUpgrading').callsFake(() => {
353360
switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err) => {
354-
expect(err).to.exist().mark()
361+
expect(err.code).to.eql('DIAL_ABORTED').mark()
355362
})
356363
switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err) => {
357-
expect(err).to.exist().mark()
364+
expect(err.code).to.eql('DIAL_ABORTED').mark()
358365
})
359366

360-
switchA.stop(cb)
367+
switchA.stop((err) => {
368+
expect(err).to.not.exist().mark()
369+
})
361370
})
362371
})
363372
})

0 commit comments

Comments
 (0)