Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit fef4be2

Browse files
authored
Merge pull request #191 from ipfs/feat/tag-ewma
engine: tag peers based on usefulness
2 parents 5fa55e8 + fcb13fc commit fef4be2

File tree

5 files changed

+242
-35
lines changed

5 files changed

+242
-35
lines changed

decision/engine.go

Lines changed: 133 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,35 @@ const (
5757
outboxChanBuffer = 0
5858
// maxMessageSize is the maximum size of the batched payload
5959
maxMessageSize = 512 * 1024
60-
// tagPrefix is the tag given to peers associated an engine
61-
tagPrefix = "bs-engine-%s"
60+
// tagFormat is the tag given to peers associated an engine
61+
tagFormat = "bs-engine-%s-%s"
6262

63-
// tagWeight is the default weight for peers associated with an engine
64-
tagWeight = 5
63+
// queuedTagWeight is the default weight for peers that have work queued
64+
// on their behalf.
65+
queuedTagWeight = 10
66+
67+
// the alpha for the EWMA used to track short term usefulness
68+
shortTermAlpha = 0.5
69+
70+
// the alpha for the EWMA used to track long term usefulness
71+
longTermAlpha = 0.05
72+
73+
// long term ratio defines what "long term" means in terms of the
74+
// shortTerm duration. Peers that interact once every longTermRatio are
75+
// considered useful over the long term.
76+
longTermRatio = 10
77+
78+
// long/short term scores for tagging peers
79+
longTermScore = 10 // this is a high tag but it grows _very_ slowly.
80+
shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
81+
)
82+
83+
var (
84+
// how frequently the engine should sample usefulness. Peers that
85+
// interact every shortTerm time period are considered "active".
86+
//
87+
// this is only a variable to make testing easier.
88+
shortTerm = 10 * time.Second
6589
)
6690

6791
// Envelope contains a message for a Peer.
@@ -105,7 +129,8 @@ type Engine struct {
105129

106130
peerTagger PeerTagger
107131

108-
tag string
132+
tagQueued, tagUseful string
133+
109134
lock sync.Mutex // protects the fields immediatly below
110135
// ledgerMap lists Ledgers by their Partner key.
111136
ledgerMap map[peer.ID]*ledger
@@ -123,18 +148,118 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger)
123148
workSignal: make(chan struct{}, 1),
124149
ticker: time.NewTicker(time.Millisecond * 100),
125150
}
126-
e.tag = fmt.Sprintf(tagPrefix, uuid.New().String())
151+
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
152+
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
127153
e.peerRequestQueue = peertaskqueue.New(peertaskqueue.OnPeerAddedHook(e.onPeerAdded), peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved))
128154
go e.taskWorker(ctx)
155+
go e.scoreWorker(ctx)
129156
return e
130157
}
131158

159+
// scoreWorker keeps track of how "useful" our peers are, updating scores in the
160+
// connection manager.
161+
//
162+
// It does this by tracking two scores: short-term usefulness and long-term
163+
// usefulness. Short-term usefulness is sampled frequently and highly weights
164+
// new observations. Long-term usefulness is sampled less frequently and highly
165+
// weights on long-term trends.
166+
//
167+
// In practice, we do this by keeping two EWMAs. If we see an interaction
168+
// within the sampling period, we record the score, otherwise, we record a 0.
169+
// The short-term one has a high alpha and is sampled every shortTerm period.
170+
// The long-term one has a low alpha and is sampled every
171+
// longTermRatio*shortTerm period.
172+
//
173+
// To calculate the final score, we sum the short-term and long-term scores then
174+
// adjust it ±25% based on our debt ratio. Peers that have historically been
175+
// more useful to us than we are to them get the highest score.
176+
func (e *Engine) scoreWorker(ctx context.Context) {
177+
ticker := time.NewTicker(shortTerm)
178+
defer ticker.Stop()
179+
180+
type update struct {
181+
peer peer.ID
182+
score int
183+
}
184+
var (
185+
lastShortUpdate, lastLongUpdate time.Time
186+
updates []update
187+
)
188+
189+
for i := 0; ; i = (i + 1) % longTermRatio {
190+
var now time.Time
191+
select {
192+
case now = <-ticker.C:
193+
case <-ctx.Done():
194+
return
195+
}
196+
197+
// The long term update ticks every `longTermRatio` short
198+
// intervals.
199+
updateLong := i == 0
200+
201+
e.lock.Lock()
202+
for _, ledger := range e.ledgerMap {
203+
ledger.lk.Lock()
204+
205+
// Update the short-term score.
206+
if ledger.lastExchange.After(lastShortUpdate) {
207+
ledger.shortScore = ewma(ledger.shortScore, shortTermScore, shortTermAlpha)
208+
} else {
209+
ledger.shortScore = ewma(ledger.shortScore, 0, shortTermAlpha)
210+
}
211+
212+
// Update the long-term score.
213+
if updateLong {
214+
if ledger.lastExchange.After(lastLongUpdate) {
215+
ledger.longScore = ewma(ledger.longScore, longTermScore, longTermAlpha)
216+
} else {
217+
ledger.longScore = ewma(ledger.longScore, 0, longTermAlpha)
218+
}
219+
}
220+
221+
// Calculate the new score.
222+
//
223+
// The accounting score adjustment prefers peers _we_
224+
// need over peers that need us. This doesn't help with
225+
// leeching.
226+
score := int((ledger.shortScore + ledger.longScore) * ((ledger.Accounting.Score())*.5 + .75))
227+
228+
// Avoid updating the connection manager unless there's a change. This can be expensive.
229+
if ledger.score != score {
230+
// put these in a list so we can perform the updates outside _global_ the lock.
231+
updates = append(updates, update{ledger.Partner, score})
232+
ledger.score = score
233+
}
234+
ledger.lk.Unlock()
235+
}
236+
e.lock.Unlock()
237+
238+
// record the times.
239+
lastShortUpdate = now
240+
if updateLong {
241+
lastLongUpdate = now
242+
}
243+
244+
// apply the updates
245+
for _, update := range updates {
246+
if update.score == 0 {
247+
e.peerTagger.UntagPeer(update.peer, e.tagUseful)
248+
} else {
249+
e.peerTagger.TagPeer(update.peer, e.tagUseful, update.score)
250+
}
251+
}
252+
// Keep the memory. It's not much and it saves us from having to allocate.
253+
updates = updates[:0]
254+
}
255+
}
256+
132257
func (e *Engine) onPeerAdded(p peer.ID) {
133-
e.peerTagger.TagPeer(p, e.tag, tagWeight)
258+
e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
134259
}
135260

136261
func (e *Engine) onPeerRemoved(p peer.ID) {
137-
e.peerTagger.UntagPeer(p, e.tag)
262+
e.peerTagger.UntagPeer(p, e.tagQueued)
138263
}
139264

140265
// WantlistForPeer returns the currently understood want list for a given peer

decision/engine_test.go

Lines changed: 84 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,63 @@ import (
1919
testutil "github.com/libp2p/go-libp2p-core/test"
2020
)
2121

22+
type peerTag struct {
23+
done chan struct{}
24+
peers map[peer.ID]int
25+
}
26+
2227
type fakePeerTagger struct {
23-
lk sync.Mutex
24-
wait sync.WaitGroup
25-
taggedPeers []peer.ID
28+
lk sync.Mutex
29+
tags map[string]*peerTag
2630
}
2731

2832
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
29-
fpt.wait.Add(1)
30-
3133
fpt.lk.Lock()
3234
defer fpt.lk.Unlock()
33-
fpt.taggedPeers = append(fpt.taggedPeers, p)
35+
if fpt.tags == nil {
36+
fpt.tags = make(map[string]*peerTag, 1)
37+
}
38+
pt, ok := fpt.tags[tag]
39+
if !ok {
40+
pt = &peerTag{peers: make(map[peer.ID]int, 1), done: make(chan struct{})}
41+
fpt.tags[tag] = pt
42+
}
43+
pt.peers[p] = n
3444
}
3545

3646
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
37-
defer fpt.wait.Done()
38-
3947
fpt.lk.Lock()
4048
defer fpt.lk.Unlock()
41-
for i := 0; i < len(fpt.taggedPeers); i++ {
42-
if fpt.taggedPeers[i] == p {
43-
fpt.taggedPeers[i] = fpt.taggedPeers[len(fpt.taggedPeers)-1]
44-
fpt.taggedPeers = fpt.taggedPeers[:len(fpt.taggedPeers)-1]
45-
return
46-
}
49+
pt := fpt.tags[tag]
50+
if pt == nil {
51+
return
52+
}
53+
delete(pt.peers, p)
54+
if len(pt.peers) == 0 {
55+
close(pt.done)
56+
delete(fpt.tags, tag)
4757
}
4858
}
4959

50-
func (fpt *fakePeerTagger) count() int {
60+
func (fpt *fakePeerTagger) count(tag string) int {
5161
fpt.lk.Lock()
5262
defer fpt.lk.Unlock()
53-
return len(fpt.taggedPeers)
63+
if pt, ok := fpt.tags[tag]; ok {
64+
return len(pt.peers)
65+
}
66+
return 0
67+
}
68+
69+
func (fpt *fakePeerTagger) wait(tag string) {
70+
fpt.lk.Lock()
71+
pt := fpt.tags[tag]
72+
if pt == nil {
73+
fpt.lk.Unlock()
74+
return
75+
}
76+
doneCh := pt.done
77+
fpt.lk.Unlock()
78+
<-doneCh
5479
}
5580

5681
type engineSet struct {
@@ -241,16 +266,56 @@ func TestTaggingPeers(t *testing.T) {
241266
next := <-sanfrancisco.Engine.Outbox()
242267
envelope := <-next
243268

244-
if sanfrancisco.PeerTagger.count() != 1 {
269+
if sanfrancisco.PeerTagger.count(sanfrancisco.Engine.tagQueued) != 1 {
245270
t.Fatal("Incorrect number of peers tagged")
246271
}
247272
envelope.Sent()
248273
<-sanfrancisco.Engine.Outbox()
249-
sanfrancisco.PeerTagger.wait.Wait()
250-
if sanfrancisco.PeerTagger.count() != 0 {
274+
sanfrancisco.PeerTagger.wait(sanfrancisco.Engine.tagQueued)
275+
if sanfrancisco.PeerTagger.count(sanfrancisco.Engine.tagQueued) != 0 {
251276
t.Fatal("Peers should be untagged but weren't")
252277
}
253278
}
279+
280+
func TestTaggingUseful(t *testing.T) {
281+
oldShortTerm := shortTerm
282+
shortTerm = 1 * time.Millisecond
283+
defer func() { shortTerm = oldShortTerm }()
284+
285+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
286+
defer cancel()
287+
me := newEngine(ctx, "engine")
288+
friend := peer.ID("friend")
289+
290+
block := blocks.NewBlock([]byte("foobar"))
291+
msg := message.New(false)
292+
msg.AddBlock(block)
293+
294+
for i := 0; i < 3; i++ {
295+
if me.PeerTagger.count(me.Engine.tagUseful) != 0 {
296+
t.Fatal("Peers should be untagged but weren't")
297+
}
298+
me.Engine.MessageSent(friend, msg)
299+
time.Sleep(shortTerm * 2)
300+
if me.PeerTagger.count(me.Engine.tagUseful) != 1 {
301+
t.Fatal("Peers should be tagged but weren't")
302+
}
303+
time.Sleep(shortTerm * 8)
304+
}
305+
306+
if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
307+
t.Fatal("peers should still be tagged due to long-term usefulness")
308+
}
309+
time.Sleep(shortTerm * 2)
310+
if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
311+
t.Fatal("peers should still be tagged due to long-term usefulness")
312+
}
313+
time.Sleep(shortTerm * 10)
314+
if me.PeerTagger.count(me.Engine.tagUseful) != 0 {
315+
t.Fatal("peers should finally be untagged")
316+
}
317+
}
318+
254319
func partnerWants(e *Engine, keys []string, partner peer.ID) {
255320
add := message.New(false)
256321
for i, letter := range keys {

decision/ewma.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package decision
2+
3+
func ewma(old, new, alpha float64) float64 {
4+
return new*alpha + (1-alpha)*old
5+
}

decision/ledger.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ import (
1212

1313
func newLedger(p peer.ID) *ledger {
1414
return &ledger{
15-
wantList: wl.New(),
16-
Partner: p,
17-
sentToPeer: make(map[string]time.Time),
15+
wantList: wl.New(),
16+
Partner: p,
1817
}
1918
}
2019

@@ -30,16 +29,19 @@ type ledger struct {
3029
// lastExchange is the time of the last data exchange.
3130
lastExchange time.Time
3231

32+
// These scores keep track of how useful we think this peer is. Short
33+
// tracks short-term usefulness and long tracks long-term usefulness.
34+
shortScore, longScore float64
35+
// Score keeps track of the score used in the peer tagger. We track it
36+
// here to avoid unnecessarily updating the tags in the connection manager.
37+
score int
38+
3339
// exchangeCount is the number of exchanges with this peer
3440
exchangeCount uint64
3541

3642
// wantList is a (bounded, small) set of keys that Partner desires.
3743
wantList *wl.Wantlist
3844

39-
// sentToPeer is a set of keys to ensure we dont send duplicate blocks
40-
// to a given peer
41-
sentToPeer map[string]time.Time
42-
4345
// ref is the reference count for this ledger, its used to ensure we
4446
// don't drop the reference to this ledger in multi-connection scenarios
4547
ref int
@@ -63,10 +65,19 @@ type debtRatio struct {
6365
BytesRecv uint64
6466
}
6567

68+
// Value returns the debt ratio, sent:receive.
6669
func (dr *debtRatio) Value() float64 {
6770
return float64(dr.BytesSent) / float64(dr.BytesRecv+1)
6871
}
6972

73+
// Score returns the debt _score_ on a 0-1 scale.
74+
func (dr *debtRatio) Score() float64 {
75+
if dr.BytesRecv == 0 {
76+
return 0
77+
}
78+
return float64(dr.BytesRecv) / float64(dr.BytesRecv+dr.BytesSent)
79+
}
80+
7081
func (l *ledger) SentBytes(n int) {
7182
l.exchangeCount++
7283
l.lastExchange = time.Now()

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ require (
77
github.com/gogo/protobuf v1.2.1
88
github.com/golang/protobuf v1.3.1 // indirect
99
github.com/google/uuid v1.1.1
10-
github.com/hashicorp/golang-lru v0.5.1
1110
github.com/ipfs/go-block-format v0.0.2
1211
github.com/ipfs/go-cid v0.0.2
1312
github.com/ipfs/go-datastore v0.0.5
@@ -38,3 +37,5 @@ require (
3837
golang.org/x/text v0.3.2 // indirect
3938
gopkg.in/yaml.v2 v2.2.2 // indirect
4039
)
40+
41+
go 1.12

0 commit comments

Comments
 (0)