Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 63940b4

Browse files
committed
perf: write files to repo outside of write lock
perf: read files from repo outside of read lock fix: standardise error messages with go License: MIT Signed-off-by: achingbrain <[email protected]>
1 parent 158bb28 commit 63940b4

22 files changed

+305
-205
lines changed

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
"detect-node": "^2.0.3",
4343
"detect-webworker": "^1.0.0",
4444
"dirty-chai": "^2.0.1",
45-
"ipfs": "~0.30.0",
45+
"ipfs": "~0.31.2",
4646
"pull-buffer-stream": "^1.0.0",
4747
"safe-buffer": "^5.1.1",
4848
"tmp": "~0.0.33"
@@ -56,7 +56,7 @@
5656
"filereader-stream": "^2.0.0",
5757
"interface-datastore": "~0.4.2",
5858
"ipfs-unixfs": "~0.1.15",
59-
"ipfs-unixfs-engine": "~0.31.3",
59+
"ipfs-unixfs-engine": "~0.32.1",
6060
"is-pull-stream": "~0.0.0",
6161
"is-stream": "^1.1.0",
6262
"joi": "^13.4.0",
@@ -65,6 +65,7 @@
6565
"once": "^1.4.0",
6666
"promisify-es6": "^1.0.3",
6767
"pull-cat": "^1.1.11",
68+
"pull-defer": "~0.2.2",
6869
"pull-paramap": "^1.2.2",
6970
"pull-pushable": "^2.2.0",
7071
"pull-stream": "^3.6.8",

src/cli/ls.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ module.exports = {
2121
coerce: asBoolean,
2222
describe: 'Use long listing format.'
2323
},
24+
unsorted: {
25+
alias: 'U',
26+
type: 'boolean',
27+
default: false,
28+
coerce: asBoolean,
29+
describe: 'Do not sort; list entries in directory order.'
30+
},
2431
cidBase: {
2532
alias: 'cid-base',
2633
default: 'base58btc',
@@ -33,12 +40,14 @@ module.exports = {
3340
path,
3441
ipfs,
3542
long,
43+
unsorted,
3644
cidBase
3745
} = argv
3846

3947
argv.resolve(
4048
ipfs.files.ls(path || FILE_SEPARATOR, {
4149
long,
50+
unsorted,
4251
cidBase
4352
})
4453
.then(files => {

src/core/index.js

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,38 @@ const {
55
createLock
66
} = require('./utils')
77

8+
// These operations are read-locked at the function level and will execute simultaneously
89
const readOperations = {
910
ls: require('./ls'),
10-
read: require('./read'),
11-
readPullStream: require('./read-pull-stream'),
12-
readReadableStream: require('./read-readable-stream'),
1311
stat: require('./stat')
1412
}
1513

14+
// These operations are locked at the function level and will execute in series
1615
const writeOperations = {
1716
cp: require('./cp'),
1817
flush: require('./flush'),
1918
mkdir: require('./mkdir'),
2019
mv: require('./mv'),
21-
rm: require('./rm'),
22-
write: require('./write')
20+
rm: require('./rm')
21+
}
22+
23+
// These operations are asynchronous and manage their own locking
24+
const upwrappedOperations = {
25+
write: require('./write'),
26+
read: require('./read')
27+
}
28+
29+
// These operations are synchronous and manage their own locking
30+
const upwrappedSynchronousOperations = {
31+
readPullStream: require('./read-pull-stream'),
32+
readReadableStream: require('./read-readable-stream')
2333
}
2434

25-
const wrap = (ipfs, mfs, operations, lock) => {
35+
const wrap = ({
36+
ipfs, mfs, operations, lock
37+
}) => {
2638
Object.keys(operations).forEach(key => {
27-
if (operations.hasOwnProperty(key)) {
28-
mfs[key] = promisify(lock(operations[key](ipfs)))
29-
}
39+
mfs[key] = promisify(lock(operations[key](ipfs)))
3040
})
3141
}
3242

@@ -51,8 +61,20 @@ module.exports = (ipfs, options) => {
5161

5262
const mfs = {}
5363

54-
wrap(ipfs, mfs, readOperations, readLock)
55-
wrap(ipfs, mfs, writeOperations, writeLock)
64+
wrap({
65+
ipfs, mfs, operations: readOperations, lock: readLock
66+
})
67+
wrap({
68+
ipfs, mfs, operations: writeOperations, lock: writeLock
69+
})
70+
71+
Object.keys(upwrappedOperations).forEach(key => {
72+
mfs[key] = promisify(upwrappedOperations[key](ipfs))
73+
})
74+
75+
Object.keys(upwrappedSynchronousOperations).forEach(key => {
76+
mfs[key] = upwrappedSynchronousOperations[key](ipfs)
77+
})
5678

5779
return mfs
5880
}

src/core/ls.js

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ const {
1313

1414
const defaultOptions = {
1515
long: false,
16-
cidBase: 'base58btc'
16+
cidBase: 'base58btc',
17+
unsorted: false
1718
}
1819

1920
module.exports = (ipfs) => {
@@ -64,6 +65,17 @@ module.exports = (ipfs) => {
6465
}
6566
},
6667

68+
// https://github.com/ipfs/go-ipfs/issues/5181
69+
(files, cb) => {
70+
if (options.unsorted) {
71+
return cb(null, files)
72+
}
73+
74+
return cb(null, files.sort((a, b) => {
75+
return b.name.localeCompare(a.name)
76+
}))
77+
},
78+
6779
// https://github.com/ipfs/go-ipfs/issues/5026
6880
(files, cb) => cb(null, files.map(file => {
6981
if (FILE_TYPES.hasOwnProperty(file.type)) {
@@ -77,18 +89,7 @@ module.exports = (ipfs) => {
7789
}
7890

7991
return file
80-
})),
81-
82-
// https://github.com/ipfs/go-ipfs/issues/5181
83-
(files, cb) => {
84-
if (options.long) {
85-
return cb(null, files.sort((a, b) => {
86-
return b.name.localeCompare(a.name)
87-
}))
88-
}
89-
90-
cb(null, files)
91-
}
92+
}))
9293
], callback)
9394
}
9495
}

src/core/mkdir.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ module.exports = (ipfs) => {
5050
return cb(new Error('file already exists'))
5151
}
5252

53-
if (error.message.includes('did not exist')) {
54-
log(`${path} did not exist`)
53+
if (error.message.includes('does not exist')) {
54+
log(`${path} does not exist`)
5555
return cb()
5656
}
5757

src/core/read-pull-stream.js

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
const exporter = require('ipfs-unixfs-engine').exporter
44
const pull = require('pull-stream/pull')
5+
const once = require('pull-stream/sources/once')
6+
const asyncMap = require('pull-stream/throughs/async-map')
7+
const defer = require('pull-defer')
58
const collect = require('pull-stream/sinks/collect')
6-
const waterfall = require('async/waterfall')
79
const UnixFs = require('ipfs-unixfs')
810
const {
9-
traverseTo
11+
traverseTo,
12+
createLock
1013
} = require('./utils')
1114
const log = require('debug')('ipfs:mfs:read-pull-stream')
1215

@@ -16,39 +19,49 @@ const defaultOptions = {
1619
}
1720

1821
module.exports = (ipfs) => {
19-
return function mfsReadPullStream (path, options, callback) {
20-
if (typeof options === 'function') {
21-
callback = options
22-
options = {}
23-
}
24-
22+
return function mfsReadPullStream (path, options = {}) {
2523
options = Object.assign({}, defaultOptions, options)
2624

2725
log(`Reading ${path}`)
2826

29-
waterfall([
30-
(done) => traverseTo(ipfs, path, {
31-
parents: false
32-
}, done),
33-
(result, done) => {
27+
const deferred = defer.source()
28+
29+
pull(
30+
once(path),
31+
asyncMap((path, cb) => {
32+
createLock().readLock((next) => {
33+
traverseTo(ipfs, path, {
34+
parents: false
35+
}, next)
36+
})(cb)
37+
}),
38+
asyncMap((result, cb) => {
3439
const node = result.node
3540
const meta = UnixFs.unmarshal(node.data)
3641

3742
if (meta.type !== 'file') {
38-
return done(new Error(`${path} was not a file`))
43+
return cb(new Error(`${path} was not a file`))
3944
}
4045

41-
waterfall([
42-
(next) => pull(
43-
exporter(node.multihash, ipfs.dag, {
44-
offset: options.offset,
45-
length: options.length
46-
}),
47-
collect(next)
48-
),
49-
(files, next) => next(null, files[0].content)
50-
], done)
51-
}
52-
], callback)
46+
pull(
47+
exporter(node.multihash, ipfs.dag, {
48+
offset: options.offset,
49+
length: options.length
50+
}),
51+
collect((error, files) => {
52+
cb(error, error ? null : files[0].content)
53+
})
54+
)
55+
}),
56+
collect((error, streams) => {
57+
if (error) {
58+
return deferred.abort(error)
59+
}
60+
61+
deferred.resolve(streams[0])
62+
})
63+
)
64+
65+
return deferred
5366
}
5467
}

src/core/read-readable-stream.js

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,10 @@
11
'use strict'
22

3-
const waterfall = require('async/waterfall')
43
const readPullStream = require('./read-pull-stream')
54
const toStream = require('pull-stream-to-stream')
65

76
module.exports = (ipfs) => {
8-
return function mfsReadReadableStream (path, options, callback) {
9-
if (typeof options === 'function') {
10-
callback = options
11-
options = {}
12-
}
13-
14-
waterfall([
15-
(cb) => readPullStream(ipfs)(path, options, cb),
16-
(stream, cb) => cb(null, toStream.source(stream))
17-
], callback)
7+
return function mfsReadReadableStream (path, options = {}) {
8+
return toStream.source(readPullStream(ipfs)(path, options))
189
}
1910
}

src/core/read.js

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
const pull = require('pull-stream/pull')
44
const collect = require('pull-stream/sinks/collect')
5-
const waterfall = require('async/waterfall')
65
const readPullStream = require('./read-pull-stream')
76

87
module.exports = (ipfs) => {
@@ -12,15 +11,15 @@ module.exports = (ipfs) => {
1211
options = {}
1312
}
1413

15-
waterfall([
16-
(cb) => readPullStream(ipfs)(path, options, cb),
17-
(stream, cb) => pull(
18-
stream,
19-
collect(cb)
20-
),
21-
(buffers, cb) => {
22-
cb(null, Buffer.concat(buffers))
23-
}
24-
], callback)
14+
pull(
15+
readPullStream(ipfs)(path, options),
16+
collect((error, buffers) => {
17+
if (error) {
18+
return callback(error)
19+
}
20+
21+
return callback(null, Buffer.concat(buffers))
22+
})
23+
)
2524
}
2625
}

src/core/utils/create-lock.js

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@
33
const mortice = require('mortice')
44
const log = require('debug')('ipfs:mfs:lock')
55

6+
let lock
7+
68
module.exports = (repoOwner) => {
9+
if (lock) {
10+
return lock
11+
}
12+
713
const mutex = mortice({
814
// ordinarily the main thread would store the read/write lock but
915
// if we are the thread that owns the repo, we can store the lock
@@ -17,7 +23,8 @@ module.exports = (repoOwner) => {
1723
mutex[`${type}Lock`](() => {
1824
return new Promise((resolve, reject) => {
1925
args.push((error, result) => {
20-
log(`${type} operation callback invoked${error ? ' with error' : ''}`)
26+
log(`${type} operation callback invoked${error ? ' with error: ' + error.message : ''}`)
27+
2128
if (error) {
2229
return reject(error)
2330
}
@@ -35,7 +42,7 @@ module.exports = (repoOwner) => {
3542
cb(null, result)
3643
})
3744
.catch((error) => {
38-
log(`Finished ${type} operation with error`)
45+
log(`Finished ${type} operation with error: ${error.message}`)
3946
if (callback) {
4047
return callback(error)
4148
}
@@ -46,7 +53,7 @@ module.exports = (repoOwner) => {
4653
})
4754
}
4855

49-
return {
56+
lock = {
5057
readLock: (func) => {
5158
return function () {
5259
const args = Array.from(arguments)
@@ -65,4 +72,6 @@ module.exports = (repoOwner) => {
6572
}
6673
}
6774
}
75+
76+
return lock
6877
}

src/core/utils/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ module.exports = {
1111
formatCid: require('./format-cid'),
1212
limitStreamBytes: require('./limit-stream-bytes'),
1313
loadNode: require('./load-node'),
14+
toPullSource: require('./to-pull-source'),
1415
toSourcesAndDestination: require('./to-sources-and-destination'),
1516
toSources: require('./to-sources'),
1617
traverseTo: require('./traverse-to'),

0 commit comments

Comments
 (0)