Skip to content

Commit dc10da5

Browse files
committed
feat: encode record-store keys in pubsub
1 parent a73d1a0 commit dc10da5

File tree

4 files changed

+50
-31
lines changed

4 files changed

+50
-31
lines changed

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
"dependencies": {
3535
"assert": "^1.4.1",
3636
"base32.js": "~0.1.0",
37+
"base64url": "^3.0.0",
3738
"debug": "^3.1.0",
3839
"err-code": "^1.1.2",
3940
"interface-datastore": "~0.4.2",

src/index.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
const { Record } = require('libp2p-record')
44
const { Key } = require('interface-datastore')
5-
const { encodeBase32 } = require('./utils')
5+
const { encodeBase32, keyToTopic, topicToKey } = require('./utils')
66

77
const errcode = require('err-code')
88
const assert = require('assert')
@@ -62,7 +62,7 @@ class DatastorePubsub {
6262
return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED'))
6363
}
6464

65-
const stringifiedTopic = key.toString()
65+
const stringifiedTopic = keyToTopic(key)
6666

6767
log(`publish value for topic ${stringifiedTopic}`)
6868

@@ -84,7 +84,7 @@ class DatastorePubsub {
8484
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
8585
}
8686

87-
const stringifiedTopic = key.toString()
87+
const stringifiedTopic = keyToTopic(key)
8888

8989
this._pubsub.ls((err, res) => {
9090
if (err) {
@@ -117,7 +117,7 @@ class DatastorePubsub {
117117
* @returns {void}
118118
*/
119119
unsubscribe (key) {
120-
const stringifiedTopic = key.toString()
120+
const stringifiedTopic = keyToTopic(key)
121121

122122
this._pubsub.unsubscribe(stringifiedTopic, this._handleSubscription)
123123
}
@@ -149,7 +149,7 @@ class DatastorePubsub {
149149
// handles pubsub subscription messages
150150
_handleSubscription (msg) {
151151
const { data, from, topicIDs } = msg
152-
const key = topicIDs[0]
152+
const key = topicToKey(topicIDs[0])
153153

154154
log(`message received for ${key} topic`)
155155

src/utils.js

+15
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
11
'use strict'
22

33
const base32 = require('base32.js')
4+
const base64url = require('base64url')
5+
6+
const namespace = '/record/'
47

58
module.exports.encodeBase32 = (buf) => {
69
const enc = new base32.Encoder()
710
return enc.write(buf).finalize()
811
}
12+
13+
// converts a binary record key to a pubsub topic key.
14+
module.exports.keyToTopic = (key) => {
15+
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs
16+
// Encodes to "/record/base64url(key)"
17+
return `${namespace}${base64url.encode(key)}`
18+
}
19+
20+
// converts a pubsub topic key to a binary record key.
21+
module.exports.topicToKey = (topic) => {
22+
return base64url.decode(topic.substring(namespace.length))
23+
}

test/index.spec.js

+29-26
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const { Key } = require('interface-datastore')
1515
const { Record } = require('libp2p-record')
1616

1717
const DatastorePubsub = require('../src')
18+
const { keyToTopic } = require('../src/utils')
1819
const { connect, waitFor, waitForPeerToSubscribe, spawnDaemon, stopDaemon } = require('./utils')
1920

2021
// Always returning the expected values
@@ -115,19 +116,20 @@ describe('datastore-pubsub', function () {
115116

116117
it('should subscribe the topic, but receive error as no entry is stored locally', function (done) {
117118
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
119+
const subsTopic = keyToTopic(`/${keyRef}`)
118120

119121
pubsubA.ls((err, res) => {
120122
expect(err).to.not.exist()
121123
expect(res).to.exist()
122-
expect(res).to.not.include(`/${keyRef}`) // not subscribed key reference yet
124+
expect(res).to.not.include(subsTopic) // not subscribed key reference yet
123125

124126
dsPubsubA.get(key, (err) => {
125127
expect(err).to.exist() // not locally stored record
126128

127129
pubsubA.ls((err, res) => {
128130
expect(err).to.not.exist()
129131
expect(res).to.exist()
130-
expect(res).to.include(`/${keyRef}`) // subscribed key reference
132+
expect(res).to.include(subsTopic) // subscribed key reference
131133
done()
132134
})
133135
})
@@ -137,11 +139,12 @@ describe('datastore-pubsub', function () {
137139
it('should put correctly to daemon A and daemon B should not receive it without subscribing', function (done) {
138140
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
139141
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
142+
const subsTopic = keyToTopic(`/${keyRef}`)
140143

141144
pubsubB.ls((err, res) => {
142145
expect(err).to.not.exist()
143146
expect(res).to.exist()
144-
expect(res).to.not.include(`/${keyRef}`) // not subscribed
147+
expect(res).to.not.include(subsTopic) // not subscribed
145148

146149
dsPubsubA.put(key, serializedRecord, (err) => {
147150
expect(err).to.not.exist()
@@ -157,7 +160,7 @@ describe('datastore-pubsub', function () {
157160
it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) {
158161
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
159162
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
160-
const topic = `/${keyRef}`
163+
const subsTopic = keyToTopic(`/${keyRef}`)
161164
let receivedMessage = false
162165

163166
function messageHandler () {
@@ -167,16 +170,16 @@ describe('datastore-pubsub', function () {
167170
pubsubB.ls((err, res) => {
168171
expect(err).to.not.exist()
169172
expect(res).to.exist()
170-
expect(res).to.not.include(topic) // not subscribed
173+
expect(res).to.not.include(subsTopic) // not subscribed
171174

172175
dsPubsubB.get(key, (err, res) => {
173176
expect(err).to.exist()
174177
expect(res).to.not.exist() // not value available, but subscribed now
175178

176179
series([
177-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
180+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
178181
// subscribe in order to understand when the message arrive to the node
179-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
182+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
180183
(cb) => dsPubsubA.put(key, serializedRecord, cb),
181184
// wait until message arrives
182185
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -257,7 +260,7 @@ describe('datastore-pubsub', function () {
257260
}
258261
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
259262
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
260-
const topic = `/${keyRef}`
263+
const subsTopic = keyToTopic(`/${keyRef}`)
261264
let receivedMessage = false
262265

263266
function messageHandler () {
@@ -269,9 +272,9 @@ describe('datastore-pubsub', function () {
269272
expect(res).to.not.exist() // not value available, but subscribed now
270273

271274
series([
272-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
275+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
273276
// subscribe in order to understand when the message arrive to the node
274-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
277+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
275278
(cb) => dsPubsubA.put(key, serializedRecord, cb),
276279
// wait until message arrives
277280
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -302,7 +305,7 @@ describe('datastore-pubsub', function () {
302305

303306
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
304307
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
305-
const topic = `/${keyRef}`
308+
const subsTopic = keyToTopic(`/${keyRef}`)
306309
let receivedMessage = false
307310

308311
function messageHandler () {
@@ -314,9 +317,9 @@ describe('datastore-pubsub', function () {
314317
expect(res).to.not.exist() // not value available, but subscribed now
315318

316319
series([
317-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
320+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
318321
// subscribe in order to understand when the message arrive to the node
319-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
322+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
320323
(cb) => dsPubsubA.put(key, serializedRecord, cb),
321324
// wait until message arrives
322325
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -353,7 +356,7 @@ describe('datastore-pubsub', function () {
353356

354357
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
355358
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
356-
const topic = `/${keyRef}`
359+
const subsTopic = keyToTopic(`/${keyRef}`)
357360
let receivedMessage = false
358361

359362
function messageHandler () {
@@ -365,9 +368,9 @@ describe('datastore-pubsub', function () {
365368
expect(res).to.not.exist() // not value available, but it is subscribed now
366369

367370
series([
368-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
371+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
369372
// subscribe in order to understand when the message arrive to the node
370-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
373+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
371374
(cb) => dsPubsubA.put(key, serializedRecord, cb),
372375
// wait until message arrives
373376
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -388,14 +391,14 @@ describe('datastore-pubsub', function () {
388391
})
389392
})
390393

391-
it('should subscribe the topic and after a message being received, discarde it using the subscriptionKeyFn', function (done) {
394+
it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', function (done) {
392395
const subscriptionKeyFn = (topic, callback) => {
393-
expect(topic).to.equal(key.toString())
396+
expect(topic).to.equal(`/${keyRef}`)
394397
callback(new Error('DISCARD MESSAGE'))
395398
}
396399
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
397400
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
398-
const topic = `/${keyRef}`
401+
const subsTopic = keyToTopic(`/${keyRef}`)
399402
let receivedMessage = false
400403

401404
function messageHandler () {
@@ -405,16 +408,16 @@ describe('datastore-pubsub', function () {
405408
pubsubB.ls((err, res) => {
406409
expect(err).to.not.exist()
407410
expect(res).to.exist()
408-
expect(res).to.not.include(topic) // not subscribed
411+
expect(res).to.not.include(subsTopic) // not subscribed
409412

410413
dsPubsubB.get(key, (err, res) => {
411414
expect(err).to.exist()
412415
expect(res).to.not.exist() // not value available, but subscribed now
413416

414417
series([
415-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
418+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
416419
// subscribe in order to understand when the message arrive to the node
417-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
420+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
418421
(cb) => dsPubsubA.put(key, serializedRecord, cb),
419422
// wait until message arrives
420423
(cb) => waitFor(() => receivedMessage === true, cb),
@@ -435,7 +438,7 @@ describe('datastore-pubsub', function () {
435438
}
436439
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
437440
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
438-
const topic = `/${keyRef}`
441+
const subsTopic = keyToTopic(`/${keyRef}`)
439442
const keyNew = Buffer.from(`${key.toString()}new`)
440443
let receivedMessage = false
441444

@@ -446,16 +449,16 @@ describe('datastore-pubsub', function () {
446449
pubsubB.ls((err, res) => {
447450
expect(err).to.not.exist()
448451
expect(res).to.exist()
449-
expect(res).to.not.include(topic) // not subscribed
452+
expect(res).to.not.include(subsTopic) // not subscribed
450453

451454
dsPubsubB.get(key, (err, res) => {
452455
expect(err).to.exist()
453456
expect(res).to.not.exist() // not value available, but subscribed now
454457

455458
series([
456-
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
459+
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
457460
// subscribe in order to understand when the message arrive to the node
458-
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
461+
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
459462
(cb) => dsPubsubA.put(key, serializedRecord, cb),
460463
// wait until message arrives
461464
(cb) => waitFor(() => receivedMessage === true, cb),

0 commit comments

Comments
 (0)