Skip to content

Pin ls traverses all indirect pins #6705

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 24 additions & 68 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
coreapi "github.com/ipfs/go-ipfs/core/coreapi"
pin "github.com/ipfs/go-ipfs/pin"

bserv "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
cidenc "github.com/ipfs/go-cidutil/cidenc"
cmds "github.com/ipfs/go-ipfs-cmds"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
verifcid "github.com/ipfs/go-verifcid"
coreiface "github.com/ipfs/interface-go-ipfs-core"
Expand Down Expand Up @@ -352,7 +354,7 @@ Example:
if len(req.Arguments) > 0 {
err = pinLsKeys(req, typeStr, n, api, emit)
} else {
err = pinLsAll(req, typeStr, n, emit)
err = pinLsAll(req, typeStr, n.Pinning, n.DAG, emit)
}
if err != nil {
return err
Expand Down Expand Up @@ -475,84 +477,38 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac
return nil
}

func pinLsAll(req *cmds.Request, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {
func pinLsAll(req *cmds.Request, typeStr string, pinning pin.Pinner, dag ipld.DAGService, emit func(value interface{}) error) error {
pinCh, errCh := coreapi.PinLsAll(req.Context, typeStr, pinning, dag)

enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
}

keys := cid.NewSet()

AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
if keys.Visit(c) {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: typeStr,
Cid: enc.Encode(c),
},
})
if err != nil {
return err
}
ctx := req.Context
loop:
for {
select {
case p, ok := <-pinCh:
if !ok {
break loop
}
}
return nil
}

if typeStr == "direct" || typeStr == "all" {
dkeys, err := n.Pinning.DirectKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(dkeys, "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(rkeys, "recursive")
if err != nil {
return err
}
}
if typeStr == "indirect" || typeStr == "all" {
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
for _, k := range rkeys {
var visitErr error
err := dag.Walk(req.Context, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
if r {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: "indirect",
Cid: enc.Encode(c),
},
})
if err != nil {
visitErr = err
}
}
return r
}, dag.SkipRoot(), dag.Concurrent())

if visitErr != nil {
return visitErr
}
if err != nil {
if err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
Cid: enc.Encode(p.Path().Cid()),
},
}); err != nil {
return err
}

case <-ctx.Done():
return ctx.Err()
}
}

return nil
err = <-errCh
return err
}

const (
Expand Down
150 changes: 108 additions & 42 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package coreapi
import (
"context"
"fmt"

bserv "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
offline "github.com/ipfs/go-ipfs-exchange-offline"
merkledag "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-ipfs/pin"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
path "github.com/ipfs/interface-go-ipfs-core/path"
"github.com/ipfs/interface-go-ipfs-core/path"
)

type PinAPI CoreAPI
Expand Down Expand Up @@ -194,57 +195,122 @@ func (p *pinInfo) Type() string {
}

func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pin, error) {
pinCh, errCh := PinLsAll(ctx, typeStr, api.pinning, api.dag)

var pins []coreiface.Pin
loop:
for {
select {
case p, ok := <-pinCh:
if !ok {
break loop
}
pins = append(pins, p)
case <-ctx.Done():
return nil, ctx.Err()
}
}
err := <-errCh
if err != nil {
return nil, err
}

return pins, nil
}

keys := make(map[cid.Cid]*pinInfo)
// PinLsAll is an internal function for returning a list of pins
func PinLsAll(ctx context.Context, typeStr string, pin pin.Pinner, dag ipld.DAGService) (chan coreiface.Pin, chan error) {
ch := make(chan coreiface.Pin, 32)
errCh := make(chan error, 1)

AddToResultKeys := func(keyList []cid.Cid, typeStr string) {
keys := cid.NewSet()
AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
keys[c] = &pinInfo{
pinType: typeStr,
path: path.IpldPath(c),
if keys.Visit(c) {
select {
case ch <- &pinInfo{
pinType: typeStr,
path: path.IpldPath(c),
}:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
}

if typeStr == "direct" || typeStr == "all" {
dkeys, err := api.pinning.DirectKeys(ctx)
if err != nil {
return nil, err
}
AddToResultKeys(dkeys, "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
for _, k := range rkeys {
err := merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
go func() {
defer close(ch)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the way this function is currently called, this can hide errors: we can observe this channel closing before we read the error off the error channel.

It's generally safer to just use a "result" channel. That makes it easier to call this function as the for x := range myCh syntax can be used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we can close both the error and normal channels and, instead of listening on the error channel, run err := <-errCh after finishing the normal loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I did what you asked here, lmk.

defer close(errCh)
if typeStr == "direct" || typeStr == "all" {
dkeys, err := pin.DirectKeys(ctx)
if err != nil {
return nil, err
errCh <- err
return
}
if err := AddToResultKeys(dkeys, "direct"); err != nil {
errCh <- err
return
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
if typeStr == "recursive" || typeStr == "all" {
rkeys, err := pin.RecursiveKeys(ctx)
if err != nil {
errCh <- err
return
}
if err := AddToResultKeys(rkeys, "recursive"); err != nil {
errCh <- err
return
}
}
AddToResultKeys(rkeys, "recursive")
}
if typeStr == "indirect" || typeStr == "all" {
rkeys, err := pin.RecursiveKeys(ctx)
if err != nil {
errCh <- err
return
}

out := make([]coreiface.Pin, 0, len(keys))
for _, v := range keys {
out = append(out, v)
}
// If we're only listing indirect pins, we need to
// explicitly mark direct/recursive pins so we don't
// send them.
if typeStr == "indirect" {
dkeys, err := pin.DirectKeys(ctx)
if err != nil {
errCh <- err
return
}

for _, k := range dkeys {
keys.Add(k)
}
for _, k := range rkeys {
keys.Add(k)
}
}

return out, nil
indirectKeys := cid.NewSet()
for _, k := range rkeys {
err := merkledag.Walk(ctx, merkledag.GetLinksWithDAG(dag), k, func(c cid.Cid) bool {
r := indirectKeys.Visit(c)
if r {
if err := AddToResultKeys([]cid.Cid{c}, "indirect"); err != nil {
return false
}
}
return r
}, merkledag.SkipRoot(), merkledag.Concurrent())

if err != nil {
errCh <- err
return
}
}
}
}()

return ch, errCh
}

func (api *PinAPI) core() coreiface.CoreAPI {
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type UnixfsAPI CoreAPI
var nilNode *core.IpfsNode
var once sync.Once

func getOrCreateNilNode() (*core.IpfsNode,error) {
func getOrCreateNilNode() (*core.IpfsNode, error) {
once.Do(func() {
if nilNode != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/ipfs/go-path v0.0.7
github.com/ipfs/go-unixfs v0.2.1
github.com/ipfs/go-verifcid v0.0.1
github.com/ipfs/interface-go-ipfs-core v0.2.3
github.com/ipfs/interface-go-ipfs-core v0.2.5
github.com/jbenet/go-is-domain v1.0.3
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ github.com/ipfs/go-unixfs v0.2.1 h1:g51t9ODICFZ3F51FPivm8dE7NzYcdAQNUL9wGP5AYa0=
github.com/ipfs/go-unixfs v0.2.1/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipfs/interface-go-ipfs-core v0.2.3 h1:E6uQ+1fJjkxJWlL9lAE72a5FWeyeeNL3GitLy8+jq3Y=
github.com/ipfs/interface-go-ipfs-core v0.2.3/go.mod h1:Tihp8zxGpUeE3Tokr94L6zWZZdkRQvG5TL6i9MuNE+s=
github.com/ipfs/interface-go-ipfs-core v0.2.5 h1:/rspOe8RbIxwtssEXHB+X9JXhOBDCQt8x50d2kFPXL8=
github.com/ipfs/interface-go-ipfs-core v0.2.5/go.mod h1:Tihp8zxGpUeE3Tokr94L6zWZZdkRQvG5TL6i9MuNE+s=
github.com/jackpal/gateway v1.0.4/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
Expand Down