-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathindex.js
293 lines (233 loc) · 8.91 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
'use strict'
const { Record } = require('libp2p-record')
const { Key } = require('interface-datastore')
const { encodeBase32 } = require('./utils')
const errcode = require('err-code')
const assert = require('assert')
const debug = require('debug')
const log = debug('datastore-pubsub:publisher')
log.error = debug('datastore-pubsub:publisher:error')
// DatastorePubsub is responsible for providing an api for pubsub to be used as a datastore with
// [TieredDatastore]{@link https://github.com/ipfs/js-datastore-core/blob/master/src/tiered.js}
class DatastorePubsub {
/**
* Creates an instance of DatastorePubsub.
* @param {*} pubsub - pubsub implementation.
* @param {*} datastore - datastore instance.
* @param {*} peerId - peer-id instance.
* @param {Object} validator - validator functions.
* @param {function(record, peerId, callback)} validator.validate - function to validate a record.
* @param {function(received, current, callback)} validator.select - function to select the newest between two records.
* @param {function(key, callback)} subscriptionKeyFn - function to manipulate the key topic received before processing it.
* @memberof DatastorePubsub
*/
constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) {
assert.equal(typeof validator, 'object', 'missing validator')
assert.equal(typeof validator.validate, 'function', 'missing validate function')
assert.equal(typeof validator.select, 'function', 'missing select function')
subscriptionKeyFn && assert.equal(typeof subscriptionKeyFn, 'function', 'invalid subscriptionKeyFn received')
this._pubsub = pubsub
this._datastore = datastore
this._peerId = peerId
this._validator = validator
this._handleSubscriptionKeyFn = subscriptionKeyFn
// Bind _handleSubscription function, which is called by pubsub.
this._handleSubscription = this._handleSubscription.bind(this)
}
/**
* Publishes a value through pubsub.
* @param {Buffer} key identifier of the value to be published.
* @param {Buffer} val value to be propagated.
* @param {function(Error)} callback
* @returns {void}
*/
put (key, val, callback) {
if (!Buffer.isBuffer(key)) {
const errMsg = `datastore key does not have a valid format`
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
}
if (!Buffer.isBuffer(val)) {
const errMsg = `received value is not a buffer`
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED'))
}
const stringifiedTopic = key.toString()
log(`publish value for topic ${stringifiedTopic}`)
// Publish record to pubsub
this._pubsub.publish(stringifiedTopic, val, callback)
}
/**
* Try to subscribe a topic with Pubsub and returns the local value if available.
* @param {Buffer} key identifier of the value to be subscribed.
* @param {function(Error, Buffer)} callback
* @returns {void}
*/
get (key, callback) {
if (!Buffer.isBuffer(key)) {
const errMsg = `datastore key does not have a valid format`
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
}
const stringifiedTopic = key.toString()
// Subscribe
this._pubsub.subscribe(stringifiedTopic, this._handleSubscription, (err) => {
if (err) {
const errMsg = `cannot subscribe topic ${stringifiedTopic}`
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_SUBSCRIBING_TOPIC'))
}
log(`subscribed values for key ${stringifiedTopic}`)
this._getLocal(key, callback)
})
}
/**
* Unsubscribe topic.
* @param {Buffer} key identifier of the value to unsubscribe.
* @returns {void}
*/
unsubscribe (key) {
const stringifiedTopic = key.toString()
this._pubsub.unsubscribe(stringifiedTopic, this._handleSubscription)
}
// Get record from local datastore
_getLocal (key, callback) {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)
this._datastore.get(routingKey, (err, dsVal) => {
if (err) {
const errMsg = `local record requested was not found for ${routingKey.toString()}`
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_NO_LOCAL_RECORD_FOUND'))
}
if (!Buffer.isBuffer(dsVal)) {
const errMsg = `found record that we couldn't convert to a value`
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_INVALID_RECORD_RECEIVED'))
}
callback(null, dsVal)
})
}
// handles pubsub subscription messages
_handleSubscription (msg) {
const { data, from, topicIDs } = msg
const key = topicIDs[0]
log(`message received for ${key} topic`)
// Stop if the message is from the peer (it already stored it while publishing to pubsub)
if (from === this._peerId.toB58String()) {
log(`message discarded as it is from the same peer`)
return
}
if (this._handleSubscriptionKeyFn) {
this._handleSubscriptionKeyFn(key, (err, res) => {
if (err) {
log.error('message discarded by the subscriptionKeyFn')
return
}
this._storeIfSubscriptionIsBetter(res, data)
})
} else {
this._storeIfSubscriptionIsBetter(key, data)
}
}
// Store the received record if it is better than the current stored
_storeIfSubscriptionIsBetter (key, data) {
this._isBetter(key, data, (err, res) => {
if (!err && res) {
this._storeRecord(Buffer.from(key), data)
}
})
}
// Validate record according to the received validation function
_validateRecord (value, peerId, callback) {
this._validator.validate(value, peerId, callback)
}
// Select the best record according to the received select function.
_selectRecord (receivedRecord, currentRecord, callback) {
this._validator.select(receivedRecord, currentRecord, (err, res) => {
if (err) {
log.error(err)
return callback(err)
}
// If the selected was the first (0), it should be stored (true)
callback(null, res === 0)
})
}
// Verify if the record received through pubsub is valid and better than the one currently stored
_isBetter (key, val, callback) {
let receivedRecord
try {
receivedRecord = Record.deserialize(val)
} catch (err) {
log.error(err)
return callback(err)
}
// validate received record
this._validateRecord(receivedRecord.value, receivedRecord.author, (err, valid) => {
// If not valid, it is not better than the one currently available
if (err || !valid) {
const errMsg = 'record received through pubsub is not valid'
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_NOT_VALID_RECORD'))
}
// Get Local record
const dsKey = new Key(key)
this._getLocal(dsKey.toBuffer(), (err, res) => {
// if the old one is invalid, the new one is *always* better
if (err) {
return callback(null, true)
}
// if the same record, do not need to store
if (res.equals(val)) {
return callback(null, false)
}
const currentRecord = Record.deserialize(res)
// verify if the received record should replace the current one
this._selectRecord(receivedRecord.value, currentRecord.value, callback)
})
})
}
// add record to datastore
_storeRecord (key, data) {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)
this._datastore.put(routingKey, data, (err) => {
if (err) {
log.error(`record for ${key.toString()} could not be stored in the routing`)
return
}
log(`record for ${key.toString()} was stored in the datastore`)
})
}
open (callback) {
const errMsg = `open function was not implemented yet`
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET'))
}
has (key, callback) {
const errMsg = `has function was not implemented yet`
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET'))
}
delete (key, callback) {
const errMsg = `delete function was not implemented yet`
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET'))
}
close (callback) {
const errMsg = `close function was not implemented yet`
log.error(errMsg)
return callback(errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET'))
}
batch () {
const errMsg = `batch function was not implemented yet`
log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}
query () {
const errMsg = `query function was not implemented yet`
log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}
}
exports = module.exports = DatastorePubsub