diff --git a/.circleci/config.yml b/.circleci/config.yml index db74b901..fcb08845 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,6 +1,6 @@ version: 2.1 orbs: - ci-go: ipfs/ci-go@0.2.9 + ci-go: ipfs/ci-go@0.3.1 executors: golang: diff --git a/allocator/allocator_test.go b/allocator/allocator_test.go index 43930410..09414100 100644 --- a/allocator/allocator_test.go +++ b/allocator/allocator_test.go @@ -319,7 +319,8 @@ func TestAllocator(t *testing.T) { select { case <-allocated: default: - pending = append(pending, pendingResultWithChan{pendingResult{op.p, op.amount}, allocated}) + + pending = append(pending, pendingResultWithChan{pendingResult{p: op.p, amount: op.amount}, allocated}) //nolint:gosimple } case releasePeer: err := allocator.ReleasePeerMemory(op.p) diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go index 95875c24..48802393 100644 --- a/benchmarks/benchmark_test.go +++ b/benchmarks/benchmark_test.go @@ -29,17 +29,15 @@ import ( basicnode "github.com/ipld/go-ipld-prime/node/basic" ipldselector "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" - peer "github.com/libp2p/go-libp2p-core/peer" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/ipfs/go-graphsync/benchmarks/testinstance" tn "github.com/ipfs/go-graphsync/benchmarks/testnet" graphsync "github.com/ipfs/go-graphsync/impl" ) -const stdBlockSize = 8000 - type runStats struct { Time time.Duration Name string @@ -96,34 +94,36 @@ func benchmarkRepeatedDisconnects(ctx context.Context, b *testing.B, numnodes in b.ReportAllocs() fetcher := instances[0] for i := 0; i < b.N; i++ { - var wg sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 10*time.Second) require.NoError(b, err) start := time.Now() + errgrp, grpctx := errgroup.WithContext(ctx) for j := 0; j < numnodes; j++ { instance := instances[j+1] - _, errChan := fetcher.Exchange.Request(ctx, instance.Peer, cidlink.Link{Cid: allCids[i][j]}, allSelector) + _, errChan := fetcher.Exchange.Request(grpctx, instance.Peer, cidlink.Link{Cid: allCids[i][j]}, allSelector) + other := instance.Peer - wg.Add(1) - go func(other peer.ID) { + errgrp.Go(func() error { defer func() { - mn.DisconnectPeers(fetcher.Peer, other) - wg.Done() + _ = mn.DisconnectPeers(fetcher.Peer, other) }() for { select { - case <-ctx.Done(): - return + case <-grpctx.Done(): + return nil case err, ok := <-errChan: if !ok { - return + return nil } - b.Fatalf("received error on request: %s", err.Error()) + return err } } - }(instance.Peer) + }) + + } + if err := errgrp.Wait(); err != nil { + b.Fatalf("received error on request: %s", err.Error()) } - wg.Wait() result := runStats{ Time: time.Since(start), Name: b.Name(), @@ -163,24 +163,24 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, b.ReportAllocs() for i := 0; i < b.N; i++ { fetcher := instances[i+1] - var wg sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 10*time.Second) require.NoError(b, err) start := time.Now() + errgrp, grpctx := errgroup.WithContext(ctx) for j := 0; j < numfiles; j++ { - responseChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector) - - wg.Add(1) - go func(j int) { - defer wg.Done() - for _ = range responseChan { + responseChan, errChan := fetcher.Exchange.Request(grpctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector) + errgrp.Go(func() error { + for range responseChan { } for err := range errChan { - b.Fatalf("received error on request: %s", err.Error()) + return err } - }(j) + return nil + }) + } + if err := errgrp.Wait(); err != nil { + b.Fatalf("received error on request: %s", err.Error()) } - wg.Wait() result := runStats{ Time: time.Since(start), Name: b.Name(), @@ -216,31 +216,31 @@ func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, b.ReportAllocs() for i := 0; i < b.N; i++ { fetcher := instances[i+numnodes] - var wg sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 10*time.Second) require.NoError(b, err) start := time.Now() + errgrp, grpctx := errgroup.WithContext(ctx) for j := 0; j < numnodes; j++ { instance := instances[j] - _, errChan := fetcher.Exchange.Request(ctx, instance.Peer, cidlink.Link{Cid: destCids[j]}, allSelector) + _, errChan := fetcher.Exchange.Request(grpctx, instance.Peer, cidlink.Link{Cid: destCids[j]}, allSelector) - wg.Add(1) - go func() { - defer wg.Done() + errgrp.Go(func() error { for { select { case <-ctx.Done(): - return + return err case err, ok := <-errChan: if !ok { - return + return nil } - b.Fatalf("received error on request: %s", err.Error()) + return err } } - }() + }) + } + if err := errgrp.Wait(); err != nil { + b.Fatalf("received error on request: %s", err.Error()) } - wg.Wait() result := runStats{ Time: time.Since(start), Name: b.Name(), diff --git a/benchmarks/testinstance/testinstance.go b/benchmarks/testinstance/testinstance.go index 1b4048cd..8cc4b188 100644 --- a/benchmarks/testinstance/testinstance.go +++ b/benchmarks/testinstance/testinstance.go @@ -111,8 +111,7 @@ func Close(instances []Instance) error { // Instance is a test instance of bitswap + dependencies for integration testing type Instance struct { Peer peer.ID - Loader ipld.Loader - Storer ipld.Storer + LinkSystem ipld.LinkSystem Exchange graphsync.GraphExchange BlockStore blockstore.Blockstore Adapter gsnet.GraphSyncNetwork @@ -165,9 +164,8 @@ func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions return Instance{}, err } - loader := storeutil.LoaderForBlockstore(bstore) - storer := storeutil.StorerForBlockstore(bstore) - gs := gsimpl.New(ctx, adapter, loader, storer, gsOptions...) + lsys := storeutil.LinkSystemForBlockstore(bstore) + gs := gsimpl.New(ctx, adapter, lsys, gsOptions...) gs.RegisterIncomingRequestHook(func(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() }) @@ -176,8 +174,7 @@ func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions Adapter: adapter, Peer: p.ID(), Exchange: gs, - Loader: loader, - Storer: storer, + LinkSystem: lsys, BlockStore: bstore, blockstoreDelay: bsdelay, ds: dstore, diff --git a/benchmarks/testnet/peernet.go b/benchmarks/testnet/peernet.go index 99f2704d..1bc660f4 100644 --- a/benchmarks/testnet/peernet.go +++ b/benchmarks/testnet/peernet.go @@ -24,7 +24,10 @@ func (pn *peernet) Adapter(p tnet.Identity) gsnet.GraphSyncNetwork { if err != nil { panic(err.Error()) } - pn.Mocknet.LinkAll() + err = pn.Mocknet.LinkAll() + if err != nil { + panic(err.Error()) + } return gsnet.NewFromLibp2pHost(client) } diff --git a/go.mod b/go.mod index 2a94b2c7..41538828 100644 --- a/go.mod +++ b/go.mod @@ -28,8 +28,8 @@ require ( github.com/ipfs/go-merkledag v0.3.2 github.com/ipfs/go-peertaskqueue v0.2.0 github.com/ipfs/go-unixfs v0.2.4 - github.com/ipld/go-ipld-prime v0.7.0 - github.com/ipld/go-ipld-prime-proto v0.1.1 + github.com/ipld/go-codec-dagpb v1.2.0 + github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-libp2p v0.13.0 @@ -39,11 +39,11 @@ require ( github.com/libp2p/go-libp2p-testing v0.4.0 github.com/libp2p/go-msgio v0.0.6 github.com/multiformats/go-multiaddr v0.3.1 - github.com/multiformats/go-multihash v0.0.14 - github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect + github.com/multiformats/go-multihash v0.0.15 github.com/smartystreets/assertions v1.0.1 // indirect github.com/stretchr/testify v1.6.1 github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/tools v0.1.1-0.20210225150353-54dc8c5edb56 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/protobuf v1.25.0 diff --git a/go.sum b/go.sum index 2c0592f2..997b24ae 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= @@ -98,8 +100,9 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gopacket v1.1.17 h1:rMrlX2ZY2UbvT+sdz3+6J+pp2z+msCq9MxTU6ymxbBY= github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= @@ -223,7 +226,6 @@ github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscw github.com/ipfs/go-log/v2 v2.1.1 h1:G4TtqN+V9y9HY9TA6BwbCVyyBZ2B9MbCjR2MtGx8FR0= github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= -github.com/ipfs/go-merkledag v0.3.1/go.mod h1:fvkZNNZixVW6cKSZ/JfLlON5OlgTXNdRLz0p6QG/I2M= github.com/ipfs/go-merkledag v0.3.2 h1:MRqj40QkrWkvPswXs4EfSslhZ4RVPRbxwX11js0t1xY= github.com/ipfs/go-merkledag v0.3.2/go.mod h1:fvkZNNZixVW6cKSZ/JfLlON5OlgTXNdRLz0p6QG/I2M= github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= @@ -236,10 +238,11 @@ github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo= github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw= github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E= github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= -github.com/ipld/go-ipld-prime v0.7.0 h1:eigF1ZpaL1prbsKYVMqPLoPJqD/pzkQOe2j1uzvVg7w= -github.com/ipld/go-ipld-prime v0.7.0/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= -github.com/ipld/go-ipld-prime-proto v0.1.1 h1:EX4yWYaIqSLwtVE30nxEcZDcvsWDtx1vImSG+XCJebY= -github.com/ipld/go-ipld-prime-proto v0.1.1/go.mod h1:cI9NwYAUKCLUwqufoUjChISxuTEkaY2yvNYCRCuhRck= +github.com/ipld/go-codec-dagpb v1.2.0 h1:2umV7ud8HBMkRuJgd8gXw95cLhwmcYrihS3cQEy9zpI= +github.com/ipld/go-codec-dagpb v1.2.0/go.mod h1:6nBN7X7h8EOsEejZGqC7tej5drsdBAXbMHyBT+Fne5s= +github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= +github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db h1:kFwGn8rXa/Z31ev1OFNQsYeNKNCdifnTPl/NvPy5L38= +github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -268,13 +271,16 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QHbiw= +github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/koron/go-ssdp v0.0.0-20180514024734-4a0ed625a78b/go.mod h1:5Ky9EC2xfoUKUor0Hjgi2BJhCSXJfMOFlmyYrVKGQMk= github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d h1:68u9r4wEvL3gYg2jvAOgROwZ3H+Y3hIDk4tbbmIjcYQ= github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d/go.mod h1:5Ky9EC2xfoUKUor0Hjgi2BJhCSXJfMOFlmyYrVKGQMk= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -494,6 +500,8 @@ github.com/minio/sha256-simd v0.1.0/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= +github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= @@ -543,6 +551,8 @@ github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpK github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= github.com/multiformats/go-multihash v0.0.14 h1:QoBceQYQQtNUuf6s7wHxnE2c8bhbMqhfGzNI032se/I= github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= +github.com/multiformats/go-multihash v0.0.15 h1:hWOPdrNqDjwHDx82vsYGSDZNyktOJJ2dzZJzFkOV1jM= +github.com/multiformats/go-multihash v0.0.15/go.mod h1:D6aZrWNLFTV/ynMpKsNtB40mJzmCl4jb1alC0OvHiHg= github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg= github.com/multiformats/go-multistream v0.1.1/go.mod h1:KmHZ40hzVxiaiwlj3MEbYgK9JFk2/9UktWZAF54Du38= github.com/multiformats/go-multistream v0.2.0 h1:6AuNmQVKUkRnddw2YiDjt5Elit40SFxMJkVnhmETXtU= @@ -679,11 +689,11 @@ golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -738,13 +748,16 @@ golang.org/x/sys v0.0.0-20190610200419-93c9922d18ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY= +golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= diff --git a/graphsync.go b/graphsync.go index 05bb7312..39cc5c00 100644 --- a/graphsync.go +++ b/graphsync.go @@ -257,7 +257,7 @@ type GraphExchange interface { Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...ExtensionData) (<-chan ResponseProgress, <-chan error) // RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default - RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error + RegisterPersistenceOption(name string, lsys ipld.LinkSystem) error // UnregisterPersistenceOption unregisters an alternate loader/storer combo UnregisterPersistenceOption(name string) error diff --git a/impl/graphsync.go b/impl/graphsync.go index 04aaafe8..5009db6d 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -36,8 +36,7 @@ const defaultMaxInProgressRequests = uint64(6) // the graphsync protocol. type GraphSync struct { network gsnet.GraphSyncNetwork - loader ipld.Loader - storer ipld.Storer + linkSystem ipld.LinkSystem requestManager *requestmanager.RequestManager responseManager *responsemanager.ResponseManager asyncLoader *asyncloader.AsyncLoader @@ -107,7 +106,7 @@ func MaxInProgressRequests(maxInProgressRequests uint64) Option { // New creates a new GraphSync Exchange on the given network, // and the given link loader+storer. func New(parent context.Context, network gsnet.GraphSyncNetwork, - loader ipld.Loader, storer ipld.Storer, options ...Option) graphsync.GraphExchange { + linkSystem ipld.LinkSystem, options ...Option) graphsync.GraphExchange { ctx, cancel := context.WithCancel(parent) gsConfig := &graphsyncConfigOptions{ @@ -139,15 +138,14 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, return messagequeue.New(ctx, p, network, allocator) } peerManager := peermanager.NewMessageManager(ctx, createMessageQueue) - asyncLoader := asyncloader.New(ctx, loader, storer) + asyncLoader := asyncloader.New(ctx, linkSystem) requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners) responseAssembler := responseassembler.New(ctx, peerManager) peerTaskQueue := peertaskqueue.New() - responseManager := responsemanager.New(ctx, loader, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests) + responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests) graphSync := &GraphSync{ network: network, - loader: loader, - storer: storer, + linkSystem: linkSystem, requestManager: requestManager, responseManager: responseManager, asyncLoader: asyncLoader, @@ -203,12 +201,12 @@ func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingReques } // RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default -func (gs *GraphSync) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error { - err := gs.asyncLoader.RegisterPersistenceOption(name, loader, storer) +func (gs *GraphSync) RegisterPersistenceOption(name string, lsys ipld.LinkSystem) error { + err := gs.asyncLoader.RegisterPersistenceOption(name, lsys) if err != nil { return err } - return gs.persistenceOptions.Register(name, loader) + return gs.persistenceOptions.Register(name, lsys) } // UnregisterPersistenceOption unregisters an alternate loader/storer combo diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index b2baa1c0..24244455 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -44,6 +44,7 @@ import ( "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" gsnet "github.com/ipfs/go-graphsync/network" + "github.com/ipfs/go-graphsync/storeutil" "github.com/ipfs/go-graphsync/testutil" ) @@ -60,7 +61,7 @@ func TestMakeRequestToNetwork(t *testing.T) { graphSync := td.GraphSyncHost1() blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader1, td.storer1, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence1, 100, blockChainLength) requestCtx, requestCancel := context.WithCancel(ctx) defer requestCancel() @@ -110,7 +111,7 @@ func TestSendResponseToIncomingRequest(t *testing.T) { ) blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) requestID := graphsync.RequestID(rand.Int31()) @@ -164,7 +165,7 @@ func TestRejectRequestsByDefault(t *testing.T) { _ = td.GraphSyncHost2(RejectAllRequestsByDefault()) blockChainLength := 5 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 5, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 5, blockChainLength) // send request across network progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension) @@ -185,7 +186,7 @@ func TestGraphsyncRoundTrip(t *testing.T) { // setup receiving peer to just record message coming in blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() @@ -295,7 +296,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { // setup receiving peer to just record message coming in blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) firstHalf := blockChain.Blocks(0, 50) set := cid.NewSet() @@ -344,7 +345,7 @@ func TestPauseResume(t *testing.T) { // setup receiving peer to just record message coming in blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() @@ -396,7 +397,7 @@ func TestPauseResumeRequest(t *testing.T) { // setup receiving peer to just record message coming in blockChainLength := 100 blockSize := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, uint64(blockSize), blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, uint64(blockSize), blockChainLength) // initialize graphsync on second node to response to requests _ = td.GraphSyncHost2() @@ -454,7 +455,7 @@ func TestPauseResumeViaUpdate(t *testing.T) { // setup receiving peer to just record message coming in blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() @@ -503,7 +504,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { // setup receiving peer to just record message coming in blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) stopPoint := 50 blocksReceived := 0 @@ -562,7 +563,7 @@ func TestNetworkDisconnect(t *testing.T) { // setup receiving peer to just record message coming in blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() @@ -609,8 +610,8 @@ func TestNetworkDisconnect(t *testing.T) { testutil.AssertChannelEmpty(t, networkError, "no network errors so far") // unlink peers so they cannot communicate - td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID()) - td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID()) + require.NoError(t, td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID())) + require.NoError(t, td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID())) requestID := <-requestIDChan err := responder.UnpauseResponse(td.host1.ID(), requestID) require.NoError(t, err) @@ -632,14 +633,14 @@ func TestConnectFail(t *testing.T) { requestor := td.GraphSyncHost1() blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) requestCtx, requestCancel := context.WithTimeout(ctx, 1*time.Second) defer requestCancel() // unlink peers so they cannot communicate - td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID()) - td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID()) + require.NoError(t, td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID())) + require.NoError(t, td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID())) reqNetworkError := make(chan error, 1) requestor.RegisterNetworkErrorListener(func(p peer.ID, request graphsync.RequestData, err error) { @@ -671,20 +672,22 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { // alternate storing location for responder altStore1 := make(map[ipld.Link][]byte) - altLoader1, altStorer1 := testutil.NewTestStore(altStore1) + altPersistence1 := testutil.NewTestStore(altStore1) // alternate storing location for requestor altStore2 := make(map[ipld.Link][]byte) - altLoader2, altStorer2 := testutil.NewTestStore(altStore2) + altPersistence2 := testutil.NewTestStore(altStore2) - err := requestor.RegisterPersistenceOption("chainstore", altLoader1, altStorer1) + err := requestor.RegisterPersistenceOption("chainstore", altPersistence1) require.NoError(t, err) - err = responder.RegisterPersistenceOption("chainstore", altLoader2, altStorer2) + err = responder.RegisterPersistenceOption("chainstore", altPersistence2) require.NoError(t, err) blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, altLoader1, altStorer2, 100, blockChainLength) + blockChainPersistence := altPersistence1 + blockChainPersistence.StorageWriteOpener = altPersistence2.StorageWriteOpener + blockChain := testutil.SetupBlockChain(ctx, t, blockChainPersistence, 100, blockChainLength) extensionName := graphsync.ExtensionName("blockchain") extension := graphsync.ExtensionData{ @@ -734,20 +737,20 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { // alternate storing location for responder altStore1 := make(map[ipld.Link][]byte) - altLoader1, altStorer1 := testutil.NewTestStore(altStore1) + altPersistence1 := testutil.NewTestStore(altStore1) // alternate storing location for requestor altStore2 := make(map[ipld.Link][]byte) - altLoader2, altStorer2 := testutil.NewTestStore(altStore2) + altPersistence2 := testutil.NewTestStore(altStore2) - err := requestor.RegisterPersistenceOption("chainstore1", altLoader1, altStorer1) + err := requestor.RegisterPersistenceOption("chainstore1", altPersistence1) require.NoError(t, err) - err = requestor.RegisterPersistenceOption("chainstore2", altLoader2, altStorer2) + err = requestor.RegisterPersistenceOption("chainstore2", altPersistence2) require.NoError(t, err) blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) extensionName1 := graphsync.ExtensionName("blockchain1") extension1 := graphsync.ExtensionData{ @@ -808,7 +811,9 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { // setup receiving peer to just record message coming in blockChainLength := 40 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader1, td.storer2, 200000, blockChainLength) + blockChainPersistence := td.persistence1 + blockChainPersistence.StorageWriteOpener = td.persistence2.StorageWriteOpener + blockChain := testutil.SetupBlockChain(ctx, t, blockChainPersistence, 200000, blockChainLength) // initialize graphsync on second node to response to requests td.GraphSyncHost2() @@ -840,38 +845,6 @@ func TestUnixFSFetch(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() - makeLoader := func(bs bstore.Blockstore) ipld.Loader { - return func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) { - c, ok := lnk.(cidlink.Link) - if !ok { - return nil, errors.New("Incorrect Link Type") - } - // read block from one store - block, err := bs.Get(c.Cid) - if err != nil { - return nil, err - } - return bytes.NewReader(block.RawData()), nil - } - } - - makeStorer := func(bs bstore.Blockstore) ipld.Storer { - return func(lnkCtx ipld.LinkContext) (io.Writer, ipld.StoreCommitter, error) { - var buf bytes.Buffer - var committer ipld.StoreCommitter = func(lnk ipld.Link) error { - c, ok := lnk.(cidlink.Link) - if !ok { - return errors.New("Incorrect Link Type") - } - block, err := blocks.NewBlockWithCid(buf.Bytes(), c.Cid) - if err != nil { - return err - } - return bs.Put(block) - } - return &buf, committer, nil - } - } // make a blockstore and dag service bs1 := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore())) @@ -913,16 +886,14 @@ func TestUnixFSFetch(t *testing.T) { origBytes := buf.Bytes() // setup an IPLD loader/storer for blockstore 1 - loader1 := makeLoader(bs1) - storer1 := makeStorer(bs1) + persistence1 := storeutil.LinkSystemForBlockstore(bs1) // setup an IPLD loader/storer for blockstore 2 - loader2 := makeLoader(bs2) - storer2 := makeStorer(bs2) + persistence2 := storeutil.LinkSystemForBlockstore(bs2) td := newGsTestData(ctx, t) - requestor := New(ctx, td.gsnet1, loader1, storer1) - responder := New(ctx, td.gsnet2, loader2, storer2) + requestor := New(ctx, td.gsnet1, persistence1) + responder := New(ctx, td.gsnet2, persistence2) extensionName := graphsync.ExtensionName("Free for all") responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -985,7 +956,7 @@ func TestGraphsyncBlockListeners(t *testing.T) { // setup receiving peer to just record message coming in blockChainLength := 100 - blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() @@ -1054,22 +1025,21 @@ func TestGraphsyncBlockListeners(t *testing.T) { } type gsTestData struct { - mn mocknet.Mocknet - ctx context.Context - host1 host.Host - host2 host.Host - gsnet1 gsnet.GraphSyncNetwork - gsnet2 gsnet.GraphSyncNetwork - blockStore1, blockStore2 map[ipld.Link][]byte - loader1, loader2 ipld.Loader - storer1, storer2 ipld.Storer - extensionData []byte - extensionName graphsync.ExtensionName - extension graphsync.ExtensionData - extensionResponseData []byte - extensionResponse graphsync.ExtensionData - extensionUpdateData []byte - extensionUpdate graphsync.ExtensionData + mn mocknet.Mocknet + ctx context.Context + host1 host.Host + host2 host.Host + gsnet1 gsnet.GraphSyncNetwork + gsnet2 gsnet.GraphSyncNetwork + blockStore1, blockStore2 map[ipld.Link][]byte + persistence1, persistence2 ipld.LinkSystem + extensionData []byte + extensionName graphsync.ExtensionName + extension graphsync.ExtensionData + extensionResponseData []byte + extensionResponse graphsync.ExtensionData + extensionUpdateData []byte + extensionUpdate graphsync.ExtensionData } func newGsTestData(ctx context.Context, t *testing.T) *gsTestData { @@ -1087,9 +1057,9 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData { td.gsnet1 = gsnet.NewFromLibp2pHost(td.host1) td.gsnet2 = gsnet.NewFromLibp2pHost(td.host2) td.blockStore1 = make(map[ipld.Link][]byte) - td.loader1, td.storer1 = testutil.NewTestStore(td.blockStore1) + td.persistence1 = testutil.NewTestStore(td.blockStore1) td.blockStore2 = make(map[ipld.Link][]byte) - td.loader2, td.storer2 = testutil.NewTestStore(td.blockStore2) + td.persistence2 = testutil.NewTestStore(td.blockStore2) // setup extension handlers td.extensionData = testutil.RandomBytes(100) td.extensionName = graphsync.ExtensionName("AppleSauce/McGee") @@ -1112,12 +1082,12 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData { } func (td *gsTestData) GraphSyncHost1(options ...Option) graphsync.GraphExchange { - return New(td.ctx, td.gsnet1, td.loader1, td.storer1, options...) + return New(td.ctx, td.gsnet1, td.persistence1, options...) } func (td *gsTestData) GraphSyncHost2(options ...Option) graphsync.GraphExchange { - return New(td.ctx, td.gsnet2, td.loader2, td.storer2, options...) + return New(td.ctx, td.gsnet2, td.persistence2, options...) } type receivedMessage struct { diff --git a/ipldutil/ipldutil.go b/ipldutil/ipldutil.go index 235f832a..2a5d98a7 100644 --- a/ipldutil/ipldutil.go +++ b/ipldutil/ipldutil.go @@ -2,54 +2,29 @@ package ipldutil import ( "bytes" - "context" + dagpb "github.com/ipld/go-codec-dagpb" ipld "github.com/ipld/go-ipld-prime" - dagpb "github.com/ipld/go-ipld-prime-proto" "github.com/ipld/go-ipld-prime/codec/dagcbor" + _ "github.com/ipld/go-ipld-prime/codec/raw" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" - "github.com/ipld/go-ipld-prime/traversal" - ipldtraversal "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" ipldselector "github.com/ipld/go-ipld-prime/traversal/selector" ) -var ( - defaultChooser traversal.LinkTargetNodePrototypeChooser = dagpb.AddDagPBSupportToChooser(func(ipld.Link, ipld.LinkContext) (ipld.NodePrototype, error) { - return basicnode.Prototype.Any, nil - }) -) - -func Traverse(ctx context.Context, loader ipld.Loader, chooser traversal.LinkTargetNodePrototypeChooser, root ipld.Link, s selector.Selector, fn traversal.AdvVisitFn) error { - if chooser == nil { - chooser = defaultChooser - } - ns, err := chooser(root, ipld.LinkContext{}) - if err != nil { - return err +var defaultChooser = func(lnk ipld.Link, lctx ipld.LinkContext) (ipld.NodePrototype, error) { + // We can decode all nodes into basicnode's Any, except for + // dagpb nodes, which must explicitly use the PBNode prototype. + if lnk, ok := lnk.(cidlink.Link); ok && lnk.Cid.Prefix().Codec == 0x70 { + return dagpb.Type.PBNode, nil } - nb := ns.NewBuilder() - err = root.Load(ctx, ipld.LinkContext{}, nb, loader) - if err != nil { - return err - } - node := nb.Build() - return traversal.Progress{ - Cfg: &traversal.Config{ - Ctx: ctx, - LinkLoader: loader, - LinkTargetNodePrototypeChooser: chooser, - }, - }.WalkAdv(node, s, fn) -} - -func WalkMatching(node ipld.Node, s selector.Selector, fn traversal.VisitFn) error { - return ipldtraversal.WalkMatching(node, s, fn) + return basicnode.Prototype.Any, nil } func EncodeNode(node ipld.Node) ([]byte, error) { var buffer bytes.Buffer - err := dagcbor.Encoder(node, &buffer) + err := dagcbor.Encode(node, &buffer) if err != nil { return nil, err } @@ -58,7 +33,7 @@ func EncodeNode(node ipld.Node) ([]byte, error) { func DecodeNode(encoded []byte) (ipld.Node, error) { nb := basicnode.Prototype.Any.NewBuilder() - if err := dagcbor.Decoder(nb, bytes.NewReader(encoded)); err != nil { + if err := dagcbor.Decode(nb, bytes.NewReader(encoded)); err != nil { return nil, err } return nb.Build(), nil diff --git a/ipldutil/traverser.go b/ipldutil/traverser.go index dce0c2e6..3ff5dd82 100644 --- a/ipldutil/traverser.go +++ b/ipldutil/traverser.go @@ -6,10 +6,13 @@ import ( "io" "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" ) +var defaultLinkSystem = cidlink.DefaultLinkSystem() + var defaultVisitor traversal.AdvVisitFn = func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil } // ContextCancelError is a sentinel that indicates the passed in context @@ -22,10 +25,11 @@ func (cp ContextCancelError) Error() string { // TraversalBuilder defines parameters for an iterative traversal type TraversalBuilder struct { - Root ipld.Link - Selector ipld.Node - Visitor traversal.AdvVisitFn - Chooser traversal.LinkTargetNodePrototypeChooser + Root ipld.Link + Selector ipld.Node + Visitor traversal.AdvVisitFn + LinkSystem ipld.LinkSystem + Chooser traversal.LinkTargetNodePrototypeChooser } // Traverser is an interface for performing a selector traversal that operates iteratively -- @@ -67,6 +71,7 @@ func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser { selector: tb.Selector, visitor: defaultVisitor, chooser: defaultChooser, + linkSystem: tb.LinkSystem, awaitRequest: make(chan struct{}, 1), stateChan: make(chan state, 1), responses: make(chan nextResponse), @@ -78,6 +83,16 @@ func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser { if tb.Chooser != nil { t.chooser = tb.Chooser } + if tb.LinkSystem.DecoderChooser == nil { + t.linkSystem.DecoderChooser = defaultLinkSystem.DecoderChooser + } + if tb.LinkSystem.EncoderChooser == nil { + t.linkSystem.EncoderChooser = defaultLinkSystem.EncoderChooser + } + if tb.LinkSystem.HasherChooser == nil { + t.linkSystem.HasherChooser = defaultLinkSystem.HasherChooser + } + t.linkSystem.StorageReadOpener = t.loader t.start() return t } @@ -91,6 +106,7 @@ type traverser struct { root ipld.Link selector ipld.Node visitor traversal.AdvVisitFn + linkSystem ipld.LinkSystem chooser traversal.LinkTargetNodePrototypeChooser currentLink ipld.Link currentContext ipld.LinkContext @@ -102,6 +118,20 @@ type traverser struct { stopped chan struct{} } +func (t *traverser) loader(lnkCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + select { + case <-t.ctx.Done(): + return nil, ContextCancelError{} + case t.stateChan <- state{false, nil, lnk, lnkCtx}: + } + select { + case <-t.ctx.Done(): + return nil, ContextCancelError{} + case response := <-t.responses: + return response.input, response.err + } +} + func (t *traverser) checkState() { select { case <-t.awaitRequest: @@ -134,31 +164,16 @@ func (t *traverser) start() { } go func() { defer close(t.stopped) - loader := func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) { - select { - case <-t.ctx.Done(): - return nil, ContextCancelError{} - case t.stateChan <- state{false, nil, lnk, lnkCtx}: - } - select { - case <-t.ctx.Done(): - return nil, ContextCancelError{} - case response := <-t.responses: - return response.input, response.err - } - } ns, err := t.chooser(t.root, ipld.LinkContext{}) if err != nil { t.writeDone(err) return } - nb := ns.NewBuilder() - err = t.root.Load(t.ctx, ipld.LinkContext{}, nb, loader) + nd, err := t.linkSystem.Load(ipld.LinkContext{Ctx: t.ctx}, t.root, ns) if err != nil { t.writeDone(err) return } - nd := nb.Build() sel, err := selector.ParseSelector(t.selector) if err != nil { @@ -168,7 +183,7 @@ func (t *traverser) start() { err = traversal.Progress{ Cfg: &traversal.Config{ Ctx: t.ctx, - LinkLoader: loader, + LinkSystem: t.linkSystem, LinkTargetNodePrototypeChooser: t.chooser, }, }.WalkAdv(nd, sel, t.visitor) diff --git a/ipldutil/traverser_test.go b/ipldutil/traverser_test.go index 54ba4acd..5a244317 100644 --- a/ipldutil/traverser_test.go +++ b/ipldutil/traverser_test.go @@ -44,14 +44,15 @@ func TestTraverser(t *testing.T) { t.Run("traverses correctly, blockchain", func(t *testing.T) { store := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(store) - blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 10) + persistence := testutil.NewTestStore(store) + blockChain := testutil.SetupBlockChain(ctx, t, persistence, 100, 10) inProgressChan := make(chan graphsync.ResponseProgress) done := make(chan struct{}) traverser := TraversalBuilder{ - Root: blockChain.TipLink, - Selector: blockChain.Selector(), - Chooser: blockChain.Chooser, + Root: blockChain.TipLink, + Selector: blockChain.Selector(), + Chooser: blockChain.Chooser, + LinkSystem: persistence, Visitor: func(tp traversal.Progress, node ipld.Node, r traversal.VisitReason) error { select { case <-ctx.Done(): diff --git a/message/message.go b/message/message.go index ad677c78..55435232 100644 --- a/message/message.go +++ b/message/message.go @@ -304,6 +304,9 @@ func (gsm GraphSyncMessage) ToProto() (*pb.Message, error) { func (gsm GraphSyncMessage) ToNet(w io.Writer) error { msg, err := gsm.ToProto() + if err != nil { + return err + } size := proto.Size(msg) buf := pool.Get(size + binary.MaxVarintLen64) defer pool.Put(buf) diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index 745974b9..52e47537 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -133,7 +133,7 @@ func (mq *MessageQueue) Shutdown() { func (mq *MessageQueue) runQueue() { defer func() { - mq.allocator.ReleasePeerMemory(mq.p) + _ = mq.allocator.ReleasePeerMemory(mq.p) mq.eventPublisher.Shutdown() }() mq.eventPublisher.Startup() @@ -308,12 +308,12 @@ func (mp *messagePublisher) publishQueued() { func (mp *messagePublisher) publishSent() { mp.mq.eventPublisher.Publish(mp.topic, Event{Name: Sent}) - mp.mq.allocator.ReleaseBlockMemory(mp.mq.p, mp.msgSize) + _ = mp.mq.allocator.ReleaseBlockMemory(mp.mq.p, mp.msgSize) } func (mp *messagePublisher) publishError(err error) { mp.mq.eventPublisher.Publish(mp.topic, Event{Name: Error, Err: err}) - mp.mq.allocator.ReleaseBlockMemory(mp.mq.p, mp.msgSize) + _ = mp.mq.allocator.ReleaseBlockMemory(mp.mq.p, mp.msgSize) } func (mp *messagePublisher) close() { diff --git a/messagequeue/messagequeue_test.go b/messagequeue/messagequeue_test.go index 25aff4d7..0425d936 100644 --- a/messagequeue/messagequeue_test.go +++ b/messagequeue/messagequeue_test.go @@ -332,7 +332,7 @@ func TestSendsResponsesMemoryPressure(t *testing.T) { defer cancel() p := testutil.GeneratePeers(1)[0] - messagesSent := make(chan gsmsg.GraphSyncMessage, 0) + messagesSent := make(chan gsmsg.GraphSyncMessage) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent} @@ -363,6 +363,7 @@ func TestSendsResponsesMemoryPressure(t *testing.T) { // assert transaction does not complete within 200ms because it is waiting on memory ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond) + defer cancel2() select { case <-finishes: t.Fatal("transaction failed to wait on memory") diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 7990f97a..ff0f61a6 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -38,19 +38,19 @@ func TestDecodeEncodeMetadata(t *testing.T) { // verify metadata is equivalent of IPLD node encoding encodedNode := new(bytes.Buffer) - err = dagcbor.Encoder(nd, encodedNode) + err = dagcbor.Encode(nd, encodedNode) require.NoError(t, err) decodedMetadataFromNode, err := DecodeMetadata(encodedNode.Bytes()) require.NoError(t, err) require.Equal(t, decodedMetadata, decodedMetadataFromNode, "metadata not equal to IPLD encoding") nb := basicnode.Prototype.List.NewBuilder() - err = dagcbor.Decoder(nb, encodedNode) + err = dagcbor.Decode(nb, encodedNode) require.NoError(t, err) decodedNode := nb.Build() require.Equal(t, nd, decodedNode) nb = basicnode.Prototype.List.NewBuilder() - err = dagcbor.Decoder(nb, bytes.NewReader(encoded)) + err = dagcbor.Decode(nb, bytes.NewReader(encoded)) require.NoError(t, err) decodedNodeFromMetadata := nb.Build() require.Equal(t, decodedNode, decodedNodeFromMetadata, "deserialzed metadata does not match deserialized node") diff --git a/notifications/data_subscriber.go b/notifications/data_subscriber.go index 18db2936..f8643bb1 100644 --- a/notifications/data_subscriber.go +++ b/notifications/data_subscriber.go @@ -34,9 +34,7 @@ func (m *TopicDataSubscriber) getData(id Topic) []TopicData { return []TopicData{} } newData := make([]TopicData, len(data)) - for i, d := range data { - newData[i] = d - } + copy(newData, data) return newData } diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go index c0111b5d..614dd7f0 100644 --- a/requestmanager/asyncloader/asyncloader.go +++ b/requestmanager/asyncloader/asyncloader.go @@ -33,32 +33,30 @@ type AsyncLoader struct { incomingMessages chan loaderMessage outgoingMessages chan loaderMessage - defaultLoader ipld.Loader - defaultStorer ipld.Storer - activeRequests map[graphsync.RequestID]struct{} - requestQueues map[graphsync.RequestID]string - alternateQueues map[string]alternateQueue - responseCache *responsecache.ResponseCache - loadAttemptQueue *loadattemptqueue.LoadAttemptQueue + defaultLinkSystem ipld.LinkSystem + activeRequests map[graphsync.RequestID]struct{} + requestQueues map[graphsync.RequestID]string + alternateQueues map[string]alternateQueue + responseCache *responsecache.ResponseCache + loadAttemptQueue *loadattemptqueue.LoadAttemptQueue } // New initializes a new link loading manager for asynchronous loads from the given context // and local store loading and storing function -func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader { - responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer) +func New(ctx context.Context, linkSystem ipld.LinkSystem) *AsyncLoader { + responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem) ctx, cancel := context.WithCancel(ctx) return &AsyncLoader{ - ctx: ctx, - cancel: cancel, - incomingMessages: make(chan loaderMessage), - outgoingMessages: make(chan loaderMessage), - defaultLoader: loader, - defaultStorer: storer, - activeRequests: make(map[graphsync.RequestID]struct{}), - requestQueues: make(map[graphsync.RequestID]string), - alternateQueues: make(map[string]alternateQueue), - responseCache: responseCache, - loadAttemptQueue: loadAttemptQueue, + ctx: ctx, + cancel: cancel, + incomingMessages: make(chan loaderMessage), + outgoingMessages: make(chan loaderMessage), + defaultLinkSystem: linkSystem, + activeRequests: make(map[graphsync.RequestID]struct{}), + requestQueues: make(map[graphsync.RequestID]string), + alternateQueues: make(map[string]alternateQueue), + responseCache: responseCache, + loadAttemptQueue: loadAttemptQueue, } } @@ -74,12 +72,12 @@ func (al *AsyncLoader) Shutdown() { } // RegisterPersistenceOption registers a new loader/storer option for processing requests -func (al *AsyncLoader) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error { +func (al *AsyncLoader) RegisterPersistenceOption(name string, lsys ipld.LinkSystem) error { if name == "" { return errors.New("Persistence option must have a name") } response := make(chan error, 1) - err := al.sendSyncMessage(®isterPersistenceOptionMessage{name, loader, storer, response}, response) + err := al.sendSyncMessage(®isterPersistenceOptionMessage{name, lsys, response}, response) return err } @@ -167,10 +165,9 @@ type newResponsesAvailableMessage struct { } type registerPersistenceOptionMessage struct { - name string - loader ipld.Loader - storer ipld.Storer - response chan error + name string + linkSystem ipld.LinkSystem + response chan error } type unregisterPersistenceOptionMessage struct { @@ -258,7 +255,7 @@ func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error { if existing { return errors.New("already registerd a persistence option with this name") } - responseCache, loadAttemptQueue := setupAttemptQueue(rpom.loader, rpom.storer) + responseCache, loadAttemptQueue := setupAttemptQueue(rpom.linkSystem) al.alternateQueues[rpom.name] = alternateQueue{responseCache, loadAttemptQueue} return nil } @@ -347,16 +344,16 @@ func (crm *cleanupRequestMessage) handle(al *AsyncLoader) { al.responseCache.FinishRequest(crm.requestID) } -func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) { +func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) { - unverifiedBlockStore := unverifiedblockstore.New(storer) + unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener) responseCache := responsecache.New(unverifiedBlockStore) loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult { // load from response cache data, err := responseCache.AttemptLoad(requestID, link) if data == nil && err == nil { // fall back to local store - stream, loadErr := loader(link, ipld.LinkContext{}) + stream, loadErr := lsys.StorageReadOpener(ipld.LinkContext{}, link) if stream != nil && loadErr == nil { localData, loadErr := ioutil.ReadAll(stream) if loadErr == nil && localData != nil { diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go index 8d3a41bc..701b310d 100644 --- a/requestmanager/asyncloader/asyncloader_test.go +++ b/requestmanager/asyncloader/asyncloader_test.go @@ -199,7 +199,7 @@ func TestRegisterUnregister(t *testing.T) { err := asyncLoader.StartRequest(requestID1, "other") require.EqualError(t, err, "Unknown persistence option") - err = asyncLoader.RegisterPersistenceOption("other", otherSt.loader, otherSt.storer) + err = asyncLoader.RegisterPersistenceOption("other", otherSt.lsys) require.NoError(t, err) requestID2 := graphsync.RequestID(rand.Int31()) err = asyncLoader.StartRequest(requestID2, "other") @@ -224,7 +224,7 @@ func TestRequestSplittingLoadLocallyFromBlockstore(t *testing.T) { block := testutil.GenerateBlocksOfSize(1, 100)[0] link := otherSt.Store(t, block) withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { - err := asyncLoader.RegisterPersistenceOption("other", otherSt.loader, otherSt.storer) + err := asyncLoader.RegisterPersistenceOption("other", otherSt.lsys) require.NoError(t, err) requestID1 := graphsync.RequestID(rand.Int31()) resultChan1 := asyncLoader.AsyncLoad(requestID1, link) @@ -246,7 +246,7 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) { block := blocks[0] link := cidlink.Link{Cid: block.Cid()} withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { - err := asyncLoader.RegisterPersistenceOption("other", otherSt.loader, otherSt.storer) + err := asyncLoader.RegisterPersistenceOption("other", otherSt.lsys) require.NoError(t, err) requestID1 := graphsync.RequestID(rand.Int31()) requestID2 := graphsync.RequestID(rand.Int31()) @@ -286,7 +286,7 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) { block := blocks[0] link := cidlink.Link{Cid: block.Cid()} withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { - err := asyncLoader.RegisterPersistenceOption("other", otherSt.loader, otherSt.storer) + err := asyncLoader.RegisterPersistenceOption("other", otherSt.lsys) require.NoError(t, err) requestID1 := graphsync.RequestID(rand.Int31()) requestID2 := graphsync.RequestID(rand.Int31()) @@ -314,8 +314,8 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) { } type store struct { - internalLoader ipld.Loader - storer ipld.Storer + internalLoader ipld.BlockReadOpener + lsys ipld.LinkSystem blockstore map[ipld.Link][]byte localLoads int called chan struct{} @@ -323,24 +323,25 @@ type store struct { func newStore() *store { blockstore := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blockstore) - return &store{ - internalLoader: loader, - storer: storer, - blockstore: blockstore, - localLoads: 0, - called: make(chan struct{}), + st := &store{ + lsys: testutil.NewTestStore(blockstore), + blockstore: blockstore, + localLoads: 0, + called: make(chan struct{}), } + st.internalLoader = st.lsys.StorageReadOpener + st.lsys.StorageReadOpener = st.loader + return st } -func (st *store) loader(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) { +func (st *store) loader(lnkCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { select { case <-st.called: default: close(st.called) } st.localLoads++ - return st.internalLoader(lnk, lnkCtx) + return st.internalLoader(lnkCtx, lnk) } func (st *store) AssertLocalLoads(t *testing.T, localLoads int) { @@ -356,7 +357,7 @@ func (st *store) AssertAttemptLoadWithoutResult(ctx context.Context, t *testing. } func (st *store) Store(t *testing.T, blk blocks.Block) ipld.Link { - writer, commit, err := st.storer(ipld.LinkContext{}) + writer, commit, err := st.lsys.StorageWriteOpener(ipld.LinkContext{}) require.NoError(t, err) _, err = writer.Write(blk.RawData()) require.NoError(t, err, "seeds block store") @@ -370,7 +371,7 @@ func withLoader(st *store, exec func(ctx context.Context, asyncLoader *AsyncLoad ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - asyncLoader := New(ctx, st.loader, st.storer) + asyncLoader := New(ctx, st.lsys) asyncLoader.Startup() exec(ctx, asyncLoader) } diff --git a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go index 56d2e1e8..617e8487 100644 --- a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go +++ b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go @@ -14,12 +14,12 @@ type settableWriter interface { // that have not been verified to be part of a traversal type UnverifiedBlockStore struct { inMemoryBlocks map[ipld.Link][]byte - storer ipld.Storer + storer ipld.BlockWriteOpener } // New initializes a new unverified store with the given storer function for writing // to permaneant storage if the block is verified -func New(storer ipld.Storer) *UnverifiedBlockStore { +func New(storer ipld.BlockWriteOpener) *UnverifiedBlockStore { return &UnverifiedBlockStore{ inMemoryBlocks: make(map[ipld.Link][]byte), storer: storer, diff --git a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go index 283301cb..5afb21fd 100644 --- a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go +++ b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go @@ -14,10 +14,10 @@ import ( func TestVerifyBlockPresent(t *testing.T) { blocksWritten := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blocksWritten) - unverifiedBlockStore := New(storer) + lsys := testutil.NewTestStore(blocksWritten) + unverifiedBlockStore := New(lsys.StorageWriteOpener) block := testutil.GenerateBlocksOfSize(1, 100)[0] - reader, err := loader(cidlink.Link{Cid: block.Cid()}, ipld.LinkContext{}) + reader, err := lsys.StorageReadOpener(ipld.LinkContext{}, cidlink.Link{Cid: block.Cid()}) require.Nil(t, reader) require.Error(t, err, "block should not be loadable till it's verified and stored") @@ -26,7 +26,7 @@ func TestVerifyBlockPresent(t *testing.T) { require.Error(t, err, "block should not be verifiable till it's added as an unverifiable block") unverifiedBlockStore.AddUnverifiedBlock(cidlink.Link{Cid: block.Cid()}, block.RawData()) - reader, err = loader(cidlink.Link{Cid: block.Cid()}, ipld.LinkContext{}) + reader, err = lsys.StorageReadOpener(ipld.LinkContext{}, cidlink.Link{Cid: block.Cid()}) require.Nil(t, reader) require.Error(t, err, "block should not be loadable till it's verified") @@ -34,7 +34,7 @@ func TestVerifyBlockPresent(t *testing.T) { require.NoError(t, err) require.Equal(t, block.RawData(), data, "block should be returned on verification if added") - reader, err = loader(cidlink.Link{Cid: block.Cid()}, ipld.LinkContext{}) + reader, err = lsys.StorageReadOpener(ipld.LinkContext{}, cidlink.Link{Cid: block.Cid()}) require.NoError(t, err) var buffer bytes.Buffer _, err = io.Copy(&buffer, reader) diff --git a/requestmanager/executor/executor_test.go b/requestmanager/executor/executor_test.go index a05ab9f8..6b10252f 100644 --- a/requestmanager/executor/executor_test.go +++ b/requestmanager/executor/executor_test.go @@ -259,8 +259,8 @@ func TestRequestExecutionBlockChain(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - loader, storer := testutil.NewTestStore(make(map[ipld.Link][]byte)) - tbc := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 10) + persistence := testutil.NewTestStore(make(map[ipld.Link][]byte)) + tbc := testutil.SetupBlockChain(ctx, t, persistence, 100, 10) fal := testloader.NewFakeAsyncLoader() requestID := graphsync.RequestID(rand.Int31()) p := testutil.GeneratePeers(1)[0] diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 31b248aa..010d60dd 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -91,7 +91,7 @@ func TestNormalSimultaneousFetch(t *testing.T) { defer cancel() peers := testutil.GeneratePeers(1) - blockChain2 := testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5) + blockChain2 := testutil.SetupBlockChain(ctx, t, td.persistence, 100, 5) returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector()) @@ -873,8 +873,7 @@ type testData struct { blockHooks *hooks.IncomingBlockHooks requestManager *RequestManager blockStore map[ipld.Link][]byte - loader ipld.Loader - storer ipld.Storer + persistence ipld.LinkSystem blockChain *testutil.TestBlockChain extensionName1 graphsync.ExtensionName extensionData1 []byte @@ -898,8 +897,8 @@ func newTestData(ctx context.Context, t *testing.T) *testData { td.requestManager.SetDelegate(td.fph) td.requestManager.Startup() td.blockStore = make(map[ipld.Link][]byte) - td.loader, td.storer = testutil.NewTestStore(td.blockStore) - td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5) + td.persistence = testutil.NewTestStore(td.blockStore) + td.blockChain = testutil.SetupBlockChain(ctx, t, td.persistence, 100, 5) td.extensionData1 = testutil.RandomBytes(100) td.extensionName1 = graphsync.ExtensionName("AppleSauce/McGee") td.extension1 = graphsync.ExtensionData{ diff --git a/requestmanager/responsecollector_test.go b/requestmanager/responsecollector_test.go index c543157e..cb312606 100644 --- a/requestmanager/responsecollector_test.go +++ b/requestmanager/responsecollector_test.go @@ -29,8 +29,8 @@ func TestBufferingResponseProgress(t *testing.T) { requestCtx, incomingResponses, incomingErrors, cancelRequest) blockStore := make(map[ipld.Link][]byte) - loader, storer := testutil.NewTestStore(blockStore) - blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 10) + persistence := testutil.NewTestStore(blockStore) + blockChain := testutil.SetupBlockChain(ctx, t, persistence, 100, 10) blocks := blockChain.AllBlocks() for i, block := range blocks { diff --git a/responsemanager/hooks/hooks_test.go b/responsemanager/hooks/hooks_test.go index eaf1ccc9..9ed18bd9 100644 --- a/responsemanager/hooks/hooks_test.go +++ b/responsemanager/hooks/hooks_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal/selector/builder" peer "github.com/libp2p/go-libp2p-core/peer" @@ -19,10 +20,10 @@ import ( ) type fakePersistenceOptions struct { - po map[string]ipld.Loader + po map[string]ipld.LinkSystem } -func (fpo *fakePersistenceOptions) GetLoader(name string) (ipld.Loader, bool) { +func (fpo *fakePersistenceOptions) GetLinkSystem(name string) (ipld.LinkSystem, bool) { loader, ok := fpo.po[name] return loader, ok } @@ -31,12 +32,13 @@ func TestRequestHookProcessing(t *testing.T) { fakeChooser := func(ipld.Link, ipld.LinkContext) (ipld.NodePrototype, error) { return basicnode.Prototype.Any, nil } - fakeLoader := func(link ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) { + fakeSystem := cidlink.DefaultLinkSystem() + fakeSystem.StorageReadOpener = func(lnkCtx ipld.LinkContext, link ipld.Link) (io.Reader, error) { return nil, nil } fpo := &fakePersistenceOptions{ - po: map[string]ipld.Loader{ - "chainstore": fakeLoader, + po: map[string]ipld.LinkSystem{ + "chainstore": fakeSystem, }, } extensionData := testutil.RandomBytes(100) @@ -65,7 +67,7 @@ func TestRequestHookProcessing(t *testing.T) { require.False(t, result.IsValidated) require.Empty(t, result.Extensions) require.Nil(t, result.CustomChooser) - require.Nil(t, result.CustomLoader) + require.Nil(t, result.CustomLinkSystem.StorageReadOpener) require.NoError(t, result.Err) }, }, @@ -80,7 +82,7 @@ func TestRequestHookProcessing(t *testing.T) { require.Len(t, result.Extensions, 1) require.Contains(t, result.Extensions, extensionResponse) require.Nil(t, result.CustomChooser) - require.Nil(t, result.CustomLoader) + require.Nil(t, result.CustomLinkSystem.StorageReadOpener) require.NoError(t, result.Err) }, }, @@ -96,7 +98,7 @@ func TestRequestHookProcessing(t *testing.T) { require.Len(t, result.Extensions, 1) require.Contains(t, result.Extensions, extensionResponse) require.Nil(t, result.CustomChooser) - require.Nil(t, result.CustomLoader) + require.Nil(t, result.CustomLinkSystem.StorageReadOpener) require.NoError(t, result.Err) }, }, @@ -114,7 +116,7 @@ func TestRequestHookProcessing(t *testing.T) { require.False(t, result.IsValidated) require.Empty(t, result.Extensions) require.Nil(t, result.CustomChooser) - require.Nil(t, result.CustomLoader) + require.Nil(t, result.CustomLinkSystem.StorageReadOpener) require.EqualError(t, result.Err, "something went wrong") }, }, @@ -130,7 +132,7 @@ func TestRequestHookProcessing(t *testing.T) { require.False(t, result.IsValidated) require.Empty(t, result.Extensions) require.Nil(t, result.CustomChooser) - require.Nil(t, result.CustomLoader) + require.Nil(t, result.CustomLinkSystem.StorageReadOpener) require.NoError(t, result.Err) }, }, @@ -148,7 +150,7 @@ func TestRequestHookProcessing(t *testing.T) { require.Len(t, result.Extensions, 1) require.Contains(t, result.Extensions, extensionResponse) require.Nil(t, result.CustomChooser) - require.NotNil(t, result.CustomLoader) + require.NotNil(t, result.CustomLinkSystem.StorageReadOpener) require.NoError(t, result.Err) }, }, @@ -166,7 +168,7 @@ func TestRequestHookProcessing(t *testing.T) { require.Len(t, result.Extensions, 1) require.Contains(t, result.Extensions, extensionResponse) require.Nil(t, result.CustomChooser) - require.Nil(t, result.CustomLoader) + require.Nil(t, result.CustomLinkSystem.StorageReadOpener) require.EqualError(t, result.Err, "unknown loader option") }, }, @@ -197,7 +199,7 @@ func TestRequestHookProcessing(t *testing.T) { require.Len(t, result.Extensions, 1) require.Contains(t, result.Extensions, extensionResponse) require.NotNil(t, result.CustomChooser) - require.Nil(t, result.CustomLoader) + require.Nil(t, result.CustomLinkSystem.StorageReadOpener) require.NoError(t, result.Err) }, }, diff --git a/responsemanager/hooks/requesthook.go b/responsemanager/hooks/requesthook.go index e6505719..d3e98e20 100644 --- a/responsemanager/hooks/requesthook.go +++ b/responsemanager/hooks/requesthook.go @@ -13,7 +13,7 @@ import ( // PersistenceOptions is an interface for getting loaders by name type PersistenceOptions interface { - GetLoader(name string) (ipld.Loader, bool) + GetLinkSystem(name string) (ipld.LinkSystem, bool) } // IncomingRequestHooks is a set of incoming request hooks that can be processed @@ -50,12 +50,12 @@ func (irh *IncomingRequestHooks) Register(hook graphsync.OnIncomingRequestHook) // RequestResult is the outcome of running requesthooks type RequestResult struct { - IsValidated bool - IsPaused bool - CustomLoader ipld.Loader - CustomChooser traversal.LinkTargetNodePrototypeChooser - Err error - Extensions []graphsync.ExtensionData + IsValidated bool + IsPaused bool + CustomLinkSystem ipld.LinkSystem + CustomChooser traversal.LinkTargetNodePrototypeChooser + Err error + Extensions []graphsync.ExtensionData } // ProcessRequestHooks runs request hooks against an incoming request @@ -72,19 +72,19 @@ type requestHookActions struct { isValidated bool isPaused bool err error - loader ipld.Loader + linkSystem ipld.LinkSystem chooser traversal.LinkTargetNodePrototypeChooser extensions []graphsync.ExtensionData } func (ha *requestHookActions) result() RequestResult { return RequestResult{ - IsValidated: ha.isValidated, - IsPaused: ha.isPaused, - CustomLoader: ha.loader, - CustomChooser: ha.chooser, - Err: ha.err, - Extensions: ha.extensions, + IsValidated: ha.isValidated, + IsPaused: ha.isPaused, + CustomLinkSystem: ha.linkSystem, + CustomChooser: ha.chooser, + Err: ha.err, + Extensions: ha.extensions, } } @@ -101,12 +101,12 @@ func (ha *requestHookActions) ValidateRequest() { } func (ha *requestHookActions) UsePersistenceOption(name string) { - loader, ok := ha.persistenceOptions.GetLoader(name) + linkSystem, ok := ha.persistenceOptions.GetLinkSystem(name) if !ok { ha.TerminateWithError(errors.New("unknown loader option")) return } - ha.loader = loader + ha.linkSystem = linkSystem } func (ha *requestHookActions) UseLinkTargetNodePrototypeChooser(chooser traversal.LinkTargetNodePrototypeChooser) { diff --git a/responsemanager/persistenceoptions/persistenceoptions.go b/responsemanager/persistenceoptions/persistenceoptions.go index a082d27f..fe3ec27b 100644 --- a/responsemanager/persistenceoptions/persistenceoptions.go +++ b/responsemanager/persistenceoptions/persistenceoptions.go @@ -10,29 +10,29 @@ import ( // PersistenceOptions is a registry of loaders for persistence options type PersistenceOptions struct { persistenceOptionsLk sync.RWMutex - persistenceOptions map[string]ipld.Loader + persistenceOptions map[string]ipld.LinkSystem } // New returns a new registry of persistence options func New() *PersistenceOptions { return &PersistenceOptions{ - persistenceOptions: make(map[string]ipld.Loader), + persistenceOptions: make(map[string]ipld.LinkSystem), } } -// Register registers a new loader for the response manager -func (po *PersistenceOptions) Register(name string, loader ipld.Loader) error { +// Register registers a new link system for the response manager +func (po *PersistenceOptions) Register(name string, linkSystem ipld.LinkSystem) error { po.persistenceOptionsLk.Lock() defer po.persistenceOptionsLk.Unlock() _, ok := po.persistenceOptions[name] if ok { return errors.New("persistence option alreayd registered") } - po.persistenceOptions[name] = loader + po.persistenceOptions[name] = linkSystem return nil } -// Unregister unregisters a loader for the response manager +// Unregister unregisters a link system for the response manager func (po *PersistenceOptions) Unregister(name string) error { po.persistenceOptionsLk.Lock() defer po.persistenceOptionsLk.Unlock() @@ -44,10 +44,10 @@ func (po *PersistenceOptions) Unregister(name string) error { return nil } -// GetLoader returns the loader for the named persistence option -func (po *PersistenceOptions) GetLoader(name string) (ipld.Loader, bool) { +// GetLinkSystem returns the link system for the named persistence option +func (po *PersistenceOptions) GetLinkSystem(name string) (ipld.LinkSystem, bool) { po.persistenceOptionsLk.RLock() defer po.persistenceOptionsLk.RUnlock() - loader, ok := po.persistenceOptions[name] - return loader, ok + linkSystem, ok := po.persistenceOptions[name] + return linkSystem, ok } diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index 07e4bd14..cecdc3ea 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -31,7 +31,7 @@ type queryExecutor struct { updateHooks UpdateHooks cancelledListeners CancelledListeners responseAssembler ResponseAssembler - loader ipld.Loader + linkSystem ipld.LinkSystem queryQueue QueryQueue messages chan responseManagerMessage ctx context.Context @@ -114,7 +114,7 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) func (qe *queryExecutor) prepareQuery(ctx context.Context, p peer.ID, - request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { + request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.BlockReadOpener, ipldutil.Traverser, bool, error) { result := qe.requestHooks.ProcessRequestHooks(p, request) var transactionError error var isPaused bool @@ -146,16 +146,18 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context, return nil, nil, false, err } rootLink := cidlink.Link{Cid: request.Root()} + linkSystem := result.CustomLinkSystem + if linkSystem.StorageReadOpener == nil { + linkSystem = qe.linkSystem + } traverser := ipldutil.TraversalBuilder{ - Root: rootLink, - Selector: request.Selector(), - Chooser: result.CustomChooser, + Root: rootLink, + Selector: request.Selector(), + LinkSystem: linkSystem, + Chooser: result.CustomChooser, }.Start(ctx) - loader := result.CustomLoader - if loader == nil { - loader = qe.loader - } - return loader, traverser, isPaused, nil + + return linkSystem.StorageReadOpener, traverser, isPaused, nil } func (qe *queryExecutor) processDedupByKey(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { @@ -205,7 +207,7 @@ func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, p p func (qe *queryExecutor) executeQuery( p peer.ID, request gsmsg.GraphSyncRequest, - loader ipld.Loader, + loader ipld.BlockReadOpener, traverser ipldutil.Traverser, signals signals, sub *notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) { diff --git a/responsemanager/responseassembler/responseassembler_test.go b/responsemanager/responseassembler/responseassembler_test.go index 0c5b0c40..e6b644ba 100644 --- a/responsemanager/responseassembler/responseassembler_test.go +++ b/responsemanager/responseassembler/responseassembler_test.go @@ -444,17 +444,3 @@ func (fph *fakePeerHandler) sendResponse(p peer.ID, responses []gsmsg.GraphSyncR fph.lastBlocks = blks fph.lastNotifiees = notifees } - -type fakeAllocator struct { - response chan error -} - -func newFakeAllocator() *fakeAllocator { - return &fakeAllocator{ - response: make(chan error, 0), - } -} - -func (fa *fakeAllocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error { - return fa.response -} diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 608ae232..dfbeb244 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -29,7 +29,7 @@ type inProgressResponseStatus struct { ctx context.Context cancelFn func() request gsmsg.GraphSyncRequest - loader ipld.Loader + loader ipld.BlockReadOpener traverser ipldutil.Traverser signals signals updates []gsmsg.GraphSyncRequest @@ -53,7 +53,7 @@ type responseTaskData struct { subscriber *notifications.TopicDataSubscriber ctx context.Context request gsmsg.GraphSyncRequest - loader ipld.Loader + loader ipld.BlockReadOpener traverser ipldutil.Traverser signals signals } @@ -135,7 +135,7 @@ type ResponseManager struct { // New creates a new response manager for responding to requests func New(ctx context.Context, - loader ipld.Loader, + linkSystem ipld.LinkSystem, responseAssembler ResponseAssembler, queryQueue QueryQueue, requestHooks RequestHooks, @@ -156,7 +156,7 @@ func New(ctx context.Context, updateHooks: updateHooks, cancelledListeners: cancelledListeners, responseAssembler: responseAssembler, - loader: loader, + linkSystem: linkSystem, queryQueue: queryQueue, messages: messages, ctx: ctx, @@ -270,7 +270,7 @@ type finishTaskRequest struct { type setResponseDataRequest struct { key responseKey - loader ipld.Loader + loader ipld.BlockReadOpener traverser ipldutil.Traverser } diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index dcfc216b..0f965cd8 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -205,7 +205,7 @@ func TestValidationAndExtensions(t *testing.T) { responseManager.ProcessRequests(td.ctx, td.p, td.requests) td.assertCompleteRequestWithFailure() - err := td.peristenceOptions.Register("chainstore", td.loader) + err := td.peristenceOptions.Register("chainstore", td.persistence) require.NoError(t, err) // register hook to use different loader _ = td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { @@ -843,8 +843,7 @@ type testData struct { t *testing.T cancel func() blockStore map[ipld.Link][]byte - loader ipld.Loader - storer ipld.Storer + persistence ipld.LinkSystem blockChainLength int blockChain *testutil.TestBlockChain completedRequestChan chan completedRequest @@ -889,9 +888,9 @@ func newTestData(t *testing.T) testData { td.ctx, td.cancel = context.WithTimeout(ctx, 10*time.Second) td.blockStore = make(map[ipld.Link][]byte) - td.loader, td.storer = testutil.NewTestStore(td.blockStore) + td.persistence = testutil.NewTestStore(td.blockStore) td.blockChainLength = 5 - td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, td.blockChainLength) + td.blockChain = testutil.SetupBlockChain(ctx, t, td.persistence, 100, td.blockChainLength) td.completedRequestChan = make(chan completedRequest, 1) td.sentResponses = make(chan sentResponse, td.blockChainLength*2) @@ -970,13 +969,13 @@ func newTestData(t *testing.T) testData { } func (td *testData) newResponseManager() *ResponseManager { - return New(td.ctx, td.loader, td.responseAssembler, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) + return New(td.ctx, td.persistence, td.responseAssembler, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) } func (td *testData) alternateLoaderResponseManager() *ResponseManager { obs := make(map[ipld.Link][]byte) - oloader, _ := testutil.NewTestStore(obs) - return New(td.ctx, oloader, td.responseAssembler, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) + persistence := testutil.NewTestStore(obs) + return New(td.ctx, persistence, td.responseAssembler, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) } func (td *testData) assertPausedRequest() { diff --git a/responsemanager/runtraversal/runtraversal.go b/responsemanager/runtraversal/runtraversal.go index fc30f062..5814921c 100644 --- a/responsemanager/runtraversal/runtraversal.go +++ b/responsemanager/runtraversal/runtraversal.go @@ -19,7 +19,7 @@ type ResponseSender func( // RunTraversal wraps a given loader with an interceptor that sends loaded // blocks out to the network with the given response sender. func RunTraversal( - loader ipld.Loader, + loader ipld.BlockReadOpener, traverser ipldutil.Traverser, sendResponse ResponseSender) error { for { @@ -28,7 +28,7 @@ func RunTraversal( return err } lnk, lnkCtx := traverser.CurrentRequest() - result, err := loader(lnk, lnkCtx) + result, err := loader(lnkCtx, lnk) var data []byte if err != nil { traverser.Error(traversal.SkipMe{}) diff --git a/responsemanager/runtraversal/runtraversal_test.go b/responsemanager/runtraversal/runtraversal_test.go index 35bc1436..00e510c1 100644 --- a/responsemanager/runtraversal/runtraversal_test.go +++ b/responsemanager/runtraversal/runtraversal_test.go @@ -111,7 +111,7 @@ type fakeLoader struct { currentLoader int } -func (fl *fakeLoader) Load(link ipld.Link, linkCtx ipld.LinkContext) (io.Reader, error) { +func (fl *fakeLoader) Load(linkCtx ipld.LinkContext, link ipld.Link) (io.Reader, error) { fl.receivedLoads = append(fl.receivedLoads, loadedLink{link, linkCtx}) outcome := fl.loadReturns[fl.currentLoader] fl.currentLoader++ diff --git a/storeutil/storeutil.go b/storeutil/storeutil.go index 09840066..70e58d41 100644 --- a/storeutil/storeutil.go +++ b/storeutil/storeutil.go @@ -11,10 +11,11 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" ) -// LoaderForBlockstore returns an IPLD Loader function compatible with graphsync -// from an IPFS blockstore -func LoaderForBlockstore(bs bstore.Blockstore) ipld.Loader { - return func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) { +// LinkSystemForBlockstore constructs an IPLD LinkSystem for a blockstore +func LinkSystemForBlockstore(bs bstore.Blockstore) ipld.LinkSystem { + lsys := cidlink.DefaultLinkSystem() + lsys.TrustedStorage = true + lsys.StorageReadOpener = func(lnkCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { asCidLink, ok := lnk.(cidlink.Link) if !ok { return nil, fmt.Errorf("Unsupported Link Type") @@ -25,12 +26,7 @@ func LoaderForBlockstore(bs bstore.Blockstore) ipld.Loader { } return bytes.NewBuffer(block.RawData()), nil } -} - -// StorerForBlockstore returns an IPLD Storer function compatible with graphsync -// from an IPFS blockstore -func StorerForBlockstore(bs bstore.Blockstore) ipld.Storer { - return func(lnkCtx ipld.LinkContext) (io.Writer, ipld.StoreCommitter, error) { + lsys.StorageWriteOpener = func(lnkCtx ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { var buffer settableBuffer committer := func(lnk ipld.Link) error { asCidLink, ok := lnk.(cidlink.Link) @@ -45,6 +41,7 @@ func StorerForBlockstore(bs bstore.Blockstore) ipld.Storer { } return &buffer, committer, nil } + return lsys } type settableBuffer struct { diff --git a/storeutil/storeutil_test.go b/storeutil/storeutil_test.go index 6ca3f9f1..c25b2753 100644 --- a/storeutil/storeutil_test.go +++ b/storeutil/storeutil_test.go @@ -15,30 +15,20 @@ import ( "github.com/ipfs/go-graphsync/testutil" ) -func TestLoader(t *testing.T) { +func TestLinkSystem(t *testing.T) { store := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore())) blk := testutil.GenerateBlocksOfSize(1, 1000)[0] - err := store.Put(blk) + persistence := LinkSystemForBlockstore(store) + buffer, commit, err := persistence.StorageWriteOpener(ipld.LinkContext{}) + require.NoError(t, err, "Unable to setup buffer") + _, err = buffer.Write(blk.RawData()) + require.NoError(t, err, "Unable to write data to buffer") + err = commit(cidlink.Link{Cid: blk.Cid()}) require.NoError(t, err, "Unable to put block to store") - loader := LoaderForBlockstore(store) - data, err := loader(cidlink.Link{Cid: blk.Cid()}, ipld.LinkContext{}) + data, err := persistence.StorageReadOpener(ipld.LinkContext{}, cidlink.Link{Cid: blk.Cid()}) require.NoError(t, err, "Unable to load block with loader") bytes, err := ioutil.ReadAll(data) require.NoError(t, err, "Unable to read bytes from reader returned by loader") _, err = blocks.NewBlockWithCid(bytes, blk.Cid()) require.NoError(t, err, "Did not return correct block with loader") } - -func TestStorer(t *testing.T) { - store := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore())) - blk := testutil.GenerateBlocksOfSize(1, 1000)[0] - storer := StorerForBlockstore(store) - buffer, commit, err := storer(ipld.LinkContext{}) - require.NoError(t, err, "Unable to setup buffer") - _, err = buffer.Write(blk.RawData()) - require.NoError(t, err, "Unable to write data to buffer") - err = commit(cidlink.Link{Cid: blk.Cid()}) - require.NoError(t, err, "Unable to commit with storer function") - _, err = store.Get(blk.Cid()) - require.NoError(t, err, "Block not written to store") -} diff --git a/testutil/chaintypes/gen/main.go b/testutil/chaintypes/gen/main.go index 9ca672cd..d2786400 100644 --- a/testutil/chaintypes/gen/main.go +++ b/testutil/chaintypes/gen/main.go @@ -28,5 +28,5 @@ func main() { schema.SpawnStructRepresentationMap(nil), )) gengo.Generate(".", pkgName, ts, adjCfg) - exec.Command("go", "fmt").Run() + _ = exec.Command("go", "fmt").Run() } diff --git a/testutil/test_ipld_tree.go b/testutil/test_ipld_tree.go index 57d15384..2855ec0f 100644 --- a/testutil/test_ipld_tree.go +++ b/testutil/test_ipld_tree.go @@ -1,10 +1,6 @@ package testutil import ( - "bytes" - "context" - "io" - blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" @@ -40,22 +36,15 @@ type TestIPLDTree struct { // NewTestIPLDTree returns a fake tree of nodes, spread across 5 blocks func NewTestIPLDTree() TestIPLDTree { var storage = make(map[ipld.Link][]byte) + persistence := NewTestStore(storage) encode := func(n ipld.Node) (ipld.Node, ipld.Link) { - lb := cidlink.LinkBuilder{Prefix: cid.Prefix{ + lp := cidlink.LinkPrototype{Prefix: cid.Prefix{ Version: 1, Codec: 0x0129, - MhType: 0x17, + MhType: 0x13, MhLength: 4, }} - lnk, err := lb.Build(context.Background(), ipld.LinkContext{}, n, - func(ipld.LinkContext) (io.Writer, ipld.StoreCommitter, error) { - buf := bytes.Buffer{} - return &buf, func(lnk ipld.Link) error { - storage[lnk] = buf.Bytes() - return nil - }, nil - }, - ) + lnk, err := persistence.Store(ipld.LinkContext{}, lp, n) if err != nil { panic(err) } diff --git a/testutil/testchain.go b/testutil/testchain.go index dbe240e0..5998446a 100644 --- a/testutil/testchain.go +++ b/testutil/testchain.go @@ -33,7 +33,7 @@ const blockChainTraversedNodesPerBlock = 2 type TestBlockChain struct { t TestingT blockChainLength int - loader ipld.Loader + loader ipld.BlockReadOpener GenisisNode ipld.Node GenisisLink ipld.Link MiddleNodes []ipld.Node @@ -96,14 +96,13 @@ func createBlock(parents []ipld.Link, size uint64) (ipld.Node, error) { func SetupBlockChain( ctx context.Context, t TestingT, - loader ipld.Loader, - storer ipld.Storer, + lsys ipld.LinkSystem, size uint64, blockChainLength int) *TestBlockChain { - linkBuilder := cidlink.LinkBuilder{Prefix: cid.NewPrefixV1(cid.DagCBOR, mh.SHA2_256)} + linkPrototype := cidlink.LinkPrototype{Prefix: cid.NewPrefixV1(cid.DagCBOR, mh.SHA2_256)} genisisNode, err := createBlock([]ipld.Link{}, size) require.NoError(t, err, "Error creating genesis block") - genesisLink, err := linkBuilder.Build(ctx, ipld.LinkContext{}, genisisNode, storer) + genesisLink, err := lsys.Store(ipld.LinkContext{Ctx: ctx}, linkPrototype, genisisNode) require.NoError(t, err, "Error creating link to genesis block") parent := genesisLink middleNodes := make([]ipld.Node, 0, blockChainLength-2) @@ -112,16 +111,16 @@ func SetupBlockChain( node, err := createBlock([]ipld.Link{parent}, size) require.NoError(t, err, "Error creating middle block") middleNodes = append(middleNodes, node) - link, err := linkBuilder.Build(ctx, ipld.LinkContext{}, node, storer) + link, err := lsys.Store(ipld.LinkContext{Ctx: ctx}, linkPrototype, node) require.NoError(t, err, "Error creating link to middle block") middleLinks = append(middleLinks, link) parent = link } tipNode, err := createBlock([]ipld.Link{parent}, size) require.NoError(t, err, "Error creating tip block") - tipLink, err := linkBuilder.Build(ctx, ipld.LinkContext{}, tipNode, storer) + tipLink, err := lsys.Store(ipld.LinkContext{Ctx: ctx}, linkPrototype, tipNode) require.NoError(t, err, "Error creating link to tip block") - return &TestBlockChain{t, blockChainLength, loader, genisisNode, genesisLink, middleNodes, middleLinks, tipNode, tipLink} + return &TestBlockChain{t, blockChainLength, lsys.StorageReadOpener, genisisNode, genesisLink, middleNodes, middleLinks, tipNode, tipLink} } // Selector returns the selector to recursive traverse the block chain parent links @@ -249,7 +248,7 @@ func (tbc *TestBlockChain) Blocks(from int, to int) []blocks.Block { var blks []blocks.Block for i := from; i < to; i++ { link := tbc.LinkTipIndex(i) - reader, err := tbc.loader(link, ipld.LinkContext{}) + reader, err := tbc.loader(ipld.LinkContext{}, link) require.NoError(tbc.t, err) data, err := ioutil.ReadAll(reader) require.NoError(tbc.t, err) diff --git a/testutil/testnotifications.go b/testutil/testnotifications.go index 80bddee8..53e7ce60 100644 --- a/testutil/testnotifications.go +++ b/testutil/testnotifications.go @@ -11,7 +11,6 @@ import ( ) type TestSubscriber struct { - expectedTopic notifications.Topic receivedEvents chan DispatchedEvent closed chan notifications.Topic } diff --git a/testutil/teststore.go b/testutil/teststore.go index 86f84673..d24f6a1d 100644 --- a/testutil/teststore.go +++ b/testutil/teststore.go @@ -7,12 +7,15 @@ import ( "sync" ipld "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" ) // NewTestStore provides a loader and storer for the given in memory link -> byte data map -func NewTestStore(blocksWritten map[ipld.Link][]byte) (ipld.Loader, ipld.Storer) { +func NewTestStore(blocksWritten map[ipld.Link][]byte) ipld.LinkSystem { var storeLk sync.RWMutex - storer := func(lnkCtx ipld.LinkContext) (io.Writer, ipld.StoreCommitter, error) { + lsys := cidlink.DefaultLinkSystem() + lsys.TrustedStorage = true + lsys.StorageWriteOpener = func(lnkCtx ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { var buffer bytes.Buffer committer := func(lnk ipld.Link) error { storeLk.Lock() @@ -22,7 +25,7 @@ func NewTestStore(blocksWritten map[ipld.Link][]byte) (ipld.Loader, ipld.Storer) } return &buffer, committer, nil } - loader := func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) { + lsys.StorageReadOpener = func(lnkCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { storeLk.RLock() data, ok := blocksWritten[lnk] storeLk.RUnlock() @@ -32,5 +35,5 @@ func NewTestStore(blocksWritten map[ipld.Link][]byte) (ipld.Loader, ipld.Storer) return nil, fmt.Errorf("unable to load block") } - return loader, storer + return lsys }