Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

[WIP] move to pull-streams #2

Merged
merged 2 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,43 @@
libp2p-identify
===============
# js-libp2p-identify

[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-identify/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-identify?branch=master)
[![Travis CI](https://travis-ci.org/libp2p/js-libp2p-identify.svg?branch=master)](https://travis-ci.org/libp2p/js-libp2p-identify)
[![Circle CI](https://circleci.com/gh/libp2p/js-libp2p-identify.svg?style=svg)](https://circleci.com/gh/libp2p/js-libp2p-identify)
[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-identify.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-identify) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)

> libp2p Identify Protocol

## Description

Identify is a STUN protocol, used by libp2p-swarmin order to broadcast and learn about the `ip:port` pairs a specific peer is available through and to know when a new stream muxer is established, so a conn can be reused.

## How does it work

Best way to understand the current design is through this issue: https://github.com/libp2p/js-libp2p-swarm/issues/78

### This module uses `pull-streams`

We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362).

You can learn more about pull-streams at:

- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ)
- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams)
- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple)
- [pull-streams documentation](https://pull-stream.github.io/)

#### Converting `pull-streams` to Node.js Streams

If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/dominictarr/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example:

```js
const pullToStream = require('pull-stream-to-stream')

const nodeStreamInstance = pullToStream(pullStreamInstance)
// nodeStreamInstance is an instance of a Node.js Stream
```

To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream.
14 changes: 8 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
"lint": "aegir-lint",
"build": "aegir-build",
"test": "aegir-test",
"test:node": "aegir-test test:node",
"test:browser": "aegir-test test:browser",
"test:node": "aegir-test --env node",
"test:browser": "aegir-test --env browser",
"release": "aegir-release",
"release-minor": "aegir-release --type minor",
"release-major": "aegir-release --type major",
Expand Down Expand Up @@ -39,17 +39,19 @@
"homepage": "https://github.com/libp2p/js-libp2p-identify#readme",
"devDependencies": {
"aegir": "^8.0.0",
"chai": "^3.5.0",
"pre-commit": "^1.1.3",
"stream-pair": "^1.0.3"
"pull-pair": "^1.1.0"
},
"dependencies": {
"length-prefixed-stream": "^1.5.0",
"multiaddr": "^2.0.2",
"peer-id": "^0.7.0",
"peer-info": "^0.7.0",
"protocol-buffers": "^3.1.6"
"protocol-buffers": "^3.1.6",
"pull-length-prefixed": "^1.0.0",
"pull-stream": "^3.4.3"
},
"contributors": [
"David Dias <[email protected]>"
]
}
}
49 changes: 49 additions & 0 deletions src/dialer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
'use strict'
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const lp = require('pull-length-prefixed')

const msg = require('./message')

module.exports = (conn, callback) => {
pull(
conn,
lp.decode(),
pull.take(1),
pull.collect((err, data) => {
if (err) {
return callback(err)
}

const input = msg.decode(data[0])

const id = PeerId.createFromPubKey(input.publicKey)
const info = new PeerInfo(id)
input.listenAddrs
.map(multiaddr)
.forEach((ma) => info.multiaddr.add(ma))

callback(null, info, getObservedAddrs(input))
})
)
}

function getObservedAddrs (input) {
if (!hasObservedAddr(input)) {
return []
}

let addrs = input.observedAddr

if (!Array.isArray(input.observedAddr)) {
addrs = [addrs]
}

return addrs.map((oa) => multiaddr(oa))
}

function hasObservedAddr (input) {
return input.observedAddr && input.observedAddr.length > 0
}
87 changes: 2 additions & 85 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,89 +1,6 @@
/*
* Identify is one of the protocols swarms speaks in order to
* broadcast and learn about the ip:port pairs a specific peer
* is available through and to know when a new stream muxer is
* established, so a conn can be reused
*/

'use strict'

const fs = require('fs')
const path = require('path')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const bl = require('bl')

const lpstream = require('length-prefixed-stream')
const protobuf = require('protocol-buffers')
const schema = fs.readFileSync(path.join(__dirname, 'identify.proto'))
const idPb = protobuf(schema)

exports = module.exports
exports.multicodec = '/ipfs/id/1.0.0'

exports.exec = (conn, callback) => {
const decode = lpstream.decode()

conn
.pipe(decode)
.pipe(bl((err, data) => {
if (err) {
return callback(err)
}
const msg = idPb.Identify.decode(data)
let observedAddrs = []
if (hasObservedAddr(msg)) {
if (!Array.isArray(msg.observedAddr)) {
msg.observedAddr = [msg.observedAddr]
}
observedAddrs = msg.observedAddr.map((oa) => {
return multiaddr(oa)
})
}

const pId = PeerId.createFromPubKey(msg.publicKey)
const pInfo = new PeerInfo(pId)
msg.listenAddrs.forEach((ma) => {
pInfo.multiaddr.add(multiaddr(ma))
})

callback(null, pInfo, observedAddrs)
}))

conn.end()
}

exports.handler = (pInfoSelf) => {
return (conn) => {
// send what I see from the other + my Info
const encode = lpstream.encode()

encode.pipe(conn)

conn.getObservedAddrs((err, observedAddrs) => {
if (err) { return }
observedAddrs = observedAddrs[0]

let publicKey = new Buffer(0)
if (pInfoSelf.id.pubKey) {
publicKey = pInfoSelf.id.pubKey.bytes
}

const msgSend = idPb.Identify.encode({
protocolVersion: 'ipfs/0.1.0',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: pInfoSelf.multiaddrs.map((ma) => ma.buffer),
observedAddr: observedAddrs ? observedAddrs.buffer : new Buffer('')
})

encode.write(msgSend)
encode.end()
})
}
}

function hasObservedAddr (msg) {
return msg.observedAddr && msg.observedAddr.length > 0
}
exports.listener = require('./listener')
exports.dialer = require('./dialer')
33 changes: 33 additions & 0 deletions src/listener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const pull = require('pull-stream')
const lp = require('pull-length-prefixed')

const msg = require('./message')

module.exports = (conn, pInfoSelf) => {
// send what I see from the other + my Info
conn.getObservedAddrs((err, observedAddrs) => {
if (err) { return }
observedAddrs = observedAddrs[0]

let publicKey = new Buffer(0)
if (pInfoSelf.id.pubKey) {
publicKey = pInfoSelf.id.pubKey.bytes
}

const msgSend = msg.encode({
protocolVersion: 'ipfs/0.1.0',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: pInfoSelf.multiaddrs.map((ma) => ma.buffer),
observedAddr: observedAddrs ? observedAddrs.buffer : new Buffer('')
})

pull(
pull.values([msgSend]),
lp.encode(),
conn
)
})
}
File renamed without changes.
8 changes: 8 additions & 0 deletions src/message/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict'

const fs = require('fs')
const path = require('path')
const protobuf = require('protocol-buffers')
const schema = fs.readFileSync(path.join(__dirname, 'identify.proto'))

module.exports = protobuf(schema).Identify
56 changes: 56 additions & 0 deletions test/dialer.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/* eslint-env mocha */
'use strict'

const pull = require('pull-stream')
const expect = require('chai').expect
const pair = require('pull-pair/duplex')
const PeerInfo = require('peer-info')
const lp = require('pull-length-prefixed')
const multiaddr = require('multiaddr')

const msg = require('../src/message')
const identify = require('../src')

describe('identify.dialer', () => {
it('works', (done) => {
const p = pair()
const original = new PeerInfo()
original.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/5002'))
const input = msg.encode({
protocolVersion: 'ipfs/0.1.0',
agentVersion: 'na',
publicKey: original.id.pubKey.bytes,
listenAddrs: [multiaddr('/ip4/127.0.0.1/tcp/5002').buffer],
observedAddr: multiaddr('/ip4/127.0.0.1/tcp/5001').buffer
})

pull(
pull.values([input]),
lp.encode(),
p[0]
)

identify.dialer(p[1], (err, info, observedAddrs) => {
expect(err).to.not.exist
expect(
info.id.pubKey.bytes
).to.be.eql(
original.id.pubKey.bytes
)

expect(
info.multiaddrs
).to.be.eql(
original.multiaddrs
)

expect(
observedAddrs
).to.be.eql(
[multiaddr('/ip4/127.0.0.1/tcp/5001')]
)

done()
})
})
})
4 changes: 0 additions & 4 deletions test/identify.spec.js

This file was deleted.

16 changes: 16 additions & 0 deletions test/index.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* eslint-env mocha */
'use strict'

const expect = require('chai').expect

const identify = require('../src')

describe('identify', () => {
it('multicodec', () => {
expect(
identify.multicodec
).to.be.eql(
'/ipfs/id/1.0.0'
)
})
})
46 changes: 46 additions & 0 deletions test/listener.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* eslint-env mocha */
'use strict'

const pull = require('pull-stream')
const expect = require('chai').expect
const pair = require('pull-pair/duplex')
const PeerInfo = require('peer-info')
const lp = require('pull-length-prefixed')
const multiaddr = require('multiaddr')

const msg = require('../src/message')
const identify = require('../src')

describe('identify.listener', () => {
it('works', (done) => {
const p = pair()
const info = new PeerInfo()
info.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/5002'))
pull(
p[1],
lp.decode(),
pull.collect((err, result) => {
expect(err).to.not.exist

const input = msg.decode(result[0])
expect(
input
).to.be.eql({
protocolVersion: 'ipfs/0.1.0',
agentVersion: 'na',
publicKey: info.id.pubKey.bytes,
listenAddrs: [multiaddr('/ip4/127.0.0.1/tcp/5002').buffer],
observedAddr: multiaddr('/ip4/127.0.0.1/tcp/5001').buffer
})
done()
})
)

const conn = p[0]
conn.getObservedAddrs = (cb) => {
cb(null, [multiaddr('/ip4/127.0.0.1/tcp/5001')])
}

identify.listener(conn, info)
})
})