Skip to content

fix bug in pinsets and add a stress test for the scenario #3273

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 79 additions & 109 deletions pin/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package pin

import (
"bytes"
"context"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"hash/fnv"
"sort"
"unsafe"

"context"
"github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin/internal/pb"
"gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
Expand All @@ -19,8 +18,11 @@ import (
)

const (
	// defaultFanout specifies the default number of fan-out links per layer.
	// storeItems reserves this many leading link slots in every set node and
	// selects a child bucket with the item hash modulo this value.
	defaultFanout = 256

	// maxItems is the maximum number of items that will fit in a single
	// bucket, i.e. that storeItems will append to one node as direct item
	// links.
	maxItems = 8192
)

func randomSeed() (uint32, error) {
Expand All @@ -40,36 +42,12 @@ func hash(seed uint32, c *cid.Cid) uint32 {
return h.Sum32()
}

type itemIterator func() (c *cid.Cid, data []byte, ok bool)
type itemIterator func() (c *cid.Cid, ok bool)

type keyObserver func(*cid.Cid)

// refcount is the serialized representation of a single reference count.
// The layout is valid for version 1 and may change between versions, for
// example if links with a refcount above 255 become common.
//
// Any replacement must keep two guarantees:
//
//   - the marshaled form has a fixed size equal to
//     unsafe.Sizeof(refcount(0))
//   - the methods of refcount take care of endianness (later versions may
//     need encoding/binary for that)
type refcount uint8

// Bytes returns the marshaled form of r, a single byte holding its value.
func (r refcount) Bytes() []byte {
	out := make([]byte, 1)
	out[0] = byte(r)
	return out
}

// ReadFromIdx stores into r the idx'th refcount found in buf, where buf is
// assumed to be a concatenation of refcount.Bytes results. It panics if idx
// is out of range for buf.
func (r *refcount) ReadFromIdx(buf []byte, idx int) {
	raw := buf[idx]
	*r = refcount(raw)
}

// sortByHash carries the state reordered by its Len/Less/Swap methods; it is
// handed to sort.Stable by storeItems.
type sortByHash struct {
// links is the slice of links being reordered.
links []*merkledag.Link
// data, when non-empty, holds one fixed-size marshaled refcount per link,
// at the same index as the link; Swap keeps it in step with links.
data []byte
}

func (s sortByHash) Len() int {
Expand All @@ -82,27 +60,22 @@ func (s sortByHash) Less(a, b int) bool {

// Swap exchanges the links at positions a and b. When per-link data is
// present, the refcount-sized records at the same two positions are
// exchanged as well, so that every record stays attached to its link.
func (s sortByHash) Swap(a, b int) {
	s.links[a], s.links[b] = s.links[b], s.links[a]
	if len(s.data) == 0 {
		// no per-link data to keep in step
		return
	}

	// every record occupies exactly one marshaled refcount
	const width = int(unsafe.Sizeof(refcount(0)))
	left := s.data[a*width : a*width+width]
	right := s.data[b*width : b*width+width]

	scratch := make([]byte, width)
	copy(scratch, left)
	copy(left, right)
	copy(right, scratch)
}

func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.Node, error) {
seed, err := randomSeed()
if err != nil {
return nil, err
}
n := &merkledag.Node{
Links: make([]*merkledag.Link, 0, defaultFanout+maxItems),
}

n := &merkledag.Node{Links: make([]*merkledag.Link, 0, defaultFanout+maxItems)}
for i := 0; i < defaultFanout; i++ {
n.Links = append(n.Links, &merkledag.Link{Hash: emptyKey.Hash()})
}

// add emptyKey to our set of internal pinset objects
internalKeys(emptyKey)

hdr := &pb.Set{
Version: proto.Uint32(1),
Fanout: proto.Uint32(defaultFanout),
Expand All @@ -111,122 +84,138 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
if err := writeHdr(n, hdr); err != nil {
return nil, err
}
hdrLen := len(n.Data())

if estimatedLen < maxItems {
// it'll probably fit
for i := 0; i < maxItems; i++ {
k, data, ok := iter()
k, ok := iter()
if !ok {
// all done
break
}
n.Links = append(n.Links, &merkledag.Link{Hash: k.Hash()})
n.SetData(append(n.Data(), data...))
}
// sort by hash, also swap item Data
s := sortByHash{
links: n.Links[defaultFanout:],
data: n.Data()[hdrLen:],
}
sort.Stable(s)
}

// wasteful but simple
type item struct {
c *cid.Cid
data []byte
}
hashed := make(map[uint32][]item)
hashed := make([][]*cid.Cid, defaultFanout)
for {
k, data, ok := iter()
// This loop enumerates every item in the set and maps each one into a
// bucket. Each bucket is recursively turned into its own sub-set, and so
// on down the chain. Each sub-set gets added to the dagservice and put
// into its place in the set node's links array.
//
// Previously, the bucket was selected by the full uint32 hash of the
// input key + seed. This was erroneous, as we would later assign the
// created sub-sets into an array of length 256 by taking the uint32 hash
// value modulo 256. This resulted in overwriting existing sub-sets and
// losing pins. The fix (a few lines down from this comment) is to map the
// hash value down to the 8-bit keyspace here, while creating the buckets.
// This way, we avoid any overlap later on.
k, ok := iter()
if !ok {
break
}
h := hash(seed, k)
hashed[h] = append(hashed[h], item{k, data})
h := hash(seed, k) % defaultFanout
hashed[h] = append(hashed[h], k)
}

for h, items := range hashed {
childIter := func() (c *cid.Cid, data []byte, ok bool) {
if len(items) == 0 {
return nil, nil, false
}
first := items[0]
items = items[1:]
return first.c, first.data, true
if len(items) == 0 {
// recursion base case
continue
}

childIter := getCidListIterator(items)

// recursively create a pinset from the items for this bucket index
child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys)
if err != nil {
return nil, err
}

size, err := child.Size()
if err != nil {
return nil, err
}

childKey, err := dag.Add(child)
if err != nil {
return nil, err
}

internalKeys(childKey)
l := &merkledag.Link{
Name: "",

// overwrite the 'empty key' in the existing links array
n.Links[h] = &merkledag.Link{
Hash: childKey.Hash(),
Size: size,
}
n.Links[int(h%defaultFanout)] = l
}
return n, nil
}

func readHdr(n *merkledag.Node) (*pb.Set, []byte, error) {
func readHdr(n *merkledag.Node) (*pb.Set, error) {
hdrLenRaw, consumed := binary.Uvarint(n.Data())
if consumed <= 0 {
return nil, nil, errors.New("invalid Set header length")
return nil, errors.New("invalid Set header length")
}
buf := n.Data()[consumed:]
if hdrLenRaw > uint64(len(buf)) {
return nil, nil, errors.New("impossibly large Set header length")

pbdata := n.Data()[consumed:]
if hdrLenRaw > uint64(len(pbdata)) {
return nil, errors.New("impossibly large Set header length")
}
// as hdrLenRaw was <= an int, we now know it fits in an int
hdrLen := int(hdrLenRaw)
var hdr pb.Set
if err := proto.Unmarshal(buf[:hdrLen], &hdr); err != nil {
return nil, nil, err
if err := proto.Unmarshal(pbdata[:hdrLen], &hdr); err != nil {
return nil, err
}
buf = buf[hdrLen:]

if v := hdr.GetVersion(); v != 1 {
return nil, nil, fmt.Errorf("unsupported Set version: %d", v)
return nil, fmt.Errorf("unsupported Set version: %d", v)
}
if uint64(hdr.GetFanout()) > uint64(len(n.Links)) {
return nil, nil, errors.New("impossibly large Fanout")
return nil, errors.New("impossibly large Fanout")
}
return &hdr, buf, nil
return &hdr, nil
}

// writeHdr replaces n's data with the serialized form of hdr: a uvarint
// holding the length of the protobuf encoding, followed by the protobuf
// encoding itself. This is the layout readHdr expects. The only error
// returned is a failure to marshal hdr, in which case n is left untouched.
func writeHdr(n *merkledag.Node, hdr *pb.Set) error {
	hdrData, err := proto.Marshal(hdr)
	if err != nil {
		return err
	}

	// make enough space for the length prefix and the marshalled header data
	data := make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData))

	// write the uvarint length of the header data
	uvarlen := binary.PutUvarint(data, uint64(len(hdrData)))

	// append the actual protobuf data *after* the length value we wrote
	data = append(data[:uvarlen], hdrData...)

	n.SetData(data)
	return nil
}

type walkerFunc func(buf []byte, idx int, link *merkledag.Link) error
type walkerFunc func(idx int, link *merkledag.Link) error

func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.Node, fn walkerFunc, children keyObserver) error {
hdr, buf, err := readHdr(n)
hdr, err := readHdr(n)
if err != nil {
return err
}
// readHdr guarantees fanout is a safe value
fanout := hdr.GetFanout()
for i, l := range n.Links[fanout:] {
if err := fn(buf, i, l); err != nil {
if err := fn(i, l); err != nil {
return err
}
}
Expand Down Expand Up @@ -262,7 +251,7 @@ func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node
}

var res []*cid.Cid
walk := func(buf []byte, idx int, link *merkledag.Link) error {
walk := func(idx int, link *merkledag.Link) error {
res = append(res, cid.NewCidV0(link.Hash))
return nil
}
Expand All @@ -272,40 +261,21 @@ func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node
return res, nil
}

// loadMultiset reads the refcounted set that hangs off root under the link
// called name and returns the accumulated count for every key in it.
//
// The cid of the linked set node is reported to internalKeys, which is also
// handed on to walkItems. An error is returned when the link cannot be
// found on root, when the linked node cannot be fetched from dag, or when
// walking the set fails.
func loadMultiset(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node, name string, internalKeys keyObserver) (map[key.Key]uint64, error) {
	setLink, err := root.GetNodeLink(name)
	if err != nil {
		return nil, fmt.Errorf("Failed to get link %s: %v", name, err)
	}
	internalKeys(cid.NewCidV0(setLink.Hash))

	setNode, err := setLink.GetNode(ctx, dag)
	if err != nil {
		return nil, fmt.Errorf("Failed to get node from link %s: %v", name, err)
	}

	counts := make(map[key.Key]uint64)
	// a key may be visited more than once, so its refcounts are summed
	accumulate := func(buf []byte, idx int, link *merkledag.Link) error {
		var rc refcount
		rc.ReadFromIdx(buf, idx)
		counts[key.Key(link.Hash)] += uint64(rc)
		return nil
	}

	err = walkItems(ctx, dag, setNode, accumulate, internalKeys)
	if err != nil {
		return nil, err
	}
	return counts, nil
}

func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.Node, error) {
iter := func() (c *cid.Cid, data []byte, ok bool) {
func getCidListIterator(cids []*cid.Cid) itemIterator {
return func() (c *cid.Cid, ok bool) {
if len(cids) == 0 {
return nil, nil, false
return nil, false
}

first := cids[0]
cids = cids[1:]
return first, nil, true
return first, true
}
}

func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.Node, error) {
iter := getCidListIterator(cids)

n, err := storeItems(ctx, dag, uint64(len(cids)), iter, internalKeys)
if err != nil {
return nil, err
Expand Down
Loading