
Commit 18181bc

karalabe authored and jonastheis committed
eth: enforce announcement metadatas and drop peers violating the protocol (ethereum#28261)
* eth: enforce announcement metadatas and drop peers violating the protocol

* eth/fetcher: relax eth/68 validation a bit for flakey clients

* tests/fuzzers/txfetcher: pull in suggestion from Marius

* eth/fetcher: add tests for peer dropping

* eth/fetcher: linter linter linter linter linter

Conflicts:
    eth/fetcher/tx_fetcher.go
    eth/handler.go
    eth/handler_eth.go
1 parent f633a27 commit 18181bc
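The core of the change is easiest to see in isolation. The sketch below is a minimal, self-contained distillation — checkAnnouncement and its caller are hypothetical stand-ins, not go-ethereum's actual code — of the rule the fetcher now enforces when a delivered transaction is compared against its announcement: the announced type must match exactly, while the announced size gets an 8-byte wiggle-room before the peer is dropped, per the "relax eth/68 validation" item above.

package main

import (
	"fmt"
	"math"
)

// txMetadata mirrors the struct this commit adds: the consensus type byte
// and the byte size that eth/68 peers attach to every announced hash.
type txMetadata struct {
	kind byte   // announced transaction type
	size uint32 // announced transaction size in bytes
}

// checkAnnouncement distills the new cleanup-path logic: a type mismatch
// always drops the peer; a size mismatch drops it only beyond an 8-byte
// tolerance (RLP vs consensus encoding wiggle-room).
func checkAnnouncement(announced, delivered txMetadata) (drop bool, reason string) {
	if announced.kind != delivered.kind {
		return true, "type mismatch"
	}
	if announced.size != delivered.size {
		if math.Abs(float64(delivered.size)-float64(announced.size)) > 8 {
			return true, "size mismatch beyond tolerance"
		}
		return false, "size mismatch within tolerance: warn only"
	}
	return false, ""
}

func main() {
	ann := txMetadata{kind: 2, size: 120} // what the peer announced
	got := txMetadata{kind: 2, size: 126} // what it actually delivered

	drop, reason := checkAnnouncement(ann, got)
	fmt.Println(drop, reason) // false: within the 8-byte wiggle-room
}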

File tree

5 files changed: +542 -78 lines changed

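Only eth/fetcher/tx_fetcher.go is expanded below. One precondition worth calling out: the reworked Notify indexes types[i] and sizes[i] by hash position, so an eth/68 announcement is only well-formed when all three slices have equal length, and the handler side has to reject anything else before calling Notify. An illustrative sketch of that shape check (validateAnnouncement is a hypothetical name, not the handler's real code):

package eth

import (
	"errors"

	"github.com/ethereum/go-ethereum/common"
)

// validateAnnouncement checks the shape invariant that Notify relies on:
// the metadata slices must line up one-to-one with the announced hashes.
func validateAnnouncement(types []byte, sizes []uint32, hashes []common.Hash) error {
	if len(types) != len(hashes) || len(sizes) != len(hashes) {
		return errors.New("announcement field length mismatch")
	}
	return nil
}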

eth/fetcher/tx_fetcher.go

Lines changed: 105 additions & 42 deletions
@@ -1,4 +1,4 @@
-// Copyright 2020 The go-ethereum Authors
+// Copyright 2019 The go-ethereum Authors
 // This file is part of the go-ethereum library.
 //
 // The go-ethereum library is free software: you can redistribute it and/or modify
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
+	"math"
 	mrand "math/rand"
 	"sort"
 	"time"
@@ -103,6 +104,14 @@ var (
 type txAnnounce struct {
 	origin string        // Identifier of the peer originating the notification
 	hashes []common.Hash // Batch of transaction hashes being announced
+	metas  []*txMetadata // Batch of metadatas associated with the hashes (nil before eth/68)
+}
+
+// txMetadata is a set of extra data transmitted along the announcement for better
+// fetch scheduling.
+type txMetadata struct {
+	kind byte   // Transaction consensus type
+	size uint32 // Transaction size in bytes
 }
 
 // txRequest represents an in-flight transaction retrieval request destined to
@@ -118,10 +127,11 @@ type txRequest struct {
 type txDelivery struct {
 	origin string        // Identifier of the peer originating the notification
 	hashes []common.Hash // Batch of transaction hashes having been delivered
+	metas  []txMetadata  // Batch of metadatas associated with the delivered hashes
 	direct bool          // Whether this is a direct reply or a broadcast
 }
 
-// txDrop is the notiication that a peer has disconnected.
+// txDrop is the notification that a peer has disconnected.
 type txDrop struct {
 	peer string
 }
@@ -153,14 +163,14 @@ type TxFetcher struct {
 
 	// Stage 1: Waiting lists for newly discovered transactions that might be
 	// broadcast without needing explicit request/reply round trips.
-	waitlist  map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
-	waittime  map[common.Hash]mclock.AbsTime      // Timestamps when transactions were added to the waitlist
-	waitslots map[string]map[common.Hash]struct{} // Waiting announcement sgroupped by peer (DoS protection)
+	waitlist  map[common.Hash]map[string]struct{}    // Transactions waiting for an potential broadcast
+	waittime  map[common.Hash]mclock.AbsTime         // Timestamps when transactions were added to the waitlist
+	waitslots map[string]map[common.Hash]*txMetadata // Waiting announcement sgroupped by peer (DoS protection)
 
 	// Stage 2: Queue of transactions that waiting to be allocated to some peer
 	// to be retrieved directly.
-	announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer
-	announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
+	announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
+	announced map[common.Hash]map[string]struct{}    // Set of download locations, grouped by transaction hash
 
 	// Stage 3: Set of transactions currently being retrieved, some which may be
 	// fulfilled and some rescheduled. Note, this step shares 'announces' from the
@@ -173,6 +183,7 @@ type TxFetcher struct {
 	hasTx    func(common.Hash) bool             // Retrieves a tx from the local txpool
 	addTxs   func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
 	fetchTxs func(string, []common.Hash) error  // Retrieves a set of txs from a remote peer
+	dropPeer func(string)                       // Drops a peer in case of announcement violation
 
 	step  chan struct{} // Notification channel when the fetcher loop iterates
 	clock mclock.Clock  // Time wrapper to simulate in tests
@@ -181,14 +192,14 @@ type TxFetcher struct {
 
 // NewTxFetcher creates a transaction fetcher to retrieve transaction
 // based on hash announcements.
-func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
-	return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
+func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
+	return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, dropPeer, mclock.System{}, nil)
 }
 
 // NewTxFetcherForTests is a testing method to mock out the realtime clock with
 // a simulated version and the internal randomness with a deterministic one.
 func NewTxFetcherForTests(
-	hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
+	hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
 	clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
 	return &TxFetcher{
 		notify:      make(chan *txAnnounce),
@@ -197,8 +208,8 @@ func NewTxFetcherForTests(
 		quit:        make(chan struct{}),
 		waitlist:    make(map[common.Hash]map[string]struct{}),
 		waittime:    make(map[common.Hash]mclock.AbsTime),
-		waitslots:   make(map[string]map[common.Hash]struct{}),
-		announces:   make(map[string]map[common.Hash]struct{}),
+		waitslots:   make(map[string]map[common.Hash]*txMetadata),
+		announces:   make(map[string]map[common.Hash]*txMetadata),
 		announced:   make(map[common.Hash]map[string]struct{}),
 		fetching:    make(map[common.Hash]string),
 		requests:    make(map[string]*txRequest),
@@ -207,27 +218,31 @@ func NewTxFetcherForTests(
 		hasTx:       hasTx,
 		addTxs:      addTxs,
 		fetchTxs:    fetchTxs,
+		dropPeer:    dropPeer,
 		clock:       clock,
 		rand:        rand,
 	}
 }
 
 // Notify announces the fetcher of the potential availability of a new batch of
 // transactions in the network.
-func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
+func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []common.Hash) error {
 	// Keep track of all the announced transactions
 	txAnnounceInMeter.Mark(int64(len(hashes)))
 
 	// Skip any transaction announcements that we already know of, or that we've
-	// previously marked as cheap and discarded. This check is of course racey,
+	// previously marked as cheap and discarded. This check is of course racy,
 	// because multiple concurrent notifies will still manage to pass it, but it's
 	// still valuable to check here because it runs concurrent to the internal
 	// loop, so anything caught here is time saved internally.
 	var (
-		unknowns               = make([]common.Hash, 0, len(hashes))
-		duplicate, underpriced int64
+		unknownHashes = make([]common.Hash, 0, len(hashes))
+		unknownMetas  = make([]*txMetadata, 0, len(hashes))
+
+		duplicate   int64
+		underpriced int64
 	)
-	for _, hash := range hashes {
+	for i, hash := range hashes {
 		switch {
 		case f.hasTx(hash):
 			duplicate++
@@ -236,20 +251,22 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
 			underpriced++
 
 		default:
-			unknowns = append(unknowns, hash)
+			unknownHashes = append(unknownHashes, hash)
+			if types == nil {
+				unknownMetas = append(unknownMetas, nil)
+			} else {
+				unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]})
+			}
 		}
 	}
 	txAnnounceKnownMeter.Mark(duplicate)
 	txAnnounceUnderpricedMeter.Mark(underpriced)
 
 	// If anything's left to announce, push it into the internal loop
-	if len(unknowns) == 0 {
+	if len(unknownHashes) == 0 {
 		return nil
 	}
-	announce := &txAnnounce{
-		origin: peer,
-		hashes: unknowns,
-	}
+	announce := &txAnnounce{origin: peer, hashes: unknownHashes, metas: unknownMetas}
 	select {
 	case f.notify <- announce:
 		return nil
@@ -261,7 +278,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
 // Enqueue imports a batch of received transaction into the transaction pool
 // and the fetcher. This method may be called by both transaction broadcasts and
 // direct request replies. The differentiation is important so the fetcher can
-// re-shedule missing transactions as soon as possible.
+// re-schedule missing transactions as soon as possible.
 func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
 	// Keep track of all the propagated transactions
 	if direct {
@@ -273,6 +290,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
 	// re-requesting them and dropping the peer in case of malicious transfers.
 	var (
 		added       = make([]common.Hash, 0, len(txs))
+		metas       = make([]txMetadata, 0, len(txs))
 		duplicate   int64
 		underpriced int64
 		otherreject int64
@@ -302,6 +320,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
 				otherreject++
 			}
 			added = append(added, txs[i].Hash())
+			metas = append(metas, txMetadata{
+				kind: txs[i].Type(),
+				size: uint32(txs[i].Size()),
+			})
 		}
 		if direct {
 			txReplyKnownMeter.Mark(duplicate)
@@ -313,7 +335,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
 			txBroadcastOtherRejectMeter.Mark(otherreject)
 		}
 		select {
-		case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
+		case f.cleanup <- &txDelivery{origin: peer, hashes: added, metas: metas, direct: direct}:
 			return nil
 		case <-f.quit:
 			return errTerminated
@@ -370,13 +392,15 @@ func (f *TxFetcher) loop() {
 			want := used + len(ann.hashes)
 			if want > maxTxAnnounces {
 				txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))
+
 				ann.hashes = ann.hashes[:want-maxTxAnnounces]
+				ann.metas = ann.metas[:want-maxTxAnnounces]
 			}
 			// All is well, schedule the remainder of the transactions
 			idleWait := len(f.waittime) == 0
 			_, oldPeer := f.announces[ann.origin]
 
-			for _, hash := range ann.hashes {
+			for i, hash := range ann.hashes {
 				// If the transaction is already downloading, add it to the list
 				// of possible alternates (in case the current retrieval fails) and
 				// also account it for the peer.
@@ -385,9 +409,9 @@
 
 					// Stage 2 and 3 share the set of origins per tx
 					if announces := f.announces[ann.origin]; announces != nil {
-						announces[hash] = struct{}{}
+						announces[hash] = ann.metas[i]
 					} else {
-						f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
+						f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
 					}
 					continue
 				}
@@ -398,22 +422,28 @@
 
 					// Stage 2 and 3 share the set of origins per tx
 					if announces := f.announces[ann.origin]; announces != nil {
-						announces[hash] = struct{}{}
+						announces[hash] = ann.metas[i]
 					} else {
-						f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
+						f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
 					}
 					continue
 				}
 				// If the transaction is already known to the fetcher, but not
 				// yet downloading, add the peer as an alternate origin in the
 				// waiting list.
 				if f.waitlist[hash] != nil {
+					// Ignore double announcements from the same peer. This is
+					// especially important if metadata is also passed along to
+					// prevent malicious peers flip-flopping good/bad values.
+					if _, ok := f.waitlist[hash][ann.origin]; ok {
+						continue
+					}
					f.waitlist[hash][ann.origin] = struct{}{}
 
 					if waitslots := f.waitslots[ann.origin]; waitslots != nil {
-						waitslots[hash] = struct{}{}
+						waitslots[hash] = ann.metas[i]
 					} else {
-						f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
+						f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
 					}
 					continue
 				}
@@ -422,9 +452,9 @@
 				f.waittime[hash] = f.clock.Now()
 
 				if waitslots := f.waitslots[ann.origin]; waitslots != nil {
-					waitslots[hash] = struct{}{}
+					waitslots[hash] = ann.metas[i]
 				} else {
-					f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
+					f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
 				}
 			}
 			// If a new item was added to the waitlist, schedule it into the fetcher
@@ -450,9 +480,9 @@
 				f.announced[hash] = f.waitlist[hash]
 				for peer := range f.waitlist[hash] {
 					if announces := f.announces[peer]; announces != nil {
-						announces[hash] = struct{}{}
+						announces[hash] = f.waitslots[peer][hash]
 					} else {
-						f.announces[peer] = map[common.Hash]struct{}{hash: {}}
+						f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
 					}
 					delete(f.waitslots[peer], hash)
 					if len(f.waitslots[peer]) == 0 {
@@ -521,10 +551,27 @@
 
 		case delivery := <-f.cleanup:
 			// Independent if the delivery was direct or broadcast, remove all
-			// traces of the hash from internal trackers
-			for _, hash := range delivery.hashes {
+			// traces of the hash from internal trackers. That said, compare any
+			// advertised metadata with the real ones and drop bad peers.
+			for i, hash := range delivery.hashes {
 				if _, ok := f.waitlist[hash]; ok {
 					for peer, txset := range f.waitslots {
+						if meta := txset[hash]; meta != nil {
+							if delivery.metas[i].kind != meta.kind {
+								log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
+								f.dropPeer(peer)
+							} else if delivery.metas[i].size != meta.size {
+								log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)
+								if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
+									// Normally we should drop a peer considering this is a protocol violation.
+									// However, due to the RLP vs consensus format messyness, allow a few bytes
									// wiggle-room where we only warn, but don't drop.
+									//
+									// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
+									f.dropPeer(peer)
+								}
+							}
+						}
 						delete(txset, hash)
 						if len(txset) == 0 {
 							delete(f.waitslots, peer)
@@ -534,6 +581,22 @@
 					delete(f.waittime, hash)
 				} else {
 					for peer, txset := range f.announces {
+						if meta := txset[hash]; meta != nil {
+							if delivery.metas[i].kind != meta.kind {
+								log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
+								f.dropPeer(peer)
+							} else if delivery.metas[i].size != meta.size {
+								log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)
+								if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
+									// Normally we should drop a peer considering this is a protocol violation.
+									// However, due to the RLP vs consensus format messyness, allow a few bytes
+									// wiggle-room where we only warn, but don't drop.
+									//
+									// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
+									f.dropPeer(peer)
+								}
+							}
+						}
 						delete(txset, hash)
 						if len(txset) == 0 {
 							delete(f.announces, peer)
@@ -559,7 +622,7 @@
 			// In case of a direct delivery, also reschedule anything missing
 			// from the original query
 			if delivery.direct {
-				// Mark the reqesting successful (independent of individual status)
+				// Mark the requesting successful (independent of individual status)
 				txRequestDoneMeter.Mark(int64(len(delivery.hashes)))
 
 				// Make sure something was pending, nuke it
@@ -608,7 +671,7 @@
 					delete(f.alternates, hash)
 					delete(f.fetching, hash)
 				}
-				// Something was delivered, try to rechedule requests
+				// Something was delivered, try to reschedule requests
 				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too
 			}
 
@@ -720,7 +783,7 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
 // should be rescheduled if some request is pending. In practice, a timeout will
 // cause the timer to be rescheduled every 5 secs (until the peer comes through or
 // disconnects). This is a limitation of the fetcher code because we don't trac
-// pending requests and timed out requests separatey. Without double tracking, if
+// pending requests and timed out requests separately. Without double tracking, if
 // we simply didn't reschedule the timer on all-timeout then the timer would never
 // be set again since len(request) > 0 => something's running.
 func (f *TxFetcher) rescheduleTimeout(timer *mclock.Timer, trigger chan struct{}) {
@@ -835,7 +898,7 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
 
 // forEachHash does a range loop over a map of hashes in production, but during
 // testing it does a deterministic sorted random to allow reproducing issues.
-func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash common.Hash) bool) {
+func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) {
 	// If we're running production, use whatever Go's map gives us
 	if f.rand == nil {
 		for hash := range hashes {
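For callers, the visible surface change is the extra dropPeer callback on the constructors and the widened Notify signature. A minimal wiring sketch against the signatures in this diff — the stub callbacks are placeholders for the real txpool/peerset plumbing, not go-ethereum's handler code:

package main

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/eth/fetcher"
)

func main() {
	// Stub callbacks standing in for the txpool and peer set.
	hasTx := func(common.Hash) bool { return false }
	addTxs := func(txs []*types.Transaction) []error { return make([]error, len(txs)) }
	fetchTxs := func(peer string, hashes []common.Hash) error { return nil }
	dropPeer := func(peer string) { /* disconnect the violating peer here */ }

	f := fetcher.NewTxFetcher(hasTx, addTxs, fetchTxs, dropPeer)
	f.Start()
	defer f.Stop()

	// eth/68 announcements now carry a type and a size per hash; pre-eth/68
	// peers pass nil metadata and skip the enforcement entirely.
	hashes := []common.Hash{common.HexToHash("0x01")}
	_ = f.Notify("peer-1", []byte{types.DynamicFeeTxType}, []uint32{120}, hashes)
}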
