Skip to content

Commit eee8d1c

Browse files
authored
Handle network errors/stalls (#101)
* feat(datatransfer): handle network errors Handle errors when network messages don't go through * fix(datatransfer): cleanups lint, remove unused status
1 parent ad43f2d commit eee8d1c

14 files changed

+225
-30
lines changed

Diff for: channels/channels_fsm.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ var log = logging.Logger("data-transfer")
1717
var ChannelEvents = fsm.Events{
1818
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
1919
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),
20-
fsm.Event(datatransfer.Restart).FromAny().To(datatransfer.Ongoing),
20+
fsm.Event(datatransfer.Restart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
21+
chst.Message = ""
22+
return nil
23+
}),
2124

2225
fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),
2326

@@ -46,7 +49,10 @@ var ChannelEvents = fsm.Events{
4649
return nil
4750
}),
4851

49-
fsm.Event(datatransfer.Disconnected).FromAny().To(datatransfer.PeerDisconnected),
52+
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
53+
chst.Message = datatransfer.ErrDisconnected.Error()
54+
return nil
55+
}),
5056

5157
fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
5258
chst.Message = err.Error()

Diff for: channels/channels_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ func TestChannels(t *testing.T) {
303303
err = channelList.Disconnected(chid)
304304
require.NoError(t, err)
305305
state = checkEvent(ctx, t, received, datatransfer.Disconnected)
306-
require.Equal(t, datatransfer.PeerDisconnected, state.Status())
306+
require.Equal(t, datatransfer.ErrDisconnected.Error(), state.Message())
307307
})
308308

309309
t.Run("test self peer and other peer", func(t *testing.T) {

Diff for: errors.go

+6
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,9 @@ const ErrRejected = errorType("response rejected")
3333

3434
// ErrUnsupported indicates an operation is not supported by the transport protocol
3535
const ErrUnsupported = errorType("unsupported")
36+
37+
// ErrDisconnected indicates the other peer may have hung up and you should try restarting the channel.
38+
const ErrDisconnected = errorType("other peer appears to have hung up. restart Channel")
39+
40+
// ErrRemoved indicates the channel was inactive long enough that it was put in a permaneant error state
41+
const ErrRemoved = errorType("channel removed due to inactivity")

Diff for: go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/ipfs/go-blockservice v0.1.3
1414
github.com/ipfs/go-cid v0.0.7
1515
github.com/ipfs/go-datastore v0.4.5
16-
github.com/ipfs/go-graphsync v0.2.1
16+
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c
1717
github.com/ipfs/go-ipfs-blockstore v1.0.1
1818
github.com/ipfs/go-ipfs-blocksutil v0.0.1
1919
github.com/ipfs/go-ipfs-chunker v0.0.5

Diff for: go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaH
198198
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
199199
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
200200
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
201-
github.com/ipfs/go-graphsync v0.2.1 h1:MdehhqBSuTI2LARfKLkpYnt0mUrqHs/mtuDnESXHBfU=
202-
github.com/ipfs/go-graphsync v0.2.1/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
201+
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c h1:De/AZGvRa3WMyw5zdMMhcvRcho46BVo+C0NRud+T4io=
202+
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
203203
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
204204
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
205205
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=

Diff for: impl/events.go

+33-5
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ import (
1717
"github.com/filecoin-project/go-data-transfer/registry"
1818
)
1919

20-
var ChannelRemoveTimeout = 1 * time.Hour
21-
2220
func (m *manager) OnChannelOpened(chid datatransfer.ChannelID) error {
2321
has, err := m.channels.HasChannel(chid)
2422
if err != nil {
@@ -170,12 +168,42 @@ func (m *manager) OnRequestTimedOut(ctx context.Context, chid datatransfer.Chann
170168
go func() {
171169
select {
172170
case <-ctx.Done():
173-
case <-time.After(ChannelRemoveTimeout):
171+
case <-time.After(m.channelRemoveTimeout):
172+
channel, err := m.channels.GetByID(ctx, chid)
173+
if err == nil {
174+
if !(channels.IsChannelTerminated(channel.Status()) ||
175+
channels.IsChannelCleaningUp(channel.Status())) {
176+
if err := m.channels.Error(chid, datatransfer.ErrRemoved); err != nil {
177+
log.Errorf("failed to cancel timed-out channel: %v", err)
178+
return
179+
}
180+
log.Warnf("channel %+v has ben cancelled because of timeout", chid)
181+
}
182+
}
183+
}
184+
}()
185+
186+
return nil
187+
}
188+
189+
func (m *manager) OnRequestDisconnected(ctx context.Context, chid datatransfer.ChannelID) error {
190+
log.Warnf("channel %+v has stalled or disconnected", chid)
191+
192+
// mark peer disconnected for informational purposes
193+
err := m.channels.Disconnected(chid)
194+
if err != nil {
195+
return err
196+
}
197+
198+
go func() {
199+
select {
200+
case <-ctx.Done():
201+
case <-time.After(m.channelRemoveTimeout):
174202
channel, err := m.channels.GetByID(ctx, chid)
175203
if err == nil {
176204
if !(channels.IsChannelTerminated(channel.Status()) ||
177205
channels.IsChannelCleaningUp(channel.Status())) {
178-
if err := m.channels.Cancel(chid); err != nil {
206+
if err := m.channels.Error(chid, datatransfer.ErrRemoved); err != nil {
179207
log.Errorf("failed to cancel timed-out channel: %v", err)
180208
return
181209
}
@@ -198,7 +226,7 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
198226
if msg != nil {
199227
if err := m.dataTransferNetwork.SendMessage(context.TODO(), chid.Initiator, msg); err != nil {
200228
log.Warnf("failed to send completion message, err : %v", err)
201-
return m.channels.Disconnected(chid)
229+
return m.OnRequestDisconnected(context.TODO(), chid)
202230
}
203231
}
204232
if msg.Accepted() {

Diff for: impl/impl.go

+22-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"time"
78

89
"github.com/hannahhoward/go-pubsub"
910
"github.com/hashicorp/go-multierror"
@@ -39,6 +40,7 @@ type manager struct {
3940
peerID peer.ID
4041
transport datatransfer.Transport
4142
storedCounter *storedcounter.StoredCounter
43+
channelRemoveTimeout time.Duration
4244
}
4345

4446
type internalEvent struct {
@@ -72,8 +74,20 @@ func readyDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error {
7274
return nil
7375
}
7476

77+
// DataTransferOption configures the data transfer manager
78+
type DataTransferOption func(*manager)
79+
80+
// ChannelRemoveTimeout sets the timeout after which channels are removed from the manager
81+
func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
82+
return func(m *manager) {
83+
m.channelRemoveTimeout = timeout
84+
}
85+
}
86+
87+
const defaultChannelRemoveTimeout = 1 * time.Hour
88+
7589
// NewDataTransfer initializes a new instance of a data transfer manager
76-
func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter) (datatransfer.Manager, error) {
90+
func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter, options ...DataTransferOption) (datatransfer.Manager, error) {
7791
m := &manager{
7892
dataTransferNetwork: dataTransferNetwork,
7993
validatedTypes: registry.NewRegistry(),
@@ -85,12 +99,16 @@ func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTran
8599
peerID: dataTransferNetwork.ID(),
86100
transport: transport,
87101
storedCounter: storedCounter,
102+
channelRemoveTimeout: defaultChannelRemoveTimeout,
88103
}
89104
channels, err := channels.New(ds, m.notifier, m.voucherDecoder, m.resultTypes.Decoder, &channelEnvironment{m}, dataTransferNetwork.ID())
90105
if err != nil {
91106
return nil, err
92107
}
93108
m.channels = channels
109+
for _, option := range options {
110+
option(m)
111+
}
94112
return m, nil
95113
}
96114

@@ -230,7 +248,7 @@ func (m *manager) SendVoucher(ctx context.Context, channelID datatransfer.Channe
230248
}
231249
if err := m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), updateRequest); err != nil {
232250
err = fmt.Errorf("Unable to send request: %w", err)
233-
_ = m.channels.Error(channelID, err)
251+
_ = m.OnRequestDisconnected(ctx, channelID)
234252
return err
235253
}
236254
return m.channels.NewVoucher(channelID, voucher)
@@ -249,7 +267,7 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe
249267

250268
if err := m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid)); err != nil {
251269
err = fmt.Errorf("Unable to send cancel message: %w", err)
252-
_ = m.channels.Error(chid, err)
270+
_ = m.OnRequestDisconnected(ctx, chid)
253271
return err
254272
}
255273

@@ -271,7 +289,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe
271289

272290
if err := m.dataTransferNetwork.SendMessage(ctx, chid.OtherParty(m.peerID), m.pauseMessage(chid)); err != nil {
273291
err = fmt.Errorf("Unable to send pause message: %w", err)
274-
_ = m.channels.Error(chid, err)
292+
_ = m.OnRequestDisconnected(ctx, chid)
275293
return err
276294
}
277295

Diff for: impl/initiating_test.go

+15-8
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func TestDataTransferInitiating(t *testing.T) {
2828
ctx := context.Background()
2929
testCases := map[string]struct {
3030
expectedEvents []datatransfer.EventCode
31+
options []DataTransferOption
3132
verify func(t *testing.T, h *harness)
3233
}{
3334
"OpenPushDataTransfer": {
@@ -83,21 +84,27 @@ func TestDataTransferInitiating(t *testing.T) {
8384
},
8485
},
8586
"Remove Timed-out request": {
86-
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Cancel, datatransfer.CleanupComplete},
87+
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Error, datatransfer.CleanupComplete},
88+
options: []DataTransferOption{ChannelRemoveTimeout(10 * time.Millisecond)},
8789
verify: func(t *testing.T, h *harness) {
88-
orig := ChannelRemoveTimeout
89-
ChannelRemoveTimeout = 10 * time.Millisecond
90-
defer func() {
91-
ChannelRemoveTimeout = orig
92-
}()
93-
9490
channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor)
9591
require.NoError(t, err)
9692
require.NoError(t, h.transport.EventHandler.OnRequestTimedOut(ctx, channelID))
9793
// need time for the events to take place
9894
time.Sleep(1 * time.Second)
9995
},
10096
},
97+
"Remove disconnected request": {
98+
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Disconnected, datatransfer.Error, datatransfer.CleanupComplete},
99+
options: []DataTransferOption{ChannelRemoveTimeout(10 * time.Millisecond)},
100+
verify: func(t *testing.T, h *harness) {
101+
channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor)
102+
require.NoError(t, err)
103+
require.NoError(t, h.transport.EventHandler.OnRequestDisconnected(ctx, channelID))
104+
// need time for the events to take place
105+
time.Sleep(1 * time.Second)
106+
},
107+
},
101108
"SendVoucher with no channel open": {
102109
verify: func(t *testing.T, h *harness) {
103110
err := h.dt.SendVoucher(h.ctx, datatransfer.ChannelID{Initiator: h.peers[1], Responder: h.peers[0], ID: 999999}, h.voucher)
@@ -344,7 +351,7 @@ func TestDataTransferInitiating(t *testing.T) {
344351
h.transport = testutil.NewFakeTransport()
345352
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
346353
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
347-
dt, err := NewDataTransfer(h.ds, h.network, h.transport, h.storedCounter)
354+
dt, err := NewDataTransfer(h.ds, h.network, h.transport, h.storedCounter, verify.options...)
348355
require.NoError(t, err)
349356
testutil.StartAndWaitForReady(ctx, t, dt)
350357
h.dt = dt

Diff for: impl/restart.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/filecoin-project/go-data-transfer/message"
1515
)
1616

17+
// ChannelDataTransferType identifies the type of a data transfer channel for the purposes of a restart
1718
type ChannelDataTransferType int
1819

1920
const (
@@ -23,10 +24,10 @@ const (
2324
// ManagerPeerCreatePush is the type of a channel wherein the manager peer created a Push Data Transfer
2425
ManagerPeerCreatePush
2526

26-
// ManagerPeerCreatePush is the type of a channel wherein the manager peer received a Pull Data Transfer Request
27+
// ManagerPeerReceivePull is the type of a channel wherein the manager peer received a Pull Data Transfer Request
2728
ManagerPeerReceivePull
2829

29-
// ManagerPeerCreatePush is the type of a channel wherein the manager peer received a Push Data Transfer Request
30+
// ManagerPeerReceivePush is the type of a channel wherein the manager peer received a Push Data Transfer Request
3031
ManagerPeerReceivePush
3132
)
3233

Diff for: statuses.go

-3
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,6 @@ const (
5858

5959
// ChannelNotFoundError means the searched for data transfer does not exist
6060
ChannelNotFoundError
61-
62-
// PeerDisconnected means that we do NOT have a connection to the other peer
63-
PeerDisconnected
6461
)
6562

6663
// Statuses are human readable names for data transfer states

Diff for: testutil/fakegraphsync.go

+14
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ type FakeGraphSync struct {
110110
RequestUpdatedHook graphsync.OnRequestUpdatedHook
111111
IncomingResponseHook graphsync.OnIncomingResponseHook
112112
RequestorCancelledListener graphsync.OnRequestorCancelledListener
113+
BlockSentListener graphsync.OnBlockSentListener
114+
NetworkErrorListener graphsync.OnNetworkErrorListener
113115
}
114116

115117
// NewFakeGraphSync returns a new fake graphsync implementation
@@ -352,6 +354,18 @@ func (fgs *FakeGraphSync) RegisterRequestorCancelledListener(listener graphsync.
352354
return nil
353355
}
354356

357+
// RegisterBlockSentListener adds a listener on the responder as blocks go out
358+
func (fgs *FakeGraphSync) RegisterBlockSentListener(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc {
359+
fgs.BlockSentListener = listener
360+
return nil
361+
}
362+
363+
// RegisterNetworkErrorListener adds a listener on the responder as blocks go out
364+
func (fgs *FakeGraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc {
365+
fgs.NetworkErrorListener = listener
366+
return nil
367+
}
368+
355369
var _ graphsync.GraphExchange = &FakeGraphSync{}
356370

357371
type fakeBlkData struct {

Diff for: transport.go

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ type EventsHandler interface {
5151
// OnRequestTimedOut is called when a request we opened (with the given channel Id) to receive data times out.
5252
// Error returns are logged but otherwise have no effect
5353
OnRequestTimedOut(ctx context.Context, chid ChannelID) error
54+
55+
// OnRequestDisconnected is called when a network error occurs in a graphsync request
56+
// or we appear to stall while receiving data
57+
OnRequestDisconnected(ctx context.Context, chid ChannelID) error
5458
}
5559

5660
/*

Diff for: transport/graphsync/graphsync.go

+14
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error {
284284
t.gs.RegisterIncomingResponseHook(t.gsIncomingResponseHook)
285285
t.gs.RegisterRequestUpdatedHook(t.gsRequestUpdatedHook)
286286
t.gs.RegisterRequestorCancelledListener(t.gsRequestorCancelledListener)
287+
t.gs.RegisterNetworkErrorListener(t.gsNetworkErrorListener)
287288
return nil
288289
}
289290

@@ -597,3 +598,16 @@ func (t *Transport) gsRequestorCancelledListener(p peer.ID, request graphsync.Re
597598
t.requestorCancelledMap[chid] = struct{}{}
598599
}
599600
}
601+
602+
func (t *Transport) gsNetworkErrorListener(p peer.ID, request graphsync.RequestData, err error) {
603+
t.dataLock.Lock()
604+
defer t.dataLock.Unlock()
605+
606+
chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}]
607+
if ok {
608+
err := t.events.OnRequestDisconnected(context.TODO(), chid)
609+
if err != nil {
610+
log.Error(err)
611+
}
612+
}
613+
}

0 commit comments

Comments
 (0)