Skip to content

Commit 31df87d

Browse files
committed
feat: support multiple peer and content routing modules
1 parent a8e07f9 commit 31df87d

7 files changed

+276
-22
lines changed

src/config.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ const OptionsSchema = Joi.object({
1414
connProtector: Joi.object().keys({
1515
protect: Joi.func().required()
1616
}).unknown(),
17-
contentRouting: Joi.object(),
17+
contentRouting: Joi.array().items(Joi.object()).allow(null),
1818
dht: ModuleSchema.allow(null),
1919
peerDiscovery: Joi.array().items(ModuleSchema).allow(null),
20-
peerRouting: Joi.object(),
20+
peerRouting: Joi.array().items(Joi.object()).allow(null),
2121
streamMuxer: Joi.array().items(ModuleSchema).allow(null),
2222
transport: Joi.array().items(ModuleSchema).min(1).required()
2323
}).required(),

src/content-routing.js

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,71 @@
11
'use strict'
22

3+
const tryEach = require('async/tryEach')
4+
const parallel = require('async/parallel')
5+
36
module.exports = (node) => {
7+
const routers = node._modules.contentRouting || []
8+
9+
// If we have the dht, make it first
10+
if (node._dht) {
11+
routers.unshift(node._dht)
12+
}
13+
414
return {
15+
/**
16+
* Iterates over all content routers in series to find providers of the given key.
17+
* Once a content router succeeds, iteration will stop.
18+
*
19+
* @param {CID} key The CID key of the content to find
20+
* @param {number} timeout How long the query should run
21+
* @param {function(Error, Result<Array>)} callback
22+
* @returns {void}
23+
*/
524
findProviders: (key, timeout, callback) => {
6-
if (!node._dht) {
7-
return callback(new Error('DHT is not available'))
25+
if (routers.length === 0) {
26+
return callback(new Error('No content routers available'))
827
}
928

10-
node._dht.findProviders(key, timeout, callback)
29+
const tasks = routers.map((router) => {
30+
return (cb) => router.findProviders(key, timeout, (err, results) => {
31+
if (err) {
32+
return cb(err)
33+
}
34+
35+
// If we don't have any results, we need to provide an error to keep trying
36+
if (!results || Object.keys(results).length === 0) {
37+
return cb(true, null)
38+
}
39+
40+
cb(null, results)
41+
})
42+
})
43+
44+
tryEach(tasks, (err, results) => {
45+
if (err && err !== true) {
46+
return callback(err)
47+
}
48+
results = results || []
49+
callback(null, results)
50+
})
1151
},
52+
53+
/**
54+
* Iterates over all content routers in parallel to notify it is
55+
* a provider of the given key.
56+
*
57+
* @param {CID} key The CID key of the content to find
58+
* @param {function(Error)} callback
59+
* @returns {void}
60+
*/
1261
provide: (key, callback) => {
13-
if (!node._dht) {
14-
return callback(new Error('DHT is not available'))
62+
if (routers.length === 0) {
63+
return callback(new Error('No content routers available'))
1564
}
1665

17-
node._dht.provide(key, callback)
66+
parallel(routers.map((router) => {
67+
return (cb) => router.provide(key, cb)
68+
}), callback)
1869
}
1970
}
2071
}

src/index.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ class Node extends EventEmitter {
9999
}
100100

101101
// Attach remaining APIs
102-
// If peer or content routing modules have been provided, use those, otherwise use the dht
103-
this.peerRouting = this._modules.peerRouting || peerRouting(this)
104-
this.contentRouting = this._modules.contentRouting || contentRouting(this)
102+
// peer and content routing will automatically get modules from _modules and _dht
103+
this.peerRouting = peerRouting(this)
104+
this.contentRouting = contentRouting(this)
105105
this.dht = dht(this)
106106

107107
this._getPeerInfo = getPeerInfo(this)

src/peer-routing.js

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,50 @@
11
'use strict'
22

3+
const tryEach = require('async/tryEach')
4+
35
module.exports = (node) => {
6+
const routers = node._modules.peerRouting || []
7+
8+
// If we have the dht, make it first
9+
if (node._dht) {
10+
routers.unshift(node._dht)
11+
}
12+
413
return {
14+
/**
15+
* Iterates over all peer routers in series to find the given peer.
16+
*
17+
* @param {String} id The id of the peer to find
18+
* @param {function(Error, Result<Array>)}
19+
* @returns {void}
20+
*/
521
findPeer: (id, callback) => {
6-
if (!node._dht) {
7-
return callback(new Error('DHT is not available'))
22+
if (routers.length === 0) {
23+
return callback(new Error('No peer routers available'))
824
}
925

10-
node._dht.findPeer(id, callback)
26+
const tasks = routers.map((router) => {
27+
return (cb) => router.findPeer(id, (err, result) => {
28+
if (err) {
29+
return cb(err)
30+
}
31+
32+
// If we don't have a result, we need to provide an error to keep trying
33+
if (!result || Object.keys(result).length === 0) {
34+
return cb(true, null)
35+
}
36+
37+
cb(null, result)
38+
})
39+
})
40+
41+
tryEach(tasks, (err, results) => {
42+
if (err && err !== true) {
43+
return callback(err)
44+
}
45+
results = results || null
46+
callback(null, results)
47+
})
1148
}
1249
}
1350
}

test/config.spec.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ describe('configuration', () => {
109109
modules: {
110110
transport: [ WS ],
111111
peerDiscovery: [ Bootstrap ],
112-
peerRouting: peerRouter,
113-
contentRouting: contentRouter
112+
peerRouting: [ peerRouter ],
113+
contentRouting: [ contentRouter ]
114114
},
115115
config: {
116116
peerDiscovery: {
@@ -123,8 +123,8 @@ describe('configuration', () => {
123123
}
124124

125125
expect(validateConfig(options).modules).to.deep.include({
126-
peerRouting: peerRouter,
127-
contentRouting: contentRouter
126+
peerRouting: [ peerRouter ],
127+
contentRouting: [ contentRouter ]
128128
})
129129
})
130130

test/content-routing.node.js

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ describe('.contentRouting', () => {
140140
nodeA = new Node({
141141
peerInfo,
142142
modules: {
143-
contentRouting: delegate
143+
contentRouting: [ delegate ]
144144
},
145145
config: {
146146
relay: {
@@ -157,7 +157,8 @@ describe('.contentRouting', () => {
157157
], done)
158158
})
159159

160-
afterEach(() => nock.cleanAll)
160+
after((done) => nodeA.stop(done))
161+
afterEach(() => nock.cleanAll())
161162

162163
describe('provide', () => {
163164
it('should use the delegate router to provide', (done) => {
@@ -272,4 +273,98 @@ describe('.contentRouting', () => {
272273
})
273274
})
274275
})
276+
277+
describe('via the dht and a delegate', () => {
278+
let nodeA
279+
let delegate
280+
281+
before((done) => {
282+
waterfall([
283+
(cb) => {
284+
createPeerInfo(cb)
285+
},
286+
// Create the node using the delegate
287+
(peerInfo, cb) => {
288+
delegate = new DelegatedContentRouter(peerInfo.id, {
289+
host: '0.0.0.0',
290+
protocol: 'http',
291+
port: 60197
292+
}, [
293+
ma('/ip4/0.0.0.0/tcp/60194')
294+
])
295+
nodeA = new Node({
296+
peerInfo,
297+
modules: {
298+
contentRouting: [ delegate ]
299+
},
300+
config: {
301+
relay: {
302+
enabled: true,
303+
hop: {
304+
enabled: true,
305+
active: false
306+
}
307+
},
308+
EXPERIMENTAL: {
309+
dht: true
310+
}
311+
}
312+
})
313+
nodeA.start(cb)
314+
}
315+
], done)
316+
})
317+
318+
after((done) => nodeA.stop(done))
319+
320+
describe('provide', () => {
321+
it('should use both the dht and delegate router to provide', (done) => {
322+
const dhtStub = sinon.stub(nodeA._dht, 'provide').callsFake(() => {})
323+
const delegateStub = sinon.stub(delegate, 'provide').callsFake(() => {
324+
expect(dhtStub.calledOnce).to.equal(true)
325+
expect(delegateStub.calledOnce).to.equal(true)
326+
delegateStub.restore()
327+
dhtStub.restore()
328+
done()
329+
})
330+
nodeA.contentRouting.provide()
331+
})
332+
})
333+
334+
describe('findProviders', () => {
335+
it('should only use the dht if it finds providers', (done) => {
336+
const results = [true]
337+
const dhtStub = sinon.stub(nodeA._dht, 'findProviders').callsArgWith(2, null, results)
338+
const delegateStub = sinon.stub(delegate, 'findProviders').throws(() => {
339+
return new Error('the delegate should not have been called')
340+
})
341+
342+
nodeA.contentRouting.findProviders('a cid', 5000, (err, results) => {
343+
expect(err).to.not.exist()
344+
expect(results).to.equal(results)
345+
expect(dhtStub.calledOnce).to.equal(true)
346+
expect(delegateStub.notCalled).to.equal(true)
347+
delegateStub.restore()
348+
dhtStub.restore()
349+
done()
350+
})
351+
})
352+
353+
it('should use the delegate if the dht fails to find providers', (done) => {
354+
const results = [true]
355+
const dhtStub = sinon.stub(nodeA._dht, 'findProviders').callsArgWith(2, null, [])
356+
const delegateStub = sinon.stub(delegate, 'findProviders').callsArgWith(2, null, results)
357+
358+
nodeA.contentRouting.findProviders('a cid', 5000, (err, results) => {
359+
expect(err).to.not.exist()
360+
expect(results).to.deep.equal(results)
361+
expect(dhtStub.calledOnce).to.equal(true)
362+
expect(delegateStub.calledOnce).to.equal(true)
363+
delegateStub.restore()
364+
dhtStub.restore()
365+
done()
366+
})
367+
})
368+
})
369+
})
275370
})

test/peer-routing.node.js

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ describe('.peerRouting', () => {
111111
})
112112
createNode('/ip4/0.0.0.0/tcp/0', {
113113
modules: {
114-
peerRouting: delegate
114+
peerRouting: [ delegate ]
115115
}
116116
}, (err, node) => {
117117
expect(err).to.not.exist()
@@ -122,7 +122,8 @@ describe('.peerRouting', () => {
122122
], done)
123123
})
124124

125-
afterEach(nock.cleanAll)
125+
after((done) => nodeA.stop(done))
126+
afterEach(() => nock.cleanAll())
126127

127128
it('should use the delegate router to find peers', (done) => {
128129
const stub = sinon.stub(delegate, 'findPeer').callsFake(() => {
@@ -192,4 +193,74 @@ describe('.peerRouting', () => {
192193
})
193194
})
194195
})
196+
197+
describe('via the dht and a delegate', () => {
198+
let nodeA
199+
let delegate
200+
201+
before((done) => {
202+
parallel([
203+
// Create the node using the delegate
204+
(cb) => {
205+
delegate = new DelegatedPeerRouter({
206+
host: 'ipfs.io',
207+
protocol: 'https',
208+
port: '443'
209+
})
210+
createNode('/ip4/0.0.0.0/tcp/0', {
211+
modules: {
212+
peerRouting: [ delegate ]
213+
},
214+
config: {
215+
EXPERIMENTAL: {
216+
dht: true
217+
}
218+
}
219+
}, (err, node) => {
220+
expect(err).to.not.exist()
221+
nodeA = node
222+
nodeA.start(cb)
223+
})
224+
}
225+
], done)
226+
})
227+
228+
after((done) => nodeA.stop(done))
229+
230+
describe('findPeer', () => {
231+
it('should only use the dht if it find the peer', (done) => {
232+
const results = [true]
233+
const dhtStub = sinon.stub(nodeA._dht, 'findPeer').callsArgWith(1, null, results)
234+
const delegateStub = sinon.stub(delegate, 'findPeer').throws(() => {
235+
return new Error('the delegate should not have been called')
236+
})
237+
238+
nodeA.peerRouting.findPeer('a peer id', (err, results) => {
239+
expect(err).to.not.exist()
240+
expect(results).to.equal(results)
241+
expect(dhtStub.calledOnce).to.equal(true)
242+
expect(delegateStub.notCalled).to.equal(true)
243+
delegateStub.restore()
244+
dhtStub.restore()
245+
done()
246+
})
247+
})
248+
249+
it('should use the delegate if the dht fails to find the peer', (done) => {
250+
const results = [true]
251+
const dhtStub = sinon.stub(nodeA._dht, 'findPeer').callsArgWith(1, null, undefined)
252+
const delegateStub = sinon.stub(delegate, 'findPeer').callsArgWith(1, null, results)
253+
254+
nodeA.peerRouting.findPeer('a peer id', (err, results) => {
255+
expect(err).to.not.exist()
256+
expect(results).to.deep.equal(results)
257+
expect(dhtStub.calledOnce).to.equal(true)
258+
expect(delegateStub.calledOnce).to.equal(true)
259+
delegateStub.restore()
260+
dhtStub.restore()
261+
done()
262+
})
263+
})
264+
})
265+
})
195266
})

0 commit comments

Comments
 (0)