Skip to content

Commit 7c6f648

Browse files
committed
feat: encode record-store keys in pubsub
1 parent 5b1afc3 commit 7c6f648

File tree

4 files changed

+50
-31
lines changed

4 files changed

+50
-31
lines changed

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
"dependencies": {
3535
"assert": "^1.4.1",
3636
"base32.js": "~0.1.0",
37+
"base64url": "^3.0.0",
3738
"debug": "^4.1.0",
3839
"err-code": "^1.1.2",
3940
"interface-datastore": "~0.6.0"

src/index.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const { Key } = require('interface-datastore')
4-
const { encodeBase32 } = require('./utils')
4+
const { encodeBase32, keyToTopic, topicToKey } = require('./utils')
55

66
const errcode = require('err-code')
77
const assert = require('assert')
@@ -61,7 +61,7 @@ class DatastorePubsub {
6161
return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED'))
6262
}
6363

64-
const stringifiedTopic = key.toString()
64+
const stringifiedTopic = keyToTopic(key)
6565

6666
log(`publish value for topic ${stringifiedTopic}`)
6767

@@ -83,7 +83,7 @@ class DatastorePubsub {
8383
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
8484
}
8585

86-
const stringifiedTopic = key.toString()
86+
const stringifiedTopic = keyToTopic(key)
8787

8888
this._pubsub.ls((err, res) => {
8989
if (err) {
@@ -116,7 +116,7 @@ class DatastorePubsub {
116116
* @returns {void}
117117
*/
118118
unsubscribe (key) {
119-
const stringifiedTopic = key.toString()
119+
const stringifiedTopic = keyToTopic(key)
120120

121121
this._pubsub.unsubscribe(stringifiedTopic, this._handleSubscription)
122122
}
@@ -154,7 +154,7 @@ class DatastorePubsub {
154154
// handles pubsub subscription messages
155155
_handleSubscription (msg) {
156156
const { data, from, topicIDs } = msg
157-
const key = topicIDs[0]
157+
const key = topicToKey(topicIDs[0])
158158

159159
log(`message received for ${key} topic`)
160160

src/utils.js

+15
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
11
'use strict'
22

33
const base32 = require('base32.js')
4+
const base64url = require('base64url')
5+
6+
const namespace = '/record/'
47

58
module.exports.encodeBase32 = (buf) => {
69
const enc = new base32.Encoder()
710
return enc.write(buf).finalize()
811
}
12+
13+
// converts a binary record key to a pubsub topic key.
14+
module.exports.keyToTopic = (key) => {
15+
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs
16+
// Encodes to "/record/base64url(key)"
17+
return `${namespace}${base64url.encode(key)}`
18+
}
19+
20+
// converts a pubsub topic key to a binary record key.
21+
module.exports.topicToKey = (topic) => {
22+
return base64url.decode(topic.substring(namespace.length))
23+
}

test/index.spec.js

+29-26
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const { Key } = require('interface-datastore')
1515
const { Record } = require('libp2p-record')
1616

1717
const DatastorePubsub = require('../src')
18+
const { keyToTopic } = require('../src/utils')
1819
const { connect, waitFor, waitForPeerToSubscribe, spawnDaemon, stopDaemon } = require('./utils')
1920

2021
// Always returning the expected values
@@ -115,11 +116,12 @@ describe('datastore-pubsub', function () {
115116

116117
it('should subscribe the topic, but receive error as no entry is stored locally', function (done) {
117118
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
119+
const subsTopic = keyToTopic(`/${keyRef}`)
118120

119121
pubsubA.ls((err, res) => {
120122
expect(err).to.not.exist()
121123
expect(res).to.exist()
122-
expect(res).to.not.include(`/${keyRef}`) // not subscribed key reference yet
124+
expect(res).to.not.include(subsTopic) // not subscribed key reference yet
123125

124126
dsPubsubA.get(key, (err) => {
125127
expect(err).to.exist() // not locally stored record
@@ -128,7 +130,7 @@ describe('datastore-pubsub', function () {
128130
pubsubA.ls((err, res) => {
129131
expect(err).to.not.exist()
130132
expect(res).to.exist()
131-
expect(res).to.include(`/${keyRef}`) // subscribed key reference
133+
expect(res).to.include(subsTopic) // subscribed key reference
132134
done()
133135
})
134136
})
@@ -138,11 +140,12 @@ describe('datastore-pubsub', function () {
138140
it('should put correctly to daemon A and daemon B should not receive it without subscribing', function (done) {
139141
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
140142
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
143+
const subsTopic = keyToTopic(`/${keyRef}`)
141144

142145
pubsubB.ls((err, res) => {
143146
expect(err).to.not.exist()
144147
expect(res).to.exist()
145-
expect(res).to.not.include(`/${keyRef}`) // not subscribed
148+
expect(res).to.not.include(subsTopic) // not subscribed
146149

147150
dsPubsubA.put(key, serializedRecord, (err) => {
148151
expect(err).to.not.exist()
@@ -201,7 +204,7 @@ describe('datastore-pubsub', function () {
201204
it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) {
202205
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
203206
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
204-
const topic = `/${keyRef}`
207+
const subsTopic = keyToTopic(`/${keyRef}`)
205208
let receivedMessage = false
206209

207210
function messageHandler () {
@@ -211,16 +214,16 @@ describe('datastore-pubsub', function () {
211214
pubsubB.ls((err, res) => {
212215
expect(err).to.not.exist()
213216
expect(res).to.exist()
214-
expect(res).to.not.include(topic) // not subscribed
217+
expect(res).to.not.include(subsTopic) // not subscribed
215218

216219
dsPubsubB.get(key, (err, res) => {
217220
expect(err).to.exist()
218221
expect(res).to.not.exist() // not value available, but subscribed now
219222

220223
series([
221-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
224+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
222225
// subscribe in order to understand when the message arrive to the node
223-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
226+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
224227
(cb) => dsPubsubA.put(key, serializedRecord, cb),
225228
// wait until message arrives
226229
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -301,7 +304,7 @@ describe('datastore-pubsub', function () {
301304
}
302305
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
303306
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
304-
const topic = `/${keyRef}`
307+
const subsTopic = keyToTopic(`/${keyRef}`)
305308
let receivedMessage = false
306309

307310
function messageHandler () {
@@ -313,9 +316,9 @@ describe('datastore-pubsub', function () {
313316
expect(res).to.not.exist() // not value available, but subscribed now
314317

315318
series([
316-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
319+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
317320
// subscribe in order to understand when the message arrive to the node
318-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
321+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
319322
(cb) => dsPubsubA.put(key, serializedRecord, cb),
320323
// wait until message arrives
321324
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -346,7 +349,7 @@ describe('datastore-pubsub', function () {
346349

347350
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
348351
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
349-
const topic = `/${keyRef}`
352+
const subsTopic = keyToTopic(`/${keyRef}`)
350353
let receivedMessage = false
351354

352355
function messageHandler () {
@@ -358,9 +361,9 @@ describe('datastore-pubsub', function () {
358361
expect(res).to.not.exist() // not value available, but subscribed now
359362

360363
series([
361-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
364+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
362365
// subscribe in order to understand when the message arrive to the node
363-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
366+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
364367
(cb) => dsPubsubA.put(key, serializedRecord, cb),
365368
// wait until message arrives
366369
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -397,7 +400,7 @@ describe('datastore-pubsub', function () {
397400

398401
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
399402
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
400-
const topic = `/${keyRef}`
403+
const subsTopic = keyToTopic(`/${keyRef}`)
401404
let receivedMessage = false
402405

403406
function messageHandler () {
@@ -409,9 +412,9 @@ describe('datastore-pubsub', function () {
409412
expect(res).to.not.exist() // not value available, but it is subscribed now
410413

411414
series([
412-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
415+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
413416
// subscribe in order to understand when the message arrive to the node
414-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
417+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
415418
(cb) => dsPubsubA.put(key, serializedRecord, cb),
416419
// wait until message arrives
417420
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -432,14 +435,14 @@ describe('datastore-pubsub', function () {
432435
})
433436
})
434437

435-
it('should subscribe the topic and after a message being received, discarde it using the subscriptionKeyFn', function (done) {
438+
it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', function (done) {
436439
const subscriptionKeyFn = (topic, callback) => {
437-
expect(topic).to.equal(key.toString())
440+
expect(topic).to.equal(`/${keyRef}`)
438441
callback(new Error('DISCARD MESSAGE'))
439442
}
440443
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
441444
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
442-
const topic = `/${keyRef}`
445+
const subsTopic = keyToTopic(`/${keyRef}`)
443446
let receivedMessage = false
444447

445448
function messageHandler () {
@@ -449,16 +452,16 @@ describe('datastore-pubsub', function () {
449452
pubsubB.ls((err, res) => {
450453
expect(err).to.not.exist()
451454
expect(res).to.exist()
452-
expect(res).to.not.include(topic) // not subscribed
455+
expect(res).to.not.include(subsTopic) // not subscribed
453456

454457
dsPubsubB.get(key, (err, res) => {
455458
expect(err).to.exist()
456459
expect(res).to.not.exist() // not value available, but subscribed now
457460

458461
series([
459-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
462+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
460463
// subscribe in order to understand when the message arrive to the node
461-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
464+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
462465
(cb) => dsPubsubA.put(key, serializedRecord, cb),
463466
// wait until message arrives
464467
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -479,7 +482,7 @@ describe('datastore-pubsub', function () {
479482
}
480483
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
481484
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
482-
const topic = `/${keyRef}`
485+
const subsTopic = keyToTopic(`/${keyRef}`)
483486
const keyNew = Buffer.from(`${key.toString()}new`)
484487
let receivedMessage = false
485488

@@ -490,16 +493,16 @@ describe('datastore-pubsub', function () {
490493
pubsubB.ls((err, res) => {
491494
expect(err).to.not.exist()
492495
expect(res).to.exist()
493-
expect(res).to.not.include(topic) // not subscribed
496+
expect(res).to.not.include(subsTopic) // not subscribed
494497

495498
dsPubsubB.get(key, (err, res) => {
496499
expect(err).to.exist()
497500
expect(res).to.not.exist() // not value available, but subscribed now
498501

499502
series([
500-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
503+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
501504
// subscribe in order to understand when the message arrive to the node
502-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
505+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
503506
(cb) => dsPubsubA.put(key, serializedRecord, cb),
504507
// wait until message arrives
505508
(cb) => waitFor(() => receivedMessage === true, cb),

0 commit comments

Comments
 (0)