Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

Commit c4e612e

Browse files
committed
Converting to datastore-based pins requires loading all dag-storage pins into memory, in addition to indirect pins. This is unnecessary for conversion, and much memory can be saved by avoiding loading all pins.
1 parent 4c92071 commit c4e612e

File tree

3 files changed

+90
-27
lines changed

3 files changed

+90
-27
lines changed

ipldpinner/pin.go

+35
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,41 @@ func New(dstore ds.Datastore, dserv, internal ipld.DAGService) (*pinner, error)
141141
}, nil
142142
}
143143

144+
// LoadKeys reads the pinned CIDs and sends them on the given channel. This is
145+
// used to read pins without loading them all into memory.
146+
func LoadKeys(ctx context.Context, dstore ds.Datastore, dserv, internal ipld.DAGService, recursive bool, keyChan chan<- cid.Cid) error {
147+
rootKey, err := dstore.Get(pinDatastoreKey)
148+
if err != nil {
149+
if err == ds.ErrNotFound {
150+
return nil
151+
}
152+
return err
153+
}
154+
rootCid, err := cid.Cast(rootKey)
155+
if err != nil {
156+
return err
157+
}
158+
159+
root, err := internal.Get(ctx, rootCid)
160+
if err != nil {
161+
return fmt.Errorf("cannot find pinning root object: %v", err)
162+
}
163+
164+
rootpb, ok := root.(*mdag.ProtoNode)
165+
if !ok {
166+
return mdag.ErrNotProtobuf
167+
}
168+
169+
var linkName string
170+
if recursive {
171+
linkName = linkRecursive
172+
} else {
173+
linkName = linkDirect
174+
}
175+
176+
return loadSetChan(ctx, internal, rootpb, linkName, keyChan)
177+
}
178+
144179
// Pin the given node, optionally recursive
145180
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
146181
err := p.dserv.Add(ctx, node)

ipldpinner/set.go

+32-3
Original file line numberDiff line numberDiff line change
@@ -219,13 +219,15 @@ func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode,
219219
// readHdr guarantees fanout is a safe value
220220
fanout := hdr.GetFanout()
221221
for i, l := range n.Links()[fanout:] {
222-
if err := fn(i, l); err != nil {
222+
if err = fn(i, l); err != nil {
223223
return err
224224
}
225225
}
226226
for _, l := range n.Links()[:fanout] {
227227
c := l.Cid
228-
children(c)
228+
if children != nil {
229+
children(c)
230+
}
229231
if c.Equals(emptyKey) {
230232
continue
231233
}
@@ -239,7 +241,7 @@ func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode,
239241
return merkledag.ErrNotProtobuf
240242
}
241243

242-
if err := walkItems(ctx, dag, stpb, fn, children); err != nil {
244+
if err = walkItems(ctx, dag, stpb, fn, children); err != nil {
243245
return err
244246
}
245247
}
@@ -277,6 +279,33 @@ func loadSet(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode
277279
return res, nil
278280
}
279281

282+
func loadSetChan(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode, name string, keyChan chan<- cid.Cid) error {
283+
l, err := root.GetNodeLink(name)
284+
if err != nil {
285+
return err
286+
}
287+
288+
n, err := l.GetNode(ctx, dag)
289+
if err != nil {
290+
return err
291+
}
292+
293+
pbn, ok := n.(*merkledag.ProtoNode)
294+
if !ok {
295+
return merkledag.ErrNotProtobuf
296+
}
297+
298+
walk := func(idx int, link *ipld.Link) error {
299+
keyChan <- link.Cid
300+
return nil
301+
}
302+
303+
if err = walkItems(ctx, dag, pbn, walk, nil); err != nil {
304+
return err
305+
}
306+
return nil
307+
}
308+
280309
func getCidListIterator(cids []cid.Cid) itemIterator {
281310
return func() (c cid.Cid, ok bool) {
282311
if len(cids) == 0 {

pinconv/pinconv.go

+23-24
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"context"
88
"fmt"
99

10-
"github.com/ipfs/go-cid"
10+
cid "github.com/ipfs/go-cid"
1111
ds "github.com/ipfs/go-datastore"
1212
ipfspinner "github.com/ipfs/go-ipfs-pinner"
1313
"github.com/ipfs/go-ipfs-pinner/dspinner"
@@ -24,39 +24,38 @@ import (
2424
func ConvertPinsFromIPLDToDS(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) {
2525
const ipldPinPath = "/local/pins"
2626

27-
ipldPinner, err := ipldpinner.New(dstore, dserv, internal)
28-
if err != nil {
29-
return nil, 0, err
30-
}
31-
3227
dsPinner, err := dspinner.New(ctx, dstore, dserv)
3328
if err != nil {
3429
return nil, 0, err
3530
}
3631

37-
seen := cid.NewSet()
38-
cids, err := ipldPinner.RecursiveKeys(ctx)
39-
if err != nil {
40-
return nil, 0, err
32+
var convCount int
33+
keyChan := make(chan cid.Cid)
34+
35+
go func() {
36+
err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, true, keyChan)
37+
close(keyChan)
38+
}()
39+
for key := range keyChan {
40+
dsPinner.PinWithMode(key, ipfspinner.Recursive)
41+
convCount++
4142
}
42-
for i := range cids {
43-
seen.Add(cids[i])
44-
dsPinner.PinWithMode(cids[i], ipfspinner.Recursive)
43+
if err != nil {
44+
return nil, 0, fmt.Errorf("cannot load recursive keys: %s", err)
4545
}
46-
convCount := len(cids)
4746

48-
cids, err = ipldPinner.DirectKeys(ctx)
49-
if err != nil {
50-
return nil, 0, err
47+
keyChan = make(chan cid.Cid)
48+
go func() {
49+
err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, false, keyChan)
50+
close(keyChan)
51+
}()
52+
for key := range keyChan {
53+
dsPinner.PinWithMode(key, ipfspinner.Direct)
54+
convCount++
5155
}
52-
for i := range cids {
53-
if seen.Has(cids[i]) {
54-
// Pin was already pinned recursively
55-
continue
56-
}
57-
dsPinner.PinWithMode(cids[i], ipfspinner.Direct)
56+
if err != nil {
57+
return nil, 0, fmt.Errorf("cannot load direct keys: %s", err)
5858
}
59-
convCount += len(cids)
6059

6160
err = dsPinner.Flush(ctx)
6261
if err != nil {

0 commit comments

Comments
 (0)