This repository was archived by the owner on Jun 27, 2023. It is now read-only.

feat: hamt enumlinks custom #111

Merged
merged 10 commits on Nov 12, 2021
6 changes: 6 additions & 0 deletions go.mod
@@ -6,8 +6,13 @@ require (
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
github.com/ipfs/go-bitfield v1.0.0
github.com/ipfs/go-bitswap v0.1.2 // indirect
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.0
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.0.5
github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.3
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-util v0.0.1
@@ -21,6 +26,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.7.0
github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
)

go 1.16
1 change: 1 addition & 0 deletions go.sum
@@ -335,6 +335,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
193 changes: 160 additions & 33 deletions hamt/hamt.go
@@ -24,6 +24,9 @@ import (
"context"
"fmt"
"os"
"sync"

"golang.org/x/sync/errgroup"

format "github.com/ipfs/go-unixfs"
"github.com/ipfs/go-unixfs/internal"
@@ -372,59 +375,183 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
go func() {
defer close(linkResults)
defer cancel()
getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults)
cset := cid.NewSet()
rootNode, err := ds.Node()
if err != nil {
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
return
}
err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent())

err := parallelWalkDepth(ctx, ds, ds.dserv, func(formattedLink *ipld.Link) error {
emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil})
return nil
})
if err != nil {
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
}
}()
return linkResults
}

// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync
// to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called
// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation
func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks {

return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) {
node, err := dagService.Get(ctx, currentCid)
if err != nil {
return nil, err
}
directoryShard, err := NewHamtFromDag(dagService, node)
if err != nil {
return nil, err
}
type listCidShardUnion struct {
links []cid.Cid
shards []*Shard
}

childShards := make([]*ipld.Link, 0, directoryShard.childer.length())
links := directoryShard.childer.links
for idx := range directoryShard.childer.children {
lnk := links[idx]
lnkLinkType, err := directoryShard.childLinkType(lnk)
func (ds *Shard) walkLinks(processLinkValues func(formattedLink *ipld.Link) error) (*listCidShardUnion, error) {
res := &listCidShardUnion{}

for idx, lnk := range ds.childer.links {
if nextShard := ds.childer.children[idx]; nextShard == nil {
lnkLinkType, err := ds.childLinkType(lnk)
if err != nil {
return nil, err
}
if lnkLinkType == shardLink {
childShards = append(childShards, lnk)
} else {
sv, err := directoryShard.makeShardValue(lnk)

switch lnkLinkType {
case shardValueLink:
sv, err := ds.makeShardValue(lnk)
if err != nil {
return nil, err
}
formattedLink := sv.val
formattedLink.Name = sv.key
emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil})

if err := processLinkValues(formattedLink); err != nil {
return nil, err
}
case shardLink:
res.links = append(res.links, lnk.Cid)
default:
return nil, fmt.Errorf("unsupported shard link type")
}

} else {
if nextShard.val != nil {
formattedLink := &ipld.Link{
Name: nextShard.key,
Size: nextShard.val.Size,
Cid: nextShard.val.Cid,
}
if err := processLinkValues(formattedLink); err != nil {
return nil, err
}
} else {
res.shards = append(res.shards, nextShard)
}
}
}
return res, nil
}

func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, processShardValues func(formattedLink *ipld.Link) error) error {
const concurrency = 32

var visitlk sync.Mutex
visitSet := cid.NewSet()
visit := visitSet.Visit

// Setup synchronization
grp, errGrpCtx := errgroup.WithContext(ctx)

// Input and output queues for workers.
feed := make(chan *listCidShardUnion)
out := make(chan *listCidShardUnion)
done := make(chan struct{})

for i := 0; i < concurrency; i++ {
grp.Go(func() error {
for shardOrCID := range feed {
Contributor:
nit: The walk algorithm here is more expansive than the original, and its differences should be documented (this is clearly a copy of the original, and anyone reading this code will be thinking of it when trying to reason through this one). (Not referring to the GetMany optimization, which is valid in itself and could even be incorporated into the Shard logic.)

In the original we process one parent node at a time (represented by its CID), extract its children, filter which should be emitted as output (value links/shards), and push the rest onto the queue/feed one at a time to be processed independently in the next iteration, each as a new parent node.

Here we send (after filtering) all the children together in bulk (as the lists in listCidShardUnion) and then extract all of their children together in turn. (It might be a valid optimization and this comment is not against it, just advocating for more documentation around it.) I'm not sure whether this affects the traversal behavior expected by TestHAMTEnumerationWhenComputingSize; I don't think so, but I need more time to think about it.

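As an editorial aid for the comparison above (not part of this PR), here is a minimal, self-contained Go sketch contrasting the two traversal shapes: the original queues one parent node at a time, while this PR forwards all of a node's non-value children together as one batch. The toy node type and helper names below are made up for illustration.

package main

import "fmt"

// Toy stand-in for a HAMT node: "value" nodes play the role of value links,
// the rest play the role of child shards.
type node struct {
	name     string
	value    bool
	children []*node
}

// Original shape: each internal node is queued and processed on its own.
func walkPerNode(root *node, emit func(string)) {
	queue := []*node{root}
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		for _, c := range n.children {
			if c.value {
				emit(c.name) // value links are emitted immediately
			} else {
				queue = append(queue, c) // shards re-enter the queue one by one
			}
		}
	}
}

// This PR's shape: all non-value children of a node travel together as one
// batch (roughly what listCidShardUnion groups as links and shards).
func walkBulk(root *node, emit func(string)) {
	queue := [][]*node{{root}}
	for len(queue) > 0 {
		batch := queue[0]
		queue = queue[1:]
		var next []*node
		for _, n := range batch {
			for _, c := range n.children {
				if c.value {
					emit(c.name)
				} else {
					next = append(next, c)
				}
			}
		}
		if len(next) > 0 {
			queue = append(queue, next)
		}
	}
}

func main() {
	leaf := func(name string) *node { return &node{name: name, value: true} }
	root := &node{children: []*node{
		{children: []*node{leaf("a"), leaf("b")}},
		{children: []*node{leaf("c")}},
	}}
	walkPerNode(root, func(name string) { fmt.Println("per-node:", name) })
	walkBulk(root, func(name string) { fmt.Println("bulk:", name) })
}
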
Contributor:
(edit: it was affecting tests, see comment below)

for _, nextShard := range shardOrCID.shards {
nextLinks, err := nextShard.walkLinks(processShardValues)
if err != nil {
return err
}

select {
case out <- nextLinks:
case <-errGrpCtx.Done():
return nil
}
}

var linksToVisit []cid.Cid
for _, nextLink := range shardOrCID.links {
var shouldVisit bool

visitlk.Lock()
shouldVisit = visit(nextLink)
visitlk.Unlock()

if shouldVisit {
linksToVisit = append(linksToVisit, nextLink)
}
}
Contributor:
nit: I think we could drop this optimization (to simplify the code) as I wouldn't expect to have repeated internal (non-value) shard nodes.


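For context on the nit above, a small sketch of what the visit-set guard buys (an editor's illustration with a made-up block, not code from this PR): cid.Set.Visit reports whether a CID is new, so a child referenced by two parents would only be fetched once.

package main

import (
	"fmt"

	"github.com/ipfs/go-cid"
	"github.com/multiformats/go-multihash"
)

func main() {
	// Build one CID and pretend two different parents reference it.
	mh, err := multihash.Sum([]byte("shared child block"), multihash.SHA2_256, -1)
	if err != nil {
		panic(err)
	}
	child := cid.NewCidV0(mh)
	refs := []cid.Cid{child, child}

	seen := cid.NewSet()
	for _, c := range refs {
		if seen.Visit(c) { // Visit returns true only the first time a CID is added
			fmt.Println("would fetch", c)
		} else {
			fmt.Println("skip duplicate", c)
		}
	}
}
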
chNodes := dserv.GetMany(errGrpCtx, linksToVisit)
for optNode := range chNodes {
if optNode.Err != nil {
return optNode.Err
}

nextShard, err := NewHamtFromDag(dserv, optNode.Node)
if err != nil {
return err
}

nextLinks, err := nextShard.walkLinks(processShardValues)
Contributor:
nit: The general nomenclature around listCidShardUnion is a bit confusing. My general take is that we are processing all the children from the same node at once, both in-memory Shards and stored links, but there are some conflicting names:

  • The walkLinks function actually walks all children, both links and shards. (Similarly for the nextLinks in this line.)
  • The 'union' suffix in the structure's name makes me think we have an 'either of' situation.
  • Similarly for the shardOrCID name somewhere above in this function. Internally the HAMT stores each of its children in either shard or link format. In that sense the 'union/or' terms are correct, but when processing all the children of a single node I think we should decouple ourselves from that mutually exclusive definition and focus on a single group of children (which, yes, will be expressed in either of those two formats, but that doesn't seem key to the walk algorithm).

if err != nil {
return err
}

select {
case out <- nextLinks:
case <-errGrpCtx.Done():
return nil
}
}

select {
case done <- struct{}{}:
case <-errGrpCtx.Done():
}
}
return nil
})
}

send := feed
var todoQueue []*listCidShardUnion
var inProgress int

next := &listCidShardUnion{
shards: []*Shard{root},
}

dispatcherLoop:
for {
select {
case send <- next:
inProgress++
if len(todoQueue) > 0 {
next = todoQueue[0]
todoQueue = todoQueue[1:]
} else {
next = nil
send = nil
}
case <-done:
inProgress--
if inProgress == 0 && next == nil {
break dispatcherLoop
}
case nextNodes := <-out:
if next == nil {
next = nextNodes
send = feed
} else {
todoQueue = append(todoQueue, nextNodes)
}
case <-errGrpCtx.Done():
break dispatcherLoop
}
return childShards, nil
}
close(feed)
return grp.Wait()
}

func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) {
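
To make the feed/out/done shape of parallelWalkDepth above easier to follow, here is a minimal, self-contained sketch of the same errgroup dispatcher/worker pattern, reduced to integer work items. It is an editor's illustration under that simplification, not code from this PR; expand and the item type are invented stand-ins for walkLinks and listCidShardUnion.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// Each work item expands into zero or more child items, mirroring how a
// shard expands into child shards/links in parallelWalkDepth.
func expand(n int) []int {
	if n >= 3 {
		return nil
	}
	return []int{2*n + 1, 2*n + 2}
}

func parallelWalk(ctx context.Context, root, concurrency int) error {
	grp, errGrpCtx := errgroup.WithContext(ctx)

	feed := make(chan []int) // dispatcher -> workers
	out := make(chan []int)  // workers -> dispatcher (newly discovered items)
	done := make(chan struct{})

	for i := 0; i < concurrency; i++ {
		grp.Go(func() error {
			for batch := range feed {
				var next []int
				for _, n := range batch {
					fmt.Println("visit", n) // stand-in for emitting a link
					next = append(next, expand(n)...)
				}
				if len(next) > 0 {
					select {
					case out <- next:
					case <-errGrpCtx.Done():
						return nil
					}
				}
				select {
				case done <- struct{}{}:
				case <-errGrpCtx.Done():
				}
			}
			return nil
		})
	}

	send := feed
	var todo [][]int
	var inProgress int
	next := []int{root}

dispatcherLoop:
	for {
		select {
		case send <- next:
			inProgress++
			if len(todo) > 0 {
				next, todo = todo[0], todo[1:]
			} else {
				next, send = nil, nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				break dispatcherLoop
			}
		case discovered := <-out:
			if next == nil {
				next, send = discovered, feed
			} else {
				todo = append(todo, discovered)
			}
		case <-errGrpCtx.Done():
			break dispatcherLoop
		}
	}
	close(feed)
	return grp.Wait()
}

func main() {
	if err := parallelWalk(context.Background(), 0, 4); err != nil {
		fmt.Println("error:", err)
	}
}
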
66 changes: 48 additions & 18 deletions io/directory_test.go
@@ -3,6 +3,12 @@ package io
import (
"context"
"fmt"
blocks "github.com/ipfs/go-block-format"
bsrv "github.com/ipfs/go-blockservice"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"math"
"sort"
"strconv"
@@ -358,8 +364,11 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
// with a regular structure to be able to predict how many Shard nodes we
// will need to fetch in order to reach the HAMTShardingSize threshold in
// sizeBelowThreshold (assuming a sequential DAG walk function).
ds := mdtest.Mock()
completeHAMTRoot, err := CreateCompleteHAMT(ds, treeHeight, shardWidth)

bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
countGetsDS := newCountGetsDS(bstore)
dsrv := mdag.NewDAGService(bsrv.New(countGetsDS, offline.Exchange(countGetsDS)))
completeHAMTRoot, err := CreateCompleteHAMT(dsrv, treeHeight, shardWidth)
assert.NoError(t, err)

// With this structure and a BFS traversal (from `parallelWalkDepth`) then
@@ -374,8 +383,7 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
// the HAMTShardingSize threshold.
nodesToFetch += thresholdToWidthRatio

countGetsDS := newCountGetsDS(ds)
hamtDir, err := newHAMTDirectoryFromNode(countGetsDS, completeHAMTRoot)
hamtDir, err := newHAMTDirectoryFromNode(dsrv, completeHAMTRoot)
assert.NoError(t, err)

countGetsDS.resetCounter()
@@ -537,21 +545,23 @@ func newEmptyHAMTDirectory(dserv ipld.DAGService, shardWidth int) (*HAMTDirector
// countGetsDS is a DAG service that keeps track of the number of
// unique CIDs fetched.
type countGetsDS struct {
ipld.DAGService
blockstore.Blockstore

cidsFetched map[cid.Cid]struct{}
mapLock sync.Mutex
started bool

getRequestDelay time.Duration
}

var _ ipld.DAGService = (*countGetsDS)(nil)
var _ blockstore.Blockstore = (*countGetsDS)(nil)

func newCountGetsDS(ds ipld.DAGService) *countGetsDS {
func newCountGetsDS(bs blockstore.Blockstore) *countGetsDS {
return &countGetsDS{
ds,
bs,
make(map[cid.Cid]struct{}),
sync.Mutex{},
false,
0,
}
}
@@ -560,6 +570,7 @@ func (d *countGetsDS) resetCounter() {
d.mapLock.Lock()
defer d.mapLock.Unlock()
d.cidsFetched = make(map[cid.Cid]struct{})
d.started = true
}

func (d *countGetsDS) uniqueCidsFetched() int {
@@ -572,12 +583,7 @@ func (d *countGetsDS) setRequestDelay(timeout time.Duration) {
d.getRequestDelay = timeout
}

func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
node, err := d.DAGService.Get(ctx, c)
if err != nil {
return nil, err
}

func (d *countGetsDS) maybeSleep(c cid.Cid) {
d.mapLock.Lock()
_, cidRequestedBefore := d.cidsFetched[c]
d.cidsFetched[c] = struct{}{}
@@ -588,11 +594,35 @@ func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
// Subsequent requests get no timeout simulating an in-disk cache.
time.Sleep(d.getRequestDelay)
}
}

func (d *countGetsDS) Has(c cid.Cid) (bool, error) {
if d.started {
panic("implement me")
}
return d.Blockstore.Has(c)
}

func (d *countGetsDS) Get(c cid.Cid) (blocks.Block, error) {
blk, err := d.Blockstore.Get(c)
if err != nil {
return nil, err
}

return node, nil
d.maybeSleep(c)
return blk, nil
}

// Process sequentially (blocking) calling Get which tracks requests.
func (d *countGetsDS) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption {
panic("GetMany not supported")
func (d *countGetsDS) GetSize(c cid.Cid) (int, error) {
if d.started {
panic("implement me")
}
return d.Blockstore.GetSize(c)
}

func (d *countGetsDS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if d.started {
panic("implement me")
}
return d.Blockstore.AllKeysChan(ctx)
}