Skip to content

Commit 34ebfef

Browse files
dirkmc and nonsense authored
Better retry config (#124)
* fix: better retry config
* docs: fix TestSendMessageRetry comment

Co-authored-by: Anton Evangelatov <[email protected]>
1 parent 4fab043 commit 34ebfef

File tree

3 files changed

+146
-7
lines changed

3 files changed

+146
-7
lines changed

network/libp2p_impl.go

+20-5
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,18 @@ var log = logging.Logger("data_transfer_network")
2323

2424
// sendMessageTimeout is a duration of ten minutes.
// NOTE(review): presumably the deadline applied to sending a single message;
// its use is outside the visible hunks — confirm.
var sendMessageTimeout = time.Minute * 10
2525

26+
// defaultMaxStreamOpenAttempts is the max number of attempts to open a stream
// before openStream gives up and returns an error.
2627
const defaultMaxStreamOpenAttempts = 5
28+
29+
// defaultMinAttemptDuration is the min backoff time between retries
// (used as the backoff's Min).
2730
const defaultMinAttemptDuration = 1 * time.Second
31+
32+
// defaultMaxAttemptDuration is the max backoff time between retries
// (used as the backoff's Max).
2833
const defaultMaxAttemptDuration = 5 * time.Minute
2934

35+
// defaultBackoffFactor is the multiplier in the backoff time for each retry
// (used as the backoff's Factor).
36+
const defaultBackoffFactor = 5
37+
3038
// defaultDataTransferProtocols is the protocol list assigned to dtProtocols
// by NewFromLibp2pHost before any options are applied.
var defaultDataTransferProtocols = []protocol.ID{datatransfer.ProtocolDataTransfer1_1, datatransfer.ProtocolDataTransfer1_0}
3139

3240
// Option is an option for configuring the libp2p storage market network
@@ -41,11 +49,12 @@ func DataTransferProtocols(protocols []protocol.ID) Option {
4149
}
4250

4351
// RetryParameters changes the default parameters around connection reopening.
//
// minDuration and maxDuration are the min and max backoff time between
// attempts to open a stream, attempts is the max number of attempts before
// openStream returns an error, and backoffFactor is the multiplier applied to
// the backoff time for each retry. The returned Option stores all four values
// on the network implementation.
44-
func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64) Option {
52+
func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64, backoffFactor float64) Option {
4553
return func(impl *libp2pDataTransferNetwork) {
4654
impl.maxStreamOpenAttempts = attempts
4755
impl.minAttemptDuration = minDuration
4856
impl.maxAttemptDuration = maxDuration
57+
impl.backoffFactor = backoffFactor
4958
}
5059
}
5160

@@ -57,6 +66,7 @@ func NewFromLibp2pHost(host host.Host, options ...Option) DataTransferNetwork {
5766
maxStreamOpenAttempts: defaultMaxStreamOpenAttempts,
5867
minAttemptDuration: defaultMinAttemptDuration,
5968
maxAttemptDuration: defaultMaxAttemptDuration,
69+
backoffFactor: defaultBackoffFactor,
6070
dtProtocols: defaultDataTransferProtocols,
6171
}
6272

@@ -78,13 +88,14 @@ type libp2pDataTransferNetwork struct {
7888
minAttemptDuration time.Duration
7989
maxAttemptDuration time.Duration
8090
dtProtocols []protocol.ID
91+
backoffFactor float64
8192
}
8293

8394
func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.ID, protocols ...protocol.ID) (network.Stream, error) {
8495
b := &backoff.Backoff{
8596
Min: impl.minAttemptDuration,
8697
Max: impl.maxAttemptDuration,
87-
Factor: impl.maxStreamOpenAttempts,
98+
Factor: impl.backoffFactor,
8899
Jitter: true,
89100
}
90101

@@ -95,12 +106,16 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I
95106
return s, err
96107
}
97108

98-
nAttempts := b.Attempt()
99-
if nAttempts == impl.maxStreamOpenAttempts {
100-
return nil, xerrors.Errorf("exhausted %d attempts but failed to open stream, err: %w", impl.maxStreamOpenAttempts, err)
109+
// b.Attempt() starts from zero
110+
nAttempts := b.Attempt() + 1
111+
if nAttempts >= impl.maxStreamOpenAttempts {
112+
return nil, xerrors.Errorf("exhausted %g attempts but failed to open stream to %s, err: %w", impl.maxStreamOpenAttempts, id, err)
101113
}
102114

103115
d := b.Duration()
116+
log.Warnf("failed to open stream to %s on attempt %g of %g, waiting %s to try again, err: %w",
117+
id, nAttempts, impl.maxStreamOpenAttempts, d, err)
118+
104119
select {
105120
case <-ctx.Done():
106121
return nil, ctx.Err()

network/libp2p_impl_test.go

+124
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,21 @@ package network_test
22

33
import (
44
"context"
5+
"fmt"
56
"math/rand"
67
"testing"
78
"time"
89

910
basicnode "github.com/ipld/go-ipld-prime/node/basic"
1011
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
12+
"github.com/libp2p/go-libp2p-core/host"
13+
libp2pnet "github.com/libp2p/go-libp2p-core/network"
1114
"github.com/libp2p/go-libp2p-core/peer"
15+
"github.com/libp2p/go-libp2p-core/protocol"
1216
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
1317
"github.com/stretchr/testify/assert"
1418
"github.com/stretchr/testify/require"
19+
"golang.org/x/xerrors"
1520

1621
datatransfer "github.com/filecoin-project/go-data-transfer"
1722
"github.com/filecoin-project/go-data-transfer/message"
@@ -174,3 +179,122 @@ func TestMessageSendAndReceive(t *testing.T) {
174179
})
175180

176181
}
182+
183+
// wrappedHost wraps a host so that we can mock out errors when calling
// NewStream. All other Host methods come from the embedded host.Host.
184+
type wrappedHost struct {
185+
host.Host
186+
// errs queues the errors NewStream should return, one per call, before it
// starts delegating to the embedded Host.
errs chan error
187+
}
188+
189+
// NewStream returns the next error queued on w.errs, if one can be received
// without blocking. When no non-nil error is available it opens a real stream
// by calling NewStream on the embedded Host with the same arguments.
func (w wrappedHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (libp2pnet.Stream, error) {
190+
var err error
191+
// Non-blocking receive: the default case leaves err nil when nothing is queued.
select {
192+
case err = <-w.errs:
193+
default:
194+
}
195+
if err != nil {
196+
return nil, err
197+
}
198+
199+
return w.Host.NewStream(ctx, p, pids...)
200+
}
201+
202+
// TestSendMessageRetry verifies that if the number of retry attempts
203+
// is greater than the number of errors, SendMessage will succeed.
//
// Each case queues `errors` failures on the sending host's NewStream and
// configures `attempts` as the max number of stream-open attempts; expSuccess
// records whether SendMessage is expected to return without error.
204+
func TestSendMessageRetry(t *testing.T) {
205+
tcases := []struct {
206+
attempts int
207+
errors int
208+
expSuccess bool
209+
}{{
210+
attempts: 1,
211+
errors: 0,
212+
expSuccess: true,
213+
}, {
214+
attempts: 1,
215+
errors: 1,
216+
expSuccess: false,
217+
}, {
218+
attempts: 2,
219+
errors: 1,
220+
expSuccess: true,
221+
}, {
222+
attempts: 2,
223+
errors: 2,
224+
expSuccess: false,
225+
}}
226+
for _, tcase := range tcases {
227+
name := fmt.Sprintf("%d attempts, %d errors", tcase.attempts, tcase.errors)
228+
t.Run(name, func(t *testing.T) {
229+
// create network
230+
ctx := context.Background()
231+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
232+
defer cancel()
233+
mn := mocknet.New(ctx)
234+
235+
host1, err := mn.GenPeer()
236+
require.NoError(t, err)
237+
238+
// Create a wrapped host that will return tcase.errors errors from
239+
// NewStream
240+
mockHost1 := &wrappedHost{
241+
Host: host1,
242+
errs: make(chan error, tcase.errors),
243+
}
244+
for i := 0; i < tcase.errors; i++ {
245+
mockHost1.errs <- xerrors.Errorf("network err")
246+
}
247+
host1 = mockHost1
248+
249+
host2, err := mn.GenPeer()
250+
require.NoError(t, err)
251+
err = mn.LinkAll()
252+
require.NoError(t, err)
253+
254+
// Only the sending side gets custom retry parameters: 1ms min and max
// backoff, tcase.attempts max attempts, and a backoff factor of 1.
retry := network.RetryParameters(
255+
time.Millisecond,
256+
time.Millisecond,
257+
float64(tcase.attempts),
258+
1)
259+
dtnet1 := network.NewFromLibp2pHost(host1, retry)
260+
dtnet2 := network.NewFromLibp2pHost(host2)
261+
// Both networks share the same receiver as their delegate.
r := &receiver{
262+
messageReceived: make(chan struct{}),
263+
connectedPeers: make(chan peer.ID, 2),
264+
}
265+
dtnet1.SetDelegate(r)
266+
dtnet2.SetDelegate(r)
267+
268+
err = dtnet1.ConnectTo(ctx, host2.ID())
269+
require.NoError(t, err)
270+
271+
// Build a push request to send from host1 to host2.
baseCid := testutil.GenerateCids(1)[0]
272+
selector := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any).Matcher().Node()
273+
isPull := false
274+
id := datatransfer.TransferID(rand.Int31())
275+
voucher := testutil.NewFakeDTType()
276+
request, err := message.NewRequest(id, false, isPull, voucher.Type(), voucher, baseCid, selector)
277+
require.NoError(t, err)
278+
279+
err = dtnet1.SendMessage(ctx, host2.ID(), request)
280+
// Failure cases only check that SendMessage returned an error.
if !tcase.expSuccess {
281+
require.Error(t, err)
282+
return
283+
}
284+
285+
require.NoError(t, err)
286+
287+
// Wait for the receiver to signal receipt, or fail when the 10s context expires.
select {
288+
case <-ctx.Done():
289+
t.Fatal("did not receive message sent")
290+
case <-r.messageReceived:
291+
}
292+
293+
sender := r.lastSender
294+
require.Equal(t, sender, host1.ID())
295+
296+
receivedRequest := r.lastRequest
297+
require.NotNil(t, receivedRequest)
298+
})
299+
}
300+
}

testutil/gstestdata.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ func NewGraphsyncTestingData(ctx context.Context, t *testing.T, host1Protocols [
143143
gsData.GsNet1 = gsnet.NewFromLibp2pHost(gsData.Host1)
144144
gsData.GsNet2 = gsnet.NewFromLibp2pHost(gsData.Host2)
145145

146-
opts1 := []network.Option{network.RetryParameters(0, 0, 0)}
147-
opts2 := []network.Option{network.RetryParameters(0, 0, 0)}
146+
opts1 := []network.Option{network.RetryParameters(0, 0, 0, 0)}
147+
opts2 := []network.Option{network.RetryParameters(0, 0, 0, 0)}
148148

149149
if len(host1Protocols) != 0 {
150150
opts1 = append(opts1, network.DataTransferProtocols(host1Protocols))

0 commit comments

Comments
 (0)