@@ -3,9 +3,9 @@ package graphsync_test
 import (
 	"bytes"
 	"context"
-	"crypto/rand"
 	"fmt"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"runtime"
 	"strings"
@@ -31,6 +31,7 @@ import (
 	basicnode "github.com/ipld/go-ipld-prime/node/basic"
 	ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
 	"github.com/ipld/go-ipld-prime/traversal/selector/builder"
+	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
 	"github.com/stretchr/testify/require"
 )
 
@@ -48,10 +49,82 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
 	tdm, err := newTempDirMaker(b)
 	require.NoError(b, err)
 	b.Run("test-20-10000", func(b *testing.B) {
-		subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000), tdm)
+		subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
+	})
+	b.Run("test-p2p-stress-10-128MB", func(b *testing.B) {
+		p2pStrestTest(ctx, b, 20, allFilesUniformSize(128*(1<<20), 1<<20, 1024), tdm)
 	})
 }
 
+func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
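+	// Build a mock libp2p network whose links simulate a slow connection (100ms latency, capped bandwidth).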
+	mn := mocknet.New(ctx)
+	mn.SetLinkDefaults(mocknet.LinkOptions{Latency: 100 * time.Millisecond, Bandwidth: 3000000})
+	net := tn.StreamNet(ctx, mn)
+	ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
+	instances, err := ig.Instances(1 + b.N)
+	require.NoError(b, err)
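+	// Seed only the first instance with the random files; every other instance acts as a fetcher.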
+	var allCids []cid.Cid
+	for i := 0; i < numfiles; i++ {
+		thisCids := df(ctx, b, instances[:1])
+		allCids = append(allCids, thisCids...)
+	}
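+	// Selector that recursively walks every node in the DAG under each root CID.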
+	ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
+
+	allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(),
+		ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
+
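+	// Exclude setup work from the measured benchmark time.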
+	runtime.GC()
+	b.ResetTimer()
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		fetcher := instances[i+1]
+		var wg sync.WaitGroup
+		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+		require.NoError(b, err)
+		start := time.Now()
+		disconnectOn := rand.Intn(numfiles)
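+		// Request every file from the seeder concurrently; the request picked by disconnectOn
+		// drops and re-links the connection mid-transfer to exercise recovery behavior.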
+		for j := 0; j < numfiles; j++ {
+			resultChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector)
+
+			wg.Add(1)
+			go func(j int) {
+				defer wg.Done()
+				results := 0
+				for {
+					select {
+					case <-ctx.Done():
+						return
+					case <-resultChan:
+						results++
+						if results == 100 && j == disconnectOn {
+							mn.DisconnectPeers(instances[0].Peer, instances[i+1].Peer)
+							mn.UnlinkPeers(instances[0].Peer, instances[i+1].Peer)
+							time.Sleep(100 * time.Millisecond)
+							mn.LinkPeers(instances[0].Peer, instances[i+1].Peer)
+						}
+					case err, ok := <-errChan:
+						if !ok {
+							return
+						}
+						b.Fatalf("received error on request: %s", err.Error())
+					}
+				}
+			}(j)
+		}
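+		// Wait for all transfers to finish, then record the elapsed time for this iteration.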
+		wg.Wait()
+		result := runStats{
+			Time: time.Since(start),
+			Name: b.Name(),
+		}
+		benchmarkLog = append(benchmarkLog, result)
+		cancel()
+		fetcher.Close()
+	}
+	testinstance.Close(instances)
+	ig.Close()
+}
 func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, d delay.D, bstoreLatency time.Duration, df distFunc, tdm *tempDirMaker) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -116,10 +189,10 @@ func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int,
 
 type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid
 
-const unixfsChunkSize uint64 = 1 << 10
-const unixfsLinksPerLevel = 1024
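+// Defaults for the existing round-trip benchmarks; the p2p stress test passes its own values (1MiB chunks, 1024 links per level).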
+const defaultUnixfsChunkSize uint64 = 1 << 10
+const defaultUnixfsLinksPerLevel = 1024
 
-func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64) cid.Cid {
+func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) cid.Cid {
 
 	data := make([]byte, size)
 	_, err := rand.Read(data)
@@ -151,11 +224,11 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block
 	return nd.Cid()
 }
 
-func allFilesUniformSize(size uint64) distFunc {
+func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) distFunc {
 	return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
 		cids := make([]cid.Cid, 0, len(provs))
 		for _, prov := range provs {
-			c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size)
+			c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel)
 			cids = append(cids, c)
 		}
 		return cids