diff --git a/.gitignore b/.gitignore index fca3404..e662837 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ logs *.log coverage +.nyc_output/ # Runtime data pids diff --git a/README.md b/README.md index 568a35c..11953dc 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,11 @@ If defined, `options` should be an object with the following keys and respective ] ``` +### Private Networks + +libp2p-switch supports private networking. In order to enabled private networks, the `switch.protector` must be +set and must contain a `protect` method. You can see an example of this in the [private network +tests]([./test/pnet.node.js]). ## API diff --git a/package.json b/package.json index b137346..52d17a6 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "dirty-chai": "^2.0.1", "gulp": "^3.9.1", "libp2p-mplex": "~0.7.0", + "libp2p-pnet": "~0.1.0", "libp2p-secio": "~0.10.0", "libp2p-spdy": "~0.12.1", "libp2p-tcp": "~0.12.0", diff --git a/src/dial.js b/src/dial.js index 272f2b4..a622b49 100644 --- a/src/dial.js +++ b/src/dial.js @@ -143,9 +143,18 @@ class Dialer { if (!this.protocol) { return cb() } + // If we have a muxer, create a new stream, otherwise it's a standard connection - const connection = muxer.newStream ? muxer.newStream() : muxer - this._performProtocolHandshake(connection, cb) + if (muxer.newStream) { + muxer.newStream((err, conn) => { + if (err) return cb(err) + + this._performProtocolHandshake(conn, cb) + }) + return + } + + this._performProtocolHandshake(muxer, cb) } ], (err, connection) => { callback(err, connection) @@ -181,8 +190,12 @@ class Dialer { this._attemptDial(cb) }, (baseConnection, cb) => { + // Create a private connection if it's needed + this._createPrivateConnection(baseConnection, cb) + }, + (connection, cb) => { // Add the Switch's crypt encryption to the connection - this._encryptConnection(baseConnection, cb) + this._encryptConnection(connection, cb) } ], (err, encryptedConnection) => { if (err) { @@ -193,6 +206,31 @@ class Dialer { }) } + /** + * If the switch has a private network protector, `switch.protector`, its `protect` + * method will be called with the given connection. The resulting, wrapped connection + * will be returned via the callback. + * + * @param {Connection} connection The connection to protect + * @param {function(Error, Connection)} callback + * @returns {void} + */ + _createPrivateConnection (connection, callback) { + if (this.switch.protector === null) { + return callback(null, connection) + } + + // If the switch has a protector, be private + const protectedConnection = this.switch.protector.protect(connection, (err) => { + if (err) { + return callback(err) + } + + protectedConnection.setPeerInfo(this.peerInfo) + callback(null, protectedConnection) + }) + } + /** * If the given PeerId key, `b58Id`, has an existing muxed connection * it will be returned via the callback, otherwise the connection diff --git a/src/errors.js b/src/errors.js new file mode 100644 index 0000000..bbc1193 --- /dev/null +++ b/src/errors.js @@ -0,0 +1,3 @@ +'use strict' + +module.exports.PROTECTOR_REQUIRED = 'No protector provided with private network enforced' diff --git a/src/index.js b/src/index.js index 1b096bd..a3db99d 100644 --- a/src/index.js +++ b/src/index.js @@ -12,6 +12,7 @@ const plaintext = require('./plaintext') const Observer = require('./observer') const Stats = require('./stats') const assert = require('assert') +const Errors = require('./errors') class Switch extends EE { constructor (peerInfo, peerBook, options) { @@ -52,6 +53,8 @@ class Switch extends EE { // Crypto details this.crypto = plaintext + this.protector = this._options.protector || null + this.transport = new TransportManager(this) this.connection = new ConnectionManager(this) @@ -197,3 +200,4 @@ class Switch extends EE { } module.exports = Switch +module.exports.errors = Errors diff --git a/src/transport.js b/src/transport.js index c9b7882..86ebb87 100644 --- a/src/transport.js +++ b/src/transport.js @@ -86,11 +86,24 @@ class TransportManager { * @returns {void} */ listen (key, options, handler, callback) { + let muxerHandler + // if no handler is passed, we pass conns to protocolMuxer if (!handler) { handler = this.switch.protocolMuxer(key) } + // If we have a protector make the connection private + if (this.switch.protector) { + muxerHandler = handler + handler = (parentConnection) => { + const connection = this.switch.protector.protect(parentConnection, () => { + // If we get an error here, we should stop talking to this peer + muxerHandler(connection) + }) + } + } + const transport = this.switch.transports[key] const multiaddrs = TransportManager.dialables( transport, diff --git a/test/node.js b/test/node.js index d4fabd5..4564c37 100644 --- a/test/node.js +++ b/test/node.js @@ -1,5 +1,6 @@ 'use strict' +require('./pnet.node') require('./transports.node') require('./stream-muxers.node') require('./secio.node') diff --git a/test/pnet.node.js b/test/pnet.node.js new file mode 100644 index 0000000..036eb80 --- /dev/null +++ b/test/pnet.node.js @@ -0,0 +1,155 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const parallel = require('async/parallel') +const TCP = require('libp2p-tcp') +const multiplex = require('libp2p-mplex') +const pull = require('pull-stream') +const PeerBook = require('peer-book') +const secio = require('libp2p-secio') +const Protector = require('libp2p-pnet') + +const utils = require('./utils') +const createInfos = utils.createInfos +const tryEcho = utils.tryEcho +const Switch = require('../src') + +const generatePSK = Protector.generate + +const psk = Buffer.alloc(95) +const psk2 = Buffer.alloc(95) +generatePSK(psk) +generatePSK(psk2) + +describe('Private Network', function () { + this.timeout(20 * 1000) + + let switchA + let switchB + let switchC + let switchD + + before((done) => createInfos(4, (err, infos) => { + expect(err).to.not.exist() + + const peerA = infos[0] + const peerB = infos[1] + const peerC = infos[2] + const peerD = infos[3] + + peerA.multiaddrs.add('/ip4/127.0.0.1/tcp/9001') + peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002') + peerC.multiaddrs.add('/ip4/127.0.0.1/tcp/9003') + peerD.multiaddrs.add('/ip4/127.0.0.1/tcp/9004') + + switchA = new Switch(peerA, new PeerBook(), { + protector: new Protector(psk) + }) + switchB = new Switch(peerB, new PeerBook(), { + protector: new Protector(psk) + }) + // alternative way to add the protector + switchC = new Switch(peerC, new PeerBook()) + switchC.protector = new Protector(psk) + // Create a switch on a different private network + switchD = new Switch(peerD, new PeerBook(), { + protector: new Protector(psk2) + }) + + switchA.transport.add('tcp', new TCP()) + switchB.transport.add('tcp', new TCP()) + switchC.transport.add('tcp', new TCP()) + switchD.transport.add('tcp', new TCP()) + + switchA.connection.crypto(secio.tag, secio.encrypt) + switchB.connection.crypto(secio.tag, secio.encrypt) + switchC.connection.crypto(secio.tag, secio.encrypt) + switchD.connection.crypto(secio.tag, secio.encrypt) + + switchA.connection.addStreamMuxer(multiplex) + switchB.connection.addStreamMuxer(multiplex) + switchC.connection.addStreamMuxer(multiplex) + switchD.connection.addStreamMuxer(multiplex) + + parallel([ + (cb) => switchA.transport.listen('tcp', {}, null, cb), + (cb) => switchB.transport.listen('tcp', {}, null, cb), + (cb) => switchC.transport.listen('tcp', {}, null, cb), + (cb) => switchD.transport.listen('tcp', {}, null, cb) + ], done) + })) + + after(function (done) { + this.timeout(3 * 1000) + parallel([ + (cb) => switchA.stop(cb), + (cb) => switchB.stop(cb), + (cb) => switchC.stop(cb), + (cb) => switchD.stop(cb) + ], done) + }) + + it('should handle + dial on protocol', (done) => { + switchB.handle('/abacaxi/1.0.0', (protocol, conn) => pull(conn, conn)) + + switchA.dial(switchB._peerInfo, '/abacaxi/1.0.0', (err, conn) => { + expect(err).to.not.exist() + expect(Object.keys(switchA.muxedConns).length).to.equal(1) + tryEcho(conn, done) + }) + }) + + it('should dial to warm conn', (done) => { + switchB.dial(switchA._peerInfo, (err) => { + expect(err).to.not.exist() + expect(Object.keys(switchB.conns).length).to.equal(0) + expect(Object.keys(switchB.muxedConns).length).to.equal(1) + done() + }) + }) + + it('should dial on protocol, reuseing warmed conn', (done) => { + switchA.handle('/papaia/1.0.0', (protocol, conn) => pull(conn, conn)) + + switchB.dial(switchA._peerInfo, '/papaia/1.0.0', (err, conn) => { + expect(err).to.not.exist() + expect(Object.keys(switchB.conns).length).to.equal(0) + expect(Object.keys(switchB.muxedConns).length).to.equal(1) + tryEcho(conn, done) + }) + }) + + it('should enable identify to reuse incomming muxed conn', (done) => { + switchA.connection.reuse() + switchC.connection.reuse() + + switchC.dial(switchA._peerInfo, (err) => { + expect(err).to.not.exist() + setTimeout(() => { + expect(Object.keys(switchC.muxedConns).length).to.equal(1) + expect(Object.keys(switchA.muxedConns).length).to.equal(2) + done() + }, 500) + }) + }) + + /** + * This test is being skipped until a related issue with pull-reader overreading can be resolved + * Currently this test will time out instead of returning an error properly. This is the same issue + * in ipfs/interop, https://github.com/ipfs/interop/pull/24/commits/179978996ecaef39e78384091aa9669dcdb94cc0 + */ + it('should fail to talk to a switch on a different private network', function (done) { + switchD.dial(switchA._peerInfo, (err) => { + expect(err).to.exist() + }) + + // A successful connection will return in well under 2 seconds + setTimeout(() => { + done() + }, 2000) + }) +})