Skip to content

core/txpool, miner: speed up blob pool pending retrievals #29008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,13 +1456,14 @@ func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *u
pendwaitHist.Update(time.Since(pendStart).Nanoseconds())
defer p.lock.RUnlock()

defer func(start time.Time) {
pendtimeHist.Update(time.Since(start).Nanoseconds())
}(time.Now())
execStart := time.Now()
defer func() {
pendtimeHist.Update(time.Since(execStart).Nanoseconds())
}()

pending := make(map[common.Address][]*txpool.LazyTransaction)
pending := make(map[common.Address][]*txpool.LazyTransaction, len(p.index))
for addr, txs := range p.index {
var lazies []*txpool.LazyTransaction
lazies := make([]*txpool.LazyTransaction, 0, len(txs))
for _, tx := range txs {
// If transaction filtering was requested, discard badly priced ones
if minTip != nil && baseFee != nil {
Expand All @@ -1486,9 +1487,9 @@ func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *u
lazies = append(lazies, &txpool.LazyTransaction{
Pool: p,
Hash: tx.hash,
Time: time.Now(), // TODO(karalabe): Maybe save these and use that?
GasFeeCap: tx.execFeeCap.ToBig(),
GasTipCap: tx.execTipCap.ToBig(),
Time: execStart, // TODO(karalabe): Maybe save these and use that?
GasFeeCap: tx.execFeeCap,
GasTipCap: tx.execTipCap,
Gas: tx.execGas,
BlobGas: tx.blobGas,
})
Expand Down
52 changes: 52 additions & 0 deletions core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1288,3 +1288,55 @@ func TestAdd(t *testing.T) {
pool.Close()
}
}

// Benchmarks the time it takes to assemble the lazy pending transaction list
// from the pool contents.
func BenchmarkPoolPending100Mb(b *testing.B) { benchmarkPoolPending(b, 100_000_000) }
func BenchmarkPoolPending1GB(b *testing.B) { benchmarkPoolPending(b, 1_000_000_000) }
func BenchmarkPoolPending10GB(b *testing.B) { benchmarkPoolPending(b, 10_000_000_000) }

func benchmarkPoolPending(b *testing.B, datacap uint64) {
// Calculate the maximum number of transaction that would fit into the pool
// and generate a set of random accounts to seed them with.
capacity := datacap / params.BlobTxBlobGasPerBlob

var (
keys = make([]*ecdsa.PrivateKey, capacity)
addrs = make([]common.Address, capacity)
)
for i := 0; i < int(capacity); i++ {
keys[i], _ = crypto.GenerateKey()
addrs[i] = crypto.PubkeyToAddress(keys[i].PublicKey)
}
// Create the empty blob pool, initialized with the accounts
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewDatabase(memorydb.New())), nil)
for _, addr := range addrs {
statedb.AddBalance(addr, uint256.NewInt(1_000_000_000))
}
statedb.Commit(0, true)

chain := &testBlockChain{
config: testChainConfig,
basefee: uint256.NewInt(1050),
blobfee: uint256.NewInt(105),
statedb: statedb,
}
pool := New(Config{Datadir: ""}, chain)
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
b.Fatalf("failed to create blob pool: %v", err)
}
defer pool.Close()

// Fill the pool up with one random transaction from each account with the
// same price and everything to maximize the worst case scenario
for _, key := range keys {
pool.add(makeTx(0, 10, chain.basefee.Uint64()+10, chain.blobfee.Uint64(), key))
}
// Benchmark assembling the pending
b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
pool.Pending(uint256.NewInt(1), chain.basefee, chain.blobfee)
}
}
4 changes: 2 additions & 2 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,8 @@ func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobF
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(),
GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()),
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
Expand Down
6 changes: 3 additions & 3 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ type LazyTransaction struct {
Hash common.Hash // Transaction hash to pull up if needed
Tx *types.Transaction // Transaction if already resolved

Time time.Time // Time when the transaction was first seen
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay
Time time.Time // Time when the transaction was first seen
GasFeeCap *uint256.Int // Maximum fee per gas the transaction may consume
GasTipCap *uint256.Int // Maximum miner tip per gas the transaction can pay

Gas uint64 // Amount of gas required by the transaction
BlobGas uint64 // Amount of blob gas required by the transaction
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (p *testTxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
})
Expand Down
26 changes: 17 additions & 9 deletions miner/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,31 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/holiman/uint256"
)

// txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap
type txWithMinerFee struct {
tx *txpool.LazyTransaction
from common.Address
fees *big.Int
fees *uint256.Int
}

// newTxWithMinerFee creates a wrapped transaction, calculating the effective
// miner gasTipCap if a base fee is provided.
// Returns error in case of a negative effective miner gasTipCap.
func newTxWithMinerFee(tx *txpool.LazyTransaction, from common.Address, baseFee *big.Int) (*txWithMinerFee, error) {
tip := new(big.Int).Set(tx.GasTipCap)
func newTxWithMinerFee(tx *txpool.LazyTransaction, from common.Address, baseFee *uint256.Int) (*txWithMinerFee, error) {
tip := new(uint256.Int).Set(tx.GasTipCap)
if baseFee != nil {
if tx.GasFeeCap.Cmp(baseFee) < 0 {
return nil, types.ErrGasFeeCapTooLow
}
tip = math.BigMin(tx.GasTipCap, new(big.Int).Sub(tx.GasFeeCap, baseFee))
tip = new(uint256.Int).Sub(tx.GasFeeCap, baseFee)
if tip.Gt(tx.GasTipCap) {
tip = tx.GasTipCap
}
}
return &txWithMinerFee{
tx: tx,
Expand Down Expand Up @@ -87,7 +90,7 @@ type transactionsByPriceAndNonce struct {
txs map[common.Address][]*txpool.LazyTransaction // Per account nonce-sorted list of transactions
heads txByPriceAndTime // Next transaction for each unique account (price heap)
signer types.Signer // Signer for the set of transactions
baseFee *big.Int // Current base fee
baseFee *uint256.Int // Current base fee
}

// newTransactionsByPriceAndNonce creates a transaction set that can retrieve
Expand All @@ -96,10 +99,15 @@ type transactionsByPriceAndNonce struct {
// Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor.
func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address][]*txpool.LazyTransaction, baseFee *big.Int) *transactionsByPriceAndNonce {
// Convert the basefee from header format to uint256 format
var baseFeeUint *uint256.Int
if baseFee != nil {
baseFeeUint = uint256.MustFromBig(baseFee)
}
// Initialize a price and received time based heap with the head transactions
heads := make(txByPriceAndTime, 0, len(txs))
for from, accTxs := range txs {
wrapped, err := newTxWithMinerFee(accTxs[0], from, baseFee)
wrapped, err := newTxWithMinerFee(accTxs[0], from, baseFeeUint)
if err != nil {
delete(txs, from)
continue
Expand All @@ -114,12 +122,12 @@ func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address]
txs: txs,
heads: heads,
signer: signer,
baseFee: baseFee,
baseFee: baseFeeUint,
}
}

// Peek returns the next transaction by price.
func (t *transactionsByPriceAndNonce) Peek() (*txpool.LazyTransaction, *big.Int) {
func (t *transactionsByPriceAndNonce) Peek() (*txpool.LazyTransaction, *uint256.Int) {
if len(t.heads) == 0 {
return nil, nil
}
Expand Down
9 changes: 5 additions & 4 deletions miner/ordering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/holiman/uint256"
)

func TestTransactionPriceNonceSortLegacy(t *testing.T) {
Expand Down Expand Up @@ -92,8 +93,8 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) {
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
})
Expand Down Expand Up @@ -160,8 +161,8 @@ func TestTransactionTimeSort(t *testing.T) {
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
})
Expand Down
18 changes: 9 additions & 9 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ type worker struct {
mu sync.RWMutex // The lock used to protect the coinbase and extra fields
coinbase common.Address
extra []byte
tip *big.Int // Minimum tip needed for non-local transaction to include them
tip *uint256.Int // Minimum tip needed for non-local transaction to include them

pendingMu sync.RWMutex
pendingTasks map[common.Hash]*task
Expand Down Expand Up @@ -253,7 +253,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
isLocalBlock: isLocalBlock,
coinbase: config.Etherbase,
extra: config.ExtraData,
tip: config.GasPrice,
tip: uint256.MustFromBig(config.GasPrice),
pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
Expand Down Expand Up @@ -334,7 +334,7 @@ func (w *worker) setExtra(extra []byte) {
func (w *worker) setGasTip(tip *big.Int) {
w.mu.Lock()
defer w.mu.Unlock()
w.tip = tip
w.tip = uint256.MustFromBig(tip)
}

// setRecommitInterval updates the interval for miner sealing work recommitting.
Expand Down Expand Up @@ -556,15 +556,15 @@ func (w *worker) mainLoop() {
Hash: tx.Hash(),
Tx: nil, // Do *not* set this! We need to resolve it later to pull blobs in
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
})
}
txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil, new(big.Int))
w.commitTransactions(w.current, txset, nil, new(uint256.Int))

// Only update the snapshot if any new transactions were added
// to the pending block
Expand Down Expand Up @@ -802,7 +802,7 @@ func (w *worker) applyTransaction(env *environment, tx *types.Transaction) (*typ
return receipt, err
}

func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *big.Int) error {
func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *uint256.Int) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand Down Expand Up @@ -1013,7 +1013,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
if env.header.ExcessBlobGas != nil {
blobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas))
}
pending := w.eth.TxPool().Pending(uint256.MustFromBig(tip), baseFee, blobFee)
pending := w.eth.TxPool().Pending(tip, baseFee, blobFee)

// Split the pending transactions into locals and remotes.
localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending
Expand All @@ -1027,7 +1027,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
// Fill the block with all available pending transactions.
if len(localTxs) > 0 {
txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt, new(big.Int)); err != nil {
if err := w.commitTransactions(env, txs, interrupt, new(uint256.Int)); err != nil {
return err
}
}
Expand Down