Skip to content

Commit 8936fec

Browse files
committed
Initial commit.
0 parents  commit 8936fec

7 files changed

+635
-0
lines changed

README.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# ipld-complex-graph
2+
3+
IPLD complex graph library. Allows you to define graph structures that
4+
support tree sharding and node sharding.
5+
6+
All write operations are performed in bulk against an `ipld-store` instance.
7+
8+
Currently only supports appending to the graph.

fancy-cbor.js

+216
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
const cbor = require('borc')
2+
const multihashes = require('multihashes')
3+
const crypto = require('crypto')
4+
const CID = require('cids')
5+
const Block = require('ipfs-block')
6+
const isCircular = require('is-circular')
7+
8+
/**
 * Compute the SHA-256 digest of `b`.
 *
 * @param {Buffer|string} b - data to hash
 * @returns {Buffer} the raw digest bytes
 */
const sha2 = b => {
  const hasher = crypto.createHash('sha256')
  hasher.update(b)
  return hasher.digest()
}
9+
10+
// CBOR tag number under which CIDs are encoded (see tagCID) and recognised
// again by the decoder's tag handler.
const CID_CBOR_TAG = 42
11+
12+
/* start copy from existing dag-cbor */
13+
/**
 * Wrap a CID in a CBOR tag so it round-trips as a link.
 *
 * @param {string|Buffer} cid - a CID string (converted to its buffer form via
 *   `new CID(cid).buffer`) or bytes that are used as they are
 * @returns {cbor.Tagged} the bytes, prefixed with a single 0x00 byte, tagged
 *   with CID_CBOR_TAG
 */
function tagCID (cid) {
  const bytes = typeof cid === 'string' ? new CID(cid).buffer : cid
  const prefix = Buffer.from('00', 'hex') // thanks jdag
  const payload = Buffer.concat([prefix, bytes])

  return new cbor.Tagged(CID_CBOR_TAG, payload)
}
23+
24+
/**
 * Produce a copy of `dagNode` in which every `{'/': link}` object has been
 * replaced by its tagged CID (see tagCID).
 *
 * @param {*} dagNode - the value to prepare for CBOR encoding
 * @returns {*} the transformed value; falsy values, buffers, strings and
 *   key-less objects are returned as they are
 * @throws {Error} when `isCircular` reports a circular reference
 */
function replaceCIDbyTAG (dagNode) {
  let hasCycle = false
  try {
    hasCycle = isCircular(dagNode)
  } catch (e) {
    // A failing cycle check is treated the same as "no cycle found".
    hasCycle = false
  }
  if (hasCycle) {
    throw new Error('The object passed has circular references')
  }

  const transform = (value) => {
    if (!value || Buffer.isBuffer(value) || typeof value === 'string') {
      return value
    }
    if (Array.isArray(value)) {
      return value.map(transform)
    }

    const keys = Object.keys(value)
    if (keys.length === 0) {
      return value
    }

    // only `{'/': 'link'}` are valid
    if (keys.length === 1 && keys[0] === '/') {
      return tagCID(value['/'])
    }

    // Any other object: rebuild it, recursing into object-typed members only.
    const out = {}
    for (const key of keys) {
      out[key] = typeof value[key] === 'object' ? transform(value[key]) : value[key]
    }
    return out
  }

  return transform(dagNode)
}
72+
/* end copy from existing dag-cbor */
73+
74+
/**
 * Split the own enumerable keys of `obj` into consecutive groups and yield one
 * plain object per group.
 *
 * Only non-empty groups are yielded, so an object without keys yields nothing
 * and no trailing empty object is produced after the last group.
 *
 * @param {Object} obj - the object whose keys are partitioned
 * @param {number} [size=500] - maximum number of keys per yielded object
 * @yields {Object} an object holding up to `size` of the key/value pairs of `obj`
 * @throws {RangeError} on first iteration when `size` is not greater than zero
 *   (such a size could never make progress through the keys)
 */
const chunk = function * (obj, size = 500) {
  if (!(size > 0)) {
    throw new RangeError('chunk size must be greater than zero.')
  }
  const keys = Object.keys(obj)

  for (let start = 0; start < keys.length; start += size) {
    const part = {}
    for (const key of keys.slice(start, start + size)) {
      part[key] = obj[key]
    }
    yield part
  }
}
92+
/**
 * Wrap serialized bytes in a Block addressed by a version-1 CID.
 *
 * @param {Buffer} buffer - the block's data
 * @param {string} type - codec name handed to the CID constructor
 * @returns {Block} a block whose CID carries the sha2-256 multihash of `buffer`
 */
const asBlock = (buffer, type) => {
  const digest = sha2(buffer)
  const multihash = multihashes.encode(digest, 'sha2-256')
  return new Block(buffer, new CID(1, type, multihash))
}
97+
98+
/**
 * Error raised when a path segment cannot be found in a node.
 *
 * `name` is set to 'NotFound' so the error is distinguishable in logs and
 * stack traces; `code` is always 404.
 */
class NotFound extends Error {
  constructor (...args) {
    super(...args)
    this.name = 'NotFound'
  }

  /** @returns {number} the fixed status code 404 */
  get code () {
    return 404
  }
}
103+
104+
/**
 * dag-cbor codec that transparently splits oversized nodes into several
 * blocks ("dag-split" nodes) and reassembles them on read.
 */
class IPLD {
  /**
   * @param {Function} get - called with a base-encoded CID string; its result
   *   is awaited and decoded as a block's bytes
   * @param {number} [maxsize=1e+7] - largest serialized node accepted by
   *   `serialize`; also handed to the decoder as its `size` option
   */
  constructor (get, maxsize = 1e+7 /* 10megs */) {
    this._get = get
    this._maxBlockSize = 1e+6 // 1meg
    this._maxSize = maxsize
    this._decoder = new cbor.Decoder({
      tags: {
        // Tagged CIDs carry one leading byte in front of the CID bytes
        // (added by tagCID); drop it and expose the link as {'/': bytes}.
        [CID_CBOR_TAG]: (val) => ({'/': val.slice(1)})
      },
      size: maxsize
    })
  }

  /** @returns {string} the codec name, always 'dag-cbor' */
  get multicodec () {
    return 'dag-cbor'
  }

  /**
   * @param {Buffer} buffer - serialized block
   * @returns {string} base-encoded version-1 dag-cbor CID of `buffer`
   */
  _cid (buffer) {
    const hash = multihashes.encode(sha2(buffer), 'sha2-256')
    return new CID(1, 'dag-cbor', hash).toBaseEncodedString()
  }

  /**
   * @param {*} root - a decoded node
   * @returns {boolean} true when `root` is the index node of a split object
   */
  _isSplit (root) {
    return root !== null && typeof root === 'object' && root['._'] === 'dag-split'
  }

  /**
   * List the CIDs that make up the node stored in `buffer`.
   *
   * @param {Buffer} buffer - serialized block
   * @returns {Promise<Iterable<string>>} resolves to a lazy iterable yielding
   *   the block's own CID first and, for a split node, the CID of every
   *   chunk; all entries are base-encoded strings
   */
  async cids (buffer) {
    const self = this
    return (function * () {
      yield self._cid(buffer)
      const root = self._deserialize(buffer)
      if (self._isSplit(root)) {
        for (const link of root.chunks) {
          // Decoded links hold raw CID bytes; normalise them to the same
          // string form used for the root CID and by deserialize().
          yield new CID(link['/']).toBaseEncodedString()
        }
      }
    })()
  }

  /**
   * Walk `path` starting at the node stored in `buffer`, following dag-cbor
   * links through `get` as they are encountered.
   *
   * @param {Buffer} buffer - serialized block to start from
   * @param {string|string[]} path - '/'-separated string or array of
   *   segments; an array passed by the caller is left untouched
   * @returns {Promise<{value: *, remaining: string}>} the value reached (a
   *   CID instance when a link to another codec is hit) and the unconsumed
   *   part of the path
   * @throws {NotFound} when a segment does not exist
   */
  async resolve (buffer, path) {
    // Copy array input: segments are consumed with shift() below.
    const segments = Array.isArray(path)
      ? path.slice()
      : path.split('/').filter(x => x)
    let node = await this.deserialize(buffer)

    while (segments.length) {
      const prop = segments.shift()
      // null/undefined have no properties; report them as a missing link
      // instead of failing on the property access.
      const next = node == null ? undefined : node[prop]
      if (typeof next === 'undefined') {
        throw new NotFound(`Cannot find link "${prop}".`)
      }
      node = next

      if (node !== null && typeof node === 'object' && node['/']) {
        const link = new CID(node['/'])
        if (link.codec !== 'dag-cbor') {
          return {value: link, remaining: segments.join('/')}
        }
        const linked = await this._get(link.toBaseEncodedString())
        return this.resolve(linked, segments)
      }
    }
    return {value: node, remaining: segments.join('/')}
  }

  /**
   * @param {Buffer} buffer - CBOR bytes
   * @returns {*} the first decoded value, without reassembling split nodes
   */
  _deserialize (buffer) {
    return this._decoder.decodeFirst(buffer)
  }

  /**
   * @param {*} dagNode - native value, links given as {'/': cid}
   * @returns {Buffer} CBOR bytes with links encoded as tagged CIDs
   */
  _serialize (dagNode) {
    return cbor.encode(replaceCIDbyTAG(dagNode))
  }

  /**
   * Serialize `dagNode` into one or more blocks.
   *
   * @param {*} dagNode - native value to encode
   * @returns {Iterable<Block>} a single-element array when the encoding fits
   *   in one block; otherwise a lazy iterable yielding each chunk block
   *   followed by the 'dag-split' index block that links to them
   * @throws {Error} when the encoding is larger than the configured maximum
   */
  serialize (dagNode) {
    const buffer = this._serialize(dagNode)
    if (buffer.length > this._maxSize) {
      throw new Error('cbor node is too large.')
    }
    if (buffer.length <= this._maxBlockSize) {
      return [asBlock(buffer, 'dag-cbor')]
    }

    const encode = node => this._serialize(node)
    // TODO: chunks are cut by key count (see chunk), so an individual chunk
    // is not checked against the block size limit.
    return (function * () {
      const index = {'._': 'dag-split', chunks: []}
      for (const part of chunk(dagNode)) {
        const block = asBlock(encode(part), 'dag-cbor')
        yield block
        index.chunks.push({'/': block.cid.toBaseEncodedString()})
      }
      yield asBlock(encode(index), 'dag-cbor')
    })()
  }

  /**
   * Decode `buffer`, fetching and merging all chunks of a split node.
   *
   * @param {Buffer} buffer - serialized block
   * @returns {Promise<*>} the native value; for a split node, a new object
   *   holding the merged keys of every chunk (later chunks win on clashes)
   */
  async deserialize (buffer) {
    const root = this._deserialize(buffer)
    if (!this._isSplit(root)) {
      return root
    }
    const cids = root.chunks.map(link => new CID(link['/']).toBaseEncodedString())
    const buffers = await Promise.all(cids.map(cid => this._get(cid)))
    // Merge into a fresh target: works for zero chunks and leaves the
    // decoded chunk objects unmodified.
    return Object.assign({}, ...buffers.map(b => this._deserialize(b)))
  }

  /**
   * @param {Buffer} buffer - serialized block
   * @returns {Promise<string[]>} the top-level keys of the decoded node
   */
  async tree (buffer) {
    // TODO: replaces streaming parsing for cbor using iterator.
    return Object.keys(await this.deserialize(buffer))
  }
}
215+
216+
// Factory export: every argument (the block getter and the optional maximum
// node size) is forwarded to the IPLD constructor.
module.exports = (...args) => new IPLD(...args)

index.js

+166
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
const fancyCBOR = require('./fancy-cbor')
2+
const CID = require('cids')
3+
4+
/**
 * Store `cid` in a tree of nested Maps, creating intermediate Maps on demand.
 *
 * @param {Map} map - root of the tree
 * @param {string[]} key - path segments; every segment but the last names a
 *   nested Map, the last one names the entry that receives `cid`. The array
 *   is only read, never modified.
 * @param {*} cid - value stored at the end of the path
 * @throws {Error} when an intermediate segment already holds something that
 *   is not a Map
 */
const nest = (map, key, cid) => {
  let level = map
  for (const segment of key.slice(0, -1)) {
    if (!level.has(segment)) level.set(segment, new Map())
    const child = level.get(segment)
    if (!(child instanceof Map)) {
      throw new Error(`Cannot nest below "${segment}": it already holds a value.`)
    }
    level = child
  }
  level.set(key[key.length - 1], cid)
}
12+
13+
/**
 * Coerce a value to a CID.
 *
 * @param {*} c - a CID instance or anything the CID constructor accepts
 * @returns {CID} `c` itself when it already is a CID, otherwise `new CID(c)`
 */
const mkcid = c => {
  if (c instanceof CID) {
    return c
  }
  return new CID(c)
}
14+
15+
/**
 * Collects blocks under logical paths and, on flush, writes them together
 * with the rebuilt chain of parent nodes through the store's bulk interface.
 */
class ComplexIPLDGraph {
  /**
   * @param {Object} store - used through `get(cid)` and `bulk()`; the object
   *   resolved by `bulk()` is used through `put`, `del` and `flush`
   * @param {Function} [cbor] - codec factory, called with a block getter;
   *   defaults to the bundled fancy-cbor factory
   */
  constructor (store, cbor) {
    this.shardPaths = new Map()
    const createCodec = cbor || fancyCBOR
    this.cbor = createCodec((...args) => store.get(...args))
    this.store = store
    this._pending = new Map() // logical path -> block waiting to be drained
    this._patches = new Map() // nested Maps of real path segments -> CID
  }

  /**
   * Register a handler that rewrites the segment matched by the trailing '*'.
   *
   * @param {string} path - '/'-separated pattern; must end in '*'
   * @param {Function} handler - called with the matched segment; its
   *   (awaited) result replaces that segment
   * @throws {Error} when the pattern does not end in '*'
   */
  shardPath (path, handler) {
    const pattern = path.split('/').filter(x => x)
    if (pattern[pattern.length - 1] !== '*') {
      throw new Error('All shard paths must end in "*".')
    }
    this.shardPaths.set(pattern, handler)
  }

  /**
   * Translate a logical path into the path used inside the graph.
   *
   * @param {string} path - '/'-separated logical path
   * @returns {Promise<string[]>} the real path segments
   */
  async _realKey (path) {
    const segments = path.split('/').filter(x => x)
    const realPath = []
    // Patterns still compatible with the segments seen so far.
    const candidates = new Set(this.shardPaths.keys())

    for (let i = 0; i < segments.length; i++) {
      const key = segments[i]
      let replaced = false
      for (const pattern of Array.from(candidates)) {
        const expected = pattern[i]
        if (!expected) continue
        if (expected === '*') {
          realPath.push(await this.shardPaths.get(pattern)(key))
          replaced = true
          break
        }
        // ':name' segments match anything, literal segments must be equal.
        if (expected.startsWith(':') || expected === key) continue
        candidates.delete(pattern)
      }
      if (!replaced) realPath.push(key)
    }
    // handlers can return '/' in keys
    return realPath.join('/').split('/')
  }

  /**
   * Move every pending block into the bulk writer and record its CID in the
   * patch tree. Blocks queued while this runs are picked up as well.
   *
   * @returns {Promise<void>}
   */
  async _drain () {
    while (this._pending.size > 0) {
      const [path] = this._pending.keys()
      const realPath = await this._realKey(path)
      // Read the block only after the await so a block re-queued for the
      // same path in the meantime is the one that gets written.
      const block = this._pending.get(path)
      this._pending.delete(path)
      this._bulk.put(block.cid, block.data)
      nest(this._patches, realPath, block.cid)
    }
  }

  /**
   * Make sure a drain of the pending blocks is running.
   *
   * @returns {Promise<void>} settles with the drain that covers this call
   */
  async _kick () {
    if (!this._bulk) {
      // Share one bulk() call between concurrent callers.
      if (!this._bulkPromise) {
        this._bulkPromise = Promise.resolve(this.store.bulk())
      }
      this._bulk = await this._bulkPromise
    }
    if (!this._draining) {
      const draining = this._drain()
      this._draining = draining
      // Forget a finished drain so later additions start a new one. A failed
      // drain is kept, which makes every following _kick() rethrow its error.
      draining.then(() => {
        if (this._draining === draining) this._draining = null
      }, () => {})
    }
    return this._draining
  }

  /**
   * @param {string} path - logical path of the block
   * @param {Object} block - read through its `cid` and `data` properties
   */
  _queue (path, block) {
    this._pending.set(path, block)
    // Not awaited on purpose. A failure is not lost: it is kept by _kick()
    // and rethrown by the next awaited _kick(), e.g. inside flush().
    this._kick().catch(() => {})
  }

  /**
   * Queue `block` to be written under `path` on the next flush.
   *
   * @param {string} path - logical path of the block
   * @param {Object} block - read through its `cid` and `data` properties
   */
  add (path, block) {
    this._queue(path, block)
  }

  /**
   * Apply all recorded patches on top of `root` and flush the bulk writer.
   *
   * @param {*} [root] - CID of the node to patch; falls back to `this.root`
   *   (never assigned by this class — presumably set by the caller)
   * @param {boolean} [clobber=true] - when true, intermediate nodes that were
   *   replaced by a patched version are deleted through the bulk writer
   * @returns {Promise<CID>} CID of the new root node
   * @throws {Error} when no root is available
   */
  async flush (root, clobber = true) {
    if (!root) {
      root = this.root
    }
    if (!root) throw new Error('No root node.')

    // Repeat until nothing is left: a block may be queued right as a drain
    // finishes.
    do {
      await this._kick()
    } while (this._pending.size > 0)

    // Serialize a node, queue all of its blocks and return the last CID
    // (the node's own block comes last).
    const mkcbor = async obj => {
      let cid
      for await (const block of this.cbor.serialize(obj)) {
        this._bulk.put(block.cid, block.data)
        cid = block.cid
      }
      return cid
    }
    const toLink = cid => ({'/': cid.toBaseEncodedString()})

    // Merge one level of the patch tree into `node`, depth first.
    const _iter = async (map, node) => {
      for (const [key, value] of map.entries()) {
        if (value instanceof Map) {
          if (node[key]) {
            const cid = mkcid(node[key]['/'])
            // Load the existing child so its other keys survive the patch.
            const _node = await this.get(cid)
            const _cid = await _iter(value, _node)
            node[key] = toLink(_cid)
            if (clobber &&
                _cid.toBaseEncodedString() !== cid.toBaseEncodedString()) {
              this._bulk.del(cid)
            }
          } else {
            node[key] = toLink(await _iter(value, {}))
          }
        } else {
          if (!(value instanceof CID)) throw new Error('Value not CID.')
          node[key] = toLink(value)
        }
      }
      return mkcbor(node)
    }

    let start = Date.now()
    const cid = await _iter(this._patches, await this.get(root))
    this._graphBuildTime = Date.now() - start

    start = Date.now()
    await this._bulk.flush()
    this._flushTime = Date.now() - start

    return cid
  }

  /**
   * @param {*} cid - identifier handed to `store.get`
   * @returns {Promise<*>} the decoded node
   */
  async get (cid) {
    const buffer = await this.store.get(cid)
    return this.cbor.deserialize(buffer)
  }

  /**
   * Follow `path` from `root` through dag-cbor nodes.
   *
   * @param {string} path - logical path, translated with the shard handlers
   * @param {*} [root] - CID to start from; falls back to `this.root`
   * @returns {Promise<{value: *, remaining: string}>} the decoded node, or
   *   the raw store value when the walk ends on another codec, together
   *   with the unconsumed part of the path
   * @throws {Error} 'NotFound' when a segment is missing from a node
   */
  async resolve (path, root) {
    const segments = await this._realKey(path)
    let cid = mkcid(root || this.root)
    while (segments.length) {
      cid = mkcid(cid)
      if (cid.codec !== 'dag-cbor') break
      const node = await this.get(cid)
      const key = segments.shift()
      if (!node[key]) throw new Error('NotFound')
      cid = mkcid(node[key]['/'])
    }
    const value = cid.codec === 'dag-cbor'
      ? await this.get(cid)
      : await this.store.get(cid)
    return {value, remaining: segments.join('/')}
  }
}
165+
166+
// Factory export: forwards all arguments to the ComplexIPLDGraph constructor.
module.exports = (...args) => new ComplexIPLDGraph(...args)

0 commit comments

Comments
 (0)