Skip to content

Commit a4c34bd

Browse files
karalabetyler-smith
authored andcommitted
eth/fetcher: throttle tx fetches to 128KB responses (ethereum#28304)
* eth/fetcher: throttle tx fetches to 128KB responses * eth/fetcher: unindent a clause per review request
1 parent 4325efe commit a4c34bd

File tree

2 files changed

+117
-37
lines changed

2 files changed

+117
-37
lines changed

eth/fetcher/tx_fetcher.go

+50-32
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,22 @@ const (
4141
// can announce in a short time.
4242
maxTxAnnounces = 4096
4343

44-
// maxTxRetrievals is the maximum transaction number can be fetched in one
45-
// request. The rationale to pick 256 is:
46-
// - In eth protocol, the softResponseLimit is 2MB. Nowadays according to
47-
// Etherscan the average transaction size is around 200B, so in theory
48-
// we can include lots of transaction in a single protocol packet.
49-
// - However the maximum size of a single transaction is raised to 128KB,
50-
// so pick a middle value here to ensure we can maximize the efficiency
51-
// of the retrieval and response size overflow won't happen in most cases.
44+
// maxTxRetrievals is the maximum number of transactions that can be fetched
45+
// in one request. The rationale for picking 256 is to have a reasonabe lower
46+
// bound for the transferred data (don't waste RTTs, transfer more meaningful
47+
// batch sizes), but also have an upper bound on the sequentiality to allow
48+
// using our entire peerset for deliveries.
49+
//
50+
// This number also acts as a failsafe against malicious announces which might
51+
// cause us to request more data than we'd expect.
5252
maxTxRetrievals = 256
5353

54+
// maxTxRetrievalSize is the max number of bytes that delivered transactions
55+
// should weigh according to the announcements. The 128KB was chosen to limit
56+
// retrieving a maximum of one blob transaction at a time to minimize hogging
57+
// a connection between two peers.
58+
maxTxRetrievalSize = 128 * 1024
59+
5460
// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
5561
// is used to track recent transactions that have been dropped so we don't
5662
// re-request them.
@@ -866,25 +872,36 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
866872
if len(f.announces[peer]) == 0 {
867873
return // continue in the for-each
868874
}
869-
hashes := make([]common.Hash, 0, maxTxRetrievals)
870-
f.forEachHash(f.announces[peer], func(hash common.Hash) bool {
871-
if _, ok := f.fetching[hash]; !ok {
872-
// Mark the hash as fetching and stash away possible alternates
873-
f.fetching[hash] = peer
874-
875-
if _, ok := f.alternates[hash]; ok {
876-
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
877-
}
878-
f.alternates[hash] = f.announced[hash]
879-
delete(f.announced, hash)
875+
var (
876+
hashes = make([]common.Hash, 0, maxTxRetrievals)
877+
bytes uint64
878+
)
879+
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool {
880+
// If the transaction is alcear fetching, skip to the next one
881+
if _, ok := f.fetching[hash]; ok {
882+
return true
883+
}
884+
// Mark the hash as fetching and stash away possible alternates
885+
f.fetching[hash] = peer
880886

881-
// Accumulate the hash and stop if the limit was reached
882-
hashes = append(hashes, hash)
883-
if len(hashes) >= maxTxRetrievals {
884-
return false // break in the for-each
887+
if _, ok := f.alternates[hash]; ok {
888+
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
889+
}
890+
f.alternates[hash] = f.announced[hash]
891+
delete(f.announced, hash)
892+
893+
// Accumulate the hash and stop if the limit was reached
894+
hashes = append(hashes, hash)
895+
if len(hashes) >= maxTxRetrievals {
896+
return false // break in the for-each
897+
}
898+
if meta != nil { // Only set eth/68 and upwards
899+
bytes += uint64(meta.size)
900+
if bytes >= maxTxRetrievalSize {
901+
return false
885902
}
886903
}
887-
return true // continue in the for-each
904+
return true // scheduled, try to add more
888905
})
889906
// If any hashes were allocated, request them from the peer
890907
if len(hashes) > 0 {
@@ -929,27 +946,28 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
929946
}
930947
}
931948

932-
// forEachHash does a range loop over a map of hashes in production, but during
933-
// testing it does a deterministic sorted random to allow reproducing issues.
934-
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) {
949+
// forEachAnnounce does a range loop over a map of announcements in production,
950+
// but during testing it does a deterministic sorted random to allow reproducing
951+
// issues.
952+
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) {
935953
// If we're running production, use whatever Go's map gives us
936954
if f.rand == nil {
937-
for hash := range hashes {
938-
if !do(hash) {
955+
for hash, meta := range announces {
956+
if !do(hash, meta) {
939957
return
940958
}
941959
}
942960
return
943961
}
944962
// We're running the test suite, make iteration deterministic
945-
list := make([]common.Hash, 0, len(hashes))
946-
for hash := range hashes {
963+
list := make([]common.Hash, 0, len(announces))
964+
for hash := range announces {
947965
list = append(list, hash)
948966
}
949967
sortHashes(list)
950968
rotateHashes(list, f.rand.Intn(len(list)))
951969
for _, hash := range list {
952-
if !do(hash) {
970+
if !do(hash, announces[hash]) {
953971
return
954972
}
955973
}

eth/fetcher/tx_fetcher_test.go

+67-5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/ethereum/go-ethereum/common/mclock"
2828
"github.com/ethereum/go-ethereum/core/txpool"
2929
"github.com/ethereum/go-ethereum/core/types"
30+
"github.com/ethereum/go-ethereum/params"
3031
)
3132

3233
var (
@@ -993,15 +994,14 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
993994
})
994995
}
995996

996-
// Tests that if thousands of transactions are announces, only a small
997+
// Tests that if thousands of transactions are announced, only a small
997998
// number of them will be requested at a time.
998999
func TestTransactionFetcherRateLimiting(t *testing.T) {
999-
// Create a slew of transactions and to announce them
1000+
// Create a slew of transactions and announce them
10001001
var hashes []common.Hash
10011002
for i := 0; i < maxTxAnnounces; i++ {
10021003
hashes = append(hashes, common.Hash{byte(i / 256), byte(i % 256)})
10031004
}
1004-
10051005
testTransactionFetcherParallel(t, txFetcherTest{
10061006
init: func() *TxFetcher {
10071007
return NewTxFetcher(
@@ -1029,6 +1029,68 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
10291029
})
10301030
}
10311031

1032+
// Tests that if huge transactions are announced, only a small number of them will
1033+
// be requested at a time, to keep the responses below a resonable level.
1034+
func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
1035+
testTransactionFetcherParallel(t, txFetcherTest{
1036+
init: func() *TxFetcher {
1037+
return NewTxFetcher(
1038+
func(common.Hash) bool { return false },
1039+
nil,
1040+
func(string, []common.Hash) error { return nil },
1041+
nil,
1042+
)
1043+
},
1044+
steps: []interface{}{
1045+
// Announce mid size transactions from A to verify that multiple
1046+
// ones can be piled into a single request.
1047+
doTxNotify{peer: "A",
1048+
hashes: []common.Hash{{0x01}, {0x02}, {0x03}, {0x04}},
1049+
types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType, types.LegacyTxType},
1050+
sizes: []uint32{48 * 1024, 48 * 1024, 48 * 1024, 48 * 1024},
1051+
},
1052+
// Announce exactly on the limit transactions to see that only one
1053+
// gets requested
1054+
doTxNotify{peer: "B",
1055+
hashes: []common.Hash{{0x05}, {0x06}},
1056+
types: []byte{types.LegacyTxType, types.LegacyTxType},
1057+
sizes: []uint32{maxTxRetrievalSize, maxTxRetrievalSize},
1058+
},
1059+
// Announce oversized blob transactions to see that overflows are ok
1060+
doTxNotify{peer: "C",
1061+
hashes: []common.Hash{{0x07}, {0x08}},
1062+
types: []byte{types.BlobTxType, types.BlobTxType},
1063+
sizes: []uint32{params.MaxBlobGasPerBlock, params.MaxBlobGasPerBlock},
1064+
},
1065+
doWait{time: txArriveTimeout, step: true},
1066+
isWaiting(nil),
1067+
isScheduledWithMeta{
1068+
tracking: map[string][]announce{
1069+
"A": {
1070+
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1071+
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1072+
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1073+
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1074+
},
1075+
"B": {
1076+
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
1077+
{common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
1078+
},
1079+
"C": {
1080+
{common.Hash{0x07}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
1081+
{common.Hash{0x08}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
1082+
},
1083+
},
1084+
fetching: map[string][]common.Hash{
1085+
"A": {{0x02}, {0x03}, {0x04}},
1086+
"B": {{0x06}},
1087+
"C": {{0x08}},
1088+
},
1089+
},
1090+
},
1091+
})
1092+
}
1093+
10321094
// Tests that then number of transactions a peer is allowed to announce and/or
10331095
// request at the same time is hard capped.
10341096
func TestTransactionFetcherDoSProtection(t *testing.T) {
@@ -1664,7 +1726,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
16641726
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
16651727
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
16661728
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
1667-
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
1729+
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
16681730
}
16691731
}
16701732
}
@@ -1733,7 +1795,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
17331795
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
17341796
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
17351797
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
1736-
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
1798+
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
17371799
}
17381800
}
17391801
}

0 commit comments

Comments
 (0)