diff --git a/core/blockchain.go b/core/blockchain.go
index f67f071e3688..93c40591c6b6 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -192,17 +192,6 @@ type txLookup struct {
transaction *types.Transaction
}
-// TxIndexProgress is the struct describing the progress for transaction indexing.
-type TxIndexProgress struct {
- Indexed uint64 // number of blocks whose transactions are indexed
- Remaining uint64 // number of blocks whose transactions are not indexed yet
-}
-
-// Done returns an indicator if the transaction indexing is finished.
-func (prog TxIndexProgress) Done() bool {
- return prog.Remaining == 0
-}
-
// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
@@ -229,13 +218,7 @@ type BlockChain struct {
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
-
- // txLookupLimit is the maximum number of blocks from head whose tx indices
- // are reserved:
- // * 0: means no limit and regenerate any missing indexes
- // * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
- // * nil: disable tx reindexer/deleter, but still index new blocks
- txLookupLimit uint64
+ txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
hc *HeaderChain
rmLogsFeed event.Feed
@@ -270,9 +253,6 @@ type BlockChain struct {
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing
- txIndexRunning bool // flag if the background tx indexer is activated
- txIndexProgCh chan chan TxIndexProgress // chan for querying the progress of transaction indexing
-
engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
@@ -320,7 +300,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
- txIndexProgCh: make(chan chan TxIndexProgress),
engine: engine,
vmConfig: vmConfig,
}
@@ -485,13 +464,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}
- // Start tx indexer/unindexer if required.
+ // Start tx indexer if it's enabled.
if txLookupLimit != nil {
- bc.txLookupLimit = *txLookupLimit
- bc.txIndexRunning = true
-
- bc.wg.Add(1)
- go bc.maintainTxIndex()
+ bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}
@@ -981,7 +956,10 @@ func (bc *BlockChain) stopWithoutSaving() {
if !bc.stopping.CompareAndSwap(false, true) {
return
}
-
+ // Signal the tx indexer to shut down and wait for it to acknowledge.
+ if bc.txIndexer != nil {
+ bc.txIndexer.close()
+ }
// Unsubscribe all subscriptions registered from blockchain.
bc.scope.Close()
@@ -2403,148 +2381,6 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
return false
}
-// indexBlocks reindexes or unindexes transactions depending on user configuration
-func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
- defer func() { close(done) }()
-
- // If head is 0, it means the chain is just initialized and no blocks are
- // inserted, so don't need to index anything.
- if head == 0 {
- return
- }
- // The tail flag is not existent, it means the node is just initialized
- // and all blocks in the chain (part of them may from ancient store) are
- // not indexed yet, index the chain according to the configuration then.
- if tail == nil {
- from := uint64(0)
- if bc.txLookupLimit != 0 && head >= bc.txLookupLimit {
- from = head - bc.txLookupLimit + 1
- }
- rawdb.IndexTransactions(bc.db, from, head+1, bc.quit, true)
- return
- }
- // The tail flag is existent (which means indexes in [tail, head] should be
- // present), while the whole chain are requested for indexing.
- if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
- if *tail > 0 {
- // It can happen when chain is rewound to a historical point which
- // is even lower than the indexes tail, recap the indexing target
- // to new head to avoid reading non-existent block bodies.
- end := *tail
- if end > head+1 {
- end = head + 1
- }
- rawdb.IndexTransactions(bc.db, 0, end, bc.quit, true)
- }
- return
- }
- // The tail flag is existent, adjust the index range according to configuration
- // and latest head.
- if head-bc.txLookupLimit+1 < *tail {
- // Reindex a part of missing indices and rewind index tail to HEAD-limit
- rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit, true)
- } else {
- // Unindex a part of stale indices and forward index tail to HEAD-limit
- rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit, false)
- }
-}
-
-// reportTxIndexProgress returns the tx indexing progress.
-func (bc *BlockChain) reportTxIndexProgress(head uint64) TxIndexProgress {
- var (
- remaining uint64
- tail = rawdb.ReadTxIndexTail(bc.db)
- )
- total := bc.txLookupLimit
- if bc.txLookupLimit == 0 {
- total = head + 1 // genesis included
- }
- var indexed uint64
- if tail != nil {
- indexed = head - *tail + 1
- }
- // The value of indexed might be larger than total if some blocks need
- // to be unindexed, avoiding a negative remaining.
- if indexed < total {
- remaining = total - indexed
- }
- return TxIndexProgress{
- Indexed: indexed,
- Remaining: remaining,
- }
-}
-
-// TxIndexProgress retrieves the tx indexing progress, or an error if the
-// background tx indexer is not activated or already stopped.
-func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
- if !bc.txIndexRunning {
- return TxIndexProgress{}, errors.New("tx indexer is not activated")
- }
- ch := make(chan TxIndexProgress, 1)
- select {
- case bc.txIndexProgCh <- ch:
- return <-ch, nil
- case <-bc.quit:
- return TxIndexProgress{}, errors.New("blockchain is closed")
- }
-}
-
-// maintainTxIndex is responsible for the construction and deletion of the
-// transaction index.
-//
-// User can use flag `txlookuplimit` to specify a "recentness" block, below
-// which ancient tx indices get deleted. If `txlookuplimit` is 0, it means
-// all tx indices will be reserved.
-//
-// The user can adjust the txlookuplimit value for each launch after sync,
-// Geth will automatically construct the missing indices or delete the extra
-// indices.
-func (bc *BlockChain) maintainTxIndex() {
- defer bc.wg.Done()
-
- // Listening to chain events and manipulate the transaction indexes.
- var (
- done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
- lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
- headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
- )
- sub := bc.SubscribeChainHeadEvent(headCh)
- if sub == nil {
- return
- }
- defer sub.Unsubscribe()
- log.Info("Initialized transaction indexer", "limit", bc.TxLookupLimit())
-
- // Launch the initial processing if chain is not empty (head != genesis).
- // This step is useful in these scenarios that chain has no progress and
- // indexer is never triggered.
- if head := rawdb.ReadHeadBlock(bc.db); head != nil && head.Number().Uint64() != 0 {
- done = make(chan struct{})
- lastHead = head.Number().Uint64()
- go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.NumberU64(), done)
- }
- for {
- select {
- case head := <-headCh:
- if done == nil {
- done = make(chan struct{})
- go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
- }
- lastHead = head.Block.NumberU64()
- case <-done:
- done = nil
- case ch := <-bc.txIndexProgCh:
- ch <- bc.reportTxIndexProgress(lastHead)
- case <-bc.quit:
- if done != nil {
- log.Info("Waiting background transaction indexer to exit")
- <-done
- }
- return
- }
- }
-}
-
// reportBlock logs a bad block error.
func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) {
rawdb.WriteBadBlock(bc.db, block)
diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go
index 059232946086..6fb09abaccb5 100644
--- a/core/blockchain_reader.go
+++ b/core/blockchain_reader.go
@@ -397,16 +397,12 @@ func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
}
-// SetTxLookupLimit is responsible for updating the txlookup limit to the
-// original one stored in db if the new mismatches with the old one.
-func (bc *BlockChain) SetTxLookupLimit(limit uint64) {
- bc.txLookupLimit = limit
-}
-
-// TxLookupLimit retrieves the txlookup limit used by blockchain to prune
-// stale transaction indices.
-func (bc *BlockChain) TxLookupLimit() uint64 {
- return bc.txLookupLimit
+// TxIndexProgress returns the tx indexing progress, or an error if the indexer is disabled or closed.
+func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
+ if bc.txIndexer == nil {
+ return TxIndexProgress{}, errors.New("tx indexer is not enabled")
+ }
+ return bc.txIndexer.txIndexProgress() // fails once the indexer has been closed
}
// TrieDB retrieves the low level trie database used for data storage.
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 71260e44a096..a7880ec2ff79 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -2722,106 +2722,6 @@ func testReorgToShorterRemovesCanonMappingHeaderChain(t *testing.T, scheme strin
}
}
-func TestTransactionIndices(t *testing.T) {
- // Configure and generate a sample block chain
- var (
- key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
- address = crypto.PubkeyToAddress(key.PublicKey)
- funds = big.NewInt(100000000000000000)
- gspec = &Genesis{
- Config: params.TestChainConfig,
- Alloc: GenesisAlloc{address: {Balance: funds}},
- BaseFee: big.NewInt(params.InitialBaseFee),
- }
- signer = types.LatestSigner(gspec.Config)
- )
- _, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 128, func(i int, block *BlockGen) {
- tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key)
- if err != nil {
- panic(err)
- }
- block.AddTx(tx)
- })
-
- check := func(tail *uint64, chain *BlockChain) {
- stored := rawdb.ReadTxIndexTail(chain.db)
- if tail == nil && stored != nil {
- t.Fatalf("Oldest indexded block mismatch, want nil, have %d", *stored)
- }
- if tail != nil && *stored != *tail {
- t.Fatalf("Oldest indexded block mismatch, want %d, have %d", *tail, *stored)
- }
- if tail != nil {
- for i := *tail; i <= chain.CurrentBlock().Number.Uint64(); i++ {
- block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
- if block.Transactions().Len() == 0 {
- continue
- }
- for _, tx := range block.Transactions() {
- if index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()); index == nil {
- t.Fatalf("Miss transaction indice, number %d hash %s", i, tx.Hash().Hex())
- }
- }
- }
- for i := uint64(0); i < *tail; i++ {
- block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
- if block.Transactions().Len() == 0 {
- continue
- }
- for _, tx := range block.Transactions() {
- if index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()); index != nil {
- t.Fatalf("Transaction indice should be deleted, number %d hash %s", i, tx.Hash().Hex())
- }
- }
- }
- }
- }
- // Init block chain with external ancients, check all needed indices has been indexed.
- limit := []uint64{0, 32, 64, 128}
- for _, l := range limit {
- frdir := t.TempDir()
- ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
- rawdb.WriteAncientBlocks(ancientDb, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0))
-
- l := l
- chain, err := NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
- if err != nil {
- t.Fatalf("failed to create tester chain: %v", err)
- }
- chain.indexBlocks(rawdb.ReadTxIndexTail(ancientDb), 128, make(chan struct{}))
-
- var tail uint64
- if l != 0 {
- tail = uint64(128) - l + 1
- }
- check(&tail, chain)
- chain.Stop()
- ancientDb.Close()
- os.RemoveAll(frdir)
- }
-
- // Reconstruct a block chain which only reserves HEAD-64 tx indices
- ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
- defer ancientDb.Close()
-
- rawdb.WriteAncientBlocks(ancientDb, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0))
- limit = []uint64{0, 64 /* drop stale */, 32 /* shorten history */, 64 /* extend history */, 0 /* restore all */}
- for _, l := range limit {
- l := l
- chain, err := NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
- if err != nil {
- t.Fatalf("failed to create tester chain: %v", err)
- }
- var tail uint64
- if l != 0 {
- tail = uint64(128) - l + 1
- }
- chain.indexBlocks(rawdb.ReadTxIndexTail(ancientDb), 128, make(chan struct{}))
- check(&tail, chain)
- chain.Stop()
- }
-}
-
// Benchmarks large blocks with value transfers to non-existing accounts
func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks int, recipientFn func(uint64) common.Address, dataFn func(uint64) []byte) {
var (
@@ -4018,222 +3918,6 @@ func testCanonicalHashMarker(t *testing.T, scheme string) {
}
}
-// TestTxIndexer tests the tx indexes are updated correctly.
-func TestTxIndexer(t *testing.T) {
- var (
- testBankKey, _ = crypto.GenerateKey()
- testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
- testBankFunds = big.NewInt(1000000000000000000)
-
- gspec = &Genesis{
- Config: params.TestChainConfig,
- Alloc: GenesisAlloc{testBankAddress: {Balance: testBankFunds}},
- BaseFee: big.NewInt(params.InitialBaseFee),
- }
- engine = ethash.NewFaker()
- nonce = uint64(0)
- )
- _, blocks, receipts := GenerateChainWithGenesis(gspec, engine, 128, func(i int, gen *BlockGen) {
- tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0xdeadbeef"), big.NewInt(1000), params.TxGas, big.NewInt(10*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)
- gen.AddTx(tx)
- nonce += 1
- })
-
- // verifyIndexes checks if the transaction indexes are present or not
- // of the specified block.
- verifyIndexes := func(db ethdb.Database, number uint64, exist bool) {
- if number == 0 {
- return
- }
- block := blocks[number-1]
- for _, tx := range block.Transactions() {
- lookup := rawdb.ReadTxLookupEntry(db, tx.Hash())
- if exist && lookup == nil {
- t.Fatalf("missing %d %x", number, tx.Hash().Hex())
- }
- if !exist && lookup != nil {
- t.Fatalf("unexpected %d %x", number, tx.Hash().Hex())
- }
- }
- }
- // verifyRange runs verifyIndexes for a range of blocks, from and to are included.
- verifyRange := func(db ethdb.Database, from, to uint64, exist bool) {
- for number := from; number <= to; number += 1 {
- verifyIndexes(db, number, exist)
- }
- }
- verify := func(db ethdb.Database, expTail uint64) {
- tail := rawdb.ReadTxIndexTail(db)
- if tail == nil {
- t.Fatal("Failed to write tx index tail")
- }
- if *tail != expTail {
- t.Fatalf("Unexpected tx index tail, want %v, got %d", expTail, *tail)
- }
- if *tail != 0 {
- verifyRange(db, 0, *tail-1, false)
- }
- verifyRange(db, *tail, 128, true)
- }
- verifyProgress := func(chain *BlockChain) {
- prog := chain.reportTxIndexProgress(128)
- if !prog.Done() {
- t.Fatalf("Expect fully indexed")
- }
- }
-
- var cases = []struct {
- limitA uint64
- tailA uint64
- limitB uint64
- tailB uint64
- limitC uint64
- tailC uint64
- }{
- {
- // LimitA: 0
- // TailA: 0
- //
- // all blocks are indexed
- limitA: 0,
- tailA: 0,
-
- // LimitB: 1
- // TailB: 128
- //
- // block-128 is indexed
- limitB: 1,
- tailB: 128,
-
- // LimitB: 64
- // TailB: 65
- //
- // block [65, 128] are indexed
- limitC: 64,
- tailC: 65,
- },
- {
- // LimitA: 64
- // TailA: 65
- //
- // block [65, 128] are indexed
- limitA: 64,
- tailA: 65,
-
- // LimitB: 1
- // TailB: 128
- //
- // block-128 is indexed
- limitB: 1,
- tailB: 128,
-
- // LimitB: 64
- // TailB: 65
- //
- // block [65, 128] are indexed
- limitC: 64,
- tailC: 65,
- },
- {
- // LimitA: 127
- // TailA: 2
- //
- // block [2, 128] are indexed
- limitA: 127,
- tailA: 2,
-
- // LimitB: 1
- // TailB: 128
- //
- // block-128 is indexed
- limitB: 1,
- tailB: 128,
-
- // LimitB: 64
- // TailB: 65
- //
- // block [65, 128] are indexed
- limitC: 64,
- tailC: 65,
- },
- {
- // LimitA: 128
- // TailA: 1
- //
- // block [2, 128] are indexed
- limitA: 128,
- tailA: 1,
-
- // LimitB: 1
- // TailB: 128
- //
- // block-128 is indexed
- limitB: 1,
- tailB: 128,
-
- // LimitB: 64
- // TailB: 65
- //
- // block [65, 128] are indexed
- limitC: 64,
- tailC: 65,
- },
- {
- // LimitA: 129
- // TailA: 0
- //
- // block [0, 128] are indexed
- limitA: 129,
- tailA: 0,
-
- // LimitB: 1
- // TailB: 128
- //
- // block-128 is indexed
- limitB: 1,
- tailB: 128,
-
- // LimitB: 64
- // TailB: 65
- //
- // block [65, 128] are indexed
- limitC: 64,
- tailC: 65,
- },
- }
- for _, c := range cases {
- frdir := t.TempDir()
- db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
- rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0))
-
- // Index the initial blocks from ancient store
- chain, _ := NewBlockChain(db, nil, gspec, nil, engine, vm.Config{}, nil, &c.limitA)
- chain.indexBlocks(nil, 128, make(chan struct{}))
- verify(db, c.tailA)
- verifyProgress(chain)
-
- chain.SetTxLookupLimit(c.limitB)
- chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
- verify(db, c.tailB)
- verifyProgress(chain)
-
- chain.SetTxLookupLimit(c.limitC)
- chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
- verify(db, c.tailC)
- verifyProgress(chain)
-
- // Recover all indexes
- chain.SetTxLookupLimit(0)
- chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
- verify(db, 0)
- verifyProgress(chain)
-
- chain.Stop()
- db.Close()
- os.RemoveAll(frdir)
- }
-}
-
func TestCreateThenDeletePreByzantium(t *testing.T) {
// We use Ropsten chain config instead of Testchain config, this is
// deliberate: we want to use pre-byz rules where we have intermediate state roots
diff --git a/core/txindexer.go b/core/txindexer.go
new file mode 100644
index 000000000000..61de41947cee
--- /dev/null
+++ b/core/txindexer.go
@@ -0,0 +1,220 @@
+// Copyright 2024 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see
+
+package core
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+)
+
+// TxIndexProgress describes the progress of transaction indexing, counted in blocks.
+type TxIndexProgress struct {
+ Indexed uint64 // number of blocks whose transactions are already indexed
+ Remaining uint64 // number of blocks in the target range whose transactions are not indexed yet
+}
+
+// Done reports whether transaction indexing has finished, i.e. no blocks remain.
+func (progress TxIndexProgress) Done() bool {
+ return progress.Remaining == 0
+}
+
+// txIndexer is the module responsible for maintaining transaction indexes
+// according to the indexing range configured by the user.
+type txIndexer struct {
+ // limit is the maximum number of blocks from head whose tx indexes
+ // are reserved:
+ // * 0: means the entire chain should be indexed
+ // * N: means the latest N blocks [HEAD-N+1, HEAD] should be indexed
+ // and all others shouldn't.
+ limit uint64
+ db ethdb.Database // database handed to the rawdb index/unindex routines
+ progress chan chan TxIndexProgress // progress queries, answered by loop
+ term chan chan struct{} // shutdown requests; loop closes the received channel once it has stopped
+ closed chan struct{} // closed when loop returns
+}
+
+// newTxIndexer creates the transaction indexer and starts its scheduler loop in a goroutine.
+func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
+ indexer := &txIndexer{
+ limit: limit,
+ db: chain.db,
+ progress: make(chan chan TxIndexProgress),
+ term: make(chan chan struct{}),
+ closed: make(chan struct{}),
+ }
+ go indexer.loop(chain) // runs until close is called
+
+ var msg string // human-readable description of the configured range, for the log only
+ if limit == 0 {
+ msg = "entire chain"
+ } else {
+ msg = fmt.Sprintf("last %d blocks", limit)
+ }
+ log.Info("Initialized transaction indexer", "range", msg)
+
+ return indexer
+}
+
+// run executes the scheduled indexing/unindexing task; the scheduler loop
+// launches it in a separate goroutine. If the stop channel is closed, the task
+// should terminate as soon as possible; done is closed once the task finishes.
+func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, done chan struct{}) {
+ defer func() { close(done) }()
+
+ // Short circuit if chain is empty and nothing to index.
+ if head == 0 {
+ return
+ }
+ // The tail flag does not exist, which means the node was just initialized
+ // and no block in the chain (part of them may be in the ancient store) is
+ // indexed yet; index the chain according to the configured limit.
+ if tail == nil {
+ from := uint64(0)
+ if indexer.limit != 0 && head >= indexer.limit {
+ from = head - indexer.limit + 1
+ }
+ rawdb.IndexTransactions(indexer.db, from, head+1, stop, true)
+ return
+ }
+ // The tail flag exists (which means indexes in [tail, head] should be
+ // present), while the whole chain is requested for indexing.
+ if indexer.limit == 0 || head < indexer.limit {
+ if *tail > 0 {
+ // It can happen when the chain is rewound to a historical point which
+ // is even lower than the index tail; cap the indexing target at the
+ // new head to avoid reading non-existent block bodies.
+ end := *tail
+ if end > head+1 {
+ end = head + 1
+ }
+ rawdb.IndexTransactions(indexer.db, 0, end, stop, true)
+ }
+ return
+ }
+ // The tail flag exists, adjust the index range according to the configured
+ // limit and the latest chain head.
+ if head-indexer.limit+1 < *tail {
+ // Reindex a part of missing indices and rewind index tail to HEAD-limit+1
+ rawdb.IndexTransactions(indexer.db, head-indexer.limit+1, *tail, stop, true)
+ } else {
+ // Unindex a part of stale indices and forward index tail to HEAD-limit+1
+ rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, stop, false)
+ }
+}
+
+// loop is the scheduler of the indexer, assigning indexing/unindexing tasks in
+// response to chain head events. At most one task is running at any time.
+func (indexer *txIndexer) loop(chain *BlockChain) {
+ defer close(indexer.closed)
+
+ // Listen to chain head events and manipulate the transaction indexes.
+ var (
+ stop chan struct{} // Closed on shutdown to abort the background routine; non-nil while it is active.
+ done chan struct{} // Closed by the background routine on exit; non-nil while it is active.
+ lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
+
+ headCh = make(chan ChainHeadEvent)
+ sub = chain.SubscribeChainHeadEvent(headCh)
+ )
+ defer sub.Unsubscribe()
+
+ // Launch the initial processing if chain is not empty (head != genesis).
+ // This is useful when the chain makes no progress and no head event arrives.
+ if head := rawdb.ReadHeadBlock(indexer.db); head != nil && head.Number().Uint64() != 0 {
+ stop = make(chan struct{})
+ done = make(chan struct{})
+ lastHead = head.Number().Uint64()
+ go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.NumberU64(), stop, done)
+ }
+ for {
+ select {
+ case head := <-headCh:
+ if done == nil { // only schedule a new task if none is running
+ stop = make(chan struct{})
+ done = make(chan struct{})
+ go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done)
+ }
+ lastHead = head.Block.NumberU64()
+ case <-done: // the background task has finished
+ stop = nil
+ done = nil
+ case ch := <-indexer.progress:
+ ch <- indexer.report(lastHead)
+ case ch := <-indexer.term:
+ if stop != nil {
+ close(stop)
+ }
+ if done != nil {
+ log.Info("Waiting background transaction indexer to exit")
+ <-done
+ }
+ close(ch) // acknowledge the shutdown to the caller of close
+ return
+ }
+ }
+}
+
+// report returns the tx indexing progress relative to the given chain head:
+// how many blocks are indexed and how many are still left to be indexed.
+func (indexer *txIndexer) report(head uint64) TxIndexProgress {
+ // The number of blocks that should be indexed: the configured limit, or
+ // the whole chain (genesis included) if unlimited or shorter than the limit.
+ total := indexer.limit
+ if indexer.limit == 0 || total > head {
+ total = head + 1
+ }
+ // Indexes in [tail, head] are present. The tail can be above the head if
+ // the chain was rewound below it; report nothing as indexed in that case
+ // instead of letting the unsigned subtraction wrap around.
+ var indexed uint64
+ if tail := rawdb.ReadTxIndexTail(indexer.db); tail != nil && *tail <= head {
+ indexed = head - *tail + 1
+ }
+ // The value of indexed might be larger than total if some blocks need
+ // to be unindexed, avoiding a negative remaining.
+ var remaining uint64
+ if indexed < total {
+ remaining = total - indexed
+ }
+ return TxIndexProgress{Indexed: indexed, Remaining: remaining}
+}
+
+// txIndexProgress retrieves the tx indexing progress from the scheduler loop,
+// or returns an error if the background tx indexer is already stopped.
+func (indexer *txIndexer) txIndexProgress() (TxIndexProgress, error) {
+ ch := make(chan TxIndexProgress, 1) // buffered so the loop's reply does not wait on the receive below
+ select {
+ case indexer.progress <- ch:
+ return <-ch, nil
+ case <-indexer.closed:
+ return TxIndexProgress{}, errors.New("indexer is closed")
+ }
+}
+
+// close shuts down the indexer, blocking until the loop acknowledges. Safe to call multiple times.
+func (indexer *txIndexer) close() {
+ ch := make(chan struct{})
+ select {
+ case indexer.term <- ch:
+ <-ch // wait for the loop to acknowledge the shutdown
+ case <-indexer.closed: // the loop has already exited
+ }
+}
diff --git a/core/txindexer_test.go b/core/txindexer_test.go
new file mode 100644
index 000000000000..66f26edaebcd
--- /dev/null
+++ b/core/txindexer_test.go
@@ -0,0 +1,243 @@
+// Copyright 2024 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see
+
+package core
+
+import (
+ "math/big"
+ "os"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/consensus/ethash"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+// TestTxIndexer tests that tx indexes are created and pruned correctly as the limit changes.
+func TestTxIndexer(t *testing.T) {
+ var (
+ testBankKey, _ = crypto.GenerateKey()
+ testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
+ testBankFunds = big.NewInt(1000000000000000000)
+
+ gspec = &Genesis{
+ Config: params.TestChainConfig,
+ Alloc: GenesisAlloc{testBankAddress: {Balance: testBankFunds}},
+ BaseFee: big.NewInt(params.InitialBaseFee),
+ }
+ engine = ethash.NewFaker()
+ nonce = uint64(0)
+ chainHead = uint64(128)
+ )
+ _, blocks, receipts := GenerateChainWithGenesis(gspec, engine, int(chainHead), func(i int, gen *BlockGen) {
+ tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0xdeadbeef"), big.NewInt(1000), params.TxGas, big.NewInt(10*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)
+ gen.AddTx(tx)
+ nonce += 1
+ })
+
+ // verifyIndexes checks whether the transaction indexes of the specified
+ // block are present or not; the genesis block is skipped.
+ verifyIndexes := func(db ethdb.Database, number uint64, exist bool) {
+ if number == 0 {
+ return
+ }
+ block := blocks[number-1]
+ for _, tx := range block.Transactions() {
+ lookup := rawdb.ReadTxLookupEntry(db, tx.Hash())
+ if exist && lookup == nil {
+ t.Fatalf("missing %d %s", number, tx.Hash().Hex())
+ }
+ if !exist && lookup != nil {
+ t.Fatalf("unexpected %d %s", number, tx.Hash().Hex())
+ }
+ }
+ }
+ verify := func(db ethdb.Database, expTail uint64, indexer *txIndexer) { // checks tail, per-block indexes and progress
+ tail := rawdb.ReadTxIndexTail(db)
+ if tail == nil {
+ t.Fatal("Failed to write tx index tail")
+ }
+ if *tail != expTail {
+ t.Fatalf("Unexpected tx index tail, want %v, got %d", expTail, *tail)
+ }
+ if *tail != 0 {
+ for number := uint64(0); number < *tail; number += 1 {
+ verifyIndexes(db, number, false)
+ }
+ }
+ for number := *tail; number <= chainHead; number += 1 {
+ verifyIndexes(db, number, true)
+ }
+ progress := indexer.report(chainHead)
+ if !progress.Done() {
+ t.Fatalf("Expect fully indexed")
+ }
+ }
+
+ var cases = []struct {
+ limitA uint64
+ tailA uint64
+ limitB uint64
+ tailB uint64
+ limitC uint64
+ tailC uint64
+ }{
+ {
+ // LimitA: 0
+ // TailA: 0
+ //
+ // all blocks are indexed
+ limitA: 0,
+ tailA: 0,
+
+ // LimitB: 1
+ // TailB: 128
+ //
+ // block-128 is indexed
+ limitB: 1,
+ tailB: 128,
+
+ // LimitC: 64
+ // TailC: 65
+ //
+ // block [65, 128] are indexed
+ limitC: 64,
+ tailC: 65,
+ },
+ {
+ // LimitA: 64
+ // TailA: 65
+ //
+ // block [65, 128] are indexed
+ limitA: 64,
+ tailA: 65,
+
+ // LimitB: 1
+ // TailB: 128
+ //
+ // block-128 is indexed
+ limitB: 1,
+ tailB: 128,
+
+ // LimitC: 64
+ // TailC: 65
+ //
+ // block [65, 128] are indexed
+ limitC: 64,
+ tailC: 65,
+ },
+ {
+ // LimitA: 127
+ // TailA: 2
+ //
+ // block [2, 128] are indexed
+ limitA: 127,
+ tailA: 2,
+
+ // LimitB: 1
+ // TailB: 128
+ //
+ // block-128 is indexed
+ limitB: 1,
+ tailB: 128,
+
+ // LimitC: 64
+ // TailC: 65
+ //
+ // block [65, 128] are indexed
+ limitC: 64,
+ tailC: 65,
+ },
+ {
+ // LimitA: 128
+ // TailA: 1
+ //
+ // block [1, 128] are indexed
+ limitA: 128,
+ tailA: 1,
+
+ // LimitB: 1
+ // TailB: 128
+ //
+ // block-128 is indexed
+ limitB: 1,
+ tailB: 128,
+
+ // LimitC: 64
+ // TailC: 65
+ //
+ // block [65, 128] are indexed
+ limitC: 64,
+ tailC: 65,
+ },
+ {
+ // LimitA: 129
+ // TailA: 0
+ //
+ // block [0, 128] are indexed
+ limitA: 129,
+ tailA: 0,
+
+ // LimitB: 1
+ // TailB: 128
+ //
+ // block-128 is indexed
+ limitB: 1,
+ tailB: 128,
+
+ // LimitC: 64
+ // TailC: 65
+ //
+ // block [65, 128] are indexed
+ limitC: 64,
+ tailC: 65,
+ },
+ }
+ for _, c := range cases {
+ frdir := t.TempDir()
+ db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
+ rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0))
+
+ // Index the initial blocks from the ancient store; run is invoked synchronously.
+ indexer := &txIndexer{
+ limit: c.limitA,
+ db: db,
+ progress: make(chan chan TxIndexProgress),
+ }
+ indexer.run(nil, chainHead, make(chan struct{}), make(chan struct{}))
+ verify(db, c.tailA, indexer)
+
+ indexer.limit = c.limitB
+ indexer.run(rawdb.ReadTxIndexTail(db), chainHead, make(chan struct{}), make(chan struct{}))
+ verify(db, c.tailB, indexer)
+
+ indexer.limit = c.limitC
+ indexer.run(rawdb.ReadTxIndexTail(db), chainHead, make(chan struct{}), make(chan struct{}))
+ verify(db, c.tailC, indexer)
+
+ // Recover all indexes
+ indexer.limit = 0
+ indexer.run(rawdb.ReadTxIndexTail(db), chainHead, make(chan struct{}), make(chan struct{}))
+ verify(db, 0, indexer)
+
+ db.Close()
+ os.RemoveAll(frdir)
+ }
+}
diff --git a/internal/ethapi/errors.go b/internal/ethapi/errors.go
index 6171cc4d6b91..b5e668a8050a 100644
--- a/internal/ethapi/errors.go
+++ b/internal/ethapi/errors.go
@@ -71,7 +71,7 @@ func (e *TxIndexingError) Error() string {
// ErrorCode returns the JSON error code for a revert.
// See: https://github.com/ethereum/wiki/wiki/JSON-RPC-Error-Codes-Improvement-Proposal
func (e *TxIndexingError) ErrorCode() int {
- return 3 // TODO tbd
+ return -32000 // to be decided
}
// ErrorData returns the hex encoded revert reason.