Skip to content

Commit d6c2eaf

Browse files
authored
Feat/cleanup errors (#90)
* feat(impl): cleanup errors cleanup errors -- move more error codes to proper types and insure single error dispatched * test(impl): add unit test for incomplete response
1 parent 8a1e460 commit d6c2eaf

File tree

5 files changed

+119
-4
lines changed

5 files changed

+119
-4
lines changed

errors.go

+9
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,12 @@ const ErrPause = errorType("pause channel")
2424
// ErrResume is a special error that the RequestReceived / ResponseReceived hooks can
2525
// use to resume the channel
2626
const ErrResume = errorType("resume channel")
27+
28+
// ErrIncomplete indicates a channel did not finish transferring data successfully
29+
const ErrIncomplete = errorType("incomplete response")
30+
31+
// ErrRejected indicates a request was not accepted
32+
const ErrRejected = errorType("response rejected")
33+
34+
// ErrUnsupported indicates an operation is not supported by the transport protocol
35+
const ErrUnsupported = errorType("unsupported")

impl/events.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat
123123
}
124124
}
125125
if !response.Accepted() {
126-
return m.channels.Error(chid, errors.New("Response Rejected"))
126+
return m.channels.Error(chid, datatransfer.ErrRejected)
127127
}
128128
if response.IsNew() {
129129
err := m.channels.Accept(chid)
@@ -170,7 +170,15 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
170170
}
171171
return m.channels.FinishTransfer(chid)
172172
}
173-
return m.channels.Error(chid, errors.New("incomplete response"))
173+
chst, err := m.channels.GetByID(context.TODO(), chid)
174+
if err != nil {
175+
return err
176+
}
177+
// send an error, but only if we haven't already errored for some reason
178+
if chst.Status() != datatransfer.Failing && chst.Status() != datatransfer.Failed {
179+
return m.channels.Error(chid, datatransfer.ErrIncomplete)
180+
}
181+
return nil
174182
}
175183

176184
func (m *manager) receiveNewRequest(

impl/impl.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe
231231

232232
pausable, ok := m.transport.(datatransfer.PauseableTransport)
233233
if !ok {
234-
return errors.New("unsupported")
234+
return datatransfer.ErrUnsupported
235235
}
236236

237237
err := pausable.PauseChannel(ctx, chid)
@@ -252,7 +252,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe
252252
func (m *manager) ResumeDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
253253
pausable, ok := m.transport.(datatransfer.PauseableTransport)
254254
if !ok {
255-
return errors.New("unsupported")
255+
return datatransfer.ErrUnsupported
256256
}
257257

258258
err := pausable.ResumeChannel(ctx, m.resumeMessage(chid), chid)

impl/integration_test.go

+77
Original file line numberDiff line numberDiff line change
@@ -890,6 +890,83 @@ func TestPauseAndResume(t *testing.T) {
890890
}
891891
}
892892

893+
func TestUnrecognizedVoucherRoundTrip(t *testing.T) {
894+
ctx := context.Background()
895+
testCases := map[string]bool{
896+
"push requests": false,
897+
"pull requests": true,
898+
}
899+
for testCase, isPull := range testCases {
900+
t.Run(testCase, func(t *testing.T) {
901+
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
902+
// defer cancel()
903+
904+
gsData := testutil.NewGraphsyncTestingData(ctx, t)
905+
host1 := gsData.Host1 // initiator, data sender
906+
host2 := gsData.Host2 // data recipient
907+
908+
tp1 := gsData.SetupGSTransportHost1()
909+
tp2 := gsData.SetupGSTransportHost2()
910+
911+
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.DtNet1, tp1, gsData.StoredCounter1)
912+
require.NoError(t, err)
913+
err = dt1.Start(ctx)
914+
require.NoError(t, err)
915+
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.DtNet2, tp2, gsData.StoredCounter2)
916+
require.NoError(t, err)
917+
err = dt2.Start(ctx)
918+
require.NoError(t, err)
919+
920+
finished := make(chan struct{}, 2)
921+
errChan := make(chan string, 2)
922+
opened := make(chan struct{}, 2)
923+
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
924+
if channelState.Status() == datatransfer.Failed {
925+
finished <- struct{}{}
926+
}
927+
if event.Code == datatransfer.Error {
928+
errChan <- channelState.Message()
929+
}
930+
if event.Code == datatransfer.Open {
931+
opened <- struct{}{}
932+
}
933+
}
934+
dt1.SubscribeToEvents(subscriber)
935+
dt2.SubscribeToEvents(subscriber)
936+
voucher := testutil.FakeDTType{Data: "applesauce"}
937+
938+
root, _ := testutil.LoadUnixFSFile(ctx, t, gsData.DagService1)
939+
rootCid := root.(cidlink.Link).Cid
940+
941+
if isPull {
942+
_, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector)
943+
} else {
944+
_, err = dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
945+
}
946+
require.NoError(t, err)
947+
opens := 0
948+
var errMessages []string
949+
finishes := 0
950+
for opens < 1 || finishes < 1 {
951+
select {
952+
case <-ctx.Done():
953+
t.Fatal("Did not complete succcessful data transfer")
954+
case <-finished:
955+
finishes++
956+
case <-opened:
957+
opens++
958+
case errMessage := <-errChan:
959+
require.Equal(t, errMessage, datatransfer.ErrRejected.Error())
960+
errMessages = append(errMessages, errMessage)
961+
if len(errMessages) > 1 {
962+
t.Fatal("too many errors")
963+
}
964+
}
965+
}
966+
})
967+
}
968+
}
969+
893970
func TestDataTransferSubscribing(t *testing.T) {
894971
// create network
895972
ctx := context.Background()

impl/responding_test.go

+21
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,27 @@ func TestDataTransferResponding(t *testing.T) {
482482
require.False(t, response.IsPaused())
483483
},
484484
},
485+
"validated, incomplete response": {
486+
expectedEvents: []datatransfer.EventCode{
487+
datatransfer.Open,
488+
datatransfer.NewVoucherResult,
489+
datatransfer.Accept,
490+
datatransfer.Error,
491+
datatransfer.CleanupComplete,
492+
},
493+
configureValidator: func(sv *testutil.StubbedValidator) {
494+
sv.ExpectSuccessPull()
495+
sv.StubResult(testutil.NewFakeDTType())
496+
},
497+
configureRevalidator: func(srv *testutil.StubbedRevalidator) {
498+
},
499+
verify: func(t *testing.T, h *receiverHarness) {
500+
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
501+
require.NoError(t, err)
502+
err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), false)
503+
require.NoError(t, err)
504+
},
505+
},
485506
"new push request, customized transport": {
486507
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.NewVoucherResult, datatransfer.Accept},
487508
configureValidator: func(sv *testutil.StubbedValidator) {

0 commit comments

Comments
 (0)