Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit 18c401d

Browse files
authored
Merge pull request #42 from ipfs/feat/bandwidth-limited-tests
Feat/bandwidth limited tests
2 parents 916de59 + 48f53bb commit 18c401d

File tree

6 files changed

+169
-19
lines changed

6 files changed

+169
-19
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
tmp

dup_blocks_test.go renamed to benchmarks_test.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ import (
99
"testing"
1010
"time"
1111

12-
tn "github.com/ipfs/go-bitswap/testnet"
12+
"github.com/ipfs/go-bitswap/testutil"
1313

1414
bssession "github.com/ipfs/go-bitswap/session"
15+
tn "github.com/ipfs/go-bitswap/testnet"
1516
"github.com/ipfs/go-block-format"
1617
cid "github.com/ipfs/go-cid"
1718
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
@@ -34,6 +35,7 @@ type runStats struct {
3435
var benchmarkLog []runStats
3536

3637
func BenchmarkDups2Nodes(b *testing.B) {
38+
benchmarkLog = nil
3739
fixedDelay := delay.Fixed(10 * time.Millisecond)
3840
b.Run("AllToAll-OneAtATime", func(b *testing.B) {
3941
subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, oneAtATime)
@@ -93,50 +95,83 @@ func BenchmarkDups2Nodes(b *testing.B) {
9395
subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll)
9496
})
9597
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
96-
ioutil.WriteFile("benchmark.json", out, 0666)
98+
ioutil.WriteFile("tmp/benchmark.json", out, 0666)
9799
}
98100

99101
const fastSpeed = 60 * time.Millisecond
100102
const mediumSpeed = 200 * time.Millisecond
101103
const slowSpeed = 800 * time.Millisecond
102104
const superSlowSpeed = 4000 * time.Millisecond
103105
const distribution = 20 * time.Millisecond
106+
const fastBandwidth = 1250000.0
107+
const fastBandwidthDeviation = 300000.0
108+
const mediumBandwidth = 500000.0
109+
const mediumBandwidthDeviation = 80000.0
110+
const slowBandwidth = 100000.0
111+
const slowBandwidthDeviation = 16500.0
112+
const stdBlockSize = 8000
104113

105114
func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
115+
benchmarkLog = nil
106116
fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
107117
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
108118
0.0, 0.0, distribution, nil)
109119
fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
120+
fastBandwidthGenerator := tn.VariableRateLimitGenerator(fastBandwidth, fastBandwidthDeviation, nil)
110121
averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
111122
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
112123
0.3, 0.3, distribution, nil)
113124
averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
125+
averageBandwidthGenerator := tn.VariableRateLimitGenerator(mediumBandwidth, mediumBandwidthDeviation, nil)
114126
slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
115127
mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
116128
0.3, 0.3, distribution, nil)
117129
slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)
130+
slowBandwidthGenerator := tn.VariableRateLimitGenerator(slowBandwidth, slowBandwidthDeviation, nil)
118131

119132
b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
120-
subtestDistributeAndFetch(b, 300, 200, fastNetworkDelay, allToAll, batchFetchAll)
133+
subtestDistributeAndFetchRateLimited(b, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
121134
})
122135
b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
123-
subtestDistributeAndFetch(b, 300, 200, averageNetworkDelay, allToAll, batchFetchAll)
136+
subtestDistributeAndFetchRateLimited(b, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
124137
})
125138
b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
126-
subtestDistributeAndFetch(b, 300, 200, slowNetworkDelay, allToAll, batchFetchAll)
139+
subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
127140
})
141+
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
142+
ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
128143
}
129144

130145
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
131146
start := time.Now()
132147
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
148+
133149
sg := NewTestSessionGenerator(net)
134150
defer sg.Close()
135151

136152
bg := blocksutil.NewBlockGenerator()
137153

138154
instances := sg.Instances(numnodes)
139155
blocks := bg.Blocks(numblks)
156+
runDistribution(b, instances, blocks, df, ff, start)
157+
}
158+
159+
func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, df distFunc, ff fetchFunc) {
160+
start := time.Now()
161+
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
162+
163+
sg := NewTestSessionGenerator(net)
164+
defer sg.Close()
165+
166+
instances := sg.Instances(numnodes)
167+
blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
168+
169+
runDistribution(b, instances, blocks, df, ff, start)
170+
}
171+
172+
func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {
173+
174+
numnodes := len(instances)
140175

141176
fetcher := instances[numnodes-1]
142177

package.json

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
"gxDependencies": [
1010
{
1111
"author": "whyrusleeping",
12-
"hash": "QmdJdFQc5U3RAKgJQGmWR7SSM7TLuER5FWz5Wq6Tzs2CnS",
12+
"hash": "QmYxivS34F2M2n44WQQnRHGAKS8aoRUxwGpi9wk4Cdn4Jf",
1313
"name": "go-libp2p",
14-
"version": "6.0.29"
14+
"version": "6.0.30"
1515
},
1616
{
1717
"author": "hsanjuan",
@@ -184,6 +184,12 @@
184184
"hash": "Qmf7HqcW7LtCi1W8y2bdx2eJpze74jkbKqpByxgXikdbLF",
185185
"name": "go-detect-race",
186186
"version": "1.0.1"
187+
},
188+
{
189+
"author": "jbenet",
190+
"hash": "QmSJ9n2s9NUoA9D849W5jj5SJ94nMcZpj1jCgQJieiNqSt",
191+
"name": "go-random",
192+
"version": "1.0.0"
187193
}
188194
],
189195
"gxVersion": "0.12.1",

testnet/rate_limit_generators.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package bitswap
2+
3+
import (
4+
"math/rand"
5+
)
6+
7+
type fixedRateLimitGenerator struct {
8+
rateLimit float64
9+
}
10+
11+
// FixedRateLimitGenerator returns a rate limit generatoe that always generates
12+
// the specified rate limit in bytes/sec.
13+
func FixedRateLimitGenerator(rateLimit float64) RateLimitGenerator {
14+
return &fixedRateLimitGenerator{rateLimit}
15+
}
16+
17+
func (rateLimitGenerator *fixedRateLimitGenerator) NextRateLimit() float64 {
18+
return rateLimitGenerator.rateLimit
19+
}
20+
21+
type variableRateLimitGenerator struct {
22+
rateLimit float64
23+
std float64
24+
rng *rand.Rand
25+
}
26+
27+
// VariableRateLimitGenerator makes rate limites that following a normal distribution.
28+
func VariableRateLimitGenerator(rateLimit float64, std float64, rng *rand.Rand) RateLimitGenerator {
29+
if rng == nil {
30+
rng = sharedRNG
31+
}
32+
33+
return &variableRateLimitGenerator{
34+
std: std,
35+
rng: rng,
36+
rateLimit: rateLimit,
37+
}
38+
}
39+
40+
func (rateLimitGenerator *variableRateLimitGenerator) NextRateLimit() float64 {
41+
return rateLimitGenerator.rng.NormFloat64()*rateLimitGenerator.std + rateLimitGenerator.rateLimit
42+
}

testnet/virtual.go

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,51 @@ import (
1818
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
1919
peer "github.com/libp2p/go-libp2p-peer"
2020
routing "github.com/libp2p/go-libp2p-routing"
21+
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
2122
testutil "github.com/libp2p/go-testutil"
2223
)
2324

2425
var log = logging.Logger("bstestnet")
2526

2627
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
2728
return &network{
28-
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
29-
clients: make(map[peer.ID]*receiverQueue),
30-
delay: d,
31-
routingserver: rs,
32-
conns: make(map[string]struct{}),
29+
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
30+
clients: make(map[peer.ID]*receiverQueue),
31+
delay: d,
32+
routingserver: rs,
33+
isRateLimited: false,
34+
rateLimitGenerator: nil,
35+
conns: make(map[string]struct{}),
36+
}
37+
}
38+
39+
type RateLimitGenerator interface {
40+
NextRateLimit() float64
41+
}
42+
43+
func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network {
44+
return &network{
45+
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
46+
rateLimiters: make(map[peer.ID]map[peer.ID]*mocknet.RateLimiter),
47+
clients: make(map[peer.ID]*receiverQueue),
48+
delay: d,
49+
routingserver: rs,
50+
isRateLimited: true,
51+
rateLimitGenerator: rateLimitGenerator,
52+
conns: make(map[string]struct{}),
3353
}
3454
}
3555

3656
type network struct {
37-
mu sync.Mutex
38-
latencies map[peer.ID]map[peer.ID]time.Duration
39-
clients map[peer.ID]*receiverQueue
40-
routingserver mockrouting.Server
41-
delay delay.D
42-
conns map[string]struct{}
57+
mu sync.Mutex
58+
latencies map[peer.ID]map[peer.ID]time.Duration
59+
rateLimiters map[peer.ID]map[peer.ID]*mocknet.RateLimiter
60+
clients map[peer.ID]*receiverQueue
61+
routingserver mockrouting.Server
62+
delay delay.D
63+
isRateLimited bool
64+
rateLimitGenerator RateLimitGenerator
65+
conns map[string]struct{}
4366
}
4467

4568
type message struct {
@@ -102,6 +125,26 @@ func (n *network) SendMessage(
102125
latencies[to] = latency
103126
}
104127

128+
var bandwidthDelay time.Duration
129+
if n.isRateLimited {
130+
rateLimiters, ok := n.rateLimiters[from]
131+
if !ok {
132+
rateLimiters = make(map[peer.ID]*mocknet.RateLimiter)
133+
n.rateLimiters[from] = rateLimiters
134+
}
135+
136+
rateLimiter, ok := rateLimiters[to]
137+
if !ok {
138+
rateLimiter = mocknet.NewRateLimiter(n.rateLimitGenerator.NextRateLimit())
139+
rateLimiters[to] = rateLimiter
140+
}
141+
142+
size := mes.ToProtoV1().Size()
143+
bandwidthDelay = rateLimiter.Limit(size)
144+
} else {
145+
bandwidthDelay = 0
146+
}
147+
105148
receiver, ok := n.clients[to]
106149
if !ok {
107150
return errors.New("cannot locate peer on network")
@@ -113,7 +156,7 @@ func (n *network) SendMessage(
113156
msg := &message{
114157
from: from,
115158
msg: mes,
116-
shouldSend: time.Now().Add(latency),
159+
shouldSend: time.Now().Add(latency).Add(bandwidthDelay),
117160
}
118161
receiver.enqueue(msg)
119162

testutil/testutil.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package testutil
22

33
import (
4+
"bytes"
5+
6+
random "github.com/jbenet/go-random"
7+
48
bsmsg "github.com/ipfs/go-bitswap/message"
59
"github.com/ipfs/go-bitswap/wantlist"
610
"github.com/ipfs/go-block-format"
@@ -11,6 +15,25 @@ import (
1115

1216
var blockGenerator = blocksutil.NewBlockGenerator()
1317
var prioritySeq int
18+
var seedSeq int64
19+
20+
func randomBytes(n int64, seed int64) []byte {
21+
data := new(bytes.Buffer)
22+
random.WritePseudoRandomBytes(n, data, seed)
23+
return data.Bytes()
24+
}
25+
26+
// GenerateBlocksOfSize generates a series of blocks of the given byte size
27+
func GenerateBlocksOfSize(n int, size int64) []blocks.Block {
28+
generatedBlocks := make([]blocks.Block, 0, n)
29+
for i := 0; i < n; i++ {
30+
seedSeq++
31+
b := blocks.NewBlock(randomBytes(size, seedSeq))
32+
generatedBlocks = append(generatedBlocks, b)
33+
34+
}
35+
return generatedBlocks
36+
}
1437

1538
// GenerateCids produces n content identifiers.
1639
func GenerateCids(n int) []cid.Cid {

0 commit comments

Comments
 (0)