Skip to content

Commit ba42dbf

Browse files
authored
Revert "eth/fetcher: throttle tx fetches to 128KB responses (ethereum#28304)"
This reverts commit 32f701e.
1 parent 7f8e001 commit ba42dbf

File tree

2 files changed

+37
-117
lines changed

2 files changed

+37
-117
lines changed

eth/fetcher/tx_fetcher.go

Lines changed: 32 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,16 @@ const (
3939
// can announce in a short time.
4040
maxTxAnnounces = 4096
4141

42-
// maxTxRetrievals is the maximum number of transactions that can be fetched
43-
// in one request. The rationale for picking 256 is to have a reasonabe lower
44-
// bound for the transferred data (don't waste RTTs, transfer more meaningful
45-
// batch sizes), but also have an upper bound on the sequentiality to allow
46-
// using our entire peerset for deliveries.
47-
//
48-
// This number also acts as a failsafe against malicious announces which might
49-
// cause us to request more data than we'd expect.
42+
// maxTxRetrievals is the maximum transaction number can be fetched in one
43+
// request. The rationale to pick 256 is:
44+
// - In eth protocol, the softResponseLimit is 2MB. Nowadays according to
45+
// Etherscan the average transaction size is around 200B, so in theory
46+
// we can include lots of transaction in a single protocol packet.
47+
// - However the maximum size of a single transaction is raised to 128KB,
48+
// so pick a middle value here to ensure we can maximize the efficiency
49+
// of the retrieval and response size overflow won't happen in most cases.
5050
maxTxRetrievals = 256
5151

52-
// maxTxRetrievalSize is the max number of bytes that delivered transactions
53-
// should weigh according to the announcements. The 128KB was chosen to limit
54-
// retrieving a maximum of one blob transaction at a time to minimize hogging
55-
// a connection between two peers.
56-
maxTxRetrievalSize = 128 * 1024
57-
5852
// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
5953
// is used to track recent transactions that have been dropped so we don't
6054
// re-request them.
@@ -865,36 +859,25 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
865859
if len(f.announces[peer]) == 0 {
866860
return // continue in the for-each
867861
}
868-
var (
869-
hashes = make([]common.Hash, 0, maxTxRetrievals)
870-
bytes uint64
871-
)
872-
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool {
873-
// If the transaction is alcear fetching, skip to the next one
874-
if _, ok := f.fetching[hash]; ok {
875-
return true
876-
}
877-
// Mark the hash as fetching and stash away possible alternates
878-
f.fetching[hash] = peer
879-
880-
if _, ok := f.alternates[hash]; ok {
881-
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
882-
}
883-
f.alternates[hash] = f.announced[hash]
884-
delete(f.announced, hash)
862+
hashes := make([]common.Hash, 0, maxTxRetrievals)
863+
f.forEachHash(f.announces[peer], func(hash common.Hash) bool {
864+
if _, ok := f.fetching[hash]; !ok {
865+
// Mark the hash as fetching and stash away possible alternates
866+
f.fetching[hash] = peer
867+
868+
if _, ok := f.alternates[hash]; ok {
869+
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
870+
}
871+
f.alternates[hash] = f.announced[hash]
872+
delete(f.announced, hash)
885873

886-
// Accumulate the hash and stop if the limit was reached
887-
hashes = append(hashes, hash)
888-
if len(hashes) >= maxTxRetrievals {
889-
return false // break in the for-each
890-
}
891-
if meta != nil { // Only set eth/68 and upwards
892-
bytes += uint64(meta.size)
893-
if bytes >= maxTxRetrievalSize {
894-
return false
874+
// Accumulate the hash and stop if the limit was reached
875+
hashes = append(hashes, hash)
876+
if len(hashes) >= maxTxRetrievals {
877+
return false // break in the for-each
895878
}
896879
}
897-
return true // scheduled, try to add more
880+
return true // continue in the for-each
898881
})
899882
// If any hashes were allocated, request them from the peer
900883
if len(hashes) > 0 {
@@ -939,28 +922,27 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
939922
}
940923
}
941924

942-
// forEachAnnounce does a range loop over a map of announcements in production,
943-
// but during testing it does a deterministic sorted random to allow reproducing
944-
// issues.
945-
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) {
925+
// forEachHash does a range loop over a map of hashes in production, but during
926+
// testing it does a deterministic sorted random to allow reproducing issues.
927+
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) {
946928
// If we're running production, use whatever Go's map gives us
947929
if f.rand == nil {
948-
for hash, meta := range announces {
949-
if !do(hash, meta) {
930+
for hash := range hashes {
931+
if !do(hash) {
950932
return
951933
}
952934
}
953935
return
954936
}
955937
// We're running the test suite, make iteration deterministic
956-
list := make([]common.Hash, 0, len(announces))
957-
for hash := range announces {
938+
list := make([]common.Hash, 0, len(hashes))
939+
for hash := range hashes {
958940
list = append(list, hash)
959941
}
960942
sortHashes(list)
961943
rotateHashes(list, f.rand.Intn(len(list)))
962944
for _, hash := range list {
963-
if !do(hash, announces[hash]) {
945+
if !do(hash) {
964946
return
965947
}
966948
}

eth/fetcher/tx_fetcher_test.go

Lines changed: 5 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/ethereum/go-ethereum/common/mclock"
2828
"github.com/ethereum/go-ethereum/core/txpool"
2929
"github.com/ethereum/go-ethereum/core/types"
30-
"github.com/ethereum/go-ethereum/params"
3130
)
3231

3332
var (
@@ -994,14 +993,15 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
994993
})
995994
}
996995

997-
// Tests that if thousands of transactions are announced, only a small
996+
// Tests that if thousands of transactions are announces, only a small
998997
// number of them will be requested at a time.
999998
func TestTransactionFetcherRateLimiting(t *testing.T) {
1000-
// Create a slew of transactions and announce them
999+
// Create a slew of transactions and to announce them
10011000
var hashes []common.Hash
10021001
for i := 0; i < maxTxAnnounces; i++ {
10031002
hashes = append(hashes, common.Hash{byte(i / 256), byte(i % 256)})
10041003
}
1004+
10051005
testTransactionFetcherParallel(t, txFetcherTest{
10061006
init: func() *TxFetcher {
10071007
return NewTxFetcher(
@@ -1029,68 +1029,6 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
10291029
})
10301030
}
10311031

1032-
// Tests that if huge transactions are announced, only a small number of them will
1033-
// be requested at a time, to keep the responses below a resonable level.
1034-
func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
1035-
testTransactionFetcherParallel(t, txFetcherTest{
1036-
init: func() *TxFetcher {
1037-
return NewTxFetcher(
1038-
func(common.Hash) bool { return false },
1039-
nil,
1040-
func(string, []common.Hash) error { return nil },
1041-
nil,
1042-
)
1043-
},
1044-
steps: []interface{}{
1045-
// Announce mid size transactions from A to verify that multiple
1046-
// ones can be piled into a single request.
1047-
doTxNotify{peer: "A",
1048-
hashes: []common.Hash{{0x01}, {0x02}, {0x03}, {0x04}},
1049-
types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType, types.LegacyTxType},
1050-
sizes: []uint32{48 * 1024, 48 * 1024, 48 * 1024, 48 * 1024},
1051-
},
1052-
// Announce exactly on the limit transactions to see that only one
1053-
// gets requested
1054-
doTxNotify{peer: "B",
1055-
hashes: []common.Hash{{0x05}, {0x06}},
1056-
types: []byte{types.LegacyTxType, types.LegacyTxType},
1057-
sizes: []uint32{maxTxRetrievalSize, maxTxRetrievalSize},
1058-
},
1059-
// Announce oversized blob transactions to see that overflows are ok
1060-
doTxNotify{peer: "C",
1061-
hashes: []common.Hash{{0x07}, {0x08}},
1062-
types: []byte{types.BlobTxType, types.BlobTxType},
1063-
sizes: []uint32{params.MaxBlobGasPerBlock, params.MaxBlobGasPerBlock},
1064-
},
1065-
doWait{time: txArriveTimeout, step: true},
1066-
isWaiting(nil),
1067-
isScheduledWithMeta{
1068-
tracking: map[string][]announce{
1069-
"A": {
1070-
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1071-
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1072-
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1073-
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
1074-
},
1075-
"B": {
1076-
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
1077-
{common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
1078-
},
1079-
"C": {
1080-
{common.Hash{0x07}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
1081-
{common.Hash{0x08}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
1082-
},
1083-
},
1084-
fetching: map[string][]common.Hash{
1085-
"A": {{0x02}, {0x03}, {0x04}},
1086-
"B": {{0x06}},
1087-
"C": {{0x08}},
1088-
},
1089-
},
1090-
},
1091-
})
1092-
}
1093-
10941032
// Tests that then number of transactions a peer is allowed to announce and/or
10951033
// request at the same time is hard capped.
10961034
func TestTransactionFetcherDoSProtection(t *testing.T) {
@@ -1726,7 +1664,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
17261664
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
17271665
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
17281666
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
1729-
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
1667+
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
17301668
}
17311669
}
17321670
}
@@ -1795,7 +1733,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
17951733
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
17961734
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
17971735
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
1798-
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
1736+
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
17991737
}
18001738
}
18011739
}

0 commit comments

Comments
 (0)