Skip to content

Commit e20dcac

Browse files
committed
feat: encode record-store keys in pubsub
1 parent 068ec27 commit e20dcac

File tree

4 files changed

+53
-34
lines changed

4 files changed

+53
-34
lines changed

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
"dependencies": {
3535
"assert": "^1.4.1",
3636
"base32.js": "~0.1.0",
37+
"base64url": "^3.0.0",
3738
"debug": "^4.1.0",
3839
"err-code": "^1.1.2",
3940
"interface-datastore": "~0.6.0"

src/index.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const { Key } = require('interface-datastore')
4-
const { encodeBase32 } = require('./utils')
4+
const { encodeBase32, keyToTopic, topicToKey } = require('./utils')
55

66
const errcode = require('err-code')
77
const assert = require('assert')
@@ -61,7 +61,7 @@ class DatastorePubsub {
6161
return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED'))
6262
}
6363

64-
const stringifiedTopic = key.toString()
64+
const stringifiedTopic = keyToTopic(key)
6565

6666
log(`publish value for topic ${stringifiedTopic}`)
6767

@@ -83,7 +83,7 @@ class DatastorePubsub {
8383
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
8484
}
8585

86-
const stringifiedTopic = key.toString()
86+
const stringifiedTopic = keyToTopic(key)
8787

8888
this._pubsub.ls((err, res) => {
8989
if (err) {
@@ -116,7 +116,7 @@ class DatastorePubsub {
116116
* @returns {void}
117117
*/
118118
unsubscribe (key) {
119-
const stringifiedTopic = key.toString()
119+
const stringifiedTopic = keyToTopic(key)
120120

121121
this._pubsub.unsubscribe(stringifiedTopic, this._handleSubscription)
122122
}
@@ -154,7 +154,7 @@ class DatastorePubsub {
154154
// handles pubsub subscription messages
155155
_handleSubscription (msg) {
156156
const { data, from, topicIDs } = msg
157-
const key = topicIDs[0]
157+
const key = topicToKey(topicIDs[0])
158158

159159
log(`message received for ${key} topic`)
160160

src/utils.js

+15
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
11
'use strict'
22

33
const base32 = require('base32.js')
4+
const base64url = require('base64url')
5+
6+
const namespace = '/record/'
47

58
module.exports.encodeBase32 = (buf) => {
69
const enc = new base32.Encoder()
710
return enc.write(buf).finalize()
811
}
12+
13+
// converts a binary record key to a pubsub topic key.
14+
module.exports.keyToTopic = (key) => {
15+
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs
16+
// Encodes to "/record/base64url(key)"
17+
return `${namespace}${base64url.encode(key)}`
18+
}
19+
20+
// converts a pubsub topic key to a binary record key.
21+
module.exports.topicToKey = (topic) => {
22+
return base64url.decode(topic.substring(namespace.length))
23+
}

test/index.spec.js

+32-29
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const { Key } = require('interface-datastore')
1515
const { Record } = require('libp2p-record')
1616

1717
const DatastorePubsub = require('../src')
18+
const { keyToTopic } = require('../src/utils')
1819
const { connect, waitFor, waitForPeerToSubscribe, spawnDaemon, stopDaemon } = require('./utils')
1920

2021
// Always returning the expected values
@@ -115,11 +116,12 @@ describe('datastore-pubsub', function () {
115116

116117
it('should subscribe the topic, but receive error as no entry is stored locally', function (done) {
117118
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
119+
const subsTopic = keyToTopic(`/${keyRef}`)
118120

119121
pubsubA.ls((err, res) => {
120122
expect(err).to.not.exist()
121123
expect(res).to.exist()
122-
expect(res).to.not.include(`/${keyRef}`) // not subscribed key reference yet
124+
expect(res).to.not.include(subsTopic) // not subscribed key reference yet
123125

124126
dsPubsubA.get(key, (err) => {
125127
expect(err).to.exist() // not locally stored record
@@ -128,7 +130,7 @@ describe('datastore-pubsub', function () {
128130
pubsubA.ls((err, res) => {
129131
expect(err).to.not.exist()
130132
expect(res).to.exist()
131-
expect(res).to.include(`/${keyRef}`) // subscribed key reference
133+
expect(res).to.include(subsTopic) // subscribed key reference
132134
done()
133135
})
134136
})
@@ -138,11 +140,12 @@ describe('datastore-pubsub', function () {
138140
it('should put correctly to daemon A and daemon B should not receive it without subscribing', function (done) {
139141
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
140142
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
143+
const subsTopic = keyToTopic(`/${keyRef}`)
141144

142145
pubsubB.ls((err, res) => {
143146
expect(err).to.not.exist()
144147
expect(res).to.exist()
145-
expect(res).to.not.include(`/${keyRef}`) // not subscribed
148+
expect(res).to.not.include(subsTopic) // not subscribed
146149

147150
dsPubsubA.put(key, serializedRecord, (err) => {
148151
expect(err).to.not.exist()
@@ -169,7 +172,7 @@ describe('datastore-pubsub', function () {
169172
}
170173
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
171174
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
172-
const topic = `/${keyRef}`
175+
const subsTopic = keyToTopic(`/${keyRef}`)
173176
let receivedMessage = false
174177

175178
function messageHandler () {
@@ -181,9 +184,9 @@ describe('datastore-pubsub', function () {
181184
expect(res).to.not.exist() // no value available, but subscribed now
182185

183186
series([
184-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
187+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
185188
// subscribe in order to understand when the message arrive to the node
186-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
189+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
187190
(cb) => dsPubsubA.put(key, serializedRecord, cb),
188191
// wait until message arrives
189192
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -200,7 +203,7 @@ describe('datastore-pubsub', function () {
200203
it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) {
201204
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
202205
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
203-
const topic = `/${keyRef}`
206+
const subsTopic = keyToTopic(`/${keyRef}`)
204207
let receivedMessage = false
205208

206209
function messageHandler () {
@@ -210,16 +213,16 @@ describe('datastore-pubsub', function () {
210213
pubsubB.ls((err, res) => {
211214
expect(err).to.not.exist()
212215
expect(res).to.exist()
213-
expect(res).to.not.include(topic) // not subscribed
216+
expect(res).to.not.include(subsTopic) // not subscribed
214217

215218
dsPubsubB.get(key, (err, res) => {
216219
expect(err).to.exist()
217220
expect(res).to.not.exist() // not value available, but subscribed now
218221

219222
series([
220-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
223+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
221224
// subscribe in order to understand when the message arrive to the node
222-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
225+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
223226
(cb) => dsPubsubA.put(key, serializedRecord, cb),
224227
// wait until message arrives
225228
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -300,7 +303,7 @@ describe('datastore-pubsub', function () {
300303
}
301304
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
302305
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
303-
const topic = `/${keyRef}`
306+
const subsTopic = keyToTopic(`/${keyRef}`)
304307
let receivedMessage = false
305308

306309
function messageHandler () {
@@ -312,9 +315,9 @@ describe('datastore-pubsub', function () {
312315
expect(res).to.not.exist() // not value available, but subscribed now
313316

314317
series([
315-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
318+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
316319
// subscribe in order to understand when the message arrive to the node
317-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
320+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
318321
(cb) => dsPubsubA.put(key, serializedRecord, cb),
319322
// wait until message arrives
320323
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -345,7 +348,7 @@ describe('datastore-pubsub', function () {
345348

346349
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
347350
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
348-
const topic = `/${keyRef}`
351+
const subsTopic = keyToTopic(`/${keyRef}`)
349352
let receivedMessage = false
350353

351354
function messageHandler () {
@@ -357,9 +360,9 @@ describe('datastore-pubsub', function () {
357360
expect(res).to.not.exist() // not value available, but subscribed now
358361

359362
series([
360-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
363+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
361364
// subscribe in order to understand when the message arrive to the node
362-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
365+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
363366
(cb) => dsPubsubA.put(key, serializedRecord, cb),
364367
// wait until message arrives
365368
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -396,7 +399,7 @@ describe('datastore-pubsub', function () {
396399

397400
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
398401
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
399-
const topic = `/${keyRef}`
402+
const subsTopic = keyToTopic(`/${keyRef}`)
400403
let receivedMessage = false
401404

402405
function messageHandler () {
@@ -408,9 +411,9 @@ describe('datastore-pubsub', function () {
408411
expect(res).to.not.exist() // not value available, but it is subscribed now
409412

410413
series([
411-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
414+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
412415
// subscribe in order to understand when the message arrive to the node
413-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
416+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
414417
(cb) => dsPubsubA.put(key, serializedRecord, cb),
415418
// wait until message arrives
416419
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -431,14 +434,14 @@ describe('datastore-pubsub', function () {
431434
})
432435
})
433436

434-
it('should subscribe the topic and after a message being received, discarde it using the subscriptionKeyFn', function (done) {
437+
it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', function (done) {
435438
const subscriptionKeyFn = (topic, callback) => {
436-
expect(topic).to.equal(key.toString())
439+
expect(topic).to.equal(`/${keyRef}`)
437440
callback(new Error('DISCARD MESSAGE'))
438441
}
439442
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
440443
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
441-
const topic = `/${keyRef}`
444+
const subsTopic = keyToTopic(`/${keyRef}`)
442445
let receivedMessage = false
443446

444447
function messageHandler () {
@@ -448,16 +451,16 @@ describe('datastore-pubsub', function () {
448451
pubsubB.ls((err, res) => {
449452
expect(err).to.not.exist()
450453
expect(res).to.exist()
451-
expect(res).to.not.include(topic) // not subscribed
454+
expect(res).to.not.include(subsTopic) // not subscribed
452455

453456
dsPubsubB.get(key, (err, res) => {
454457
expect(err).to.exist()
455458
expect(res).to.not.exist() // not value available, but subscribed now
456459

457460
series([
458-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
461+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
459462
// subscribe in order to understand when the message arrive to the node
460-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
463+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
461464
(cb) => dsPubsubA.put(key, serializedRecord, cb),
462465
// wait until message arrives
463466
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -478,7 +481,7 @@ describe('datastore-pubsub', function () {
478481
}
479482
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
480483
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
481-
const topic = `/${keyRef}`
484+
const subsTopic = keyToTopic(`/${keyRef}`)
482485
const keyNew = Buffer.from(`${key.toString()}new`)
483486
let receivedMessage = false
484487

@@ -489,16 +492,16 @@ describe('datastore-pubsub', function () {
489492
pubsubB.ls((err, res) => {
490493
expect(err).to.not.exist()
491494
expect(res).to.exist()
492-
expect(res).to.not.include(topic) // not subscribed
495+
expect(res).to.not.include(subsTopic) // not subscribed
493496

494497
dsPubsubB.get(key, (err, res) => {
495498
expect(err).to.exist()
496499
expect(res).to.not.exist() // not value available, but subscribed now
497500

498501
series([
499-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
502+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
500503
// subscribe in order to understand when the message arrive to the node
501-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
504+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
502505
(cb) => dsPubsubA.put(key, serializedRecord, cb),
503506
// wait until message arrives
504507
(cb) => waitFor(() => receivedMessage === true, cb),

0 commit comments

Comments
 (0)