Skip to content

Commit 711cd8f

Browse files
Merge pull request #3294 from ipfs/kevina/dup-prov-cache-fix
Create a write-through block service
2 parents edf2b52 + affb06d commit 711cd8f

File tree

14 files changed

+149
-65
lines changed

14 files changed

+149
-65
lines changed

blocks/blocksutil/block_generator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ type BlockGenerator struct {
1010
seq int
1111
}
1212

13-
func (bg *BlockGenerator) Next() blocks.Block {
13+
func (bg *BlockGenerator) Next() *blocks.BasicBlock {
1414
bg.seq++
1515
return blocks.NewBlock([]byte(string(bg.seq)))
1616
}
1717

18-
func (bg *BlockGenerator) Blocks(n int) []blocks.Block {
19-
blocks := make([]blocks.Block, 0)
18+
func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock {
19+
blocks := make([]*blocks.BasicBlock, 0)
2020
for i := 0; i < n; i++ {
2121
b := bg.Next()
2222
blocks = append(blocks, b)

blockservice/blockservice.go

Lines changed: 82 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
package blockservice
55

66
import (
7+
"context"
78
"errors"
89
"fmt"
910

1011
blocks "github.com/ipfs/go-ipfs/blocks"
1112
"github.com/ipfs/go-ipfs/blocks/blockstore"
1213
exchange "github.com/ipfs/go-ipfs/exchange"
1314

14-
context "context"
1515
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
1616
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
1717
)
@@ -23,77 +23,112 @@ var ErrNotFound = errors.New("blockservice: key not found")
2323
// BlockService is a hybrid block datastore. It stores data in a local
2424
// datastore and may retrieve data from a remote Exchange.
2525
// It uses an internal `datastore.Datastore` instance to store values.
26-
type BlockService struct {
27-
// TODO don't expose underlying impl details
28-
Blockstore blockstore.Blockstore
29-
Exchange exchange.Interface
26+
type BlockService interface {
27+
Blockstore() blockstore.Blockstore
28+
Exchange() exchange.Interface
29+
AddBlock(o blocks.Block) (*cid.Cid, error)
30+
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)
31+
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
32+
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
33+
DeleteBlock(o blocks.Block) error
34+
Close() error
35+
}
36+
37+
type blockService struct {
38+
blockstore blockstore.Blockstore
39+
exchange exchange.Interface
40+
// If checkFirst is true then first check that a block doesn't
41+
// already exist to avoid republishing the block on the exchange.
42+
checkFirst bool
3043
}
3144

3245
// NewBlockService creates a BlockService with given datastore instance.
33-
func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
46+
func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
47+
if rem == nil {
48+
log.Warning("blockservice running in local (offline) mode.")
49+
}
50+
51+
return &blockService{
52+
blockstore: bs,
53+
exchange: rem,
54+
checkFirst: true,
55+
}
56+
}
57+
58+
// NewWriteThrough ceates a BlockService that guarantees writes will go
59+
// through to the blockstore and are not skipped by cache checks.
60+
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
3461
if rem == nil {
3562
log.Warning("blockservice running in local (offline) mode.")
3663
}
3764

38-
return &BlockService{
39-
Blockstore: bs,
40-
Exchange: rem,
65+
return &blockService{
66+
blockstore: bs,
67+
exchange: rem,
68+
checkFirst: false,
4169
}
4270
}
4371

72+
func (bs *blockService) Blockstore() blockstore.Blockstore {
73+
return bs.blockstore
74+
}
75+
76+
func (bs *blockService) Exchange() exchange.Interface {
77+
return bs.exchange
78+
}
79+
4480
// AddBlock adds a particular block to the service, Putting it into the datastore.
4581
// TODO pass a context into this if the remote.HasBlock is going to remain here.
46-
func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
47-
// TODO: while this is a great optimization, we should think about the
48-
// possibility of streaming writes directly to disk. If we can pass this object
49-
// all the way down to the datastore without having to 'buffer' its data,
50-
// we could implement a `WriteTo` method on it that could do a streaming write
51-
// of the content, saving us (probably) considerable memory.
82+
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
5283
c := o.Cid()
53-
has, err := s.Blockstore.Has(c)
54-
if err != nil {
55-
return nil, err
56-
}
84+
if s.checkFirst {
85+
has, err := s.blockstore.Has(c)
86+
if err != nil {
87+
return nil, err
88+
}
5789

58-
if has {
59-
return c, nil
90+
if has {
91+
return c, nil
92+
}
6093
}
6194

62-
err = s.Blockstore.Put(o)
95+
err := s.blockstore.Put(o)
6396
if err != nil {
6497
return nil, err
6598
}
6699

67-
if err := s.Exchange.HasBlock(o); err != nil {
100+
if err := s.exchange.HasBlock(o); err != nil {
68101
return nil, errors.New("blockservice is closed")
69102
}
70103

71104
return c, nil
72105
}
73106

74-
func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
107+
func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
75108
var toput []blocks.Block
76-
for _, b := range bs {
77-
has, err := s.Blockstore.Has(b.Cid())
78-
if err != nil {
79-
return nil, err
80-
}
81-
82-
if has {
83-
continue
109+
if s.checkFirst {
110+
for _, b := range bs {
111+
has, err := s.blockstore.Has(b.Cid())
112+
if err != nil {
113+
return nil, err
114+
}
115+
if has {
116+
continue
117+
}
118+
toput = append(toput, b)
84119
}
85-
86-
toput = append(toput, b)
120+
} else {
121+
toput = bs
87122
}
88123

89-
err := s.Blockstore.PutMany(toput)
124+
err := s.blockstore.PutMany(toput)
90125
if err != nil {
91126
return nil, err
92127
}
93128

94129
var ks []*cid.Cid
95130
for _, o := range toput {
96-
if err := s.Exchange.HasBlock(o); err != nil {
131+
if err := s.exchange.HasBlock(o); err != nil {
97132
return nil, fmt.Errorf("blockservice is closed (%s)", err)
98133
}
99134

@@ -104,19 +139,19 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
104139

105140
// GetBlock retrieves a particular block from the service,
106141
// Getting it from the datastore using the key (hash).
107-
func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
142+
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
108143
log.Debugf("BlockService GetBlock: '%s'", c)
109144

110-
block, err := s.Blockstore.Get(c)
145+
block, err := s.blockstore.Get(c)
111146
if err == nil {
112147
return block, nil
113148
}
114149

115-
if err == blockstore.ErrNotFound && s.Exchange != nil {
150+
if err == blockstore.ErrNotFound && s.exchange != nil {
116151
// TODO be careful checking ErrNotFound. If the underlying
117152
// implementation changes, this will break.
118153
log.Debug("Blockservice: Searching bitswap")
119-
blk, err := s.Exchange.GetBlock(ctx, c)
154+
blk, err := s.exchange.GetBlock(ctx, c)
120155
if err != nil {
121156
if err == blockstore.ErrNotFound {
122157
return nil, ErrNotFound
@@ -137,13 +172,13 @@ func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
137172
// GetBlocks gets a list of blocks asynchronously and returns through
138173
// the returned channel.
139174
// NB: No guarantees are made about order.
140-
func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
175+
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
141176
out := make(chan blocks.Block, 0)
142177
go func() {
143178
defer close(out)
144179
var misses []*cid.Cid
145180
for _, c := range ks {
146-
hit, err := s.Blockstore.Get(c)
181+
hit, err := s.blockstore.Get(c)
147182
if err != nil {
148183
misses = append(misses, c)
149184
continue
@@ -160,7 +195,7 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
160195
return
161196
}
162197

163-
rblocks, err := s.Exchange.GetBlocks(ctx, misses)
198+
rblocks, err := s.exchange.GetBlocks(ctx, misses)
164199
if err != nil {
165200
log.Debugf("Error with GetBlocks: %s", err)
166201
return
@@ -178,11 +213,11 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
178213
}
179214

180215
// DeleteBlock deletes a block in the blockservice from the datastore
181-
func (s *BlockService) DeleteBlock(o blocks.Block) error {
182-
return s.Blockstore.DeleteBlock(o.Cid())
216+
func (s *blockService) DeleteBlock(o blocks.Block) error {
217+
return s.blockstore.DeleteBlock(o.Cid())
183218
}
184219

185-
func (s *BlockService) Close() error {
220+
func (s *blockService) Close() error {
186221
log.Debug("blockservice is shutting down...")
187-
return s.Exchange.Close()
222+
return s.exchange.Close()
188223
}

blockservice/blockservice_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package blockservice
2+
3+
import (
4+
"testing"
5+
6+
"github.com/ipfs/go-ipfs/blocks"
7+
"github.com/ipfs/go-ipfs/blocks/blockstore"
8+
butil "github.com/ipfs/go-ipfs/blocks/blocksutil"
9+
offline "github.com/ipfs/go-ipfs/exchange/offline"
10+
11+
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
12+
dssync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
13+
)
14+
15+
func TestWriteThroughWorks(t *testing.T) {
16+
bstore := &PutCountingBlockstore{
17+
blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())),
18+
0,
19+
}
20+
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
21+
exch := offline.Exchange(bstore2)
22+
bserv := NewWriteThrough(bstore, exch)
23+
bgen := butil.NewBlockGenerator()
24+
25+
block := bgen.Next()
26+
27+
t.Logf("PutCounter: %d", bstore.PutCounter)
28+
bserv.AddBlock(block)
29+
if bstore.PutCounter != 1 {
30+
t.Fatalf("expected just one Put call, have: %d", bstore.PutCounter)
31+
}
32+
33+
bserv.AddBlock(block)
34+
if bstore.PutCounter != 2 {
35+
t.Fatal("Put should have called again, should be 2 is: %d", bstore.PutCounter)
36+
}
37+
}
38+
39+
var _ blockstore.GCBlockstore = (*PutCountingBlockstore)(nil)
40+
41+
type PutCountingBlockstore struct {
42+
blockstore.GCBlockstore
43+
PutCounter int
44+
}
45+
46+
func (bs *PutCountingBlockstore) Put(block blocks.Block) error {
47+
bs.PutCounter++
48+
return bs.GCBlockstore.Put(block)
49+
}

blockservice/test/mock.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ import (
99
)
1010

1111
// Mocks returns |n| connected mock Blockservices
12-
func Mocks(n int) []*BlockService {
12+
func Mocks(n int) []BlockService {
1313
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
1414
sg := bitswap.NewTestSessionGenerator(net)
1515

1616
instances := sg.Instances(n)
1717

18-
var servs []*BlockService
18+
var servs []BlockService
1919
for _, i := range instances {
2020
servs = append(servs, New(i.Blockstore(), i.Exchange))
2121
}

cmd/ipfs/init.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import (
88
"os"
99
"path"
1010

11+
context "context"
1112
assets "github.com/ipfs/go-ipfs/assets"
1213
cmds "github.com/ipfs/go-ipfs/commands"
1314
core "github.com/ipfs/go-ipfs/core"
1415
namesys "github.com/ipfs/go-ipfs/namesys"
1516
config "github.com/ipfs/go-ipfs/repo/config"
1617
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
17-
context "context"
1818
)
1919

2020
const (

core/core.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ type IpfsNode struct {
9696
// Services
9797
Peerstore pstore.Peerstore // storage for other Peer instances
9898
Blockstore bstore.GCBlockstore // the block store (lower level)
99-
Blocks *bserv.BlockService // the block service, get/add blocks.
99+
Blocks bserv.BlockService // the block service, get/add blocks.
100100
DAG merkledag.DAGService // the merkle dag service, get/add objects.
101101
Resolver *path.Resolver // the path resolution system
102102
Reporter metrics.Reporter

core/core_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package core
33
import (
44
"testing"
55

6+
context "context"
67
"github.com/ipfs/go-ipfs/repo"
78
config "github.com/ipfs/go-ipfs/repo/config"
89
"github.com/ipfs/go-ipfs/thirdparty/testutil"
9-
context "context"
1010
)
1111

1212
func TestInitialization(t *testing.T) {

core/corerepo/stat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package corerepo
33
import (
44
"fmt"
55

6+
context "context"
67
"github.com/ipfs/go-ipfs/core"
78
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
8-
context "context"
99
)
1010

1111
type Stat struct {

core/coreunix/cat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package coreunix
22

33
import (
4+
context "context"
45
core "github.com/ipfs/go-ipfs/core"
56
path "github.com/ipfs/go-ipfs/path"
67
uio "github.com/ipfs/go-ipfs/unixfs/io"
7-
context "context"
88
)
99

1010
func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (*uio.DagReader, error) {

fuse/ipns/link_unix.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ package ipns
55
import (
66
"os"
77

8+
"context"
89
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
910
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
10-
"context"
1111
)
1212

1313
type Link struct {

fuse/node/mount_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ import (
99
"testing"
1010
"time"
1111

12+
context "context"
1213
core "github.com/ipfs/go-ipfs/core"
1314
ipns "github.com/ipfs/go-ipfs/fuse/ipns"
1415
mount "github.com/ipfs/go-ipfs/fuse/mount"
1516
namesys "github.com/ipfs/go-ipfs/namesys"
1617
offroute "github.com/ipfs/go-ipfs/routing/offline"
1718
ci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci"
18-
context "context"
1919
)
2020

2121
func maybeSkipFuseTests(t *testing.T) {

0 commit comments

Comments
 (0)