-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
179 lines (166 loc) · 4.57 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
const fancyCBOR = require('./fancy-cbor')
const CID = require('cids')
/**
 * Store `cid` in a tree of nested Maps at the location spelled out by `key`.
 *
 * Every segment except the last selects (creating it when absent) a child Map;
 * the last segment becomes the entry that holds `cid`.
 *
 * NOTE: `key` is consumed — segments are shifted off the caller's array, so it
 * is empty when this function returns.
 *
 * @param {Map} map - Root of the nested Map tree; mutated in place.
 * @param {Array} key - Path segments, outermost first; emptied by this call.
 * @param {*} cid - Value to store at the leaf.
 */
const nest = (map, key, cid) => {
  // Fetch the child Map for `segment`, creating an empty one on first use.
  const childOf = (parent, segment) => {
    if (!parent.has(segment)) {
      parent.set(segment, new Map())
    }
    return parent.get(segment)
  }

  let level = map
  for (; key.length > 1;) {
    level = childOf(level, key.shift())
  }

  const leaf = key.shift()
  level.set(leaf, cid)
}
/**
 * Coerce a value to a CID instance.
 *
 * A value exposing a truthy `toBaseEncodedString` property is treated as
 * already being a CID and is returned untouched; anything else is handed to
 * the `CID` constructor.
 *
 * @param {*} c - CID instance or a value accepted by `new CID(...)`.
 * @returns {*} `c` itself, or a new `CID` built from it.
 */
const mkcid = c => {
  if (c.toBaseEncodedString) {
    return c
  }
  return new CID(c)
}
/**
 * Collects blocks added under slash-separated paths and, on `flush()`,
 * rewrites the chain of parent nodes from the root down so that each path
 * links to its block.
 *
 * Paths may be rewritten ("sharded") by handlers registered with
 * `shardPath()`. An instance can be flushed once; afterwards `add()` and
 * `flush()` throw.
 */
class ComplexIPLDGraph {
  /**
   * @param {object} store - Block store; must provide `get(cid)` and `bulk()`
   *   (the bulk writer must provide `put(cid, data)` and `flush()`).
   * @param {*} root - CID of the current root node (a CID instance or any
   *   value `mkcid` accepts).
   * @param {Function} [cbor] - Codec factory, called with a block getter and
   *   expected to return `{serialize, deserialize}`. Defaults to `fancyCBOR`.
   */
  constructor (store, root, cbor) {
    // Not read anywhere in this class (`_shardPaths` is the live registry);
    // kept so existing code reading the property keeps working.
    this.shardPaths = new Map()
    if (!cbor) {
      cbor = fancyCBOR
    }
    this.cbor = cbor((...args) => store.get(...args))
    this.store = store
    this.root = root
    this._clear()
    this._bulk = store.bulk()
    this._shardPaths = new Map()
  }

  /** Reset the node cache, the pending-block map and the patch tree. */
  _clear () {
    this._getCache = new Map()
    this._pending = new Map()
    this._patches = new Map()
  }

  /**
   * Register a shard handler for a path pattern.
   *
   * @param {string} path - Slash-separated pattern whose last segment must be
   *   `*`. Segments starting with `:` match any single path segment.
   * @param {Function} handler - Called with the segment matched by `*`; its
   *   return value replaces that segment (it may contain `/`).
   * @throws {Error} When the pattern does not end in `*`.
   */
  shardPath (path, handler) {
    path = path.split('/').filter(x => x)
    if (path[path.length - 1] !== '*') {
      throw new Error('All shard paths must end in "*".')
    }
    nest(this._shardPaths, path, handler)
  }

  /**
   * Translate a user-facing path into the list of real keys to walk, applying
   * any matching shard handlers.
   *
   * @param {string} path - Slash-separated path; empty segments are ignored.
   * @returns {string[]} Real path segments. Empty for an empty / root path.
   */
  _realKey (path) {
    const segments = path.split('/').filter(x => x)
    const realPath = []
    // Pattern sub-trees that are still candidates for the current depth.
    let maps = [this._shardPaths]
    for (const key of segments) {
      const nextMaps = []
      let changed = false
      for (const map of maps) {
        for (const [_key, handler] of map.entries()) {
          // Only descend into nested pattern Maps. A path segment that is
          // literally "*" also equals the "*" pattern key, whose value is a
          // handler function; pushing it would crash on the next segment.
          if ((_key === key || _key.startsWith(':')) && handler instanceof Map) {
            nextMaps.push(handler)
          }
          // The first "*" handler found at this depth rewrites the segment.
          if (_key === '*' && !changed) {
            realPath.push(handler(key))
            changed = true
          }
        }
      }
      maps = nextMaps
      if (!changed) realPath.push(key)
    }
    // Handlers may return multi-segment strings, so re-split. Filtering keeps
    // an empty path as [] (''.split('/') would otherwise give ['']) and drops
    // empty segments produced by a handler.
    return realPath.join('/').split('/').filter(x => x)
  }

  /**
   * Best-effort pre-fetch of the intermediate nodes along `path`, so they are
   * already in the `get()` cache when `flush()` walks the same path.
   *
   * @param {string[]} path - Real path segments; not modified.
   */
  _prime (path) {
    const keys = Array.from(path)
    // Started synchronously so a missing root still throws from here, exactly
    // as before.
    const rootFetch = this.get(this.root)
    const walk = async () => {
      let node = await rootFetch
      for (const key of keys) {
        const link = node && node[key] && node[key]['/']
        if (!link) return
        node = await this.get(mkcid(link))
      }
    }
    // Pre-fetching is only an optimisation: swallow failures here so they do
    // not become unhandled rejections. The rejected promise stays in the
    // cache, so `flush()`/`resolve()` still see the error when they need the
    // node.
    walk().catch(() => {})
  }

  /**
   * Record a block to be linked at `path` on the next flush.
   *
   * @param {string} path - User-facing path.
   * @param {object} block - Must expose `cid`.
   * @throws {Error} When `path` contains no segments.
   */
  _queue (path, block) {
    const realPath = this._realKey(path)
    if (!realPath.length) {
      // There is no key to store the link under.
      throw new Error('Cannot add a block at an empty path.')
    }
    this._pending.set(path, block)
    this._prime(realPath)
    // `nest` consumes `realPath`; `_prime` took its own copy above.
    nest(this._patches, realPath, block.cid)
    this._draining = null
  }

  /**
   * Queue `block` to be linked at `path` and hand its data to the bulk writer.
   *
   * @param {string} path - User-facing path.
   * @param {object} block - Must expose `cid` and `data`.
   * @returns {Promise<*>} Whatever the bulk writer's `put` resolves to.
   * @throws {Error} When this instance has already been flushed.
   */
  async add (path, block) {
    if (this._spent) {
      throw new Error('This graph instance has already been flushed.')
    }
    this._queue(path, block)
    return this._bulk.put(block.cid, block.data)
  }

  /**
   * Apply every queued patch, write the rewritten nodes through the bulk
   * writer, flush it, and return the CID of the new root.
   *
   * Side effects: sets `nodesWalked` and `_graphBuildTime`, clears internal
   * state and marks the instance as spent.
   *
   * @returns {Promise<*>} CID of the last block serialized for the root node.
   * @throws {Error} When already flushed, or when there is no root.
   */
  async flush () {
    if (this._spent) {
      throw new Error('This graph instance has already been flushed.')
    }
    let root = this.root
    if (!root) throw new Error('No root node.')
    root = mkcid(root)

    // Serialize `obj`, writing every produced block; the CID of the last
    // block yielded is the one returned.
    const mkcbor = async obj => {
      let cid
      for await (const block of this.cbor.serialize(obj)) {
        await this._bulk.put(block.cid, block.data)
        cid = block.cid
      }
      return cid
    }
    const toLink = cid => {
      return {'/': cid.toBaseEncodedString()}
    }

    this.nodesWalked = 0
    // Depth-first walk of the patch tree: nested Maps are sub-nodes to
    // rewrite, anything else must be a CID to link directly.
    const _iter = async (map, node) => {
      this.nodesWalked++
      for (const [key, value] of map.entries()) {
        if (value instanceof Map) {
          let _node
          if (node[key] && node[key]['/']) {
            // Existing child: load it so untouched entries are preserved.
            _node = await this.get(mkcid(node[key]['/']))
          } else {
            _node = {}
          }
          node[key] = toLink(await _iter(value, _node))
        } else {
          if (!(value.toBaseEncodedString)) throw new Error('Value not CID.')
          node[key] = toLink(value)
        }
      }
      return mkcbor(node)
    }

    const start = Date.now()
    const cid = await _iter(this._patches, await this.get(root))
    this._graphBuildTime = Date.now() - start
    await this._bulk.flush()
    this._clear()
    this._spent = true
    return cid
  }

  /**
   * Fetch and deserialize a node, memoising the promise per CID string.
   *
   * @param {*} cid - CID instance (converted with `toBaseEncodedString()`) or
   *   an already-encoded key.
   * @returns {Promise<*>} The deserialized node. A rejected promise is cached
   *   as well.
   */
  get (cid) {
    if (cid.toBaseEncodedString) cid = cid.toBaseEncodedString()
    if (!this._getCache.has(cid)) {
      const p = this.store.get(cid).then(b => this.cbor.deserialize(b))
      this._getCache.set(cid, p)
    }
    return this._getCache.get(cid)
  }

  /**
   * Walk `path` from `root` (default: this graph's root), following links
   * through `dag-cbor` nodes.
   *
   * @param {string} path - User-facing path; an empty path resolves to the
   *   root node itself.
   * @param {*} [root] - Alternative starting CID.
   * @returns {Promise<{value: *, remaining: string}>} `value` is the
   *   deserialized node when the final CID is `dag-cbor`, otherwise the raw
   *   result of `store.get`; `remaining` is the unconsumed part of the path.
   * @throws {Error} 'No root node.' when there is nothing to start from;
   *   'NotFound' when a key is missing from a node.
   */
  async resolve (path, root) {
    const start = root || this.root
    if (!start) throw new Error('No root node.')
    path = this._realKey(path)
    let cid = mkcid(start)
    while (path.length) {
      cid = mkcid(cid)
      if (cid.codec === 'dag-cbor') {
        const node = await this.get(cid)
        const key = path.shift()
        if (!node[key]) throw new Error('NotFound')
        cid = mkcid(node[key]['/'])
      } else {
        // Cannot traverse into a non-cbor block; report the rest as remaining.
        break
      }
    }
    let value
    if (cid.codec === 'dag-cbor') {
      value = await this.get(cid)
    } else {
      value = await this.store.get(cid)
    }
    return {value, remaining: path.join('/')}
  }
}
module.exports = (...args) => new ComplexIPLDGraph(...args)