Skip to content

miner, core/txpool: refactor processing of pending transactions #29025

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 65 additions & 24 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,11 +1446,15 @@ func (p *BlobPool) drop() {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
func (p *BlobPool) Pending(filter txpool.PendingFilter) txpool.Pending {
// If only plain transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyPlainTxs {
return nil
return txpool.EmptyPending
}
if filter.OnlyLocals {
// There is no notion of local accounts in the blob pool.
return txpool.EmptyPending
}
// Track the amount of time waiting to retrieve the list of pending blob txs
// from the pool and the amount of time actually spent on assembling the data.
Expand All @@ -1466,19 +1470,30 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx
pendtimeHist.Update(time.Since(execStart).Nanoseconds())
}()

pending := make(map[common.Address][]*txpool.LazyTransaction, len(p.index))
var (
baseFee = new(uint256.Int)
heads = make(txpool.TipList, 0, len(p.index))
tails = make(map[common.Address][]*txpool.LazyTransaction, len(p.index))
)
if filter.BaseFee != nil {
baseFee = filter.BaseFee
}
for addr, txs := range p.index {
lazies := make([]*txpool.LazyTransaction, 0, len(txs))
for _, tx := range txs {
// If transaction filtering was requested, discard badly priced ones
if filter.MinTip != nil && filter.BaseFee != nil {
var (
tail []*txpool.LazyTransaction
)
for i, tx := range txs {
var tip *uint256.Int
if filter.BaseFee != nil {
if tx.execFeeCap.Lt(filter.BaseFee) {
break // basefee too low, cannot be included, discard rest of txs from the account
}
tip := new(uint256.Int).Sub(tx.execFeeCap, filter.BaseFee)
if tip.Gt(tx.execTipCap) {
tip = tx.execTipCap
}
}
tip = new(uint256.Int).Sub(tx.execFeeCap, baseFee)
if tip.Gt(tx.execTipCap) {
tip = tx.execTipCap
}
if filter.MinTip != nil {
if tip.Lt(filter.MinTip) {
break // allowed or remaining tip too low, cannot be included, discard rest of txs from the account
}
Expand All @@ -1488,22 +1503,48 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx
break // blobfee too low, cannot be included, discard rest of txs from the account
}
}
// Transaction was accepted according to the filter, append to the pending list
lazies = append(lazies, &txpool.LazyTransaction{
Pool: p,
Hash: tx.hash,
Time: execStart, // TODO(karalabe): Maybe save these and use that?
GasFeeCap: tx.execFeeCap,
GasTipCap: tx.execTipCap,
Gas: tx.execGas,
BlobGas: tx.blobGas,
})
// Transaction was accepted according to the filter, append to the pending set
lazyTx := &txpool.LazyTransaction{
Pool: p,
Hash: tx.hash,
Time: execStart,
Fees: *tip,
Gas: tx.execGas,
BlobGas: tx.blobGas,
}
if len(tail) == 0 {
tail = make([]*txpool.LazyTransaction, 0, len(txs)-i)
heads = append(heads, &txpool.TxTips{
From: addr,
Tips: lazyTx.Fees,
Time: lazyTx.Time.UnixNano(),
})
}
tail = append(tail, lazyTx)
}
if len(tail) > 0 {
tails[addr] = tail
}
if len(lazies) > 0 {
pending[addr] = lazies
}
return txpool.NewPendingSet(heads, tails)
}

// PendingHashes retrieves the hashes of all currently processable transactions.
// The returned list is grouped by origin account and sorted by nonce.
func (p *BlobPool) PendingHashes(filter txpool.PendingFilter) []common.Hash {
	// The blob pool contains neither plain nor local transactions, so any
	// filter restricted to those cannot match anything here.
	if filter.OnlyPlainTxs || filter.OnlyLocals {
		return nil
	}
	p.lock.RLock()
	defer p.lock.RUnlock()

	// Pre-size for at least one transaction per account to reduce regrowth.
	hashes := make([]common.Hash, 0, len(p.index))
	for _, txs := range p.index {
		for _, tx := range txs {
			hashes = append(hashes, tx.hash)
		}
	}
	return hashes
}

// updateStorageMetrics retrieves a bunch of stats from the data store and pushes
Expand Down
17 changes: 12 additions & 5 deletions core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1356,15 +1356,22 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
// Benchmark assembling the pending
b.ResetTimer()
b.ReportAllocs()

var p txpool.Pending
for i := 0; i < b.N; i++ {
p := pool.Pending(txpool.PendingFilter{
p = pool.Pending(txpool.PendingFilter{
MinTip: uint256.NewInt(1),
BaseFee: chain.basefee,
BlobFee: chain.blobfee,
})
if len(p) != int(capacity) {
b.Fatalf("have %d want %d", len(p), capacity)
}
}
b.StopTimer()

count := 0
for ltx, _ := p.Peek(); ltx != nil; ltx, _ = p.Peek() {
p.Shift()
count++
}
if count != int(capacity) {
b.Fatalf("have %d want %d", count, capacity)
}
}
103 changes: 70 additions & 33 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,57 +522,94 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) txpool.Pending {
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
return nil
return txpool.EmptyPending
}
pool.mu.Lock()
defer pool.mu.Unlock()

// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int
baseFee = new(uint256.Int)
minTip = new(uint256.Int)
heads = make(txpool.TipList, 0, len(pool.pending))
tails = make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
)
if filter.MinTip != nil {
minTipBig = filter.MinTip.ToBig()
minTip = filter.MinTip
}
if filter.BaseFee != nil {
baseFeeBig = filter.BaseFee.ToBig()
baseFee = filter.BaseFee
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
baseFeeBig := baseFee.ToBig()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you dropped the if filter.BaseFee != nil branch?

for addr, list := range pool.pending {
txs := list.Flatten()

// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i]
break
}
}
if filter.NoLocals && pool.locals.contains(addr) {
continue
}
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()),
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
if filter.OnlyLocals && !pool.locals.contains(addr) {
continue
}
var (
tail []*txpool.LazyTransaction
txs = list.Flatten()
)
for i, tx := range txs {
bigTip, err := tx.EffectiveGasTip(baseFeeBig)
if err != nil {
break // basefee too low, cannot be included, discard rest of txs from the account
}
tip := uint256.MustFromBig(bigTip)
if tip.Lt(minTip) {
break // allowed or remaining tip too low, cannot be included, discard rest of txs from the account
}
pending[addr] = lazies
lazyTx := &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
Fees: *tip,
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
if len(tail) == 0 {
tail = make([]*txpool.LazyTransaction, 0, len(txs)-i)
heads = append(heads, &txpool.TxTips{
From: addr,
Tips: lazyTx.Fees,
Time: lazyTx.Time.UnixNano(),
})
}
tail = append(tail, lazyTx)
}
if len(tail) > 0 {
tails[addr] = tail
}
}
return txpool.NewPendingSet(heads, tails)
}

// PendingHashes retrieves the hashes of all currently processable transactions.
// The returned list is grouped by origin account and sorted by nonce.
func (pool *LegacyPool) PendingHashes(filter txpool.PendingFilter) []common.Hash {
	// If only blob transactions are requested, this pool is unsuitable as it
	// contains none, don't even bother.
	if filter.OnlyBlobTxs {
		return nil
	}
	pool.mu.Lock()
	defer pool.mu.Unlock()

	// Pre-size for at least one transaction per account to reduce regrowth.
	hashes := make([]common.Hash, 0, len(pool.pending))
	for addr, list := range pool.pending {
		// Honour the locality filters: skip local accounts when the caller
		// asked for remote-only transactions, and vice versa.
		if filter.NoLocals && pool.locals.contains(addr) {
			continue
		}
		if filter.OnlyLocals && !pool.locals.contains(addr) {
			continue
		}
		hashes = list.AppendHashes(hashes)
	}
	return hashes
}

// Locals retrieves the accounts currently considered local by the pool.
Expand Down
25 changes: 25 additions & 0 deletions core/txpool/legacypool/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,25 @@ func (m *sortedMap) Flatten() types.Transactions {
return txs
}

// AppendHashes uses the flattened slice of transactions and appends the hashes
// to the destination slice.
func (m *sortedMap) AppendHashes(dst []common.Hash) []common.Hash {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this function. It serves as backend for PendingHashes, which is used e.g. when we meet another peer on the network, and need to exchange hashes. This happens quite often, so I figured it might be worth optimizing a bit.
With this change, we just allocate the slice of hashes once, and then we just iterate the flattened cached list and append them.
The method Flatten copies the cache in an intermediate step, to prevent accidental modification. So this allows us to skip that step.

m.cacheMu.Lock()
defer m.cacheMu.Unlock()
// If the sorting was not cached yet, create and cache it
if m.cache == nil {
m.cache = make(types.Transactions, 0, len(m.items))
for _, tx := range m.items {
m.cache = append(m.cache, tx)
}
sort.Sort(types.TxByNonce(m.cache))
}
for _, tx := range m.items {
dst = append(dst, tx.Hash())
}
return dst
}

// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (m *sortedMap) LastElement() *types.Transaction {
Expand Down Expand Up @@ -453,6 +472,12 @@ func (l *list) Flatten() types.Transactions {
return l.txs.Flatten()
}

// AppendHashes flattens a nonce-sorted slice of transactions, and appends
// the hashes to dst. The destination slice might be reallocated, and is returned.
func (l *list) AppendHashes(dst []common.Hash) []common.Hash {
	return l.txs.AppendHashes(dst)
}

// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (l *list) LastElement() *types.Transaction {
Expand Down
Loading