Skip to content

Commit 510bd2c

Browse files
authored
feat: encode record-store keys in pubsub (#9)
1 parent 068ec27 commit 510bd2c

File tree

4 files changed

+80
-48
lines changed

4 files changed

+80
-48
lines changed

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333
"homepage": "https://github.com/ipfs/js-datastore-pubsub#readme",
3434
"dependencies": {
3535
"assert": "^1.4.1",
36-
"base32.js": "~0.1.0",
3736
"debug": "^4.1.0",
3837
"err-code": "^1.1.2",
39-
"interface-datastore": "~0.6.0"
38+
"interface-datastore": "~0.6.0",
39+
"multibase": "~0.6.0"
4040
},
4141
"devDependencies": {
4242
"aegir": "^17.1.0",

src/index.js

+20-14
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const { Key } = require('interface-datastore')
4-
const { encodeBase32 } = require('./utils')
4+
const { encodeBase32, keyToTopic, topicToKey } = require('./utils')
55

66
const errcode = require('err-code')
77
const assert = require('assert')
@@ -24,19 +24,19 @@ class DatastorePubsub {
2424
* @memberof DatastorePubsub
2525
*/
2626
constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) {
27-
assert.equal(typeof validator, 'object', 'missing validator')
28-
assert.equal(typeof validator.validate, 'function', 'missing validate function')
29-
assert.equal(typeof validator.select, 'function', 'missing select function')
30-
subscriptionKeyFn && assert.equal(typeof subscriptionKeyFn, 'function', 'invalid subscriptionKeyFn received')
27+
assert.strictEqual(typeof validator, 'object', 'missing validator')
28+
assert.strictEqual(typeof validator.validate, 'function', 'missing validate function')
29+
assert.strictEqual(typeof validator.select, 'function', 'missing select function')
30+
subscriptionKeyFn && assert.strictEqual(typeof subscriptionKeyFn, 'function', 'invalid subscriptionKeyFn received')
3131

3232
this._pubsub = pubsub
3333
this._datastore = datastore
3434
this._peerId = peerId
3535
this._validator = validator
3636
this._handleSubscriptionKeyFn = subscriptionKeyFn
3737

38-
// Bind _handleSubscription function, which is called by pubsub.
39-
this._handleSubscription = this._handleSubscription.bind(this)
38+
// Bind _onMessage function, which is called by pubsub.
39+
this._onMessage = this._onMessage.bind(this)
4040
}
4141

4242
/**
@@ -61,7 +61,7 @@ class DatastorePubsub {
6161
return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED'))
6262
}
6363

64-
const stringifiedTopic = key.toString()
64+
const stringifiedTopic = keyToTopic(key)
6565

6666
log(`publish value for topic ${stringifiedTopic}`)
6767

@@ -83,7 +83,7 @@ class DatastorePubsub {
8383
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
8484
}
8585

86-
const stringifiedTopic = key.toString()
86+
const stringifiedTopic = keyToTopic(key)
8787

8888
this._pubsub.ls((err, res) => {
8989
if (err) {
@@ -96,7 +96,7 @@ class DatastorePubsub {
9696
}
9797

9898
// Subscribe
99-
this._pubsub.subscribe(stringifiedTopic, this._handleSubscription, (err) => {
99+
this._pubsub.subscribe(stringifiedTopic, this._onMessage, (err) => {
100100
if (err) {
101101
const errMsg = `cannot subscribe topic ${stringifiedTopic}`
102102

@@ -116,9 +116,9 @@ class DatastorePubsub {
116116
* @returns {void}
117117
*/
118118
unsubscribe (key) {
119-
const stringifiedTopic = key.toString()
119+
const stringifiedTopic = keyToTopic(key)
120120

121-
this._pubsub.unsubscribe(stringifiedTopic, this._handleSubscription)
121+
this._pubsub.unsubscribe(stringifiedTopic, this._onMessage)
122122
}
123123

124124
// Get record from local datastore
@@ -152,9 +152,15 @@ class DatastorePubsub {
152152
}
153153

154154
// handles pubsub subscription messages
155-
_handleSubscription (msg) {
155+
_onMessage (msg) {
156156
const { data, from, topicIDs } = msg
157-
const key = topicIDs[0]
157+
let key
158+
try {
159+
key = topicToKey(topicIDs[0])
160+
} catch (err) {
161+
log.error(err)
162+
return
163+
}
158164

159165
log(`message received for ${key} topic`)
160166

src/utils.js

+26-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,31 @@
11
'use strict'
22

3-
const base32 = require('base32.js')
3+
const multibase = require('multibase')
4+
const errcode = require('err-code')
5+
6+
const namespace = '/record/'
7+
const base64urlCode = 'u' // base64url code from multibase
48

59
module.exports.encodeBase32 = (buf) => {
6-
const enc = new base32.Encoder()
7-
return enc.write(buf).finalize()
10+
return multibase.encode('base32', buf).slice(1) // slice off multibase codec
11+
}
12+
13+
// converts a binary record key to a pubsub topic key.
14+
module.exports.keyToTopic = (key) => {
15+
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs
16+
// Encodes to "/record/base64url(key)"
17+
const b64url = multibase.encode('base64url', key).slice(1).toString()
18+
19+
return `${namespace}${b64url}`
20+
}
21+
22+
// converts a pubsub topic key to a binary record key.
23+
module.exports.topicToKey = (topic) => {
24+
if (topic.substring(0, namespace.length) !== namespace) {
25+
throw errcode(new Error('topic received is not from a record'), 'ERR_TOPIC_IS_NOT_FROM_RECORD_NAMESPACE')
26+
}
27+
28+
const key = `${base64urlCode}${topic.substring(namespace.length)}`
29+
30+
return multibase.decode(key).toString()
831
}

test/index.spec.js

+32-29
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const { Key } = require('interface-datastore')
1515
const { Record } = require('libp2p-record')
1616

1717
const DatastorePubsub = require('../src')
18+
const { keyToTopic } = require('../src/utils')
1819
const { connect, waitFor, waitForPeerToSubscribe, spawnDaemon, stopDaemon } = require('./utils')
1920

2021
// Always returning the expected values
@@ -115,11 +116,12 @@ describe('datastore-pubsub', function () {
115116

116117
it('should subscribe the topic, but receive error as no entry is stored locally', function (done) {
117118
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
119+
const subsTopic = keyToTopic(`/${keyRef}`)
118120

119121
pubsubA.ls((err, res) => {
120122
expect(err).to.not.exist()
121123
expect(res).to.exist()
122-
expect(res).to.not.include(`/${keyRef}`) // not subscribed key reference yet
124+
expect(res).to.not.include(subsTopic) // not subscribed key reference yet
123125

124126
dsPubsubA.get(key, (err) => {
125127
expect(err).to.exist() // not locally stored record
@@ -128,7 +130,7 @@ describe('datastore-pubsub', function () {
128130
pubsubA.ls((err, res) => {
129131
expect(err).to.not.exist()
130132
expect(res).to.exist()
131-
expect(res).to.include(`/${keyRef}`) // subscribed key reference
133+
expect(res).to.include(subsTopic) // subscribed key reference
132134
done()
133135
})
134136
})
@@ -138,11 +140,12 @@ describe('datastore-pubsub', function () {
138140
it('should put correctly to daemon A and daemon B should not receive it without subscribing', function (done) {
139141
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
140142
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
143+
const subsTopic = keyToTopic(`/${keyRef}`)
141144

142145
pubsubB.ls((err, res) => {
143146
expect(err).to.not.exist()
144147
expect(res).to.exist()
145-
expect(res).to.not.include(`/${keyRef}`) // not subscribed
148+
expect(res).to.not.include(subsTopic) // not subscribed
146149

147150
dsPubsubA.put(key, serializedRecord, (err) => {
148151
expect(err).to.not.exist()
@@ -169,7 +172,7 @@ describe('datastore-pubsub', function () {
169172
}
170173
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
171174
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
172-
const topic = `/${keyRef}`
175+
const subsTopic = keyToTopic(`/${keyRef}`)
173176
let receivedMessage = false
174177

175178
function messageHandler () {
@@ -181,9 +184,9 @@ describe('datastore-pubsub', function () {
181184
expect(res).to.not.exist() // no value available, but subscribed now
182185

183186
series([
184-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
187+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
185188
// subscribe in order to understand when the message arrive to the node
186-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
189+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
187190
(cb) => dsPubsubA.put(key, serializedRecord, cb),
188191
// wait until message arrives
189192
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -200,7 +203,7 @@ describe('datastore-pubsub', function () {
200203
it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) {
201204
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
202205
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
203-
const topic = `/${keyRef}`
206+
const subsTopic = keyToTopic(`/${keyRef}`)
204207
let receivedMessage = false
205208

206209
function messageHandler () {
@@ -210,16 +213,16 @@ describe('datastore-pubsub', function () {
210213
pubsubB.ls((err, res) => {
211214
expect(err).to.not.exist()
212215
expect(res).to.exist()
213-
expect(res).to.not.include(topic) // not subscribed
216+
expect(res).to.not.include(subsTopic) // not subscribed
214217

215218
dsPubsubB.get(key, (err, res) => {
216219
expect(err).to.exist()
217220
expect(res).to.not.exist() // not value available, but subscribed now
218221

219222
series([
220-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
223+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
221224
// subscribe in order to understand when the message arrive to the node
222-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
225+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
223226
(cb) => dsPubsubA.put(key, serializedRecord, cb),
224227
// wait until message arrives
225228
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -300,7 +303,7 @@ describe('datastore-pubsub', function () {
300303
}
301304
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
302305
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
303-
const topic = `/${keyRef}`
306+
const subsTopic = keyToTopic(`/${keyRef}`)
304307
let receivedMessage = false
305308

306309
function messageHandler () {
@@ -312,9 +315,9 @@ describe('datastore-pubsub', function () {
312315
expect(res).to.not.exist() // not value available, but subscribed now
313316

314317
series([
315-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
318+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
316319
// subscribe in order to understand when the message arrive to the node
317-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
320+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
318321
(cb) => dsPubsubA.put(key, serializedRecord, cb),
319322
// wait until message arrives
320323
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -345,7 +348,7 @@ describe('datastore-pubsub', function () {
345348

346349
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
347350
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
348-
const topic = `/${keyRef}`
351+
const subsTopic = keyToTopic(`/${keyRef}`)
349352
let receivedMessage = false
350353

351354
function messageHandler () {
@@ -357,9 +360,9 @@ describe('datastore-pubsub', function () {
357360
expect(res).to.not.exist() // not value available, but subscribed now
358361

359362
series([
360-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
363+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
361364
// subscribe in order to understand when the message arrive to the node
362-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
365+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
363366
(cb) => dsPubsubA.put(key, serializedRecord, cb),
364367
// wait until message arrives
365368
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -396,7 +399,7 @@ describe('datastore-pubsub', function () {
396399

397400
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
398401
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
399-
const topic = `/${keyRef}`
402+
const subsTopic = keyToTopic(`/${keyRef}`)
400403
let receivedMessage = false
401404

402405
function messageHandler () {
@@ -408,9 +411,9 @@ describe('datastore-pubsub', function () {
408411
expect(res).to.not.exist() // not value available, but it is subscribed now
409412

410413
series([
411-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
414+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
412415
// subscribe in order to understand when the message arrive to the node
413-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
416+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
414417
(cb) => dsPubsubA.put(key, serializedRecord, cb),
415418
// wait until message arrives
416419
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -431,14 +434,14 @@ describe('datastore-pubsub', function () {
431434
})
432435
})
433436

434-
it('should subscribe the topic and after a message being received, discarde it using the subscriptionKeyFn', function (done) {
437+
it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', function (done) {
435438
const subscriptionKeyFn = (topic, callback) => {
436-
expect(topic).to.equal(key.toString())
439+
expect(topic).to.equal(`/${keyRef}`)
437440
callback(new Error('DISCARD MESSAGE'))
438441
}
439442
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
440443
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
441-
const topic = `/${keyRef}`
444+
const subsTopic = keyToTopic(`/${keyRef}`)
442445
let receivedMessage = false
443446

444447
function messageHandler () {
@@ -448,16 +451,16 @@ describe('datastore-pubsub', function () {
448451
pubsubB.ls((err, res) => {
449452
expect(err).to.not.exist()
450453
expect(res).to.exist()
451-
expect(res).to.not.include(topic) // not subscribed
454+
expect(res).to.not.include(subsTopic) // not subscribed
452455

453456
dsPubsubB.get(key, (err, res) => {
454457
expect(err).to.exist()
455458
expect(res).to.not.exist() // not value available, but subscribed now
456459

457460
series([
458-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
461+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
459462
// subscribe in order to understand when the message arrive to the node
460-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
463+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
461464
(cb) => dsPubsubA.put(key, serializedRecord, cb),
462465
// wait until message arrives
463466
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -478,7 +481,7 @@ describe('datastore-pubsub', function () {
478481
}
479482
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
480483
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
481-
const topic = `/${keyRef}`
484+
const subsTopic = keyToTopic(`/${keyRef}`)
482485
const keyNew = Buffer.from(`${key.toString()}new`)
483486
let receivedMessage = false
484487

@@ -489,16 +492,16 @@ describe('datastore-pubsub', function () {
489492
pubsubB.ls((err, res) => {
490493
expect(err).to.not.exist()
491494
expect(res).to.exist()
492-
expect(res).to.not.include(topic) // not subscribed
495+
expect(res).to.not.include(subsTopic) // not subscribed
493496

494497
dsPubsubB.get(key, (err, res) => {
495498
expect(err).to.exist()
496499
expect(res).to.not.exist() // not value available, but subscribed now
497500

498501
series([
499-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
502+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
500503
// subscribe in order to understand when the message arrive to the node
501-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
504+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
502505
(cb) => dsPubsubA.put(key, serializedRecord, cb),
503506
// wait until message arrives
504507
(cb) => waitFor(() => receivedMessage === true, cb),

0 commit comments

Comments
 (0)