Skip to content

feat: encode record-store keys in pubsub #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 29, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"dependencies": {
"assert": "^1.4.1",
"base32.js": "~0.1.0",
"base64url": "^3.0.0",
"debug": "^4.1.0",
"err-code": "^1.1.2",
"interface-datastore": "~0.6.0"
Expand Down
10 changes: 5 additions & 5 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const { Key } = require('interface-datastore')
const { encodeBase32 } = require('./utils')
const { encodeBase32, keyToTopic, topicToKey } = require('./utils')

const errcode = require('err-code')
const assert = require('assert')
Expand Down Expand Up @@ -61,7 +61,7 @@ class DatastorePubsub {
return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED'))
}

const stringifiedTopic = key.toString()
const stringifiedTopic = keyToTopic(key)

log(`publish value for topic ${stringifiedTopic}`)

Expand All @@ -83,7 +83,7 @@ class DatastorePubsub {
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
}

const stringifiedTopic = key.toString()
const stringifiedTopic = keyToTopic(key)

this._pubsub.ls((err, res) => {
if (err) {
Expand Down Expand Up @@ -116,7 +116,7 @@ class DatastorePubsub {
* @returns {void}
*/
unsubscribe (key) {
const stringifiedTopic = key.toString()
const stringifiedTopic = keyToTopic(key)

this._pubsub.unsubscribe(stringifiedTopic, this._handleSubscription)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ class DatastorePubsub {
// handles pubsub subscription messages
_handleSubscription (msg) {
const { data, from, topicIDs } = msg
const key = topicIDs[0]
const key = topicToKey(topicIDs[0])

log(`message received for ${key} topic`)

Expand Down
15 changes: 15 additions & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
'use strict'

const base32 = require('base32.js')
const base64url = require('base64url')

const namespace = '/record/'

module.exports.encodeBase32 = (buf) => {
const enc = new base32.Encoder()
return enc.write(buf).finalize()
}

// converts a binary record key to a pubsub topic key.
module.exports.keyToTopic = (key) => {
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs
// Encodes to "/record/base64url(key)"
return `${namespace}${base64url.encode(key)}`
}

// converts a pubsub topic key to a binary record key.
module.exports.topicToKey = (topic) => {
return base64url.decode(topic.substring(namespace.length))
}
61 changes: 32 additions & 29 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { Key } = require('interface-datastore')
const { Record } = require('libp2p-record')

const DatastorePubsub = require('../src')
const { keyToTopic } = require('../src/utils')
const { connect, waitFor, waitForPeerToSubscribe, spawnDaemon, stopDaemon } = require('./utils')

// Always returning the expected values
Expand Down Expand Up @@ -115,11 +116,12 @@ describe('datastore-pubsub', function () {

it('should subscribe the topic, but receive error as no entry is stored locally', function (done) {
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const subsTopic = keyToTopic(`/${keyRef}`)

pubsubA.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(`/${keyRef}`) // not subscribed key reference yet
expect(res).to.not.include(subsTopic) // not subscribed key reference yet

dsPubsubA.get(key, (err) => {
expect(err).to.exist() // not locally stored record
Expand All @@ -128,7 +130,7 @@ describe('datastore-pubsub', function () {
pubsubA.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.include(`/${keyRef}`) // subscribed key reference
expect(res).to.include(subsTopic) // subscribed key reference
done()
})
})
Expand All @@ -138,11 +140,12 @@ describe('datastore-pubsub', function () {
it('should put correctly to daemon A and daemon B should not receive it without subscribing', function (done) {
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
const subsTopic = keyToTopic(`/${keyRef}`)

pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(`/${keyRef}`) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubA.put(key, serializedRecord, (err) => {
expect(err).to.not.exist()
Expand All @@ -169,7 +172,7 @@ describe('datastore-pubsub', function () {
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -181,9 +184,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // no value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand All @@ -200,7 +203,7 @@ describe('datastore-pubsub', function () {
it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) {
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -210,16 +213,16 @@ describe('datastore-pubsub', function () {
pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(topic) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubB.get(key, (err, res) => {
expect(err).to.exist()
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down Expand Up @@ -300,7 +303,7 @@ describe('datastore-pubsub', function () {
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -312,9 +315,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down Expand Up @@ -345,7 +348,7 @@ describe('datastore-pubsub', function () {

const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -357,9 +360,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down Expand Up @@ -396,7 +399,7 @@ describe('datastore-pubsub', function () {

const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -408,9 +411,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // not value available, but it is subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand All @@ -431,14 +434,14 @@ describe('datastore-pubsub', function () {
})
})

it('should subscribe the topic and after a message being received, discarde it using the subscriptionKeyFn', function (done) {
it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', function (done) {
const subscriptionKeyFn = (topic, callback) => {
expect(topic).to.equal(key.toString())
expect(topic).to.equal(`/${keyRef}`)
callback(new Error('DISCARD MESSAGE'))
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -448,16 +451,16 @@ describe('datastore-pubsub', function () {
pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(topic) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubB.get(key, (err, res) => {
expect(err).to.exist()
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand All @@ -478,7 +481,7 @@ describe('datastore-pubsub', function () {
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
const keyNew = Buffer.from(`${key.toString()}new`)
let receivedMessage = false

Expand All @@ -489,16 +492,16 @@ describe('datastore-pubsub', function () {
pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(topic) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubB.get(key, (err, res) => {
expect(err).to.exist()
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down