@@ -29,17 +29,15 @@ import (
29
29
basicnode "github.com/ipld/go-ipld-prime/node/basic"
30
30
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
31
31
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
32
- peer "github.com/libp2p/go-libp2p-core/peer"
33
32
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
34
33
"github.com/stretchr/testify/require"
34
+ "golang.org/x/sync/errgroup"
35
35
36
36
"github.com/ipfs/go-graphsync/benchmarks/testinstance"
37
37
tn "github.com/ipfs/go-graphsync/benchmarks/testnet"
38
38
graphsync "github.com/ipfs/go-graphsync/impl"
39
39
)
40
40
41
- const stdBlockSize = 8000
42
-
43
41
type runStats struct {
44
42
Time time.Duration
45
43
Name string
@@ -96,34 +94,36 @@ func benchmarkRepeatedDisconnects(ctx context.Context, b *testing.B, numnodes in
96
94
b .ReportAllocs ()
97
95
fetcher := instances [0 ]
98
96
for i := 0 ; i < b .N ; i ++ {
99
- var wg sync.WaitGroup
100
97
ctx , cancel := context .WithTimeout (ctx , 10 * time .Second )
101
98
require .NoError (b , err )
102
99
start := time .Now ()
100
+ errgrp , grpctx := errgroup .WithContext (ctx )
103
101
for j := 0 ; j < numnodes ; j ++ {
104
102
instance := instances [j + 1 ]
105
- _ , errChan := fetcher .Exchange .Request (ctx , instance .Peer , cidlink.Link {Cid : allCids [i ][j ]}, allSelector )
103
+ _ , errChan := fetcher .Exchange .Request (grpctx , instance .Peer , cidlink.Link {Cid : allCids [i ][j ]}, allSelector )
104
+ other := instance .Peer
106
105
107
- wg .Add (1 )
108
- go func (other peer.ID ) {
106
+ errgrp .Go (func () error {
109
107
defer func () {
110
- mn .DisconnectPeers (fetcher .Peer , other )
111
- wg .Done ()
108
+ _ = mn .DisconnectPeers (fetcher .Peer , other )
112
109
}()
113
110
for {
114
111
select {
115
- case <- ctx .Done ():
116
- return
112
+ case <- grpctx .Done ():
113
+ return nil
117
114
case err , ok := <- errChan :
118
115
if ! ok {
119
- return
116
+ return nil
120
117
}
121
- b . Fatalf ( "received error on request: %s" , err . Error ())
118
+ return err
122
119
}
123
120
}
124
- }(instance .Peer )
121
+ })
122
+
123
+ }
124
+ if err := errgrp .Wait (); err != nil {
125
+ b .Fatalf ("received error on request: %s" , err .Error ())
125
126
}
126
- wg .Wait ()
127
127
result := runStats {
128
128
Time : time .Since (start ),
129
129
Name : b .Name (),
@@ -163,24 +163,24 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
163
163
b .ReportAllocs ()
164
164
for i := 0 ; i < b .N ; i ++ {
165
165
fetcher := instances [i + 1 ]
166
- var wg sync.WaitGroup
167
166
ctx , cancel := context .WithTimeout (ctx , 10 * time .Second )
168
167
require .NoError (b , err )
169
168
start := time .Now ()
169
+ errgrp , grpctx := errgroup .WithContext (ctx )
170
170
for j := 0 ; j < numfiles ; j ++ {
171
- responseChan , errChan := fetcher .Exchange .Request (ctx , instances [0 ].Peer , cidlink.Link {Cid : allCids [j ]}, allSelector )
172
-
173
- wg .Add (1 )
174
- go func (j int ) {
175
- defer wg .Done ()
176
- for _ = range responseChan {
171
+ responseChan , errChan := fetcher .Exchange .Request (grpctx , instances [0 ].Peer , cidlink.Link {Cid : allCids [j ]}, allSelector )
172
+ errgrp .Go (func () error {
173
+ for range responseChan {
177
174
}
178
175
for err := range errChan {
179
- b . Fatalf ( "received error on request: %s" , err . Error ())
176
+ return err
180
177
}
181
- }(j )
178
+ return nil
179
+ })
180
+ }
181
+ if err := errgrp .Wait (); err != nil {
182
+ b .Fatalf ("received error on request: %s" , err .Error ())
182
183
}
183
- wg .Wait ()
184
184
result := runStats {
185
185
Time : time .Since (start ),
186
186
Name : b .Name (),
0 commit comments