@@ -10,6 +10,8 @@ import (
10
10
ft "github.com/ipfs/go-ipfs/unixfs"
11
11
ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
12
12
13
+ cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
14
+ node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
13
15
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
14
16
)
15
17
@@ -30,6 +32,9 @@ type pbDagReader struct {
30
32
// NodeGetters for each of 'nodes' child links
31
33
promises []mdag.NodeGetter
32
34
35
+ // the cid of each child of the current node
36
+ links []* cid.Cid
37
+
33
38
// the index of the child link currently being read from
34
39
linkPosition int
35
40
@@ -47,30 +52,54 @@ var _ DagReader = (*pbDagReader)(nil)
47
52
48
53
func NewPBFileReader (ctx context.Context , n * mdag.ProtoNode , pb * ftpb.Data , serv mdag.DAGService ) * pbDagReader {
49
54
fctx , cancel := context .WithCancel (ctx )
50
- promises := mdag . GetDAG ( fctx , serv , n )
55
+ curLinks := getLinkCids ( n )
51
56
return & pbDagReader {
52
57
node : n ,
53
58
serv : serv ,
54
59
buf : NewBufDagReader (pb .GetData ()),
55
- promises : promises ,
60
+ promises : make ([]mdag.NodeGetter , len (curLinks )),
61
+ links : curLinks ,
56
62
ctx : fctx ,
57
63
cancel : cancel ,
58
64
pbdata : pb ,
59
65
}
60
66
}
61
67
68
+ const preloadSize = 10
69
+
70
+ func (dr * pbDagReader ) preloadNextNodes (ctx context.Context ) {
71
+ beg := dr .linkPosition
72
+ end := beg + preloadSize
73
+ if end >= len (dr .links ) {
74
+ end = len (dr .links )
75
+ }
76
+
77
+ for i , p := range mdag .GetNodes (ctx , dr .serv , dr .links [beg :end ]) {
78
+ dr .promises [beg + i ] = p
79
+ }
80
+ }
81
+
62
82
// precalcNextBuf follows the next link in line and loads it from the
63
83
// DAGService, setting the next buffer to read from
64
84
func (dr * pbDagReader ) precalcNextBuf (ctx context.Context ) error {
65
- dr .buf .Close () // Just to make sure
85
+ if dr .buf != nil {
86
+ dr .buf .Close () // Just to make sure
87
+ dr .buf = nil
88
+ }
89
+
66
90
if dr .linkPosition >= len (dr .promises ) {
67
91
return io .EOF
68
92
}
69
93
94
+ if dr .promises [dr .linkPosition ] == nil {
95
+ dr .preloadNextNodes (ctx )
96
+ }
97
+
70
98
nxt , err := dr .promises [dr .linkPosition ].Get (ctx )
71
99
if err != nil {
72
100
return err
73
101
}
102
+ dr .promises [dr .linkPosition ] = nil
74
103
dr .linkPosition ++
75
104
76
105
switch nxt := nxt .(type ) {
@@ -105,6 +134,15 @@ func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
105
134
}
106
135
}
107
136
137
+ func getLinkCids (n node.Node ) []* cid.Cid {
138
+ links := n .Links ()
139
+ out := make ([]* cid.Cid , 0 , len (links ))
140
+ for _ , l := range links {
141
+ out = append (out , l .Cid )
142
+ }
143
+ return out
144
+ }
145
+
108
146
// Size return the total length of the data from the DAG structured file.
109
147
func (dr * pbDagReader ) Size () uint64 {
110
148
return dr .pbdata .GetFilesize ()
@@ -199,7 +237,9 @@ func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
199
237
left := offset
200
238
if int64 (len (pb .Data )) >= offset {
201
239
// Close current buf to close potential child dagreader
202
- dr .buf .Close ()
240
+ if dr .buf != nil {
241
+ dr .buf .Close ()
242
+ }
203
243
dr .buf = NewBufDagReader (pb .GetData ()[offset :])
204
244
205
245
// start reading links from the beginning
0 commit comments