This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 9302f01

achingbrain committed
fix: handle sub-sub shards properly
License: MIT
Signed-off-by: achingbrain <[email protected]>
1 parent b2fbd5d commit 9302f01
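
The fix is easier to follow with the HAMT naming scheme in mind: in a sharded directory every link name begins with a two-character uppercase hex prefix derived from the entry's hash position, followed by the entry name, while a link whose name is only the bare prefix points at a nested sub-shard. The old code rebuilt only the root level of the HAMT, so entries living in a shard-within-a-shard were mishandled; the new generatePath/updateShard pair descends through every level before re-linking the parents. A minimal sketch of the prefix scheme (plain JavaScript mirroring the toPrefix helper added in this commit, not part of the diff itself):

const toPrefix = (position) => {
  return position
    .toString(16)
    .toUpperCase()
    .padStart(2, '0')
    .substring(0, 2)
}

console.log(toPrefix(10) + 'file.txt') // '0Afile.txt' - a file entry inside a shard
console.log(toPrefix(10))              // '0A' - the name of a link to a sub-shard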

9 files changed (+287 additions, −186 deletions)


src/core/utils/add-link.js

Lines changed: 143 additions & 90 deletions
@@ -11,7 +11,7 @@ const series = require('async/series')
 const log = require('debug')('ipfs:mfs:core:utils:add-link')
 const UnixFS = require('ipfs-unixfs')
 const Bucket = require('hamt-sharding/src/bucket')
-const loadNode = require('./load-node')
+const whilst = require('async/whilst')
 
 const defaultOptions = {
   parent: undefined,
@@ -133,52 +133,26 @@ const addToDirectory = (context, options, callback) => {
 
 const addToShardedDirectory = (context, options, callback) => {
   return waterfall([
-    (cb) => recreateHamtLevel(options.parent.links, cb),
-    (rootBucket, cb) => findPosition(options.name, rootBucket, (err, position) => cb(err, { rootBucket, position })),
-    ({ rootBucket, position }, cb) => {
-      // the path to the root bucket
-      let path = [{
-        position: position.pos,
-        bucket: position.bucket
-      }]
-      let currentBucket = position.bucket
-
-      while (currentBucket !== rootBucket) {
-        path.push({
-          bucket: currentBucket,
-          position: currentBucket._posAtParent
-        })
-
-        currentBucket = currentBucket._parent
-      }
-
-      cb(null, {
-        rootBucket,
-        path
-      })
+    (cb) => generatePath(context, options.name, options.parent, cb),
+    ({ rootBucket, path }, cb) => {
+      updateShard(context, path, {
+        name: options.name,
+        cid: options.cid,
+        size: options.size
+      }, options, (err, result = {}) => cb(err, { rootBucket, ...result }))
     },
-    ({ rootBucket, path }, cb) => updateShard(context, options.parent, rootBucket, path, {
-      name: options.name,
-      cid: options.cid,
-      size: options.size
-    }, options, (err, results = {}) => cb(err, { rootBucket, node: results.node })),
     ({ rootBucket, node }, cb) => updateHamtDirectory(context, node.links, rootBucket, options, cb)
   ], callback)
 }
 
-const updateShard = (context, parent, rootBucket, positions, child, options, callback) => {
+const updateShard = (context, positions, child, options, callback) => {
   const {
     bucket,
-    position
+    prefix,
+    node
   } = positions.pop()
 
-  const prefix = position
-    .toString('16')
-    .toUpperCase()
-    .padStart(2, '0')
-    .substring(0, 2)
-
-  const link = parent.links
+  const link = node.links
     .find(link => link.name.substring(0, 2) === prefix && link.name !== `${prefix}${child.name}`)
 
   return waterfall([
@@ -201,44 +175,21 @@ const updateShard = (context, parent, rootBucket, positions, child, options, callback) => {
               done(err, { cid: shard.cid, node: result && result.value })
             })
           },
-          ({ cid, node }, cb) => updateShardParent(context, bucket, parent, link.name, node, cid, prefix, options, cb)
+          (result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
         ], cb)
       }
 
       if (link && link.name.length === 2) {
-        log(`Descending into sub-shard`, child.name)
+        log(`Descending into sub-shard ${link.name} for ${child.name}`)
 
         return waterfall([
-          (cb) => loadNode(context, link, cb),
-          ({ node }, cb) => {
-            Promise.all(
-              node.links.map(link => {
-                if (link.name.length === 2) {
-                  // add a bucket for the subshard of this subshard
-                  const pos = parseInt(link.name, 16)
-
-                  bucket._putObjectAt(pos, new Bucket({
-                    hashFn: DirSharded.hashFn
-                  }, bucket, pos))
-
-                  return Promise.resolve()
-                }
-
-                // add to the root and let changes cascade down
-                return rootBucket.put(link.name.substring(2), true)
-              })
-            )
-              .then(() => cb(null, { node }))
-              .catch(error => cb(error))
-          },
-          ({ node }, cb) => updateShard(context, node, bucket, positions, child, options, cb),
-          ({ cid, node }, cb) => updateShardParent(context, bucket, parent, link.name, node, cid, prefix, options, cb)
+          (cb) => updateShard(context, positions, child, options, cb),
+          (result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
        ], cb)
      }
 
      log(`Adding or replacing file`, prefix + child.name)
-
-      updateShardParent(context, bucket, parent, prefix + child.name, child, child.cid, prefix + child.name, options, cb)
+      updateShardParent(context, bucket, node, prefix + child.name, child, child.cid, prefix + child.name, options, cb)
    }
  ], callback)
 }
@@ -279,20 +230,7 @@ const createShard = (context, contents, options, callback) => {
 
 const updateShardParent = (context, bucket, parent, name, node, cid, prefix, options, callback) => {
   waterfall([
-    (done) => {
-      if (name) {
-        if (name === prefix) {
-          log(`Updating link ${name} in shard parent`)
-        } else {
-          log(`Removing link ${name} from shard parent, adding link ${prefix}`)
-        }
-
-        return DAGNode.rmLink(parent, name, done)
-      }
-
-      log(`Adding link ${prefix} to shard parent`)
-      done(null, parent)
-    },
+    (done) => DAGNode.rmLink(parent, name, done),
     (parent, done) => DAGNode.addLink(parent, new DAGLink(prefix, node.size, cid), done),
     (parent, done) => updateHamtDirectory(context, parent.links, bucket, options, done)
   ], callback)
@@ -324,12 +262,21 @@ const updateHamtDirectory = (context, links, bucket, options, callback) => {
   ], callback)
 }
 
-const recreateHamtLevel = (links, callback) => {
+const recreateHamtLevel = (links, rootBucket, parentBucket, positionAtParent, callback) => {
   // recreate this level of the HAMT
   const bucket = new Bucket({
-    hashFn: DirSharded.hashFn
-  })
+    hashFn: DirSharded.hashFn,
+    hash: parentBucket ? parentBucket._options.hash : undefined
+  }, parentBucket, positionAtParent)
+
+  if (parentBucket) {
+    parentBucket._putObjectAt(positionAtParent, bucket)
+  }
+
+  addLinksToHamtBucket(links, bucket, rootBucket, callback)
+}
 
+const addLinksToHamtBucket = (links, bucket, rootBucket, callback) => {
   Promise.all(
     links.map(link => {
       if (link.name.length === 2) {
@@ -342,19 +289,125 @@ const recreateHamtLevel = (links, callback) => {
         return Promise.resolve()
       }
 
-      return bucket.put(link.name.substring(2), true)
+      return (rootBucket || bucket).put(link.name.substring(2), true)
     })
   )
-    .then(() => callback(null, bucket))
-    .catch(error => callback(error))
+    .catch(err => {
+      callback(err)
+      callback = null
+    })
+    .then(() => callback && callback(null, bucket))
+}
+
+const toPrefix = (position) => {
+  return position
+    .toString('16')
+    .toUpperCase()
+    .padStart(2, '0')
+    .substring(0, 2)
 }
 
-const findPosition = async (name, bucket, callback) => {
-  const position = await bucket._findNewBucketAndPos(name)
+const generatePath = (context, fileName, rootNode, callback) => {
+  // start at the root bucket and descend, loading nodes as we go
+  recreateHamtLevel(rootNode.links, null, null, null, async (err, rootBucket) => {
+    if (err) {
+      return callback(err)
+    }
+
+    const position = await rootBucket._findNewBucketAndPos(fileName)
+
+    // the path to the root bucket
+    let path = [{
+      bucket: position.bucket,
+      prefix: toPrefix(position.pos)
+    }]
+    let currentBucket = position.bucket
+
+    while (currentBucket !== rootBucket) {
+      path.push({
+        bucket: currentBucket,
+        prefix: toPrefix(currentBucket._posAtParent)
+      })
+
+      currentBucket = currentBucket._parent
+    }
+
+    path[path.length - 1].node = rootNode
+
+    let index = path.length
+
+    // load DAGNode for each path segment
+    whilst(
+      () => index > 0,
+      (next) => {
+        index--
+
+        const segment = path[index]
 
-  await bucket.put(name, true)
+        // find prefix in links
+        const link = segment.node.links
+          .filter(link => link.name.substring(0, 2) === segment.prefix)
+          .pop()
 
-  callback(null, position)
+        if (!link) {
+          // reached bottom of tree, file will be added to the current bucket
+          log(`Link ${segment.prefix}${fileName} will be added`)
+          return next(null, path)
+        }
+
+        if (link.name === `${segment.prefix}${fileName}`) {
+          log(`Link ${segment.prefix}${fileName} will be replaced`)
+          // file already existed, file will be added to the current bucket
+          return next(null, path)
+        }
+
+        // found subshard
+        log(`Found subshard ${segment.prefix}`)
+        context.ipld.get(link.cid, (err, result) => {
+          if (err) {
+            return next(err)
+          }
+
+          // subshard hasn't been loaded, descend to the next level of the HAMT
+          if (!path[index - 1]) {
+            log(`Loaded new subshard ${segment.prefix}`)
+            const node = result.value
+
+            return recreateHamtLevel(node.links, rootBucket, segment.bucket, parseInt(segment.prefix, 16), async (err, bucket) => {
+              if (err) {
+                return next(err)
+              }
+
+              const position = await rootBucket._findNewBucketAndPos(fileName)
+
+              index++
+              path.unshift({
+                bucket: position.bucket,
+                prefix: toPrefix(position.pos),
+                node: node
+              })
+
+              next()
+            })
+          }
+
+          const nextSegment = path[index - 1]
+
+          // add intermediate links to bucket
+          addLinksToHamtBucket(result.value.links, nextSegment.bucket, rootBucket, (error) => {
+            nextSegment.node = result.value
+
+            next(error)
+          })
+        })
+      },
+      async (err, path) => {
+        await rootBucket.put(fileName, true)
+
+        callback(err, { rootBucket, path })
+      }
+    )
+  })
 }
 
 module.exports = addLink
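
For orientation, the rewritten flow can be modelled without any IPFS machinery. generatePath returns an array of segments ordered deepest bucket first and root shard last; updateShard consumes it from the end with positions.pop(), recursing through every bare two-character sub-shard link and re-linking each parent DAGNode on the way back up. A self-contained toy model of that walk (illustrative only; the data and names below are invented and the hash-collision case that creates a new sub-shard is omitted):

const path = [
  { prefix: '42', links: ['07other-file.txt'] }, // deepest sub-sub shard, holds real entries
  { prefix: '1A', links: ['1A'] },               // sub-shard whose bare '1A' link points one level down
  { prefix: 'F0', links: ['F0'] }                // root shard whose bare 'F0' link points one level down
]

const addEntry = (segments, name) => {
  const segment = segments.pop()
  const link = segment.links.find(l => l.startsWith(segment.prefix) && l !== `${segment.prefix}${name}`)

  if (link && link.length === 2) {
    // a bare two-character name is a sub-shard: descend first, then re-link it in this parent
    addEntry(segments, name)
    return console.log(`re-linking sub-shard ${link} in its parent`)
  }

  console.log(`adding ${segment.prefix}${name} at the deepest level`)
}

addEntry(path.slice(), 'new-file.txt')
// adding 42new-file.txt at the deepest level
// re-linking sub-shard 1A in its parent
// re-linking sub-shard F0 in its parent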

test/helpers/create-shard.js

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+'use strict'
+
+const pull = require('pull-stream/pull')
+const values = require('pull-stream/sources/values')
+const collect = require('pull-stream/sinks/collect')
+const importer = require('ipfs-unixfs-importer')
+const CID = require('cids')
+
+const createShard = (mfs, files, shardSplitThreshold = 10) => {
+  return new Promise((resolve, reject) => {
+    pull(
+      values(files),
+      importer(mfs.ipld, {
+        shardSplitThreshold,
+        reduceSingleLeafToSelf: false, // same as go-ipfs-mfs implementation, differs from `ipfs add`(!)
+        leafType: 'raw' // same as go-ipfs-mfs implementation, differs from `ipfs add`(!)
+      }),
+      collect((err, files) => {
+        if (err) {
+          return reject(files)
+        }
+
+        const dir = files[files.length - 1]
+
+        resolve(new CID(dir.multihash))
+      })
+    )
+  })
+}
+
+module.exports = createShard

test/helpers/create-sharded-directory.js

Lines changed: 8 additions & 32 deletions
@@ -3,42 +3,18 @@
 const chai = require('chai')
 chai.use(require('dirty-chai'))
 const expect = chai.expect
-const crypto = require('crypto')
-const pull = require('pull-stream/pull')
-const values = require('pull-stream/sources/values')
-const collect = require('pull-stream/sinks/collect')
-const importer = require('ipfs-unixfs-importer')
-const CID = require('cids')
+const createShard = require('./create-shard')
 
 module.exports = async (mfs, shardSplitThreshold = 10, files = shardSplitThreshold) => {
   const dirPath = `/sharded-dir-${Math.random()}`
+  const cid = await createShard(mfs, new Array(files).fill(0).map((_, index) => ({
+    path: `${dirPath}/file-${index}`,
+    content: Buffer.from([0, 1, 2, 3, 4, 5, index])
+  })), shardSplitThreshold)
 
-  return new Promise((resolve, reject) => {
-    pull(
-      values(
-        new Array(files).fill(0).map((_, index) => ({
-          path: `${dirPath}/file-${index}`,
-          content: crypto.randomBytes(5)
-        }))
-      ),
-      importer(mfs.ipld, {
-        shardSplitThreshold,
-        reduceSingleLeafToSelf: false, // same as go-ipfs-mfs implementation, differs from `ipfs add`(!)
-        leafType: 'raw' // same as go-ipfs-mfs implementation, differs from `ipfs add`(!)
-      }),
-      collect(async (err, files) => {
-        if (err) {
-          return reject(files)
-        }
+  await mfs.cp(`/ipfs/${cid.toBaseEncodedString()}`, dirPath)
 
-        const dir = files[files.length - 1]
+  expect((await mfs.stat(dirPath)).type).to.equal('hamt-sharded-directory')
 
-        await mfs.cp(`/ipfs/${new CID(dir.multihash).toBaseEncodedString()}`, dirPath)
-
-        expect((await mfs.stat(dirPath)).type).to.equal('hamt-sharded-directory')
-
-        resolve(dirPath)
-      })
-    )
-  })
+  return dirPath
 }
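
The reworked helpers make the original bug easy to reproduce in a test. A hedged sketch of such a regression test (the file count, threshold and mfs.write options below are assumptions for illustration, not part of this commit): create a directory with a low shardSplitThreshold and enough entries that the HAMT gains nested sub-shards, then add one more file through MFS so add-link has to descend into a sub-sub shard.

const createShardedDirectory = require('./helpers/create-sharded-directory')

const addFileToNestedShard = async (mfs) => {
  // a low threshold plus many entries is assumed to push some of them into sub-sub shards
  const dirPath = await createShardedDirectory(mfs, 10, 75)

  // adding one more entry forces add-link to walk down into a nested shard
  await mfs.write(`${dirPath}/new-file.txt`, Buffer.from('hello world'), { create: true })

  return mfs.stat(`${dirPath}/new-file.txt`)
}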
