Skip to content
This repository was archived by the owner on Aug 29, 2023. It is now read-only.

Commit 5e89a26

Browse files
dignifiedquiredaviddias
authored andcommitted
feat(pull): migration to pull-streams
1 parent caa8d6d commit 5e89a26

File tree

7 files changed

+348
-417
lines changed

7 files changed

+348
-417
lines changed

README.md

+22-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
js-libp2p-tcp
2-
===============
1+
# js-libp2p-tcp
32

43
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
54
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
@@ -21,39 +20,47 @@ js-libp2p-tcp
2120
`multiaddr`. This small shim will enable libp2p to use other different
2221
transports.
2322

23+
**Note:** This module uses [pull-streams](https://pull-stream.github.io) for all stream based interfaces.
24+
2425
## Example
2526

2627
```js
2728
const TCP = require('libp2p-tcp')
2829
const multiaddr = require('multiaddr')
30+
const pull = require('pull-stream')
2931

3032
const mh1 = multiaddr('/ip4/127.0.0.1/tcp/9090')
3133
const mh2 = multiaddr('/ip6/::/tcp/9092')
3234

3335
const tcp = new TCP()
3436

35-
var listener = tcp.createListener(mh1, function handler (socket) {
36-
console.log('connection')
37-
socket.end('bye')
37+
const listener = tcp.createListener(mh1, (socket) => {
38+
console.log('new connection opened')
39+
pull(
40+
pull.values(['hello']),
41+
socket
42+
)
3843
})
3944

40-
listener.listen(mh1, function ready () {
41-
console.log('ready')
45+
listener.listen(() => {
46+
console.log('listening')
4247

43-
const client = tcp.dial(mh1)
44-
client.pipe(process.stdout)
45-
client.on('end', () => {
46-
listener.close()
47-
})
48+
pull(
49+
tcp.dial(mh1),
50+
pull.log,
51+
pull.onEnd(() => {
52+
tcp.close()
53+
})
54+
)
4855
})
4956
```
5057

5158
outputs
5259

5360
```
54-
ready
55-
connection
56-
bye
61+
listening
62+
new connection opened
63+
hello
5764
```
5865

5966
## Installation

package.json

+7-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "libp2p-tcp",
33
"version": "0.7.4",
44
"description": "Node.js implementation of the TCP module that libp2p uses, which implements the interface-connection and interface-transport interfaces",
5-
"main": "lib/index.js",
5+
"main": "src/index.js",
66
"jsnext:main": "src/index.js",
77
"scripts": {
88
"lint": "aegir-lint",
@@ -32,19 +32,19 @@
3232
},
3333
"homepage": "https://github.com/diasdavid/js-libp2p-tcp",
3434
"devDependencies": {
35-
"aegir": "^4.0.0",
35+
"aegir": "^6.0.0",
3636
"chai": "^3.5.0",
37-
"interface-transport": "^0.2.0",
38-
"pre-commit": "^1.1.2",
39-
"tape": "^4.5.1"
37+
"lodash.isfunction": "^3.0.8",
38+
"pre-commit": "^1.1.2"
4039
},
4140
"dependencies": {
4241
"interface-connection": "0.1.8",
4342
"ip-address": "^5.8.0",
4443
"lodash.contains": "^2.4.3",
4544
"mafmt": "^2.1.2",
4645
"multiaddr": "^2.0.2",
47-
"run-parallel": "^1.1.6"
46+
"pull": "^2.1.1",
47+
"stream-to-pull-stream": "^1.7.0"
4848
},
4949
"contributors": [
5050
"David Dias <[email protected]>",
@@ -53,4 +53,4 @@
5353
"Stephen Whitmore <[email protected]>",
5454
"dignifiedquire <[email protected]>"
5555
]
56-
}
56+
}

src/get-multiaddr.js

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
'use strict'
2+
3+
const multiaddr = require('multiaddr')
4+
const Address6 = require('ip-address').Address6
5+
6+
module.exports = (socket) => {
7+
var mh
8+
9+
if (socket.remoteFamily === 'IPv6') {
10+
var addr = new Address6(socket.remoteAddress)
11+
if (addr.v4) {
12+
var ip4 = addr.to4().correctForm()
13+
mh = multiaddr('/ip4/' + ip4 + '/tcp/' + socket.remotePort)
14+
} else {
15+
mh = multiaddr('/ip6/' + socket.remoteAddress + '/tcp/' + socket.remotePort)
16+
}
17+
} else {
18+
mh = multiaddr('/ip4/' + socket.remoteAddress + '/tcp/' + socket.remotePort)
19+
}
20+
21+
return mh
22+
}

src/index.js

+21-189
Original file line numberDiff line numberDiff line change
@@ -1,198 +1,48 @@
11
'use strict'
22

3-
const debug = require('debug')
4-
const log = debug('libp2p:tcp')
5-
const tcp = require('net')
6-
const multiaddr = require('multiaddr')
7-
const Address6 = require('ip-address').Address6
3+
const net = require('net')
4+
const toPull = require('stream-to-pull-stream')
85
const mafmt = require('mafmt')
9-
// const parallel = require('run-parallel')
106
const contains = require('lodash.contains')
11-
const os = require('os')
7+
const isFunction = require('lodash.isfunction')
128
const Connection = require('interface-connection').Connection
9+
const debug = require('debug')
10+
const log = debug('libp2p:tcp')
1311

14-
exports = module.exports = TCP
15-
16-
const IPFS_CODE = 421
17-
const CLOSE_TIMEOUT = 2000
18-
19-
function TCP () {
20-
if (!(this instanceof TCP)) {
21-
return new TCP()
22-
}
12+
const createListener = require('./listener')
2313

24-
this.dial = function (ma, options, callback) {
25-
if (typeof options === 'function') {
26-
callback = options
14+
module.exports = class TCP {
15+
dial (ma, options, cb) {
16+
if (isFunction(options)) {
17+
cb = options
2718
options = {}
2819
}
2920

30-
if (!callback) {
31-
callback = function noop () {}
21+
if (!cb) {
22+
cb = () => {}
3223
}
3324

34-
const socket = tcp.connect(ma.toOptions())
35-
const conn = new Connection(socket)
36-
37-
socket.on('timeout', () => {
38-
conn.emit('timeout')
39-
})
40-
41-
socket.once('error', (err) => {
42-
callback(err)
43-
})
44-
45-
socket.on('connect', () => {
46-
callback(null, conn)
47-
conn.emit('connect')
48-
})
25+
const cOpts = ma.toOptions()
26+
log('Connecting to %s %s', cOpts.port, cOpts.host)
27+
const socket = toPull.duplex(net.connect(cOpts, cb))
4928

50-
conn.getObservedAddrs = (cb) => {
29+
socket.getObservedAddrs = (cb) => {
5130
return cb(null, [ma])
5231
}
5332

54-
return conn
33+
return new Connection(socket)
5534
}
5635

57-
this.createListener = (options, handler) => {
58-
if (typeof options === 'function') {
36+
createListener (options, handler) {
37+
if (isFunction(options)) {
5938
handler = options
6039
options = {}
6140
}
6241

63-
const listener = tcp.createServer((socket) => {
64-
const conn = new Connection(socket)
65-
66-
conn.getObservedAddrs = (cb) => {
67-
return cb(null, [getMultiaddr(socket)])
68-
}
69-
handler(conn)
70-
})
71-
72-
let ipfsId
73-
let listeningMultiaddr
74-
75-
listener._listen = listener.listen
76-
listener.listen = (ma, callback) => {
77-
listeningMultiaddr = ma
78-
if (contains(ma.protoNames(), 'ipfs')) {
79-
ipfsId = ma.stringTuples().filter((tuple) => {
80-
if (tuple[0] === IPFS_CODE) {
81-
return true
82-
}
83-
})[0][1]
84-
listeningMultiaddr = ma.decapsulate('ipfs')
85-
}
86-
87-
listener._listen(listeningMultiaddr.toOptions(), callback)
88-
}
89-
90-
listener._close = listener.close
91-
listener.close = (options, callback) => {
92-
if (typeof options === 'function') {
93-
callback = options
94-
options = {}
95-
}
96-
if (!callback) { callback = function noop () {} }
97-
if (!options) { options = {} }
98-
99-
let closed = false
100-
listener._close(callback)
101-
listener.once('close', () => {
102-
closed = true
103-
})
104-
setTimeout(() => {
105-
if (closed) {
106-
return
107-
}
108-
log('unable to close graciously, destroying conns')
109-
Object.keys(listener.__connections).forEach((key) => {
110-
log('destroying %s', key)
111-
listener.__connections[key].destroy()
112-
})
113-
}, options.timeout || CLOSE_TIMEOUT)
114-
}
115-
116-
// Keep track of open connections to destroy in case of timeout
117-
listener.__connections = {}
118-
listener.on('connection', (socket) => {
119-
const key = `${socket.remoteAddress}:${socket.remotePort}`
120-
listener.__connections[key] = socket
121-
122-
socket.on('close', () => {
123-
delete listener.__connections[key]
124-
})
125-
})
126-
127-
listener.getAddrs = (callback) => {
128-
const multiaddrs = []
129-
const address = listener.address()
130-
131-
// Because TCP will only return the IPv6 version
132-
// we need to capture from the passed multiaddr
133-
if (listeningMultiaddr.toString().indexOf('ip4') !== -1) {
134-
let m = listeningMultiaddr.decapsulate('tcp')
135-
m = m.encapsulate('/tcp/' + address.port)
136-
if (ipfsId) {
137-
m = m.encapsulate('/ipfs/' + ipfsId)
138-
}
139-
140-
if (m.toString().indexOf('0.0.0.0') !== -1) {
141-
const netInterfaces = os.networkInterfaces()
142-
Object.keys(netInterfaces).forEach((niKey) => {
143-
netInterfaces[niKey].forEach((ni) => {
144-
if (ni.family === 'IPv4') {
145-
multiaddrs.push(multiaddr(m.toString().replace('0.0.0.0', ni.address)))
146-
}
147-
})
148-
})
149-
} else {
150-
multiaddrs.push(m)
151-
}
152-
}
153-
154-
if (address.family === 'IPv6') {
155-
let ma = multiaddr('/ip6/' + address.address + '/tcp/' + address.port)
156-
if (ipfsId) {
157-
ma = ma.encapsulate('/ipfs/' + ipfsId)
158-
}
159-
160-
multiaddrs.push(ma)
161-
}
162-
163-
callback(null, multiaddrs)
164-
}
165-
166-
return listener
167-
/*
168-
listener.listen(m.toOptions(), () => {
169-
// Node.js likes to convert addr to IPv6 (when 0.0.0.0 for e.g)
170-
const address = listener.address()
171-
if (m.toString().indexOf('ip4')) {
172-
m = m.decapsulate('tcp')
173-
m = m.encapsulate('/tcp/' + address.port)
174-
if (ipfsHashId) {
175-
m = m.encapsulate('/ipfs/' + ipfsHashId)
176-
}
177-
freshMultiaddrs.push(m)
178-
}
179-
180-
if (address.family === 'IPv6') {
181-
let mh = multiaddr('/ip6/' + address.address + '/tcp/' + address.port)
182-
if (ipfsHashId) {
183-
mh = mh.encapsulate('/ipfs/' + ipfsHashId)
184-
}
185-
186-
freshMultiaddrs.push(mh)
187-
}
188-
189-
cb()
190-
})
191-
listeners.push(listener)
192-
*/
42+
return createListener(handler)
19343
}
19444

195-
this.filter = (multiaddrs) => {
45+
filter (multiaddrs) {
19646
if (!Array.isArray(multiaddrs)) {
19747
multiaddrs = [multiaddrs]
19848
}
@@ -204,21 +54,3 @@ function TCP () {
20454
})
20555
}
20656
}
207-
208-
function getMultiaddr (socket) {
209-
var mh
210-
211-
if (socket.remoteFamily === 'IPv6') {
212-
var addr = new Address6(socket.remoteAddress)
213-
if (addr.v4) {
214-
var ip4 = addr.to4().correctForm()
215-
mh = multiaddr('/ip4/' + ip4 + '/tcp/' + socket.remotePort)
216-
} else {
217-
mh = multiaddr('/ip6/' + socket.remoteAddress + '/tcp/' + socket.remotePort)
218-
}
219-
} else {
220-
mh = multiaddr('/ip4/' + socket.remoteAddress + '/tcp/' + socket.remotePort)
221-
}
222-
223-
return mh
224-
}

0 commit comments

Comments
 (0)