@@ -160,7 +160,7 @@ type monitoredChannel struct {
160
160
chid datatransfer.ChannelID
161
161
cfg * Config
162
162
unsub datatransfer.Unsubscribe
163
- restartChannelDebounced func ()
163
+ restartChannelDebounced func (error )
164
164
onShutdown func (datatransfer.ChannelID )
165
165
shutdownLk sync.Mutex
166
166
@@ -187,8 +187,33 @@ func newMonitoredChannel(
187
187
cfg : cfg ,
188
188
onShutdown : onShutdown ,
189
189
}
190
+
191
+ // "debounce" calls to restart channel, ie if there are multiple calls in a
192
+ // short space of time, only send a message to restart the channel once
193
+ var lk sync.Mutex
194
+ var lastErr error
190
195
debouncer := debounce .New (cfg .RestartDebounce )
191
- mpc .restartChannelDebounced = func () { debouncer (mpc .restartChannel ) }
196
+ mpc .restartChannelDebounced = func (err error ) {
197
+ // Log the error at debug level
198
+ log .Debug (err .Error ())
199
+
200
+ // Save the last error passed to restartChannelDebounced
201
+ lk .Lock ()
202
+ lastErr = err
203
+ lk .Unlock ()
204
+
205
+ debouncer (func () {
206
+ // Log only the last error passed to restartChannelDebounced at warning level
207
+ lk .Lock ()
208
+ log .Warnf ("%s" , lastErr )
209
+ lk .Unlock ()
210
+
211
+ // Restart the channel
212
+ mpc .restartChannel ()
213
+ })
214
+ }
215
+
216
+ // Start monitoring the channel
192
217
mpc .start ()
193
218
return mpc
194
219
}
@@ -247,13 +272,13 @@ func (mc *monitoredChannel) start() {
247
272
case datatransfer .SendDataError :
248
273
// If the transport layer reports an error sending data over the wire,
249
274
// attempt to restart the channel
250
- log . Warnf ("%s: data transfer transport send error, restarting data transfer" , mc .chid )
251
- go mc .restartChannelDebounced ()
275
+ err := xerrors . Errorf ("%s: data transfer transport send error, restarting data transfer" , mc .chid )
276
+ go mc .restartChannelDebounced (err )
252
277
case datatransfer .ReceiveDataError :
253
278
// If the transport layer reports an error receiving data over the wire,
254
279
// attempt to restart the channel
255
- log . Warnf ("%s: data transfer transport receive error, restarting data transfer" , mc .chid )
256
- go mc .restartChannelDebounced ()
280
+ err := xerrors . Errorf ("%s: data transfer transport receive error, restarting data transfer" , mc .chid )
281
+ go mc .restartChannelDebounced (err )
257
282
case datatransfer .FinishTransfer :
258
283
// The channel initiator has finished sending / receiving all data.
259
284
// Watch to make sure that the responder sends a message to acknowledge
@@ -355,7 +380,7 @@ func (mc *monitoredChannel) restartChannel() {
355
380
356
381
// Check if channel is already being restarted
357
382
if ! restartedAt .IsZero () {
358
- log .Infof ("%s: restart called but already restarting channel, " +
383
+ log .Debugf ("%s: restart called but already restarting channel, " +
359
384
"waiting to restart again (since %s; restart backoff is %s)" ,
360
385
mc .chid , time .Since (restartedAt ), mc .cfg .RestartBackoff )
361
386
return
@@ -397,7 +422,7 @@ func (mc *monitoredChannel) restartChannel() {
397
422
}
398
423
399
424
// There was a restart queued, restart again
400
- log .Infof ("%s: restart was queued - restarting again" , mc .chid )
425
+ log .Debugf ("%s: restart was queued - restarting again" , mc .chid )
401
426
}
402
427
}
403
428
@@ -416,7 +441,7 @@ func (mc *monitoredChannel) doRestartChannel() error {
416
441
}
417
442
418
443
// Send the restart message
419
- log .Infof ("%s: restarting (%d consecutive restarts)" , mc .chid , restartCount )
444
+ log .Debugf ("%s: restarting (%d consecutive restarts)" , mc .chid , restartCount )
420
445
err := mc .sendRestartMessage (restartCount )
421
446
if err != nil {
422
447
log .Warnf ("%s: restart failed, trying again: %s" , mc .chid , err )
@@ -433,17 +458,17 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
433
458
// Note that at the networking layer there is logic to retry if a network
434
459
// connection cannot be established, so this may take some time.
435
460
p := mc .chid .OtherParty (mc .mgr .PeerID ())
436
- log .Infof ("%s: re-establishing connection to %s" , mc .chid , p )
461
+ log .Debugf ("%s: re-establishing connection to %s" , mc .chid , p )
437
462
start := time .Now ()
438
463
err := mc .mgr .ConnectTo (mc .ctx , p )
439
464
if err != nil {
440
465
return xerrors .Errorf ("%s: failed to reconnect to peer %s after %s: %w" ,
441
466
mc .chid , p , time .Since (start ), err )
442
467
}
443
- log .Infof ("%s: re-established connection to %s in %s" , mc .chid , p , time .Since (start ))
468
+ log .Debugf ("%s: re-established connection to %s in %s" , mc .chid , p , time .Since (start ))
444
469
445
470
// Send a restart message for the channel
446
- log .Infof ("%s: sending restart message to %s (%d consecutive restarts)" , mc .chid , p , restartCount )
471
+ log .Debugf ("%s: sending restart message to %s (%d consecutive restarts)" , mc .chid , p , restartCount )
447
472
err = mc .mgr .RestartDataTransferChannel (mc .ctx , mc .chid )
448
473
if err != nil {
449
474
return xerrors .Errorf ("%s: failed to send restart message to %s: %w" , mc .chid , p , err )
@@ -453,11 +478,11 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
453
478
// If a restart backoff is configured, backoff after a restart before
454
479
// attempting another.
455
480
if mc .cfg .RestartBackoff > 0 {
456
- log .Infof ("%s: backing off %s before allowing any other restarts" ,
481
+ log .Debugf ("%s: backing off %s before allowing any other restarts" ,
457
482
mc .chid , mc .cfg .RestartBackoff )
458
483
select {
459
484
case <- time .After (mc .cfg .RestartBackoff ):
460
- log .Infof ("%s: restart back-off of %s complete" , mc .chid , mc .cfg .RestartBackoff )
485
+ log .Debugf ("%s: restart back-off of %s complete" , mc .chid , mc .cfg .RestartBackoff )
461
486
case <- mc .ctx .Done ():
462
487
return nil
463
488
}
0 commit comments