Skip to content

Commit c3edb50

Browse files
committed
Add backfilling transactions bloom to freezer
1 parent 10fdb2f commit c3edb50

9 files changed

+165
-4
lines changed

core/rawdb/chain_freezer.go

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
192192
return
193193
}
194194
}
195+
195196
threshold, err := f.freezeThreshold(nfdb)
196197
if err != nil {
197198
backoff = true
@@ -200,6 +201,50 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
200201
}
201202
frozen, _ := f.Ancients() // no error will occur, safe to ignore
202203

204+
// Back fill transactions bloom untill it catches up then resume normal freezing
205+
txbFrozen, err := f.AncientItems(ChainFreezerTransactionBloomTable)
206+
if err != nil {
207+
log.Error("Failed to check frozen transaction bloom", "err", err)
208+
backoff = true
209+
continue
210+
}
211+
212+
if txbFrozen < frozen {
213+
var (
214+
first = txbFrozen
215+
last = threshold
216+
)
217+
if last-first+1 > freezerBatchLimit {
218+
last = freezerBatchLimit + first - 1
219+
}
220+
// Don't go ahead of the rest of the frozen datas
221+
if last > frozen {
222+
last = frozen - 1
223+
}
224+
225+
log.Debug("Freezing historical tx bloom", "from", first, "to", last)
226+
txAncients, err := f.freezeTxBloomRange(nfdb, first, last)
227+
if err != nil {
228+
log.Error("Error in tx bloom freeze operation", "err", err)
229+
backoff = true
230+
continue
231+
}
232+
233+
// Wipe out all data from the active database
234+
batch := db.NewBatch()
235+
for i := 0; i < len(txAncients); i++ {
236+
// Always keep the genesis block in active database
237+
if first+uint64(i) != 0 {
238+
DeleteTxBloom(batch, txAncients[i], first+uint64(i))
239+
}
240+
}
241+
if err := batch.Write(); err != nil {
242+
log.Crit("Failed to delete frozen canonical blocks", "err", err)
243+
}
244+
batch.Reset()
245+
continue
246+
}
247+
203248
// Short circuit if the blocks below threshold are already frozen.
204249
if frozen != 0 && frozen-1 >= threshold {
205250
backoff = true
@@ -353,7 +398,7 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
353398
// TODO this can throw an error when rewinding to a block
354399
txBloom := ReadTxBloomRLP(nfdb, hash, number)
355400
if len(txBloom) == 0 && !outOfShard {
356-
return fmt.Errorf("total transaction bloom, can't freeze block %d", number)
401+
return fmt.Errorf("total transaction bloom missing, can't freeze block %d", number)
357402
}
358403

359404
// Write to the batch.
@@ -379,3 +424,50 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
379424
})
380425
return hashes, err
381426
}
427+
428+
// Back fill transactions bloom data
429+
func (f *chainFreezer) freezeTxBloomRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
430+
hashes = make([]common.Hash, 0, limit-number+1)
431+
432+
dataConfig := ReadChainDataConfig(nfdb)
433+
434+
_, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
435+
for ; number <= limit; number++ {
436+
437+
// If the data is out of the shard range then we allow writing empty data, this will allow truncating the tail of the freezer later
438+
outOfShard := false
439+
if dataConfig != nil && dataConfig.DesiredChainDataStart != nil {
440+
outOfShard = number < *dataConfig.DesiredChainDataStart
441+
}
442+
443+
// Retrieve all the components of the canonical block.
444+
hash := ReadCanonicalHash(nfdb, number)
445+
if hash == (common.Hash{}) {
446+
// Get the hash from the freezer, its probably already frozen
447+
data, err := f.AncientStore.Ancient(ChainFreezerHashTable, number)
448+
if err != nil || len(data) == 0 {
449+
return fmt.Errorf("canonical hash missing from freezer, can't freeze block %d", number)
450+
}
451+
hash = common.BytesToHash(data)
452+
if hash == (common.Hash{}) && !outOfShard {
453+
return fmt.Errorf("canonical hash missing, can't freeze block %d", number)
454+
}
455+
}
456+
// TODO this can throw an error when rewinding to a block
457+
// This can happen when the tx bloom indexer has not yet indexed the block, it will abort the current batch but eventually complete
458+
txBloom := ReadTxBloomRLP(nfdb, hash, number)
459+
if len(txBloom) == 0 && !outOfShard {
460+
return fmt.Errorf("total transaction bloom missing, can't freeze block %d", number)
461+
}
462+
463+
// Write to the batch.
464+
if err := op.AppendRaw(ChainFreezerTransactionBloomTable, number, txBloom); err != nil {
465+
return fmt.Errorf("can't write transaction bloom to Freezer: %v", err)
466+
}
467+
468+
hashes = append(hashes, hash)
469+
}
470+
return nil
471+
})
472+
return hashes, err
473+
}

core/rawdb/database.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ func (db *nofreezedb) AncientSize(kind string) (uint64, error) {
111111
return 0, errNotSupported
112112
}
113113

114+
// AncientItems returns the number of items the ancient of the specified category.
115+
func (db *nofreezedb) AncientItems(kind string) (uint64, error) {
116+
return 0, errNotSupported
117+
}
118+
114119
// ModifyAncients is not supported.
115120
func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, error) {
116121
return 0, errNotSupported

core/rawdb/freezer.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,19 @@ func (f *Freezer) AncientSize(kind string) (uint64, error) {
236236
return 0, errUnknownTable
237237
}
238238

239+
// AncientItems returns the number of items the ancient of the specified category.
240+
func (f *Freezer) AncientItems(kind string) (uint64, error) {
241+
// This needs the write lock to avoid data races on table fields.
242+
// Speed doesn't matter here, AncientSize is for debugging.
243+
f.writeLock.RLock()
244+
defer f.writeLock.RUnlock()
245+
246+
if table := f.tables[kind]; table != nil {
247+
return table.items.Load(), nil
248+
}
249+
return 0, errUnknownTable
250+
}
251+
239252
// ReadAncients runs the given read operation while ensuring that no writes take place
240253
// on the underlying freezer.
241254
func (f *Freezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
@@ -373,9 +386,13 @@ func (f *Freezer) repair() error {
373386
head = uint64(math.MaxUint64)
374387
tail = uint64(0)
375388
)
376-
for _, table := range f.tables {
389+
for kind, table := range f.tables {
377390
items := table.items.Load()
378391
if head > items {
392+
// Allow the transactions bloom to be behind, this will be backfilled later on
393+
if kind == ChainFreezerTransactionBloomTable {
394+
continue
395+
}
379396
head = items
380397
}
381398
hidden := table.itemHidden.Load()

core/rawdb/freezer_batch.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,26 @@ func (batch *freezerBatch) reset() {
6464
func (batch *freezerBatch) commit() (item uint64, writeSize int64, err error) {
6565
// Check that count agrees on all batches.
6666
item = uint64(math.MaxUint64)
67+
maxData := uint64(math.MaxUint64)
68+
69+
// This has been modified for transactions bloom back filling
70+
// Check that tables with data have the same count
71+
for name, tb := range batch.tables {
72+
if tb.totalBytes > 0 {
73+
if maxData == uint64(math.MaxUint64) {
74+
maxData = tb.curItem
75+
} else if maxData != tb.curItem {
76+
return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item)
77+
}
78+
}
79+
}
80+
// Find the max value
6781
for name, tb := range batch.tables {
68-
if item < math.MaxUint64 && tb.curItem != item {
82+
if item == uint64(math.MaxUint64) || item < tb.curItem {
83+
item = tb.curItem
84+
} else if item != tb.curItem && tb.totalBytes == 0 {
6985
return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item)
7086
}
71-
item = tb.curItem
7287
}
7388

7489
// Commit all table batches.

core/rawdb/freezer_memory.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,17 @@ func (f *MemoryFreezer) AncientSize(kind string) (uint64, error) {
302302
return 0, errUnknownTable
303303
}
304304

305+
// AncientItems returns the number of items the ancient of the specified category.
306+
func (f *MemoryFreezer) AncientItems(kind string) (uint64, error) {
307+
f.lock.RLock()
308+
defer f.lock.RUnlock()
309+
310+
if table := f.tables[kind]; table != nil {
311+
return table.items, nil
312+
}
313+
return 0, errUnknownTable
314+
}
315+
305316
// ReadAncients runs the given read operation while ensuring that no writes take place
306317
// on the underlying freezer.
307318
func (f *MemoryFreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {

core/rawdb/freezer_resettable.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,14 @@ func (f *resettableFreezer) AncientSize(kind string) (uint64, error) {
159159
return f.freezer.AncientSize(kind)
160160
}
161161

162+
// AncientItems returns the number of items the ancient of the specified category.
163+
func (f *resettableFreezer) AncientItems(kind string) (uint64, error) {
164+
f.lock.RLock()
165+
defer f.lock.RUnlock()
166+
167+
return f.freezer.AncientItems(kind)
168+
}
169+
162170
// ReadAncients runs the given read operation while ensuring that no writes take place
163171
// on the underlying freezer.
164172
func (f *resettableFreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {

core/rawdb/table.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ func (t *table) AncientSize(kind string) (uint64, error) {
8686
return t.db.AncientSize(kind)
8787
}
8888

89+
// AncientItems is a noop passthrough that just forwards the request to the underlying
90+
// database.
91+
func (t *table) AncientItems(kind string) (uint64, error) {
92+
return t.db.AncientItems(kind)
93+
}
94+
8995
// ModifyAncients runs an ancient write operation on the underlying database.
9096
func (t *table) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, error) {
9197
return t.db.ModifyAncients(fn)

ethdb/database.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ type AncientReaderOp interface {
101101

102102
// AncientSize returns the ancient size of the specified category.
103103
AncientSize(kind string) (uint64, error)
104+
105+
// AncientItems returns the number of items the ancient of the specified category.
106+
AncientItems(kind string) (uint64, error)
104107
}
105108

106109
// AncientReader is the extended ancient reader interface including 'batched' or 'atomic' reading.

ethdb/remotedb/remotedb.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ func (db *Database) AncientSize(kind string) (uint64, error) {
8282
panic("not supported")
8383
}
8484

85+
func (db *Database) AncientItems(kind string) (uint64, error) {
86+
panic("not supported")
87+
}
88+
8589
func (db *Database) ReadAncients(fn func(op ethdb.AncientReaderOp) error) (err error) {
8690
return fn(db)
8791
}

0 commit comments

Comments
 (0)