Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit bb485e1

Browse files
committed
feat(dial): exposing connection handle flow
1 parent b4f3045 commit bb485e1

File tree

4 files changed

+169
-153
lines changed

4 files changed

+169
-153
lines changed

src/dial.js

+5-136
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
'use strict'
22

3-
const multistream = require('multistream-select')
43
const Connection = require('interface-connection').Connection
54
const debug = require('debug')
65
const log = debug('libp2p:swarm:dial')
76

8-
const protocolMuxer = require('./protocol-muxer')
9-
107
module.exports = function dial (swarm) {
118
return (pi, protocol, callback) => {
129
if (typeof protocol === 'function') {
@@ -17,6 +14,7 @@ module.exports = function dial (swarm) {
1714
callback = callback || function noop () {}
1815

1916
const proxyConn = new Connection()
17+
const connHandler = swarm.connHandler(pi, protocol, proxyConn)
2018

2119
const b58Id = pi.id.toB58String()
2220
log('dialing %s', b58Id)
@@ -27,54 +25,22 @@ module.exports = function dial (swarm) {
2725
if (err) {
2826
return callback(err)
2927
}
30-
gotWarmedUpConn(conn)
28+
connHandler.handleNew(conn, callback)
3129
})
3230
} else {
3331
const conn = swarm.conns[b58Id]
3432
swarm.conns[b58Id] = undefined
35-
gotWarmedUpConn(conn)
33+
connHandler.handleWarmedUp(conn, callback)
3634
}
3735
} else {
3836
if (!protocol) {
3937
return callback()
4038
}
41-
gotMuxer(swarm.muxedConns[b58Id].muxer)
39+
connHandler.gotMuxer(swarm.muxedConns[b58Id].muxer, callback)
4240
}
4341

4442
return proxyConn
4543

46-
function gotWarmedUpConn (conn) {
47-
conn.setPeerInfo(pi)
48-
attemptMuxerUpgrade(conn, (err, muxer) => {
49-
if (!protocol) {
50-
if (err) {
51-
swarm.conns[b58Id] = conn
52-
}
53-
return callback()
54-
}
55-
56-
if (err) {
57-
// couldn't upgrade to Muxer, it is ok
58-
protocolHandshake(conn, protocol, callback)
59-
} else {
60-
gotMuxer(muxer)
61-
}
62-
})
63-
}
64-
65-
function gotMuxer (muxer) {
66-
if (swarm.identify) {
67-
// TODO: Consider:
68-
// 1. overload getPeerInfo
69-
// 2. exec identify (through getPeerInfo)
70-
// 3. update the peerInfo that is already stored in the conn
71-
}
72-
73-
openConnInMuxedConn(muxer, (conn) => {
74-
protocolHandshake(conn, protocol, callback)
75-
})
76-
}
77-
7844
function attemptDial (pi, cb) {
7945
const tKeys = swarm.availableTransports(pi)
8046

@@ -93,106 +59,9 @@ module.exports = function dial (swarm) {
9359
return nextTransport(tKeys.shift())
9460
}
9561

96-
cryptoDial()
97-
98-
function cryptoDial () {
99-
const ms = new multistream.Dialer()
100-
ms.handle(conn, (err) => {
101-
if (err) {
102-
return cb(err)
103-
}
104-
105-
const id = swarm._peerInfo.id
106-
log('selecting crypto: %s', swarm.crypto.tag)
107-
ms.select(swarm.crypto.tag, (err, conn) => {
108-
if (err) {
109-
return cb(err)
110-
}
111-
112-
const wrapped = swarm.crypto.encrypt(id, id.privKey, conn)
113-
cb(null, wrapped)
114-
})
115-
})
116-
}
62+
cb(null, conn)
11763
})
11864
}
11965
}
120-
121-
function attemptMuxerUpgrade (conn, cb) {
122-
const muxers = Object.keys(swarm.muxers)
123-
if (muxers.length === 0) {
124-
return cb(new Error('no muxers available'))
125-
}
126-
127-
// 1. try to handshake in one of the muxers available
128-
// 2. if succeeds
129-
// - add the muxedConn to the list of muxedConns
130-
// - add incomming new streams to connHandler
131-
132-
const ms = new multistream.Dialer()
133-
ms.handle(conn, (err) => {
134-
if (err) {
135-
return callback(new Error('multistream not supported'))
136-
}
137-
138-
nextMuxer(muxers.shift())
139-
})
140-
141-
function nextMuxer (key) {
142-
log('selecting %s', key)
143-
ms.select(key, (err, conn) => {
144-
if (err) {
145-
if (muxers.length === 0) {
146-
cb(new Error('could not upgrade to stream muxing'))
147-
} else {
148-
nextMuxer(muxers.shift())
149-
}
150-
return
151-
}
152-
153-
const muxedConn = swarm.muxers[key].dialer(conn)
154-
swarm.muxedConns[b58Id] = {}
155-
swarm.muxedConns[b58Id].muxer = muxedConn
156-
// should not be needed anymore - swarm.muxedConns[b58Id].conn = conn
157-
158-
swarm.emit('peer-mux-established', pi)
159-
160-
muxedConn.once('close', () => {
161-
const b58Str = pi.id.toB58String()
162-
delete swarm.muxedConns[b58Str]
163-
pi.disconnect()
164-
swarm._peerBook.get(b58Str).disconnect()
165-
swarm.emit('peer-mux-closed', pi)
166-
})
167-
168-
// For incoming streams, in case identify is on
169-
muxedConn.on('stream', (conn) => {
170-
protocolMuxer(swarm.protocols, conn)
171-
})
172-
173-
cb(null, muxedConn)
174-
})
175-
}
176-
}
177-
178-
function openConnInMuxedConn (muxer, cb) {
179-
cb(muxer.newStream())
180-
}
181-
182-
function protocolHandshake (conn, protocol, cb) {
183-
const ms = new multistream.Dialer()
184-
ms.handle(conn, (err) => {
185-
if (err) {
186-
return callback(err)
187-
}
188-
ms.select(protocol, (err, conn) => {
189-
if (err) {
190-
return callback(err)
191-
}
192-
proxyConn.setInnerConn(conn)
193-
callback(null, proxyConn)
194-
})
195-
})
196-
}
19766
}
19867
}

src/handler.js

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
'use strict'
2+
3+
const multistream = require('multistream-select')
4+
const protocolMuxer = require('./protocol-muxer')
5+
6+
const debug = require('debug')
7+
const log = debug('libp2p:swarm:handle')
8+
9+
module.exports = function process (swarm) {
10+
return (pi, protocol, proxyConn) => {
11+
const b58Id = pi.id.toB58String()
12+
13+
function cryptoDial (conn, cb) {
14+
const ms = new multistream.Dialer()
15+
ms.handle(conn, (err) => {
16+
if (err) {
17+
return cb(err)
18+
}
19+
20+
const id = swarm._peerInfo.id
21+
log('selecting crypto: %s', swarm.crypto.tag)
22+
ms.select(swarm.crypto.tag, (err, conn) => {
23+
if (err) {
24+
return cb(err)
25+
}
26+
27+
const wrapped = swarm.crypto.encrypt(id, id.privKey, conn)
28+
cb(null, wrapped)
29+
})
30+
})
31+
}
32+
33+
function gotWarmedUpConn (conn, cb) {
34+
conn.setPeerInfo(pi)
35+
attemptMuxerUpgrade(conn, (err, muxer) => {
36+
if (!protocol) {
37+
if (err) {
38+
swarm.conns[b58Id] = conn
39+
}
40+
return cb()
41+
}
42+
43+
if (err) {
44+
// couldn't upgrade to Muxer, it is ok
45+
protocolHandshake(conn, protocol, cb)
46+
} else {
47+
gotMuxer(muxer, cb)
48+
}
49+
})
50+
}
51+
52+
function attemptMuxerUpgrade (conn, cb) {
53+
const muxers = Object.keys(swarm.muxers)
54+
if (muxers.length === 0) {
55+
return cb(new Error('no muxers available'))
56+
}
57+
58+
// 1. try to handshake in one of the muxers available
59+
// 2. if succeeds
60+
// - add the muxedConn to the list of muxedConns
61+
// - add incomming new streams to connHandler
62+
63+
const ms = new multistream.Dialer()
64+
ms.handle(conn, (err) => {
65+
if (err) {
66+
return cb(new Error('multistream not supported'))
67+
}
68+
69+
nextMuxer(muxers.shift())
70+
})
71+
72+
function nextMuxer (key) {
73+
log('selecting %s', key)
74+
ms.select(key, (err, conn) => {
75+
if (err) {
76+
if (muxers.length === 0) {
77+
cb(new Error('could not upgrade to stream muxing'))
78+
} else {
79+
nextMuxer(muxers.shift())
80+
}
81+
return
82+
}
83+
84+
const muxedConn = swarm.muxers[key].dialer(conn)
85+
swarm.muxedConns[b58Id] = {}
86+
swarm.muxedConns[b58Id].muxer = muxedConn
87+
// should not be needed anymore - swarm.muxedConns[b58Id].conn = conn
88+
89+
swarm.emit('peer-mux-established', pi)
90+
91+
muxedConn.once('close', () => {
92+
const b58Str = pi.id.toB58String()
93+
delete swarm.muxedConns[b58Str]
94+
pi.disconnect()
95+
swarm._peerBook.get(b58Str).disconnect()
96+
swarm.emit('peer-mux-closed', pi)
97+
})
98+
99+
// For incoming streams, in case identify is on
100+
muxedConn.on('stream', (conn) => {
101+
protocolMuxer(swarm.protocols, conn)
102+
})
103+
104+
cb(null, muxedConn)
105+
})
106+
}
107+
}
108+
109+
function openConnInMuxedConn (muxer, cb) {
110+
cb(muxer.newStream())
111+
}
112+
113+
function protocolHandshake (conn, protocol, cb) {
114+
const ms = new multistream.Dialer()
115+
ms.handle(conn, (err) => {
116+
if (err) {
117+
return cb(err)
118+
}
119+
ms.select(protocol, (err, conn) => {
120+
if (err) {
121+
return cb(err)
122+
}
123+
proxyConn.setInnerConn(conn)
124+
cb(null, proxyConn)
125+
})
126+
})
127+
}
128+
129+
function handleNew (conn, cb) {
130+
cryptoDial(conn, (err, conn) => {
131+
if (err) {
132+
log(err)
133+
return cb(err)
134+
}
135+
gotWarmedUpConn(conn, cb)
136+
})
137+
}
138+
139+
function handleWarmedUp (conn, cb) {
140+
gotWarmedUpConn(conn, cb)
141+
}
142+
143+
function gotMuxer (muxer, cb) {
144+
if (swarm.identify) {
145+
// TODO: Consider:
146+
// 1. overload getPeerInfo
147+
// 2. exec identify (through getPeerInfo)
148+
// 3. update the peerInfo that is already stored in the conn
149+
}
150+
151+
openConnInMuxedConn(muxer, (conn) => {
152+
protocolHandshake(conn, protocol, cb)
153+
})
154+
}
155+
156+
return {
157+
handleNew: handleNew,
158+
handleWarmedUp: handleWarmedUp,
159+
gotMuxer: gotMuxer
160+
}
161+
}
162+
}

src/index.js

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const each = require('async/each')
66
const series = require('async/series')
77
const transport = require('./transport')
88
const connection = require('./connection')
9+
const handler = require('./handler')
910
const dial = require('./dial')
1011
const protocolMuxer = require('./protocol-muxer')
1112
const plaintext = require('./plaintext')
@@ -56,6 +57,7 @@ function Swarm (peerInfo, peerBook) {
5657

5758
this.transport = transport(this)
5859
this.connection = connection(this)
60+
this.connHandler = handler(this)
5961

6062
this.availableTransports = (pi) => {
6163
const myAddrs = pi.multiaddrs.toArray()

0 commit comments

Comments
 (0)