diff --git a/bloom_cache.go b/bloom_cache.go index 927ad12..86c0190 100644 --- a/bloom_cache.go +++ b/bloom_cache.go @@ -2,6 +2,7 @@ package blockstore import ( "context" + "fmt" "sync/atomic" "time" @@ -19,82 +20,95 @@ func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) ( if err != nil { return nil, err } - bc := &bloomcache{blockstore: bs, bloom: bl} - bc.hits = metrics.NewCtx(ctx, "bloom.hits_total", - "Number of cache hits in bloom cache").Counter() - bc.total = metrics.NewCtx(ctx, "bloom_total", - "Total number of requests to bloom cache").Counter() - - bc.Invalidate() - go bc.Rebuild(ctx) - if metrics.Active() { - go func() { + bc := &bloomcache{ + blockstore: bs, + bloom: bl, + hits: metrics.NewCtx(ctx, "bloom.hits_total", + "Number of cache hits in bloom cache").Counter(), + total: metrics.NewCtx(ctx, "bloom_total", + "Total number of requests to bloom cache").Counter(), + buildChan: make(chan struct{}), + } + go func() { + err := bc.build(ctx) + if err != nil { + select { + case <-ctx.Done(): + log.Warning("Cache rebuild closed by context finishing: ", err) + default: + log.Error(err) + } + return + } + if metrics.Active() { fill := metrics.NewCtx(ctx, "bloom_fill_ratio", "Ratio of bloom filter fullnes, (updated once a minute)").Gauge() - <-bc.rebuildChan t := time.NewTicker(1 * time.Minute) + defer t.Stop() for { select { case <-ctx.Done(): - t.Stop() return case <-t.C: fill.Set(bc.bloom.FillRatio()) } } - }() - } + } + }() return bc, nil } type bloomcache struct { - bloom *bloom.Bloom active int32 - // This chan is only used for testing to wait for bloom to enable - rebuildChan chan struct{} - blockstore Blockstore + bloom *bloom.Bloom + buildErr error + + buildChan chan struct{} + blockstore Blockstore // Statistics hits metrics.Counter total metrics.Counter } -func (b *bloomcache) Invalidate() { - b.rebuildChan = make(chan struct{}) - atomic.StoreInt32(&b.active, 0) -} - func (b *bloomcache) BloomActive() bool { return atomic.LoadInt32(&b.active) != 0 } -func (b *bloomcache) Rebuild(ctx context.Context) { - evt := log.EventBegin(ctx, "bloomcache.Rebuild") +func (b *bloomcache) Wait(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-b.buildChan: + return b.buildErr + } +} + +func (b *bloomcache) build(ctx context.Context) error { + evt := log.EventBegin(ctx, "bloomcache.build") defer evt.Done() + defer close(b.buildChan) ch, err := b.blockstore.AllKeysChan(ctx) if err != nil { - log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err) - return + b.buildErr = fmt.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err) + return b.buildErr } - finish := false - for !finish { + for { select { case key, ok := <-ch: - if ok { - b.bloom.AddTS(key.Bytes()) // Use binary key, the more compact the better - } else { - finish = true + if !ok { + atomic.StoreInt32(&b.active, 1) + return nil } + b.bloom.AddTS(key.Bytes()) // Use binary key, the more compact the better case <-ctx.Done(): - log.Warning("Cache rebuild closed by context finishing.") - return + b.buildErr = ctx.Err() + return b.buildErr } } - close(b.rebuildChan) - atomic.StoreInt32(&b.active, 1) } func (b *bloomcache) DeleteBlock(k *cid.Cid) error { diff --git a/bloom_cache_test.go b/bloom_cache_test.go index c165eee..86c6d79 100644 --- a/bloom_cache_test.go +++ b/bloom_cache_test.go @@ -37,10 +37,8 @@ func TestPutManyAddsToBloom(t *testing.T) { t.Fatal(err) } - select { - case <-cachedbs.rebuildChan: - case <-ctx.Done(): - t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded()) + if err := cachedbs.Wait(ctx); err != nil { + t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded()) } block1 := blocks.NewBlock([]byte("foo")) @@ -86,10 +84,8 @@ func TestHasIsBloomCached(t *testing.T) { t.Fatal(err) } - select { - case <-cachedbs.rebuildChan: - case <-ctx.Done(): - t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded()) + if err := cachedbs.Wait(ctx); err != nil { + t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded()) } cacheFails := 0 @@ -102,7 +98,7 @@ func TestHasIsBloomCached(t *testing.T) { } if float64(cacheFails)/float64(1000) > float64(0.05) { - t.Fatal("Bloom filter has cache miss rate of more than 5%") + t.Fatalf("Bloom filter has cache miss rate of more than 5%%") } cacheFails = 0