
Commit 391b78a

Merge pull request #3273 from ipfs/fix/pin-fail
fix bug in pinsets and add a stress test for the scenario
2 parents 2fd045f + a49483b commit 391b78a
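
Note: the core of the fix is spelled out in a comment added to storeItems in the diff below. Buckets used to be keyed by the full 32-bit hash of each pin, but the resulting sub-sets were then written into a 256-entry links array at hash % 256, so two distinct buckets could collide on the same slot and overwrite each other, silently dropping pins. The following is a minimal, hypothetical sketch of that failure mode and of the fix, using plain strings in place of the real cid/merkledag types; it is an illustration, not go-ipfs code.

// hypothetical illustration of the pinset bucketing bug, not the go-ipfs API
package main

import "fmt"

const fanout = 256

func main() {
    // two distinct 32-bit hashes that collide modulo the fanout: 300 % 256 == 556 % 256 == 44
    hashA, hashB := uint32(300), uint32(556)

    // old behaviour: bucket by the full hash value...
    oldBuckets := map[uint32][]string{
        hashA: {"pin-A"},
        hashB: {"pin-B"},
    }
    links := make([]string, fanout)
    for h, items := range oldBuckets {
        // ...then place each sub-set at hash % fanout: both writes hit links[44],
        // so whichever sub-set is written last wins and the other pins are lost
        links[h%fanout] = fmt.Sprint(items)
    }
    fmt.Println("old:", links[44])

    // fixed behaviour: reduce the hash to the fanout keyspace *before* bucketing,
    // so colliding keys share one bucket instead of overwriting each other
    fixedBuckets := make([][]string, fanout)
    fixedBuckets[hashA%fanout] = append(fixedBuckets[hashA%fanout], "pin-A")
    fixedBuckets[hashB%fanout] = append(fixedBuckets[hashB%fanout], "pin-B")
    fmt.Println("fixed:", fixedBuckets[44])
}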

File tree: 2 files changed (+139, -116 lines)


pin/set.go (+79, -109)
@@ -2,15 +2,14 @@ package pin
 
 import (
     "bytes"
+    "context"
     "crypto/rand"
     "encoding/binary"
     "errors"
     "fmt"
     "hash/fnv"
     "sort"
-    "unsafe"
 
-    "context"
     "github.com/ipfs/go-ipfs/merkledag"
     "github.com/ipfs/go-ipfs/pin/internal/pb"
     "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
@@ -19,8 +18,11 @@ import (
 )
 
 const (
+    // defaultFanout specifies the default number of fan-out links per layer
     defaultFanout = 256
-    maxItems = 8192
+
+    // maxItems is the maximum number of items that will fit in a single bucket
+    maxItems = 8192
 )
 
 func randomSeed() (uint32, error) {
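
Note (a rough estimate, not stated in the commit): with defaultFanout = 256 and maxItems = 8192, a set node stores up to 8192 pins directly; beyond that, items are hashed into 256 buckets and each bucket becomes a child set, so a two-level tree covers on the order of 256 x 8192, roughly 2.1 million pins, before a third level is needed.
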
@@ -40,36 +42,12 @@ func hash(seed uint32, c *cid.Cid) uint32 {
     return h.Sum32()
 }
 
-type itemIterator func() (c *cid.Cid, data []byte, ok bool)
+type itemIterator func() (c *cid.Cid, ok bool)
 
 type keyObserver func(*cid.Cid)
 
-// refcount is the marshaled format of refcounts. It may change
-// between versions; this is valid for version 1. Changing it may
-// become desirable if there are many links with refcount > 255.
-//
-// There are two guarantees that need to be preserved, if this is
-// changed:
-//
-//   - the marshaled format is of fixed size, matching
-//     unsafe.Sizeof(refcount(0))
-//   - methods of refcount handle endianness, and may
-//     in later versions need encoding/binary.
-type refcount uint8
-
-func (r refcount) Bytes() []byte {
-    return []byte{byte(r)}
-}
-
-// readRefcount returns the idx'th refcount in []byte, which is
-// assumed to be a sequence of refcount.Bytes results.
-func (r *refcount) ReadFromIdx(buf []byte, idx int) {
-    *r = refcount(buf[idx])
-}
-
 type sortByHash struct {
     links []*merkledag.Link
-    data []byte
 }
 
 func (s sortByHash) Len() int {
@@ -82,27 +60,22 @@ func (s sortByHash) Less(a, b int) bool {
 
 func (s sortByHash) Swap(a, b int) {
     s.links[a], s.links[b] = s.links[b], s.links[a]
-    if len(s.data) != 0 {
-        const n = int(unsafe.Sizeof(refcount(0)))
-        tmp := make([]byte, n)
-        copy(tmp, s.data[a*n:a*n+n])
-        copy(s.data[a*n:a*n+n], s.data[b*n:b*n+n])
-        copy(s.data[b*n:b*n+n], tmp)
-    }
 }
 
 func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.Node, error) {
     seed, err := randomSeed()
     if err != nil {
         return nil, err
     }
-    n := &merkledag.Node{
-        Links: make([]*merkledag.Link, 0, defaultFanout+maxItems),
-    }
+
+    n := &merkledag.Node{Links: make([]*merkledag.Link, 0, defaultFanout+maxItems)}
     for i := 0; i < defaultFanout; i++ {
         n.Links = append(n.Links, &merkledag.Link{Hash: emptyKey.Hash()})
     }
+
+    // add emptyKey to our set of internal pinset objects
     internalKeys(emptyKey)
+
     hdr := &pb.Set{
         Version: proto.Uint32(1),
         Fanout:  proto.Uint32(defaultFanout),
@@ -111,122 +84,138 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
     if err := writeHdr(n, hdr); err != nil {
         return nil, err
     }
-    hdrLen := len(n.Data())
 
     if estimatedLen < maxItems {
         // it'll probably fit
         for i := 0; i < maxItems; i++ {
-            k, data, ok := iter()
+            k, ok := iter()
             if !ok {
                 // all done
                 break
             }
             n.Links = append(n.Links, &merkledag.Link{Hash: k.Hash()})
-            n.SetData(append(n.Data(), data...))
         }
         // sort by hash, also swap item Data
         s := sortByHash{
             links: n.Links[defaultFanout:],
-            data: n.Data()[hdrLen:],
         }
         sort.Stable(s)
     }
 
-    // wasteful but simple
-    type item struct {
-        c *cid.Cid
-        data []byte
-    }
-    hashed := make(map[uint32][]item)
+    hashed := make([][]*cid.Cid, defaultFanout)
     for {
-        k, data, ok := iter()
+        // This loop essentially enumerates every single item in the set
+        // and maps them all into a set of buckets. Each bucket will be recursively
+        // turned into its own sub-set, and so on down the chain. Each sub-set
+        // gets added to the dagservice, and put into its place in a set nodes
+        // links array.
+        //
+        // Previously, the bucket was selected by taking an int32 from the hash of
+        // the input key + seed. This was erroneous as we would later be assigning
+        // the created sub-sets into an array of length 256 by the modulus of the
+        // int32 hash value with 256. This resulted in overwriting existing sub-sets
+        // and losing pins. The fix (a few lines down from this comment), is to
+        // map the hash value down to the 8 bit keyspace here while creating the
+        // buckets. This way, we avoid any overlapping later on.
+        k, ok := iter()
         if !ok {
             break
         }
-        h := hash(seed, k)
-        hashed[h] = append(hashed[h], item{k, data})
+        h := hash(seed, k) % defaultFanout
+        hashed[h] = append(hashed[h], k)
     }
+
     for h, items := range hashed {
-        childIter := func() (c *cid.Cid, data []byte, ok bool) {
-            if len(items) == 0 {
-                return nil, nil, false
-            }
-            first := items[0]
-            items = items[1:]
-            return first.c, first.data, true
+        if len(items) == 0 {
+            // recursion base case
+            continue
         }
+
+        childIter := getCidListIterator(items)
+
+        // recursively create a pinset from the items for this bucket index
         child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys)
         if err != nil {
             return nil, err
         }
+
        size, err := child.Size()
         if err != nil {
             return nil, err
         }
+
         childKey, err := dag.Add(child)
         if err != nil {
             return nil, err
         }
+
         internalKeys(childKey)
-        l := &merkledag.Link{
-            Name: "",
+
+        // overwrite the 'empty key' in the existing links array
+        n.Links[h] = &merkledag.Link{
             Hash: childKey.Hash(),
             Size: size,
         }
-        n.Links[int(h%defaultFanout)] = l
     }
     return n, nil
 }
 
-func readHdr(n *merkledag.Node) (*pb.Set, []byte, error) {
+func readHdr(n *merkledag.Node) (*pb.Set, error) {
     hdrLenRaw, consumed := binary.Uvarint(n.Data())
     if consumed <= 0 {
-        return nil, nil, errors.New("invalid Set header length")
+        return nil, errors.New("invalid Set header length")
     }
-    buf := n.Data()[consumed:]
-    if hdrLenRaw > uint64(len(buf)) {
-        return nil, nil, errors.New("impossibly large Set header length")
+
+    pbdata := n.Data()[consumed:]
+    if hdrLenRaw > uint64(len(pbdata)) {
+        return nil, errors.New("impossibly large Set header length")
     }
     // as hdrLenRaw was <= an int, we now know it fits in an int
     hdrLen := int(hdrLenRaw)
     var hdr pb.Set
-    if err := proto.Unmarshal(buf[:hdrLen], &hdr); err != nil {
-        return nil, nil, err
+    if err := proto.Unmarshal(pbdata[:hdrLen], &hdr); err != nil {
+        return nil, err
     }
-    buf = buf[hdrLen:]
 
     if v := hdr.GetVersion(); v != 1 {
-        return nil, nil, fmt.Errorf("unsupported Set version: %d", v)
+        return nil, fmt.Errorf("unsupported Set version: %d", v)
     }
     if uint64(hdr.GetFanout()) > uint64(len(n.Links)) {
-        return nil, nil, errors.New("impossibly large Fanout")
+        return nil, errors.New("impossibly large Fanout")
     }
-    return &hdr, buf, nil
+    return &hdr, nil
 }
 
 func writeHdr(n *merkledag.Node, hdr *pb.Set) error {
     hdrData, err := proto.Marshal(hdr)
     if err != nil {
         return err
     }
-    n.SetData(make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData)))
-    written := binary.PutUvarint(n.Data(), uint64(len(hdrData)))
-    n.SetData(n.Data()[:written])
-    n.SetData(append(n.Data(), hdrData...))
+
+    // make enough space for the length prefix and the marshalled header data
+    data := make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData))
+
+    // write the uvarint length of the header data
+    uvarlen := binary.PutUvarint(data, uint64(len(hdrData)))
+
+    // append the actual protobuf data *after* the length value we wrote
+    data = append(data[:uvarlen], hdrData...)
+
+    n.SetData(data)
     return nil
 }
 
-type walkerFunc func(buf []byte, idx int, link *merkledag.Link) error
+type walkerFunc func(idx int, link *merkledag.Link) error
 
 func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.Node, fn walkerFunc, children keyObserver) error {
-    hdr, buf, err := readHdr(n)
+    hdr, err := readHdr(n)
     if err != nil {
         return err
     }
     // readHdr guarantees fanout is a safe value
     fanout := hdr.GetFanout()
     for i, l := range n.Links[fanout:] {
-        if err := fn(buf, i, l); err != nil {
+        if err := fn(i, l); err != nil {
             return err
         }
     }
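
Note: the rewritten writeHdr and readHdr agree on a simple framing for the node data, a uvarint length prefix followed by the marshalled pb.Set header. The following is a small self-contained sketch of that round trip, using only the standard library and a plain byte slice standing in for the protobuf payload (an illustration, not the go-ipfs code).

package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    hdrData := []byte("header-bytes") // stands in for the marshalled pb.Set

    // write: reserve room for the largest possible varint, then trim to what was used
    data := make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData))
    n := binary.PutUvarint(data, uint64(len(hdrData)))
    data = append(data[:n], hdrData...)

    // read: decode the length prefix, then slice out exactly that many bytes
    hdrLen, consumed := binary.Uvarint(data)
    fmt.Printf("%d header bytes after a %d-byte prefix: %q\n",
        hdrLen, consumed, data[consumed:consumed+int(hdrLen)])
}
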
@@ -262,7 +251,7 @@ func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node
     }
 
     var res []*cid.Cid
-    walk := func(buf []byte, idx int, link *merkledag.Link) error {
+    walk := func(idx int, link *merkledag.Link) error {
         res = append(res, cid.NewCidV0(link.Hash))
         return nil
     }
@@ -272,40 +261,21 @@ func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node
     return res, nil
 }
 
-func loadMultiset(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node, name string, internalKeys keyObserver) (map[key.Key]uint64, error) {
-    l, err := root.GetNodeLink(name)
-    if err != nil {
-        return nil, fmt.Errorf("Failed to get link %s: %v", name, err)
-    }
-    c := cid.NewCidV0(l.Hash)
-    internalKeys(c)
-    n, err := l.GetNode(ctx, dag)
-    if err != nil {
-        return nil, fmt.Errorf("Failed to get node from link %s: %v", name, err)
-    }
-
-    refcounts := make(map[key.Key]uint64)
-    walk := func(buf []byte, idx int, link *merkledag.Link) error {
-        var r refcount
-        r.ReadFromIdx(buf, idx)
-        refcounts[key.Key(link.Hash)] += uint64(r)
-        return nil
-    }
-    if err := walkItems(ctx, dag, n, walk, internalKeys); err != nil {
-        return nil, err
-    }
-    return refcounts, nil
-}
-
-func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.Node, error) {
-    iter := func() (c *cid.Cid, data []byte, ok bool) {
+func getCidListIterator(cids []*cid.Cid) itemIterator {
+    return func() (c *cid.Cid, ok bool) {
         if len(cids) == 0 {
-            return nil, nil, false
+            return nil, false
         }
+
         first := cids[0]
         cids = cids[1:]
-        return first, nil, true
+        return first, true
     }
+}
+
+func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.Node, error) {
+    iter := getCidListIterator(cids)
+
     n, err := storeItems(ctx, dag, uint64(len(cids)), iter, internalKeys)
     if err != nil {
         return nil, err
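
Note: the new getCidListIterator helper is a closure-based iterator. The returned function captures the cid slice and pops one element per call until it reports ok == false, which lets storeSet and the per-bucket recursion share a single iteration mechanism. Below is a minimal sketch of the same pattern with plain strings instead of *cid.Cid (an illustration, not go-ipfs code).

package main

import "fmt"

func makeIterator(items []string) func() (string, bool) {
    return func() (string, bool) {
        if len(items) == 0 {
            return "", false
        }
        next := items[0]
        items = items[1:]
        return next, true
    }
}

func main() {
    iter := makeIterator([]string{"a", "b", "c"})
    for v, ok := iter(); ok; v, ok = iter() {
        fmt.Println(v)
    }
}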
