@@ -3,9 +3,9 @@ package graphsync_test
 import (
 	"bytes"
 	"context"
-	"crypto/rand"
 	"fmt"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"runtime"
 	"strings"
@@ -31,6 +31,7 @@ import (
 	basicnode "github.com/ipld/go-ipld-prime/node/basic"
 	ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
 	"github.com/ipld/go-ipld-prime/traversal/selector/builder"
+	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
 	"github.com/stretchr/testify/require"
 )
 
@@ -48,10 +49,81 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
 	tdm, err := newTempDirMaker(b)
 	require.NoError(b, err)
 	b.Run("test-20-10000", func(b *testing.B) {
-		subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000), tdm)
+		subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
+	})
+	b.Run("test-p2p-stress-10-128MB", func(b *testing.B) {
+		p2pStrestTest(ctx, b, 20, allFilesUniformSize(128*(1<<20), 1<<20, 1024), tdm)
 	})
 }
 
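+// p2pStrestTest seeds numfiles random UnixFS DAGs on one instance, then benchmarks
+// fetching all of them concurrently from a second instance over a libp2p mocknet,
+// severing and re-linking the connection partway through one of the transfers.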
+func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	mn := mocknet.New(ctx)
+	net := tn.StreamNet(ctx, mn)
+	ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
+	instances, err := ig.Instances(1 + b.N)
+	require.NoError(b, err)
+	var allCids []cid.Cid
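+	// seed every file on the first instance; all requests are served from its blockstore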
+	for i := 0; i < numfiles; i++ {
+		thisCids := df(ctx, b, instances[:1])
+		allCids = append(allCids, thisCids...)
+	}
+	ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
+
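+	// selector that recursively walks the entire DAG below each root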
+	allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(),
+		ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
+
+	runtime.GC()
+	b.ResetTimer()
+	b.ReportAllocs()
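+	// each benchmark iteration fetches every file with a fresh instance, under a 10s timeout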
+	for i := 0; i < b.N; i++ {
+		fetcher := instances[i+1]
+		var wg sync.WaitGroup
+		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+		require.NoError(b, err)
+		start := time.Now()
+		disconnectOn := rand.Intn(numfiles)
+		for j := 0; j < numfiles; j++ {
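+			// request the full DAG for file j from the seed peer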
+			resultChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector)
+
+			wg.Add(1)
+			go func(j int) {
+				defer wg.Done()
+				results := 0
+				for {
+					select {
+					case <-ctx.Done():
+						return
+					case <-resultChan:
+						results++
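+						// on the randomly chosen request, drop and re-link the
+						// connection to the seed after 100 responses arrive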
+						if results == 100 && j == disconnectOn {
+							mn.DisconnectPeers(instances[0].Peer, instances[i+1].Peer)
+							mn.UnlinkPeers(instances[0].Peer, instances[i+1].Peer)
+							time.Sleep(100 * time.Millisecond)
+							mn.LinkPeers(instances[0].Peer, instances[i+1].Peer)
+						}
+					case err, ok := <-errChan:
+						if !ok {
+							return
+						}
+						b.Fatalf("received error on request: %s", err.Error())
+					}
+				}
+			}(j)
+		}
+		wg.Wait()
+		result := runStats{
+			Time: time.Since(start),
+			Name: b.Name(),
+		}
+		benchmarkLog = append(benchmarkLog, result)
+		cancel()
+		fetcher.Close()
+	}
+	testinstance.Close(instances)
+	ig.Close()
+}
 func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, d delay.D, bstoreLatency time.Duration, df distFunc, tdm *tempDirMaker) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -116,10 +188,10 @@ func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int,
 
 type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid
 
-const unixfsChunkSize uint64 = 1 << 10
-const unixfsLinksPerLevel = 1024
+const defaultUnixfsChunkSize uint64 = 1 << 10
+const defaultUnixfsLinksPerLevel = 1024
 
-func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64) cid.Cid {
+func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) cid.Cid {
 
 	data := make([]byte, size)
 	_, err := rand.Read(data)
@@ -151,11 +223,11 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block
 	return nd.Cid()
 }
 
-func allFilesUniformSize(size uint64) distFunc {
+func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) distFunc {
 	return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
 		cids := make([]cid.Cid, 0, len(provs))
 		for _, prov := range provs {
-			c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size)
+			c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel)
 			cids = append(cids, c)
 		}
 		return cids