@@ -10,95 +10,97 @@ const nest = (map, key, cid) => {
10
10
map . set ( key . shift ( ) , cid )
11
11
}
12
12
13
- const mkcid = c => c instanceof CID ? c : new CID ( c )
13
+ const mkcid = c => c . toBaseEncodedString ? c : new CID ( c )
14
14
15
15
class ComplexIPLDGraph {
16
- constructor ( store , cbor ) {
16
+ constructor ( store , root , cbor ) {
17
17
this . shardPaths = new Map ( )
18
18
if ( ! cbor ) {
19
19
cbor = fancyCBOR
20
20
}
21
21
this . cbor = cbor ( ( ...args ) => store . get ( ...args ) )
22
22
this . store = store
23
+ this . root = root
23
24
this . _clear ( )
25
+ this . _bulk = store . bulk ( )
26
+ this . _shardPaths = new Map ( )
24
27
}
25
28
_clear ( ) {
29
+ this . _getCache = new Map ( )
26
30
this . _pending = new Map ( )
27
31
this . _patches = new Map ( )
28
- this . _bulk = null
29
32
}
30
33
shardPath ( path , handler ) {
31
34
path = path . split ( '/' ) . filter ( x => x )
32
35
if ( path [ path . length - 1 ] !== '*' ) {
33
36
throw new Error ( 'All shard paths must end in "*".' )
34
37
}
35
- this . shardPaths . set ( path , handler )
38
+ nest ( this . _shardPaths , path , handler )
36
39
}
37
- async _realKey ( path ) {
40
+ _realKey ( path ) {
38
41
path = path . split ( '/' ) . filter ( x => x )
39
42
let _realpath = [ ]
40
- let _shardKeys = new Set ( this . shardPaths . keys ( ) )
41
-
42
- let i = 0
43
+ let maps = [ this . _shardPaths ]
43
44
while ( path . length ) {
44
45
let key = path . shift ( )
46
+ let nextMaps = [ ]
45
47
let changed = false
46
- for ( let _path of Array . from ( _shardKeys ) ) {
47
- let _key = _path [ i ]
48
- if ( ! _key ) continue
49
- if ( _key === '*' ) {
50
- _realpath . push ( await this . shardPaths . get ( _path ) ( key ) )
51
- changed = true
52
- break
53
- } else if ( _key . startsWith ( ':' ) ) {
54
- continue
55
- } else if ( _key === key ) {
56
- continue
57
- } else {
58
- _shardKeys . delete ( _path )
48
+ for ( let map of maps ) {
49
+ for ( let [ _key , handler ] of map . entries ( ) ) {
50
+ if ( _key === key || _key . startsWith ( ':' ) ) {
51
+ nextMaps . push ( handler )
52
+ }
53
+ if ( _key === '*' && ! changed ) {
54
+ _realpath . push ( handler ( key ) )
55
+ changed = true
56
+ }
59
57
}
58
+ maps = nextMaps
60
59
}
61
60
if ( ! changed ) _realpath . push ( key )
62
- i ++
63
61
}
64
- // handlers can return '/' in keys
65
- _realpath = _realpath . join ( '/' ) . split ( '/' )
66
- return _realpath
62
+ return _realpath . join ( '/' ) . split ( '/' )
67
63
}
68
- async _kick ( ) {
69
- if ( ! this . _bulk ) this . _bulk = await this . store . bulk ( )
70
- if ( ! this . _draining ) {
71
- this . _draining = ( async ( ) => {
72
- for ( let [ path , block ] of this . _pending . entries ( ) ) {
73
- path = await this . _realKey ( path )
74
- this . _bulk . put ( block . cid , block . data )
75
- nest ( this . _patches , path , block . cid )
76
- this . _draining = null
64
+ _prime ( path ) {
65
+ /* pre-fetch intermediate node's we'll need to build the graph */
66
+ path = Array . from ( path )
67
+ let run = ( parent ) => {
68
+ if ( path . length ) {
69
+ let key = path . shift ( )
70
+ if ( parent [ key ] && parent [ key ] [ '/' ] ) {
71
+ this . get ( mkcid ( parent [ key ] [ '/' ] ) ) . then ( run )
77
72
}
78
- } ) ( )
73
+ }
79
74
}
80
- return this . _draining
75
+ this . get ( this . root ) . then ( run )
81
76
}
82
77
_queue ( path , block ) {
83
78
this . _pending . set ( path , block )
84
- this . _kick ( )
79
+
80
+ path = this . _realKey ( path )
81
+ this . _prime ( path )
82
+ nest ( this . _patches , path , block . cid )
83
+ this . _draining = null
85
84
}
86
- add ( path , block ) {
85
+ async add ( path , block ) {
86
+ if ( this . _spent ) {
87
+ throw new Error ( 'This graph instance has already been flushed.' )
88
+ }
87
89
this . _queue ( path , block )
90
+ return this . _bulk . put ( block . cid , block . data )
88
91
}
89
- async flush ( root , clobber = true ) {
90
- if ( ! root ) {
91
- root = this . root
92
+ async flush ( ) {
93
+ if ( this . _spent ) {
94
+ throw new Error ( 'This graph instance has already been flushed.' )
92
95
}
96
+ let root = this . root
93
97
if ( ! root ) throw new Error ( 'No root node.' )
94
98
root = mkcid ( root )
95
- await this . _kick ( )
96
- await this . _kick ( )
97
99
98
100
let mkcbor = async obj => {
99
101
let cid
100
102
for await ( let block of this . cbor . serialize ( obj ) ) {
101
- this . _bulk . put ( block . cid , block . data )
103
+ await this . _bulk . put ( block . cid , block . data )
102
104
cid = block . cid
103
105
}
104
106
return cid
@@ -107,24 +109,23 @@ class ComplexIPLDGraph {
107
109
return { '/' : cid . toBaseEncodedString ( ) }
108
110
}
109
111
112
+ this . nodesWalked = 0
113
+
110
114
let _iter = async ( map , node ) => {
115
+ this . nodesWalked ++
111
116
for ( let [ key , value ] of map . entries ( ) ) {
112
117
if ( value instanceof Map ) {
113
118
let _node
114
119
let cid
115
- if ( node [ key ] ) {
120
+ if ( node [ key ] && node [ key ] [ '/' ] ) {
116
121
cid = mkcid ( node [ key ] [ '/' ] )
117
122
_node = await this . get ( cid )
118
123
} else {
119
124
_node = { }
120
125
}
121
126
node [ key ] = toLink ( await _iter ( value , _node ) )
122
- if ( clobber && cid &&
123
- node [ key ] [ '/' ] !== cid . toBaseEncodedString ( ) ) {
124
- this . _bulk . del ( cid )
125
- }
126
127
} else {
127
- if ( ! ( value instanceof CID ) ) throw new Error ( 'Value not CID.' )
128
+ if ( ! ( value . toBaseEncodedString ) ) throw new Error ( 'Value not CID.' )
128
129
node [ key ] = toLink ( value )
129
130
}
130
131
}
@@ -135,23 +136,20 @@ class ComplexIPLDGraph {
135
136
let cid = await _iter ( this . _patches , await this . get ( root ) )
136
137
this . _graphBuildTime = Date . now ( ) - start
137
138
138
- if ( clobber &&
139
- root . toBaseEncodedString ( ) !== cid . toBaseEncodedString ( ) ) {
140
- this . _bulk . del ( root )
141
- }
142
-
143
- start = Date . now ( )
144
139
await this . _bulk . flush ( )
145
- this . _flushTime = Date . now ( ) - start
146
140
147
141
this . _clear ( )
148
-
142
+ this . _spent = true
149
143
return cid
150
144
}
151
145
152
146
async get ( cid ) {
153
- let buffer = await this . store . get ( cid )
154
- return this . cbor . deserialize ( buffer )
147
+ if ( cid . toBaseEncodedString ) cid = cid . toBaseEncodedString ( )
148
+ if ( ! this . _getCache . has ( cid ) ) {
149
+ let p = this . store . get ( cid ) . then ( b => this . cbor . deserialize ( b ) )
150
+ this . _getCache . set ( cid , p )
151
+ }
152
+ return this . _getCache . get ( cid )
155
153
}
156
154
157
155
async resolve ( path , root ) {
0 commit comments