Skip to content

Commit a6deb2d

Browse files
authored
eth/fetcher: throttle tx fetches to 128KB responses (#28304)
* eth/fetcher: throttle tx fetches to 128KB responses * eth/fetcher: unindent a clause per review request
1 parent 7776a32 commit a6deb2d

File tree

2 files changed

+117
-37
lines changed

2 files changed

+117
-37
lines changed

Diff for: eth/fetcher/tx_fetcher.go

+50-32
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,22 @@ const (
3939
// can announce in a short time.
4040
maxTxAnnounces = 4096
4141

42-
// maxTxRetrievals is the maximum transaction number can be fetched in one
43-
// request. The rationale to pick 256 is:
44-
// - In eth protocol, the softResponseLimit is 2MB. Nowadays according to
45-
// Etherscan the average transaction size is around 200B, so in theory
46-
// we can include lots of transaction in a single protocol packet.
47-
// - However the maximum size of a single transaction is raised to 128KB,
48-
// so pick a middle value here to ensure we can maximize the efficiency
49-
// of the retrieval and response size overflow won't happen in most cases.
42+
// maxTxRetrievals is the maximum number of transactions that can be fetched
43+
// in one request. The rationale for picking 256 is to have a reasonabe lower
44+
// bound for the transferred data (don't waste RTTs, transfer more meaningful
45+
// batch sizes), but also have an upper bound on the sequentiality to allow
46+
// using our entire peerset for deliveries.
47+
//
48+
// This number also acts as a failsafe against malicious announces which might
49+
// cause us to request more data than we'd expect.
5050
maxTxRetrievals = 256
5151

52+
// maxTxRetrievalSize is the max number of bytes that delivered transactions
53+
// should weigh according to the announcements. The 128KB was chosen to limit
54+
// retrieving a maximum of one blob transaction at a time to minimize hogging
55+
// a connection between two peers.
56+
maxTxRetrievalSize = 128 * 1024
57+
5258
// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
5359
// is used to track recent transactions that have been dropped so we don't
5460
// re-request them.
@@ -859,25 +865,36 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
859865
if len(f.announces[peer]) == 0 {
860866
return // continue in the for-each
861867
}
862-
hashes := make([]common.Hash, 0, maxTxRetrievals)
863-
f.forEachHash(f.announces[peer], func(hash common.Hash) bool {
864-
if _, ok := f.fetching[hash]; !ok {
865-
// Mark the hash as fetching and stash away possible alternates
866-
f.fetching[hash] = peer
867-
868-
if _, ok := f.alternates[hash]; ok {
869-
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
870-
}
871-
f.alternates[hash] = f.announced[hash]
872-
delete(f.announced, hash)
868+
var (
869+
hashes = make([]common.Hash, 0, maxTxRetrievals)
870+
bytes uint64
871+
)
872+
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool {
873+
// If the transaction is alcear fetching, skip to the next one
874+
if _, ok := f.fetching[hash]; ok {
875+
return true
876+
}
877+
// Mark the hash as fetching and stash away possible alternates
878+
f.fetching[hash] = peer
873879

874-
// Accumulate the hash and stop if the limit was reached
875-
hashes = append(hashes, hash)
876-
if len(hashes) >= maxTxRetrievals {
877-
return false // break in the for-each
880+
if _, ok := f.alternates[hash]; ok {
881+
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
882+
}
883+
f.alternates[hash] = f.announced[hash]
884+
delete(f.announced, hash)
885+
886+
// Accumulate the hash and stop if the limit was reached
887+
hashes = append(hashes, hash)
888+
if len(hashes) >= maxTxRetrievals {
889+
return false // break in the for-each
890+
}
891+
if meta != nil { // Only set eth/68 and upwards
892+
bytes += uint64(meta.size)
893+
if bytes >= maxTxRetrievalSize {
894+
return false
878895
}
879896
}
880-
return true // continue in the for-each
897+
return true // scheduled, try to add more
881898
})
882899
// If any hashes were allocated, request them from the peer
883900
if len(hashes) > 0 {
@@ -922,27 +939,28 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
922939
}
923940
}
924941

925-
// forEachHash does a range loop over a map of hashes in production, but during
926-
// testing it does a deterministic sorted random to allow reproducing issues.
927-
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) {
942+
// forEachAnnounce does a range loop over a map of announcements in production,
943+
// but during testing it does a deterministic sorted random to allow reproducing
944+
// issues.
945+
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) {
928946
// If we're running production, use whatever Go's map gives us
929947
if f.rand == nil {
930-
for hash := range hashes {
931-
if !do(hash) {
948+
for hash, meta := range announces {
949+
if !do(hash, meta) {
932950
return
933951
}
934952
}
935953
return
936954
}
937955
// We're running the test suite, make iteration deterministic
938-
list := make([]common.Hash, 0, len(hashes))
939-
for hash := range hashes {
956+
list := make([]common.Hash, 0, len(announces))
957+
for hash := range announces {
940958
list = append(list, hash)
941959
}
942960
sortHashes(list)
943961
rotateHashes(list, f.rand.Intn(len(list)))
944962
for _, hash := range list {
945-
if !do(hash) {
963+
if !do(hash, announces[hash]) {
946964
return
947965
}
948966
}

Diff for: eth/fetcher/tx_fetcher_test.go

+67-5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/ethereum/go-ethereum/common/mclock"
2828
"github.com/ethereum/go-ethereum/core/txpool"
2929
"github.com/ethereum/go-ethereum/core/types"
30+
"github.com/ethereum/go-ethereum/params"
3031
)
3132

3233
var (
@@ -993,15 +994,14 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
993994
})
994995
}
995996

996-
// Tests that if thousands of transactions are announces, only a small
997+
// Tests that if thousands of transactions are announced, only a small
997998
// number of them will be requested at a time.
998999
func TestTransactionFetcherRateLimiting(t *testing.T) {
999-
// Create a slew of transactions and to announce them
1000+
// Create a slew of transactions and announce them
10001001
var hashes []common.Hash
10011002
for i := 0; i < maxTxAnnounces; i++ {
10021003
hashes = append(hashes, common.Hash{byte(i / 256), byte(i % 256)})
10031004
}
1004-
10051005
testTransactionFetcherParallel(t, txFetcherTest{
10061006
init: func() *TxFetcher {
10071007
return NewTxFetcher(
@@ -1029,6 +1029,68 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
10291029
})
10301030
}
10311031

1032+
// Tests that if huge transactions are announced, only a small number of them will
1033+
// be requested at a time, to keep the responses below a resonable level.
1034+
func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
1035+
testTransactionFetcherParallel(t, txFetcherTest{
1036+
init: func() *TxFetcher {
1037+
return NewTxFetcher(
1038+
func(common.Hash) bool { return false },
1039+
nil,
1040+
func(string, []common.Hash) error { return nil },
1041+
nil,
1042+
)
1043+
},
1044+
steps: []interface{}{
1045+
// Announce mid size transactions from A to verify that multiple
1046+
// ones can be piled into a single request.
1047+
doTxNotify{peer: "A",
1048+
hashes: []common.Hash{{0x01}, {0x02}, {0x03}, {0x04}},
1049+
types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType, types.LegacyTxType},
1050+
sizes: []uint32{48 * 1024, 48 * 1024, 48 * 1024, 48 * 1024},
1051+
},
1052+
// Announce exactly on the limit transactions to see that only one
1053+
// gets requested
1054+
doTxNotify{peer: "B",
1055+
hashes: []common.Hash{{0x05}, {0x06}},
1056+
types: []byte{types.LegacyTxType, types.LegacyTxType},
1057+
sizes: []uint32{maxTxRetrievalSize, maxTxRetrievalSize},
1058+
},
1059+
// Announce oversized blob transactions to see that overflows are ok
1060+
doTxNotify{peer: "C",
1061+
hashes: []common.Hash{{0x07}, {0x08}},
1062+
types: []byte{types.BlobTxType, types.BlobTxType},
1063+
sizes: []uint32{params.MaxBlobGasPerBlock, params.MaxBlobGasPerBlock},
1064+
},
1065+
doWait{time: txArriveTimeout, step: true},
1066+
isWaiting(nil),
1067+
isScheduledWithMeta{
1068+
tracking: map[string][]announce{
1069+
"A": {
1070+
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1071+
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1072+
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1073+
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1074+
},
1075+
"B": {
1076+
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
1077+
{common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
1078+
},
1079+
"C": {
1080+
{common.Hash{0x07}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
1081+
{common.Hash{0x08}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
1082+
},
1083+
},
1084+
fetching: map[string][]common.Hash{
1085+
"A": {{0x02}, {0x03}, {0x04}},
1086+
"B": {{0x06}},
1087+
"C": {{0x08}},
1088+
},
1089+
},
1090+
},
1091+
})
1092+
}
1093+
10321094
// Tests that then number of transactions a peer is allowed to announce and/or
10331095
// request at the same time is hard capped.
10341096
func TestTransactionFetcherDoSProtection(t *testing.T) {
@@ -1664,7 +1726,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
16641726
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
16651727
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
16661728
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
1667-
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
1729+
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
16681730
}
16691731
}
16701732
}
@@ -1733,7 +1795,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
17331795
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
17341796
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
17351797
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
1736-
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
1798+
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
17371799
}
17381800
}
17391801
}

0 commit comments

Comments
 (0)