Skip to content

Commit 092df58

Browse files
committed
clear out memory after reads from the dagreader
License: MIT Signed-off-by: Jeromy <[email protected]>
1 parent 64ae934 commit 092df58

File tree

1 file changed

+56
-4
lines changed

1 file changed

+56
-4
lines changed

unixfs/io/pbdagreader.go

+56-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
ft "github.com/ipfs/go-ipfs/unixfs"
1111
ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
1212

13+
node "gx/ipfs/QmNwUEK7QbwSqyKBu3mMtToo8SUc6wQJ7gdZq4gGGJqfnf/go-ipld-format"
1314
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
15+
cid "gx/ipfs/QmeSrf6pzut73u6zLQkRFQ3ygt3k6XFT2kjdYP8Tnkwwyg/go-cid"
1416
)
1517

1618
// DagReader provides a way to easily read the data contained in a dag.
@@ -30,6 +32,9 @@ type pbDagReader struct {
3032
// NodeGetters for each of 'nodes' child links
3133
promises []mdag.NodeGetter
3234

35+
// the cid of each child of the current node
36+
links []*cid.Cid
37+
3338
// the index of the child link currently being read from
3439
linkPosition int
3540

@@ -47,30 +52,54 @@ var _ DagReader = (*pbDagReader)(nil)
4752

4853
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader {
4954
fctx, cancel := context.WithCancel(ctx)
50-
promises := mdag.GetDAG(fctx, serv, n)
55+
curLinks := getLinkCids(n)
5156
return &pbDagReader{
5257
node: n,
5358
serv: serv,
5459
buf: NewBufDagReader(pb.GetData()),
55-
promises: promises,
60+
promises: make([]mdag.NodeGetter, len(curLinks)),
61+
links: curLinks,
5662
ctx: fctx,
5763
cancel: cancel,
5864
pbdata: pb,
5965
}
6066
}
6167

68+
const preloadSize = 10
69+
70+
func (dr *pbDagReader) preloadNextNodes(ctx context.Context) {
71+
beg := dr.linkPosition
72+
end := beg + preloadSize
73+
if end >= len(dr.links) {
74+
end = len(dr.links)
75+
}
76+
77+
for i, p := range mdag.GetNodes(ctx, dr.serv, dr.links[beg:end]) {
78+
dr.promises[beg+i] = p
79+
}
80+
}
81+
6282
// precalcNextBuf follows the next link in line and loads it from the
6383
// DAGService, setting the next buffer to read from
6484
func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
65-
dr.buf.Close() // Just to make sure
85+
if dr.buf != nil {
86+
dr.buf.Close() // Just to make sure
87+
dr.buf = nil
88+
}
89+
6690
if dr.linkPosition >= len(dr.promises) {
6791
return io.EOF
6892
}
6993

94+
if dr.promises[dr.linkPosition] == nil {
95+
dr.preloadNextNodes(ctx)
96+
}
97+
7098
nxt, err := dr.promises[dr.linkPosition].Get(ctx)
7199
if err != nil {
72100
return err
73101
}
102+
dr.promises[dr.linkPosition] = nil
74103
dr.linkPosition++
75104

76105
switch nxt := nxt.(type) {
@@ -105,6 +134,15 @@ func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
105134
}
106135
}
107136

137+
func getLinkCids(n node.Node) []*cid.Cid {
138+
links := n.Links()
139+
out := make([]*cid.Cid, 0, len(links))
140+
for _, l := range links {
141+
out = append(out, l.Cid)
142+
}
143+
return out
144+
}
145+
108146
// Size return the total length of the data from the DAG structured file.
109147
func (dr *pbDagReader) Size() uint64 {
110148
return dr.pbdata.GetFilesize()
@@ -117,6 +155,12 @@ func (dr *pbDagReader) Read(b []byte) (int, error) {
117155

118156
// CtxReadFull reads data from the DAG structured file
119157
func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
158+
if dr.buf == nil {
159+
if err := dr.precalcNextBuf(ctx); err != nil {
160+
return 0, err
161+
}
162+
}
163+
120164
// If no cached buffer, load one
121165
total := 0
122166
for {
@@ -145,6 +189,12 @@ func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
145189
}
146190

147191
func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) {
192+
if dr.buf == nil {
193+
if err := dr.precalcNextBuf(dr.ctx); err != nil {
194+
return 0, err
195+
}
196+
}
197+
148198
// If no cached buffer, load one
149199
total := int64(0)
150200
for {
@@ -199,7 +249,9 @@ func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
199249
left := offset
200250
if int64(len(pb.Data)) >= offset {
201251
// Close current buf to close potential child dagreader
202-
dr.buf.Close()
252+
if dr.buf != nil {
253+
dr.buf.Close()
254+
}
203255
dr.buf = NewBufDagReader(pb.GetData()[offset:])
204256

205257
// start reading links from the beginning

0 commit comments

Comments
 (0)