// export.js — 132 lines (115 loc), 3.57 KB
// Note: the upstream repository was archived by its owner on Feb 12, 2024 and is now read-only.
import { CID } from 'multiformats/cid'
import { createUnsafe } from 'multiformats/block'
import { base58btc } from 'multiformats/bases/base58'
import { CarWriter } from '@ipld/car/writer'
import { withTimeoutOption } from 'ipfs-core-utils/with-timeout-option'
import debug from 'debug'
import * as raw from 'multiformats/codecs/raw'
import * as json from 'multiformats/codecs/json'
// Debug logger for this module. The namespace names the export component so
// its output can be filtered separately from the dag import logs.
const log = debug('ipfs:components:dag:export')
// Codec codes (raw and JSON) whose blocks may be exported without being
// inspected for links when no codec implementation is available.
/** @type {number[]} */
const NO_LINKS_CODECS = [raw, json].map((codec) => codec.code)
/**
* @typedef {import('../../types').Preload} Preload
* @typedef {import('ipfs-repo').IPFSRepo} IPFSRepo
* @typedef {import('@ipld/car/api').BlockWriter} BlockWriter
* @typedef {import('ipfs-core-types/src/utils').AbortOptions} AbortOptions
*/
/**
 * Build the `dag.export` implementation, wrapped with `withTimeoutOption`.
 *
 * @param {Object} config
 * @param {IPFSRepo} config.repo
 * @param {Preload} config.preload
 * @param {import('ipfs-core-utils/multicodecs').Multicodecs} config.codecs
 */
export function createExport ({ repo, preload, codecs }) {
  /**
   * Async generator yielding the chunks produced by a CarWriter whose single
   * root is `root`, while the DAG below that root is written to it.
   *
   * @type {import('ipfs-core-types/src/dag').API<{}>["export"]}
   */
  async function * dagExport (root, options = {}) {
    const preloadDisabled = options.preload === false

    if (!preloadDisabled) {
      preload(root)
    }

    const cid = CID.asCID(root)

    if (!cid) {
      throw new Error(`Unexpected error converting CID type: ${root}`)
    }

    log(`Exporting ${cid} as car`)

    const car = await CarWriter.create([cid])
    const carWriter = car.writer
    const carOutput = car.out

    // Writing and reading happen on two separate async channels: the writer
    // task below feeds the CarWriter while the loop further down forwards its
    // output to the caller. A failure on the writing side is parked here so
    // the reading side can re-throw it.
    /** @type {Error|null} */
    let writeFailure = null

    const writeDag = async () => {
      const { signal, timeout } = options

      try {
        await traverseWrite(repo, { signal, timeout }, cid, carWriter, codecs)
      } catch (/** @type {any} */ failure) {
        writeFailure = failure
      } finally {
        // closing the writer is what ends the output stream read below
        carWriter.close()
      }
    }

    // deliberately not awaited: it runs concurrently with the read loop
    writeDag()

    for await (const chunk of carOutput) {
      if (writeFailure) {
        break
      }

      yield chunk
    }

    if (writeFailure) {
      throw writeFailure
    }
  }

  return withTimeoutOption(dagExport)
}
/**
 * Write the block for `cid`, and every block reachable through its links, to
 * `writer` in depth-first order. CIDs already present in `seen` (keyed by
 * their base58btc string) are skipped; each written CID is added to `seen`.
 *
 * @param {IPFSRepo} repo
 * @param {AbortOptions} options
 * @param {CID} cid
 * @param {BlockWriter} writer
 * @param {import('ipfs-core-utils/multicodecs').Multicodecs} codecs
 * @param {Set<string>} seen
 * @returns {Promise<void>}
 */
async function traverseWrite (repo, options, cid, writer, codecs, seen = new Set()) {
  // explicit work stack: the last element is the next CID to visit
  const pending = [cid]

  while (pending.length > 0) {
    const current = pending.pop()
    const key = current.toString(base58btc)

    if (seen.has(key)) {
      continue
    }

    const block = await getBlock(repo, options, current, codecs)

    log(`Adding block ${current} to car`)
    await writer.put(block)
    seen.add(key)

    // push children back-to-front so the first link is visited first and its
    // whole subtree is written before the next sibling
    const children = [...block.links]

    for (let i = children.length - 1; i >= 0; i--) {
      pending.push(children[i])
    }
  }
}
/**
 * Read the bytes for `cid` from `repo.blocks` and collect the CIDs the block
 * links to. When `codecs.getCodec` yields a codec, the block is decoded with
 * it and its links are gathered; when it yields nothing, the block is accepted
 * with no links only if its codec code is listed in NO_LINKS_CODECS.
 *
 * @param {IPFSRepo} repo
 * @param {AbortOptions} options
 * @param {CID} cid
 * @param {import('ipfs-core-utils/multicodecs').Multicodecs} codecs
 * @returns {Promise<{cid:CID, bytes:Uint8Array, links:CID[]}>}
 * @throws {Error} if no codec is available and the codec code is not in NO_LINKS_CODECS
 */
async function getBlock (repo, options, cid, codecs) {
  const bytes = await repo.blocks.get(cid, options)
  const codec = await codecs.getCodec(cid.code)

  /** @type {CID[]} */
  const links = []

  if (!codec) {
    const linkFree = NO_LINKS_CODECS.includes(cid.code)

    if (!linkFree) {
      throw new Error(`Can't decode links in block with codec 0x${cid.code.toString(16)} to form complete DAG`)
    }

    return { cid, bytes, links }
  }

  const decoded = createUnsafe({ bytes, cid, codec })

  // each entry yielded by links() is indexable; the link target sits at index 1
  for (const entry of [...decoded.links()]) {
    links.push(entry[1])
  }

  return { cid, bytes, links }
}