Skip to content

Commit 26a8652

Browse files
committed
merkledag: switch to new dag interface
Also: * Update the blockstore/blockservice methods to match. * Construct a new temporary offline dag instead of having a GetOfflineLinkService method. License: MIT Signed-off-by: Steven Allen <[email protected]>
1 parent 5202f55 commit 26a8652

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+493
-654
lines changed

blockservice/blockservice.go

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"io"
1011

1112
"github.com/ipfs/go-ipfs/blocks/blockstore"
1213
exchange "github.com/ipfs/go-ipfs/exchange"
@@ -21,31 +22,42 @@ var log = logging.Logger("blockservice")
2122

2223
var ErrNotFound = errors.New("blockservice: key not found")
2324

25+
type BlockGetter interface {
26+
// GetBlock gets the requested block.
27+
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
28+
29+
// GetBlocks does a batch request for the given cids, returning blocks as
30+
// they are found, in no particular order.
31+
//
32+
// It may not be able to find all requested blocks (or the context may
33+
// be canceled). In that case, it will close the channel early. It is up
34+
// to the consumer to detect this situation and keep track which blocks
35+
// it has received and which it hasn't.
36+
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
37+
}
38+
2439
// BlockService is a hybrid block datastore. It stores data in a local
2540
// datastore and may retrieve data from a remote Exchange.
2641
// It uses an internal `datastore.Datastore` instance to store values.
2742
type BlockService interface {
43+
io.Closer
44+
BlockGetter
45+
2846
// Blockstore returns a reference to the underlying blockstore
2947
Blockstore() blockstore.Blockstore
3048

3149
// Exchange returns a reference to the underlying exchange (usually bitswap)
3250
Exchange() exchange.Interface
3351

3452
// AddBlock puts a given block to the underlying datastore
35-
AddBlock(o blocks.Block) (*cid.Cid, error)
53+
AddBlock(o blocks.Block) error
3654

3755
// AddBlocks adds a slice of blocks at the same time using batching
3856
// capabilities of the underlying datastore whenever possible.
39-
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)
40-
41-
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
42-
DeleteBlock(o blocks.Block) error
57+
AddBlocks(bs []blocks.Block) error
4358

44-
// GetBlocks does a batch request for the given cids, returning blocks as
45-
// they are found, in no particular order.
46-
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
47-
48-
Close() error
59+
// DeleteBlock deletes the given block from the blockservice.
60+
DeleteBlock(o *cid.Cid) error
4961
}
5062

5163
type blockService struct {
@@ -110,38 +122,34 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
110122

111123
// AddBlock adds a particular block to the service, Putting it into the datastore.
112124
// TODO pass a context into this if the remote.HasBlock is going to remain here.
113-
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
125+
func (s *blockService) AddBlock(o blocks.Block) error {
114126
c := o.Cid()
115127
if s.checkFirst {
116-
has, err := s.blockstore.Has(c)
117-
if err != nil {
118-
return nil, err
119-
}
120-
121-
if has {
122-
return c, nil
128+
if has, err := s.blockstore.Has(c); has || err != nil {
129+
return err
123130
}
124131
}
125132

126-
err := s.blockstore.Put(o)
127-
if err != nil {
128-
return nil, err
133+
if err := s.blockstore.Put(o); err != nil {
134+
return err
129135
}
130136

131137
if err := s.exchange.HasBlock(o); err != nil {
132-
return nil, errors.New("blockservice is closed")
138+
// TODO(stebalien): really an error?
139+
return errors.New("blockservice is closed")
133140
}
134141

135-
return c, nil
142+
return nil
136143
}
137144

138-
func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
145+
func (s *blockService) AddBlocks(bs []blocks.Block) error {
139146
var toput []blocks.Block
140147
if s.checkFirst {
148+
toput = make([]blocks.Block, 0, len(bs))
141149
for _, b := range bs {
142150
has, err := s.blockstore.Has(b.Cid())
143151
if err != nil {
144-
return nil, err
152+
return err
145153
}
146154
if !has {
147155
toput = append(toput, b)
@@ -153,18 +161,16 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
153161

154162
err := s.blockstore.PutMany(toput)
155163
if err != nil {
156-
return nil, err
164+
return err
157165
}
158166

159-
var ks []*cid.Cid
160167
for _, o := range toput {
161168
if err := s.exchange.HasBlock(o); err != nil {
162-
return nil, fmt.Errorf("blockservice is closed (%s)", err)
169+
// TODO(stebalien): Should this really *return*?
170+
return fmt.Errorf("blockservice is closed (%s)", err)
163171
}
164-
165-
ks = append(ks, o.Cid())
166172
}
167-
return ks, nil
173+
return nil
168174
}
169175

170176
// GetBlock retrieves a particular block from the service,
@@ -256,8 +262,8 @@ func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f e
256262
}
257263

258264
// DeleteBlock deletes a block in the blockservice from the datastore
259-
func (s *blockService) DeleteBlock(o blocks.Block) error {
260-
return s.blockstore.DeleteBlock(o.Cid())
265+
func (s *blockService) DeleteBlock(c *cid.Cid) error {
266+
return s.blockstore.DeleteBlock(c)
261267
}
262268

263269
func (s *blockService) Close() error {

blockservice/test/blocks_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,12 @@ func TestBlocks(t *testing.T) {
3333
t.Error("Block key and data multihash key not equal")
3434
}
3535

36-
k, err := bs.AddBlock(o)
36+
err := bs.AddBlock(o)
3737
if err != nil {
3838
t.Error("failed to add block to BlockService", err)
3939
return
4040
}
4141

42-
if !k.Equals(o.Cid()) {
43-
t.Error("returned key is not equal to block key", err)
44-
}
45-
4642
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
4743
defer cancel()
4844
b2, err := bs.GetBlock(ctx, o.Cid())

core/commands/block.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,14 @@ It reads from stdin, and <key> is a base58 encoded multihash.
199199
return
200200
}
201201

202-
k, err := n.Blocks.AddBlock(b)
202+
err = n.Blocks.AddBlock(b)
203203
if err != nil {
204204
res.SetError(err, cmdkit.ErrNormal)
205205
return
206206
}
207207

208208
err = cmds.EmitOnce(res, &BlockStat{
209-
Key: k.String(),
209+
Key: b.Cid().String(),
210210
Size: len(data),
211211
})
212212
if err != nil {

core/commands/dag/dag.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
1919
cmdkit "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
2020
files "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit/files"
21+
node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
2122
)
2223

2324
var log = logging.Logger("cmds/files")
@@ -102,7 +103,7 @@ into an object of the specified format.
102103

103104
addAllAndPin := func(f files.File) error {
104105
cids := cid.NewSet()
105-
b := n.DAG.Batch()
106+
b := node.NewBatch(req.Context(), n.DAG)
106107

107108
for {
108109
file, err := f.NextFile()
@@ -122,7 +123,7 @@ into an object of the specified format.
122123
}
123124

124125
for _, nd := range nds {
125-
_, err := b.Add(nd)
126+
err := b.Add(nd)
126127
if err != nil {
127128
return err
128129
}

core/commands/dht.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
peer "gx/ipfs/Qma7H6RW8wRrfZpNSXwxYGcd1E149s42FpWNpDNieSVrnU/go-libp2p-peer"
2020
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
2121
"gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
22+
node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
2223
pstore "gx/ipfs/QmeZVQzUrXqaszo24DAoHfGzcmCptN9JyngLkGAiEfk2x7/go-libp2p-peerstore"
2324
ipdht "gx/ipfs/QmfChjky1VNaHUQR9F2xqR1QEyX45pqU78nhsoq5GDYoKL/go-libp2p-kad-dht"
2425
)
@@ -377,7 +378,7 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er
377378
return nil
378379
}
379380

380-
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGService, cids []*cid.Cid) error {
381+
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv node.DAGService, cids []*cid.Cid) error {
381382
provided := cid.NewSet()
382383
for _, c := range cids {
383384
kset := cid.NewSet()

core/commands/files/files.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func statGetFormatOptions(req cmds.Request) (string, error) {
170170
}
171171
}
172172

173-
func statNode(ds dag.DAGService, fsn mfs.FSNode) (*Object, error) {
173+
func statNode(ds node.DAGService, fsn mfs.FSNode) (*Object, error) {
174174
nd, err := fsn.GetNode()
175175
if err != nil {
176176
return nil, err

core/commands/ls.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ The JSON output contains type information.
134134
t := unixfspb.Data_DataType(-1)
135135

136136
linkNode, err := link.GetNode(req.Context(), dserv)
137-
if err == merkledag.ErrNotFound && !resolve {
137+
if err == node.ErrNotFound && !resolve {
138138
// not an error
139139
linkNode = nil
140140
} else if err != nil {

core/commands/object/object.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package objectcmd
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/base64"
67
"encoding/json"
78
"encoding/xml"
@@ -422,7 +423,7 @@ And then run:
422423
defer n.Blockstore.PinLock().Unlock()
423424
}
424425

425-
objectCid, err := objectPut(n, input, inputenc, datafieldenc)
426+
objectCid, err := objectPut(req.Context(), n, input, inputenc, datafieldenc)
426427
if err != nil {
427428
errType := cmdkit.ErrNormal
428429
if err == ErrUnknownObjectEnc {
@@ -504,12 +505,12 @@ Available templates:
504505
}
505506
}
506507

507-
k, err := n.DAG.Add(node)
508+
err = n.DAG.Add(req.Context(), node)
508509
if err != nil {
509510
res.SetError(err, cmdkit.ErrNormal)
510511
return
511512
}
512-
res.SetOutput(&Object{Hash: k.String()})
513+
res.SetOutput(&Object{Hash: node.Cid().String()})
513514
},
514515
Marshalers: cmds.MarshalerMap{
515516
cmds.Text: func(res cmds.Response) (io.Reader, error) {
@@ -542,7 +543,7 @@ func nodeFromTemplate(template string) (*dag.ProtoNode, error) {
542543
var ErrEmptyNode = errors.New("no data or links in this node")
543544

544545
// objectPut takes a format option, serializes bytes from stdin and updates the dag with that data
545-
func objectPut(n *core.IpfsNode, input io.Reader, encoding string, dataFieldEncoding string) (*cid.Cid, error) {
546+
func objectPut(ctx context.Context, n *core.IpfsNode, input io.Reader, encoding string, dataFieldEncoding string) (*cid.Cid, error) {
546547

547548
data, err := ioutil.ReadAll(io.LimitReader(input, inputLimit+10))
548549
if err != nil {
@@ -602,7 +603,7 @@ func objectPut(n *core.IpfsNode, input io.Reader, encoding string, dataFieldEnco
602603
return nil, err
603604
}
604605

605-
_, err = n.DAG.Add(dagnode)
606+
err = n.DAG.Add(ctx, dagnode)
606607
if err != nil {
607608
return nil, err
608609
}

core/commands/object/patch.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,13 @@ the limit will not be respected by the network.
109109

110110
rtpb.SetData(append(rtpb.Data(), data...))
111111

112-
newkey, err := nd.DAG.Add(rtpb)
112+
err = nd.DAG.Add(req.Context(), rtpb)
113113
if err != nil {
114114
res.SetError(err, cmdkit.ErrNormal)
115115
return
116116
}
117117

118-
res.SetOutput(&Object{Hash: newkey.String()})
118+
res.SetOutput(&Object{Hash: rtpb.Cid().String()})
119119
},
120120
Type: Object{},
121121
Marshalers: cmds.MarshalerMap{
@@ -177,13 +177,13 @@ Example:
177177

178178
rtpb.SetData(data)
179179

180-
newkey, err := nd.DAG.Add(rtpb)
180+
err = nd.DAG.Add(req.Context(), rtpb)
181181
if err != nil {
182182
res.SetError(err, cmdkit.ErrNormal)
183183
return
184184
}
185185

186-
res.SetOutput(&Object{Hash: newkey.String()})
186+
res.SetOutput(&Object{Hash: rtpb.Cid().String()})
187187
},
188188
Type: Object{},
189189
Marshalers: cmds.MarshalerMap{
@@ -237,7 +237,7 @@ Removes a link by the given name from root.
237237
return
238238
}
239239

240-
nnode, err := e.Finalize(nd.DAG)
240+
nnode, err := e.Finalize(req.Context(), nd.DAG)
241241
if err != nil {
242242
res.SetError(err, cmdkit.ErrNormal)
243243
return
@@ -334,7 +334,7 @@ to a file containing 'bar', and returns the hash of the new object.
334334
return
335335
}
336336

337-
nnode, err := e.Finalize(nd.DAG)
337+
nnode, err := e.Finalize(req.Context(), nd.DAG)
338338
if err != nil {
339339
res.SetError(err, cmdkit.ErrNormal)
340340
return

core/commands/pin.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ import (
77
"io"
88
"time"
99

10+
bserv "github.com/ipfs/go-ipfs/blockservice"
1011
cmds "github.com/ipfs/go-ipfs/commands"
1112
core "github.com/ipfs/go-ipfs/core"
1213
e "github.com/ipfs/go-ipfs/core/commands/e"
1314
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
15+
offline "github.com/ipfs/go-ipfs/exchange/offline"
1416
dag "github.com/ipfs/go-ipfs/merkledag"
1517
path "github.com/ipfs/go-ipfs/path"
1618
pin "github.com/ipfs/go-ipfs/pin"
@@ -555,7 +557,7 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
555557
if typeStr == "indirect" || typeStr == "all" {
556558
set := cid.NewSet()
557559
for _, k := range n.Pinning.RecursiveKeys() {
558-
err := dag.EnumerateChildren(n.Context(), n.DAG.GetLinks, k, set.Visit)
560+
err := dag.EnumerateChildren(n.Context(), dag.GetLinksWithDAG(n.DAG), k, set.Visit)
559561
if err != nil {
560562
return nil, err
561563
}
@@ -594,7 +596,10 @@ type pinVerifyOpts struct {
594596

595597
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan interface{} {
596598
visited := make(map[string]PinStatus)
597-
getLinks := n.DAG.GetOfflineLinkService().GetLinks
599+
600+
bs := n.Blocks.Blockstore()
601+
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
602+
getLinks := dag.GetLinksWithDAG(DAG)
598603
recPins := n.Pinning.RecursiveKeys()
599604

600605
var checkPin func(root *cid.Cid) PinStatus

core/commands/pubsub.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,13 @@ This command outputs data in the following encodings:
100100
if discover {
101101
go func() {
102102
blk := blocks.NewBlock([]byte("floodsub:" + topic))
103-
cid, err := n.Blocks.AddBlock(blk)
103+
err := n.Blocks.AddBlock(blk)
104104
if err != nil {
105105
log.Error("pubsub discovery: ", err)
106106
return
107107
}
108108

109-
connectToPubSubPeers(req.Context, n, cid)
109+
connectToPubSubPeers(req.Context, n, blk.Cid())
110110
}()
111111
}
112112

0 commit comments

Comments
 (0)