Skip to content

Commit ad5730d

Browse files
Merge pull request #2885 from ipfs/feature/bloom-cache
Add ARC caching and bloom filter for blockstorage
2 parents 50a7df6 + f13506c commit ad5730d

File tree

6 files changed

+254
-92
lines changed

6 files changed

+254
-92
lines changed

blocks/blockstore/bloom_cache.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package blockstore
2+
3+
import (
4+
"github.com/ipfs/go-ipfs/blocks"
5+
key "github.com/ipfs/go-ipfs/blocks/key"
6+
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
7+
bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom"
8+
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
9+
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
10+
11+
"sync/atomic"
12+
)
13+
14+
// BloomCached returns Blockstore that caches Has requests using Bloom filter
15+
// Size is size of bloom filter in bytes
16+
func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) {
17+
bl, err := bloom.New(float64(bloomSize), float64(7))
18+
if err != nil {
19+
return nil, err
20+
}
21+
arc, err := lru.NewARC(lruSize)
22+
if err != nil {
23+
return nil, err
24+
}
25+
bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc}
26+
bc.Invalidate()
27+
go bc.Rebuild()
28+
29+
return bc, nil
30+
}
31+
32+
type bloomcache struct {
33+
bloom *bloom.Bloom
34+
active int32
35+
36+
arc *lru.ARCCache
37+
// This chan is only used for testing to wait for bloom to enable
38+
rebuildChan chan struct{}
39+
blockstore Blockstore
40+
41+
// Statistics
42+
hits uint64
43+
misses uint64
44+
}
45+
46+
func (b *bloomcache) Invalidate() {
47+
b.rebuildChan = make(chan struct{})
48+
atomic.StoreInt32(&b.active, 0)
49+
}
50+
51+
func (b *bloomcache) BloomActive() bool {
52+
return atomic.LoadInt32(&b.active) != 0
53+
}
54+
55+
func (b *bloomcache) Rebuild() {
56+
ctx := context.TODO()
57+
evt := log.EventBegin(ctx, "bloomcache.Rebuild")
58+
defer evt.Done()
59+
60+
ch, err := b.blockstore.AllKeysChan(ctx)
61+
if err != nil {
62+
log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
63+
return
64+
}
65+
for key := range ch {
66+
b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better
67+
}
68+
close(b.rebuildChan)
69+
atomic.StoreInt32(&b.active, 1)
70+
}
71+
72+
func (b *bloomcache) DeleteBlock(k key.Key) error {
73+
if has, ok := b.hasCached(k); ok && !has {
74+
return ErrNotFound
75+
}
76+
77+
b.arc.Remove(k) // Invalidate cache before deleting.
78+
err := b.blockstore.DeleteBlock(k)
79+
switch err {
80+
case nil:
81+
b.arc.Add(k, false)
82+
case ds.ErrNotFound, ErrNotFound:
83+
b.arc.Add(k, false)
84+
default:
85+
return err
86+
}
87+
return nil
88+
}
89+
90+
// if ok == false has is inconclusive
91+
// if ok == true then has respons to question: is it contained
92+
func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
93+
if k == "" {
94+
// Return cache invalid so call to blockstore
95+
// in case of invalid key is forwarded deeper
96+
return false, false
97+
}
98+
if b.BloomActive() {
99+
blr := b.bloom.HasTS([]byte(k))
100+
if blr == false { // not contained in bloom is only conclusive answer bloom gives
101+
return false, true
102+
}
103+
}
104+
h, ok := b.arc.Get(k)
105+
if ok {
106+
return h.(bool), ok
107+
} else {
108+
return false, false
109+
}
110+
}
111+
112+
func (b *bloomcache) Has(k key.Key) (bool, error) {
113+
if has, ok := b.hasCached(k); ok {
114+
return has, nil
115+
}
116+
117+
res, err := b.blockstore.Has(k)
118+
if err == nil {
119+
b.arc.Add(k, res)
120+
}
121+
return res, err
122+
}
123+
124+
func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
125+
if has, ok := b.hasCached(k); ok && !has {
126+
return nil, ErrNotFound
127+
}
128+
129+
bl, err := b.blockstore.Get(k)
130+
if bl == nil && err == ErrNotFound {
131+
b.arc.Add(k, false)
132+
} else if bl != nil {
133+
b.arc.Add(k, true)
134+
}
135+
return bl, err
136+
}
137+
138+
func (b *bloomcache) Put(bl blocks.Block) error {
139+
if has, ok := b.hasCached(bl.Key()); ok && has {
140+
return nil
141+
}
142+
143+
err := b.blockstore.Put(bl)
144+
if err == nil {
145+
b.bloom.AddTS([]byte(bl.Key()))
146+
b.arc.Add(bl.Key(), true)
147+
}
148+
return err
149+
}
150+
151+
func (b *bloomcache) PutMany(bs []blocks.Block) error {
152+
var good []blocks.Block
153+
for _, block := range bs {
154+
if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) {
155+
good = append(good, block)
156+
}
157+
}
158+
err := b.blockstore.PutMany(bs)
159+
if err == nil {
160+
for _, block := range bs {
161+
b.bloom.AddTS([]byte(block.Key()))
162+
}
163+
}
164+
return err
165+
}
166+
167+
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
168+
return b.blockstore.AllKeysChan(ctx)
169+
}
170+
171+
func (b *bloomcache) GCLock() Unlocker {
172+
return b.blockstore.(GCBlockstore).GCLock()
173+
}
174+
175+
func (b *bloomcache) PinLock() Unlocker {
176+
return b.blockstore.(GCBlockstore).PinLock()
177+
}
178+
179+
func (b *bloomcache) GCRequested() bool {
180+
return b.blockstore.(GCBlockstore).GCRequested()
181+
}

blocks/blockstore/write_cache_test.go renamed to blocks/blockstore/bloom_cache_test.go

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,44 @@
11
package blockstore
22

33
import (
4+
"fmt"
5+
"sync"
46
"testing"
7+
"time"
58

69
"github.com/ipfs/go-ipfs/blocks"
10+
711
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
812
dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query"
913
syncds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync"
1014
)
1115

1216
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
1317
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
14-
_, err := WriteCached(bs, -1)
15-
if err != nil {
16-
return
18+
_, err := BloomCached(bs, 100, -1)
19+
if err == nil {
20+
t.Fail()
21+
}
22+
_, err = BloomCached(bs, -1, 100)
23+
if err == nil {
24+
t.Fail()
1725
}
18-
t.Fail()
1926
}
2027

2128
func TestRemoveCacheEntryOnDelete(t *testing.T) {
2229
b := blocks.NewBlock([]byte("foo"))
2330
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
2431
bs := NewBlockstore(syncds.MutexWrap(cd))
25-
cachedbs, err := WriteCached(bs, 1)
32+
cachedbs, err := BloomCached(bs, 1, 1)
2633
if err != nil {
2734
t.Fatal(err)
2835
}
2936
cachedbs.Put(b)
3037

38+
cd.Lock()
3139
writeHitTheDatastore := false
40+
cd.Unlock()
41+
3242
cd.SetFunc(func() {
3343
writeHitTheDatastore = true
3444
})
@@ -43,7 +53,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) {
4353
func TestElideDuplicateWrite(t *testing.T) {
4454
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
4555
bs := NewBlockstore(syncds.MutexWrap(cd))
46-
cachedbs, err := WriteCached(bs, 1)
56+
cachedbs, err := BloomCached(bs, 1, 1)
4757
if err != nil {
4858
t.Fatal(err)
4959
}
@@ -56,36 +66,78 @@ func TestElideDuplicateWrite(t *testing.T) {
5666
})
5767
cachedbs.Put(b1)
5868
}
69+
func TestHasIsBloomCached(t *testing.T) {
70+
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
71+
bs := NewBlockstore(syncds.MutexWrap(cd))
72+
73+
for i := 0; i < 1000; i++ {
74+
bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i))))
75+
}
76+
cachedbs, err := BloomCached(bs, 256*1024, 128)
77+
if err != nil {
78+
t.Fatal(err)
79+
}
80+
81+
select {
82+
case <-cachedbs.rebuildChan:
83+
case <-time.After(1 * time.Second):
84+
t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
85+
}
86+
87+
cacheFails := 0
88+
cd.SetFunc(func() {
89+
cacheFails++
90+
})
91+
92+
for i := 0; i < 1000; i++ {
93+
cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Key())
94+
}
95+
96+
if float64(cacheFails)/float64(1000) > float64(0.05) {
97+
t.Fatal("Bloom filter has cache miss rate of more than 5%")
98+
}
99+
}
59100

60101
type callbackDatastore struct {
102+
sync.Mutex
61103
f func()
62104
ds ds.Datastore
63105
}
64106

65-
func (c *callbackDatastore) SetFunc(f func()) { c.f = f }
107+
func (c *callbackDatastore) SetFunc(f func()) {
108+
c.Lock()
109+
defer c.Unlock()
110+
c.f = f
111+
}
112+
113+
func (c *callbackDatastore) CallF() {
114+
c.Lock()
115+
defer c.Unlock()
116+
c.f()
117+
}
66118

67119
func (c *callbackDatastore) Put(key ds.Key, value interface{}) (err error) {
68-
c.f()
120+
c.CallF()
69121
return c.ds.Put(key, value)
70122
}
71123

72124
func (c *callbackDatastore) Get(key ds.Key) (value interface{}, err error) {
73-
c.f()
125+
c.CallF()
74126
return c.ds.Get(key)
75127
}
76128

77129
func (c *callbackDatastore) Has(key ds.Key) (exists bool, err error) {
78-
c.f()
130+
c.CallF()
79131
return c.ds.Has(key)
80132
}
81133

82134
func (c *callbackDatastore) Delete(key ds.Key) (err error) {
83-
c.f()
135+
c.CallF()
84136
return c.ds.Delete(key)
85137
}
86138

87139
func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) {
88-
c.f()
140+
c.CallF()
89141
return c.ds.Query(q)
90142
}
91143

0 commit comments

Comments
 (0)