Skip to content

Commit 2af50e8

Browse files
authored
Remove restart ack timeout (#211)
* fix: disable restart ack timeout * fix: option to disable accept and complete timeouts
1 parent 3a130c3 commit 2af50e8

File tree

3 files changed

+40
-104
lines changed

3 files changed

+40
-104
lines changed

channelmonitor/channelmonitor.go

+20-72
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,18 @@ type Monitor struct {
3838
}
3939

4040
type Config struct {
41-
// Max time to wait for other side to accept open channel request before attempting restart
41+
// Max time to wait for other side to accept open channel request before attempting restart.
42+
// Set to 0 to disable timeout.
4243
AcceptTimeout time.Duration
4344
// Debounce when restart is triggered by multiple errors
4445
RestartDebounce time.Duration
4546
// Backoff after restarting
4647
RestartBackoff time.Duration
4748
// Number of times to try to restart before failing
4849
MaxConsecutiveRestarts uint32
49-
// Max time to wait for the peer to acknowledge a restart request.
50-
// Note: Does not include the time taken to reconnect to the peer.
51-
RestartAckTimeout time.Duration
5250
// Max time to wait for the responder to send a Complete message once all
53-
// data has been sent
51+
// data has been sent.
52+
// Set to 0 to disable timeout.
5453
CompleteTimeout time.Duration
5554
// Called when a restart completes successfully
5655
OnRestartComplete func(id datatransfer.ChannelID)
@@ -74,17 +73,14 @@ func checkConfig(cfg *Config) {
7473
}
7574

7675
prefix := "data-transfer channel monitor config "
77-
if cfg.AcceptTimeout <= 0 {
78-
panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be > 0", cfg.AcceptTimeout))
76+
if cfg.AcceptTimeout < 0 {
77+
panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be >= 0", cfg.AcceptTimeout))
7978
}
8079
if cfg.MaxConsecutiveRestarts == 0 {
8180
panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts))
8281
}
83-
if cfg.RestartAckTimeout <= 0 {
84-
panic(fmt.Sprintf(prefix+"RestartAckTimeout is %s but must be > 0", cfg.RestartAckTimeout))
85-
}
86-
if cfg.CompleteTimeout <= 0 {
87-
panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout))
82+
if cfg.CompleteTimeout < 0 {
83+
panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be >= 0", cfg.CompleteTimeout))
8884
}
8985
}
9086

@@ -275,6 +271,11 @@ func (mc *monitoredChannel) start() {
275271
// an Accept to our open channel request before the accept timeout.
276272
// Returns a function that can be used to cancel the timer.
277273
func (mc *monitoredChannel) watchForResponderAccept() func() {
274+
// Check if the accept timeout is disabled
275+
if mc.cfg.AcceptTimeout == 0 {
276+
return func() {}
277+
}
278+
278279
// Start a timer for the accept timeout
279280
timer := time.NewTimer(mc.cfg.AcceptTimeout)
280281

@@ -297,6 +298,11 @@ func (mc *monitoredChannel) watchForResponderAccept() func() {
297298

298299
// Wait up to the configured timeout for the responder to send a Complete message
299300
func (mc *monitoredChannel) watchForResponderComplete() {
301+
// Check if the complete timeout is disabled
302+
if mc.cfg.CompleteTimeout == 0 {
303+
return
304+
}
305+
300306
// Start a timer for the complete timeout
301307
timer := time.NewTimer(mc.cfg.CompleteTimeout)
302308
defer timer.Stop()
@@ -308,7 +314,7 @@ func (mc *monitoredChannel) watchForResponderComplete() {
308314
case <-timer.C:
309315
// Timer expired before we received a Complete message from the responder
310316
err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer",
311-
mc.chid, mc.cfg.AcceptTimeout)
317+
mc.chid, mc.cfg.CompleteTimeout)
312318
mc.closeChannelAndShutdown(err)
313319
}
314320
}
@@ -414,8 +420,7 @@ func (mc *monitoredChannel) doRestartChannel() error {
414420
err := mc.sendRestartMessage(restartCount)
415421
if err != nil {
416422
log.Warnf("%s: restart failed, trying again: %s", mc.chid, err)
417-
// If the restart message could not be sent, or there was a timeout
418-
// waiting for the restart to be acknowledged, try again
423+
// If the restart message could not be sent, try again
419424
return mc.doRestartChannel()
420425
}
421426
log.Infof("%s: restart completed successfully", mc.chid)
@@ -438,25 +443,12 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
438443
log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))
439444

440445
// Send a restart message for the channel
441-
restartResult := mc.waitForRestartResponse()
442446
log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
443447
err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
444448
if err != nil {
445449
return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
446450
}
447451

448-
// The restart message is fire and forget, so we need to watch for a
449-
// restart response to know that the restart message reached the peer.
450-
select {
451-
case <-mc.ctx.Done():
452-
return nil // channel shutdown so just bail out
453-
case err = <-restartResult:
454-
if err != nil {
455-
return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
456-
}
457-
}
458-
log.Infof("%s: received restart response from %s", mc.chid, p)
459-
460452
// The restart message was sent successfully.
461453
// If a restart backoff is configured, backoff after a restart before
462454
// attempting another.
@@ -490,47 +482,3 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
490482
log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err)
491483
}
492484
}
493-
494-
// Wait for the peer to send an acknowledgement to the restart request
495-
func (mc *monitoredChannel) waitForRestartResponse() chan error {
496-
restartFired := make(chan struct{})
497-
restarted := make(chan error, 1)
498-
timer := time.NewTimer(mc.cfg.RestartAckTimeout)
499-
500-
unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
501-
if channelState.ChannelID() != mc.chid {
502-
return
503-
}
504-
505-
// The Restart event is fired when we receive an acknowledgement
506-
// from the peer that it has received a restart request
507-
if event.Code == datatransfer.Restart {
508-
close(restartFired)
509-
}
510-
})
511-
512-
go func() {
513-
defer unsub()
514-
defer timer.Stop()
515-
516-
select {
517-
518-
// Restart ack received from peer
519-
case <-restartFired:
520-
restarted <- nil
521-
522-
// Channel monitor shutdown, just bail out
523-
case <-mc.ctx.Done():
524-
restarted <- nil
525-
526-
// Timer expired before receiving a restart ack from peer
527-
case <-timer.C:
528-
p := mc.chid.OtherParty(mc.mgr.PeerID())
529-
err := xerrors.Errorf("did not receive response to restart request from %s after %s",
530-
p, mc.cfg.RestartAckTimeout)
531-
restarted <- err
532-
}
533-
}()
534-
535-
return restarted
536-
}

channelmonitor/channelmonitor_test.go

+20-31
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
2727
name string
2828
errReconnect bool
2929
errSendRestartMsg bool
30-
timeoutRestartAck bool
3130
}
3231
testCases := []testCase{{
3332
name: "attempt restart",
@@ -37,9 +36,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
3736
}, {
3837
name: "fail to send restart message",
3938
errSendRestartMsg: true,
40-
}, {
41-
name: "timeout waiting for restart message ack from peer",
42-
timeoutRestartAck: true,
4339
}}
4440

4541
runTest := func(name string, isPush bool) {
@@ -59,7 +55,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
5955
m := NewMonitor(mockAPI, &Config{
6056
AcceptTimeout: time.Hour,
6157
MaxConsecutiveRestarts: 1,
62-
RestartAckTimeout: 20 * time.Millisecond,
6358
CompleteTimeout: time.Hour,
6459
})
6560

@@ -96,16 +91,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
9691
err := mockAPI.awaitRestartSent()
9792
require.NoError(t, err)
9893

99-
// If simulating a restart ack timeout, don't fire the restart
100-
// ack event and expect the channel to be closed with an error
101-
if tc.timeoutRestartAck {
102-
mockAPI.verifyChannelClosed(t, true)
103-
return
104-
}
105-
106-
// Simulate receiving restart message ack from responder
107-
mockAPI.restartEvent()
108-
10994
if isPush {
11095
// Simulate sending the remaining data
11196
mockAPI.dataSent(5)
@@ -145,7 +130,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
145130
m := NewMonitor(mockAPI, &Config{
146131
AcceptTimeout: time.Hour,
147132
MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts),
148-
RestartAckTimeout: time.Hour,
149133
CompleteTimeout: time.Hour,
150134
})
151135

@@ -169,9 +153,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
169153
err := mockAPI.awaitRestartSent()
170154
require.NoError(t, err)
171155

172-
// Simulate receiving restart ack from peer
173-
mockAPI.restartEvent()
174-
175156
err = awaitRestartComplete(mch)
176157
require.NoError(t, err)
177158
}
@@ -232,7 +213,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
232213
AcceptTimeout: time.Hour,
233214
RestartDebounce: 10 * time.Millisecond,
234215
MaxConsecutiveRestarts: 3,
235-
RestartAckTimeout: time.Hour,
236216
CompleteTimeout: time.Hour,
237217
})
238218

@@ -256,9 +236,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
256236
// Trigger another error event before the restart has completed
257237
triggerErrorEvent()
258238

259-
// Simulate receiving restart ack from peer (for first restart)
260-
mockAPI.restartEvent()
261-
262239
// A second restart should be sent because of the second error
263240
err = mockAPI.awaitRestartSent()
264241
require.NoError(t, err)
@@ -273,9 +250,11 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
273250

274251
func TestChannelMonitorTimeouts(t *testing.T) {
275252
type testCase struct {
276-
name string
277-
expectAccept bool
278-
expectComplete bool
253+
name string
254+
expectAccept bool
255+
expectComplete bool
256+
acceptTimeoutDisabled bool
257+
completeTimeoutDisabled bool
279258
}
280259
testCases := []testCase{{
281260
name: "accept in time",
@@ -284,6 +263,10 @@ func TestChannelMonitorTimeouts(t *testing.T) {
284263
}, {
285264
name: "accept too late",
286265
expectAccept: false,
266+
}, {
267+
name: "disable accept timeout",
268+
acceptTimeoutDisabled: true,
269+
expectAccept: true,
287270
}, {
288271
name: "complete in time",
289272
expectAccept: true,
@@ -292,6 +275,11 @@ func TestChannelMonitorTimeouts(t *testing.T) {
292275
name: "complete too late",
293276
expectAccept: true,
294277
expectComplete: false,
278+
}, {
279+
name: "disable complete timeout",
280+
completeTimeoutDisabled: true,
281+
expectAccept: true,
282+
expectComplete: true,
295283
}}
296284

297285
runTest := func(name string, isPush bool) {
@@ -309,10 +297,15 @@ func TestChannelMonitorTimeouts(t *testing.T) {
309297

310298
acceptTimeout := 10 * time.Millisecond
311299
completeTimeout := 10 * time.Millisecond
300+
if tc.acceptTimeoutDisabled {
301+
acceptTimeout = 0
302+
}
303+
if tc.completeTimeoutDisabled {
304+
completeTimeout = 0
305+
}
312306
m := NewMonitor(mockAPI, &Config{
313307
AcceptTimeout: acceptTimeout,
314308
MaxConsecutiveRestarts: 1,
315-
RestartAckTimeout: time.Hour,
316309
CompleteTimeout: completeTimeout,
317310
})
318311

@@ -520,10 +513,6 @@ func (m *mockMonitorAPI) receiveDataErrorEvent() {
520513
m.fireEvent(datatransfer.Event{Code: datatransfer.ReceiveDataError}, m.ch)
521514
}
522515

523-
func (m *mockMonitorAPI) restartEvent() {
524-
m.fireEvent(datatransfer.Event{Code: datatransfer.Restart}, m.ch)
525-
}
526-
527516
type mockChannelState struct {
528517
chid datatransfer.ChannelID
529518
queued uint64

impl/integration_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,6 @@ func TestAutoRestart(t *testing.T) {
731731
RestartDebounce: 500 * time.Millisecond,
732732
RestartBackoff: 500 * time.Millisecond,
733733
MaxConsecutiveRestarts: 10,
734-
RestartAckTimeout: 100 * time.Millisecond,
735734
CompleteTimeout: 100 * time.Millisecond,
736735
})
737736
initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf)

0 commit comments

Comments
 (0)