Skip to content

Commit f4852b8

Browse files
karalabeholiman
andauthored
core/txpool, eth, miner: retrieve plain and blob txs separately (#29026)
* core/txpool, eth, miner: retrieve plain and blob txs separately * core/txpool: fix typo, no farming * miner: farm all the typos Co-authored-by: Martin HS <[email protected]> --------- Co-authored-by: Martin HS <[email protected]>
1 parent ac0ff04 commit f4852b8

File tree

12 files changed

+125
-50
lines changed

12 files changed

+125
-50
lines changed

core/txpool/blobpool/blobpool.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -1446,7 +1446,12 @@ func (p *BlobPool) drop() {
14461446
//
14471447
// The transactions can also be pre-filtered by the dynamic fee components to
14481448
// reduce allocations and load on downstream subsystems.
1449-
func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
1449+
func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
1450+
// If only plain transactions are requested, this pool is unsuitable as it
1451+
// contains none, don't even bother.
1452+
if filter.OnlyPlainTxs {
1453+
return nil
1454+
}
14501455
// Track the amount of time waiting to retrieve the list of pending blob txs
14511456
// from the pool and the amount of time actually spent on assembling the data.
14521457
// The latter will be pretty much moot, but we've kept it to have symmetric
@@ -1466,20 +1471,20 @@ func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *u
14661471
lazies := make([]*txpool.LazyTransaction, 0, len(txs))
14671472
for _, tx := range txs {
14681473
// If transaction filtering was requested, discard badly priced ones
1469-
if minTip != nil && baseFee != nil {
1470-
if tx.execFeeCap.Lt(baseFee) {
1474+
if filter.MinTip != nil && filter.BaseFee != nil {
1475+
if tx.execFeeCap.Lt(filter.BaseFee) {
14711476
break // basefee too low, cannot be included, discard rest of txs from the account
14721477
}
1473-
tip := new(uint256.Int).Sub(tx.execFeeCap, baseFee)
1478+
tip := new(uint256.Int).Sub(tx.execFeeCap, filter.BaseFee)
14741479
if tip.Gt(tx.execTipCap) {
14751480
tip = tx.execTipCap
14761481
}
1477-
if tip.Lt(minTip) {
1482+
if tip.Lt(filter.MinTip) {
14781483
break // allowed or remaining tip too low, cannot be included, discard rest of txs from the account
14791484
}
14801485
}
1481-
if blobFee != nil {
1482-
if tx.blobFeeCap.Lt(blobFee) {
1486+
if filter.BlobFee != nil {
1487+
if tx.blobFeeCap.Lt(filter.BlobFee) {
14831488
break // blobfee too low, cannot be included, discard rest of txs from the account
14841489
}
14851490
}

core/txpool/blobpool/blobpool_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -1340,7 +1340,11 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
13401340
b.ReportAllocs()
13411341

13421342
for i := 0; i < b.N; i++ {
1343-
p := pool.Pending(uint256.NewInt(1), chain.basefee, chain.blobfee)
1343+
p := pool.Pending(txpool.PendingFilter{
1344+
MinTip: uint256.NewInt(1),
1345+
BaseFee: chain.basefee,
1346+
BlobFee: chain.blobfee,
1347+
})
13441348
if len(p) != int(capacity) {
13451349
b.Fatalf("have %d want %d", len(p), capacity)
13461350
}

core/txpool/legacypool/legacypool.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,12 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
522522
//
523523
// The transactions can also be pre-filtered by the dynamic fee components to
524524
// reduce allocations and load on downstream subsystems.
525-
func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
525+
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
526+
// If only blob transactions are requested, this pool is unsuitable as it
527+
// contains none, don't even bother.
528+
if filter.OnlyBlobTxs {
529+
return nil
530+
}
526531
pool.mu.Lock()
527532
defer pool.mu.Unlock()
528533

@@ -531,13 +536,12 @@ func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobF
531536
minTipBig *big.Int
532537
baseFeeBig *big.Int
533538
)
534-
if minTip != nil {
535-
minTipBig = minTip.ToBig()
539+
if filter.MinTip != nil {
540+
minTipBig = filter.MinTip.ToBig()
536541
}
537-
if baseFee != nil {
538-
baseFeeBig = baseFee.ToBig()
542+
if filter.BaseFee != nil {
543+
baseFeeBig = filter.BaseFee.ToBig()
539544
}
540-
541545
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
542546
for addr, list := range pool.pending {
543547
txs := list.Flatten()

core/txpool/subpool.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,21 @@ type LazyResolver interface {
7070
// may request (and relinquish) exclusive access to certain addresses.
7171
type AddressReserver func(addr common.Address, reserve bool) error
7272

73+
// PendingFilter is a collection of filter rules to allow retrieving a subset
74+
// of transactions for announcement or mining.
75+
//
76+
// Note, the entries here are not arbitrary useful filters, rather each one has
77+
// a very specific call site in mind and each one can be evaluated very cheaply
78+
// by the pool implementations. Only add new ones that satisfy those constraints.
79+
type PendingFilter struct {
80+
MinTip *uint256.Int // Minimum miner tip required to include a transaction
81+
BaseFee *uint256.Int // Minimum 1559 basefee needed to include a transaction
82+
BlobFee *uint256.Int // Minimum 4844 blobfee needed to include a blob transaction
83+
84+
OnlyPlainTxs bool // Return only plain EVM transactions (peer-join announces, block space filling)
85+
OnlyBlobTxs bool // Return only blob transactions (block blob-space filling)
86+
}
87+
7388
// SubPool represents a specialized transaction pool that lives on its own (e.g.
7489
// blob pool). Since independent of how many specialized pools we have, they do
7590
// need to be updated in lockstep and assemble into one coherent view for block
@@ -118,7 +133,7 @@ type SubPool interface {
118133
//
119134
// The transactions can also be pre-filtered by the dynamic fee components to
120135
// reduce allocations and load on downstream subsystems.
121-
Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction
136+
Pending(filter PendingFilter) map[common.Address][]*LazyTransaction
122137

123138
// SubscribeTransactions subscribes to new transaction events. The subscriber
124139
// can decide whether to receive notifications only for newly seen transactions

core/txpool/txpool.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/ethereum/go-ethereum/event"
2929
"github.com/ethereum/go-ethereum/log"
3030
"github.com/ethereum/go-ethereum/metrics"
31-
"github.com/holiman/uint256"
3231
)
3332

3433
// TxStatus is the current status of a transaction as seen by the pool.
@@ -357,10 +356,10 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
357356
//
358357
// The transactions can also be pre-filtered by the dynamic fee components to
359358
// reduce allocations and load on downstream subsystems.
360-
func (p *TxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction {
359+
func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction {
361360
txs := make(map[common.Address][]*LazyTransaction)
362361
for _, subpool := range p.subpools {
363-
for addr, set := range subpool.Pending(minTip, baseFee, blobFee) {
362+
for addr, set := range subpool.Pending(filter) {
364363
txs[addr] = set
365364
}
366365
}

eth/api_backend.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
292292
}
293293

294294
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
295-
pending := b.eth.txPool.Pending(nil, nil, nil)
295+
pending := b.eth.txPool.Pending(txpool.PendingFilter{})
296296
var txs types.Transactions
297297
for _, batch := range pending {
298298
for _, lazy := range batch {

eth/catalyst/simulated_beacon.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/ethereum/go-ethereum/beacon/engine"
2727
"github.com/ethereum/go-ethereum/common"
28+
"github.com/ethereum/go-ethereum/core/txpool"
2829
"github.com/ethereum/go-ethereum/core/types"
2930
"github.com/ethereum/go-ethereum/eth"
3031
"github.com/ethereum/go-ethereum/log"
@@ -263,7 +264,7 @@ func (c *SimulatedBeacon) Rollback() {
263264

264265
// Fork sets the head to the provided hash.
265266
func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
266-
if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 {
267+
if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 {
267268
return errors.New("pending block dirty")
268269
}
269270
parent := c.eth.BlockChain().GetBlockByHash(parentHash)
@@ -275,7 +276,7 @@ func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
275276

276277
// AdjustTime creates a new block with an adjusted timestamp.
277278
func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
278-
if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 {
279+
if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 {
279280
return errors.New("could not adjust time on non-empty block")
280281
}
281282
parent := c.eth.BlockChain().CurrentBlock()

eth/handler.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import (
4242
"github.com/ethereum/go-ethereum/metrics"
4343
"github.com/ethereum/go-ethereum/p2p"
4444
"github.com/ethereum/go-ethereum/triedb/pathdb"
45-
"github.com/holiman/uint256"
4645
)
4746

4847
const (
@@ -74,7 +73,7 @@ type txPool interface {
7473

7574
// Pending should return pending transactions.
7675
// The slice should be modifiable by the caller.
77-
Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction
76+
Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction
7877

7978
// SubscribeTransactions subscribes to new transaction events. The subscriber
8079
// can decide whether to receive notifications only for newly seen transactions

eth/handler_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []erro
9393
}
9494

9595
// Pending returns all the transactions known to the pool
96-
func (p *testTxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
96+
func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
9797
p.lock.RLock()
9898
defer p.lock.RUnlock()
9999

eth/sync.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/ethereum/go-ethereum/common"
2525
"github.com/ethereum/go-ethereum/core/rawdb"
26+
"github.com/ethereum/go-ethereum/core/txpool"
2627
"github.com/ethereum/go-ethereum/eth/downloader"
2728
"github.com/ethereum/go-ethereum/eth/protocols/eth"
2829
"github.com/ethereum/go-ethereum/log"
@@ -36,7 +37,7 @@ const (
3637
// syncTransactions starts sending all currently pending transactions to the given peer.
3738
func (h *handler) syncTransactions(p *eth.Peer) {
3839
var hashes []common.Hash
39-
for _, batch := range h.txpool.Pending(nil, nil, nil) {
40+
for _, batch := range h.txpool.Pending(txpool.PendingFilter{OnlyPlainTxs: true}) {
4041
for _, tx := range batch {
4142
hashes = append(hashes, tx.Hash)
4243
}

miner/ordering.go

+11
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,14 @@ func (t *transactionsByPriceAndNonce) Shift() {
153153
func (t *transactionsByPriceAndNonce) Pop() {
154154
heap.Pop(&t.heads)
155155
}
156+
157+
// Empty returns if the price heap is empty. It can be used to check it simpler
158+
// than calling peek and checking for nil return.
159+
func (t *transactionsByPriceAndNonce) Empty() bool {
160+
return len(t.heads) == 0
161+
}
162+
163+
// Clear removes the entire content of the heap.
164+
func (t *transactionsByPriceAndNonce) Clear() {
165+
t.heads, t.txs = nil, nil
166+
}

miner/worker.go

+61-25
Original file line numberDiff line numberDiff line change
@@ -562,9 +562,11 @@ func (w *worker) mainLoop() {
562562
BlobGas: tx.BlobGas(),
563563
})
564564
}
565-
txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
565+
plainTxs := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) // Mixed bag of everrything, yolo
566+
blobTxs := newTransactionsByPriceAndNonce(w.current.signer, nil, w.current.header.BaseFee) // Empty bag, don't bother optimising
567+
566568
tcount := w.current.tcount
567-
w.commitTransactions(w.current, txset, nil, new(uint256.Int))
569+
w.commitTransactions(w.current, plainTxs, blobTxs, nil)
568570

569571
// Only update the snapshot if any new transactions were added
570572
// to the pending block
@@ -802,7 +804,7 @@ func (w *worker) applyTransaction(env *environment, tx *types.Transaction) (*typ
802804
return receipt, err
803805
}
804806

805-
func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *uint256.Int) error {
807+
func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error {
806808
gasLimit := env.header.GasLimit
807809
if env.gasPool == nil {
808810
env.gasPool = new(core.GasPool).AddGas(gasLimit)
@@ -821,8 +823,33 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
821823
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
822824
break
823825
}
826+
// If we don't have enough blob space for any further blob transactions,
827+
// skip that list altogether
828+
if !blobTxs.Empty() && env.blobs*params.BlobTxBlobGasPerBlob >= params.MaxBlobGasPerBlock {
829+
log.Trace("Not enough blob space for further blob transactions")
830+
blobTxs.Clear()
831+
// Fall though to pick up any plain txs
832+
}
824833
// Retrieve the next transaction and abort if all done.
825-
ltx, tip := txs.Peek()
834+
var (
835+
ltx *txpool.LazyTransaction
836+
txs *transactionsByPriceAndNonce
837+
)
838+
pltx, ptip := plainTxs.Peek()
839+
bltx, btip := blobTxs.Peek()
840+
841+
switch {
842+
case pltx == nil:
843+
txs, ltx = blobTxs, bltx
844+
case bltx == nil:
845+
txs, ltx = plainTxs, pltx
846+
default:
847+
if ptip.Lt(btip) {
848+
txs, ltx = blobTxs, bltx
849+
} else {
850+
txs, ltx = plainTxs, pltx
851+
}
852+
}
826853
if ltx == nil {
827854
break
828855
}
@@ -837,11 +864,6 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
837864
txs.Pop()
838865
continue
839866
}
840-
// If we don't receive enough tip for the next transaction, skip the account
841-
if tip.Cmp(minTip) < 0 {
842-
log.Trace("Not enough tip for transaction", "hash", ltx.Hash, "tip", tip, "needed", minTip)
843-
break // If the next-best is too low, surely no better will be available
844-
}
845867
// Transaction seems to fit, pull it up from the pool
846868
tx := ltx.Resolve()
847869
if tx == nil {
@@ -1005,35 +1027,49 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
10051027
w.mu.RUnlock()
10061028

10071029
// Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees
1008-
var baseFee *uint256.Int
1030+
filter := txpool.PendingFilter{
1031+
MinTip: tip,
1032+
}
10091033
if env.header.BaseFee != nil {
1010-
baseFee = uint256.MustFromBig(env.header.BaseFee)
1034+
filter.BaseFee = uint256.MustFromBig(env.header.BaseFee)
10111035
}
1012-
var blobFee *uint256.Int
10131036
if env.header.ExcessBlobGas != nil {
1014-
blobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas))
1037+
filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas))
10151038
}
1016-
pending := w.eth.TxPool().Pending(tip, baseFee, blobFee)
1039+
filter.OnlyPlainTxs, filter.OnlyBlobTxs = true, false
1040+
pendingPlainTxs := w.eth.TxPool().Pending(filter)
1041+
1042+
filter.OnlyPlainTxs, filter.OnlyBlobTxs = false, true
1043+
pendingBlobTxs := w.eth.TxPool().Pending(filter)
10171044

10181045
// Split the pending transactions into locals and remotes.
1019-
localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending
1046+
localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs
1047+
localBlobTxs, remoteBlobTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs
1048+
10201049
for _, account := range w.eth.TxPool().Locals() {
1021-
if txs := remoteTxs[account]; len(txs) > 0 {
1022-
delete(remoteTxs, account)
1023-
localTxs[account] = txs
1050+
if txs := remotePlainTxs[account]; len(txs) > 0 {
1051+
delete(remotePlainTxs, account)
1052+
localPlainTxs[account] = txs
1053+
}
1054+
if txs := remoteBlobTxs[account]; len(txs) > 0 {
1055+
delete(remoteBlobTxs, account)
1056+
localBlobTxs[account] = txs
10241057
}
10251058
}
1026-
10271059
// Fill the block with all available pending transactions.
1028-
if len(localTxs) > 0 {
1029-
txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
1030-
if err := w.commitTransactions(env, txs, interrupt, new(uint256.Int)); err != nil {
1060+
if len(localPlainTxs) > 0 || len(localBlobTxs) > 0 {
1061+
plainTxs := newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee)
1062+
blobTxs := newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee)
1063+
1064+
if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
10311065
return err
10321066
}
10331067
}
1034-
if len(remoteTxs) > 0 {
1035-
txs := newTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
1036-
if err := w.commitTransactions(env, txs, interrupt, tip); err != nil {
1068+
if len(remotePlainTxs) > 0 || len(remoteBlobTxs) > 0 {
1069+
plainTxs := newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee)
1070+
blobTxs := newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee)
1071+
1072+
if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
10371073
return err
10381074
}
10391075
}

0 commit comments

Comments
 (0)