Skip to content
This repository was archived by the owner on Apr 29, 2020. It is now read-only.

Commit 16ad893

Browse files
authored
Merge pull request #84 from ipfs/pull
refactor: use pull-streams
2 parents 0eabb18 + 716e738 commit 16ad893

File tree

5 files changed

+91
-97
lines changed

5 files changed

+91
-97
lines changed

gulpfile.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const multiaddr = require('multiaddr')
55
const Node = require('libp2p-ipfs').Node
66
const Peer = require('peer-info')
77
const Id = require('peer-id')
8+
const pull = require('pull-stream')
89

910
const sigServer = require('libp2p-webrtc-star/src/signalling-server')
1011
let sigS
@@ -21,7 +22,7 @@ gulp.task('libnode:start', (done) => {
2122
node = new Node(peer)
2223
node.start(() => {
2324
node.handle('/echo/1.0.0', (conn) => {
24-
conn.pipe(conn)
25+
pull(conn, conn)
2526
})
2627
ready()
2728
})

package.json

+12-9
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,25 @@
3232
"homepage": "https://github.com/ipfs/js-libp2p-ipfs-browser#readme",
3333
"devDependencies": {
3434
"aegir": "^8.0.0",
35-
"bl": "^1.1.2",
3635
"chai": "^3.5.0",
3736
"gulp": "^3.9.1",
38-
"libp2p-ipfs": "^0.12.1",
37+
"libp2p-ipfs": "^0.13.0",
3938
"peer-id": "^0.7.0",
4039
"pre-commit": "^1.1.3",
40+
"pull-goodbye": "0.0.1",
41+
"pull-serializer": "^0.3.2",
42+
"pull-stream": "^3.4.5",
4143
"run-parallel": "^1.1.6",
4244
"webrtcsupport": "^2.2.0"
4345
},
4446
"dependencies": {
45-
"babel-runtime": "^6.9.0",
46-
"libp2p-spdy": "^0.8.1",
47-
"libp2p-swarm": "^0.22.2",
48-
"libp2p-webrtc-star": "^0.3.2",
49-
"libp2p-websockets": "^0.7.1",
50-
"mafmt": "^2.1.1",
47+
"babel-runtime": "^6.11.6",
48+
"libp2p-secio": "^0.4.2",
49+
"libp2p-spdy": "^0.9.0",
50+
"libp2p-swarm": "^0.23.0",
51+
"libp2p-webrtc-star": "^0.4.3",
52+
"libp2p-websockets": "^0.8.1",
53+
"mafmt": "^2.1.2",
5154
"multiaddr": "^2.0.2",
5255
"peer-book": "^0.3.0",
5356
"peer-id": "^0.7.0",
@@ -59,4 +62,4 @@
5962
"dignifiedquire <[email protected]>",
6063
"greenkeeperio-bot <[email protected]>"
6164
]
62-
}
65+
}

src/index.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
'use strict'
22

33
const Swarm = require('libp2p-swarm')
4-
const PeerInfo = require('peer-info')
5-
const PeerId = require('peer-id')
64
const WS = require('libp2p-websockets')
75
const WebRTCStar = require('libp2p-webrtc-star')
86
const spdy = require('libp2p-spdy')
7+
const secio = require('libp2p-secio')
8+
const PeerInfo = require('peer-info')
9+
const PeerId = require('peer-id')
910
const EE = require('events').EventEmitter
1011
const multiaddr = require('multiaddr')
1112
const PeerBook = require('peer-book')
@@ -37,6 +38,7 @@ exports.Node = function Node (pInfo, pBook) {
3738
this.swarm = new Swarm(pInfo)
3839
this.swarm.connection.addStreamMuxer(spdy)
3940
this.swarm.connection.reuse()
41+
this.swarm.connection.crypto(secio.tag, secio.encrypt)
4042

4143
this.swarm.on('peer-mux-established', (peerInfo) => {
4244
this.peerBook.put(peerInfo)

test/webrtc-star-only.js

+12-9
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const multiaddr = require('multiaddr')
66
const PeerInfo = require('peer-info')
77
const peerId = require('peer-id')
88
const parallel = require('run-parallel')
9-
const bl = require('bl')
9+
const pull = require('pull-stream')
1010

1111
const libp2p = require('../src')
1212

@@ -47,7 +47,7 @@ describe('libp2p-ipfs-browser (webrtc only)', function () {
4747

4848
it('handle a protocol on the first node', (done) => {
4949
node2.handle('/echo/1.0.0', (conn) => {
50-
conn.pipe(conn)
50+
pull(conn, conn)
5151
})
5252
done()
5353
})
@@ -65,13 +65,16 @@ describe('libp2p-ipfs-browser (webrtc only)', function () {
6565
const peers2 = node2.peerBook.getAll()
6666
expect(err).to.not.exist
6767
expect(Object.keys(peers2)).to.have.length(1)
68-
conn.pipe(bl((err, data) => {
69-
expect(err).to.not.exist
70-
expect(data.toString()).to.equal(text)
71-
done()
72-
}))
73-
conn.write(text)
74-
conn.end()
68+
69+
pull(
70+
pull.values([Buffer(text)]),
71+
conn,
72+
pull.collect((err, data) => {
73+
expect(err).to.not.exist
74+
expect(data[0].toString()).to.equal(text)
75+
done()
76+
})
77+
)
7578
}
7679
})
7780
})

test/websockets-only.js

+61-76
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1+
/* eslint max-nested-callbacks: ["error", 8] */
12
/* eslint-env mocha */
23
'use strict'
34

45
const expect = require('chai').expect
56
const multiaddr = require('multiaddr')
67
const PeerInfo = require('peer-info')
78
const PeerId = require('peer-id')
9+
const pull = require('pull-stream')
10+
const goodbye = require('pull-goodbye')
11+
const serializer = require('pull-serializer')
812

913
const libp2p = require('../src')
1014
const rawPeer = require('./peer.json')
1115
const id = PeerId.createFromPrivKey(rawPeer.privKey)
12-
const bl = require('bl')
1316

1417
describe('libp2p-ipfs-browser (websockets only)', function () {
15-
this.timeout(20 * 1000)
16-
1718
let peerB
1819
let nodeA
1920

@@ -24,6 +25,10 @@ describe('libp2p-ipfs-browser (websockets only)', function () {
2425
done()
2526
})
2627

28+
after((done) => {
29+
nodeA.stop(done)
30+
})
31+
2732
it('create libp2pNode', () => {
2833
nodeA = new libp2p.Node()
2934
})
@@ -56,13 +61,16 @@ describe('libp2p-ipfs-browser (websockets only)', function () {
5661
const peers = nodeA.peerBook.getAll()
5762
expect(err).to.not.exist
5863
expect(Object.keys(peers)).to.have.length(1)
59-
conn.pipe(bl((err, data) => {
60-
expect(err).to.not.exist
61-
expect(data.toString()).to.equal('hey')
62-
done()
63-
}))
64-
conn.write('hey')
65-
conn.end()
64+
65+
pull(
66+
pull.values([Buffer('hey')]),
67+
conn,
68+
pull.collect((err, data) => {
69+
expect(err).to.not.exist
70+
expect(data).to.be.eql([Buffer('hey')])
71+
done()
72+
})
73+
)
6674
})
6775
})
6876

@@ -103,13 +111,16 @@ describe('libp2p-ipfs-browser (websockets only)', function () {
103111
const peers = nodeA.peerBook.getAll()
104112
expect(err).to.not.exist
105113
expect(Object.keys(peers)).to.have.length(1)
106-
conn.pipe(bl((err, data) => {
107-
expect(err).to.not.exist
108-
expect(data.toString()).to.equal('hey')
109-
done()
110-
}))
111-
conn.write('hey')
112-
conn.end()
114+
115+
pull(
116+
pull.values([Buffer('hey')]),
117+
conn,
118+
pull.collect((err, data) => {
119+
expect(err).to.not.exist
120+
expect(data).to.be.eql([Buffer('hey')])
121+
done()
122+
})
123+
)
113124
})
114125
})
115126

@@ -134,70 +145,44 @@ describe('libp2p-ipfs-browser (websockets only)', function () {
134145
it.skip('libp2p.dialById on Protocol nodeA to nodeB', (done) => {})
135146
it.skip('libp2p.hangupById nodeA to nodeB', (done) => {})
136147

137-
it('stress test: one big write', (done) => {
138-
const message = new Buffer(1000000).fill('a').toString('hex')
139-
140-
nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => {
141-
expect(err).to.not.exist
142-
143-
conn.write(message)
144-
conn.write('STOP')
145-
146-
let result = ''
147-
148-
conn.on('data', (data) => {
149-
if (data.toString() === 'STOP') {
150-
conn.end()
151-
return
152-
}
153-
result += data.toString()
154-
})
155-
156-
conn.on('end', () => {
157-
expect(result).to.equal(message)
158-
done()
148+
describe('stress', () => {
149+
it('one big write', (done) => {
150+
nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => {
151+
expect(err).to.not.exist
152+
const rawMessage = new Buffer(1000000).fill('a')
153+
154+
const s = serializer(goodbye({
155+
source: pull.values([rawMessage]),
156+
sink: pull.collect((err, results) => {
157+
expect(err).to.not.exist
158+
expect(results).to.have.length(1)
159+
expect(Buffer(results[0])).to.have.length(rawMessage.length)
160+
done()
161+
})
162+
}))
163+
pull(s, conn, s)
159164
})
160165
})
161-
})
162-
163-
it('stress test: many writes in 2 batches', (done) => {
164-
let expected = ''
165-
let counter = 0
166-
167-
nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => {
168-
expect(err).to.not.exist
169-
170-
while (++counter < 10000) {
171-
conn.write(`${counter} `)
172-
expected += `${counter} `
173-
}
174166

175-
while (++counter < 20000) {
176-
conn.write(`${counter} `)
177-
expected += `${counter} `
178-
}
179-
180-
setTimeout(() => {
181-
conn.write('STOP')
182-
}, 2000)
183-
184-
let result = ''
185-
conn.on('data', (data) => {
186-
if (data.toString() === 'STOP') {
187-
conn.end()
188-
return
189-
}
190-
result += data.toString()
191-
})
167+
it('many writes', (done) => {
168+
nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => {
169+
expect(err).to.not.exist
192170

193-
conn.on('end', () => {
194-
expect(result).to.equal(expected)
195-
done()
171+
const s = serializer(goodbye({
172+
source: pull(
173+
pull.infinite(),
174+
pull.take(1000),
175+
pull.map((val) => Buffer(val.toString()))
176+
),
177+
sink: pull.collect((err, result) => {
178+
expect(err).to.not.exist
179+
expect(result).to.have.length(1000)
180+
done()
181+
})
182+
}))
183+
184+
pull(s, conn, s)
196185
})
197186
})
198187
})
199-
200-
it('stop the libp2pnode', (done) => {
201-
nodeA.stop(done)
202-
})
203188
})

0 commit comments

Comments
 (0)